diff options
author | Timo Kösters <timo@koesters.xyz> | 2022-06-20 11:31:27 +0200 |
---|---|---|
committer | Nyaaori <+@nyaaori.cat> | 2022-08-15 17:12:22 +0200 |
commit | dcdbcc08519643b7501b1e5695a9f211703f8b41 (patch) | |
tree | 3c59e7c862e45b72937287188141ed8b094659df /src/api | |
parent | 1b0477d569714ec3187d03d2681a9b6dfb785c39 (diff) | |
download | conduit-dcdbcc08519643b7501b1e5695a9f211703f8b41.zip |
refactor: event handling code
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/client_server/membership.rs | 123 | ||||
-rw-r--r-- | src/api/server_server.rs | 1356 |
2 files changed, 2 insertions, 1477 deletions
diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 4dda11a..ecd26d1 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -806,36 +806,6 @@ pub(crate) async fn invite_helper<'a>( ); let state_lock = mutex_state.lock().await; - let prev_events: Vec<_> = db - .rooms - .get_pdu_leaves(room_id)? - .into_iter() - .take(20) - .collect(); - - let create_event = db - .rooms - .room_state_get(room_id, &StateEventType::RoomCreate, "")?; - - let create_event_content: Option<RoomCreateEventContent> = create_event - .as_ref() - .map(|create_event| { - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - Error::bad_database("Invalid create event in db.") - }) - }) - .transpose()?; - - // If there was no create event yet, assume we are creating a room with the default - // version right now - let room_version_id = create_event_content - .map_or(db.globals.default_room_version(), |create_event| { - create_event.room_version - }); - let room_version = - RoomVersion::new(&room_version_id).expect("room version is supported"); - let content = to_raw_value(&RoomMemberEventContent { avatar_url: None, displayname: None, @@ -851,98 +821,7 @@ pub(crate) async fn invite_helper<'a>( let state_key = user_id.to_string(); let kind = StateEventType::RoomMember; - let auth_events = db.rooms.get_auth_events( - room_id, - &kind.to_string().into(), - sender_user, - Some(&state_key), - &content, - )?; - - // Our depth is the maximum depth of prev_events + 1 - let depth = prev_events - .iter() - .filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth)) - .max() - .unwrap_or_else(|| uint!(0)) - + uint!(1); - - let mut unsigned = BTreeMap::new(); - - if let Some(prev_pdu) = db.rooms.room_state_get(room_id, &kind, &state_key)? { - unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone()); - unsigned.insert( - "prev_sender".to_owned(), - to_raw_value(&prev_pdu.sender).expect("UserId is valid"), - ); - } - - let pdu = PduEvent { - event_id: ruma::event_id!("$thiswillbefilledinlater").into(), - room_id: room_id.to_owned(), - sender: sender_user.to_owned(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - kind: kind.to_string().into(), - content, - state_key: Some(state_key), - prev_events, - depth, - auth_events: auth_events - .iter() - .map(|(_, pdu)| pdu.event_id.clone()) - .collect(), - redacts: None, - unsigned: if unsigned.is_empty() { - None - } else { - Some(to_raw_value(&unsigned).expect("to_raw_value always works")) - }, - hashes: EventHash { - sha256: "aaa".to_owned(), - }, - signatures: None, - }; - - let auth_check = state_res::auth_check( - &room_version, - &pdu, - None::<PduEvent>, // TODO: third_party_invite - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|e| { - error!("{:?}", e); - Error::bad_database("Auth check failed.") - })?; - - if !auth_check { - return Err(Error::BadRequest( - ErrorKind::Forbidden, - "Event is not authorized.", - )); - } - - // Hash and sign - let mut pdu_json = - utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); - - pdu_json.remove("event_id"); - - // Add origin because synapse likes that (and it's required in the spec) - pdu_json.insert( - "origin".to_owned(), - to_canonical_value(db.globals.server_name()) - .expect("server name is a valid CanonicalJsonValue"), - ); - - ruma::signatures::hash_and_sign_event( - db.globals.server_name().as_str(), - db.globals.keypair(), - &mut pdu_json, - &room_version_id, - ) - .expect("event is valid, we just created it"); + let (pdu, pdu_json) = create_hash_and_sign_event(); let invite_room_state = db.rooms.calculate_invite_state(&pdu)?; diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 6fa83e4..f60f735 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -882,1163 +882,6 @@ pub async fn send_transaction_message_route( Ok(send_transaction_message::v1::Response { pdus: resolved_map }) } -/// An async function that can recursively call itself. -type AsyncRecursiveType<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>; - -/// When receiving an event one needs to: -/// 0. Check the server is in the room -/// 1. Skip the PDU if we already know about it -/// 2. Check signatures, otherwise drop -/// 3. Check content hash, redact if doesn't match -/// 4. Fetch any missing auth events doing all checks listed here starting at 1. These are not -/// timeline events -/// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are -/// also rejected "due to auth events" -/// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events -/// 7. Persist this event as an outlier -/// 8. If not timeline event: stop -/// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline -/// events -/// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities -/// doing all the checks in this list starting at 1. These are not timeline events -/// 11. Check the auth of the event passes based on the state of the event -/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by -/// doing state res where one of the inputs was a previously trusted set of state, don't just -/// trust a set of state we got from a remote) -/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" -/// it -/// 14. Use state resolution to find new room state -// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively -#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] -pub(crate) async fn handle_incoming_pdu<'a>( - origin: &'a ServerName, - event_id: &'a EventId, - room_id: &'a RoomId, - value: BTreeMap<String, CanonicalJsonValue>, - is_timeline_event: bool, - db: &'a Database, - pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, -) -> Result<Option<Vec<u8>>, String> { - match db.rooms.exists(room_id) { - Ok(true) => {} - _ => { - return Err("Room is unknown to this server.".to_owned()); - } - } - - match db.rooms.is_disabled(room_id) { - Ok(false) => {} - _ => { - return Err("Federation of this room is currently disabled on this server.".to_owned()); - } - } - - // 1. Skip the PDU if we already have it as a timeline event - if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(event_id) { - return Ok(Some(pdu_id.to_vec())); - } - - let create_event = db - .rooms - .room_state_get(room_id, &StateEventType::RoomCreate, "") - .map_err(|_| "Failed to ask database for event.".to_owned())? - .ok_or_else(|| "Failed to find create event in db.".to_owned())?; - - let first_pdu_in_room = db - .rooms - .first_pdu_in_room(room_id) - .map_err(|_| "Error loading first room event.".to_owned())? - .expect("Room exists"); - - let (incoming_pdu, val) = handle_outlier_pdu( - origin, - &create_event, - event_id, - room_id, - value, - db, - pub_key_map, - ) - .await?; - - // 8. if not timeline event: stop - if !is_timeline_event { - return Ok(None); - } - - if incoming_pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { - return Ok(None); - } - - // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - let mut graph: HashMap<Arc<EventId>, _> = HashMap::new(); - let mut eventid_info = HashMap::new(); - let mut todo_outlier_stack: Vec<Arc<EventId>> = incoming_pdu.prev_events.clone(); - - let mut amount = 0; - - while let Some(prev_event_id) = todo_outlier_stack.pop() { - if let Some((pdu, json_opt)) = fetch_and_handle_outliers( - db, - origin, - &[prev_event_id.clone()], - &create_event, - room_id, - pub_key_map, - ) - .await - .pop() - { - if amount > 100 { - // Max limit reached - warn!("Max prev event limit reached!"); - graph.insert(prev_event_id.clone(), HashSet::new()); - continue; - } - - if let Some(json) = - json_opt.or_else(|| db.rooms.get_outlier_pdu_json(&prev_event_id).ok().flatten()) - { - if pdu.origin_server_ts > first_pdu_in_room.origin_server_ts { - amount += 1; - for prev_prev in &pdu.prev_events { - if !graph.contains_key(prev_prev) { - todo_outlier_stack.push(dbg!(prev_prev.clone())); - } - } - - graph.insert( - prev_event_id.clone(), - pdu.prev_events.iter().cloned().collect(), - ); - } else { - // Time based check failed - graph.insert(prev_event_id.clone(), HashSet::new()); - } - - eventid_info.insert(prev_event_id.clone(), (pdu, json)); - } else { - // Get json failed - graph.insert(prev_event_id.clone(), HashSet::new()); - } - } else { - // Fetch and handle failed - graph.insert(prev_event_id.clone(), HashSet::new()); - } - } - - let sorted = state_res::lexicographical_topological_sort(dbg!(&graph), |event_id| { - // This return value is the key used for sorting events, - // events are then sorted by power level, time, - // and lexically by event_id. - println!("{}", event_id); - Ok(( - int!(0), - MilliSecondsSinceUnixEpoch( - eventid_info - .get(event_id) - .map_or_else(|| uint!(0), |info| info.0.origin_server_ts), - ), - )) - }) - .map_err(|_| "Error sorting prev events".to_owned())?; - - let mut errors = 0; - for prev_id in dbg!(sorted) { - match db.rooms.is_disabled(room_id) { - Ok(false) => {} - _ => { - return Err( - "Federation of this room is currently disabled on this server.".to_owned(), - ); - } - } - - if let Some((time, tries)) = db - .globals - .bad_event_ratelimiter - .read() - .unwrap() - .get(&*prev_id) - { - // Exponential backoff - let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); - } - - if time.elapsed() < min_elapsed_duration { - info!("Backing off from {}", prev_id); - continue; - } - } - - if errors >= 5 { - break; - } - if let Some((pdu, json)) = eventid_info.remove(&*prev_id) { - if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { - continue; - } - - let start_time = Instant::now(); - db.globals - .roomid_federationhandletime - .write() - .unwrap() - .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); - if let Err(e) = upgrade_outlier_to_timeline_pdu( - pdu, - json, - &create_event, - origin, - db, - room_id, - pub_key_map, - ) - .await - { - errors += 1; - warn!("Prev event {} failed: {}", prev_id, e); - match db - .globals - .bad_event_ratelimiter - .write() - .unwrap() - .entry((*prev_id).to_owned()) - { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - } - hash_map::Entry::Occupied(mut e) => { - *e.get_mut() = (Instant::now(), e.get().1 + 1) - } - } - } - let elapsed = start_time.elapsed(); - db.globals - .roomid_federationhandletime - .write() - .unwrap() - .remove(&room_id.to_owned()); - warn!( - "Handling prev event {} took {}m{}s", - prev_id, - elapsed.as_secs() / 60, - elapsed.as_secs() % 60 - ); - } - } - - let start_time = Instant::now(); - db.globals - .roomid_federationhandletime - .write() - .unwrap() - .insert(room_id.to_owned(), (event_id.to_owned(), start_time)); - let r = upgrade_outlier_to_timeline_pdu( - incoming_pdu, - val, - &create_event, - origin, - db, - room_id, - pub_key_map, - ) - .await; - db.globals - .roomid_federationhandletime - .write() - .unwrap() - .remove(&room_id.to_owned()); - - r -} - -#[tracing::instrument(skip(create_event, value, db, pub_key_map))] -fn handle_outlier_pdu<'a>( - origin: &'a ServerName, - create_event: &'a PduEvent, - event_id: &'a EventId, - room_id: &'a RoomId, - value: BTreeMap<String, CanonicalJsonValue>, - db: &'a Database, - pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, -) -> AsyncRecursiveType<'a, Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> { - Box::pin(async move { - // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json - - // We go through all the signatures we see on the value and fetch the corresponding signing - // keys - fetch_required_signing_keys(&value, pub_key_map, db) - .await - .map_err(|e| e.to_string())?; - - // 2. Check signatures, otherwise drop - // 3. check content hash, redact if doesn't match - - let create_event_content: RoomCreateEventContent = - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - "Invalid create event in db.".to_owned() - })?; - - let room_version_id = &create_event_content.room_version; - let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); - - let mut val = match ruma::signatures::verify_event( - &*pub_key_map.read().map_err(|_| "RwLock is poisoned.")?, - &value, - room_version_id, - ) { - Err(e) => { - // Drop - warn!("Dropping bad event {}: {}", event_id, e); - return Err("Signature verification failed".to_owned()); - } - Ok(ruma::signatures::Verified::Signatures) => { - // Redact - warn!("Calculated hash does not match: {}", event_id); - match ruma::signatures::redact(&value, room_version_id) { - Ok(obj) => obj, - Err(_) => return Err("Redaction failed".to_owned()), - } - } - Ok(ruma::signatures::Verified::All) => value, - }; - - // Now that we have checked the signature and hashes we can add the eventID and convert - // to our PduEvent type - val.insert( - "event_id".to_owned(), - CanonicalJsonValue::String(event_id.as_str().to_owned()), - ); - let incoming_pdu = serde_json::from_value::<PduEvent>( - serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"), - ) - .map_err(|_| "Event is not a valid PDU.".to_owned())?; - - // 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events - // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" - // EDIT: Step 5 is not applied anymore because it failed too often - warn!("Fetching auth events for {}", incoming_pdu.event_id); - fetch_and_handle_outliers( - db, - origin, - &incoming_pdu - .auth_events - .iter() - .map(|x| Arc::from(&**x)) - .collect::<Vec<_>>(), - create_event, - room_id, - pub_key_map, - ) - .await; - - // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events - info!( - "Auth check for {} based on auth events", - incoming_pdu.event_id - ); - - // Build map of auth events - let mut auth_events = HashMap::new(); - for id in &incoming_pdu.auth_events { - let auth_event = match db.rooms.get_pdu(id).map_err(|e| e.to_string())? { - Some(e) => e, - None => { - warn!("Could not find auth event {}", id); - continue; - } - }; - - match auth_events.entry(( - auth_event.kind.to_string().into(), - auth_event - .state_key - .clone() - .expect("all auth events have state keys"), - )) { - hash_map::Entry::Vacant(v) => { - v.insert(auth_event); - } - hash_map::Entry::Occupied(_) => { - return Err( - "Auth event's type and state_key combination exists multiple times." - .to_owned(), - ) - } - } - } - - // The original create event must be in the auth events - if auth_events - .get(&(StateEventType::RoomCreate, "".to_owned())) - .map(|a| a.as_ref()) - != Some(create_event) - { - return Err("Incoming event refers to wrong create event.".to_owned()); - } - - if !state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - None::<PduEvent>, // TODO: third party invite - |k, s| auth_events.get(&(k.to_string().into(), s.to_owned())), - ) - .map_err(|_e| "Auth check failed".to_owned())? - { - return Err("Event has failed auth check with auth events.".to_owned()); - } - - info!("Validation successful."); - - // 7. Persist the event as an outlier. - db.rooms - .add_pdu_outlier(&incoming_pdu.event_id, &val) - .map_err(|_| "Failed to add pdu as outlier.".to_owned())?; - info!("Added pdu as outlier."); - - Ok((Arc::new(incoming_pdu), val)) - }) -} - -#[tracing::instrument(skip(incoming_pdu, val, create_event, db, pub_key_map))] -async fn upgrade_outlier_to_timeline_pdu( - incoming_pdu: Arc<PduEvent>, - val: BTreeMap<String, CanonicalJsonValue>, - create_event: &PduEvent, - origin: &ServerName, - db: &Database, - room_id: &RoomId, - pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, -) -> Result<Option<Vec<u8>>, String> { - if let Ok(Some(pduid)) = db.rooms.get_pdu_id(&incoming_pdu.event_id) { - return Ok(Some(pduid)); - } - - if db - .rooms - .is_event_soft_failed(&incoming_pdu.event_id) - .map_err(|_| "Failed to ask db for soft fail".to_owned())? - { - return Err("Event has been soft failed".into()); - } - - info!("Upgrading {} to timeline pdu", incoming_pdu.event_id); - - let create_event_content: RoomCreateEventContent = - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - "Invalid create event in db.".to_owned() - })?; - - let room_version_id = &create_event_content.room_version; - let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); - - // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities - // doing all the checks in this list starting at 1. These are not timeline events. - - // TODO: if we know the prev_events of the incoming event we can avoid the request and build - // the state from a known point and resolve if > 1 prev_event - - info!("Requesting state at event"); - let mut state_at_incoming_event = None; - - if incoming_pdu.prev_events.len() == 1 { - let prev_event = &*incoming_pdu.prev_events[0]; - let prev_event_sstatehash = db - .rooms - .pdu_shortstatehash(prev_event) - .map_err(|_| "Failed talking to db".to_owned())?; - - let state = if let Some(shortstatehash) = prev_event_sstatehash { - Some(db.rooms.state_full_ids(shortstatehash).await) - } else { - None - }; - - if let Some(Ok(mut state)) = state { - info!("Using cached state"); - let prev_pdu = - db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { - "Could not find prev event, but we know the state.".to_owned() - })?; - - if let Some(state_key) = &prev_pdu.state_key { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &prev_pdu.kind.to_string().into(), - state_key, - &db.globals, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - - state.insert(shortstatekey, Arc::from(prev_event)); - // Now it's the state after the pdu - } - - state_at_incoming_event = Some(state); - } - } else { - info!("Calculating state at event using state res"); - let mut extremity_sstatehashes = HashMap::new(); - - let mut okay = true; - for prev_eventid in &incoming_pdu.prev_events { - let prev_event = if let Ok(Some(pdu)) = db.rooms.get_pdu(prev_eventid) { - pdu - } else { - okay = false; - break; - }; - - let sstatehash = if let Ok(Some(s)) = db.rooms.pdu_shortstatehash(prev_eventid) { - s - } else { - okay = false; - break; - }; - - extremity_sstatehashes.insert(sstatehash, prev_event); - } - - if okay { - let mut fork_states = Vec::with_capacity(extremity_sstatehashes.len()); - let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); - - for (sstatehash, prev_event) in extremity_sstatehashes { - let mut leaf_state: BTreeMap<_, _> = db - .rooms - .state_full_ids(sstatehash) - .await - .map_err(|_| "Failed to ask db for room state.".to_owned())?; - - if let Some(state_key) = &prev_event.state_key { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &prev_event.kind.to_string().into(), - state_key, - &db.globals, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - leaf_state.insert(shortstatekey, Arc::from(&*prev_event.event_id)); - // Now it's the state after the pdu - } - - let mut state = StateMap::with_capacity(leaf_state.len()); - let mut starting_events = Vec::with_capacity(leaf_state.len()); - - for (k, id) in leaf_state { - if let Ok((ty, st_key)) = db.rooms.get_statekey_from_short(k) { - // FIXME: Undo .to_string().into() when StateMap - // is updated to use StateEventType - state.insert((ty.to_string().into(), st_key), id.clone()); - } else { - warn!("Failed to get_statekey_from_short."); - } - starting_events.push(id); - } - - auth_chain_sets.push( - get_auth_chain(room_id, starting_events, db) - .await - .map_err(|_| "Failed to load auth chain.".to_owned())? - .collect(), - ); - - fork_states.push(state); - } - - let lock = db.globals.stateres_mutex.lock(); - - let result = state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { - let res = db.rooms.get_pdu(id); - if let Err(e) = &res { - error!("LOOK AT ME Failed to fetch event: {}", e); - } - res.ok().flatten() - }); - drop(lock); - - state_at_incoming_event = match result { - Ok(new_state) => Some( - new_state - .into_iter() - .map(|((event_type, state_key), event_id)| { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &event_type.to_string().into(), - &state_key, - &db.globals, - ) - .map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; - Ok((shortstatekey, event_id)) - }) - .collect::<Result<_, String>>()?, - ), - Err(e) => { - warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e); - None - } - } - } - } - - if state_at_incoming_event.is_none() { - info!("Calling /state_ids"); - // Call /state_ids to find out what the state at this pdu is. We trust the server's - // response to some extend, but we still do a lot of checks on the events - match db - .sending - .send_federation_request( - &db.globals, - origin, - get_room_state_ids::v1::Request { - room_id, - event_id: &incoming_pdu.event_id, - }, - ) - .await - { - Ok(res) => { - info!("Fetching state events at event."); - let state_vec = fetch_and_handle_outliers( - db, - origin, - &res.pdu_ids - .iter() - .map(|x| Arc::from(&**x)) - .collect::<Vec<_>>(), - create_event, - room_id, - pub_key_map, - ) - .await; - - let mut state: BTreeMap<_, Arc<EventId>> = BTreeMap::new(); - for (pdu, _) in state_vec { - let state_key = pdu - .state_key - .clone() - .ok_or_else(|| "Found non-state pdu in state events.".to_owned())?; - - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &pdu.kind.to_string().into(), - &state_key, - &db.globals, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - - match state.entry(shortstatekey) { - btree_map::Entry::Vacant(v) => { - v.insert(Arc::from(&*pdu.event_id)); - } - btree_map::Entry::Occupied(_) => return Err( - "State event's type and state_key combination exists multiple times." - .to_owned(), - ), - } - } - - // The original create event must still be in the state - let create_shortstatekey = db - .rooms - .get_shortstatekey(&StateEventType::RoomCreate, "") - .map_err(|_| "Failed to talk to db.")? - .expect("Room exists"); - - if state.get(&create_shortstatekey).map(|id| id.as_ref()) - != Some(&create_event.event_id) - { - return Err("Incoming event refers to wrong create event.".to_owned()); - } - - state_at_incoming_event = Some(state); - } - Err(e) => { - warn!("Fetching state for event failed: {}", e); - return Err("Fetching state for event failed".into()); - } - }; - } - - let state_at_incoming_event = - state_at_incoming_event.expect("we always set this to some above"); - - info!("Starting auth check"); - // 11. Check the auth of the event passes based on the state of the event - let check_result = state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - None::<PduEvent>, // TODO: third party invite - |k, s| { - db.rooms - .get_shortstatekey(&k.to_string().into(), s) - .ok() - .flatten() - .and_then(|shortstatekey| state_at_incoming_event.get(&shortstatekey)) - .and_then(|event_id| db.rooms.get_pdu(event_id).ok().flatten()) - }, - ) - .map_err(|_e| "Auth check failed.".to_owned())?; - - if !check_result { - return Err("Event has failed auth check with state at the event.".into()); - } - info!("Auth check succeeded"); - - // We start looking at current room state now, so lets lock the room - - let mutex_state = Arc::clone( - db.globals - .roomid_mutex_state - .write() - .unwrap() - .entry(room_id.to_owned()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - - // Now we calculate the set of extremities this room has after the incoming event has been - // applied. We start with the previous extremities (aka leaves) - info!("Calculating extremities"); - let mut extremities = db - .rooms - .get_pdu_leaves(room_id) - .map_err(|_| "Failed to load room leaves".to_owned())?; - - // Remove any forward extremities that are referenced by this incoming event's prev_events - for prev_event in &incoming_pdu.prev_events { - if extremities.contains(prev_event) { - extremities.remove(prev_event); - } - } - - // Only keep those extremities were not referenced yet - extremities.retain(|id| !matches!(db.rooms.is_event_referenced(room_id, id), Ok(true))); - - info!("Compressing state at event"); - let state_ids_compressed = state_at_incoming_event - .iter() - .map(|(shortstatekey, id)| { - db.rooms - .compress_state_event(*shortstatekey, id, &db.globals) - .map_err(|_| "Failed to compress_state_event".to_owned()) - }) - .collect::<Result<_, _>>()?; - - // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it - info!("Starting soft fail auth check"); - - let auth_events = db - .rooms - .get_auth_events( - room_id, - &incoming_pdu.kind, - &incoming_pdu.sender, - incoming_pdu.state_key.as_deref(), - &incoming_pdu.content, - ) - .map_err(|_| "Failed to get_auth_events.".to_owned())?; - - let soft_fail = !state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - None::<PduEvent>, - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|_e| "Auth check failed.".to_owned())?; - - if soft_fail { - append_incoming_pdu( - db, - &incoming_pdu, - val, - extremities.iter().map(Deref::deref), - state_ids_compressed, - soft_fail, - &state_lock, - ) - .map_err(|e| { - warn!("Failed to add pdu to db: {}", e); - "Failed to add pdu to db.".to_owned() - })?; - - // Soft fail, we keep the event as an outlier but don't add it to the timeline - warn!("Event was soft failed: {:?}", incoming_pdu); - db.rooms - .mark_event_soft_failed(&incoming_pdu.event_id) - .map_err(|_| "Failed to set soft failed flag".to_owned())?; - return Err("Event has been soft failed".into()); - } - - if incoming_pdu.state_key.is_some() { - info!("Loading current room state ids"); - let current_sstatehash = db - .rooms - .current_shortstatehash(room_id) - .map_err(|_| "Failed to load current state hash.".to_owned())? - .expect("every room has state"); - - let current_state_ids = db - .rooms - .state_full_ids(current_sstatehash) - .await - .map_err(|_| "Failed to load room state.")?; - - info!("Preparing for stateres to derive new room state"); - let mut extremity_sstatehashes = HashMap::new(); - - info!("Loading extremities"); - for id in dbg!(&extremities) { - match db - .rooms - .get_pdu(id) - .map_err(|_| "Failed to ask db for pdu.".to_owned())? - { - Some(leaf_pdu) => { - extremity_sstatehashes.insert( - db.rooms - .pdu_shortstatehash(&leaf_pdu.event_id) - .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? - .ok_or_else(|| { - error!( - "Found extremity pdu with no statehash in db: {:?}", - leaf_pdu - ); - "Found pdu with no statehash in db.".to_owned() - })?, - leaf_pdu, - ); - } - _ => { - error!("Missing state snapshot for {:?}", id); - return Err("Missing state snapshot.".to_owned()); - } - } - } - - let mut fork_states = Vec::new(); - - // 12. Ensure that the state is derived from the previous current state (i.e. we calculated - // by doing state res where one of the inputs was a previously trusted set of state, - // don't just trust a set of state we got from a remote). - - // We do this by adding the current state to the list of fork states - extremity_sstatehashes.remove(¤t_sstatehash); - fork_states.push(current_state_ids); - - // We also add state after incoming event to the fork states - let mut state_after = state_at_incoming_event.clone(); - if let Some(state_key) = &incoming_pdu.state_key { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &incoming_pdu.kind.to_string().into(), - state_key, - &db.globals, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; - - state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); - } - fork_states.push(state_after); - - let mut update_state = false; - // 14. Use state resolution to find new room state - let new_room_state = if fork_states.is_empty() { - return Err("State is empty.".to_owned()); - } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { - info!("State resolution trivial"); - // There was only one state, so it has to be the room's current state (because that is - // always included) - fork_states[0] - .iter() - .map(|(k, id)| { - db.rooms - .compress_state_event(*k, id, &db.globals) - .map_err(|_| "Failed to compress_state_event.".to_owned()) - }) - .collect::<Result<_, _>>()? - } else { - info!("Loading auth chains"); - // We do need to force an update to this room's state - update_state = true; - - let mut auth_chain_sets = Vec::new(); - for state in &fork_states { - auth_chain_sets.push( - get_auth_chain( - room_id, - state.iter().map(|(_, id)| id.clone()).collect(), - db, - ) - .await - .map_err(|_| "Failed to load auth chain.".to_owned())? - .collect(), - ); - } - - info!("Loading fork states"); - - let fork_states: Vec<_> = fork_states - .into_iter() - .map(|map| { - map.into_iter() - .filter_map(|(k, id)| { - db.rooms - .get_statekey_from_short(k) - // FIXME: Undo .to_string().into() when StateMap - // is updated to use StateEventType - .map(|(ty, st_key)| ((ty.to_string().into(), st_key), id)) - .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) - .ok() - }) - .collect::<StateMap<_>>() - }) - .collect(); - - info!("Resolving state"); - - let lock = db.globals.stateres_mutex.lock(); - let state = match state_res::resolve( - room_version_id, - &fork_states, - auth_chain_sets, - |id| { - let res = db.rooms.get_pdu(id); - if let Err(e) = &res { - error!("LOOK AT ME Failed to fetch event: {}", e); - } - res.ok().flatten() - }, - ) { - Ok(new_state) => new_state, - Err(_) => { - return Err("State resolution failed, either an event could not be found or deserialization".into()); - } - }; - - drop(lock); - - info!("State resolution done. Compressing state"); - - state - .into_iter() - .map(|((event_type, state_key), event_id)| { - let shortstatekey = db - .rooms - .get_or_create_shortstatekey( - &event_type.to_string().into(), - &state_key, - &db.globals, - ) - .map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; - db.rooms - .compress_state_event(shortstatekey, &event_id, &db.globals) - .map_err(|_| "Failed to compress state event".to_owned()) - }) - .collect::<Result<_, _>>()? - }; - - // Set the new room state to the resolved state - if update_state { - info!("Forcing new room state"); - db.rooms - .force_state(room_id, new_room_state, db) - .map_err(|_| "Failed to set new room state.".to_owned())?; - } - } - - info!("Appending pdu to timeline"); - extremities.insert(incoming_pdu.event_id.clone()); - - // Now that the event has passed all auth it is added into the timeline. - // We use the `state_at_event` instead of `state_after` so we accurately - // represent the state for this event. - - let pdu_id = append_incoming_pdu( - db, - &incoming_pdu, - val, - extremities.iter().map(Deref::deref), - state_ids_compressed, - soft_fail, - &state_lock, - ) - .map_err(|e| { - warn!("Failed to add pdu to db: {}", e); - "Failed to add pdu to db.".to_owned() - })?; - - info!("Appended incoming pdu"); - - // Event has passed all auth/stateres checks - drop(state_lock); - Ok(pdu_id) -} - -/// Find the event and auth it. Once the event is validated (steps 1 - 8) -/// it is appended to the outliers Tree. -/// -/// Returns pdu and if we fetched it over federation the raw json. -/// -/// a. Look in the main timeline (pduid_pdu tree) -/// b. Look at outlier pdu tree -/// c. Ask origin server over federation -/// d. TODO: Ask other servers over federation? -#[tracing::instrument(skip_all)] -pub(crate) fn fetch_and_handle_outliers<'a>( - db: &'a Database, - origin: &'a ServerName, - events: &'a [Arc<EventId>], - create_event: &'a PduEvent, - room_id: &'a RoomId, - pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, -) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> { - Box::pin(async move { - let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - } - hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), - }; - - let mut pdus = vec![]; - for id in events { - if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&**id) - { - // Exponential backoff - let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); - } - - if time.elapsed() < min_elapsed_duration { - info!("Backing off from {}", id); - continue; - } - } - - // a. Look in the main timeline (pduid_pdu tree) - // b. Look at outlier pdu tree - // (get_pdu_json checks both) - if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) { - trace!("Found {} in db", id); - pdus.push((local_pdu, None)); - continue; - } - - // c. Ask origin server over federation - // We also handle its auth chain here so we don't get a stack overflow in - // handle_outlier_pdu. - let mut todo_auth_events = vec![Arc::clone(id)]; - let mut events_in_reverse_order = Vec::new(); - let mut events_all = HashSet::new(); - let mut i = 0; - while let Some(next_id) = todo_auth_events.pop() { - if events_all.contains(&next_id) { - continue; - } - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - - if let Ok(Some(_)) = db.rooms.get_pdu(&next_id) { - trace!("Found {} in db", id); - continue; - } - - info!("Fetching {} over federation.", next_id); - match db - .sending - .send_federation_request( - &db.globals, - origin, - get_event::v1::Request { event_id: &next_id }, - ) - .await - { - Ok(res) => { - info!("Got {} over federation", next_id); - let (calculated_event_id, value) = - match crate::pdu::gen_event_id_canonical_json(&res.pdu, &db) { - Ok(t) => t, - Err(_) => { - back_off((*next_id).to_owned()); - continue; - } - }; - - if calculated_event_id != *next_id { - warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", - next_id, calculated_event_id, &res.pdu); - } - - if let Some(auth_events) = - value.get("auth_events").and_then(|c| c.as_array()) - { - for auth_event in auth_events { - if let Ok(auth_event) = - serde_json::from_value(auth_event.clone().into()) - { - let a: Arc<EventId> = auth_event; - todo_auth_events.push(a); - } else { - warn!("Auth event id is not valid"); - } - } - } else { - warn!("Auth event list invalid"); - } - - events_in_reverse_order.push((next_id.clone(), value)); - events_all.insert(next_id); - } - Err(_) => { - warn!("Failed to fetch event: {}", next_id); - back_off((*next_id).to_owned()); - } - } - } - - for (next_id, value) in events_in_reverse_order.iter().rev() { - match handle_outlier_pdu( - origin, - create_event, - next_id, - room_id, - value.clone(), - db, - pub_key_map, - ) - .await - { - Ok((pdu, json)) => { - if next_id == id { - pdus.push((pdu, Some(json))); - } - } - Err(e) => { - warn!("Authentication of event {} failed: {:?}", next_id, e); - back_off((**next_id).to_owned()); - } - } - } - } - pdus - }) -} - /// Search the DB for the signing keys of the given server, if we don't have them /// fetch them from the server and save to our DB. #[tracing::instrument(skip_all)] @@ -2204,92 +1047,6 @@ pub(crate) async fn fetch_signing_keys( )) } -/// Append the incoming event setting the state snapshot to the state from the -/// server that sent the event. -#[tracing::instrument(skip_all)] -fn append_incoming_pdu<'a>( - db: &Database, - pdu: &PduEvent, - pdu_json: CanonicalJsonObject, - new_room_leaves: impl IntoIterator<Item = &'a EventId> + Clone + Debug, - state_ids_compressed: HashSet<CompressedStateEvent>, - soft_fail: bool, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex -) -> Result<Option<Vec<u8>>> { - // We append to state before appending the pdu, so we don't have a moment in time with the - // pdu without it's state. This is okay because append_pdu can't fail. - db.rooms.set_event_state( - &pdu.event_id, - &pdu.room_id, - state_ids_compressed, - &db.globals, - )?; - - if soft_fail { - db.rooms - .mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; - db.rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?; - return Ok(None); - } - - let pdu_id = db.rooms.append_pdu(pdu, pdu_json, new_room_leaves, db)?; - - for appservice in db.appservice.all()? { - if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { - db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; - continue; - } - - if let Some(namespaces) = appservice.1.get("namespaces") { - let users = namespaces - .get("users") - .and_then(|users| users.as_sequence()) - .map_or_else(Vec::new, |users| { - users - .iter() - .filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok()) - .collect::<Vec<_>>() - }); - let aliases = namespaces - .get("aliases") - .and_then(|aliases| aliases.as_sequence()) - .map_or_else(Vec::new, |aliases| { - aliases - .iter() - .filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok()) - .collect::<Vec<_>>() - }); - let rooms = namespaces - .get("rooms") - .and_then(|rooms| rooms.as_sequence()); - - let matching_users = |users: &Regex| { - users.is_match(pdu.sender.as_str()) - || pdu.kind == RoomEventType::RoomMember - && pdu - .state_key - .as_ref() - .map_or(false, |state_key| users.is_match(state_key)) - }; - let matching_aliases = |aliases: &Regex| { - db.rooms - .room_aliases(&pdu.room_id) - .filter_map(|r| r.ok()) - .any(|room_alias| aliases.is_match(room_alias.as_str())) - }; - - if aliases.iter().any(matching_aliases) - || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) - || users.iter().any(matching_users) - { - db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; - } - } - } - - Ok(Some(pdu_id)) -} - #[tracing::instrument(skip(starting_events, db))] pub(crate) async fn get_auth_chain<'a>( room_id: &RoomId, @@ -2745,35 +1502,6 @@ pub async fn create_join_event_template_route( } } - let prev_events: Vec<_> = db - .rooms - .get_pdu_leaves(&body.room_id)? - .into_iter() - .take(20) - .collect(); - - let create_event = db - .rooms - .room_state_get(&body.room_id, &StateEventType::RoomCreate, "")?; - - let create_event_content: Option<RoomCreateEventContent> = create_event - .as_ref() - .map(|create_event| { - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - Error::bad_database("Invalid create event in db.") - }) - }) - .transpose()?; - - // If there was no create event yet, assume we are creating a room with the default version - // right now - let room_version_id = create_event_content - .map_or(db.globals.default_room_version(), |create_event| { - create_event.room_version - }); - let room_version = RoomVersion::new(&room_version_id).expect("room version is supported"); - if !body.ver.contains(&room_version_id) { return Err(Error::BadRequest( ErrorKind::IncompatibleRoomVersion { @@ -2798,89 +1526,7 @@ pub async fn create_join_event_template_route( let state_key = body.user_id.to_string(); let kind = StateEventType::RoomMember; - let auth_events = db.rooms.get_auth_events( - &body.room_id, - &kind.to_string().into(), - &body.user_id, - Some(&state_key), - &content, - )?; - - // Our depth is the maximum depth of prev_events + 1 - let depth = prev_events - .iter() - .filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth)) - .max() - .unwrap_or_else(|| uint!(0)) - + uint!(1); - - let mut unsigned = BTreeMap::new(); - - if let Some(prev_pdu) = db.rooms.room_state_get(&body.room_id, &kind, &state_key)? { - unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone()); - unsigned.insert( - "prev_sender".to_owned(), - to_raw_value(&prev_pdu.sender).expect("UserId is valid"), - ); - } - - let pdu = PduEvent { - event_id: ruma::event_id!("$thiswillbefilledinlater").into(), - room_id: body.room_id.clone(), - sender: body.user_id.clone(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - kind: kind.to_string().into(), - content, - state_key: Some(state_key), - prev_events, - depth, - auth_events: auth_events - .iter() - .map(|(_, pdu)| pdu.event_id.clone()) - .collect(), - redacts: None, - unsigned: if unsigned.is_empty() { - None - } else { - Some(to_raw_value(&unsigned).expect("to_raw_value always works")) - }, - hashes: EventHash { - sha256: "aaa".to_owned(), - }, - signatures: None, - }; - - let auth_check = state_res::auth_check( - &room_version, - &pdu, - None::<PduEvent>, // TODO: third_party_invite - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|e| { - error!("{:?}", e); - Error::bad_database("Auth check failed.") - })?; - - if !auth_check { - return Err(Error::BadRequest( - ErrorKind::Forbidden, - "Event is not authorized.", - )); - } - - // Hash and sign - let mut pdu_json = - utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); - - pdu_json.remove("event_id"); - - // Add origin because synapse likes that (and it's required in the spec) - pdu_json.insert( - "origin".to_owned(), - CanonicalJsonValue::String(db.globals.server_name().as_str().to_owned()), - ); + let (pdu, pdu_json) = create_hash_and_sign_event(); Ok(prepare_join_event::v1::Response { room_version: Some(room_version_id), |