diff options
author | Timo Kösters <timo@koesters.xyz> | 2023-08-10 17:01:56 +0000 |
---|---|---|
committer | Timo Kösters <timo@koesters.xyz> | 2023-08-10 17:01:56 +0000 |
commit | 0c2cfda3ae923d9e922d5edf379e4d8976a52d4e (patch) | |
tree | a8a8ecddf12f8ea183fc8f9948d8483648b9e187 /src/api/server_server.rs | |
parent | 53f14a2c4c216b529cc63137d8704573197aed19 (diff) | |
parent | 4bf8ee1f7481a222efe87235fa400f6cd14ebd11 (diff) | |
download | conduit-0c2cfda3ae923d9e922d5edf379e4d8976a52d4e.zip |
Merge branch 'next' into 'master'v0.6.0
Merge remote-tracking branch 'origin/next'
See merge request famedly/conduit!538
Diffstat (limited to 'src/api/server_server.rs')
-rw-r--r-- | src/api/server_server.rs | 295 |
1 files changed, 222 insertions, 73 deletions
diff --git a/src/api/server_server.rs b/src/api/server_server.rs index fc3e2c0..bb92405 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1,3 +1,5 @@ +#![allow(deprecated)] + use crate::{ api::client_server::{self, claim_keys_helper, get_keys_helper}, service::pdu::{gen_event_id_canonical_json, PduBuilder}, @@ -12,16 +14,13 @@ use ruma::{ client::error::{Error as RumaError, ErrorKind}, federation::{ authorization::get_event_authorization, + backfill::get_backfill, device::get_devices::{self, v1::UserDevice}, directory::{get_public_rooms, get_public_rooms_filtered}, discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey}, event::{get_event, get_missing_events, get_room_state, get_room_state_ids}, keys::{claim_keys, get_keys}, - membership::{ - create_invite, - create_join_event::{self, RoomState}, - prepare_join_event, - }, + membership::{create_invite, create_join_event, prepare_join_event}, query::{get_profile_information, get_room_information}, transactions::{ edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent}, @@ -38,12 +37,13 @@ use ruma::{ join_rules::{JoinRule, RoomJoinRulesEventContent}, member::{MembershipState, RoomMemberEventContent}, }, - RoomEventType, StateEventType, + StateEventType, TimelineEventType, }, serde::{Base64, JsonObject, Raw}, to_device::DeviceIdOrAllDevices, - CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, - OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, + uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, + OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, + ServerName, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ @@ -55,7 +55,7 @@ use std::{ time::{Duration, Instant, SystemTime}, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, warn}; /// Wraps either an literal IP address plus port, or a hostname plus complement /// (colon-plus-port if it was specified). @@ -123,6 +123,14 @@ where return Err(Error::bad_config("Federation is disabled.")); } + if destination == services().globals.server_name() { + return Err(Error::bad_config( + "Won't send federation request to ourselves", + )); + } + + debug!("Preparing to send request to {destination}"); + let mut write_destination_to_cache = false; let cached_result = services() @@ -149,7 +157,7 @@ where .try_into_http_request::<Vec<u8>>( &actual_destination_str, SendAccessToken::IfRequired(""), - &[MatrixVersion::V1_0], + &[MatrixVersion::V1_4], ) .map_err(|e| { warn!( @@ -229,11 +237,13 @@ where let url = reqwest_request.url().clone(); + debug!("Sending request to {destination} at {url}"); let response = services() .globals .federation_client() .execute(reqwest_request) .await; + debug!("Received response from {destination} at {url}"); match response { Ok(mut response) => { @@ -249,10 +259,12 @@ where .expect("http::response::Builder is usable"), ); + debug!("Getting response bytes from {destination}"); let body = response.bytes().await.unwrap_or_else(|e| { warn!("server error {}", e); Vec::new().into() }); // TODO: handle timeout + debug!("Got response bytes from {destination}"); if status != 200 { warn!( @@ -271,6 +283,7 @@ where .expect("reqwest body is valid http body"); if status == 200 { + debug!("Parsing response bytes from {destination}"); let response = T::IncomingResponse::try_from_http_response(http_response); if response.is_ok() && write_destination_to_cache { services() @@ -292,6 +305,7 @@ where Error::BadServerResponse("Server returned bad 200 response.") }) } else { + debug!("Returning error from {destination}"); Err(Error::FederationError( destination.to_owned(), RumaError::from_http_response(http_response), @@ -330,36 +344,38 @@ fn add_port_to_hostname(destination_str: &str) -> FedDest { /// Implemented according to the specification at https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names /// Numbers in comments below refer to bullet points in linked section of specification async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) { + debug!("Finding actual destination for {destination}"); let destination_str = destination.as_str().to_owned(); let mut hostname = destination_str.clone(); let actual_destination = match get_ip_with_port(&destination_str) { Some(host_port) => { - // 1: IP literal with provided or default port + debug!("1: IP literal with provided or default port"); host_port } None => { if let Some(pos) = destination_str.find(':') { - // 2: Hostname with included port + debug!("2: Hostname with included port"); let (host, port) = destination_str.split_at(pos); FedDest::Named(host.to_owned(), port.to_owned()) } else { + debug!("Requesting well known for {destination}"); match request_well_known(destination.as_str()).await { - // 3: A .well-known file is available Some(delegated_hostname) => { + debug!("3: A .well-known file is available"); hostname = add_port_to_hostname(&delegated_hostname).into_uri_string(); match get_ip_with_port(&delegated_hostname) { Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file None => { if let Some(pos) = delegated_hostname.find(':') { - // 3.2: Hostname with port in .well-known file + debug!("3.2: Hostname with port in .well-known file"); let (host, port) = delegated_hostname.split_at(pos); FedDest::Named(host.to_owned(), port.to_owned()) } else { - // Delegated hostname has no port in this branch + debug!("Delegated hostname has no port in this branch"); if let Some(hostname_override) = query_srv_record(&delegated_hostname).await { - // 3.3: SRV lookup successful + debug!("3.3: SRV lookup successful"); let force_port = hostname_override.port(); if let Ok(override_ip) = services() @@ -390,18 +406,18 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe add_port_to_hostname(&delegated_hostname) } } else { - // 3.4: No SRV records, just use the hostname from .well-known + debug!("3.4: No SRV records, just use the hostname from .well-known"); add_port_to_hostname(&delegated_hostname) } } } } } - // 4: No .well-known or an error occured None => { + debug!("4: No .well-known or an error occured"); match query_srv_record(&destination_str).await { - // 4: SRV record found Some(hostname_override) => { + debug!("4: SRV record found"); let force_port = hostname_override.port(); if let Ok(override_ip) = services() @@ -432,14 +448,17 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe add_port_to_hostname(&hostname) } } - // 5: No SRV record found - None => add_port_to_hostname(&destination_str), + None => { + debug!("5: No SRV record found"); + add_port_to_hostname(&destination_str) + } } } } } } }; + debug!("Actual destination: {actual_destination:?}"); // Can't use get_ip_with_port here because we don't want to add a port // to an IP address if it wasn't specified @@ -457,10 +476,11 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe } async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> { + let hostname = hostname.trim_end_matches('.'); if let Ok(Some(host_port)) = services() .globals .dns_resolver() - .srv_lookup(format!("_matrix._tcp.{hostname}")) + .srv_lookup(format!("_matrix._tcp.{hostname}.")) .await .map(|srv| { srv.iter().next().map(|result| { @@ -478,19 +498,20 @@ async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> { } async fn request_well_known(destination: &str) -> Option<String> { - let body: serde_json::Value = serde_json::from_str( - &services() - .globals - .default_client() - .get(&format!("https://{destination}/.well-known/matrix/server")) - .send() - .await - .ok()? - .text() - .await - .ok()?, - ) - .ok()?; + let response = services() + .globals + .default_client() + .get(&format!("https://{destination}/.well-known/matrix/server")) + .send() + .await; + debug!("Got well known response"); + if let Err(e) = &response { + debug!("Well known error: {e:?}"); + return None; + } + let text = response.ok()?.text().await; + debug!("Got well known response text"); + let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?; Some(body.get("m.server")?.as_str()?.to_owned()) } @@ -627,6 +648,37 @@ pub async fn get_public_rooms_route( }) } +pub fn parse_incoming_pdu( + pdu: &RawJsonValue, +) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> { + let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { + warn!("Error parsing incoming event {:?}: {:?}", pdu, e); + Error::BadServerResponse("Invalid PDU in server response") + })?; + + let room_id: OwnedRoomId = value + .get("room_id") + .and_then(|id| RoomId::parse(id.as_str()?).ok()) + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Invalid room id in pdu", + ))?; + + let room_version_id = services().rooms.state.get_room_version(&room_id)?; + + let (event_id, value) = match gen_event_id_canonical_json(&pdu, &room_version_id) { + Ok(t) => t, + Err(_) => { + // Event could not be converted to canonical json + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Could not convert event to canonical json.", + )); + } + }; + Ok((event_id, value, room_id)) +} + /// # `PUT /_matrix/federation/v1/send/{txnId}` /// /// Push EDUs and PDUs to this server. @@ -655,33 +707,12 @@ pub async fn send_transaction_message_route( // let mut auth_cache = EventMap::new(); for pdu in &body.pdus { - let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { - warn!("Error parsing incoming event {:?}: {:?}", pdu, e); - Error::BadServerResponse("Invalid PDU in server response") - })?; - - let room_id: OwnedRoomId = match value - .get("room_id") - .and_then(|id| RoomId::parse(id.as_str()?).ok()) - { - Some(id) => id, - None => { - // Event is invalid - continue; - } - }; - - let room_version_id = match services().rooms.state.get_room_version(&room_id) { - Ok(v) => v, - Err(_) => { - continue; - } - }; - - let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) { + let r = parse_incoming_pdu(&pdu); + let (event_id, value, room_id) = match r { Ok(t) => t, - Err(_) => { - // Event could not be converted to canonical json + Err(e) => { + warn!("Could not parse PDU: {e}"); + warn!("Full PDU: {:?}", &pdu); continue; } }; @@ -782,7 +813,7 @@ pub async fn send_transaction_message_route( .readreceipt_update(&user_id, &room_id, event)?; } else { // TODO fetch missing events - info!("No known event ids in read receipt: {:?}", user_updates); + debug!("No known event ids in read receipt: {:?}", user_updates); } } } @@ -886,6 +917,7 @@ pub async fn send_transaction_message_route( &master_key, &self_signing_key, &None, + true, )?; } } @@ -896,7 +928,7 @@ pub async fn send_transaction_message_route( Ok(send_transaction_message::v1::Response { pdus: resolved_map .into_iter() - .map(|(e, r)| (e, r.map_err(|e| e.to_string()))) + .map(|(e, r)| (e, r.map_err(|e| e.sanitized_error()))) .collect(), }) } @@ -922,7 +954,10 @@ pub async fn get_event_route( .rooms .timeline .get_pdu_json(&body.event_id)? - .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?; + .ok_or_else(|| { + warn!("Event not found, event ID: {:?}", &body.event_id); + Error::BadRequest(ErrorKind::NotFound, "Event not found.") + })?; let room_id_str = event .get("room_id") @@ -943,6 +978,17 @@ pub async fn get_event_route( )); } + if !services().rooms.state_accessor.server_can_see_event( + sender_servername, + &room_id, + &body.event_id, + )? { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Server is not allowed to see event.", + )); + } + Ok(get_event::v1::Response { origin: services().globals.server_name().to_owned(), origin_server_ts: MilliSecondsSinceUnixEpoch::now(), @@ -950,6 +996,83 @@ pub async fn get_event_route( }) } +/// # `GET /_matrix/federation/v1/backfill/<room_id>` +/// +/// Retrieves events from before the sender joined the room, if the room's +/// history visibility allows. +pub async fn get_backfill_route( + body: Ruma<get_backfill::v1::Request>, +) -> Result<get_backfill::v1::Response> { + if !services().globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + let sender_servername = body + .sender_servername + .as_ref() + .expect("server is authenticated"); + + debug!("Got backfill request from: {}", sender_servername); + + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Server is not in room.", + )); + } + + services() + .rooms + .event_handler + .acl_check(sender_servername, &body.room_id)?; + + let until = body + .v + .iter() + .map(|eventid| services().rooms.timeline.get_pdu_count(eventid)) + .filter_map(|r| r.ok().flatten()) + .max() + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "No known eventid in v", + ))?; + + let limit = body.limit.min(uint!(100)); + + let all_events = services() + .rooms + .timeline + .pdus_until(&user_id!("@doesntmatter:conduit.rs"), &body.room_id, until)? + .take(limit.try_into().unwrap()); + + let events = all_events + .filter_map(|r| r.ok()) + .filter(|(_, e)| { + matches!( + services().rooms.state_accessor.server_can_see_event( + sender_servername, + &e.room_id, + &e.event_id, + ), + Ok(true), + ) + }) + .map(|(_, pdu)| services().rooms.timeline.get_pdu_json(&pdu.event_id)) + .filter_map(|r| r.ok().flatten()) + .map(|pdu| PduEvent::convert_to_outgoing_federation_event(pdu)) + .collect(); + + Ok(get_backfill::v1::Response { + origin: services().globals.server_name().to_owned(), + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + pdus: events, + }) +} + /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// /// Retrieves events that the sender is missing. @@ -1010,6 +1133,16 @@ pub async fn get_missing_events_route( i += 1; continue; } + + if !services().rooms.state_accessor.server_can_see_event( + sender_servername, + &body.room_id, + &queued_events[i], + )? { + i += 1; + continue; + } + queued_events.extend_from_slice( &serde_json::from_value::<Vec<OwnedEventId>>( serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { @@ -1064,7 +1197,10 @@ pub async fn get_event_authorization_route( .rooms .timeline .get_pdu_json(&body.event_id)? - .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?; + .ok_or_else(|| { + warn!("Event not found, event ID: {:?}", &body.event_id); + Error::BadRequest(ErrorKind::NotFound, "Event not found.") + })?; let room_id_str = event .get("room_id") @@ -1320,7 +1456,7 @@ pub async fn create_join_event_template_route( let (_pdu, mut pdu_json) = services().rooms.timeline.create_hash_and_sign_event( PduBuilder { - event_type: RoomEventType::RoomMember, + event_type: TimelineEventType::RoomMember, content, unsigned: None, state_key: Some(body.user_id.to_string()), @@ -1345,7 +1481,7 @@ async fn create_join_event( sender_servername: &ServerName, room_id: &RoomId, pdu: &RawJsonValue, -) -> Result<RoomState> { +) -> Result<create_join_event::v1::RoomState> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); } @@ -1467,7 +1603,7 @@ async fn create_join_event( services().sending.send_pdu(servers, &pdu_id)?; - Ok(RoomState { + Ok(create_join_event::v1::RoomState { auth_chain: auth_chain_ids .filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten()) .map(PduEvent::convert_to_outgoing_federation_event) @@ -1508,7 +1644,18 @@ pub async fn create_join_event_v2_route( .as_ref() .expect("server is authenticated"); - let room_state = create_join_event(sender_servername, &body.room_id, &body.pdu).await?; + let create_join_event::v1::RoomState { + auth_chain, + state, + event, + } = create_join_event(sender_servername, &body.room_id, &body.pdu).await?; + let room_state = create_join_event::v2::RoomState { + members_omitted: false, + auth_chain, + state, + event, + servers_in_room: None, + }; Ok(create_join_event::v2::Response { room_state }) } @@ -1668,12 +1815,14 @@ pub async fn get_devices_route( }) }) .collect(), - master_key: services() - .users - .get_master_key(&body.user_id, &|u| u.server_name() == sender_servername)?, + master_key: services().users.get_master_key(None, &body.user_id, &|u| { + u.server_name() == sender_servername + })?, self_signing_key: services() .users - .get_self_signing_key(&body.user_id, &|u| u.server_name() == sender_servername)?, + .get_self_signing_key(None, &body.user_id, &|u| { + u.server_name() == sender_servername + })?, }) } |