summaryrefslogtreecommitdiff
path: root/src/api/server_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/server_server.rs')
-rw-r--r--src/api/server_server.rs295
1 files changed, 222 insertions, 73 deletions
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index fc3e2c0..bb92405 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -1,3 +1,5 @@
+#![allow(deprecated)]
+
use crate::{
api::client_server::{self, claim_keys_helper, get_keys_helper},
service::pdu::{gen_event_id_canonical_json, PduBuilder},
@@ -12,16 +14,13 @@ use ruma::{
client::error::{Error as RumaError, ErrorKind},
federation::{
authorization::get_event_authorization,
+ backfill::get_backfill,
device::get_devices::{self, v1::UserDevice},
directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
keys::{claim_keys, get_keys},
- membership::{
- create_invite,
- create_join_event::{self, RoomState},
- prepare_join_event,
- },
+ membership::{create_invite, create_join_event, prepare_join_event},
query::{get_profile_information, get_room_information},
transactions::{
edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent},
@@ -38,12 +37,13 @@ use ruma::{
join_rules::{JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
},
- RoomEventType, StateEventType,
+ StateEventType, TimelineEventType,
},
serde::{Base64, JsonObject, Raw},
to_device::DeviceIdOrAllDevices,
- CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
- OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
+ uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
+ OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId,
+ ServerName,
};
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use std::{
@@ -55,7 +55,7 @@ use std::{
time::{Duration, Instant, SystemTime},
};
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, warn};
/// Wraps either an literal IP address plus port, or a hostname plus complement
/// (colon-plus-port if it was specified).
@@ -123,6 +123,14 @@ where
return Err(Error::bad_config("Federation is disabled."));
}
+ if destination == services().globals.server_name() {
+ return Err(Error::bad_config(
+ "Won't send federation request to ourselves",
+ ));
+ }
+
+ debug!("Preparing to send request to {destination}");
+
let mut write_destination_to_cache = false;
let cached_result = services()
@@ -149,7 +157,7 @@ where
.try_into_http_request::<Vec<u8>>(
&actual_destination_str,
SendAccessToken::IfRequired(""),
- &[MatrixVersion::V1_0],
+ &[MatrixVersion::V1_4],
)
.map_err(|e| {
warn!(
@@ -229,11 +237,13 @@ where
let url = reqwest_request.url().clone();
+ debug!("Sending request to {destination} at {url}");
let response = services()
.globals
.federation_client()
.execute(reqwest_request)
.await;
+ debug!("Received response from {destination} at {url}");
match response {
Ok(mut response) => {
@@ -249,10 +259,12 @@ where
.expect("http::response::Builder is usable"),
);
+ debug!("Getting response bytes from {destination}");
let body = response.bytes().await.unwrap_or_else(|e| {
warn!("server error {}", e);
Vec::new().into()
}); // TODO: handle timeout
+ debug!("Got response bytes from {destination}");
if status != 200 {
warn!(
@@ -271,6 +283,7 @@ where
.expect("reqwest body is valid http body");
if status == 200 {
+ debug!("Parsing response bytes from {destination}");
let response = T::IncomingResponse::try_from_http_response(http_response);
if response.is_ok() && write_destination_to_cache {
services()
@@ -292,6 +305,7 @@ where
Error::BadServerResponse("Server returned bad 200 response.")
})
} else {
+ debug!("Returning error from {destination}");
Err(Error::FederationError(
destination.to_owned(),
RumaError::from_http_response(http_response),
@@ -330,36 +344,38 @@ fn add_port_to_hostname(destination_str: &str) -> FedDest {
/// Implemented according to the specification at https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names
/// Numbers in comments below refer to bullet points in linked section of specification
async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
+ debug!("Finding actual destination for {destination}");
let destination_str = destination.as_str().to_owned();
let mut hostname = destination_str.clone();
let actual_destination = match get_ip_with_port(&destination_str) {
Some(host_port) => {
- // 1: IP literal with provided or default port
+ debug!("1: IP literal with provided or default port");
host_port
}
None => {
if let Some(pos) = destination_str.find(':') {
- // 2: Hostname with included port
+ debug!("2: Hostname with included port");
let (host, port) = destination_str.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
} else {
+ debug!("Requesting well known for {destination}");
match request_well_known(destination.as_str()).await {
- // 3: A .well-known file is available
Some(delegated_hostname) => {
+ debug!("3: A .well-known file is available");
hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
match get_ip_with_port(&delegated_hostname) {
Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
None => {
if let Some(pos) = delegated_hostname.find(':') {
- // 3.2: Hostname with port in .well-known file
+ debug!("3.2: Hostname with port in .well-known file");
let (host, port) = delegated_hostname.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
} else {
- // Delegated hostname has no port in this branch
+ debug!("Delegated hostname has no port in this branch");
if let Some(hostname_override) =
query_srv_record(&delegated_hostname).await
{
- // 3.3: SRV lookup successful
+ debug!("3.3: SRV lookup successful");
let force_port = hostname_override.port();
if let Ok(override_ip) = services()
@@ -390,18 +406,18 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
add_port_to_hostname(&delegated_hostname)
}
} else {
- // 3.4: No SRV records, just use the hostname from .well-known
+ debug!("3.4: No SRV records, just use the hostname from .well-known");
add_port_to_hostname(&delegated_hostname)
}
}
}
}
}
- // 4: No .well-known or an error occured
None => {
+ debug!("4: No .well-known or an error occured");
match query_srv_record(&destination_str).await {
- // 4: SRV record found
Some(hostname_override) => {
+ debug!("4: SRV record found");
let force_port = hostname_override.port();
if let Ok(override_ip) = services()
@@ -432,14 +448,17 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
add_port_to_hostname(&hostname)
}
}
- // 5: No SRV record found
- None => add_port_to_hostname(&destination_str),
+ None => {
+ debug!("5: No SRV record found");
+ add_port_to_hostname(&destination_str)
+ }
}
}
}
}
}
};
+ debug!("Actual destination: {actual_destination:?}");
// Can't use get_ip_with_port here because we don't want to add a port
// to an IP address if it wasn't specified
@@ -457,10 +476,11 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
}
async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
+ let hostname = hostname.trim_end_matches('.');
if let Ok(Some(host_port)) = services()
.globals
.dns_resolver()
- .srv_lookup(format!("_matrix._tcp.{hostname}"))
+ .srv_lookup(format!("_matrix._tcp.{hostname}."))
.await
.map(|srv| {
srv.iter().next().map(|result| {
@@ -478,19 +498,20 @@ async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
}
async fn request_well_known(destination: &str) -> Option<String> {
- let body: serde_json::Value = serde_json::from_str(
- &services()
- .globals
- .default_client()
- .get(&format!("https://{destination}/.well-known/matrix/server"))
- .send()
- .await
- .ok()?
- .text()
- .await
- .ok()?,
- )
- .ok()?;
+ let response = services()
+ .globals
+ .default_client()
+ .get(&format!("https://{destination}/.well-known/matrix/server"))
+ .send()
+ .await;
+ debug!("Got well known response");
+ if let Err(e) = &response {
+ debug!("Well known error: {e:?}");
+ return None;
+ }
+ let text = response.ok()?.text().await;
+ debug!("Got well known response text");
+ let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?;
Some(body.get("m.server")?.as_str()?.to_owned())
}
@@ -627,6 +648,37 @@ pub async fn get_public_rooms_route(
})
}
+pub fn parse_incoming_pdu(
+ pdu: &RawJsonValue,
+) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
+ let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
+ warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
+ Error::BadServerResponse("Invalid PDU in server response")
+ })?;
+
+ let room_id: OwnedRoomId = value
+ .get("room_id")
+ .and_then(|id| RoomId::parse(id.as_str()?).ok())
+ .ok_or(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Invalid room id in pdu",
+ ))?;
+
+ let room_version_id = services().rooms.state.get_room_version(&room_id)?;
+
+ let (event_id, value) = match gen_event_id_canonical_json(&pdu, &room_version_id) {
+ Ok(t) => t,
+ Err(_) => {
+ // Event could not be converted to canonical json
+ return Err(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Could not convert event to canonical json.",
+ ));
+ }
+ };
+ Ok((event_id, value, room_id))
+}
+
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
@@ -655,33 +707,12 @@ pub async fn send_transaction_message_route(
// let mut auth_cache = EventMap::new();
for pdu in &body.pdus {
- let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
- warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
- Error::BadServerResponse("Invalid PDU in server response")
- })?;
-
- let room_id: OwnedRoomId = match value
- .get("room_id")
- .and_then(|id| RoomId::parse(id.as_str()?).ok())
- {
- Some(id) => id,
- None => {
- // Event is invalid
- continue;
- }
- };
-
- let room_version_id = match services().rooms.state.get_room_version(&room_id) {
- Ok(v) => v,
- Err(_) => {
- continue;
- }
- };
-
- let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
+ let r = parse_incoming_pdu(&pdu);
+ let (event_id, value, room_id) = match r {
Ok(t) => t,
- Err(_) => {
- // Event could not be converted to canonical json
+ Err(e) => {
+ warn!("Could not parse PDU: {e}");
+ warn!("Full PDU: {:?}", &pdu);
continue;
}
};
@@ -782,7 +813,7 @@ pub async fn send_transaction_message_route(
.readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
- info!("No known event ids in read receipt: {:?}", user_updates);
+ debug!("No known event ids in read receipt: {:?}", user_updates);
}
}
}
@@ -886,6 +917,7 @@ pub async fn send_transaction_message_route(
&master_key,
&self_signing_key,
&None,
+ true,
)?;
}
}
@@ -896,7 +928,7 @@ pub async fn send_transaction_message_route(
Ok(send_transaction_message::v1::Response {
pdus: resolved_map
.into_iter()
- .map(|(e, r)| (e, r.map_err(|e| e.to_string())))
+ .map(|(e, r)| (e, r.map_err(|e| e.sanitized_error())))
.collect(),
})
}
@@ -922,7 +954,10 @@ pub async fn get_event_route(
.rooms
.timeline
.get_pdu_json(&body.event_id)?
- .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
+ .ok_or_else(|| {
+ warn!("Event not found, event ID: {:?}", &body.event_id);
+ Error::BadRequest(ErrorKind::NotFound, "Event not found.")
+ })?;
let room_id_str = event
.get("room_id")
@@ -943,6 +978,17 @@ pub async fn get_event_route(
));
}
+ if !services().rooms.state_accessor.server_can_see_event(
+ sender_servername,
+ &room_id,
+ &body.event_id,
+ )? {
+ return Err(Error::BadRequest(
+ ErrorKind::Forbidden,
+ "Server is not allowed to see event.",
+ ));
+ }
+
Ok(get_event::v1::Response {
origin: services().globals.server_name().to_owned(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
@@ -950,6 +996,83 @@ pub async fn get_event_route(
})
}
+/// # `GET /_matrix/federation/v1/backfill/<room_id>`
+///
+/// Retrieves events from before the sender joined the room, if the room's
+/// history visibility allows.
+pub async fn get_backfill_route(
+ body: Ruma<get_backfill::v1::Request>,
+) -> Result<get_backfill::v1::Response> {
+ if !services().globals.allow_federation() {
+ return Err(Error::bad_config("Federation is disabled."));
+ }
+
+ let sender_servername = body
+ .sender_servername
+ .as_ref()
+ .expect("server is authenticated");
+
+ debug!("Got backfill request from: {}", sender_servername);
+
+ if !services()
+ .rooms
+ .state_cache
+ .server_in_room(sender_servername, &body.room_id)?
+ {
+ return Err(Error::BadRequest(
+ ErrorKind::Forbidden,
+ "Server is not in room.",
+ ));
+ }
+
+ services()
+ .rooms
+ .event_handler
+ .acl_check(sender_servername, &body.room_id)?;
+
+ let until = body
+ .v
+ .iter()
+ .map(|eventid| services().rooms.timeline.get_pdu_count(eventid))
+ .filter_map(|r| r.ok().flatten())
+ .max()
+ .ok_or(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "No known eventid in v",
+ ))?;
+
+ let limit = body.limit.min(uint!(100));
+
+ let all_events = services()
+ .rooms
+ .timeline
+ .pdus_until(&user_id!("@doesntmatter:conduit.rs"), &body.room_id, until)?
+ .take(limit.try_into().unwrap());
+
+ let events = all_events
+ .filter_map(|r| r.ok())
+ .filter(|(_, e)| {
+ matches!(
+ services().rooms.state_accessor.server_can_see_event(
+ sender_servername,
+ &e.room_id,
+ &e.event_id,
+ ),
+ Ok(true),
+ )
+ })
+ .map(|(_, pdu)| services().rooms.timeline.get_pdu_json(&pdu.event_id))
+ .filter_map(|r| r.ok().flatten())
+ .map(|pdu| PduEvent::convert_to_outgoing_federation_event(pdu))
+ .collect();
+
+ Ok(get_backfill::v1::Response {
+ origin: services().globals.server_name().to_owned(),
+ origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
+ pdus: events,
+ })
+}
+
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
///
/// Retrieves events that the sender is missing.
@@ -1010,6 +1133,16 @@ pub async fn get_missing_events_route(
i += 1;
continue;
}
+
+ if !services().rooms.state_accessor.server_can_see_event(
+ sender_servername,
+ &body.room_id,
+ &queued_events[i],
+ )? {
+ i += 1;
+ continue;
+ }
+
queued_events.extend_from_slice(
&serde_json::from_value::<Vec<OwnedEventId>>(
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| {
@@ -1064,7 +1197,10 @@ pub async fn get_event_authorization_route(
.rooms
.timeline
.get_pdu_json(&body.event_id)?
- .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
+ .ok_or_else(|| {
+ warn!("Event not found, event ID: {:?}", &body.event_id);
+ Error::BadRequest(ErrorKind::NotFound, "Event not found.")
+ })?;
let room_id_str = event
.get("room_id")
@@ -1320,7 +1456,7 @@ pub async fn create_join_event_template_route(
let (_pdu, mut pdu_json) = services().rooms.timeline.create_hash_and_sign_event(
PduBuilder {
- event_type: RoomEventType::RoomMember,
+ event_type: TimelineEventType::RoomMember,
content,
unsigned: None,
state_key: Some(body.user_id.to_string()),
@@ -1345,7 +1481,7 @@ async fn create_join_event(
sender_servername: &ServerName,
room_id: &RoomId,
pdu: &RawJsonValue,
-) -> Result<RoomState> {
+) -> Result<create_join_event::v1::RoomState> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
@@ -1467,7 +1603,7 @@ async fn create_join_event(
services().sending.send_pdu(servers, &pdu_id)?;
- Ok(RoomState {
+ Ok(create_join_event::v1::RoomState {
auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
@@ -1508,7 +1644,18 @@ pub async fn create_join_event_v2_route(
.as_ref()
.expect("server is authenticated");
- let room_state = create_join_event(sender_servername, &body.room_id, &body.pdu).await?;
+ let create_join_event::v1::RoomState {
+ auth_chain,
+ state,
+ event,
+ } = create_join_event(sender_servername, &body.room_id, &body.pdu).await?;
+ let room_state = create_join_event::v2::RoomState {
+ members_omitted: false,
+ auth_chain,
+ state,
+ event,
+ servers_in_room: None,
+ };
Ok(create_join_event::v2::Response { room_state })
}
@@ -1668,12 +1815,14 @@ pub async fn get_devices_route(
})
})
.collect(),
- master_key: services()
- .users
- .get_master_key(&body.user_id, &|u| u.server_name() == sender_servername)?,
+ master_key: services().users.get_master_key(None, &body.user_id, &|u| {
+ u.server_name() == sender_servername
+ })?,
self_signing_key: services()
.users
- .get_self_signing_key(&body.user_id, &|u| u.server_name() == sender_servername)?,
+ .get_self_signing_key(None, &body.user_id, &|u| {
+ u.server_name() == sender_servername
+ })?,
})
}