diff options
author | Timo Kösters <timo@koesters.xyz> | 2022-10-05 20:34:31 +0200 |
---|---|---|
committer | Nyaaori <+@nyaaori.cat> | 2022-10-10 14:02:01 +0200 |
commit | a4637e2ba1093065a6fda3fa2ad2b2b9f30eea63 (patch) | |
tree | 2d31313957d699875fc61f570686318b523ae0f1 /src/api/server_server.rs | |
parent | 33a2b2b7729bb40253fd174d99ad773869b5ecfe (diff) | |
download | conduit-a4637e2ba1093065a6fda3fa2ad2b2b9f30eea63.zip |
cargo fmt
Diffstat (limited to 'src/api/server_server.rs')
-rw-r--r-- | src/api/server_server.rs | 388 |
1 files changed, 266 insertions, 122 deletions
diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 11f7ec3..dba4489 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1,6 +1,7 @@ use crate::{ api::client_server::{self, claim_keys_helper, get_keys_helper}, - utils, Error, PduEvent, Result, Ruma, services, service::pdu::{gen_event_id_canonical_json, PduBuilder}, + service::pdu::{gen_event_id_canonical_json, PduBuilder}, + services, utils, Error, PduEvent, Result, Ruma, }; use axum::{response::IntoResponse, Json}; use futures_util::{stream::FuturesUnordered, StreamExt}; @@ -138,7 +139,8 @@ where let mut write_destination_to_cache = false; - let cached_result = services().globals + let cached_result = services() + .globals .actual_destination_cache .read() .unwrap() @@ -191,7 +193,10 @@ where .to_string() .into(), ); - request_map.insert("origin".to_owned(), services().globals.server_name().as_str().into()); + request_map.insert( + "origin".to_owned(), + services().globals.server_name().as_str().into(), + ); request_map.insert("destination".to_owned(), destination.as_str().into()); let mut request_json = @@ -238,7 +243,11 @@ where let url = reqwest_request.url().clone(); - let response = services().globals.federation_client().execute(reqwest_request).await; + let response = services() + .globals + .federation_client() + .execute(reqwest_request) + .await; match response { Ok(mut response) => { @@ -278,10 +287,15 @@ where if status == 200 { let response = T::IncomingResponse::try_from_http_response(http_response); if response.is_ok() && write_destination_to_cache { - services().globals.actual_destination_cache.write().unwrap().insert( - Box::<ServerName>::from(destination), - (actual_destination, host), - ); + services() + .globals + .actual_destination_cache + .write() + .unwrap() + .insert( + Box::<ServerName>::from(destination), + (actual_destination, host), + ); } response.map_err(|e| { @@ -329,9 +343,7 @@ fn add_port_to_hostname(destination_str: &str) -> FedDest { /// Returns: actual_destination, host header /// Implemented according to the specification at https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names /// Numbers in comments below refer to bullet points in linked section of specification -async fn find_actual_destination( - destination: &'_ ServerName, -) -> (FedDest, FedDest) { +async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) { let destination_str = destination.as_str().to_owned(); let mut hostname = destination_str.clone(); let actual_destination = match get_ip_with_port(&destination_str) { @@ -364,18 +376,24 @@ async fn find_actual_destination( // 3.3: SRV lookup successful let force_port = hostname_override.port(); - if let Ok(override_ip) = services().globals + if let Ok(override_ip) = services() + .globals .dns_resolver() .lookup_ip(hostname_override.hostname()) .await { - services().globals.tls_name_override.write().unwrap().insert( - delegated_hostname.clone(), - ( - override_ip.iter().collect(), - force_port.unwrap_or(8448), - ), - ); + services() + .globals + .tls_name_override + .write() + .unwrap() + .insert( + delegated_hostname.clone(), + ( + override_ip.iter().collect(), + force_port.unwrap_or(8448), + ), + ); } else { warn!("Using SRV record, but could not resolve to IP"); } @@ -400,15 +418,24 @@ async fn find_actual_destination( Some(hostname_override) => { let force_port = hostname_override.port(); - if let Ok(override_ip) = services().globals + if let Ok(override_ip) = services() + .globals .dns_resolver() .lookup_ip(hostname_override.hostname()) .await { - services().globals.tls_name_override.write().unwrap().insert( - hostname.clone(), - (override_ip.iter().collect(), force_port.unwrap_or(8448)), - ); + services() + .globals + .tls_name_override + .write() + .unwrap() + .insert( + hostname.clone(), + ( + override_ip.iter().collect(), + force_port.unwrap_or(8448), + ), + ); } else { warn!("Using SRV record, but could not resolve to IP"); } @@ -443,10 +470,9 @@ async fn find_actual_destination( (actual_destination, hostname) } -async fn query_srv_record( - hostname: &'_ str, -) -> Option<FedDest> { - if let Ok(Some(host_port)) = services().globals +async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> { + if let Ok(Some(host_port)) = services() + .globals .dns_resolver() .srv_lookup(format!("_matrix._tcp.{}", hostname)) .await @@ -465,11 +491,10 @@ async fn query_srv_record( } } -async fn request_well_known( - destination: &str, -) -> Option<String> { +async fn request_well_known(destination: &str) -> Option<String> { let body: serde_json::Value = serde_json::from_str( - &services().globals + &services() + .globals .default_client() .get(&format!( "https://{}/.well-known/matrix/server", @@ -664,15 +689,22 @@ pub async fn send_transaction_message_route( Some(id) => id, None => { // Event is invalid - resolved_map.insert(event_id, Err(Error::bad_database("Event needs a valid RoomId."))); + resolved_map.insert( + event_id, + Err(Error::bad_database("Event needs a valid RoomId.")), + ); continue; } }; - services().rooms.event_handler.acl_check(&sender_servername, &room_id)?; + services() + .rooms + .event_handler + .acl_check(&sender_servername, &room_id)?; let mutex = Arc::clone( - services().globals + services() + .globals .roomid_mutex_federation .write() .unwrap() @@ -683,16 +715,19 @@ pub async fn send_transaction_message_route( let start_time = Instant::now(); resolved_map.insert( event_id.clone(), - services().rooms.event_handler.handle_incoming_pdu( - &sender_servername, - &event_id, - &room_id, - value, - true, - &pub_key_map, - ) - .await - .map(|_| ()), + services() + .rooms + .event_handler + .handle_incoming_pdu( + &sender_servername, + &event_id, + &room_id, + value, + true, + &pub_key_map, + ) + .await + .map(|_| ()), ); drop(mutex_lock); @@ -727,7 +762,13 @@ pub async fn send_transaction_message_route( .event_ids .iter() .filter_map(|id| { - services().rooms.timeline.get_pdu_count(id).ok().flatten().map(|r| (id, r)) + services() + .rooms + .timeline + .get_pdu_count(id) + .ok() + .flatten() + .map(|r| (id, r)) }) .max_by_key(|(_, count)| *count) { @@ -744,11 +785,11 @@ pub async fn send_transaction_message_route( content: ReceiptEventContent(receipt_content), room_id: room_id.clone(), }; - services().rooms.edus.read_receipt.readreceipt_update( - &user_id, - &room_id, - event, - )?; + services() + .rooms + .edus + .read_receipt + .readreceipt_update(&user_id, &room_id, event)?; } else { // TODO fetch missing events info!("No known event ids in read receipt: {:?}", user_updates); @@ -757,7 +798,11 @@ pub async fn send_transaction_message_route( } } Edu::Typing(typing) => { - if services().rooms.state_cache.is_joined(&typing.user_id, &typing.room_id)? { + if services() + .rooms + .state_cache + .is_joined(&typing.user_id, &typing.room_id)? + { if typing.typing { services().rooms.edus.typing.typing_add( &typing.user_id, @@ -765,16 +810,16 @@ pub async fn send_transaction_message_route( 3000 + utils::millis_since_unix_epoch(), )?; } else { - services().rooms.edus.typing.typing_remove( - &typing.user_id, - &typing.room_id, - )?; + services() + .rooms + .edus + .typing + .typing_remove(&typing.user_id, &typing.room_id)?; } } } Edu::DeviceListUpdate(DeviceListUpdateContent { user_id, .. }) => { - services().users - .mark_device_key_update(&user_id)?; + services().users.mark_device_key_update(&user_id)?; } Edu::DirectToDevice(DirectDeviceContent { sender, @@ -810,7 +855,9 @@ pub async fn send_transaction_message_route( } DeviceIdOrAllDevices::AllDevices => { - for target_device_id in services().users.all_device_ids(target_user_id) { + for target_device_id in + services().users.all_device_ids(target_user_id) + { services().users.add_to_device_event( &sender, target_user_id, @@ -830,7 +877,8 @@ pub async fn send_transaction_message_route( } // Save transaction id with empty data - services().transaction_ids + services() + .transaction_ids .add_txnid(&sender, None, &message_id, &[])?; } Edu::SigningKeyUpdate(SigningKeyUpdateContent { @@ -854,7 +902,12 @@ pub async fn send_transaction_message_route( } } - Ok(send_transaction_message::v1::Response { pdus: resolved_map.into_iter().map(|(e, r)| (e, r.map_err(|e| e.to_string()))).collect() }) + Ok(send_transaction_message::v1::Response { + pdus: resolved_map + .into_iter() + .map(|(e, r)| (e, r.map_err(|e| e.to_string()))) + .collect(), + }) } /// # `GET /_matrix/federation/v1/event/{eventId}` @@ -875,7 +928,8 @@ pub async fn get_event_route( .expect("server is authenticated"); let event = services() - .rooms.timeline + .rooms + .timeline .get_pdu_json(&body.event_id)? .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?; @@ -887,7 +941,11 @@ pub async fn get_event_route( let room_id = <&RoomId>::try_from(room_id_str) .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - if !services().rooms.state_cache.server_in_room(sender_servername, room_id)? { + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, room_id)? + { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room", @@ -916,14 +974,21 @@ pub async fn get_missing_events_route( .as_ref() .expect("server is authenticated"); - if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? { + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room", )); } - services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; + services() + .rooms + .event_handler + .acl_check(&sender_servername, &body.room_id)?; let mut queued_events = body.latest_events.clone(); let mut events = Vec::new(); @@ -988,17 +1053,25 @@ pub async fn get_event_authorization_route( .as_ref() .expect("server is authenticated"); - if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? { + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room.", )); } - services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; + services() + .rooms + .event_handler + .acl_check(&sender_servername, &body.room_id)?; let event = services() - .rooms.timeline + .rooms + .timeline .get_pdu_json(&body.event_id)? .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?; @@ -1010,7 +1083,11 @@ pub async fn get_event_authorization_route( let room_id = <&RoomId>::try_from(room_id_str) .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - let auth_chain_ids = services().rooms.auth_chain.get_auth_chain(room_id, vec![Arc::from(&*body.event_id)]).await?; + let auth_chain_ids = services() + .rooms + .auth_chain + .get_auth_chain(room_id, vec![Arc::from(&*body.event_id)]) + .await?; Ok(get_event_authorization::v1::Response { auth_chain: auth_chain_ids @@ -1035,17 +1112,25 @@ pub async fn get_room_state_route( .as_ref() .expect("server is authenticated"); - if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? { + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room.", )); } - services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; + services() + .rooms + .event_handler + .acl_check(&sender_servername, &body.room_id)?; let shortstatehash = services() - .rooms.state_accessor + .rooms + .state_accessor .pdu_shortstatehash(&body.event_id)? .ok_or(Error::BadRequest( ErrorKind::NotFound, @@ -1053,26 +1138,39 @@ pub async fn get_room_state_route( ))?; let pdus = services() - .rooms.state_accessor + .rooms + .state_accessor .state_full_ids(shortstatehash) .await? .into_iter() .map(|(_, id)| { PduEvent::convert_to_outgoing_federation_event( - services().rooms.timeline.get_pdu_json(&id).unwrap().unwrap(), + services() + .rooms + .timeline + .get_pdu_json(&id) + .unwrap() + .unwrap(), ) }) .collect(); - let auth_chain_ids = - services().rooms.auth_chain.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?; + let auth_chain_ids = services() + .rooms + .auth_chain + .get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]) + .await?; Ok(get_room_state::v1::Response { auth_chain: auth_chain_ids .map(|id| { - services().rooms.timeline.get_pdu_json(&id).map(|maybe_json| { - PduEvent::convert_to_outgoing_federation_event(maybe_json.unwrap()) - }) + services() + .rooms + .timeline + .get_pdu_json(&id) + .map(|maybe_json| { + PduEvent::convert_to_outgoing_federation_event(maybe_json.unwrap()) + }) }) .filter_map(|r| r.ok()) .collect(), @@ -1095,17 +1193,25 @@ pub async fn get_room_state_ids_route( .as_ref() .expect("server is authenticated"); - if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? { + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room.", )); } - services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; + services() + .rooms + .event_handler + .acl_check(&sender_servername, &body.room_id)?; let shortstatehash = services() - .rooms.state_accessor + .rooms + .state_accessor .pdu_shortstatehash(&body.event_id)? .ok_or(Error::BadRequest( ErrorKind::NotFound, @@ -1113,15 +1219,19 @@ pub async fn get_room_state_ids_route( ))?; let pdu_ids = services() - .rooms.state_accessor + .rooms + .state_accessor .state_full_ids(shortstatehash) .await? .into_iter() .map(|(_, id)| (*id).to_owned()) .collect(); - let auth_chain_ids = - services().rooms.auth_chain.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?; + let auth_chain_ids = services() + .rooms + .auth_chain + .get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]) + .await?; Ok(get_room_state_ids::v1::Response { auth_chain_ids: auth_chain_ids.map(|id| (*id).to_owned()).collect(), @@ -1151,10 +1261,14 @@ pub async fn create_join_event_template_route( .as_ref() .expect("server is authenticated"); - services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; + services() + .rooms + .event_handler + .acl_check(&sender_servername, &body.room_id)?; let mutex_state = Arc::clone( - services().globals + services() + .globals .roomid_mutex_state .write() .unwrap() @@ -1164,9 +1278,11 @@ pub async fn create_join_event_template_route( let state_lock = mutex_state.lock().await; // TODO: Conduit does not implement restricted join rules yet, we always reject - let join_rules_event = - services().rooms.state_accessor - .room_state_get(&body.room_id, &StateEventType::RoomJoinRules, "")?; + let join_rules_event = services().rooms.state_accessor.room_state_get( + &body.room_id, + &StateEventType::RoomJoinRules, + "", + )?; let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event .as_ref() @@ -1212,13 +1328,18 @@ pub async fn create_join_event_template_route( }) .expect("member event is valid value"); - let (pdu, pdu_json) = services().rooms.timeline.create_hash_and_sign_event(PduBuilder { - event_type: RoomEventType::RoomMember, - content, - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, &body.user_id, &body.room_id, &state_lock)?; + let (pdu, pdu_json) = services().rooms.timeline.create_hash_and_sign_event( + PduBuilder { + event_type: RoomEventType::RoomMember, + content, + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + &body.user_id, + &body.room_id, + &state_lock, + )?; drop(state_lock); @@ -1244,12 +1365,17 @@ async fn create_join_event( )); } - services().rooms.event_handler.acl_check(&sender_servername, room_id)?; + services() + .rooms + .event_handler + .acl_check(&sender_servername, room_id)?; // TODO: Conduit does not implement restricted join rules yet, we always reject - let join_rules_event = services() - .rooms.state_accessor - .room_state_get(room_id, &StateEventType::RoomJoinRules, "")?; + let join_rules_event = services().rooms.state_accessor.room_state_get( + room_id, + &StateEventType::RoomJoinRules, + "", + )?; let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event .as_ref() @@ -1275,7 +1401,8 @@ async fn create_join_event( // We need to return the state prior to joining, let's keep a reference to that here let shortstatehash = services() - .rooms.state + .rooms + .state .get_room_shortstatehash(room_id)? .ok_or(Error::BadRequest( ErrorKind::NotFound, @@ -1307,7 +1434,8 @@ async fn create_join_event( .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?; let mutex = Arc::clone( - services().globals + services() + .globals .roomid_mutex_federation .write() .unwrap() @@ -1315,7 +1443,10 @@ async fn create_join_event( .or_default(), ); let mutex_lock = mutex.lock().await; - let pdu_id: Vec<u8> = services().rooms.event_handler.handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map) + let pdu_id: Vec<u8> = services() + .rooms + .event_handler + .handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map) .await? .ok_or(Error::BadRequest( ErrorKind::InvalidParam, @@ -1323,12 +1454,19 @@ async fn create_join_event( ))?; drop(mutex_lock); - let state_ids = services().rooms.state_accessor.state_full_ids(shortstatehash).await?; - let auth_chain_ids = services().rooms.auth_chain.get_auth_chain( - room_id, - state_ids.iter().map(|(_, id)| id.clone()).collect(), - ) - .await?; + let state_ids = services() + .rooms + .state_accessor + .state_full_ids(shortstatehash) + .await?; + let auth_chain_ids = services() + .rooms + .auth_chain + .get_auth_chain( + room_id, + state_ids.iter().map(|(_, id)| id.clone()).collect(), + ) + .await?; let servers = services() .rooms @@ -1399,9 +1537,16 @@ pub async fn create_invite_route( .as_ref() .expect("server is authenticated"); - services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; + services() + .rooms + .event_handler + .acl_check(&sender_servername, &body.room_id)?; - if !services().globals.supported_room_versions().contains(&body.room_version) { + if !services() + .globals + .supported_room_versions() + .contains(&body.room_version) + { return Err(Error::BadRequest( ErrorKind::IncompatibleRoomVersion { room_version: body.room_version.clone(), @@ -1549,7 +1694,8 @@ pub async fn get_room_information_route( let room_id = services() .rooms - .alias.resolve_local_alias(&body.room_alias)? + .alias + .resolve_local_alias(&body.room_alias)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "Room alias not found.", @@ -1576,7 +1722,9 @@ pub async fn get_profile_information_route( let mut blurhash = None; match &body.field { - Some(ProfileField::DisplayName) => displayname = services().users.displayname(&body.user_id)?, + Some(ProfileField::DisplayName) => { + displayname = services().users.displayname(&body.user_id)? + } Some(ProfileField::AvatarUrl) => { avatar_url = services().users.avatar_url(&body.user_id)?; blurhash = services().users.blurhash(&body.user_id)? @@ -1600,18 +1748,14 @@ pub async fn get_profile_information_route( /// # `POST /_matrix/federation/v1/user/keys/query` /// /// Gets devices and identity keys for the given users. -pub async fn get_keys_route( - body: Ruma<get_keys::v1::Request>, -) -> Result<get_keys::v1::Response> { +pub async fn get_keys_route(body: Ruma<get_keys::v1::Request>) -> Result<get_keys::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); } - let result = get_keys_helper( - None, - &body.device_keys, - |u| Some(u.server_name()) == body.sender_servername.as_deref(), - ) + let result = get_keys_helper(None, &body.device_keys, |u| { + Some(u.server_name()) == body.sender_servername.as_deref() + }) .await?; Ok(get_keys::v1::Response { |