summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2023-07-24 08:48:27 +0000
committerTimo Kösters <timo@koesters.xyz>2023-07-24 08:48:27 +0000
commit90a10c84efe1ed1032b5cf670e73a6bac2dd017e (patch)
tree35048516729c1ba800ba3b46d3eb59f1c32f9329 /src
parent1e560529d8aa95503f5e037576ec64500eb16881 (diff)
parentd220641d6453b8be767197f58c0bb32f255e2a5c (diff)
downloadconduit-90a10c84efe1ed1032b5cf670e73a6bac2dd017e.zip
Merge branch 'slidingfixes' into 'next'
Better sliding sync See merge request famedly/conduit!511
Diffstat (limited to 'src')
-rw-r--r--src/api/client_server/sync.rs404
-rw-r--r--src/service/mod.rs7
-rw-r--r--src/service/rooms/state_accessor/mod.rs15
-rw-r--r--src/service/users/mod.rs212
4 files changed, 609 insertions, 29 deletions
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index fed4fb7..527625a 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -23,7 +23,7 @@ use ruma::{
uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
};
use std::{
- collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
+ collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
time::Duration,
};
@@ -463,7 +463,7 @@ async fn sync_helper(
}
for user_id in left_encrypted_users {
- let still_share_encrypted_room = services()
+ let dont_share_encrypted_room = services()
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
@@ -481,7 +481,7 @@ async fn sync_helper(
.all(|encrypted| !encrypted);
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
- if still_share_encrypted_room {
+ if dont_share_encrypted_room {
device_list_left.insert(user_id);
}
}
@@ -1174,8 +1174,7 @@ pub async fn sync_events_v4_route(
) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
- let body = dbg!(body.body);
-
+ let mut body = dbg!(body.body);
// Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device);
@@ -1188,7 +1187,22 @@ pub async fn sync_events_v4_route(
.unwrap_or(0);
let sincecount = PduCount::Normal(since);
- let initial = since == 0;
+ if since == 0 {
+ if let Some(conn_id) = &body.conn_id {
+ services().users.forget_sync_request_connection(
+ sender_user.clone(),
+ sender_device.clone(),
+ conn_id.clone(),
+ )
+ }
+ }
+
+ // Get sticky parameters from cache
+ let known_rooms = services().users.update_sync_request_with_cache(
+ sender_user.clone(),
+ sender_device.clone(),
+ &mut body,
+ );
let all_joined_rooms = services()
.rooms
@@ -1197,6 +1211,195 @@ pub async fn sync_events_v4_route(
.filter_map(|r| r.ok())
.collect::<Vec<_>>();
+ if body.extensions.to_device.enabled.unwrap_or(false) {
+ services()
+ .users
+ .remove_to_device_events(&sender_user, &sender_device, since)?;
+ }
+
+ let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
+ let mut device_list_changes = HashSet::new();
+ let mut device_list_left = HashSet::new();
+
+ if body.extensions.e2ee.enabled.unwrap_or(false) {
+ // Look for device list updates of this account
+ device_list_changes.extend(
+ services()
+ .users
+ .keys_changed(sender_user.as_ref(), since, None)
+ .filter_map(|r| r.ok()),
+ );
+
+ for room_id in &all_joined_rooms {
+ let current_shortstatehash =
+ if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
+ s
+ } else {
+ error!("Room {} has no state", room_id);
+ continue;
+ };
+
+ let since_shortstatehash = services()
+ .rooms
+ .user
+ .get_token_shortstatehash(&room_id, since)?;
+
+ let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
+ .and_then(|shortstatehash| {
+ services()
+ .rooms
+ .state_accessor
+ .state_get(
+ shortstatehash,
+ &StateEventType::RoomMember,
+ sender_user.as_str(),
+ )
+ .transpose()
+ })
+ .transpose()?
+ .and_then(|pdu| {
+ serde_json::from_str(pdu.content.get())
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))
+ .ok()
+ });
+
+ let encrypted_room = services()
+ .rooms
+ .state_accessor
+ .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
+ .is_some();
+
+ if let Some(since_shortstatehash) = since_shortstatehash {
+ // Skip if there are only timeline changes
+ if since_shortstatehash == current_shortstatehash {
+ continue;
+ }
+
+ let since_encryption = services().rooms.state_accessor.state_get(
+ since_shortstatehash,
+ &StateEventType::RoomEncryption,
+ "",
+ )?;
+
+ let joined_since_last_sync = since_sender_member
+ .map_or(true, |member| member.membership != MembershipState::Join);
+
+ let new_encrypted_room = encrypted_room && since_encryption.is_none();
+ if encrypted_room {
+ let current_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(current_shortstatehash)
+ .await?;
+ let since_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(since_shortstatehash)
+ .await?;
+
+ for (key, id) in current_state_ids {
+ if since_state_ids.get(&key) != Some(&id) {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+ if pdu.kind == TimelineEventType::RoomMember {
+ if let Some(state_key) = &pdu.state_key {
+ let user_id =
+ UserId::parse(state_key.clone()).map_err(|_| {
+ Error::bad_database("Invalid UserId in member PDU.")
+ })?;
+
+ if user_id == sender_user {
+ continue;
+ }
+
+ let new_membership = serde_json::from_str::<
+ RoomMemberEventContent,
+ >(
+ pdu.content.get()
+ )
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))?
+ .membership;
+
+ match new_membership {
+ MembershipState::Join => {
+ // A new user joined an encrypted room
+ if !share_encrypted_room(
+ &sender_user,
+ &user_id,
+ &room_id,
+ )? {
+ device_list_changes.insert(user_id);
+ }
+ }
+ MembershipState::Leave => {
+ // Write down users that have left encrypted rooms we are in
+ left_encrypted_users.insert(user_id);
+ }
+ _ => {}
+ }
+ }
+ }
+ }
+ }
+ if joined_since_last_sync || new_encrypted_room {
+ // If the user is in a new encrypted room, give them all joined users
+ device_list_changes.extend(
+ services()
+ .rooms
+ .state_cache
+ .room_members(&room_id)
+ .flatten()
+ .filter(|user_id| {
+ // Don't send key updates from the sender to the sender
+ &sender_user != user_id
+ })
+ .filter(|user_id| {
+ // Only send keys if the sender doesn't share an encrypted room with the target already
+ !share_encrypted_room(&sender_user, user_id, &room_id)
+ .unwrap_or(false)
+ }),
+ );
+ }
+ }
+ }
+ // Look for device list updates in this room
+ device_list_changes.extend(
+ services()
+ .users
+ .keys_changed(room_id.as_ref(), since, None)
+ .filter_map(|r| r.ok()),
+ );
+ }
+ for user_id in left_encrypted_users {
+ let dont_share_encrypted_room = services()
+ .rooms
+ .user
+ .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
+ .filter_map(|r| r.ok())
+ .filter_map(|other_room_id| {
+ Some(
+ services()
+ .rooms
+ .state_accessor
+ .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
+ .ok()?
+ .is_some(),
+ )
+ })
+ .all(|encrypted| !encrypted);
+ // If the user doesn't share an encrypted room with the target anymore, we need to tell
+ // them
+ if dont_share_encrypted_room {
+ device_list_left.insert(user_id);
+ }
+ }
+ }
+
let mut lists = BTreeMap::new();
let mut todo_rooms = BTreeMap::new(); // and required state
@@ -1205,8 +1408,10 @@ pub async fn sync_events_v4_route(
continue;
}
+ let mut new_known_rooms = BTreeMap::new();
+
lists.insert(
- list_id,
+ list_id.clone(),
sync_events::v4::SyncList {
ops: list
.ranges
@@ -1219,14 +1424,27 @@ pub async fn sync_events_v4_route(
let room_ids = all_joined_rooms
[(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
.to_vec();
- todo_rooms.extend(room_ids.iter().cloned().map(|r| {
+ new_known_rooms.extend(room_ids.iter().cloned().map(|r| (r, true)));
+ for room_id in &room_ids {
+ let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
+ BTreeSet::new(),
+ 0,
+ true,
+ ));
let limit = list
.room_details
.timeline_limit
.map_or(10, u64::from)
.min(100);
- (r, (list.room_details.required_state.clone(), limit))
- }));
+ todo_room
+ .0
+ .extend(list.room_details.required_state.iter().cloned());
+ todo_room.1 = todo_room.1.max(limit);
+ if known_rooms.get(&list_id).and_then(|k| k.get(room_id)) != Some(&true)
+ {
+ todo_room.2 = false;
+ }
+ }
sync_events::v4::SyncOp {
op: SlidingOp::Sync,
range: Some(r.clone()),
@@ -1239,12 +1457,69 @@ pub async fn sync_events_v4_route(
count: UInt::from(all_joined_rooms.len() as u32),
},
);
+
+ if let Some(conn_id) = &body.conn_id {
+ services().users.update_sync_known_rooms(
+ sender_user.clone(),
+ sender_device.clone(),
+ conn_id.clone(),
+ list_id,
+ new_known_rooms,
+ );
+ }
+ }
+
+ let mut known_subscription_rooms = BTreeMap::new();
+ for (room_id, room) in dbg!(&body.room_subscriptions) {
+ let todo_room = todo_rooms
+ .entry(room_id.clone())
+ .or_insert((BTreeSet::new(), 0, true));
+ let limit = room.timeline_limit.map_or(10, u64::from).min(100);
+ todo_room.0.extend(room.required_state.iter().cloned());
+ todo_room.1 = todo_room.1.max(limit);
+ if known_rooms
+ .get("subscriptions")
+ .and_then(|k| k.get(room_id))
+ != Some(&true)
+ {
+ todo_room.2 = false;
+ }
+ known_subscription_rooms.insert(room_id.clone(), true);
+ }
+
+ for r in body.unsubscribe_rooms {
+ known_subscription_rooms.remove(&r);
+ body.room_subscriptions.remove(&r);
+ }
+
+ if let Some(conn_id) = &body.conn_id {
+ services().users.update_sync_known_rooms(
+ sender_user.clone(),
+ sender_device.clone(),
+ conn_id.clone(),
+ "subscriptions".to_owned(),
+ known_subscription_rooms,
+ );
+ }
+
+ if let Some(conn_id) = &body.conn_id {
+ services().users.update_sync_subscriptions(
+ sender_user.clone(),
+ sender_device.clone(),
+ conn_id.clone(),
+ body.room_subscriptions,
+ );
}
let mut rooms = BTreeMap::new();
- for (room_id, (required_state_request, timeline_limit)) in todo_rooms {
+ for (room_id, (required_state_request, timeline_limit, known)) in &todo_rooms {
+ // TODO: per-room sync tokens
let (timeline_pdus, limited) =
- load_timeline(&sender_user, &room_id, sincecount, timeline_limit)?;
+ load_timeline(&sender_user, &room_id, sincecount, *timeline_limit)?;
+
+ if *known && timeline_pdus.is_empty() {
+ continue;
+ }
let prev_batch = timeline_pdus
.first()
@@ -1256,7 +1531,14 @@ pub async fn sync_events_v4_route(
}
PduCount::Normal(c) => c.to_string(),
}))
- })?;
+ })?
+ .or_else(|| {
+ if since != 0 {
+ Some(since.to_string())
+ } else {
+ None
+ }
+ });
let room_events: Vec<_> = timeline_pdus
.iter()
@@ -1279,13 +1561,60 @@ pub async fn sync_events_v4_route(
rooms.insert(
room_id.clone(),
sync_events::v4::SlidingSyncRoom {
- name: services().rooms.state_accessor.get_name(&room_id)?,
- initial: Some(initial),
+ name: services()
+ .rooms
+ .state_accessor
+ .get_name(&room_id)?
+ .or_else(|| {
+ // Heroes
+ let mut names = services()
+ .rooms
+ .state_cache
+ .room_members(&room_id)
+ .filter_map(|r| r.ok())
+ .filter(|member| member != &sender_user)
+ .map(|member| {
+ Ok::<_, Error>(
+ services()
+ .rooms
+ .state_accessor
+ .get_member(&room_id, &member)?
+ .and_then(|memberevent| memberevent.displayname)
+ .unwrap_or(member.to_string()),
+ )
+ })
+ .filter_map(|r| r.ok())
+ .take(5)
+ .collect::<Vec<_>>();
+ if names.len() > 1 {
+ let last = names.pop().unwrap();
+ Some(names.join(", ") + " and " + &last)
+ } else if names.len() == 1 {
+ Some(names.pop().unwrap())
+ } else {
+ None
+ }
+ }),
+ initial: Some(!known),
is_dm: None,
invite_state: None,
unread_notifications: UnreadNotificationsCount {
- highlight_count: None,
- notification_count: None,
+ highlight_count: Some(
+ services()
+ .rooms
+ .user
+ .highlight_count(&sender_user, &room_id)?
+ .try_into()
+ .expect("notification count can't go that high"),
+ ),
+ notification_count: Some(
+ services()
+ .rooms
+ .user
+ .notification_count(&sender_user, &room_id)?
+ .try_into()
+ .expect("notification count can't go that high"),
+ ),
},
timeline: room_events,
required_state,
@@ -1307,7 +1636,7 @@ pub async fn sync_events_v4_route(
.unwrap_or(0) as u32)
.into(),
),
- num_live: None,
+ num_live: None, // Count events in timeline greater than global sync counter
},
);
}
@@ -1326,23 +1655,50 @@ pub async fn sync_events_v4_route(
}
Ok(dbg!(sync_events::v4::Response {
- initial: initial,
+ initial: since == 0,
txn_id: body.txn_id.clone(),
pos: next_batch.to_string(),
lists,
rooms,
extensions: sync_events::v4::Extensions {
- to_device: None,
+ to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
+ Some(sync_events::v4::ToDevice {
+ events: services()
+ .users
+ .get_to_device_events(&sender_user, &sender_device)?,
+ next_batch: next_batch.to_string(),
+ })
+ } else {
+ None
+ },
e2ee: sync_events::v4::E2EE {
device_lists: DeviceLists {
- changed: Vec::new(),
- left: Vec::new(),
+ changed: device_list_changes.into_iter().collect(),
+ left: device_list_left.into_iter().collect(),
},
- device_one_time_keys_count: BTreeMap::new(),
+ device_one_time_keys_count: services()
+ .users
+ .count_one_time_keys(&sender_user, &sender_device)?,
+ // Fallback keys are not yet supported
device_unused_fallback_key_types: None,
},
account_data: sync_events::v4::AccountData {
- global: Vec::new(),
+ global: if body.extensions.account_data.enabled.unwrap_or(false) {
+ services()
+ .account_data
+ .changes_since(None, &sender_user, since)?
+ .into_iter()
+ .filter_map(|(_, v)| {
+ serde_json::from_str(v.json().get())
+ .map_err(|_| {
+ Error::bad_database("Invalid account event in database.")
+ })
+ .ok()
+ })
+ .collect()
+ } else {
+ Vec::new()
+ },
rooms: BTreeMap::new(),
},
receipts: sync_events::v4::Receipts {
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 56aed7f..f85da78 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -1,5 +1,5 @@
use std::{
- collections::HashMap,
+ collections::{BTreeMap, HashMap},
sync::{Arc, Mutex},
};
@@ -105,7 +105,10 @@ impl Services {
},
transaction_ids: transaction_ids::Service { db },
uiaa: uiaa::Service { db },
- users: users::Service { db },
+ users: users::Service {
+ db,
+ connections: Mutex::new(BTreeMap::new()),
+ },
account_data: account_data::Service { db },
admin: admin::Service::build(),
key_backups: key_backups::Service { db },
diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs
index 9d071a5..435f4df 100644
--- a/src/service/rooms/state_accessor/mod.rs
+++ b/src/service/rooms/state_accessor/mod.rs
@@ -282,4 +282,19 @@ impl Service {
.map_err(|_| Error::bad_database("Invalid room name event in database."))
})
}
+
+ pub fn get_member(
+ &self,
+ room_id: &RoomId,
+ user_id: &UserId,
+ ) -> Result<Option<RoomMemberEventContent>> {
+ services()
+ .rooms
+ .state_accessor
+ .room_state_get(&room_id, &StateEventType::RoomMember, user_id.as_str())?
+ .map_or(Ok(None), |s| {
+ serde_json::from_str(s.content.get())
+ .map_err(|_| Error::bad_database("Invalid room member event in database."))
+ })
+ }
}
diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs
index 2311c30..f5b914f 100644
--- a/src/service/users/mod.rs
+++ b/src/service/users/mod.rs
@@ -1,20 +1,41 @@
mod data;
-use std::{collections::BTreeMap, mem};
+use std::{
+ collections::BTreeMap,
+ mem,
+ sync::{Arc, Mutex},
+};
pub use data::Data;
use ruma::{
- api::client::{device::Device, error::ErrorKind, filter::FilterDefinition},
+ api::client::{
+ device::Device,
+ error::ErrorKind,
+ filter::FilterDefinition,
+ sync::sync_events::{
+ self,
+ v4::{ExtensionsConfig, SyncRequestList},
+ },
+ },
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::AnyToDeviceEvent,
serde::Raw,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri,
- OwnedUserId, RoomAliasId, UInt, UserId,
+ OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId,
};
use crate::{services, Error, Result};
+pub struct SlidingSyncCache {
+ lists: BTreeMap<String, SyncRequestList>,
+ subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
+ known_rooms: BTreeMap<String, BTreeMap<OwnedRoomId, bool>>,
+ extensions: ExtensionsConfig,
+}
+
pub struct Service {
pub db: &'static dyn Data,
+ pub connections:
+ Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>,
}
impl Service {
@@ -23,6 +44,191 @@ impl Service {
self.db.exists(user_id)
}
+ pub fn forget_sync_request_connection(
+ &self,
+ user_id: OwnedUserId,
+ device_id: OwnedDeviceId,
+ conn_id: String,
+ ) {
+ self.connections
+ .lock()
+ .unwrap()
+ .remove(&(user_id, device_id, conn_id));
+ }
+
+ pub fn update_sync_request_with_cache(
+ &self,
+ user_id: OwnedUserId,
+ device_id: OwnedDeviceId,
+ request: &mut sync_events::v4::Request,
+ ) -> BTreeMap<String, BTreeMap<OwnedRoomId, bool>> {
+ let Some(conn_id) = request.conn_id.clone() else { return BTreeMap::new(); };
+
+ let cache = &mut self.connections.lock().unwrap();
+ let cached = Arc::clone(
+ cache
+ .entry((user_id, device_id, conn_id))
+ .or_insert_with(|| {
+ Arc::new(Mutex::new(SlidingSyncCache {
+ lists: BTreeMap::new(),
+ subscriptions: BTreeMap::new(),
+ known_rooms: BTreeMap::new(),
+ extensions: ExtensionsConfig::default(),
+ }))
+ }),
+ );
+ let cached = &mut cached.lock().unwrap();
+ drop(cache);
+
+ for (list_id, list) in &mut request.lists {
+ if let Some(cached_list) = cached.lists.get(list_id) {
+ if list.sort.is_empty() {
+ list.sort = cached_list.sort.clone();
+ };
+ if list.room_details.required_state.is_empty() {
+ list.room_details.required_state =
+ cached_list.room_details.required_state.clone();
+ };
+ list.room_details.timeline_limit = list
+ .room_details
+ .timeline_limit
+ .or(cached_list.room_details.timeline_limit);
+ list.include_old_rooms = list
+ .include_old_rooms
+ .clone()
+ .or(cached_list.include_old_rooms.clone());
+ match (&mut list.filters, cached_list.filters.clone()) {
+ (Some(list_filters), Some(cached_filters)) => {
+ list_filters.is_dm = list_filters.is_dm.or(cached_filters.is_dm);
+ if list_filters.spaces.is_empty() {
+ list_filters.spaces = cached_filters.spaces;
+ }
+ list_filters.is_encrypted =
+ list_filters.is_encrypted.or(cached_filters.is_encrypted);
+ list_filters.is_invite =
+ list_filters.is_invite.or(cached_filters.is_invite);
+ if list_filters.room_types.is_empty() {
+ list_filters.room_types = cached_filters.room_types;
+ }
+ if list_filters.not_room_types.is_empty() {
+ list_filters.not_room_types = cached_filters.not_room_types;
+ }
+ list_filters.room_name_like = list_filters
+ .room_name_like
+ .clone()
+ .or(cached_filters.room_name_like);
+ if list_filters.tags.is_empty() {
+ list_filters.tags = cached_filters.tags;
+ }
+ if list_filters.not_tags.is_empty() {
+ list_filters.not_tags = cached_filters.not_tags;
+ }
+ }
+ (_, Some(cached_filters)) => list.filters = Some(cached_filters),
+ (_, _) => {}
+ }
+ if list.bump_event_types.is_empty() {
+ list.bump_event_types = cached_list.bump_event_types.clone();
+ };
+ }
+ cached.lists.insert(list_id.clone(), list.clone());
+ }
+
+ cached
+ .subscriptions
+ .extend(request.room_subscriptions.clone().into_iter());
+ request
+ .room_subscriptions
+ .extend(cached.subscriptions.clone().into_iter());
+
+ request.extensions.e2ee.enabled = request
+ .extensions
+ .e2ee
+ .enabled
+ .or(cached.extensions.e2ee.enabled);
+
+ request.extensions.to_device.enabled = request
+ .extensions
+ .to_device
+ .enabled
+ .or(cached.extensions.to_device.enabled);
+
+ request.extensions.account_data.enabled = request
+ .extensions
+ .account_data
+ .enabled
+ .or(cached.extensions.account_data.enabled);
+ request.extensions.account_data.lists = request
+ .extensions
+ .account_data
+ .lists
+ .clone()
+ .or(cached.extensions.account_data.lists.clone());
+ request.extensions.account_data.rooms = request
+ .extensions
+ .account_data
+ .rooms
+ .clone()
+ .or(cached.extensions.account_data.rooms.clone());
+
+ cached.extensions = request.extensions.clone();
+
+ cached.known_rooms.clone()
+ }
+
+ pub fn update_sync_subscriptions(
+ &self,
+ user_id: OwnedUserId,
+ device_id: OwnedDeviceId,
+ conn_id: String,
+ subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
+ ) {
+ let cache = &mut self.connections.lock().unwrap();
+ let cached = Arc::clone(
+ cache
+ .entry((user_id, device_id, conn_id))
+ .or_insert_with(|| {
+ Arc::new(Mutex::new(SlidingSyncCache {
+ lists: BTreeMap::new(),
+ subscriptions: BTreeMap::new(),
+ known_rooms: BTreeMap::new(),
+ extensions: ExtensionsConfig::default(),
+ }))
+ }),
+ );
+ let cached = &mut cached.lock().unwrap();
+ drop(cache);
+
+ cached.subscriptions = subscriptions;
+ }
+
+ pub fn update_sync_known_rooms(
+ &self,
+ user_id: OwnedUserId,
+ device_id: OwnedDeviceId,
+ conn_id: String,
+ list_id: String,
+ new_cached_rooms: BTreeMap<OwnedRoomId, bool>,
+ ) {
+ let cache = &mut self.connections.lock().unwrap();
+ let cached = Arc::clone(
+ cache
+ .entry((user_id, device_id, conn_id))
+ .or_insert_with(|| {
+ Arc::new(Mutex::new(SlidingSyncCache {
+ lists: BTreeMap::new(),
+ subscriptions: BTreeMap::new(),
+ known_rooms: BTreeMap::new(),
+ extensions: ExtensionsConfig::default(),
+ }))
+ }),
+ );
+ let cached = &mut cached.lock().unwrap();
+ drop(cache);
+
+ cached.known_rooms.insert(list_id, new_cached_rooms);
+ }
+
/// Check if account is deactivated
pub fn is_deactivated(&self, user_id: &UserId) -> Result<bool> {
self.db.is_deactivated(user_id)