Diffstat (limited to 'src/api/client_server/sync.rs')
-rw-r--r--  src/api/client_server/sync.rs  1827
1 file changed, 1223 insertions(+), 604 deletions(-)
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 568a23c..dd815b5 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -1,19 +1,29 @@
-use crate::{services, Error, Result, Ruma, RumaResponse};
+use crate::{
+ service::rooms::timeline::PduCount, services, Error, PduEvent, Result, Ruma, RumaResponse,
+};
use ruma::{
api::client::{
filter::{FilterDefinition, LazyLoadOptions},
- sync::sync_events::{self, DeviceLists, UnreadNotificationsCount},
+ sync::sync_events::{
+ self,
+ v3::{
+ Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
+ LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
+ },
+ v4::SlidingOp,
+ DeviceLists, UnreadNotificationsCount,
+ },
uiaa::UiaaResponse,
},
events::{
room::member::{MembershipState, RoomMemberEventContent},
- RoomEventType, StateEventType,
+ StateEventType, TimelineEventType,
},
serde::Raw,
- OwnedDeviceId, OwnedUserId, RoomId, UserId,
+ uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
};
use std::{
- collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
+ collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
time::Duration,
};
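
The import hunk pulls `PduCount` out of the timeline service so the sync code can compare event positions instead of raw `u64` counts. A minimal sketch of such an ordered count type, assuming only the two variants and the `PduCount::max()` constructor that appear later in this diff (the real definition lives in src/service/rooms/timeline):

    use std::cmp::Ordering;

    // Sketch only; not the actual Conduit definition.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum PduCount {
        Backfilled(u64), // distance into backfilled history; larger = older
        Normal(u64),     // monotonically increasing local count
    }

    impl PduCount {
        pub fn max() -> Self {
            Self::Normal(u64::MAX)
        }
    }

    impl Ord for PduCount {
        fn cmp(&self, other: &Self) -> Ordering {
            match (self, other) {
                // Every backfilled event sorts before every normal one,
                // and deeper backfill sorts earlier.
                (Self::Backfilled(s), Self::Backfilled(o)) => o.cmp(s),
                (Self::Normal(s), Self::Normal(o)) => s.cmp(o),
                (Self::Backfilled(_), Self::Normal(_)) => Ordering::Less,
                (Self::Normal(_), Self::Backfilled(_)) => Ordering::Greater,
            }
        }
    }

    impl PartialOrd for PduCount {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

With an ordering like this, the `pducount > &sincecount` comparison in `load_timeline` below behaves correctly across the backfill boundary.
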
@@ -160,11 +170,6 @@ async fn sync_helper(
body: sync_events::v3::Request,
// bool = caching allowed
) -> Result<(sync_events::v3::Response, bool), Error> {
- use sync_events::v3::{
- Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, LeftRoom,
- Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
- };
-
// TODO: match body.set_presence {
services().rooms.edus.presence.ping_presence(&sender_user)?;
@@ -172,6 +177,7 @@ async fn sync_helper(
let watcher = services().globals.watch(&sender_user, &sender_device);
let next_batch = services().globals.current_count()?;
+ let next_batchcount = PduCount::Normal(next_batch);
let next_batch_string = next_batch.to_string();
// Load filter
@@ -191,12 +197,15 @@ async fn sync_helper(
_ => (false, false),
};
+ let full_state = body.full_state;
+
let mut joined_rooms = BTreeMap::new();
let since = body
.since
- .clone()
+ .as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
+ let sincecount = PduCount::Normal(since);
let mut presence_updates = HashMap::new();
let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
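
Switching from `.clone()` to `.as_ref()` avoids copying the since token just to parse it; behaviour is unchanged: a missing or non-numeric token falls back to 0, which means an initial sync. In isolation (`parse_since` is a hypothetical helper for illustration):

    fn parse_since(since: Option<&str>) -> u64 {
        since.and_then(|s| s.parse().ok()).unwrap_or(0)
    }

    fn main() {
        assert_eq!(parse_since(Some("1234")), 1234); // incremental sync
        assert_eq!(parse_since(Some("garbage")), 0); // falls back to initial sync
        assert_eq!(parse_since(None), 0);            // initial sync
    }
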
@@ -218,610 +227,60 @@ async fn sync_helper(
.collect::<Vec<_>>();
for room_id in all_joined_rooms {
let room_id = room_id?;
-
- {
- // Get and drop the lock to wait for remaining operations to finish
- // This will make sure the we have all events until next_batch
- let mutex_insert = Arc::clone(
- services()
- .globals
- .roomid_mutex_insert
- .write()
- .unwrap()
- .entry(room_id.clone())
- .or_default(),
- );
- let insert_lock = mutex_insert.lock().unwrap();
- drop(insert_lock);
- }
-
- let timeline_pdus;
- let limited;
- if services()
- .rooms
- .timeline
- .last_timeline_count(&sender_user, &room_id)?
- > since
- {
- let mut non_timeline_pdus = services()
- .rooms
- .timeline
- .pdus_until(&sender_user, &room_id, u64::MAX)?
- .filter_map(|r| {
- // Filter out buggy events
- if r.is_err() {
- error!("Bad pdu in pdus_since: {:?}", r);
- }
- r.ok()
- })
- .take_while(|(pduid, _)| {
- services()
- .rooms
- .timeline
- .pdu_count(pduid)
- .map_or(false, |count| count > since)
- });
-
- // Take the last 10 events for the timeline
- timeline_pdus = non_timeline_pdus
- .by_ref()
- .take(10)
- .collect::<Vec<_>>()
- .into_iter()
- .rev()
- .collect::<Vec<_>>();
-
- // They /sync response doesn't always return all messages, so we say the output is
- // limited unless there are events in non_timeline_pdus
- limited = non_timeline_pdus.next().is_some();
- } else {
- timeline_pdus = Vec::new();
- limited = false;
- }
-
- let send_notification_counts = !timeline_pdus.is_empty()
- || services()
- .rooms
- .user
- .last_notification_read(&sender_user, &room_id)?
- > since;
-
- let mut timeline_users = HashSet::new();
- for (_, event) in &timeline_pdus {
- timeline_users.insert(event.sender.as_str().to_owned());
- }
-
- services().rooms.lazy_loading.lazy_load_confirm_delivery(
+ if let Ok(joined_room) = load_joined_room(
&sender_user,
&sender_device,
&room_id,
since,
- )?;
-
- // Database queries:
-
- let current_shortstatehash =
- if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
- s
- } else {
- error!("Room {} has no state", room_id);
- continue;
- };
-
- let since_shortstatehash = services()
- .rooms
- .user
- .get_token_shortstatehash(&room_id, since)?;
-
- // Calculates joined_member_count, invited_member_count and heroes
- let calculate_counts = || {
- let joined_member_count = services()
- .rooms
- .state_cache
- .room_joined_count(&room_id)?
- .unwrap_or(0);
- let invited_member_count = services()
- .rooms
- .state_cache
- .room_invited_count(&room_id)?
- .unwrap_or(0);
-
- // Recalculate heroes (first 5 members)
- let mut heroes = Vec::new();
-
- if joined_member_count + invited_member_count <= 5 {
- // Go through all PDUs and for each member event, check if the user is still joined or
- // invited until we have 5 or we reach the end
-
- for hero in services()
- .rooms
- .timeline
- .all_pdus(&sender_user, &room_id)?
- .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
- .filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember)
- .map(|(_, pdu)| {
- let content: RoomMemberEventContent =
- serde_json::from_str(pdu.content.get()).map_err(|_| {
- Error::bad_database("Invalid member event in database.")
- })?;
-
- if let Some(state_key) = &pdu.state_key {
- let user_id = UserId::parse(state_key.clone()).map_err(|_| {
- Error::bad_database("Invalid UserId in member PDU.")
- })?;
-
- // The membership was and still is invite or join
- if matches!(
- content.membership,
- MembershipState::Join | MembershipState::Invite
- ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)?
- || services()
- .rooms
- .state_cache
- .is_invited(&user_id, &room_id)?)
- {
- Ok::<_, Error>(Some(state_key.clone()))
- } else {
- Ok(None)
- }
- } else {
- Ok(None)
- }
- })
- // Filter out buggy users
- .filter_map(|u| u.ok())
- // Filter for possible heroes
- .flatten()
- {
- if heroes.contains(&hero) || hero == sender_user.as_str() {
- continue;
- }
-
- heroes.push(hero);
- }
+ sincecount,
+ next_batch,
+ next_batchcount,
+ lazy_load_enabled,
+ lazy_load_send_redundant,
+ full_state,
+ &mut device_list_updates,
+ &mut left_encrypted_users,
+ )
+ .await
+ {
+ if !joined_room.is_empty() {
+ joined_rooms.insert(room_id.clone(), joined_room);
}
- Ok::<_, Error>((
- Some(joined_member_count),
- Some(invited_member_count),
- heroes,
- ))
- };
-
- let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
- .and_then(|shortstatehash| {
- services()
- .rooms
- .state_accessor
- .state_get(
- shortstatehash,
- &StateEventType::RoomMember,
- sender_user.as_str(),
- )
- .transpose()
- })
- .transpose()?
- .and_then(|pdu| {
- serde_json::from_str(pdu.content.get())
- .map_err(|_| Error::bad_database("Invalid PDU in database."))
- .ok()
- });
-
- let joined_since_last_sync =
- since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
-
- let (
- heroes,
- joined_member_count,
- invited_member_count,
- joined_since_last_sync,
- state_events,
- ) = if since_shortstatehash.is_none() || joined_since_last_sync {
- // Probably since = 0, we will do an initial sync
-
- let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
-
- let current_state_ids = services()
+ // Take presence updates from this room
+ for (user_id, presence) in services()
.rooms
- .state_accessor
- .state_full_ids(current_shortstatehash)
- .await?;
-
- let mut state_events = Vec::new();
- let mut lazy_loaded = HashSet::new();
-
- let mut i = 0;
- for (shortstatekey, id) in current_state_ids {
- let (event_type, state_key) = services()
- .rooms
- .short
- .get_statekey_from_short(shortstatekey)?;
+ .edus
+ .presence
+ .presence_since(&room_id, since)?
+ {
+ match presence_updates.entry(user_id) {
+ Entry::Vacant(v) => {
+ v.insert(presence);
+ }
+ Entry::Occupied(mut o) => {
+ let p = o.get_mut();
- if event_type != StateEventType::RoomMember {
- let pdu = match services().rooms.timeline.get_pdu(&id)? {
- Some(pdu) => pdu,
- None => {
- error!("Pdu in state not found: {}", id);
- continue;
+ // Update existing presence event with more info
+ p.content.presence = presence.content.presence;
+ if let Some(status_msg) = presence.content.status_msg {
+ p.content.status_msg = Some(status_msg);
}
- };
- state_events.push(pdu);
-
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
- }
- } else if !lazy_load_enabled
- || body.full_state
- || timeline_users.contains(&state_key)
- // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
- || *sender_user == state_key
- {
- let pdu = match services().rooms.timeline.get_pdu(&id)? {
- Some(pdu) => pdu,
- None => {
- error!("Pdu in state not found: {}", id);
- continue;
+ if let Some(last_active_ago) = presence.content.last_active_ago {
+ p.content.last_active_ago = Some(last_active_ago);
}
- };
-
- // This check is in case a bad user ID made it into the database
- if let Ok(uid) = UserId::parse(&state_key) {
- lazy_loaded.insert(uid);
- }
- state_events.push(pdu);
-
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
- }
- }
- }
-
- // Reset lazy loading because this is an initial sync
- services().rooms.lazy_loading.lazy_load_reset(
- &sender_user,
- &sender_device,
- &room_id,
- )?;
-
- // The state_events above should contain all timeline_users, let's mark them as lazy
- // loaded.
- services().rooms.lazy_loading.lazy_load_mark_sent(
- &sender_user,
- &sender_device,
- &room_id,
- lazy_loaded,
- next_batch,
- );
-
- (
- heroes,
- joined_member_count,
- invited_member_count,
- true,
- state_events,
- )
- } else if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
- // No state changes
- (Vec::new(), None, None, false, Vec::new())
- } else {
- // Incremental /sync
- let since_shortstatehash = since_shortstatehash.unwrap();
-
- let mut state_events = Vec::new();
- let mut lazy_loaded = HashSet::new();
-
- if since_shortstatehash != current_shortstatehash {
- let current_state_ids = services()
- .rooms
- .state_accessor
- .state_full_ids(current_shortstatehash)
- .await?;
- let since_state_ids = services()
- .rooms
- .state_accessor
- .state_full_ids(since_shortstatehash)
- .await?;
-
- for (key, id) in current_state_ids {
- if body.full_state || since_state_ids.get(&key) != Some(&id) {
- let pdu = match services().rooms.timeline.get_pdu(&id)? {
- Some(pdu) => pdu,
- None => {
- error!("Pdu in state not found: {}", id);
- continue;
- }
- };
-
- if pdu.kind == RoomEventType::RoomMember {
- match UserId::parse(
- pdu.state_key
- .as_ref()
- .expect("State event has state key")
- .clone(),
- ) {
- Ok(state_key_userid) => {
- lazy_loaded.insert(state_key_userid);
- }
- Err(e) => error!("Invalid state key for member event: {}", e),
- }
+ if let Some(displayname) = presence.content.displayname {
+ p.content.displayname = Some(displayname);
}
-
- state_events.push(pdu);
- tokio::task::yield_now().await;
- }
- }
- }
-
- for (_, event) in &timeline_pdus {
- if lazy_loaded.contains(&event.sender) {
- continue;
- }
-
- if !services().rooms.lazy_loading.lazy_load_was_sent_before(
- &sender_user,
- &sender_device,
- &room_id,
- &event.sender,
- )? || lazy_load_send_redundant
- {
- if let Some(member_event) = services().rooms.state_accessor.room_state_get(
- &room_id,
- &StateEventType::RoomMember,
- event.sender.as_str(),
- )? {
- lazy_loaded.insert(event.sender.clone());
- state_events.push(member_event);
- }
- }
- }
-
- services().rooms.lazy_loading.lazy_load_mark_sent(
- &sender_user,
- &sender_device,
- &room_id,
- lazy_loaded,
- next_batch,
- );
-
- let encrypted_room = services()
- .rooms
- .state_accessor
- .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
- .is_some();
-
- let since_encryption = services().rooms.state_accessor.state_get(
- since_shortstatehash,
- &StateEventType::RoomEncryption,
- "",
- )?;
-
- // Calculations:
- let new_encrypted_room = encrypted_room && since_encryption.is_none();
-
- let send_member_count = state_events
- .iter()
- .any(|event| event.kind == RoomEventType::RoomMember);
-
- if encrypted_room {
- for state_event in &state_events {
- if state_event.kind != RoomEventType::RoomMember {
- continue;
- }
-
- if let Some(state_key) = &state_event.state_key {
- let user_id = UserId::parse(state_key.clone())
- .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
-
- if user_id == sender_user {
- continue;
+ if let Some(avatar_url) = presence.content.avatar_url {
+ p.content.avatar_url = Some(avatar_url);
}
-
- let new_membership = serde_json::from_str::<RoomMemberEventContent>(
- state_event.content.get(),
- )
- .map_err(|_| Error::bad_database("Invalid PDU in database."))?
- .membership;
-
- match new_membership {
- MembershipState::Join => {
- // A new user joined an encrypted room
- if !share_encrypted_room(&sender_user, &user_id, &room_id)? {
- device_list_updates.insert(user_id);
- }
- }
- MembershipState::Leave => {
- // Write down users that have left encrypted rooms we are in
- left_encrypted_users.insert(user_id);
- }
- _ => {}
+ if let Some(currently_active) = presence.content.currently_active {
+ p.content.currently_active = Some(currently_active);
}
}
}
}
-
- if joined_since_last_sync && encrypted_room || new_encrypted_room {
- // If the user is in a new encrypted room, give them all joined users
- device_list_updates.extend(
- services()
- .rooms
- .state_cache
- .room_members(&room_id)
- .flatten()
- .filter(|user_id| {
- // Don't send key updates from the sender to the sender
- &sender_user != user_id
- })
- .filter(|user_id| {
- // Only send keys if the sender doesn't share an encrypted room with the target already
- !share_encrypted_room(&sender_user, user_id, &room_id).unwrap_or(false)
- }),
- );
- }
-
- let (joined_member_count, invited_member_count, heroes) = if send_member_count {
- calculate_counts()?
- } else {
- (None, None, Vec::new())
- };
-
- (
- heroes,
- joined_member_count,
- invited_member_count,
- joined_since_last_sync,
- state_events,
- )
- };
-
- // Look for device list updates in this room
- device_list_updates.extend(
- services()
- .users
- .keys_changed(room_id.as_ref(), since, None)
- .filter_map(|r| r.ok()),
- );
-
- let notification_count = if send_notification_counts {
- Some(
- services()
- .rooms
- .user
- .notification_count(&sender_user, &room_id)?
- .try_into()
- .expect("notification count can't go that high"),
- )
- } else {
- None
- };
-
- let highlight_count = if send_notification_counts {
- Some(
- services()
- .rooms
- .user
- .highlight_count(&sender_user, &room_id)?
- .try_into()
- .expect("highlight count can't go that high"),
- )
- } else {
- None
- };
-
- let prev_batch = timeline_pdus
- .first()
- .map_or(Ok::<_, Error>(None), |(pdu_id, _)| {
- Ok(Some(
- services().rooms.timeline.pdu_count(pdu_id)?.to_string(),
- ))
- })?;
-
- let room_events: Vec<_> = timeline_pdus
- .iter()
- .map(|(_, pdu)| pdu.to_sync_room_event())
- .collect();
-
- let mut edus: Vec<_> = services()
- .rooms
- .edus
- .read_receipt
- .readreceipts_since(&room_id, since)
- .filter_map(|r| r.ok()) // Filter out buggy events
- .map(|(_, _, v)| v)
- .collect();
-
- if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
- edus.push(
- serde_json::from_str(
- &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?)
- .expect("event is valid, we just created it"),
- )
- .expect("event is valid, we just created it"),
- );
- }
-
- // Save the state after this sync so we can send the correct state diff next sync
- services().rooms.user.associate_token_shortstatehash(
- &room_id,
- next_batch,
- current_shortstatehash,
- )?;
-
- let joined_room = JoinedRoom {
- account_data: RoomAccountData {
- events: services()
- .account_data
- .changes_since(Some(&room_id), &sender_user, since)?
- .into_iter()
- .filter_map(|(_, v)| {
- serde_json::from_str(v.json().get())
- .map_err(|_| Error::bad_database("Invalid account event in database."))
- .ok()
- })
- .collect(),
- },
- summary: RoomSummary {
- heroes,
- joined_member_count: joined_member_count.map(|n| (n as u32).into()),
- invited_member_count: invited_member_count.map(|n| (n as u32).into()),
- },
- unread_notifications: UnreadNotificationsCount {
- highlight_count,
- notification_count,
- },
- timeline: Timeline {
- limited: limited || joined_since_last_sync,
- prev_batch,
- events: room_events,
- },
- state: State {
- events: state_events
- .iter()
- .map(|pdu| pdu.to_sync_state_event())
- .collect(),
- },
- ephemeral: Ephemeral { events: edus },
- unread_thread_notifications: BTreeMap::new(),
- };
-
- if !joined_room.is_empty() {
- joined_rooms.insert(room_id.clone(), joined_room);
- }
-
- // Take presence updates from this room
- for (user_id, presence) in services()
- .rooms
- .edus
- .presence
- .presence_since(&room_id, since)?
- {
- match presence_updates.entry(user_id) {
- Entry::Vacant(v) => {
- v.insert(presence);
- }
- Entry::Occupied(mut o) => {
- let p = o.get_mut();
-
- // Update existing presence event with more info
- p.content.presence = presence.content.presence;
- if let Some(status_msg) = presence.content.status_msg {
- p.content.status_msg = Some(status_msg);
- }
- if let Some(last_active_ago) = presence.content.last_active_ago {
- p.content.last_active_ago = Some(last_active_ago);
- }
- if let Some(displayname) = presence.content.displayname {
- p.content.displayname = Some(displayname);
- }
- if let Some(avatar_url) = presence.content.avatar_url {
- p.content.avatar_url = Some(avatar_url);
- }
- if let Some(currently_active) = presence.content.currently_active {
- p.content.currently_active = Some(currently_active);
- }
- }
- }
}
}
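
What remains in the caller after the extraction is the presence merge: per-room presence events are folded into one event per user through the `Entry` API, with optional fields only overwriting when the newer event actually carries them. The same pattern in a runnable, self-contained form (the two-field `Presence` struct is a stand-in for Ruma's presence event):

    use std::collections::{hash_map::Entry, HashMap};

    struct Presence {
        presence: String,
        status_msg: Option<String>,
    }

    fn main() {
        let mut updates: HashMap<String, Presence> = HashMap::new();
        let incoming = vec![
            ("@a:example.org".to_owned(),
             Presence { presence: "online".to_owned(), status_msg: None }),
            ("@a:example.org".to_owned(),
             Presence { presence: "unavailable".to_owned(), status_msg: Some("brb".to_owned()) }),
        ];

        for (user_id, presence) in incoming {
            match updates.entry(user_id) {
                Entry::Vacant(v) => {
                    v.insert(presence);
                }
                Entry::Occupied(mut o) => {
                    let p = o.get_mut();
                    // Later events win; optional fields only overwrite when present.
                    p.presence = presence.presence;
                    if let Some(status_msg) = presence.status_msg {
                        p.status_msg = Some(status_msg);
                    }
                }
            }
        }

        assert_eq!(updates["@a:example.org"].presence, "unavailable");
        assert_eq!(updates["@a:example.org"].status_msg.as_deref(), Some("brb"));
    }
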
@@ -915,13 +374,13 @@ async fn sync_helper(
let mut i = 0;
for (key, id) in left_state_ids {
- if body.full_state || since_state_ids.get(&key) != Some(&id) {
+ if full_state || since_state_ids.get(&key) != Some(&id) {
let (event_type, state_key) =
services().rooms.short.get_statekey_from_short(key)?;
if !lazy_load_enabled
|| event_type != StateEventType::RoomMember
- || body.full_state
+ || full_state
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| *sender_user == state_key
{
@@ -1004,7 +463,7 @@ async fn sync_helper(
}
for user_id in left_encrypted_users {
- let still_share_encrypted_room = services()
+ let dont_share_encrypted_room = services()
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
@@ -1022,7 +481,7 @@ async fn sync_helper(
.all(|encrypted| !encrypted);
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
- if still_share_encrypted_room {
+ if dont_share_encrypted_room {
device_list_left.insert(user_id);
}
}
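
The rename from `still_share_encrypted_room` to `dont_share_encrypted_room` fixes an inverted name rather than any behaviour: `.all(|encrypted| !encrypted)` is true exactly when none of the remaining shared rooms are encrypted, which is the condition for adding the user to `device_list_left`. Reduced to its core:

    fn main() {
        // No shared room is encrypted: report the user as left.
        let dont_share_encrypted_room = [false, false].iter().all(|&e| !e);
        assert!(dont_share_encrypted_room);

        // At least one shared encrypted room remains: keep receiving their keys.
        let dont_share_encrypted_room = [false, true].iter().all(|&e| !e);
        assert!(!dont_share_encrypted_room);
    }
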
@@ -1075,7 +534,7 @@ async fn sync_helper(
};
// TODO: Retry the endpoint instead of returning (waiting for #118)
- if !body.full_state
+ if !full_state
&& response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
@@ -1095,6 +554,597 @@ async fn sync_helper(
}
}
+async fn load_joined_room(
+ sender_user: &UserId,
+ sender_device: &DeviceId,
+ room_id: &RoomId,
+ since: u64,
+ sincecount: PduCount,
+ next_batch: u64,
+ next_batchcount: PduCount,
+ lazy_load_enabled: bool,
+ lazy_load_send_redundant: bool,
+ full_state: bool,
+ device_list_updates: &mut HashSet<OwnedUserId>,
+ left_encrypted_users: &mut HashSet<OwnedUserId>,
+) -> Result<JoinedRoom> {
+ {
+ // Get and drop the lock to wait for remaining operations to finish
+        // This will make sure that we have all events until next_batch
+ let mutex_insert = Arc::clone(
+ services()
+ .globals
+ .roomid_mutex_insert
+ .write()
+ .unwrap()
+ .entry(room_id.to_owned())
+ .or_default(),
+ );
+ let insert_lock = mutex_insert.lock().unwrap();
+ drop(insert_lock);
+ }
+
+ let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?;
+
+ let send_notification_counts = !timeline_pdus.is_empty()
+ || services()
+ .rooms
+ .user
+ .last_notification_read(&sender_user, &room_id)?
+ > since;
+
+ let mut timeline_users = HashSet::new();
+ for (_, event) in &timeline_pdus {
+ timeline_users.insert(event.sender.as_str().to_owned());
+ }
+
+ services().rooms.lazy_loading.lazy_load_confirm_delivery(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ sincecount,
+ )?;
+
+ // Database queries:
+
+ let current_shortstatehash =
+ if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
+ s
+ } else {
+ error!("Room {} has no state", room_id);
+ return Err(Error::BadDatabase("Room has no state"));
+ };
+
+ let since_shortstatehash = services()
+ .rooms
+ .user
+ .get_token_shortstatehash(&room_id, since)?;
+
+ let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) =
+ if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
+ // No state changes
+ (Vec::new(), None, None, false, Vec::new())
+ } else {
+ // Calculates joined_member_count, invited_member_count and heroes
+ let calculate_counts = || {
+ let joined_member_count = services()
+ .rooms
+ .state_cache
+ .room_joined_count(&room_id)?
+ .unwrap_or(0);
+ let invited_member_count = services()
+ .rooms
+ .state_cache
+ .room_invited_count(&room_id)?
+ .unwrap_or(0);
+
+ // Recalculate heroes (first 5 members)
+ let mut heroes = Vec::new();
+
+ if joined_member_count + invited_member_count <= 5 {
+ // Go through all PDUs and for each member event, check if the user is still joined or
+ // invited until we have 5 or we reach the end
+
+ for hero in services()
+ .rooms
+ .timeline
+ .all_pdus(&sender_user, &room_id)?
+ .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
+ .filter(|(_, pdu)| pdu.kind == TimelineEventType::RoomMember)
+ .map(|(_, pdu)| {
+ let content: RoomMemberEventContent =
+ serde_json::from_str(pdu.content.get()).map_err(|_| {
+ Error::bad_database("Invalid member event in database.")
+ })?;
+
+ if let Some(state_key) = &pdu.state_key {
+ let user_id = UserId::parse(state_key.clone()).map_err(|_| {
+ Error::bad_database("Invalid UserId in member PDU.")
+ })?;
+
+ // The membership was and still is invite or join
+ if matches!(
+ content.membership,
+ MembershipState::Join | MembershipState::Invite
+ ) && (services()
+ .rooms
+ .state_cache
+ .is_joined(&user_id, &room_id)?
+ || services()
+ .rooms
+ .state_cache
+ .is_invited(&user_id, &room_id)?)
+ {
+ Ok::<_, Error>(Some(state_key.clone()))
+ } else {
+ Ok(None)
+ }
+ } else {
+ Ok(None)
+ }
+ })
+ // Filter out buggy users
+ .filter_map(|u| u.ok())
+ // Filter for possible heroes
+ .flatten()
+ {
+ if heroes.contains(&hero) || hero == sender_user.as_str() {
+ continue;
+ }
+
+ heroes.push(hero);
+ }
+ }
+
+ Ok::<_, Error>((
+ Some(joined_member_count),
+ Some(invited_member_count),
+ heroes,
+ ))
+ };
+
+ let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
+ .and_then(|shortstatehash| {
+ services()
+ .rooms
+ .state_accessor
+ .state_get(
+ shortstatehash,
+ &StateEventType::RoomMember,
+ sender_user.as_str(),
+ )
+ .transpose()
+ })
+ .transpose()?
+ .and_then(|pdu| {
+ serde_json::from_str(pdu.content.get())
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))
+ .ok()
+ });
+
+ let joined_since_last_sync = since_sender_member
+ .map_or(true, |member| member.membership != MembershipState::Join);
+
+ if since_shortstatehash.is_none() || joined_since_last_sync {
+ // Probably since = 0, we will do an initial sync
+
+ let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
+
+ let current_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(current_shortstatehash)
+ .await?;
+
+ let mut state_events = Vec::new();
+ let mut lazy_loaded = HashSet::new();
+
+ let mut i = 0;
+ for (shortstatekey, id) in current_state_ids {
+ let (event_type, state_key) = services()
+ .rooms
+ .short
+ .get_statekey_from_short(shortstatekey)?;
+
+ if event_type != StateEventType::RoomMember {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+ state_events.push(pdu);
+
+ i += 1;
+ if i % 100 == 0 {
+ tokio::task::yield_now().await;
+ }
+ } else if !lazy_load_enabled
+ || full_state
+ || timeline_users.contains(&state_key)
+ // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
+ || *sender_user == state_key
+ {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+
+ // This check is in case a bad user ID made it into the database
+ if let Ok(uid) = UserId::parse(&state_key) {
+ lazy_loaded.insert(uid);
+ }
+ state_events.push(pdu);
+
+ i += 1;
+ if i % 100 == 0 {
+ tokio::task::yield_now().await;
+ }
+ }
+ }
+
+ // Reset lazy loading because this is an initial sync
+ services().rooms.lazy_loading.lazy_load_reset(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ )?;
+
+ // The state_events above should contain all timeline_users, let's mark them as lazy
+ // loaded.
+ services().rooms.lazy_loading.lazy_load_mark_sent(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ lazy_loaded,
+ next_batchcount,
+ );
+
+ (
+ heroes,
+ joined_member_count,
+ invited_member_count,
+ true,
+ state_events,
+ )
+ } else {
+ // Incremental /sync
+ let since_shortstatehash = since_shortstatehash.unwrap();
+
+ let mut state_events = Vec::new();
+ let mut lazy_loaded = HashSet::new();
+
+ if since_shortstatehash != current_shortstatehash {
+ let current_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(current_shortstatehash)
+ .await?;
+ let since_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(since_shortstatehash)
+ .await?;
+
+ for (key, id) in current_state_ids {
+ if full_state || since_state_ids.get(&key) != Some(&id) {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+
+ if pdu.kind == TimelineEventType::RoomMember {
+ match UserId::parse(
+ pdu.state_key
+ .as_ref()
+ .expect("State event has state key")
+ .clone(),
+ ) {
+ Ok(state_key_userid) => {
+ lazy_loaded.insert(state_key_userid);
+ }
+ Err(e) => error!("Invalid state key for member event: {}", e),
+ }
+ }
+
+ state_events.push(pdu);
+ tokio::task::yield_now().await;
+ }
+ }
+ }
+
+ for (_, event) in &timeline_pdus {
+ if lazy_loaded.contains(&event.sender) {
+ continue;
+ }
+
+ if !services().rooms.lazy_loading.lazy_load_was_sent_before(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ &event.sender,
+ )? || lazy_load_send_redundant
+ {
+ if let Some(member_event) = services().rooms.state_accessor.room_state_get(
+ &room_id,
+ &StateEventType::RoomMember,
+ event.sender.as_str(),
+ )? {
+ lazy_loaded.insert(event.sender.clone());
+ state_events.push(member_event);
+ }
+ }
+ }
+
+ services().rooms.lazy_loading.lazy_load_mark_sent(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ lazy_loaded,
+ next_batchcount,
+ );
+
+ let encrypted_room = services()
+ .rooms
+ .state_accessor
+ .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
+ .is_some();
+
+ let since_encryption = services().rooms.state_accessor.state_get(
+ since_shortstatehash,
+ &StateEventType::RoomEncryption,
+ "",
+ )?;
+
+ // Calculations:
+ let new_encrypted_room = encrypted_room && since_encryption.is_none();
+
+ let send_member_count = state_events
+ .iter()
+ .any(|event| event.kind == TimelineEventType::RoomMember);
+
+ if encrypted_room {
+ for state_event in &state_events {
+ if state_event.kind != TimelineEventType::RoomMember {
+ continue;
+ }
+
+ if let Some(state_key) = &state_event.state_key {
+ let user_id = UserId::parse(state_key.clone()).map_err(|_| {
+ Error::bad_database("Invalid UserId in member PDU.")
+ })?;
+
+ if user_id == sender_user {
+ continue;
+ }
+
+ let new_membership = serde_json::from_str::<RoomMemberEventContent>(
+ state_event.content.get(),
+ )
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))?
+ .membership;
+
+ match new_membership {
+ MembershipState::Join => {
+ // A new user joined an encrypted room
+ if !share_encrypted_room(&sender_user, &user_id, &room_id)? {
+ device_list_updates.insert(user_id);
+ }
+ }
+ MembershipState::Leave => {
+ // Write down users that have left encrypted rooms we are in
+ left_encrypted_users.insert(user_id);
+ }
+ _ => {}
+ }
+ }
+ }
+ }
+
+ if joined_since_last_sync && encrypted_room || new_encrypted_room {
+ // If the user is in a new encrypted room, give them all joined users
+ device_list_updates.extend(
+ services()
+ .rooms
+ .state_cache
+ .room_members(&room_id)
+ .flatten()
+ .filter(|user_id| {
+ // Don't send key updates from the sender to the sender
+ &sender_user != user_id
+ })
+ .filter(|user_id| {
+ // Only send keys if the sender doesn't share an encrypted room with the target already
+ !share_encrypted_room(&sender_user, user_id, &room_id)
+ .unwrap_or(false)
+ }),
+ );
+ }
+
+ let (joined_member_count, invited_member_count, heroes) = if send_member_count {
+ calculate_counts()?
+ } else {
+ (None, None, Vec::new())
+ };
+
+ (
+ heroes,
+ joined_member_count,
+ invited_member_count,
+ joined_since_last_sync,
+ state_events,
+ )
+ }
+ };
+
+ // Look for device list updates in this room
+ device_list_updates.extend(
+ services()
+ .users
+ .keys_changed(room_id.as_ref(), since, None)
+ .filter_map(|r| r.ok()),
+ );
+
+ let notification_count = if send_notification_counts {
+ Some(
+ services()
+ .rooms
+ .user
+ .notification_count(&sender_user, &room_id)?
+ .try_into()
+ .expect("notification count can't go that high"),
+ )
+ } else {
+ None
+ };
+
+ let highlight_count = if send_notification_counts {
+ Some(
+ services()
+ .rooms
+ .user
+ .highlight_count(&sender_user, &room_id)?
+ .try_into()
+ .expect("highlight count can't go that high"),
+ )
+ } else {
+ None
+ };
+
+ let prev_batch = timeline_pdus
+ .first()
+ .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
+ Ok(Some(match pdu_count {
+ PduCount::Backfilled(_) => {
+ error!("timeline in backfill state?!");
+ "0".to_owned()
+ }
+ PduCount::Normal(c) => c.to_string(),
+ }))
+ })?;
+
+ let room_events: Vec<_> = timeline_pdus
+ .iter()
+ .map(|(_, pdu)| pdu.to_sync_room_event())
+ .collect();
+
+ let mut edus: Vec<_> = services()
+ .rooms
+ .edus
+ .read_receipt
+ .readreceipts_since(&room_id, since)
+ .filter_map(|r| r.ok()) // Filter out buggy events
+ .map(|(_, _, v)| v)
+ .collect();
+
+ if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
+ edus.push(
+ serde_json::from_str(
+ &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?)
+ .expect("event is valid, we just created it"),
+ )
+ .expect("event is valid, we just created it"),
+ );
+ }
+
+ // Save the state after this sync so we can send the correct state diff next sync
+ services().rooms.user.associate_token_shortstatehash(
+ &room_id,
+ next_batch,
+ current_shortstatehash,
+ )?;
+
+ Ok(JoinedRoom {
+ account_data: RoomAccountData {
+ events: services()
+ .account_data
+ .changes_since(Some(&room_id), &sender_user, since)?
+ .into_iter()
+ .filter_map(|(_, v)| {
+ serde_json::from_str(v.json().get())
+ .map_err(|_| Error::bad_database("Invalid account event in database."))
+ .ok()
+ })
+ .collect(),
+ },
+ summary: RoomSummary {
+ heroes,
+ joined_member_count: joined_member_count.map(|n| (n as u32).into()),
+ invited_member_count: invited_member_count.map(|n| (n as u32).into()),
+ },
+ unread_notifications: UnreadNotificationsCount {
+ highlight_count,
+ notification_count,
+ },
+ timeline: Timeline {
+ limited: limited || joined_since_last_sync,
+ prev_batch,
+ events: room_events,
+ },
+ state: State {
+ events: state_events
+ .iter()
+ .map(|pdu| pdu.to_sync_state_event())
+ .collect(),
+ },
+ ephemeral: Ephemeral { events: edus },
+ unread_thread_notifications: BTreeMap::new(),
+ })
+}
+
+fn load_timeline(
+ sender_user: &UserId,
+ room_id: &RoomId,
+ sincecount: PduCount,
+ limit: u64,
+) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
+ let timeline_pdus;
+ let limited;
+ if services()
+ .rooms
+ .timeline
+ .last_timeline_count(&sender_user, &room_id)?
+ > sincecount
+ {
+ let mut non_timeline_pdus = services()
+ .rooms
+ .timeline
+ .pdus_until(&sender_user, &room_id, PduCount::max())?
+ .filter_map(|r| {
+ // Filter out buggy events
+ if r.is_err() {
+ error!("Bad pdu in pdus_since: {:?}", r);
+ }
+ r.ok()
+ })
+ .take_while(|(pducount, _)| pducount > &sincecount);
+
+ // Take the last events for the timeline
+ timeline_pdus = non_timeline_pdus
+ .by_ref()
+ .take(limit as usize)
+ .collect::<Vec<_>>()
+ .into_iter()
+ .rev()
+ .collect::<Vec<_>>();
+
+        // The /sync response doesn't always return all messages, so we mark the output as
+        // limited if there are more events left in non_timeline_pdus
+ limited = non_timeline_pdus.next().is_some();
+ } else {
+ timeline_pdus = Vec::new();
+ limited = false;
+ }
+ Ok((timeline_pdus, limited))
+}
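
A self-contained model of `load_timeline`'s contract, with plain `u64` counts standing in for `PduCount` (hypothetical helper, not the database-backed function above): walk newest-first, keep at most `limit` events newer than `since`, and report `limited` when at least one newer event had to be dropped.

    fn take_recent(counts_ascending: &[u64], since: u64, limit: usize) -> (Vec<u64>, bool) {
        let mut newer: Vec<u64> = counts_ascending
            .iter()
            .rev()                      // newest first, like pdus_until
            .copied()
            .take_while(|&c| c > since) // stop once we pass the since token
            .collect();
        let limited = newer.len() > limit; // events beyond the window exist
        newer.truncate(limit);             // keep only the newest `limit` events
        newer.reverse();                   // back to chronological order
        (newer, limited)
    }

    fn main() {
        assert_eq!(take_recent(&[1, 2, 3, 4, 5], 2, 2), (vec![4, 5], true));
        assert_eq!(take_recent(&[1, 2, 3], 2, 10), (vec![3], false));
        assert_eq!(take_recent(&[1, 2, 3], 5, 10), (vec![], false));
    }
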
+
fn share_encrypted_room(
sender_user: &UserId,
user_id: &UserId,
@@ -1118,3 +1168,572 @@ fn share_encrypted_room(
})
.any(|encrypted| encrypted))
}
+
+pub async fn sync_events_v4_route(
+ body: Ruma<sync_events::v4::Request>,
+) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
+ let sender_user = body.sender_user.expect("user is authenticated");
+ let sender_device = body.sender_device.expect("user is authenticated");
+ let mut body = body.body;
+ // Setup watchers, so if there's no response, we can wait for them
+ let watcher = services().globals.watch(&sender_user, &sender_device);
+
+ let next_batch = services().globals.current_count()?;
+
+ let since = body
+ .pos
+ .as_ref()
+ .and_then(|string| string.parse().ok())
+ .unwrap_or(0);
+ let sincecount = PduCount::Normal(since);
+
+ if since == 0 {
+ if let Some(conn_id) = &body.conn_id {
+ services().users.forget_sync_request_connection(
+ sender_user.clone(),
+ sender_device.clone(),
+ conn_id.clone(),
+ )
+ }
+ }
+
+ // Get sticky parameters from cache
+ let known_rooms = services().users.update_sync_request_with_cache(
+ sender_user.clone(),
+ sender_device.clone(),
+ &mut body,
+ );
+
+ let all_joined_rooms = services()
+ .rooms
+ .state_cache
+ .rooms_joined(&sender_user)
+ .filter_map(|r| r.ok())
+ .collect::<Vec<_>>();
+
+ if body.extensions.to_device.enabled.unwrap_or(false) {
+ services()
+ .users
+ .remove_to_device_events(&sender_user, &sender_device, since)?;
+ }
+
+ let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
+ let mut device_list_changes = HashSet::new();
+ let mut device_list_left = HashSet::new();
+
+ if body.extensions.e2ee.enabled.unwrap_or(false) {
+ // Look for device list updates of this account
+ device_list_changes.extend(
+ services()
+ .users
+ .keys_changed(sender_user.as_ref(), since, None)
+ .filter_map(|r| r.ok()),
+ );
+
+ for room_id in &all_joined_rooms {
+ let current_shortstatehash =
+ if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
+ s
+ } else {
+ error!("Room {} has no state", room_id);
+ continue;
+ };
+
+ let since_shortstatehash = services()
+ .rooms
+ .user
+ .get_token_shortstatehash(&room_id, since)?;
+
+ let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
+ .and_then(|shortstatehash| {
+ services()
+ .rooms
+ .state_accessor
+ .state_get(
+ shortstatehash,
+ &StateEventType::RoomMember,
+ sender_user.as_str(),
+ )
+ .transpose()
+ })
+ .transpose()?
+ .and_then(|pdu| {
+ serde_json::from_str(pdu.content.get())
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))
+ .ok()
+ });
+
+ let encrypted_room = services()
+ .rooms
+ .state_accessor
+ .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
+ .is_some();
+
+ if let Some(since_shortstatehash) = since_shortstatehash {
+ // Skip if there are only timeline changes
+ if since_shortstatehash == current_shortstatehash {
+ continue;
+ }
+
+ let since_encryption = services().rooms.state_accessor.state_get(
+ since_shortstatehash,
+ &StateEventType::RoomEncryption,
+ "",
+ )?;
+
+ let joined_since_last_sync = since_sender_member
+ .map_or(true, |member| member.membership != MembershipState::Join);
+
+ let new_encrypted_room = encrypted_room && since_encryption.is_none();
+ if encrypted_room {
+ let current_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(current_shortstatehash)
+ .await?;
+ let since_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(since_shortstatehash)
+ .await?;
+
+ for (key, id) in current_state_ids {
+ if since_state_ids.get(&key) != Some(&id) {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+ if pdu.kind == TimelineEventType::RoomMember {
+ if let Some(state_key) = &pdu.state_key {
+ let user_id =
+ UserId::parse(state_key.clone()).map_err(|_| {
+ Error::bad_database("Invalid UserId in member PDU.")
+ })?;
+
+ if user_id == sender_user {
+ continue;
+ }
+
+ let new_membership = serde_json::from_str::<
+ RoomMemberEventContent,
+ >(
+ pdu.content.get()
+ )
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))?
+ .membership;
+
+ match new_membership {
+ MembershipState::Join => {
+ // A new user joined an encrypted room
+ if !share_encrypted_room(
+ &sender_user,
+ &user_id,
+ &room_id,
+ )? {
+ device_list_changes.insert(user_id);
+ }
+ }
+ MembershipState::Leave => {
+ // Write down users that have left encrypted rooms we are in
+ left_encrypted_users.insert(user_id);
+ }
+ _ => {}
+ }
+ }
+ }
+ }
+ }
+ if joined_since_last_sync || new_encrypted_room {
+ // If the user is in a new encrypted room, give them all joined users
+ device_list_changes.extend(
+ services()
+ .rooms
+ .state_cache
+ .room_members(&room_id)
+ .flatten()
+ .filter(|user_id| {
+ // Don't send key updates from the sender to the sender
+ &sender_user != user_id
+ })
+ .filter(|user_id| {
+ // Only send keys if the sender doesn't share an encrypted room with the target already
+ !share_encrypted_room(&sender_user, user_id, &room_id)
+ .unwrap_or(false)
+ }),
+ );
+ }
+ }
+ }
+ // Look for device list updates in this room
+ device_list_changes.extend(
+ services()
+ .users
+ .keys_changed(room_id.as_ref(), since, None)
+ .filter_map(|r| r.ok()),
+ );
+ }
+ for user_id in left_encrypted_users {
+ let dont_share_encrypted_room = services()
+ .rooms
+ .user
+ .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
+ .filter_map(|r| r.ok())
+ .filter_map(|other_room_id| {
+ Some(
+ services()
+ .rooms
+ .state_accessor
+ .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
+ .ok()?
+ .is_some(),
+ )
+ })
+ .all(|encrypted| !encrypted);
+ // If the user doesn't share an encrypted room with the target anymore, we need to tell
+ // them
+ if dont_share_encrypted_room {
+ device_list_left.insert(user_id);
+ }
+ }
+ }
+
+ let mut lists = BTreeMap::new();
+ let mut todo_rooms = BTreeMap::new(); // and required state
+
+ for (list_id, list) in body.lists {
+ if list.filters.and_then(|f| f.is_invite).unwrap_or(false) {
+ continue;
+ }
+
+ let mut new_known_rooms = BTreeMap::new();
+
+ lists.insert(
+ list_id.clone(),
+ sync_events::v4::SyncList {
+ ops: list
+ .ranges
+ .into_iter()
+ .map(|mut r| {
+ r.0 =
+ r.0.clamp(uint!(0), UInt::from(all_joined_rooms.len() as u32 - 1));
+ r.1 =
+ r.1.clamp(r.0, UInt::from(all_joined_rooms.len() as u32 - 1));
+ let room_ids = all_joined_rooms
+ [(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
+ .to_vec();
+ new_known_rooms.extend(room_ids.iter().cloned().map(|r| (r, true)));
+ for room_id in &room_ids {
+ let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
+ BTreeSet::new(),
+ 0,
+ true,
+ ));
+ let limit = list
+ .room_details
+ .timeline_limit
+ .map_or(10, u64::from)
+ .min(100);
+ todo_room
+ .0
+ .extend(list.room_details.required_state.iter().cloned());
+ todo_room.1 = todo_room.1.max(limit);
+ if known_rooms.get(&list_id).and_then(|k| k.get(room_id)) != Some(&true)
+ {
+ todo_room.2 = false;
+ }
+ }
+ sync_events::v4::SyncOp {
+ op: SlidingOp::Sync,
+ range: Some(r.clone()),
+ index: None,
+ room_ids,
+ room_id: None,
+ }
+ })
+ .collect(),
+ count: UInt::from(all_joined_rooms.len() as u32),
+ },
+ );
+
+ if let Some(conn_id) = &body.conn_id {
+ services().users.update_sync_known_rooms(
+ sender_user.clone(),
+ sender_device.clone(),
+ conn_id.clone(),
+ list_id,
+ new_known_rooms,
+ );
+ }
+ }
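
Each requested sliding-sync range is clamped into `0..=len-1` before slicing `all_joined_rooms`, so oversized or inverted windows cannot index out of bounds. The `len as u32 - 1` above does assume at least one joined room; a saturating variant of the same clamp, as a standalone sketch:

    fn clamp_range(start: u64, end: u64, len: u64) -> Option<(usize, usize)> {
        if len == 0 {
            return None; // no rooms, no window
        }
        let start = start.min(len - 1);
        let end = end.clamp(start, len - 1);
        Some((start as usize, end as usize))
    }

    fn main() {
        assert_eq!(clamp_range(0, 99, 3), Some((0, 2))); // oversized window shrinks
        assert_eq!(clamp_range(5, 2, 3), Some((2, 2)));  // inverted range collapses
        assert_eq!(clamp_range(0, 10, 0), None);         // empty room list stays safe
    }
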
+
+ let mut known_subscription_rooms = BTreeMap::new();
+ for (room_id, room) in &body.room_subscriptions {
+ let todo_room = todo_rooms
+ .entry(room_id.clone())
+ .or_insert((BTreeSet::new(), 0, true));
+ let limit = room.timeline_limit.map_or(10, u64::from).min(100);
+ todo_room.0.extend(room.required_state.iter().cloned());
+ todo_room.1 = todo_room.1.max(limit);
+ if known_rooms
+ .get("subscriptions")
+ .and_then(|k| k.get(room_id))
+ != Some(&true)
+ {
+ todo_room.2 = false;
+ }
+ known_subscription_rooms.insert(room_id.clone(), true);
+ }
+
+ for r in body.unsubscribe_rooms {
+ known_subscription_rooms.remove(&r);
+ body.room_subscriptions.remove(&r);
+ }
+
+ if let Some(conn_id) = &body.conn_id {
+ services().users.update_sync_known_rooms(
+ sender_user.clone(),
+ sender_device.clone(),
+ conn_id.clone(),
+ "subscriptions".to_owned(),
+ known_subscription_rooms,
+ );
+ }
+
+ if let Some(conn_id) = &body.conn_id {
+ services().users.update_sync_subscriptions(
+ sender_user.clone(),
+ sender_device.clone(),
+ conn_id.clone(),
+ body.room_subscriptions,
+ );
+ }
+
+ let mut rooms = BTreeMap::new();
+ for (room_id, (required_state_request, timeline_limit, known)) in &todo_rooms {
+ // TODO: per-room sync tokens
+ let (timeline_pdus, limited) =
+ load_timeline(&sender_user, &room_id, sincecount, *timeline_limit)?;
+
+ if *known && timeline_pdus.is_empty() {
+ continue;
+ }
+
+ let prev_batch = timeline_pdus
+ .first()
+ .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
+ Ok(Some(match pdu_count {
+ PduCount::Backfilled(_) => {
+ error!("timeline in backfill state?!");
+ "0".to_owned()
+ }
+ PduCount::Normal(c) => c.to_string(),
+ }))
+ })?
+ .or_else(|| {
+ if since != 0 {
+ Some(since.to_string())
+ } else {
+ None
+ }
+ });
+
+ let room_events: Vec<_> = timeline_pdus
+ .iter()
+ .map(|(_, pdu)| pdu.to_sync_room_event())
+ .collect();
+
+ let required_state = required_state_request
+ .iter()
+ .map(|state| {
+ services()
+ .rooms
+ .state_accessor
+ .room_state_get(&room_id, &state.0, &state.1)
+ })
+ .filter_map(|r| r.ok())
+ .filter_map(|o| o)
+ .map(|state| state.to_sync_state_event())
+ .collect();
+
+ // Heroes
+ let heroes = services()
+ .rooms
+ .state_cache
+ .room_members(&room_id)
+ .filter_map(|r| r.ok())
+ .filter(|member| member != &sender_user)
+ .map(|member| {
+ Ok::<_, Error>(
+ services()
+ .rooms
+ .state_accessor
+ .get_member(&room_id, &member)?
+ .map(|memberevent| {
+ (
+ memberevent
+ .displayname
+ .unwrap_or_else(|| member.to_string()),
+ memberevent.avatar_url,
+ )
+ }),
+ )
+ })
+ .filter_map(|r| r.ok())
+ .filter_map(|o| o)
+ .take(5)
+ .collect::<Vec<_>>();
+ let name = if heroes.len() > 1 {
+ let last = heroes[0].0.clone();
+ Some(
+ heroes[1..]
+ .iter()
+ .map(|h| h.0.clone())
+ .collect::<Vec<_>>()
+ .join(", ")
+ + " and "
+ + &last,
+ )
+ } else if heroes.len() == 1 {
+ Some(heroes[0].0.clone())
+ } else {
+ None
+ };
+
+ let avatar = if heroes.len() == 1 {
+ heroes[0].1.clone()
+ } else {
+ None
+ };
+
+ rooms.insert(
+ room_id.clone(),
+ sync_events::v4::SlidingSyncRoom {
+ name: services()
+ .rooms
+ .state_accessor
+ .get_name(&room_id)?
+ .or_else(|| name),
+ avatar: services()
+ .rooms
+ .state_accessor
+ .get_avatar(&room_id)?
+ .map_or(avatar, |a| a.url),
+ initial: Some(!known),
+ is_dm: None,
+ invite_state: None,
+ unread_notifications: UnreadNotificationsCount {
+ highlight_count: Some(
+ services()
+ .rooms
+ .user
+ .highlight_count(&sender_user, &room_id)?
+ .try_into()
+ .expect("notification count can't go that high"),
+ ),
+ notification_count: Some(
+ services()
+ .rooms
+ .user
+ .notification_count(&sender_user, &room_id)?
+ .try_into()
+ .expect("notification count can't go that high"),
+ ),
+ },
+ timeline: room_events,
+ required_state,
+ prev_batch,
+ limited,
+ joined_count: Some(
+ (services()
+ .rooms
+ .state_cache
+ .room_joined_count(&room_id)?
+ .unwrap_or(0) as u32)
+ .into(),
+ ),
+ invited_count: Some(
+ (services()
+ .rooms
+ .state_cache
+ .room_invited_count(&room_id)?
+ .unwrap_or(0) as u32)
+ .into(),
+ ),
+ num_live: None, // Count events in timeline greater than global sync counter
+ },
+ );
+ }
+
+ if rooms
+ .iter()
+ .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
+ {
+ // Hang a few seconds so requests are not spammed
+ // Stop hanging if new info arrives
+ let mut duration = body.timeout.unwrap_or(Duration::from_secs(30));
+ if duration.as_secs() > 30 {
+ duration = Duration::from_secs(30);
+ }
+ let _ = tokio::time::timeout(duration, watcher).await;
+ }
+
+ Ok(sync_events::v4::Response {
+ initial: since == 0,
+ txn_id: body.txn_id.clone(),
+ pos: next_batch.to_string(),
+ lists,
+ rooms,
+ extensions: sync_events::v4::Extensions {
+ to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
+ Some(sync_events::v4::ToDevice {
+ events: services()
+ .users
+ .get_to_device_events(&sender_user, &sender_device)?,
+ next_batch: next_batch.to_string(),
+ })
+ } else {
+ None
+ },
+ e2ee: sync_events::v4::E2EE {
+ device_lists: DeviceLists {
+ changed: device_list_changes.into_iter().collect(),
+ left: device_list_left.into_iter().collect(),
+ },
+ device_one_time_keys_count: services()
+ .users
+ .count_one_time_keys(&sender_user, &sender_device)?,
+ // Fallback keys are not yet supported
+ device_unused_fallback_key_types: None,
+ },
+ account_data: sync_events::v4::AccountData {
+ global: if body.extensions.account_data.enabled.unwrap_or(false) {
+ services()
+ .account_data
+ .changes_since(None, &sender_user, since)?
+ .into_iter()
+ .filter_map(|(_, v)| {
+ serde_json::from_str(v.json().get())
+ .map_err(|_| {
+ Error::bad_database("Invalid account event in database.")
+ })
+ .ok()
+ })
+ .collect()
+ } else {
+ Vec::new()
+ },
+ rooms: BTreeMap::new(),
+ },
+ receipts: sync_events::v4::Receipts {
+ rooms: BTreeMap::new(),
+ },
+ typing: sync_events::v4::Typing {
+ rooms: BTreeMap::new(),
+ },
+ },
+ delta_token: None,
+ })
+}
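
Both sync_helper and sync_events_v4_route end with the same long-poll idiom: when nothing changed, wait on the watcher future, capped at 30 seconds, instead of answering empty responses in a tight loop. The shape of that idiom, with a `tokio::sync::watch` channel standing in for Conduit's watcher (an assumption; the real watcher aggregates many database key ranges):

    use std::time::Duration;
    use tokio::sync::watch;

    async fn long_poll(mut rx: watch::Receiver<u64>, requested: Option<Duration>) {
        // Cap the client-requested timeout at 30 seconds.
        let mut duration = requested.unwrap_or(Duration::from_secs(30));
        if duration.as_secs() > 30 {
            duration = Duration::from_secs(30);
        }
        // Resolve early if new data is signalled, otherwise time out.
        let _ = tokio::time::timeout(duration, rx.changed()).await;
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = watch::channel(0u64);
        let poll = tokio::spawn(long_poll(rx, Some(Duration::from_secs(60))));
        tx.send(1).unwrap(); // new data arrives; the poll returns immediately
        poll.await.unwrap();
    }
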