summaryrefslogtreecommitdiff
path: root/src/api/client_server/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/client_server/sync.rs')
-rw-r--r--src/api/client_server/sync.rs600
1 files changed, 304 insertions, 296 deletions
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index b4baec1..dd75347 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -653,213 +653,133 @@ async fn load_joined_room(
.user
.get_token_shortstatehash(&room_id, since)?;
- // Calculates joined_member_count, invited_member_count and heroes
- let calculate_counts = || {
- let joined_member_count = services()
- .rooms
- .state_cache
- .room_joined_count(&room_id)?
- .unwrap_or(0);
- let invited_member_count = services()
- .rooms
- .state_cache
- .room_invited_count(&room_id)?
- .unwrap_or(0);
-
- // Recalculate heroes (first 5 members)
- let mut heroes = Vec::new();
-
- if joined_member_count + invited_member_count <= 5 {
- // Go through all PDUs and for each member event, check if the user is still joined or
- // invited until we have 5 or we reach the end
-
- for hero in services()
- .rooms
- .timeline
- .all_pdus(&sender_user, &room_id)?
- .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
- .filter(|(_, pdu)| pdu.kind == TimelineEventType::RoomMember)
- .map(|(_, pdu)| {
- let content: RoomMemberEventContent = serde_json::from_str(pdu.content.get())
- .map_err(|_| {
- Error::bad_database("Invalid member event in database.")
- })?;
-
- if let Some(state_key) = &pdu.state_key {
- let user_id = UserId::parse(state_key.clone())
- .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
-
- // The membership was and still is invite or join
- if matches!(
- content.membership,
- MembershipState::Join | MembershipState::Invite
- ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)?
- || services()
- .rooms
- .state_cache
- .is_invited(&user_id, &room_id)?)
- {
- Ok::<_, Error>(Some(state_key.clone()))
- } else {
- Ok(None)
- }
- } else {
- Ok(None)
- }
- })
- // Filter out buggy users
- .filter_map(|u| u.ok())
- // Filter for possible heroes
- .flatten()
- {
- if heroes.contains(&hero) || hero == sender_user.as_str() {
- continue;
- }
-
- heroes.push(hero);
- }
- }
-
- Ok::<_, Error>((
- Some(joined_member_count),
- Some(invited_member_count),
- heroes,
- ))
- };
-
- let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
- .and_then(|shortstatehash| {
- services()
- .rooms
- .state_accessor
- .state_get(
- shortstatehash,
- &StateEventType::RoomMember,
- sender_user.as_str(),
- )
- .transpose()
- })
- .transpose()?
- .and_then(|pdu| {
- serde_json::from_str(pdu.content.get())
- .map_err(|_| Error::bad_database("Invalid PDU in database."))
- .ok()
- });
-
- let joined_since_last_sync =
- since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
-
let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) =
- if since_shortstatehash.is_none() || joined_since_last_sync {
- // Probably since = 0, we will do an initial sync
-
- let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
-
- let current_state_ids = services()
- .rooms
- .state_accessor
- .state_full_ids(current_shortstatehash)
- .await?;
+ if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
+ // No state changes
+ (Vec::new(), None, None, false, Vec::new())
+ } else {
+ // Calculates joined_member_count, invited_member_count and heroes
+ let calculate_counts = || {
+ let joined_member_count = services()
+ .rooms
+ .state_cache
+ .room_joined_count(&room_id)?
+ .unwrap_or(0);
+ let invited_member_count = services()
+ .rooms
+ .state_cache
+ .room_invited_count(&room_id)?
+ .unwrap_or(0);
- let mut state_events = Vec::new();
- let mut lazy_loaded = HashSet::new();
+ // Recalculate heroes (first 5 members)
+ let mut heroes = Vec::new();
- let mut i = 0;
- for (shortstatekey, id) in current_state_ids {
- let (event_type, state_key) = services()
- .rooms
- .short
- .get_statekey_from_short(shortstatekey)?;
+ if joined_member_count + invited_member_count <= 5 {
+ // Go through all PDUs and for each member event, check if the user is still joined or
+ // invited until we have 5 or we reach the end
- if event_type != StateEventType::RoomMember {
- let pdu = match services().rooms.timeline.get_pdu(&id)? {
- Some(pdu) => pdu,
- None => {
- error!("Pdu in state not found: {}", id);
+ for hero in services()
+ .rooms
+ .timeline
+ .all_pdus(&sender_user, &room_id)?
+ .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
+ .filter(|(_, pdu)| pdu.kind == TimelineEventType::RoomMember)
+ .map(|(_, pdu)| {
+ let content: RoomMemberEventContent =
+ serde_json::from_str(pdu.content.get()).map_err(|_| {
+ Error::bad_database("Invalid member event in database.")
+ })?;
+
+ if let Some(state_key) = &pdu.state_key {
+ let user_id = UserId::parse(state_key.clone()).map_err(|_| {
+ Error::bad_database("Invalid UserId in member PDU.")
+ })?;
+
+ // The membership was and still is invite or join
+ if matches!(
+ content.membership,
+ MembershipState::Join | MembershipState::Invite
+ ) && (services()
+ .rooms
+ .state_cache
+ .is_joined(&user_id, &room_id)?
+ || services()
+ .rooms
+ .state_cache
+ .is_invited(&user_id, &room_id)?)
+ {
+ Ok::<_, Error>(Some(state_key.clone()))
+ } else {
+ Ok(None)
+ }
+ } else {
+ Ok(None)
+ }
+ })
+ // Filter out buggy users
+ .filter_map(|u| u.ok())
+ // Filter for possible heroes
+ .flatten()
+ {
+ if heroes.contains(&hero) || hero == sender_user.as_str() {
continue;
}
- };
- state_events.push(pdu);
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
+ heroes.push(hero);
}
- } else if !lazy_load_enabled
- || full_state
- || timeline_users.contains(&state_key)
- // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
- || *sender_user == state_key
- {
- let pdu = match services().rooms.timeline.get_pdu(&id)? {
- Some(pdu) => pdu,
- None => {
- error!("Pdu in state not found: {}", id);
- continue;
- }
- };
+ }
- // This check is in case a bad user ID made it into the database
- if let Ok(uid) = UserId::parse(&state_key) {
- lazy_loaded.insert(uid);
- }
- state_events.push(pdu);
+ Ok::<_, Error>((
+ Some(joined_member_count),
+ Some(invited_member_count),
+ heroes,
+ ))
+ };
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
- }
- }
- }
+ let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
+ .and_then(|shortstatehash| {
+ services()
+ .rooms
+ .state_accessor
+ .state_get(
+ shortstatehash,
+ &StateEventType::RoomMember,
+ sender_user.as_str(),
+ )
+ .transpose()
+ })
+ .transpose()?
+ .and_then(|pdu| {
+ serde_json::from_str(pdu.content.get())
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))
+ .ok()
+ });
- // Reset lazy loading because this is an initial sync
- services().rooms.lazy_loading.lazy_load_reset(
- &sender_user,
- &sender_device,
- &room_id,
- )?;
-
- // The state_events above should contain all timeline_users, let's mark them as lazy
- // loaded.
- services().rooms.lazy_loading.lazy_load_mark_sent(
- &sender_user,
- &sender_device,
- &room_id,
- lazy_loaded,
- next_batchcount,
- );
+ let joined_since_last_sync = since_sender_member
+ .map_or(true, |member| member.membership != MembershipState::Join);
- (
- heroes,
- joined_member_count,
- invited_member_count,
- true,
- state_events,
- )
- } else if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
- // No state changes
- (Vec::new(), None, None, false, Vec::new())
- } else {
- // Incremental /sync
- let since_shortstatehash = since_shortstatehash.unwrap();
+ if since_shortstatehash.is_none() || joined_since_last_sync {
+ // Probably since = 0, we will do an initial sync
- let mut state_events = Vec::new();
- let mut lazy_loaded = HashSet::new();
+ let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
- if since_shortstatehash != current_shortstatehash {
let current_state_ids = services()
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
- let since_state_ids = services()
- .rooms
- .state_accessor
- .state_full_ids(since_shortstatehash)
- .await?;
- for (key, id) in current_state_ids {
- if full_state || since_state_ids.get(&key) != Some(&id) {
+ let mut state_events = Vec::new();
+ let mut lazy_loaded = HashSet::new();
+
+ let mut i = 0;
+ for (shortstatekey, id) in current_state_ids {
+ let (event_type, state_key) = services()
+ .rooms
+ .short
+ .get_statekey_from_short(shortstatekey)?;
+
+ if event_type != StateEventType::RoomMember {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
@@ -867,146 +787,234 @@ async fn load_joined_room(
continue;
}
};
+ state_events.push(pdu);
- if pdu.kind == TimelineEventType::RoomMember {
- match UserId::parse(
- pdu.state_key
- .as_ref()
- .expect("State event has state key")
- .clone(),
- ) {
- Ok(state_key_userid) => {
- lazy_loaded.insert(state_key_userid);
- }
- Err(e) => error!("Invalid state key for member event: {}", e),
- }
+ i += 1;
+ if i % 100 == 0 {
+ tokio::task::yield_now().await;
}
+ } else if !lazy_load_enabled
+ || full_state
+ || timeline_users.contains(&state_key)
+ // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
+ || *sender_user == state_key
+ {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+ // This check is in case a bad user ID made it into the database
+ if let Ok(uid) = UserId::parse(&state_key) {
+ lazy_loaded.insert(uid);
+ }
state_events.push(pdu);
- tokio::task::yield_now().await;
+
+ i += 1;
+ if i % 100 == 0 {
+ tokio::task::yield_now().await;
+ }
}
}
- }
- for (_, event) in &timeline_pdus {
- if lazy_loaded.contains(&event.sender) {
- continue;
- }
+ // Reset lazy loading because this is an initial sync
+ services().rooms.lazy_loading.lazy_load_reset(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ )?;
- if !services().rooms.lazy_loading.lazy_load_was_sent_before(
+ // The state_events above should contain all timeline_users, let's mark them as lazy
+ // loaded.
+ services().rooms.lazy_loading.lazy_load_mark_sent(
&sender_user,
&sender_device,
&room_id,
- &event.sender,
- )? || lazy_load_send_redundant
- {
- if let Some(member_event) = services().rooms.state_accessor.room_state_get(
- &room_id,
- &StateEventType::RoomMember,
- event.sender.as_str(),
- )? {
- lazy_loaded.insert(event.sender.clone());
- state_events.push(member_event);
+ lazy_loaded,
+ next_batchcount,
+ );
+
+ (
+ heroes,
+ joined_member_count,
+ invited_member_count,
+ true,
+ state_events,
+ )
+ } else {
+ // Incremental /sync
+ let since_shortstatehash = since_shortstatehash.unwrap();
+
+ let mut state_events = Vec::new();
+ let mut lazy_loaded = HashSet::new();
+
+ if since_shortstatehash != current_shortstatehash {
+ let current_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(current_shortstatehash)
+ .await?;
+ let since_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(since_shortstatehash)
+ .await?;
+
+ for (key, id) in current_state_ids {
+ if full_state || since_state_ids.get(&key) != Some(&id) {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+
+ if pdu.kind == TimelineEventType::RoomMember {
+ match UserId::parse(
+ pdu.state_key
+ .as_ref()
+ .expect("State event has state key")
+ .clone(),
+ ) {
+ Ok(state_key_userid) => {
+ lazy_loaded.insert(state_key_userid);
+ }
+ Err(e) => error!("Invalid state key for member event: {}", e),
+ }
+ }
+
+ state_events.push(pdu);
+ tokio::task::yield_now().await;
+ }
}
}
- }
- services().rooms.lazy_loading.lazy_load_mark_sent(
- &sender_user,
- &sender_device,
- &room_id,
- lazy_loaded,
- next_batchcount,
- );
+ for (_, event) in &timeline_pdus {
+ if lazy_loaded.contains(&event.sender) {
+ continue;
+ }
- let encrypted_room = services()
- .rooms
- .state_accessor
- .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
- .is_some();
+ if !services().rooms.lazy_loading.lazy_load_was_sent_before(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ &event.sender,
+ )? || lazy_load_send_redundant
+ {
+ if let Some(member_event) = services().rooms.state_accessor.room_state_get(
+ &room_id,
+ &StateEventType::RoomMember,
+ event.sender.as_str(),
+ )? {
+ lazy_loaded.insert(event.sender.clone());
+ state_events.push(member_event);
+ }
+ }
+ }
- let since_encryption = services().rooms.state_accessor.state_get(
- since_shortstatehash,
- &StateEventType::RoomEncryption,
- "",
- )?;
+ services().rooms.lazy_loading.lazy_load_mark_sent(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ lazy_loaded,
+ next_batchcount,
+ );
- // Calculations:
- let new_encrypted_room = encrypted_room && since_encryption.is_none();
+ let encrypted_room = services()
+ .rooms
+ .state_accessor
+ .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
+ .is_some();
- let send_member_count = state_events
- .iter()
- .any(|event| event.kind == TimelineEventType::RoomMember);
+ let since_encryption = services().rooms.state_accessor.state_get(
+ since_shortstatehash,
+ &StateEventType::RoomEncryption,
+ "",
+ )?;
- if encrypted_room {
- for state_event in &state_events {
- if state_event.kind != TimelineEventType::RoomMember {
- continue;
- }
+ // Calculations:
+ let new_encrypted_room = encrypted_room && since_encryption.is_none();
- if let Some(state_key) = &state_event.state_key {
- let user_id = UserId::parse(state_key.clone())
- .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
+ let send_member_count = state_events
+ .iter()
+ .any(|event| event.kind == TimelineEventType::RoomMember);
- if user_id == sender_user {
+ if encrypted_room {
+ for state_event in &state_events {
+ if state_event.kind != TimelineEventType::RoomMember {
continue;
}
- let new_membership = serde_json::from_str::<RoomMemberEventContent>(
- state_event.content.get(),
- )
- .map_err(|_| Error::bad_database("Invalid PDU in database."))?
- .membership;
-
- match new_membership {
- MembershipState::Join => {
- // A new user joined an encrypted room
- if !share_encrypted_room(&sender_user, &user_id, &room_id)? {
- device_list_updates.insert(user_id);
- }
+ if let Some(state_key) = &state_event.state_key {
+ let user_id = UserId::parse(state_key.clone()).map_err(|_| {
+ Error::bad_database("Invalid UserId in member PDU.")
+ })?;
+
+ if user_id == sender_user {
+ continue;
}
- MembershipState::Leave => {
- // Write down users that have left encrypted rooms we are in
- left_encrypted_users.insert(user_id);
+
+ let new_membership = serde_json::from_str::<RoomMemberEventContent>(
+ state_event.content.get(),
+ )
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))?
+ .membership;
+
+ match new_membership {
+ MembershipState::Join => {
+ // A new user joined an encrypted room
+ if !share_encrypted_room(&sender_user, &user_id, &room_id)? {
+ device_list_updates.insert(user_id);
+ }
+ }
+ MembershipState::Leave => {
+ // Write down users that have left encrypted rooms we are in
+ left_encrypted_users.insert(user_id);
+ }
+ _ => {}
}
- _ => {}
}
}
}
- }
-
- if joined_since_last_sync && encrypted_room || new_encrypted_room {
- // If the user is in a new encrypted room, give them all joined users
- device_list_updates.extend(
- services()
- .rooms
- .state_cache
- .room_members(&room_id)
- .flatten()
- .filter(|user_id| {
- // Don't send key updates from the sender to the sender
- &sender_user != user_id
- })
- .filter(|user_id| {
- // Only send keys if the sender doesn't share an encrypted room with the target already
- !share_encrypted_room(&sender_user, user_id, &room_id).unwrap_or(false)
- }),
- );
- }
- let (joined_member_count, invited_member_count, heroes) = if send_member_count {
- calculate_counts()?
- } else {
- (None, None, Vec::new())
- };
+ if joined_since_last_sync && encrypted_room || new_encrypted_room {
+ // If the user is in a new encrypted room, give them all joined users
+ device_list_updates.extend(
+ services()
+ .rooms
+ .state_cache
+ .room_members(&room_id)
+ .flatten()
+ .filter(|user_id| {
+ // Don't send key updates from the sender to the sender
+ &sender_user != user_id
+ })
+ .filter(|user_id| {
+ // Only send keys if the sender doesn't share an encrypted room with the target already
+ !share_encrypted_room(&sender_user, user_id, &room_id)
+ .unwrap_or(false)
+ }),
+ );
+ }
- (
- heroes,
- joined_member_count,
- invited_member_count,
- joined_since_last_sync,
- state_events,
- )
+ let (joined_member_count, invited_member_count, heroes) = if send_member_count {
+ calculate_counts()?
+ } else {
+ (None, None, Vec::new())
+ };
+
+ (
+ heroes,
+ joined_member_count,
+ invited_member_count,
+ joined_since_last_sync,
+ state_events,
+ )
+ }
};
// Look for device list updates in this room