diff options
Diffstat (limited to 'src/service/rooms/event_handler/mod.rs')
-rw-r--r-- | src/service/rooms/event_handler/mod.rs | 454 |
1 files changed, 218 insertions, 236 deletions
diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index bc67f7a..899f035 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -38,6 +38,8 @@ use tracing::{debug, error, info, trace, warn}; use crate::{service::*, services, Error, PduEvent, Result}; +use super::state_compressor::CompressedStateEvent; + pub struct Service; impl Service { @@ -62,9 +64,8 @@ impl Service { /// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by /// doing state res where one of the inputs was a previously trusted set of state, don't just /// trust a set of state we got from a remote) - /// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" - /// it - /// 14. Use state resolution to find new room state + /// 13. Use state resolution to find new room state + /// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it // We use some AsyncRecursiveType hacks here so we can call this async funtion recursively #[tracing::instrument(skip(self, value, is_timeline_event, pub_key_map))] pub(crate) async fn handle_incoming_pdu<'a>( @@ -118,6 +119,7 @@ impl Service { let (incoming_pdu, val) = self .handle_outlier_pdu(origin, &create_event, event_id, room_id, value, pub_key_map) .await?; + self.check_room_id(room_id, &incoming_pdu)?; // 8. if not timeline event: stop if !is_timeline_event { @@ -304,7 +306,7 @@ impl Service { ) { Err(e) => { // Drop - warn!("Dropping bad event {}: {}", event_id, e); + warn!("Dropping bad event {}: {}", event_id, e,); return Err(Error::BadRequest( ErrorKind::InvalidParam, "Signature verification failed", @@ -337,6 +339,8 @@ impl Service { ) .map_err(|_| Error::bad_database("Event is not a valid PDU."))?; + self.check_room_id(room_id, &incoming_pdu)?; + // 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" // NOTE: Step 5 is not applied anymore because it failed too often @@ -356,7 +360,7 @@ impl Service { .await; // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events - info!( + debug!( "Auth check for {} based on auth events", incoming_pdu.event_id ); @@ -372,6 +376,8 @@ impl Service { } }; + self.check_room_id(room_id, &auth_event)?; + match auth_events.entry(( auth_event.kind.to_string().into(), auth_event @@ -392,11 +398,12 @@ impl Service { } // The original create event must be in the auth events - if auth_events - .get(&(StateEventType::RoomCreate, "".to_owned())) - .map(|a| a.as_ref()) - != Some(create_event) - { + if !matches!( + auth_events + .get(&(StateEventType::RoomCreate, "".to_owned())) + .map(|a| a.as_ref()), + Some(_) | None + ) { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Incoming event refers to wrong create event.", @@ -417,7 +424,7 @@ impl Service { )); } - info!("Validation successful."); + debug!("Validation successful."); // 7. Persist the event as an outlier. services() @@ -425,7 +432,7 @@ impl Service { .outlier .add_pdu_outlier(&incoming_pdu.event_id, &val)?; - info!("Added pdu as outlier."); + debug!("Added pdu as outlier."); Ok((Arc::new(incoming_pdu), val)) }) @@ -474,7 +481,7 @@ impl Service { // TODO: if we know the prev_events of the incoming event we can avoid the request and build // the state from a known point and resolve if > 1 prev_event - info!("Requesting state at event"); + debug!("Requesting state at event"); let mut state_at_incoming_event = None; if incoming_pdu.prev_events.len() == 1 { @@ -497,7 +504,7 @@ impl Service { }; if let Some(Ok(mut state)) = state { - info!("Using cached state"); + debug!("Using cached state"); let prev_pdu = services() .rooms .timeline @@ -521,7 +528,7 @@ impl Service { state_at_incoming_event = Some(state); } } else { - info!("Calculating state at event using state res"); + debug!("Calculating state at event using state res"); let mut extremity_sstatehashes = HashMap::new(); let mut okay = true; @@ -630,7 +637,7 @@ impl Service { } if state_at_incoming_event.is_none() { - info!("Calling /state_ids"); + debug!("Calling /state_ids"); // Call /state_ids to find out what the state at this pdu is. We trust the server's // response to some extend, but we still do a lot of checks on the events match services() @@ -645,7 +652,7 @@ impl Service { .await { Ok(res) => { - info!("Fetching state events at event."); + debug!("Fetching state events at event."); let state_vec = self .fetch_and_handle_outliers( origin, @@ -708,7 +715,7 @@ impl Service { let state_at_incoming_event = state_at_incoming_event.expect("we always set this to some above"); - info!("Starting auth check"); + debug!("Starting auth check"); // 11. Check the auth of the event passes based on the state of the event let check_result = state_res::event_auth::auth_check( &room_version, @@ -732,10 +739,28 @@ impl Service { "Event has failed auth check with state at the event.", )); } - info!("Auth check succeeded"); + debug!("Auth check succeeded"); - // We start looking at current room state now, so lets lock the room + // Soft fail check before doing state res + let auth_events = services().rooms.state.get_auth_events( + room_id, + &incoming_pdu.kind, + &incoming_pdu.sender, + incoming_pdu.state_key.as_deref(), + &incoming_pdu.content, + )?; + let soft_fail = !state_res::event_auth::auth_check( + &room_version, + &incoming_pdu, + None::<PduEvent>, + |k, s| auth_events.get(&(k.clone(), s.to_owned())), + ) + .map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed."))?; + + // 13. Use state resolution to find new room state + + // We start looking at current room state now, so lets lock the room let mutex_state = Arc::clone( services() .globals @@ -749,7 +774,7 @@ impl Service { // Now we calculate the set of extremities this room has after the incoming event has been // applied. We start with the previous extremities (aka leaves) - info!("Calculating extremities"); + debug!("Calculating extremities"); let mut extremities = services().rooms.state.get_forward_extremities(room_id)?; // Remove any forward extremities that are referenced by this incoming event's prev_events @@ -770,35 +795,54 @@ impl Service { ) }); - info!("Compressing state at event"); - let state_ids_compressed = state_at_incoming_event - .iter() - .map(|(shortstatekey, id)| { - services() - .rooms - .state_compressor - .compress_state_event(*shortstatekey, id) - }) - .collect::<Result<_>>()?; + debug!("Compressing state at event"); + let state_ids_compressed = Arc::new( + state_at_incoming_event + .iter() + .map(|(shortstatekey, id)| { + services() + .rooms + .state_compressor + .compress_state_event(*shortstatekey, id) + }) + .collect::<Result<_>>()?, + ); - // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it - info!("Starting soft fail auth check"); + if incoming_pdu.state_key.is_some() { + debug!("Preparing for stateres to derive new room state"); - let auth_events = services().rooms.state.get_auth_events( - room_id, - &incoming_pdu.kind, - &incoming_pdu.sender, - incoming_pdu.state_key.as_deref(), - &incoming_pdu.content, - )?; + // We also add state after incoming event to the fork states + let mut state_after = state_at_incoming_event.clone(); + if let Some(state_key) = &incoming_pdu.state_key { + let shortstatekey = services().rooms.short.get_or_create_shortstatekey( + &incoming_pdu.kind.to_string().into(), + state_key, + )?; - let soft_fail = !state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - None::<PduEvent>, - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed."))?; + state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); + } + + let new_room_state = self + .resolve_state(room_id, room_version_id, state_after) + .await?; + + // Set the new room state to the resolved state + debug!("Forcing new room state"); + + let (sstatehash, new, removed) = services() + .rooms + .state_compressor + .save_state(room_id, new_room_state)?; + + services() + .rooms + .state + .force_state(room_id, sstatehash, new, removed, &state_lock) + .await?; + } + + // 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it + debug!("Starting soft fail auth check"); if soft_fail { services().rooms.timeline.append_incoming_pdu( @@ -822,202 +866,115 @@ impl Service { )); } - if incoming_pdu.state_key.is_some() { - info!("Loading current room state ids"); - let current_sstatehash = services() - .rooms - .state - .get_room_shortstatehash(room_id)? - .expect("every room has state"); + debug!("Appending pdu to timeline"); + extremities.insert(incoming_pdu.event_id.clone()); - let current_state_ids = services() - .rooms - .state_accessor - .state_full_ids(current_sstatehash) - .await?; + // Now that the event has passed all auth it is added into the timeline. + // We use the `state_at_event` instead of `state_after` so we accurately + // represent the state for this event. - info!("Preparing for stateres to derive new room state"); - let mut extremity_sstatehashes = HashMap::new(); + let pdu_id = services().rooms.timeline.append_incoming_pdu( + &incoming_pdu, + val, + extremities.iter().map(|e| (**e).to_owned()).collect(), + state_ids_compressed, + soft_fail, + &state_lock, + )?; - info!(?extremities, "Loading extremities"); - for id in &extremities { - match services().rooms.timeline.get_pdu(id)? { - Some(leaf_pdu) => { - extremity_sstatehashes.insert( - services() - .rooms - .state_accessor - .pdu_shortstatehash(&leaf_pdu.event_id)? - .ok_or_else(|| { - error!( - "Found extremity pdu with no statehash in db: {:?}", - leaf_pdu - ); - Error::bad_database("Found pdu with no statehash in db.") - })?, - leaf_pdu, - ); - } - _ => { - error!("Missing state snapshot for {:?}", id); - return Err(Error::BadDatabase("Missing state snapshot.")); - } - } - } + debug!("Appended incoming pdu"); - let mut fork_states = Vec::new(); + // Event has passed all auth/stateres checks + drop(state_lock); + Ok(pdu_id) + } - // 12. Ensure that the state is derived from the previous current state (i.e. we calculated - // by doing state res where one of the inputs was a previously trusted set of state, - // don't just trust a set of state we got from a remote). + async fn resolve_state( + &self, + room_id: &RoomId, + room_version_id: &RoomVersionId, + incoming_state: HashMap<u64, Arc<EventId>>, + ) -> Result<Arc<HashSet<CompressedStateEvent>>> { + debug!("Loading current room state ids"); + let current_sstatehash = services() + .rooms + .state + .get_room_shortstatehash(room_id)? + .expect("every room has state"); + + let current_state_ids = services() + .rooms + .state_accessor + .state_full_ids(current_sstatehash) + .await?; - // We do this by adding the current state to the list of fork states - extremity_sstatehashes.remove(¤t_sstatehash); - fork_states.push(current_state_ids); + let fork_states = [current_state_ids, incoming_state]; - // We also add state after incoming event to the fork states - let mut state_after = state_at_incoming_event.clone(); - if let Some(state_key) = &incoming_pdu.state_key { - let shortstatekey = services().rooms.short.get_or_create_shortstatekey( - &incoming_pdu.kind.to_string().into(), - state_key, - )?; + let mut auth_chain_sets = Vec::new(); + for state in &fork_states { + auth_chain_sets.push( + services() + .rooms + .auth_chain + .get_auth_chain(room_id, state.iter().map(|(_, id)| id.clone()).collect()) + .await? + .collect(), + ); + } - state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); - } - fork_states.push(state_after); - - let mut update_state = false; - // 14. Use state resolution to find new room state - let new_room_state = if fork_states.is_empty() { - panic!("State is empty"); - } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { - info!("State resolution trivial"); - // There was only one state, so it has to be the room's current state (because that is - // always included) - fork_states[0] - .iter() - .map(|(k, id)| { - services() - .rooms - .state_compressor - .compress_state_event(*k, id) - }) - .collect::<Result<_>>()? - } else { - info!("Loading auth chains"); - // We do need to force an update to this room's state - update_state = true; + debug!("Loading fork states"); - let mut auth_chain_sets = Vec::new(); - for state in &fork_states { - auth_chain_sets.push( + let fork_states: Vec<_> = fork_states + .into_iter() + .map(|map| { + map.into_iter() + .filter_map(|(k, id)| { services() .rooms - .auth_chain - .get_auth_chain( - room_id, - state.iter().map(|(_, id)| id.clone()).collect(), - ) - .await? - .collect(), - ); - } - - info!("Loading fork states"); - - let fork_states: Vec<_> = fork_states - .into_iter() - .map(|map| { - map.into_iter() - .filter_map(|(k, id)| { - services() - .rooms - .short - .get_statekey_from_short(k) - .map(|(ty, st_key)| ((ty.to_string().into(), st_key), id)) - .ok() - }) - .collect::<StateMap<_>>() + .short + .get_statekey_from_short(k) + .map(|(ty, st_key)| ((ty.to_string().into(), st_key), id)) + .ok() }) - .collect(); - - info!("Resolving state"); + .collect::<StateMap<_>>() + }) + .collect(); - let lock = services().globals.stateres_mutex.lock(); - let state = match state_res::resolve( - room_version_id, - &fork_states, - auth_chain_sets, - |id| { - let res = services().rooms.timeline.get_pdu(id); - if let Err(e) = &res { - error!("LOOK AT ME Failed to fetch event: {}", e); - } - res.ok().flatten() - }, - ) { - Ok(new_state) => new_state, - Err(_) => { - return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization")); - } - }; + debug!("Resolving state"); - drop(lock); + let lock = services().globals.stateres_mutex.lock(); + let state = match state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { + let res = services().rooms.timeline.get_pdu(id); + if let Err(e) = &res { + error!("LOOK AT ME Failed to fetch event: {}", e); + } + res.ok().flatten() + }) { + Ok(new_state) => new_state, + Err(_) => { + return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization")); + } + }; - info!("State resolution done. Compressing state"); + drop(lock); - state - .into_iter() - .map(|((event_type, state_key), event_id)| { - let shortstatekey = services().rooms.short.get_or_create_shortstatekey( - &event_type.to_string().into(), - &state_key, - )?; - services() - .rooms - .state_compressor - .compress_state_event(shortstatekey, &event_id) - }) - .collect::<Result<_>>()? - }; + debug!("State resolution done. Compressing state"); - // Set the new room state to the resolved state - if update_state { - info!("Forcing new room state"); - let (sstatehash, new, removed) = services() + let new_room_state = state + .into_iter() + .map(|((event_type, state_key), event_id)| { + let shortstatekey = services() .rooms - .state_compressor - .save_state(room_id, new_room_state)?; + .short + .get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?; services() .rooms - .state - .force_state(room_id, sstatehash, new, removed, &state_lock) - .await?; - } - } - - info!("Appending pdu to timeline"); - extremities.insert(incoming_pdu.event_id.clone()); - - // Now that the event has passed all auth it is added into the timeline. - // We use the `state_at_event` instead of `state_after` so we accurately - // represent the state for this event. - - let pdu_id = services().rooms.timeline.append_incoming_pdu( - &incoming_pdu, - val, - extremities.iter().map(|e| (**e).to_owned()).collect(), - state_ids_compressed, - soft_fail, - &state_lock, - )?; - - info!("Appended incoming pdu"); + .state_compressor + .compress_state_event(shortstatekey, &event_id) + }) + .collect::<Result<_>>()?; - // Event has passed all auth/stateres checks - drop(state_lock); - Ok(pdu_id) + Ok(Arc::new(new_room_state)) } /// Find the event and auth it. Once the event is validated (steps 1 - 8) @@ -1226,6 +1183,8 @@ impl Service { .await .pop() { + self.check_room_id(room_id, &pdu)?; + if amount > services().globals.max_fetch_prev_events() { // Max limit reached warn!("Max prev event limit reached!"); @@ -1459,12 +1418,12 @@ impl Service { } if servers.is_empty() { - // We had all keys locally + info!("We had all keys locally"); return Ok(()); } for server in services().globals.trusted_servers() { - trace!("Asking batch signing keys from trusted server {}", server); + info!("Asking batch signing keys from trusted server {}", server); if let Ok(keys) = services() .sending .send_federation_request( @@ -1507,10 +1466,12 @@ impl Service { } if servers.is_empty() { + info!("Trusted server supplied all signing keys"); return Ok(()); } } + info!("Asking individual servers for signing keys: {servers:?}"); let mut futures: FuturesUnordered<_> = servers .into_keys() .map(|server| async move { @@ -1525,21 +1486,27 @@ impl Service { .collect(); while let Some(result) = futures.next().await { + info!("Received new result"); if let (Ok(get_keys_response), origin) = result { - let result: BTreeMap<_, _> = services() - .globals - .add_signing_key(&origin, get_keys_response.server_key.deserialize().unwrap())? - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect(); - - pub_key_map - .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))? - .insert(origin.to_string(), result); + info!("Result is from {origin}"); + if let Ok(key) = get_keys_response.server_key.deserialize() { + let result: BTreeMap<_, _> = services() + .globals + .add_signing_key(&origin, key)? + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect(); + pub_key_map + .write() + .map_err(|_| Error::bad_database("RwLock is poisoned."))? + .insert(origin.to_string(), result); + } } + info!("Done handling result"); } + info!("Search for signing keys done"); + Ok(()) } @@ -1566,9 +1533,13 @@ impl Service { if acl_event_content.is_allowed(server_name) { Ok(()) } else { + info!( + "Server {} was denied by room ACL in {}", + server_name, room_id + ); Err(Error::BadRequest( ErrorKind::Forbidden, - "Server was denied by ACL", + "Server was denied by room ACL", )) } } @@ -1738,4 +1709,15 @@ impl Service { "Failed to find public key for server", )) } + + fn check_room_id(&self, room_id: &RoomId, pdu: &PduEvent) -> Result<()> { + if pdu.room_id != room_id { + warn!("Found event from room {} in room {}", pdu.room_id, room_id); + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Event has wrong room id", + )); + } + Ok(()) + } } |