author     Timo Kösters <timo@koesters.xyz>    2023-03-13 10:39:02 +0100
committer  Timo Kösters <timo@koesters.xyz>    2023-03-13 10:43:09 +0100
commit     42b12934e33c81b0684347d5c02e874bf3492674 (patch)
tree       c63660690f722a99d2a5b14b86011415537c95fe /src
parent     63f787f6357ef1344243d97ffa79cacba4694bb8 (diff)
download   conduit-42b12934e33c81b0684347d5c02e874bf3492674.zip
Don't crash when a room errors
Diffstat (limited to 'src')
-rw-r--r--  src/api/client_server/sync.rs  1145
1 file changed, 587 insertions, 558 deletions
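
The change is mechanical but large: the body of the per-room loop in sync_helper moves into a new load_joined_room function, and the caller keeps only rooms whose loading succeeds, so a single failing room no longer aborts the whole /sync request. Below is a minimal, self-contained sketch of that pattern; the types and the load_joined_room body here are simplified stand-ins for illustration, not the actual Conduit implementation.

// Sketch only: illustrates the error-isolation pattern this commit introduces.
// RoomId, JoinedRoom and Error below are hypothetical stand-ins, not Conduit's types.
use std::collections::BTreeMap;

type RoomId = String;

#[derive(Debug)]
struct JoinedRoom {
    events: Vec<String>,
}

impl JoinedRoom {
    fn is_empty(&self) -> bool {
        self.events.is_empty()
    }
}

#[derive(Debug)]
struct Error(&'static str);

// Stand-in for load_joined_room(): any failure is contained to this one room.
fn load_joined_room(room_id: &RoomId) -> Result<JoinedRoom, Error> {
    if room_id.contains("broken") {
        // e.g. the "Room has no state" case, which now returns an error
        // instead of continuing inside a much larger loop body
        Err(Error("room has no state"))
    } else {
        Ok(JoinedRoom {
            events: vec![format!("event in {room_id}")],
        })
    }
}

fn main() {
    let all_joined_rooms = vec![
        "!good:example.org".to_string(),
        "!broken:example.org".to_string(),
    ];
    let mut joined_rooms = BTreeMap::new();

    for room_id in &all_joined_rooms {
        // Before this commit the loop body used `?` on its database queries,
        // so one bad room failed the entire sync. Now an Err simply means
        // the room is left out of the response.
        if let Ok(joined_room) = load_joined_room(room_id) {
            if !joined_room.is_empty() {
                joined_rooms.insert(room_id.clone(), joined_room);
            }
        }
    }

    assert_eq!(joined_rooms.len(), 1); // the broken room was skipped, not fatal
    println!("synced {} of {} rooms", joined_rooms.len(), all_joined_rooms.len());
}

In the real diff, failures inside load_joined_room are surfaced as a Result (for example, a room with no state now returns Error::BadDatabase instead of continuing), and the if let Ok(...) in sync_helper decides whether that room appears in the response at all.
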
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 834438c..5eb820c 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -2,7 +2,14 @@ use crate::{service::rooms::timeline::PduCount, services, Error, Result, Ruma, R
use ruma::{
api::client::{
filter::{FilterDefinition, LazyLoadOptions},
- sync::sync_events::{self, DeviceLists, UnreadNotificationsCount},
+ sync::sync_events::{
+ self,
+ v3::{
+ Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
+ LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
+ },
+ DeviceLists, UnreadNotificationsCount,
+ },
uiaa::UiaaResponse,
},
events::{
@@ -10,7 +17,7 @@ use ruma::{
RoomEventType, StateEventType,
},
serde::Raw,
- OwnedDeviceId, OwnedUserId, RoomId, UserId,
+ DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UserId,
};
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
@@ -160,11 +167,6 @@ async fn sync_helper(
body: sync_events::v3::Request,
// bool = caching allowed
) -> Result<(sync_events::v3::Response, bool), Error> {
- use sync_events::v3::{
- Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, LeftRoom,
- Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
- };
-
// TODO: match body.set_presence {
services().rooms.edus.presence.ping_presence(&sender_user)?;
@@ -192,6 +194,8 @@ async fn sync_helper(
_ => (false, false),
};
+ let full_state = body.full_state;
+
let mut joined_rooms = BTreeMap::new();
let since = body
.since
@@ -220,10 +224,76 @@ async fn sync_helper(
.collect::<Vec<_>>();
for room_id in all_joined_rooms {
let room_id = room_id?;
+ if let Ok(joined_room) = load_joined_room(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ since,
+ sincecount,
+ next_batch,
+ next_batchcount,
+ lazy_load_enabled,
+ lazy_load_send_redundant,
+ full_state,
+ &mut device_list_updates,
+ &mut left_encrypted_users,
+ )
+ .await
+ {
+ if !joined_room.is_empty() {
+ joined_rooms.insert(room_id.clone(), joined_room);
+ }
+
+ // Take presence updates from this room
+ for (user_id, presence) in services()
+ .rooms
+ .edus
+ .presence
+ .presence_since(&room_id, since)?
+ {
+ match presence_updates.entry(user_id) {
+ Entry::Vacant(v) => {
+ v.insert(presence);
+ }
+ Entry::Occupied(mut o) => {
+ let p = o.get_mut();
+
+ // Update existing presence event with more info
+ p.content.presence = presence.content.presence;
+ if let Some(status_msg) = presence.content.status_msg {
+ p.content.status_msg = Some(status_msg);
+ }
+ if let Some(last_active_ago) = presence.content.last_active_ago {
+ p.content.last_active_ago = Some(last_active_ago);
+ }
+ if let Some(displayname) = presence.content.displayname {
+ p.content.displayname = Some(displayname);
+ }
+ if let Some(avatar_url) = presence.content.avatar_url {
+ p.content.avatar_url = Some(avatar_url);
+ }
+ if let Some(currently_active) = presence.content.currently_active {
+ p.content.currently_active = Some(currently_active);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ let mut left_rooms = BTreeMap::new();
+ let all_left_rooms: Vec<_> = services()
+ .rooms
+ .state_cache
+ .rooms_left(&sender_user)
+ .collect();
+ for result in all_left_rooms {
+ let (room_id, _) = result?;
+
+ let mut left_state_events = Vec::new();
{
// Get and drop the lock to wait for remaining operations to finish
- // This will make sure the we have all events until next_batch
let mutex_insert = Arc::clone(
services()
.globals
@@ -237,182 +307,450 @@ async fn sync_helper(
drop(insert_lock);
}
- let timeline_pdus;
- let limited;
- if services()
+ let left_count = services()
.rooms
- .timeline
- .last_timeline_count(&sender_user, &room_id)?
- > sincecount
- {
- let mut non_timeline_pdus = services()
- .rooms
- .timeline
- .pdus_until(&sender_user, &room_id, PduCount::max())?
- .filter_map(|r| {
- // Filter out buggy events
- if r.is_err() {
- error!("Bad pdu in pdus_since: {:?}", r);
- }
- r.ok()
- })
- .take_while(|(pducount, _)| pducount > &sincecount);
+ .state_cache
+ .get_left_count(&room_id, &sender_user)?;
- // Take the last 10 events for the timeline
- timeline_pdus = non_timeline_pdus
- .by_ref()
- .take(10)
- .collect::<Vec<_>>()
- .into_iter()
- .rev()
- .collect::<Vec<_>>();
+ // Left before last sync
+ if Some(since) >= left_count {
+ continue;
+ }
- // They /sync response doesn't always return all messages, so we say the output is
- // limited unless there are events in non_timeline_pdus
- limited = non_timeline_pdus.next().is_some();
- } else {
- timeline_pdus = Vec::new();
- limited = false;
+ if !services().rooms.metadata.exists(&room_id)? {
+ // This is just a rejected invite, not a room we know
+ continue;
}
- let send_notification_counts = !timeline_pdus.is_empty()
- || services()
- .rooms
- .user
- .last_notification_read(&sender_user, &room_id)?
- > since;
+ let since_shortstatehash = services()
+ .rooms
+ .user
+ .get_token_shortstatehash(&room_id, since)?;
- let mut timeline_users = HashSet::new();
- for (_, event) in &timeline_pdus {
- timeline_users.insert(event.sender.as_str().to_owned());
- }
+ let since_state_ids = match since_shortstatehash {
+ Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
+ None => HashMap::new(),
+ };
- services().rooms.lazy_loading.lazy_load_confirm_delivery(
- &sender_user,
- &sender_device,
+ let left_event_id = match services().rooms.state_accessor.room_state_get_id(
&room_id,
- sincecount,
- )?;
-
- // Database queries:
+ &StateEventType::RoomMember,
+ sender_user.as_str(),
+ )? {
+ Some(e) => e,
+ None => {
+ error!("Left room but no left state event");
+ continue;
+ }
+ };
- let current_shortstatehash =
- if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
- s
- } else {
- error!("Room {} has no state", room_id);
+ let left_shortstatehash = match services()
+ .rooms
+ .state_accessor
+ .pdu_shortstatehash(&left_event_id)?
+ {
+ Some(s) => s,
+ None => {
+ error!("Leave event has no state");
continue;
- };
+ }
+ };
- let since_shortstatehash = services()
+ let mut left_state_ids = services()
.rooms
- .user
- .get_token_shortstatehash(&room_id, since)?;
+ .state_accessor
+ .state_full_ids(left_shortstatehash)
+ .await?;
- // Calculates joined_member_count, invited_member_count and heroes
- let calculate_counts = || {
- let joined_member_count = services()
- .rooms
- .state_cache
- .room_joined_count(&room_id)?
- .unwrap_or(0);
- let invited_member_count = services()
- .rooms
- .state_cache
- .room_invited_count(&room_id)?
- .unwrap_or(0);
+ let leave_shortstatekey = services()
+ .rooms
+ .short
+ .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
- // Recalculate heroes (first 5 members)
- let mut heroes = Vec::new();
+ left_state_ids.insert(leave_shortstatekey, left_event_id);
- if joined_member_count + invited_member_count <= 5 {
- // Go through all PDUs and for each member event, check if the user is still joined or
- // invited until we have 5 or we reach the end
+ let mut i = 0;
+ for (key, id) in left_state_ids {
+ if full_state || since_state_ids.get(&key) != Some(&id) {
+ let (event_type, state_key) =
+ services().rooms.short.get_statekey_from_short(key)?;
- for hero in services()
- .rooms
- .timeline
- .all_pdus(&sender_user, &room_id)?
- .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
- .filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember)
- .map(|(_, pdu)| {
- let content: RoomMemberEventContent =
- serde_json::from_str(pdu.content.get()).map_err(|_| {
- Error::bad_database("Invalid member event in database.")
- })?;
-
- if let Some(state_key) = &pdu.state_key {
- let user_id = UserId::parse(state_key.clone()).map_err(|_| {
- Error::bad_database("Invalid UserId in member PDU.")
- })?;
-
- // The membership was and still is invite or join
- if matches!(
- content.membership,
- MembershipState::Join | MembershipState::Invite
- ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)?
- || services()
- .rooms
- .state_cache
- .is_invited(&user_id, &room_id)?)
- {
- Ok::<_, Error>(Some(state_key.clone()))
- } else {
- Ok(None)
- }
- } else {
- Ok(None)
- }
- })
- // Filter out buggy users
- .filter_map(|u| u.ok())
- // Filter for possible heroes
- .flatten()
+ if !lazy_load_enabled
+ || event_type != StateEventType::RoomMember
+ || full_state
+ // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
+ || *sender_user == state_key
{
- if heroes.contains(&hero) || hero == sender_user.as_str() {
- continue;
- }
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+
+ left_state_events.push(pdu.to_sync_state_event());
- heroes.push(hero);
+ i += 1;
+ if i % 100 == 0 {
+ tokio::task::yield_now().await;
+ }
}
}
+ }
- Ok::<_, Error>((
- Some(joined_member_count),
- Some(invited_member_count),
- heroes,
- ))
- };
+ left_rooms.insert(
+ room_id.clone(),
+ LeftRoom {
+ account_data: RoomAccountData { events: Vec::new() },
+ timeline: Timeline {
+ limited: false,
+ prev_batch: Some(next_batch_string.clone()),
+ events: Vec::new(),
+ },
+ state: State {
+ events: left_state_events,
+ },
+ },
+ );
+ }
+
+ let mut invited_rooms = BTreeMap::new();
+ let all_invited_rooms: Vec<_> = services()
+ .rooms
+ .state_cache
+ .rooms_invited(&sender_user)
+ .collect();
+ for result in all_invited_rooms {
+ let (room_id, invite_state_events) = result?;
- let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
- .and_then(|shortstatehash| {
+ {
+ // Get and drop the lock to wait for remaining operations to finish
+ let mutex_insert = Arc::clone(
services()
- .rooms
- .state_accessor
- .state_get(
- shortstatehash,
- &StateEventType::RoomMember,
- sender_user.as_str(),
- )
- .transpose()
+ .globals
+ .roomid_mutex_insert
+ .write()
+ .unwrap()
+ .entry(room_id.clone())
+ .or_default(),
+ );
+ let insert_lock = mutex_insert.lock().unwrap();
+ drop(insert_lock);
+ }
+
+ let invite_count = services()
+ .rooms
+ .state_cache
+ .get_invite_count(&room_id, &sender_user)?;
+
+ // Invited before last sync
+ if Some(since) >= invite_count {
+ continue;
+ }
+
+ invited_rooms.insert(
+ room_id.clone(),
+ InvitedRoom {
+ invite_state: InviteState {
+ events: invite_state_events,
+ },
+ },
+ );
+ }
+
+ for user_id in left_encrypted_users {
+ let still_share_encrypted_room = services()
+ .rooms
+ .user
+ .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
+ .filter_map(|r| r.ok())
+ .filter_map(|other_room_id| {
+ Some(
+ services()
+ .rooms
+ .state_accessor
+ .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
+ .ok()?
+ .is_some(),
+ )
+ })
+ .all(|encrypted| !encrypted);
+ // If the user doesn't share an encrypted room with the target anymore, we need to tell
+ // them
+ if still_share_encrypted_room {
+ device_list_left.insert(user_id);
+ }
+ }
+
+ // Remove all to-device events the device received *last time*
+ services()
+ .users
+ .remove_to_device_events(&sender_user, &sender_device, since)?;
+
+ let response = sync_events::v3::Response {
+ next_batch: next_batch_string,
+ rooms: Rooms {
+ leave: left_rooms,
+ join: joined_rooms,
+ invite: invited_rooms,
+ knock: BTreeMap::new(), // TODO
+ },
+ presence: Presence {
+ events: presence_updates
+ .into_values()
+ .map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
+ .collect(),
+ },
+ account_data: GlobalAccountData {
+ events: services()
+ .account_data
+ .changes_since(None, &sender_user, since)?
+ .into_iter()
+ .filter_map(|(_, v)| {
+ serde_json::from_str(v.json().get())
+ .map_err(|_| Error::bad_database("Invalid account event in database."))
+ .ok()
+ })
+ .collect(),
+ },
+ device_lists: DeviceLists {
+ changed: device_list_updates.into_iter().collect(),
+ left: device_list_left.into_iter().collect(),
+ },
+ device_one_time_keys_count: services()
+ .users
+ .count_one_time_keys(&sender_user, &sender_device)?,
+ to_device: ToDevice {
+ events: services()
+ .users
+ .get_to_device_events(&sender_user, &sender_device)?,
+ },
+ // Fallback keys are not yet supported
+ device_unused_fallback_key_types: None,
+ };
+
+ // TODO: Retry the endpoint instead of returning (waiting for #118)
+ if !full_state
+ && response.rooms.is_empty()
+ && response.presence.is_empty()
+ && response.account_data.is_empty()
+ && response.device_lists.is_empty()
+ && response.to_device.is_empty()
+ {
+ // Hang a few seconds so requests are not spammed
+ // Stop hanging if new info arrives
+ let mut duration = body.timeout.unwrap_or_default();
+ if duration.as_secs() > 30 {
+ duration = Duration::from_secs(30);
+ }
+ let _ = tokio::time::timeout(duration, watcher).await;
+ Ok((response, false))
+ } else {
+ Ok((response, since != next_batch)) // Only cache if we made progress
+ }
+}
+
+async fn load_joined_room(
+ sender_user: &UserId,
+ sender_device: &DeviceId,
+ room_id: &RoomId,
+ since: u64,
+ sincecount: PduCount,
+ next_batch: u64,
+ next_batchcount: PduCount,
+ lazy_load_enabled: bool,
+ lazy_load_send_redundant: bool,
+ full_state: bool,
+ device_list_updates: &mut HashSet<OwnedUserId>,
+ left_encrypted_users: &mut HashSet<OwnedUserId>,
+) -> Result<JoinedRoom> {
+ {
+ // Get and drop the lock to wait for remaining operations to finish
+ // This will make sure the we have all events until next_batch
+ let mutex_insert = Arc::clone(
+ services()
+ .globals
+ .roomid_mutex_insert
+ .write()
+ .unwrap()
+ .entry(room_id.to_owned())
+ .or_default(),
+ );
+ let insert_lock = mutex_insert.lock().unwrap();
+ drop(insert_lock);
+ }
+
+ let timeline_pdus;
+ let limited;
+ if services()
+ .rooms
+ .timeline
+ .last_timeline_count(&sender_user, &room_id)?
+ > sincecount
+ {
+ let mut non_timeline_pdus = services()
+ .rooms
+ .timeline
+ .pdus_until(&sender_user, &room_id, PduCount::max())?
+ .filter_map(|r| {
+ // Filter out buggy events
+ if r.is_err() {
+ error!("Bad pdu in pdus_since: {:?}", r);
+ }
+ r.ok()
})
- .transpose()?
- .and_then(|pdu| {
- serde_json::from_str(pdu.content.get())
- .map_err(|_| Error::bad_database("Invalid PDU in database."))
- .ok()
- });
+ .take_while(|(pducount, _)| pducount > &sincecount);
+
+ // Take the last 10 events for the timeline
+ timeline_pdus = non_timeline_pdus
+ .by_ref()
+ .take(10)
+ .collect::<Vec<_>>()
+ .into_iter()
+ .rev()
+ .collect::<Vec<_>>();
+
+ // They /sync response doesn't always return all messages, so we say the output is
+ // limited unless there are events in non_timeline_pdus
+ limited = non_timeline_pdus.next().is_some();
+ } else {
+ timeline_pdus = Vec::new();
+ limited = false;
+ }
+
+ let send_notification_counts = !timeline_pdus.is_empty()
+ || services()
+ .rooms
+ .user
+ .last_notification_read(&sender_user, &room_id)?
+ > since;
+
+ let mut timeline_users = HashSet::new();
+ for (_, event) in &timeline_pdus {
+ timeline_users.insert(event.sender.as_str().to_owned());
+ }
+
+ services().rooms.lazy_loading.lazy_load_confirm_delivery(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ sincecount,
+ )?;
- let joined_since_last_sync =
- since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
+ // Database queries:
- let (
+ let current_shortstatehash =
+ if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
+ s
+ } else {
+ error!("Room {} has no state", room_id);
+ return Err(Error::BadDatabase("Room has no state"));
+ };
+
+ let since_shortstatehash = services()
+ .rooms
+ .user
+ .get_token_shortstatehash(&room_id, since)?;
+
+ // Calculates joined_member_count, invited_member_count and heroes
+ let calculate_counts = || {
+ let joined_member_count = services()
+ .rooms
+ .state_cache
+ .room_joined_count(&room_id)?
+ .unwrap_or(0);
+ let invited_member_count = services()
+ .rooms
+ .state_cache
+ .room_invited_count(&room_id)?
+ .unwrap_or(0);
+
+ // Recalculate heroes (first 5 members)
+ let mut heroes = Vec::new();
+
+ if joined_member_count + invited_member_count <= 5 {
+ // Go through all PDUs and for each member event, check if the user is still joined or
+ // invited until we have 5 or we reach the end
+
+ for hero in services()
+ .rooms
+ .timeline
+ .all_pdus(&sender_user, &room_id)?
+ .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
+ .filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember)
+ .map(|(_, pdu)| {
+ let content: RoomMemberEventContent = serde_json::from_str(pdu.content.get())
+ .map_err(|_| {
+ Error::bad_database("Invalid member event in database.")
+ })?;
+
+ if let Some(state_key) = &pdu.state_key {
+ let user_id = UserId::parse(state_key.clone())
+ .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
+
+ // The membership was and still is invite or join
+ if matches!(
+ content.membership,
+ MembershipState::Join | MembershipState::Invite
+ ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)?
+ || services()
+ .rooms
+ .state_cache
+ .is_invited(&user_id, &room_id)?)
+ {
+ Ok::<_, Error>(Some(state_key.clone()))
+ } else {
+ Ok(None)
+ }
+ } else {
+ Ok(None)
+ }
+ })
+ // Filter out buggy users
+ .filter_map(|u| u.ok())
+ // Filter for possible heroes
+ .flatten()
+ {
+ if heroes.contains(&hero) || hero == sender_user.as_str() {
+ continue;
+ }
+
+ heroes.push(hero);
+ }
+ }
+
+ Ok::<_, Error>((
+ Some(joined_member_count),
+ Some(invited_member_count),
heroes,
- joined_member_count,
- invited_member_count,
- joined_since_last_sync,
- state_events,
- ) = if since_shortstatehash.is_none() || joined_since_last_sync {
+ ))
+ };
+
+ let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
+ .and_then(|shortstatehash| {
+ services()
+ .rooms
+ .state_accessor
+ .state_get(
+ shortstatehash,
+ &StateEventType::RoomMember,
+ sender_user.as_str(),
+ )
+ .transpose()
+ })
+ .transpose()?
+ .and_then(|pdu| {
+ serde_json::from_str(pdu.content.get())
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))
+ .ok()
+ });
+
+ let joined_since_last_sync =
+ since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
+
+ let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) =
+ if since_shortstatehash.is_none() || joined_since_last_sync {
// Probably since = 0, we will do an initial sync
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
@@ -448,10 +786,10 @@ async fn sync_helper(
tokio::task::yield_now().await;
}
} else if !lazy_load_enabled
- || body.full_state
- || timeline_users.contains(&state_key)
- // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
- || *sender_user == state_key
+ || full_state
+ || timeline_users.contains(&state_key)
+ // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
+ || *sender_user == state_key
{
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
@@ -521,7 +859,7 @@ async fn sync_helper(
.await?;
for (key, id) in current_state_ids {
- if body.full_state || since_state_ids.get(&key) != Some(&id) {
+ if full_state || since_state_ids.get(&key) != Some(&id) {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
@@ -671,385 +1009,88 @@ async fn sync_helper(
)
};
- // Look for device list updates in this room
- device_list_updates.extend(
- services()
- .users
- .keys_changed(room_id.as_ref(), since, None)
- .filter_map(|r| r.ok()),
- );
-
- let notification_count = if send_notification_counts {
- Some(
- services()
- .rooms
- .user
- .notification_count(&sender_user, &room_id)?
- .try_into()
- .expect("notification count can't go that high"),
- )
- } else {
- None
- };
-
- let highlight_count = if send_notification_counts {
- Some(
- services()
- .rooms
- .user
- .highlight_count(&sender_user, &room_id)?
- .try_into()
- .expect("highlight count can't go that high"),
- )
- } else {
- None
- };
-
- let prev_batch = timeline_pdus
- .first()
- .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
- Ok(Some(match pdu_count {
- PduCount::Backfilled(_) => {
- error!("timeline in backfill state?!");
- "0".to_owned()
- }
- PduCount::Normal(c) => c.to_string(),
- }))
- })?;
-
- let room_events: Vec<_> = timeline_pdus
- .iter()
- .map(|(_, pdu)| pdu.to_sync_room_event())
- .collect();
-
- let mut edus: Vec<_> = services()
- .rooms
- .edus
- .read_receipt
- .readreceipts_since(&room_id, since)
- .filter_map(|r| r.ok()) // Filter out buggy events
- .map(|(_, _, v)| v)
- .collect();
-
- if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
- edus.push(
- serde_json::from_str(
- &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?)
- .expect("event is valid, we just created it"),
- )
- .expect("event is valid, we just created it"),
- );
- }
-
- // Save the state after this sync so we can send the correct state diff next sync
- services().rooms.user.associate_token_shortstatehash(
- &room_id,
- next_batch,
- current_shortstatehash,
- )?;
-
- let joined_room = JoinedRoom {
- account_data: RoomAccountData {
- events: services()
- .account_data
- .changes_since(Some(&room_id), &sender_user, since)?
- .into_iter()
- .filter_map(|(_, v)| {
- serde_json::from_str(v.json().get())
- .map_err(|_| Error::bad_database("Invalid account event in database."))
- .ok()
- })
- .collect(),
- },
- summary: RoomSummary {
- heroes,
- joined_member_count: joined_member_count.map(|n| (n as u32).into()),
- invited_member_count: invited_member_count.map(|n| (n as u32).into()),
- },
- unread_notifications: UnreadNotificationsCount {
- highlight_count,
- notification_count,
- },
- timeline: Timeline {
- limited: limited || joined_since_last_sync,
- prev_batch,
- events: room_events,
- },
- state: State {
- events: state_events
- .iter()
- .map(|pdu| pdu.to_sync_state_event())
- .collect(),
- },
- ephemeral: Ephemeral { events: edus },
- unread_thread_notifications: BTreeMap::new(),
- };
+ // Look for device list updates in this room
+ device_list_updates.extend(
+ services()
+ .users
+ .keys_changed(room_id.as_ref(), since, None)
+ .filter_map(|r| r.ok()),
+ );
- if !joined_room.is_empty() {
- joined_rooms.insert(room_id.clone(), joined_room);
- }
+ let notification_count = if send_notification_counts {
+ Some(
+ services()
+ .rooms
+ .user
+ .notification_count(&sender_user, &room_id)?
+ .try_into()
+ .expect("notification count can't go that high"),
+ )
+ } else {
+ None
+ };
- // Take presence updates from this room
- for (user_id, presence) in services()
- .rooms
- .edus
- .presence
- .presence_since(&room_id, since)?
- {
- match presence_updates.entry(user_id) {
- Entry::Vacant(v) => {
- v.insert(presence);
- }
- Entry::Occupied(mut o) => {
- let p = o.get_mut();
+ let highlight_count = if send_notification_counts {
+ Some(
+ services()
+ .rooms
+ .user
+ .highlight_count(&sender_user, &room_id)?
+ .try_into()
+ .expect("highlight count can't go that high"),
+ )
+ } else {
+ None
+ };
- // Update existing presence event with more info
- p.content.presence = presence.content.presence;
- if let Some(status_msg) = presence.content.status_msg {
- p.content.status_msg = Some(status_msg);
- }
- if let Some(last_active_ago) = presence.content.last_active_ago {
- p.content.last_active_ago = Some(last_active_ago);
- }
- if let Some(displayname) = presence.content.displayname {
- p.content.displayname = Some(displayname);
- }
- if let Some(avatar_url) = presence.content.avatar_url {
- p.content.avatar_url = Some(avatar_url);
- }
- if let Some(currently_active) = presence.content.currently_active {
- p.content.currently_active = Some(currently_active);
- }
+ let prev_batch = timeline_pdus
+ .first()
+ .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
+ Ok(Some(match pdu_count {
+ PduCount::Backfilled(_) => {
+ error!("timeline in backfill state?!");
+ "0".to_owned()
}
- }
- }
- }
+ PduCount::Normal(c) => c.to_string(),
+ }))
+ })?;
- let mut left_rooms = BTreeMap::new();
- let all_left_rooms: Vec<_> = services()
- .rooms
- .state_cache
- .rooms_left(&sender_user)
+ let room_events: Vec<_> = timeline_pdus
+ .iter()
+ .map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
- for result in all_left_rooms {
- let (room_id, _) = result?;
-
- let mut left_state_events = Vec::new();
-
- {
- // Get and drop the lock to wait for remaining operations to finish
- let mutex_insert = Arc::clone(
- services()
- .globals
- .roomid_mutex_insert
- .write()
- .unwrap()
- .entry(room_id.clone())
- .or_default(),
- );
- let insert_lock = mutex_insert.lock().unwrap();
- drop(insert_lock);
- }
-
- let left_count = services()
- .rooms
- .state_cache
- .get_left_count(&room_id, &sender_user)?;
-
- // Left before last sync
- if Some(since) >= left_count {
- continue;
- }
-
- if !services().rooms.metadata.exists(&room_id)? {
- // This is just a rejected invite, not a room we know
- continue;
- }
-
- let since_shortstatehash = services()
- .rooms
- .user
- .get_token_shortstatehash(&room_id, since)?;
- let since_state_ids = match since_shortstatehash {
- Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
- None => HashMap::new(),
- };
-
- let left_event_id = match services().rooms.state_accessor.room_state_get_id(
- &room_id,
- &StateEventType::RoomMember,
- sender_user.as_str(),
- )? {
- Some(e) => e,
- None => {
- error!("Left room but no left state event");
- continue;
- }
- };
-
- let left_shortstatehash = match services()
- .rooms
- .state_accessor
- .pdu_shortstatehash(&left_event_id)?
- {
- Some(s) => s,
- None => {
- error!("Leave event has no state");
- continue;
- }
- };
-
- let mut left_state_ids = services()
- .rooms
- .state_accessor
- .state_full_ids(left_shortstatehash)
- .await?;
-
- let leave_shortstatekey = services()
- .rooms
- .short
- .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
-
- left_state_ids.insert(leave_shortstatekey, left_event_id);
-
- let mut i = 0;
- for (key, id) in left_state_ids {
- if body.full_state || since_state_ids.get(&key) != Some(&id) {
- let (event_type, state_key) =
- services().rooms.short.get_statekey_from_short(key)?;
-
- if !lazy_load_enabled
- || event_type != StateEventType::RoomMember
- || body.full_state
- // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
- || *sender_user == state_key
- {
- let pdu = match services().rooms.timeline.get_pdu(&id)? {
- Some(pdu) => pdu,
- None => {
- error!("Pdu in state not found: {}", id);
- continue;
- }
- };
-
- left_state_events.push(pdu.to_sync_state_event());
-
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
- }
- }
- }
- }
-
- left_rooms.insert(
- room_id.clone(),
- LeftRoom {
- account_data: RoomAccountData { events: Vec::new() },
- timeline: Timeline {
- limited: false,
- prev_batch: Some(next_batch_string.clone()),
- events: Vec::new(),
- },
- state: State {
- events: left_state_events,
- },
- },
- );
- }
-
- let mut invited_rooms = BTreeMap::new();
- let all_invited_rooms: Vec<_> = services()
+ let mut edus: Vec<_> = services()
.rooms
- .state_cache
- .rooms_invited(&sender_user)
+ .edus
+ .read_receipt
+ .readreceipts_since(&room_id, since)
+ .filter_map(|r| r.ok()) // Filter out buggy events
+ .map(|(_, _, v)| v)
.collect();
- for result in all_invited_rooms {
- let (room_id, invite_state_events) = result?;
- {
- // Get and drop the lock to wait for remaining operations to finish
- let mutex_insert = Arc::clone(
- services()
- .globals
- .roomid_mutex_insert
- .write()
- .unwrap()
- .entry(room_id.clone())
- .or_default(),
- );
- let insert_lock = mutex_insert.lock().unwrap();
- drop(insert_lock);
- }
-
- let invite_count = services()
- .rooms
- .state_cache
- .get_invite_count(&room_id, &sender_user)?;
-
- // Invited before last sync
- if Some(since) >= invite_count {
- continue;
- }
-
- invited_rooms.insert(
- room_id.clone(),
- InvitedRoom {
- invite_state: InviteState {
- events: invite_state_events,
- },
- },
+ if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
+ edus.push(
+ serde_json::from_str(
+ &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?)
+ .expect("event is valid, we just created it"),
+ )
+ .expect("event is valid, we just created it"),
);
}
- for user_id in left_encrypted_users {
- let still_share_encrypted_room = services()
- .rooms
- .user
- .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
- .filter_map(|r| r.ok())
- .filter_map(|other_room_id| {
- Some(
- services()
- .rooms
- .state_accessor
- .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
- .ok()?
- .is_some(),
- )
- })
- .all(|encrypted| !encrypted);
- // If the user doesn't share an encrypted room with the target anymore, we need to tell
- // them
- if still_share_encrypted_room {
- device_list_left.insert(user_id);
- }
- }
-
- // Remove all to-device events the device received *last time*
- services()
- .users
- .remove_to_device_events(&sender_user, &sender_device, since)?;
+ // Save the state after this sync so we can send the correct state diff next sync
+ services().rooms.user.associate_token_shortstatehash(
+ &room_id,
+ next_batch,
+ current_shortstatehash,
+ )?;
- let response = sync_events::v3::Response {
- next_batch: next_batch_string,
- rooms: Rooms {
- leave: left_rooms,
- join: joined_rooms,
- invite: invited_rooms,
- knock: BTreeMap::new(), // TODO
- },
- presence: Presence {
- events: presence_updates
- .into_values()
- .map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
- .collect(),
- },
- account_data: GlobalAccountData {
+ Ok(JoinedRoom {
+ account_data: RoomAccountData {
events: services()
.account_data
- .changes_since(None, &sender_user, since)?
+ .changes_since(Some(&room_id), &sender_user, since)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
@@ -1058,41 +1099,29 @@ async fn sync_helper(
})
.collect(),
},
- device_lists: DeviceLists {
- changed: device_list_updates.into_iter().collect(),
- left: device_list_left.into_iter().collect(),
+ summary: RoomSummary {
+ heroes,
+ joined_member_count: joined_member_count.map(|n| (n as u32).into()),
+ invited_member_count: invited_member_count.map(|n| (n as u32).into()),
},
- device_one_time_keys_count: services()
- .users
- .count_one_time_keys(&sender_user, &sender_device)?,
- to_device: ToDevice {
- events: services()
- .users
- .get_to_device_events(&sender_user, &sender_device)?,
+ unread_notifications: UnreadNotificationsCount {
+ highlight_count,
+ notification_count,
},
- // Fallback keys are not yet supported
- device_unused_fallback_key_types: None,
- };
-
- // TODO: Retry the endpoint instead of returning (waiting for #118)
- if !body.full_state
- && response.rooms.is_empty()
- && response.presence.is_empty()
- && response.account_data.is_empty()
- && response.device_lists.is_empty()
- && response.to_device.is_empty()
- {
- // Hang a few seconds so requests are not spammed
- // Stop hanging if new info arrives
- let mut duration = body.timeout.unwrap_or_default();
- if duration.as_secs() > 30 {
- duration = Duration::from_secs(30);
- }
- let _ = tokio::time::timeout(duration, watcher).await;
- Ok((response, false))
- } else {
- Ok((response, since != next_batch)) // Only cache if we made progress
- }
+ timeline: Timeline {
+ limited: limited || joined_since_last_sync,
+ prev_batch,
+ events: room_events,
+ },
+ state: State {
+ events: state_events
+ .iter()
+ .map(|pdu| pdu.to_sync_state_event())
+ .collect(),
+ },
+ ephemeral: Ephemeral { events: edus },
+ unread_thread_notifications: BTreeMap::new(),
+ })
}
fn share_encrypted_room(