summaryrefslogtreecommitdiff
path: root/src/service/rooms/event_handler/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/rooms/event_handler/mod.rs')
-rw-r--r--src/service/rooms/event_handler/mod.rs454
1 files changed, 218 insertions, 236 deletions
diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs
index bc67f7a..899f035 100644
--- a/src/service/rooms/event_handler/mod.rs
+++ b/src/service/rooms/event_handler/mod.rs
@@ -38,6 +38,8 @@ use tracing::{debug, error, info, trace, warn};
use crate::{service::*, services, Error, PduEvent, Result};
+use super::state_compressor::CompressedStateEvent;
+
pub struct Service;
impl Service {
@@ -62,9 +64,8 @@ impl Service {
/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by
/// doing state res where one of the inputs was a previously trusted set of state, don't just
/// trust a set of state we got from a remote)
- /// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail"
- /// it
- /// 14. Use state resolution to find new room state
+ /// 13. Use state resolution to find new room state
+ /// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
// We use some AsyncRecursiveType hacks here so we can call this async function recursively
#[tracing::instrument(skip(self, value, is_timeline_event, pub_key_map))]
pub(crate) async fn handle_incoming_pdu<'a>(
@@ -118,6 +119,7 @@ impl Service {
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, &create_event, event_id, room_id, value, pub_key_map)
.await?;
+ self.check_room_id(room_id, &incoming_pdu)?;
// 8. if not timeline event: stop
if !is_timeline_event {
@@ -304,7 +306,7 @@ impl Service {
) {
Err(e) => {
// Drop
- warn!("Dropping bad event {}: {}", event_id, e);
+ warn!("Dropping bad event {}: {}", event_id, e,);
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Signature verification failed",
@@ -337,6 +339,8 @@ impl Service {
)
.map_err(|_| Error::bad_database("Event is not a valid PDU."))?;
+ self.check_room_id(room_id, &incoming_pdu)?;
+
// 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// NOTE: Step 5 is not applied anymore because it failed too often
@@ -356,7 +360,7 @@ impl Service {
.await;
// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
- info!(
+ debug!(
"Auth check for {} based on auth events",
incoming_pdu.event_id
);
@@ -372,6 +376,8 @@ impl Service {
}
};
+ self.check_room_id(room_id, &auth_event)?;
+
match auth_events.entry((
auth_event.kind.to_string().into(),
auth_event
@@ -392,11 +398,12 @@ impl Service {
}
// The original create event must be in the auth events
- if auth_events
- .get(&(StateEventType::RoomCreate, "".to_owned()))
- .map(|a| a.as_ref())
- != Some(create_event)
- {
+ if !matches!(
+ auth_events
+ .get(&(StateEventType::RoomCreate, "".to_owned()))
+ .map(|a| a.as_ref()),
+ Some(_) | None
+ ) {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Incoming event refers to wrong create event.",
@@ -417,7 +424,7 @@ impl Service {
));
}
- info!("Validation successful.");
+ debug!("Validation successful.");
// 7. Persist the event as an outlier.
services()
@@ -425,7 +432,7 @@ impl Service {
.outlier
.add_pdu_outlier(&incoming_pdu.event_id, &val)?;
- info!("Added pdu as outlier.");
+ debug!("Added pdu as outlier.");
Ok((Arc::new(incoming_pdu), val))
})
@@ -474,7 +481,7 @@ impl Service {
// TODO: if we know the prev_events of the incoming event we can avoid the request and build
// the state from a known point and resolve if > 1 prev_event
- info!("Requesting state at event");
+ debug!("Requesting state at event");
let mut state_at_incoming_event = None;
if incoming_pdu.prev_events.len() == 1 {
@@ -497,7 +504,7 @@ impl Service {
};
if let Some(Ok(mut state)) = state {
- info!("Using cached state");
+ debug!("Using cached state");
let prev_pdu = services()
.rooms
.timeline
@@ -521,7 +528,7 @@ impl Service {
state_at_incoming_event = Some(state);
}
} else {
- info!("Calculating state at event using state res");
+ debug!("Calculating state at event using state res");
let mut extremity_sstatehashes = HashMap::new();
let mut okay = true;
@@ -630,7 +637,7 @@ impl Service {
}
if state_at_incoming_event.is_none() {
- info!("Calling /state_ids");
+ debug!("Calling /state_ids");
// Call /state_ids to find out what the state at this pdu is. We trust the server's
// response to some extent, but we still do a lot of checks on the events
match services()
@@ -645,7 +652,7 @@ impl Service {
.await
{
Ok(res) => {
- info!("Fetching state events at event.");
+ debug!("Fetching state events at event.");
let state_vec = self
.fetch_and_handle_outliers(
origin,
@@ -708,7 +715,7 @@ impl Service {
let state_at_incoming_event =
state_at_incoming_event.expect("we always set this to some above");
- info!("Starting auth check");
+ debug!("Starting auth check");
// 11. Check the auth of the event passes based on the state of the event
let check_result = state_res::event_auth::auth_check(
&room_version,
@@ -732,10 +739,28 @@ impl Service {
"Event has failed auth check with state at the event.",
));
}
- info!("Auth check succeeded");
+ debug!("Auth check succeeded");
- // We start looking at current room state now, so lets lock the room
+ // Soft fail check before doing state res
+ let auth_events = services().rooms.state.get_auth_events(
+ room_id,
+ &incoming_pdu.kind,
+ &incoming_pdu.sender,
+ incoming_pdu.state_key.as_deref(),
+ &incoming_pdu.content,
+ )?;
+ let soft_fail = !state_res::event_auth::auth_check(
+ &room_version,
+ &incoming_pdu,
+ None::<PduEvent>,
+ |k, s| auth_events.get(&(k.clone(), s.to_owned())),
+ )
+ .map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed."))?;
+
+ // 13. Use state resolution to find new room state
+
+ // We start looking at current room state now, so lets lock the room
let mutex_state = Arc::clone(
services()
.globals
@@ -749,7 +774,7 @@ impl Service {
// Now we calculate the set of extremities this room has after the incoming event has been
// applied. We start with the previous extremities (aka leaves)
- info!("Calculating extremities");
+ debug!("Calculating extremities");
let mut extremities = services().rooms.state.get_forward_extremities(room_id)?;
// Remove any forward extremities that are referenced by this incoming event's prev_events
@@ -770,35 +795,54 @@ impl Service {
)
});
- info!("Compressing state at event");
- let state_ids_compressed = state_at_incoming_event
- .iter()
- .map(|(shortstatekey, id)| {
- services()
- .rooms
- .state_compressor
- .compress_state_event(*shortstatekey, id)
- })
- .collect::<Result<_>>()?;
+ debug!("Compressing state at event");
+ let state_ids_compressed = Arc::new(
+ state_at_incoming_event
+ .iter()
+ .map(|(shortstatekey, id)| {
+ services()
+ .rooms
+ .state_compressor
+ .compress_state_event(*shortstatekey, id)
+ })
+ .collect::<Result<_>>()?,
+ );
- // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
- info!("Starting soft fail auth check");
+ if incoming_pdu.state_key.is_some() {
+ debug!("Preparing for stateres to derive new room state");
- let auth_events = services().rooms.state.get_auth_events(
- room_id,
- &incoming_pdu.kind,
- &incoming_pdu.sender,
- incoming_pdu.state_key.as_deref(),
- &incoming_pdu.content,
- )?;
+ // We also add state after incoming event to the fork states
+ let mut state_after = state_at_incoming_event.clone();
+ if let Some(state_key) = &incoming_pdu.state_key {
+ let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
+ &incoming_pdu.kind.to_string().into(),
+ state_key,
+ )?;
- let soft_fail = !state_res::event_auth::auth_check(
- &room_version,
- &incoming_pdu,
- None::<PduEvent>,
- |k, s| auth_events.get(&(k.clone(), s.to_owned())),
- )
- .map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed."))?;
+ state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
+ }
+
+ let new_room_state = self
+ .resolve_state(room_id, room_version_id, state_after)
+ .await?;
+
+ // Set the new room state to the resolved state
+ debug!("Forcing new room state");
+
+ let (sstatehash, new, removed) = services()
+ .rooms
+ .state_compressor
+ .save_state(room_id, new_room_state)?;
+
+ services()
+ .rooms
+ .state
+ .force_state(room_id, sstatehash, new, removed, &state_lock)
+ .await?;
+ }
+
+ // 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
+ debug!("Starting soft fail auth check");
if soft_fail {
services().rooms.timeline.append_incoming_pdu(
@@ -822,202 +866,115 @@ impl Service {
));
}
- if incoming_pdu.state_key.is_some() {
- info!("Loading current room state ids");
- let current_sstatehash = services()
- .rooms
- .state
- .get_room_shortstatehash(room_id)?
- .expect("every room has state");
+ debug!("Appending pdu to timeline");
+ extremities.insert(incoming_pdu.event_id.clone());
- let current_state_ids = services()
- .rooms
- .state_accessor
- .state_full_ids(current_sstatehash)
- .await?;
+ // Now that the event has passed all auth it is added into the timeline.
+ // We use the `state_at_event` instead of `state_after` so we accurately
+ // represent the state for this event.
- info!("Preparing for stateres to derive new room state");
- let mut extremity_sstatehashes = HashMap::new();
+ let pdu_id = services().rooms.timeline.append_incoming_pdu(
+ &incoming_pdu,
+ val,
+ extremities.iter().map(|e| (**e).to_owned()).collect(),
+ state_ids_compressed,
+ soft_fail,
+ &state_lock,
+ )?;
- info!(?extremities, "Loading extremities");
- for id in &extremities {
- match services().rooms.timeline.get_pdu(id)? {
- Some(leaf_pdu) => {
- extremity_sstatehashes.insert(
- services()
- .rooms
- .state_accessor
- .pdu_shortstatehash(&leaf_pdu.event_id)?
- .ok_or_else(|| {
- error!(
- "Found extremity pdu with no statehash in db: {:?}",
- leaf_pdu
- );
- Error::bad_database("Found pdu with no statehash in db.")
- })?,
- leaf_pdu,
- );
- }
- _ => {
- error!("Missing state snapshot for {:?}", id);
- return Err(Error::BadDatabase("Missing state snapshot."));
- }
- }
- }
+ debug!("Appended incoming pdu");
- let mut fork_states = Vec::new();
+ // Event has passed all auth/stateres checks
+ drop(state_lock);
+ Ok(pdu_id)
+ }
- // 12. Ensure that the state is derived from the previous current state (i.e. we calculated
- // by doing state res where one of the inputs was a previously trusted set of state,
- // don't just trust a set of state we got from a remote).
+ async fn resolve_state(
+ &self,
+ room_id: &RoomId,
+ room_version_id: &RoomVersionId,
+ incoming_state: HashMap<u64, Arc<EventId>>,
+ ) -> Result<Arc<HashSet<CompressedStateEvent>>> {
+ debug!("Loading current room state ids");
+ let current_sstatehash = services()
+ .rooms
+ .state
+ .get_room_shortstatehash(room_id)?
+ .expect("every room has state");
+
+ let current_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(current_sstatehash)
+ .await?;
- // We do this by adding the current state to the list of fork states
- extremity_sstatehashes.remove(&current_sstatehash);
- fork_states.push(current_state_ids);
+ let fork_states = [current_state_ids, incoming_state];
- // We also add state after incoming event to the fork states
- let mut state_after = state_at_incoming_event.clone();
- if let Some(state_key) = &incoming_pdu.state_key {
- let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
- &incoming_pdu.kind.to_string().into(),
- state_key,
- )?;
+ let mut auth_chain_sets = Vec::new();
+ for state in &fork_states {
+ auth_chain_sets.push(
+ services()
+ .rooms
+ .auth_chain
+ .get_auth_chain(room_id, state.iter().map(|(_, id)| id.clone()).collect())
+ .await?
+ .collect(),
+ );
+ }
- state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
- }
- fork_states.push(state_after);
-
- let mut update_state = false;
- // 14. Use state resolution to find new room state
- let new_room_state = if fork_states.is_empty() {
- panic!("State is empty");
- } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) {
- info!("State resolution trivial");
- // There was only one state, so it has to be the room's current state (because that is
- // always included)
- fork_states[0]
- .iter()
- .map(|(k, id)| {
- services()
- .rooms
- .state_compressor
- .compress_state_event(*k, id)
- })
- .collect::<Result<_>>()?
- } else {
- info!("Loading auth chains");
- // We do need to force an update to this room's state
- update_state = true;
+ debug!("Loading fork states");
- let mut auth_chain_sets = Vec::new();
- for state in &fork_states {
- auth_chain_sets.push(
+ let fork_states: Vec<_> = fork_states
+ .into_iter()
+ .map(|map| {
+ map.into_iter()
+ .filter_map(|(k, id)| {
services()
.rooms
- .auth_chain
- .get_auth_chain(
- room_id,
- state.iter().map(|(_, id)| id.clone()).collect(),
- )
- .await?
- .collect(),
- );
- }
-
- info!("Loading fork states");
-
- let fork_states: Vec<_> = fork_states
- .into_iter()
- .map(|map| {
- map.into_iter()
- .filter_map(|(k, id)| {
- services()
- .rooms
- .short
- .get_statekey_from_short(k)
- .map(|(ty, st_key)| ((ty.to_string().into(), st_key), id))
- .ok()
- })
- .collect::<StateMap<_>>()
+ .short
+ .get_statekey_from_short(k)
+ .map(|(ty, st_key)| ((ty.to_string().into(), st_key), id))
+ .ok()
})
- .collect();
-
- info!("Resolving state");
+ .collect::<StateMap<_>>()
+ })
+ .collect();
- let lock = services().globals.stateres_mutex.lock();
- let state = match state_res::resolve(
- room_version_id,
- &fork_states,
- auth_chain_sets,
- |id| {
- let res = services().rooms.timeline.get_pdu(id);
- if let Err(e) = &res {
- error!("LOOK AT ME Failed to fetch event: {}", e);
- }
- res.ok().flatten()
- },
- ) {
- Ok(new_state) => new_state,
- Err(_) => {
- return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization"));
- }
- };
+ debug!("Resolving state");
- drop(lock);
+ let lock = services().globals.stateres_mutex.lock();
+ let state = match state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| {
+ let res = services().rooms.timeline.get_pdu(id);
+ if let Err(e) = &res {
+ error!("LOOK AT ME Failed to fetch event: {}", e);
+ }
+ res.ok().flatten()
+ }) {
+ Ok(new_state) => new_state,
+ Err(_) => {
+ return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization"));
+ }
+ };
- info!("State resolution done. Compressing state");
+ drop(lock);
- state
- .into_iter()
- .map(|((event_type, state_key), event_id)| {
- let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
- &event_type.to_string().into(),
- &state_key,
- )?;
- services()
- .rooms
- .state_compressor
- .compress_state_event(shortstatekey, &event_id)
- })
- .collect::<Result<_>>()?
- };
+ debug!("State resolution done. Compressing state");
- // Set the new room state to the resolved state
- if update_state {
- info!("Forcing new room state");
- let (sstatehash, new, removed) = services()
+ let new_room_state = state
+ .into_iter()
+ .map(|((event_type, state_key), event_id)| {
+ let shortstatekey = services()
.rooms
- .state_compressor
- .save_state(room_id, new_room_state)?;
+ .short
+ .get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?;
services()
.rooms
- .state
- .force_state(room_id, sstatehash, new, removed, &state_lock)
- .await?;
- }
- }
-
- info!("Appending pdu to timeline");
- extremities.insert(incoming_pdu.event_id.clone());
-
- // Now that the event has passed all auth it is added into the timeline.
- // We use the `state_at_event` instead of `state_after` so we accurately
- // represent the state for this event.
-
- let pdu_id = services().rooms.timeline.append_incoming_pdu(
- &incoming_pdu,
- val,
- extremities.iter().map(|e| (**e).to_owned()).collect(),
- state_ids_compressed,
- soft_fail,
- &state_lock,
- )?;
-
- info!("Appended incoming pdu");
+ .state_compressor
+ .compress_state_event(shortstatekey, &event_id)
+ })
+ .collect::<Result<_>>()?;
- // Event has passed all auth/stateres checks
- drop(state_lock);
- Ok(pdu_id)
+ Ok(Arc::new(new_room_state))
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
@@ -1226,6 +1183,8 @@ impl Service {
.await
.pop()
{
+ self.check_room_id(room_id, &pdu)?;
+
if amount > services().globals.max_fetch_prev_events() {
// Max limit reached
warn!("Max prev event limit reached!");
@@ -1459,12 +1418,12 @@ impl Service {
}
if servers.is_empty() {
- // We had all keys locally
+ info!("We had all keys locally");
return Ok(());
}
for server in services().globals.trusted_servers() {
- trace!("Asking batch signing keys from trusted server {}", server);
+ info!("Asking batch signing keys from trusted server {}", server);
if let Ok(keys) = services()
.sending
.send_federation_request(
@@ -1507,10 +1466,12 @@ impl Service {
}
if servers.is_empty() {
+ info!("Trusted server supplied all signing keys");
return Ok(());
}
}
+ info!("Asking individual servers for signing keys: {servers:?}");
let mut futures: FuturesUnordered<_> = servers
.into_keys()
.map(|server| async move {
@@ -1525,21 +1486,27 @@ impl Service {
.collect();
while let Some(result) = futures.next().await {
+ info!("Received new result");
if let (Ok(get_keys_response), origin) = result {
- let result: BTreeMap<_, _> = services()
- .globals
- .add_signing_key(&origin, get_keys_response.server_key.deserialize().unwrap())?
- .into_iter()
- .map(|(k, v)| (k.to_string(), v.key))
- .collect();
-
- pub_key_map
- .write()
- .map_err(|_| Error::bad_database("RwLock is poisoned."))?
- .insert(origin.to_string(), result);
+ info!("Result is from {origin}");
+ if let Ok(key) = get_keys_response.server_key.deserialize() {
+ let result: BTreeMap<_, _> = services()
+ .globals
+ .add_signing_key(&origin, key)?
+ .into_iter()
+ .map(|(k, v)| (k.to_string(), v.key))
+ .collect();
+ pub_key_map
+ .write()
+ .map_err(|_| Error::bad_database("RwLock is poisoned."))?
+ .insert(origin.to_string(), result);
+ }
}
+ info!("Done handling result");
}
+ info!("Search for signing keys done");
+
Ok(())
}
@@ -1566,9 +1533,13 @@ impl Service {
if acl_event_content.is_allowed(server_name) {
Ok(())
} else {
+ info!(
+ "Server {} was denied by room ACL in {}",
+ server_name, room_id
+ );
Err(Error::BadRequest(
ErrorKind::Forbidden,
- "Server was denied by ACL",
+ "Server was denied by room ACL",
))
}
}
@@ -1738,4 +1709,15 @@ impl Service {
"Failed to find public key for server",
))
}
+
+ fn check_room_id(&self, room_id: &RoomId, pdu: &PduEvent) -> Result<()> {
+ if pdu.room_id != room_id {
+ warn!("Found event from room {} in room {}", pdu.room_id, room_id);
+ return Err(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Event has wrong room id",
+ ));
+ }
+ Ok(())
+ }
}