diff options
author | Timo Kösters <timo@koesters.xyz> | 2023-03-13 10:39:02 +0100 |
---|---|---|
committer | Timo Kösters <timo@koesters.xyz> | 2023-03-13 10:43:09 +0100 |
commit | 42b12934e33c81b0684347d5c02e874bf3492674 (patch) | |
tree | c63660690f722a99d2a5b14b86011415537c95fe /src | |
parent | 63f787f6357ef1344243d97ffa79cacba4694bb8 (diff) | |
download | conduit-42b12934e33c81b0684347d5c02e874bf3492674.zip |
Don't crash when a room errors
Diffstat (limited to 'src')
-rw-r--r-- | src/api/client_server/sync.rs | 1145 |
1 files changed, 587 insertions, 558 deletions
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 834438c..5eb820c 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -2,7 +2,14 @@ use crate::{service::rooms::timeline::PduCount, services, Error, Result, Ruma, R use ruma::{ api::client::{ filter::{FilterDefinition, LazyLoadOptions}, - sync::sync_events::{self, DeviceLists, UnreadNotificationsCount}, + sync::sync_events::{ + self, + v3::{ + Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, + LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice, + }, + DeviceLists, UnreadNotificationsCount, + }, uiaa::UiaaResponse, }, events::{ @@ -10,7 +17,7 @@ use ruma::{ RoomEventType, StateEventType, }, serde::Raw, - OwnedDeviceId, OwnedUserId, RoomId, UserId, + DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UserId, }; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, @@ -160,11 +167,6 @@ async fn sync_helper( body: sync_events::v3::Request, // bool = caching allowed ) -> Result<(sync_events::v3::Response, bool), Error> { - use sync_events::v3::{ - Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, LeftRoom, - Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice, - }; - // TODO: match body.set_presence { services().rooms.edus.presence.ping_presence(&sender_user)?; @@ -192,6 +194,8 @@ async fn sync_helper( _ => (false, false), }; + let full_state = body.full_state; + let mut joined_rooms = BTreeMap::new(); let since = body .since @@ -220,10 +224,76 @@ async fn sync_helper( .collect::<Vec<_>>(); for room_id in all_joined_rooms { let room_id = room_id?; + if let Ok(joined_room) = load_joined_room( + &sender_user, + &sender_device, + &room_id, + since, + sincecount, + next_batch, + next_batchcount, + lazy_load_enabled, + lazy_load_send_redundant, + full_state, + &mut device_list_updates, + &mut left_encrypted_users, + ) + .await + { + if !joined_room.is_empty() { + joined_rooms.insert(room_id.clone(), joined_room); + } + + // Take presence updates from this room + for (user_id, presence) in services() + .rooms + .edus + .presence + .presence_since(&room_id, since)? + { + match presence_updates.entry(user_id) { + Entry::Vacant(v) => { + v.insert(presence); + } + Entry::Occupied(mut o) => { + let p = o.get_mut(); + + // Update existing presence event with more info + p.content.presence = presence.content.presence; + if let Some(status_msg) = presence.content.status_msg { + p.content.status_msg = Some(status_msg); + } + if let Some(last_active_ago) = presence.content.last_active_ago { + p.content.last_active_ago = Some(last_active_ago); + } + if let Some(displayname) = presence.content.displayname { + p.content.displayname = Some(displayname); + } + if let Some(avatar_url) = presence.content.avatar_url { + p.content.avatar_url = Some(avatar_url); + } + if let Some(currently_active) = presence.content.currently_active { + p.content.currently_active = Some(currently_active); + } + } + } + } + } + } + + let mut left_rooms = BTreeMap::new(); + let all_left_rooms: Vec<_> = services() + .rooms + .state_cache + .rooms_left(&sender_user) + .collect(); + for result in all_left_rooms { + let (room_id, _) = result?; + + let mut left_state_events = Vec::new(); { // Get and drop the lock to wait for remaining operations to finish - // This will make sure the we have all events until next_batch let mutex_insert = Arc::clone( services() .globals @@ -237,182 +307,450 @@ async fn sync_helper( drop(insert_lock); } - let timeline_pdus; - let limited; - if services() + let left_count = services() .rooms - .timeline - .last_timeline_count(&sender_user, &room_id)? - > sincecount - { - let mut non_timeline_pdus = services() - .rooms - .timeline - .pdus_until(&sender_user, &room_id, PduCount::max())? - .filter_map(|r| { - // Filter out buggy events - if r.is_err() { - error!("Bad pdu in pdus_since: {:?}", r); - } - r.ok() - }) - .take_while(|(pducount, _)| pducount > &sincecount); + .state_cache + .get_left_count(&room_id, &sender_user)?; - // Take the last 10 events for the timeline - timeline_pdus = non_timeline_pdus - .by_ref() - .take(10) - .collect::<Vec<_>>() - .into_iter() - .rev() - .collect::<Vec<_>>(); + // Left before last sync + if Some(since) >= left_count { + continue; + } - // They /sync response doesn't always return all messages, so we say the output is - // limited unless there are events in non_timeline_pdus - limited = non_timeline_pdus.next().is_some(); - } else { - timeline_pdus = Vec::new(); - limited = false; + if !services().rooms.metadata.exists(&room_id)? { + // This is just a rejected invite, not a room we know + continue; } - let send_notification_counts = !timeline_pdus.is_empty() - || services() - .rooms - .user - .last_notification_read(&sender_user, &room_id)? - > since; + let since_shortstatehash = services() + .rooms + .user + .get_token_shortstatehash(&room_id, since)?; - let mut timeline_users = HashSet::new(); - for (_, event) in &timeline_pdus { - timeline_users.insert(event.sender.as_str().to_owned()); - } + let since_state_ids = match since_shortstatehash { + Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, + None => HashMap::new(), + }; - services().rooms.lazy_loading.lazy_load_confirm_delivery( - &sender_user, - &sender_device, + let left_event_id = match services().rooms.state_accessor.room_state_get_id( &room_id, - sincecount, - )?; - - // Database queries: + &StateEventType::RoomMember, + sender_user.as_str(), + )? { + Some(e) => e, + None => { + error!("Left room but no left state event"); + continue; + } + }; - let current_shortstatehash = - if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? { - s - } else { - error!("Room {} has no state", room_id); + let left_shortstatehash = match services() + .rooms + .state_accessor + .pdu_shortstatehash(&left_event_id)? + { + Some(s) => s, + None => { + error!("Leave event has no state"); continue; - }; + } + }; - let since_shortstatehash = services() + let mut left_state_ids = services() .rooms - .user - .get_token_shortstatehash(&room_id, since)?; + .state_accessor + .state_full_ids(left_shortstatehash) + .await?; - // Calculates joined_member_count, invited_member_count and heroes - let calculate_counts = || { - let joined_member_count = services() - .rooms - .state_cache - .room_joined_count(&room_id)? - .unwrap_or(0); - let invited_member_count = services() - .rooms - .state_cache - .room_invited_count(&room_id)? - .unwrap_or(0); + let leave_shortstatekey = services() + .rooms + .short + .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?; - // Recalculate heroes (first 5 members) - let mut heroes = Vec::new(); + left_state_ids.insert(leave_shortstatekey, left_event_id); - if joined_member_count + invited_member_count <= 5 { - // Go through all PDUs and for each member event, check if the user is still joined or - // invited until we have 5 or we reach the end + let mut i = 0; + for (key, id) in left_state_ids { + if full_state || since_state_ids.get(&key) != Some(&id) { + let (event_type, state_key) = + services().rooms.short.get_statekey_from_short(key)?; - for hero in services() - .rooms - .timeline - .all_pdus(&sender_user, &room_id)? - .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus - .filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember) - .map(|(_, pdu)| { - let content: RoomMemberEventContent = - serde_json::from_str(pdu.content.get()).map_err(|_| { - Error::bad_database("Invalid member event in database.") - })?; - - if let Some(state_key) = &pdu.state_key { - let user_id = UserId::parse(state_key.clone()).map_err(|_| { - Error::bad_database("Invalid UserId in member PDU.") - })?; - - // The membership was and still is invite or join - if matches!( - content.membership, - MembershipState::Join | MembershipState::Invite - ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)? - || services() - .rooms - .state_cache - .is_invited(&user_id, &room_id)?) - { - Ok::<_, Error>(Some(state_key.clone())) - } else { - Ok(None) - } - } else { - Ok(None) - } - }) - // Filter out buggy users - .filter_map(|u| u.ok()) - // Filter for possible heroes - .flatten() + if !lazy_load_enabled + || event_type != StateEventType::RoomMember + || full_state + // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 + || *sender_user == state_key { - if heroes.contains(&hero) || hero == sender_user.as_str() { - continue; - } + let pdu = match services().rooms.timeline.get_pdu(&id)? { + Some(pdu) => pdu, + None => { + error!("Pdu in state not found: {}", id); + continue; + } + }; + + left_state_events.push(pdu.to_sync_state_event()); - heroes.push(hero); + i += 1; + if i % 100 == 0 { + tokio::task::yield_now().await; + } } } + } - Ok::<_, Error>(( - Some(joined_member_count), - Some(invited_member_count), - heroes, - )) - }; + left_rooms.insert( + room_id.clone(), + LeftRoom { + account_data: RoomAccountData { events: Vec::new() }, + timeline: Timeline { + limited: false, + prev_batch: Some(next_batch_string.clone()), + events: Vec::new(), + }, + state: State { + events: left_state_events, + }, + }, + ); + } + + let mut invited_rooms = BTreeMap::new(); + let all_invited_rooms: Vec<_> = services() + .rooms + .state_cache + .rooms_invited(&sender_user) + .collect(); + for result in all_invited_rooms { + let (room_id, invite_state_events) = result?; - let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash - .and_then(|shortstatehash| { + { + // Get and drop the lock to wait for remaining operations to finish + let mutex_insert = Arc::clone( services() - .rooms - .state_accessor - .state_get( - shortstatehash, - &StateEventType::RoomMember, - sender_user.as_str(), - ) - .transpose() + .globals + .roomid_mutex_insert + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let insert_lock = mutex_insert.lock().unwrap(); + drop(insert_lock); + } + + let invite_count = services() + .rooms + .state_cache + .get_invite_count(&room_id, &sender_user)?; + + // Invited before last sync + if Some(since) >= invite_count { + continue; + } + + invited_rooms.insert( + room_id.clone(), + InvitedRoom { + invite_state: InviteState { + events: invite_state_events, + }, + }, + ); + } + + for user_id in left_encrypted_users { + let still_share_encrypted_room = services() + .rooms + .user + .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? + .filter_map(|r| r.ok()) + .filter_map(|other_room_id| { + Some( + services() + .rooms + .state_accessor + .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "") + .ok()? + .is_some(), + ) + }) + .all(|encrypted| !encrypted); + // If the user doesn't share an encrypted room with the target anymore, we need to tell + // them + if still_share_encrypted_room { + device_list_left.insert(user_id); + } + } + + // Remove all to-device events the device received *last time* + services() + .users + .remove_to_device_events(&sender_user, &sender_device, since)?; + + let response = sync_events::v3::Response { + next_batch: next_batch_string, + rooms: Rooms { + leave: left_rooms, + join: joined_rooms, + invite: invited_rooms, + knock: BTreeMap::new(), // TODO + }, + presence: Presence { + events: presence_updates + .into_values() + .map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully")) + .collect(), + }, + account_data: GlobalAccountData { + events: services() + .account_data + .changes_since(None, &sender_user, since)? + .into_iter() + .filter_map(|(_, v)| { + serde_json::from_str(v.json().get()) + .map_err(|_| Error::bad_database("Invalid account event in database.")) + .ok() + }) + .collect(), + }, + device_lists: DeviceLists { + changed: device_list_updates.into_iter().collect(), + left: device_list_left.into_iter().collect(), + }, + device_one_time_keys_count: services() + .users + .count_one_time_keys(&sender_user, &sender_device)?, + to_device: ToDevice { + events: services() + .users + .get_to_device_events(&sender_user, &sender_device)?, + }, + // Fallback keys are not yet supported + device_unused_fallback_key_types: None, + }; + + // TODO: Retry the endpoint instead of returning (waiting for #118) + if !full_state + && response.rooms.is_empty() + && response.presence.is_empty() + && response.account_data.is_empty() + && response.device_lists.is_empty() + && response.to_device.is_empty() + { + // Hang a few seconds so requests are not spammed + // Stop hanging if new info arrives + let mut duration = body.timeout.unwrap_or_default(); + if duration.as_secs() > 30 { + duration = Duration::from_secs(30); + } + let _ = tokio::time::timeout(duration, watcher).await; + Ok((response, false)) + } else { + Ok((response, since != next_batch)) // Only cache if we made progress + } +} + +async fn load_joined_room( + sender_user: &UserId, + sender_device: &DeviceId, + room_id: &RoomId, + since: u64, + sincecount: PduCount, + next_batch: u64, + next_batchcount: PduCount, + lazy_load_enabled: bool, + lazy_load_send_redundant: bool, + full_state: bool, + device_list_updates: &mut HashSet<OwnedUserId>, + left_encrypted_users: &mut HashSet<OwnedUserId>, +) -> Result<JoinedRoom> { + { + // Get and drop the lock to wait for remaining operations to finish + // This will make sure the we have all events until next_batch + let mutex_insert = Arc::clone( + services() + .globals + .roomid_mutex_insert + .write() + .unwrap() + .entry(room_id.to_owned()) + .or_default(), + ); + let insert_lock = mutex_insert.lock().unwrap(); + drop(insert_lock); + } + + let timeline_pdus; + let limited; + if services() + .rooms + .timeline + .last_timeline_count(&sender_user, &room_id)? + > sincecount + { + let mut non_timeline_pdus = services() + .rooms + .timeline + .pdus_until(&sender_user, &room_id, PduCount::max())? + .filter_map(|r| { + // Filter out buggy events + if r.is_err() { + error!("Bad pdu in pdus_since: {:?}", r); + } + r.ok() }) - .transpose()? - .and_then(|pdu| { - serde_json::from_str(pdu.content.get()) - .map_err(|_| Error::bad_database("Invalid PDU in database.")) - .ok() - }); + .take_while(|(pducount, _)| pducount > &sincecount); + + // Take the last 10 events for the timeline + timeline_pdus = non_timeline_pdus + .by_ref() + .take(10) + .collect::<Vec<_>>() + .into_iter() + .rev() + .collect::<Vec<_>>(); + + // They /sync response doesn't always return all messages, so we say the output is + // limited unless there are events in non_timeline_pdus + limited = non_timeline_pdus.next().is_some(); + } else { + timeline_pdus = Vec::new(); + limited = false; + } + + let send_notification_counts = !timeline_pdus.is_empty() + || services() + .rooms + .user + .last_notification_read(&sender_user, &room_id)? + > since; + + let mut timeline_users = HashSet::new(); + for (_, event) in &timeline_pdus { + timeline_users.insert(event.sender.as_str().to_owned()); + } + + services().rooms.lazy_loading.lazy_load_confirm_delivery( + &sender_user, + &sender_device, + &room_id, + sincecount, + )?; - let joined_since_last_sync = - since_sender_member.map_or(true, |member| member.membership != MembershipState::Join); + // Database queries: - let ( + let current_shortstatehash = + if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? { + s + } else { + error!("Room {} has no state", room_id); + return Err(Error::BadDatabase("Room has no state")); + }; + + let since_shortstatehash = services() + .rooms + .user + .get_token_shortstatehash(&room_id, since)?; + + // Calculates joined_member_count, invited_member_count and heroes + let calculate_counts = || { + let joined_member_count = services() + .rooms + .state_cache + .room_joined_count(&room_id)? + .unwrap_or(0); + let invited_member_count = services() + .rooms + .state_cache + .room_invited_count(&room_id)? + .unwrap_or(0); + + // Recalculate heroes (first 5 members) + let mut heroes = Vec::new(); + + if joined_member_count + invited_member_count <= 5 { + // Go through all PDUs and for each member event, check if the user is still joined or + // invited until we have 5 or we reach the end + + for hero in services() + .rooms + .timeline + .all_pdus(&sender_user, &room_id)? + .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus + .filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember) + .map(|(_, pdu)| { + let content: RoomMemberEventContent = serde_json::from_str(pdu.content.get()) + .map_err(|_| { + Error::bad_database("Invalid member event in database.") + })?; + + if let Some(state_key) = &pdu.state_key { + let user_id = UserId::parse(state_key.clone()) + .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?; + + // The membership was and still is invite or join + if matches!( + content.membership, + MembershipState::Join | MembershipState::Invite + ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)? + || services() + .rooms + .state_cache + .is_invited(&user_id, &room_id)?) + { + Ok::<_, Error>(Some(state_key.clone())) + } else { + Ok(None) + } + } else { + Ok(None) + } + }) + // Filter out buggy users + .filter_map(|u| u.ok()) + // Filter for possible heroes + .flatten() + { + if heroes.contains(&hero) || hero == sender_user.as_str() { + continue; + } + + heroes.push(hero); + } + } + + Ok::<_, Error>(( + Some(joined_member_count), + Some(invited_member_count), heroes, - joined_member_count, - invited_member_count, - joined_since_last_sync, - state_events, - ) = if since_shortstatehash.is_none() || joined_since_last_sync { + )) + }; + + let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash + .and_then(|shortstatehash| { + services() + .rooms + .state_accessor + .state_get( + shortstatehash, + &StateEventType::RoomMember, + sender_user.as_str(), + ) + .transpose() + }) + .transpose()? + .and_then(|pdu| { + serde_json::from_str(pdu.content.get()) + .map_err(|_| Error::bad_database("Invalid PDU in database.")) + .ok() + }); + + let joined_since_last_sync = + since_sender_member.map_or(true, |member| member.membership != MembershipState::Join); + + let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) = + if since_shortstatehash.is_none() || joined_since_last_sync { // Probably since = 0, we will do an initial sync let (joined_member_count, invited_member_count, heroes) = calculate_counts()?; @@ -448,10 +786,10 @@ async fn sync_helper( tokio::task::yield_now().await; } } else if !lazy_load_enabled - || body.full_state - || timeline_users.contains(&state_key) - // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 - || *sender_user == state_key + || full_state + || timeline_users.contains(&state_key) + // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 + || *sender_user == state_key { let pdu = match services().rooms.timeline.get_pdu(&id)? { Some(pdu) => pdu, @@ -521,7 +859,7 @@ async fn sync_helper( .await?; for (key, id) in current_state_ids { - if body.full_state || since_state_ids.get(&key) != Some(&id) { + if full_state || since_state_ids.get(&key) != Some(&id) { let pdu = match services().rooms.timeline.get_pdu(&id)? { Some(pdu) => pdu, None => { @@ -671,385 +1009,88 @@ async fn sync_helper( ) }; - // Look for device list updates in this room - device_list_updates.extend( - services() - .users - .keys_changed(room_id.as_ref(), since, None) - .filter_map(|r| r.ok()), - ); - - let notification_count = if send_notification_counts { - Some( - services() - .rooms - .user - .notification_count(&sender_user, &room_id)? - .try_into() - .expect("notification count can't go that high"), - ) - } else { - None - }; - - let highlight_count = if send_notification_counts { - Some( - services() - .rooms - .user - .highlight_count(&sender_user, &room_id)? - .try_into() - .expect("highlight count can't go that high"), - ) - } else { - None - }; - - let prev_batch = timeline_pdus - .first() - .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { - Ok(Some(match pdu_count { - PduCount::Backfilled(_) => { - error!("timeline in backfill state?!"); - "0".to_owned() - } - PduCount::Normal(c) => c.to_string(), - })) - })?; - - let room_events: Vec<_> = timeline_pdus - .iter() - .map(|(_, pdu)| pdu.to_sync_room_event()) - .collect(); - - let mut edus: Vec<_> = services() - .rooms - .edus - .read_receipt - .readreceipts_since(&room_id, since) - .filter_map(|r| r.ok()) // Filter out buggy events - .map(|(_, _, v)| v) - .collect(); - - if services().rooms.edus.typing.last_typing_update(&room_id)? > since { - edus.push( - serde_json::from_str( - &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?) - .expect("event is valid, we just created it"), - ) - .expect("event is valid, we just created it"), - ); - } - - // Save the state after this sync so we can send the correct state diff next sync - services().rooms.user.associate_token_shortstatehash( - &room_id, - next_batch, - current_shortstatehash, - )?; - - let joined_room = JoinedRoom { - account_data: RoomAccountData { - events: services() - .account_data - .changes_since(Some(&room_id), &sender_user, since)? - .into_iter() - .filter_map(|(_, v)| { - serde_json::from_str(v.json().get()) - .map_err(|_| Error::bad_database("Invalid account event in database.")) - .ok() - }) - .collect(), - }, - summary: RoomSummary { - heroes, - joined_member_count: joined_member_count.map(|n| (n as u32).into()), - invited_member_count: invited_member_count.map(|n| (n as u32).into()), - }, - unread_notifications: UnreadNotificationsCount { - highlight_count, - notification_count, - }, - timeline: Timeline { - limited: limited || joined_since_last_sync, - prev_batch, - events: room_events, - }, - state: State { - events: state_events - .iter() - .map(|pdu| pdu.to_sync_state_event()) - .collect(), - }, - ephemeral: Ephemeral { events: edus }, - unread_thread_notifications: BTreeMap::new(), - }; + // Look for device list updates in this room + device_list_updates.extend( + services() + .users + .keys_changed(room_id.as_ref(), since, None) + .filter_map(|r| r.ok()), + ); - if !joined_room.is_empty() { - joined_rooms.insert(room_id.clone(), joined_room); - } + let notification_count = if send_notification_counts { + Some( + services() + .rooms + .user + .notification_count(&sender_user, &room_id)? + .try_into() + .expect("notification count can't go that high"), + ) + } else { + None + }; - // Take presence updates from this room - for (user_id, presence) in services() - .rooms - .edus - .presence - .presence_since(&room_id, since)? - { - match presence_updates.entry(user_id) { - Entry::Vacant(v) => { - v.insert(presence); - } - Entry::Occupied(mut o) => { - let p = o.get_mut(); + let highlight_count = if send_notification_counts { + Some( + services() + .rooms + .user + .highlight_count(&sender_user, &room_id)? + .try_into() + .expect("highlight count can't go that high"), + ) + } else { + None + }; - // Update existing presence event with more info - p.content.presence = presence.content.presence; - if let Some(status_msg) = presence.content.status_msg { - p.content.status_msg = Some(status_msg); - } - if let Some(last_active_ago) = presence.content.last_active_ago { - p.content.last_active_ago = Some(last_active_ago); - } - if let Some(displayname) = presence.content.displayname { - p.content.displayname = Some(displayname); - } - if let Some(avatar_url) = presence.content.avatar_url { - p.content.avatar_url = Some(avatar_url); - } - if let Some(currently_active) = presence.content.currently_active { - p.content.currently_active = Some(currently_active); - } + let prev_batch = timeline_pdus + .first() + .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { + Ok(Some(match pdu_count { + PduCount::Backfilled(_) => { + error!("timeline in backfill state?!"); + "0".to_owned() } - } - } - } + PduCount::Normal(c) => c.to_string(), + })) + })?; - let mut left_rooms = BTreeMap::new(); - let all_left_rooms: Vec<_> = services() - .rooms - .state_cache - .rooms_left(&sender_user) + let room_events: Vec<_> = timeline_pdus + .iter() + .map(|(_, pdu)| pdu.to_sync_room_event()) .collect(); - for result in all_left_rooms { - let (room_id, _) = result?; - - let mut left_state_events = Vec::new(); - - { - // Get and drop the lock to wait for remaining operations to finish - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .unwrap() - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().unwrap(); - drop(insert_lock); - } - - let left_count = services() - .rooms - .state_cache - .get_left_count(&room_id, &sender_user)?; - - // Left before last sync - if Some(since) >= left_count { - continue; - } - - if !services().rooms.metadata.exists(&room_id)? { - // This is just a rejected invite, not a room we know - continue; - } - - let since_shortstatehash = services() - .rooms - .user - .get_token_shortstatehash(&room_id, since)?; - let since_state_ids = match since_shortstatehash { - Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, - None => HashMap::new(), - }; - - let left_event_id = match services().rooms.state_accessor.room_state_get_id( - &room_id, - &StateEventType::RoomMember, - sender_user.as_str(), - )? { - Some(e) => e, - None => { - error!("Left room but no left state event"); - continue; - } - }; - - let left_shortstatehash = match services() - .rooms - .state_accessor - .pdu_shortstatehash(&left_event_id)? - { - Some(s) => s, - None => { - error!("Leave event has no state"); - continue; - } - }; - - let mut left_state_ids = services() - .rooms - .state_accessor - .state_full_ids(left_shortstatehash) - .await?; - - let leave_shortstatekey = services() - .rooms - .short - .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?; - - left_state_ids.insert(leave_shortstatekey, left_event_id); - - let mut i = 0; - for (key, id) in left_state_ids { - if body.full_state || since_state_ids.get(&key) != Some(&id) { - let (event_type, state_key) = - services().rooms.short.get_statekey_from_short(key)?; - - if !lazy_load_enabled - || event_type != StateEventType::RoomMember - || body.full_state - // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 - || *sender_user == state_key - { - let pdu = match services().rooms.timeline.get_pdu(&id)? { - Some(pdu) => pdu, - None => { - error!("Pdu in state not found: {}", id); - continue; - } - }; - - left_state_events.push(pdu.to_sync_state_event()); - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } - } - } - - left_rooms.insert( - room_id.clone(), - LeftRoom { - account_data: RoomAccountData { events: Vec::new() }, - timeline: Timeline { - limited: false, - prev_batch: Some(next_batch_string.clone()), - events: Vec::new(), - }, - state: State { - events: left_state_events, - }, - }, - ); - } - - let mut invited_rooms = BTreeMap::new(); - let all_invited_rooms: Vec<_> = services() + let mut edus: Vec<_> = services() .rooms - .state_cache - .rooms_invited(&sender_user) + .edus + .read_receipt + .readreceipts_since(&room_id, since) + .filter_map(|r| r.ok()) // Filter out buggy events + .map(|(_, _, v)| v) .collect(); - for result in all_invited_rooms { - let (room_id, invite_state_events) = result?; - { - // Get and drop the lock to wait for remaining operations to finish - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .unwrap() - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().unwrap(); - drop(insert_lock); - } - - let invite_count = services() - .rooms - .state_cache - .get_invite_count(&room_id, &sender_user)?; - - // Invited before last sync - if Some(since) >= invite_count { - continue; - } - - invited_rooms.insert( - room_id.clone(), - InvitedRoom { - invite_state: InviteState { - events: invite_state_events, - }, - }, + if services().rooms.edus.typing.last_typing_update(&room_id)? > since { + edus.push( + serde_json::from_str( + &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?) + .expect("event is valid, we just created it"), + ) + .expect("event is valid, we just created it"), ); } - for user_id in left_encrypted_users { - let still_share_encrypted_room = services() - .rooms - .user - .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? - .filter_map(|r| r.ok()) - .filter_map(|other_room_id| { - Some( - services() - .rooms - .state_accessor - .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "") - .ok()? - .is_some(), - ) - }) - .all(|encrypted| !encrypted); - // If the user doesn't share an encrypted room with the target anymore, we need to tell - // them - if still_share_encrypted_room { - device_list_left.insert(user_id); - } - } - - // Remove all to-device events the device received *last time* - services() - .users - .remove_to_device_events(&sender_user, &sender_device, since)?; + // Save the state after this sync so we can send the correct state diff next sync + services().rooms.user.associate_token_shortstatehash( + &room_id, + next_batch, + current_shortstatehash, + )?; - let response = sync_events::v3::Response { - next_batch: next_batch_string, - rooms: Rooms { - leave: left_rooms, - join: joined_rooms, - invite: invited_rooms, - knock: BTreeMap::new(), // TODO - }, - presence: Presence { - events: presence_updates - .into_values() - .map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully")) - .collect(), - }, - account_data: GlobalAccountData { + Ok(JoinedRoom { + account_data: RoomAccountData { events: services() .account_data - .changes_since(None, &sender_user, since)? + .changes_since(Some(&room_id), &sender_user, since)? .into_iter() .filter_map(|(_, v)| { serde_json::from_str(v.json().get()) @@ -1058,41 +1099,29 @@ async fn sync_helper( }) .collect(), }, - device_lists: DeviceLists { - changed: device_list_updates.into_iter().collect(), - left: device_list_left.into_iter().collect(), + summary: RoomSummary { + heroes, + joined_member_count: joined_member_count.map(|n| (n as u32).into()), + invited_member_count: invited_member_count.map(|n| (n as u32).into()), }, - device_one_time_keys_count: services() - .users - .count_one_time_keys(&sender_user, &sender_device)?, - to_device: ToDevice { - events: services() - .users - .get_to_device_events(&sender_user, &sender_device)?, + unread_notifications: UnreadNotificationsCount { + highlight_count, + notification_count, }, - // Fallback keys are not yet supported - device_unused_fallback_key_types: None, - }; - - // TODO: Retry the endpoint instead of returning (waiting for #118) - if !body.full_state - && response.rooms.is_empty() - && response.presence.is_empty() - && response.account_data.is_empty() - && response.device_lists.is_empty() - && response.to_device.is_empty() - { - // Hang a few seconds so requests are not spammed - // Stop hanging if new info arrives - let mut duration = body.timeout.unwrap_or_default(); - if duration.as_secs() > 30 { - duration = Duration::from_secs(30); - } - let _ = tokio::time::timeout(duration, watcher).await; - Ok((response, false)) - } else { - Ok((response, since != next_batch)) // Only cache if we made progress - } + timeline: Timeline { + limited: limited || joined_since_last_sync, + prev_batch, + events: room_events, + }, + state: State { + events: state_events + .iter() + .map(|pdu| pdu.to_sync_state_event()) + .collect(), + }, + ephemeral: Ephemeral { events: edus }, + unread_thread_notifications: BTreeMap::new(), + }) } fn share_encrypted_room( |