summaryrefslogtreecommitdiff
path: root/src/api/server_server.rs
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2022-10-05 20:34:31 +0200
committerNyaaori <+@nyaaori.cat>2022-10-10 14:02:01 +0200
commita4637e2ba1093065a6fda3fa2ad2b2b9f30eea63 (patch)
tree2d31313957d699875fc61f570686318b523ae0f1 /src/api/server_server.rs
parent33a2b2b7729bb40253fd174d99ad773869b5ecfe (diff)
downloadconduit-a4637e2ba1093065a6fda3fa2ad2b2b9f30eea63.zip
cargo fmt
Diffstat (limited to 'src/api/server_server.rs')
-rw-r--r--src/api/server_server.rs388
1 files changed, 266 insertions, 122 deletions
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index 11f7ec3..dba4489 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -1,6 +1,7 @@
use crate::{
api::client_server::{self, claim_keys_helper, get_keys_helper},
- utils, Error, PduEvent, Result, Ruma, services, service::pdu::{gen_event_id_canonical_json, PduBuilder},
+ service::pdu::{gen_event_id_canonical_json, PduBuilder},
+ services, utils, Error, PduEvent, Result, Ruma,
};
use axum::{response::IntoResponse, Json};
use futures_util::{stream::FuturesUnordered, StreamExt};
@@ -138,7 +139,8 @@ where
let mut write_destination_to_cache = false;
- let cached_result = services().globals
+ let cached_result = services()
+ .globals
.actual_destination_cache
.read()
.unwrap()
@@ -191,7 +193,10 @@ where
.to_string()
.into(),
);
- request_map.insert("origin".to_owned(), services().globals.server_name().as_str().into());
+ request_map.insert(
+ "origin".to_owned(),
+ services().globals.server_name().as_str().into(),
+ );
request_map.insert("destination".to_owned(), destination.as_str().into());
let mut request_json =
@@ -238,7 +243,11 @@ where
let url = reqwest_request.url().clone();
- let response = services().globals.federation_client().execute(reqwest_request).await;
+ let response = services()
+ .globals
+ .federation_client()
+ .execute(reqwest_request)
+ .await;
match response {
Ok(mut response) => {
@@ -278,10 +287,15 @@ where
if status == 200 {
let response = T::IncomingResponse::try_from_http_response(http_response);
if response.is_ok() && write_destination_to_cache {
- services().globals.actual_destination_cache.write().unwrap().insert(
- Box::<ServerName>::from(destination),
- (actual_destination, host),
- );
+ services()
+ .globals
+ .actual_destination_cache
+ .write()
+ .unwrap()
+ .insert(
+ Box::<ServerName>::from(destination),
+ (actual_destination, host),
+ );
}
response.map_err(|e| {
@@ -329,9 +343,7 @@ fn add_port_to_hostname(destination_str: &str) -> FedDest {
/// Returns: actual_destination, host header
/// Implemented according to the specification at https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names
/// Numbers in comments below refer to bullet points in linked section of specification
-async fn find_actual_destination(
- destination: &'_ ServerName,
-) -> (FedDest, FedDest) {
+async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
let destination_str = destination.as_str().to_owned();
let mut hostname = destination_str.clone();
let actual_destination = match get_ip_with_port(&destination_str) {
@@ -364,18 +376,24 @@ async fn find_actual_destination(
// 3.3: SRV lookup successful
let force_port = hostname_override.port();
- if let Ok(override_ip) = services().globals
+ if let Ok(override_ip) = services()
+ .globals
.dns_resolver()
.lookup_ip(hostname_override.hostname())
.await
{
- services().globals.tls_name_override.write().unwrap().insert(
- delegated_hostname.clone(),
- (
- override_ip.iter().collect(),
- force_port.unwrap_or(8448),
- ),
- );
+ services()
+ .globals
+ .tls_name_override
+ .write()
+ .unwrap()
+ .insert(
+ delegated_hostname.clone(),
+ (
+ override_ip.iter().collect(),
+ force_port.unwrap_or(8448),
+ ),
+ );
} else {
warn!("Using SRV record, but could not resolve to IP");
}
@@ -400,15 +418,24 @@ async fn find_actual_destination(
Some(hostname_override) => {
let force_port = hostname_override.port();
- if let Ok(override_ip) = services().globals
+ if let Ok(override_ip) = services()
+ .globals
.dns_resolver()
.lookup_ip(hostname_override.hostname())
.await
{
- services().globals.tls_name_override.write().unwrap().insert(
- hostname.clone(),
- (override_ip.iter().collect(), force_port.unwrap_or(8448)),
- );
+ services()
+ .globals
+ .tls_name_override
+ .write()
+ .unwrap()
+ .insert(
+ hostname.clone(),
+ (
+ override_ip.iter().collect(),
+ force_port.unwrap_or(8448),
+ ),
+ );
} else {
warn!("Using SRV record, but could not resolve to IP");
}
@@ -443,10 +470,9 @@ async fn find_actual_destination(
(actual_destination, hostname)
}
-async fn query_srv_record(
- hostname: &'_ str,
-) -> Option<FedDest> {
- if let Ok(Some(host_port)) = services().globals
+async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
+ if let Ok(Some(host_port)) = services()
+ .globals
.dns_resolver()
.srv_lookup(format!("_matrix._tcp.{}", hostname))
.await
@@ -465,11 +491,10 @@ async fn query_srv_record(
}
}
-async fn request_well_known(
- destination: &str,
-) -> Option<String> {
+async fn request_well_known(destination: &str) -> Option<String> {
let body: serde_json::Value = serde_json::from_str(
- &services().globals
+ &services()
+ .globals
.default_client()
.get(&format!(
"https://{}/.well-known/matrix/server",
@@ -664,15 +689,22 @@ pub async fn send_transaction_message_route(
Some(id) => id,
None => {
// Event is invalid
- resolved_map.insert(event_id, Err(Error::bad_database("Event needs a valid RoomId.")));
+ resolved_map.insert(
+ event_id,
+ Err(Error::bad_database("Event needs a valid RoomId.")),
+ );
continue;
}
};
- services().rooms.event_handler.acl_check(&sender_servername, &room_id)?;
+ services()
+ .rooms
+ .event_handler
+ .acl_check(&sender_servername, &room_id)?;
let mutex = Arc::clone(
- services().globals
+ services()
+ .globals
.roomid_mutex_federation
.write()
.unwrap()
@@ -683,16 +715,19 @@ pub async fn send_transaction_message_route(
let start_time = Instant::now();
resolved_map.insert(
event_id.clone(),
- services().rooms.event_handler.handle_incoming_pdu(
- &sender_servername,
- &event_id,
- &room_id,
- value,
- true,
- &pub_key_map,
- )
- .await
- .map(|_| ()),
+ services()
+ .rooms
+ .event_handler
+ .handle_incoming_pdu(
+ &sender_servername,
+ &event_id,
+ &room_id,
+ value,
+ true,
+ &pub_key_map,
+ )
+ .await
+ .map(|_| ()),
);
drop(mutex_lock);
@@ -727,7 +762,13 @@ pub async fn send_transaction_message_route(
.event_ids
.iter()
.filter_map(|id| {
- services().rooms.timeline.get_pdu_count(id).ok().flatten().map(|r| (id, r))
+ services()
+ .rooms
+ .timeline
+ .get_pdu_count(id)
+ .ok()
+ .flatten()
+ .map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
@@ -744,11 +785,11 @@ pub async fn send_transaction_message_route(
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
- services().rooms.edus.read_receipt.readreceipt_update(
- &user_id,
- &room_id,
- event,
- )?;
+ services()
+ .rooms
+ .edus
+ .read_receipt
+ .readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
info!("No known event ids in read receipt: {:?}", user_updates);
@@ -757,7 +798,11 @@ pub async fn send_transaction_message_route(
}
}
Edu::Typing(typing) => {
- if services().rooms.state_cache.is_joined(&typing.user_id, &typing.room_id)? {
+ if services()
+ .rooms
+ .state_cache
+ .is_joined(&typing.user_id, &typing.room_id)?
+ {
if typing.typing {
services().rooms.edus.typing.typing_add(
&typing.user_id,
@@ -765,16 +810,16 @@ pub async fn send_transaction_message_route(
3000 + utils::millis_since_unix_epoch(),
)?;
} else {
- services().rooms.edus.typing.typing_remove(
- &typing.user_id,
- &typing.room_id,
- )?;
+ services()
+ .rooms
+ .edus
+ .typing
+ .typing_remove(&typing.user_id, &typing.room_id)?;
}
}
}
Edu::DeviceListUpdate(DeviceListUpdateContent { user_id, .. }) => {
- services().users
- .mark_device_key_update(&user_id)?;
+ services().users.mark_device_key_update(&user_id)?;
}
Edu::DirectToDevice(DirectDeviceContent {
sender,
@@ -810,7 +855,9 @@ pub async fn send_transaction_message_route(
}
DeviceIdOrAllDevices::AllDevices => {
- for target_device_id in services().users.all_device_ids(target_user_id) {
+ for target_device_id in
+ services().users.all_device_ids(target_user_id)
+ {
services().users.add_to_device_event(
&sender,
target_user_id,
@@ -830,7 +877,8 @@ pub async fn send_transaction_message_route(
}
// Save transaction id with empty data
- services().transaction_ids
+ services()
+ .transaction_ids
.add_txnid(&sender, None, &message_id, &[])?;
}
Edu::SigningKeyUpdate(SigningKeyUpdateContent {
@@ -854,7 +902,12 @@ pub async fn send_transaction_message_route(
}
}
- Ok(send_transaction_message::v1::Response { pdus: resolved_map.into_iter().map(|(e, r)| (e, r.map_err(|e| e.to_string()))).collect() })
+ Ok(send_transaction_message::v1::Response {
+ pdus: resolved_map
+ .into_iter()
+ .map(|(e, r)| (e, r.map_err(|e| e.to_string())))
+ .collect(),
+ })
}
/// # `GET /_matrix/federation/v1/event/{eventId}`
@@ -875,7 +928,8 @@ pub async fn get_event_route(
.expect("server is authenticated");
let event = services()
- .rooms.timeline
+ .rooms
+ .timeline
.get_pdu_json(&body.event_id)?
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
@@ -887,7 +941,11 @@ pub async fn get_event_route(
let room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
- if !services().rooms.state_cache.server_in_room(sender_servername, room_id)? {
+ if !services()
+ .rooms
+ .state_cache
+ .server_in_room(sender_servername, room_id)?
+ {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room",
@@ -916,14 +974,21 @@ pub async fn get_missing_events_route(
.as_ref()
.expect("server is authenticated");
- if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
+ if !services()
+ .rooms
+ .state_cache
+ .server_in_room(sender_servername, &body.room_id)?
+ {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room",
));
}
- services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
+ services()
+ .rooms
+ .event_handler
+ .acl_check(&sender_servername, &body.room_id)?;
let mut queued_events = body.latest_events.clone();
let mut events = Vec::new();
@@ -988,17 +1053,25 @@ pub async fn get_event_authorization_route(
.as_ref()
.expect("server is authenticated");
- if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
+ if !services()
+ .rooms
+ .state_cache
+ .server_in_room(sender_servername, &body.room_id)?
+ {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
- services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
+ services()
+ .rooms
+ .event_handler
+ .acl_check(&sender_servername, &body.room_id)?;
let event = services()
- .rooms.timeline
+ .rooms
+ .timeline
.get_pdu_json(&body.event_id)?
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
@@ -1010,7 +1083,11 @@ pub async fn get_event_authorization_route(
let room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
- let auth_chain_ids = services().rooms.auth_chain.get_auth_chain(room_id, vec![Arc::from(&*body.event_id)]).await?;
+ let auth_chain_ids = services()
+ .rooms
+ .auth_chain
+ .get_auth_chain(room_id, vec![Arc::from(&*body.event_id)])
+ .await?;
Ok(get_event_authorization::v1::Response {
auth_chain: auth_chain_ids
@@ -1035,17 +1112,25 @@ pub async fn get_room_state_route(
.as_ref()
.expect("server is authenticated");
- if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
+ if !services()
+ .rooms
+ .state_cache
+ .server_in_room(sender_servername, &body.room_id)?
+ {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
- services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
+ services()
+ .rooms
+ .event_handler
+ .acl_check(&sender_servername, &body.room_id)?;
let shortstatehash = services()
- .rooms.state_accessor
+ .rooms
+ .state_accessor
.pdu_shortstatehash(&body.event_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
@@ -1053,26 +1138,39 @@ pub async fn get_room_state_route(
))?;
let pdus = services()
- .rooms.state_accessor
+ .rooms
+ .state_accessor
.state_full_ids(shortstatehash)
.await?
.into_iter()
.map(|(_, id)| {
PduEvent::convert_to_outgoing_federation_event(
- services().rooms.timeline.get_pdu_json(&id).unwrap().unwrap(),
+ services()
+ .rooms
+ .timeline
+ .get_pdu_json(&id)
+ .unwrap()
+ .unwrap(),
)
})
.collect();
- let auth_chain_ids =
- services().rooms.auth_chain.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?;
+ let auth_chain_ids = services()
+ .rooms
+ .auth_chain
+ .get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)])
+ .await?;
Ok(get_room_state::v1::Response {
auth_chain: auth_chain_ids
.map(|id| {
- services().rooms.timeline.get_pdu_json(&id).map(|maybe_json| {
- PduEvent::convert_to_outgoing_federation_event(maybe_json.unwrap())
- })
+ services()
+ .rooms
+ .timeline
+ .get_pdu_json(&id)
+ .map(|maybe_json| {
+ PduEvent::convert_to_outgoing_federation_event(maybe_json.unwrap())
+ })
})
.filter_map(|r| r.ok())
.collect(),
@@ -1095,17 +1193,25 @@ pub async fn get_room_state_ids_route(
.as_ref()
.expect("server is authenticated");
- if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
+ if !services()
+ .rooms
+ .state_cache
+ .server_in_room(sender_servername, &body.room_id)?
+ {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
- services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
+ services()
+ .rooms
+ .event_handler
+ .acl_check(&sender_servername, &body.room_id)?;
let shortstatehash = services()
- .rooms.state_accessor
+ .rooms
+ .state_accessor
.pdu_shortstatehash(&body.event_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
@@ -1113,15 +1219,19 @@ pub async fn get_room_state_ids_route(
))?;
let pdu_ids = services()
- .rooms.state_accessor
+ .rooms
+ .state_accessor
.state_full_ids(shortstatehash)
.await?
.into_iter()
.map(|(_, id)| (*id).to_owned())
.collect();
- let auth_chain_ids =
- services().rooms.auth_chain.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?;
+ let auth_chain_ids = services()
+ .rooms
+ .auth_chain
+ .get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)])
+ .await?;
Ok(get_room_state_ids::v1::Response {
auth_chain_ids: auth_chain_ids.map(|id| (*id).to_owned()).collect(),
@@ -1151,10 +1261,14 @@ pub async fn create_join_event_template_route(
.as_ref()
.expect("server is authenticated");
- services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
+ services()
+ .rooms
+ .event_handler
+ .acl_check(&sender_servername, &body.room_id)?;
let mutex_state = Arc::clone(
- services().globals
+ services()
+ .globals
.roomid_mutex_state
.write()
.unwrap()
@@ -1164,9 +1278,11 @@ pub async fn create_join_event_template_route(
let state_lock = mutex_state.lock().await;
// TODO: Conduit does not implement restricted join rules yet, we always reject
- let join_rules_event =
- services().rooms.state_accessor
- .room_state_get(&body.room_id, &StateEventType::RoomJoinRules, "")?;
+ let join_rules_event = services().rooms.state_accessor.room_state_get(
+ &body.room_id,
+ &StateEventType::RoomJoinRules,
+ "",
+ )?;
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
.as_ref()
@@ -1212,13 +1328,18 @@ pub async fn create_join_event_template_route(
})
.expect("member event is valid value");
- let (pdu, pdu_json) = services().rooms.timeline.create_hash_and_sign_event(PduBuilder {
- event_type: RoomEventType::RoomMember,
- content,
- unsigned: None,
- state_key: Some(body.user_id.to_string()),
- redacts: None,
- }, &body.user_id, &body.room_id, &state_lock)?;
+ let (pdu, pdu_json) = services().rooms.timeline.create_hash_and_sign_event(
+ PduBuilder {
+ event_type: RoomEventType::RoomMember,
+ content,
+ unsigned: None,
+ state_key: Some(body.user_id.to_string()),
+ redacts: None,
+ },
+ &body.user_id,
+ &body.room_id,
+ &state_lock,
+ )?;
drop(state_lock);
@@ -1244,12 +1365,17 @@ async fn create_join_event(
));
}
- services().rooms.event_handler.acl_check(&sender_servername, room_id)?;
+ services()
+ .rooms
+ .event_handler
+ .acl_check(&sender_servername, room_id)?;
// TODO: Conduit does not implement restricted join rules yet, we always reject
- let join_rules_event = services()
- .rooms.state_accessor
- .room_state_get(room_id, &StateEventType::RoomJoinRules, "")?;
+ let join_rules_event = services().rooms.state_accessor.room_state_get(
+ room_id,
+ &StateEventType::RoomJoinRules,
+ "",
+ )?;
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
.as_ref()
@@ -1275,7 +1401,8 @@ async fn create_join_event(
// We need to return the state prior to joining, let's keep a reference to that here
let shortstatehash = services()
- .rooms.state
+ .rooms
+ .state
.get_room_shortstatehash(room_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
@@ -1307,7 +1434,8 @@ async fn create_join_event(
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?;
let mutex = Arc::clone(
- services().globals
+ services()
+ .globals
.roomid_mutex_federation
.write()
.unwrap()
@@ -1315,7 +1443,10 @@ async fn create_join_event(
.or_default(),
);
let mutex_lock = mutex.lock().await;
- let pdu_id: Vec<u8> = services().rooms.event_handler.handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map)
+ let pdu_id: Vec<u8> = services()
+ .rooms
+ .event_handler
+ .handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map)
.await?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
@@ -1323,12 +1454,19 @@ async fn create_join_event(
))?;
drop(mutex_lock);
- let state_ids = services().rooms.state_accessor.state_full_ids(shortstatehash).await?;
- let auth_chain_ids = services().rooms.auth_chain.get_auth_chain(
- room_id,
- state_ids.iter().map(|(_, id)| id.clone()).collect(),
- )
- .await?;
+ let state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(shortstatehash)
+ .await?;
+ let auth_chain_ids = services()
+ .rooms
+ .auth_chain
+ .get_auth_chain(
+ room_id,
+ state_ids.iter().map(|(_, id)| id.clone()).collect(),
+ )
+ .await?;
let servers = services()
.rooms
@@ -1399,9 +1537,16 @@ pub async fn create_invite_route(
.as_ref()
.expect("server is authenticated");
- services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
+ services()
+ .rooms
+ .event_handler
+ .acl_check(&sender_servername, &body.room_id)?;
- if !services().globals.supported_room_versions().contains(&body.room_version) {
+ if !services()
+ .globals
+ .supported_room_versions()
+ .contains(&body.room_version)
+ {
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion {
room_version: body.room_version.clone(),
@@ -1549,7 +1694,8 @@ pub async fn get_room_information_route(
let room_id = services()
.rooms
- .alias.resolve_local_alias(&body.room_alias)?
+ .alias
+ .resolve_local_alias(&body.room_alias)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"Room alias not found.",
@@ -1576,7 +1722,9 @@ pub async fn get_profile_information_route(
let mut blurhash = None;
match &body.field {
- Some(ProfileField::DisplayName) => displayname = services().users.displayname(&body.user_id)?,
+ Some(ProfileField::DisplayName) => {
+ displayname = services().users.displayname(&body.user_id)?
+ }
Some(ProfileField::AvatarUrl) => {
avatar_url = services().users.avatar_url(&body.user_id)?;
blurhash = services().users.blurhash(&body.user_id)?
@@ -1600,18 +1748,14 @@ pub async fn get_profile_information_route(
/// # `POST /_matrix/federation/v1/user/keys/query`
///
/// Gets devices and identity keys for the given users.
-pub async fn get_keys_route(
- body: Ruma<get_keys::v1::Request>,
-) -> Result<get_keys::v1::Response> {
+pub async fn get_keys_route(body: Ruma<get_keys::v1::Request>) -> Result<get_keys::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
- let result = get_keys_helper(
- None,
- &body.device_keys,
- |u| Some(u.server_name()) == body.sender_servername.as_deref(),
- )
+ let result = get_keys_helper(None, &body.device_keys, |u| {
+ Some(u.server_name()) == body.sender_servername.as_deref()
+ })
.await?;
Ok(get_keys::v1::Response {