summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2023-06-27 13:06:55 +0200
committerTimo Kösters <timo@koesters.xyz>2023-06-27 13:15:11 +0200
commitbe877ef7191a2d6cbe9a3c9b40125f1bc42e6961 (patch)
tree585372dc5779527a3605dcee9dacdd6c7a3323a3 /src
parent7c6d25dcd165ffa3535ba103ab5ffb4acbe8c558 (diff)
downloadconduit-be877ef7191a2d6cbe9a3c9b40125f1bc42e6961.zip
Improve sync performance with more caching and wrapping things in Arcs to avoid copies
Diffstat (limited to 'src')
-rw-r--r--src/api/client_server/membership.rs20
-rw-r--r--src/database/key_value/rooms/state_accessor.rs6
-rw-r--r--src/database/key_value/rooms/state_compressor.rs10
-rw-r--r--src/database/mod.rs4
-rw-r--r--src/service/mod.rs2
-rw-r--r--src/service/rooms/event_handler/mod.rs24
-rw-r--r--src/service/rooms/state/mod.rs18
-rw-r--r--src/service/rooms/state_compressor/data.rs6
-rw-r--r--src/service/rooms/state_compressor/mod.rs94
-rw-r--r--src/service/rooms/timeline/mod.rs2
10 files changed, 100 insertions, 86 deletions
diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs
index 11e37e6..ccd8d7a 100644
--- a/src/api/client_server/membership.rs
+++ b/src/api/client_server/membership.rs
@@ -743,15 +743,17 @@ async fn join_room_by_id_helper(
info!("Saving state from send_join");
let (statehash_before_join, new, removed) = services().rooms.state_compressor.save_state(
room_id,
- state
- .into_iter()
- .map(|(k, id)| {
- services()
- .rooms
- .state_compressor
- .compress_state_event(k, &id)
- })
- .collect::<Result<_>>()?,
+ Arc::new(
+ state
+ .into_iter()
+ .map(|(k, id)| {
+ services()
+ .rooms
+ .state_compressor
+ .compress_state_event(k, &id)
+ })
+ .collect::<Result<_>>()?,
+ ),
)?;
services()
diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs
index 0f0c0dc..ad08f46 100644
--- a/src/database/key_value/rooms/state_accessor.rs
+++ b/src/database/key_value/rooms/state_accessor.rs
@@ -16,7 +16,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
.1;
let mut result = HashMap::new();
let mut i = 0;
- for compressed in full_state.into_iter() {
+ for compressed in full_state.iter() {
let parsed = services()
.rooms
.state_compressor
@@ -45,7 +45,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
let mut result = HashMap::new();
let mut i = 0;
- for compressed in full_state {
+ for compressed in full_state.iter() {
let (_, eventid) = services()
.rooms
.state_compressor
@@ -95,7 +95,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
.expect("there is always one layer")
.1;
Ok(full_state
- .into_iter()
+ .iter()
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
.and_then(|compressed| {
services()
diff --git a/src/database/key_value/rooms/state_compressor.rs b/src/database/key_value/rooms/state_compressor.rs
index d0a9be4..65ea603 100644
--- a/src/database/key_value/rooms/state_compressor.rs
+++ b/src/database/key_value/rooms/state_compressor.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashSet, mem::size_of};
+use std::{collections::HashSet, mem::size_of, sync::Arc};
use crate::{
database::KeyValueDatabase,
@@ -37,20 +37,20 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase {
Ok(StateDiff {
parent,
- added,
- removed,
+ added: Arc::new(added),
+ removed: Arc::new(removed),
})
}
fn save_statediff(&self, shortstatehash: u64, diff: StateDiff) -> Result<()> {
let mut value = diff.parent.unwrap_or(0).to_be_bytes().to_vec();
- for new in &diff.added {
+ for new in diff.added.iter() {
value.extend_from_slice(&new[..]);
}
if !diff.removed.is_empty() {
value.extend_from_slice(&0_u64.to_be_bytes());
- for removed in &diff.removed {
+ for removed in diff.removed.iter() {
value.extend_from_slice(&removed[..]);
}
}
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 5d89d4a..4e7bda6 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -587,8 +587,8 @@ impl KeyValueDatabase {
services().rooms.state_compressor.save_state_from_diff(
current_sstatehash,
- statediffnew,
- statediffremoved,
+ Arc::new(statediffnew),
+ Arc::new(statediffremoved),
2, // every state change is 2 event changes on average
states_parents,
)?;
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 3b48810..7a2bb64 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -90,7 +90,7 @@ impl Services {
state_compressor: rooms::state_compressor::Service {
db,
stateinfo_cache: Mutex::new(LruCache::new(
- (100.0 * config.conduit_cache_capacity_modifier) as usize,
+ (1000.0 * config.conduit_cache_capacity_modifier) as usize,
)),
},
timeline: rooms::timeline::Service {
diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs
index 800d849..066cef4 100644
--- a/src/service/rooms/event_handler/mod.rs
+++ b/src/service/rooms/event_handler/mod.rs
@@ -774,15 +774,17 @@ impl Service {
});
info!("Compressing state at event");
- let state_ids_compressed = state_at_incoming_event
- .iter()
- .map(|(shortstatekey, id)| {
- services()
- .rooms
- .state_compressor
- .compress_state_event(*shortstatekey, id)
- })
- .collect::<Result<_>>()?;
+ let state_ids_compressed = Arc::new(
+ state_at_incoming_event
+ .iter()
+ .map(|(shortstatekey, id)| {
+ services()
+ .rooms
+ .state_compressor
+ .compress_state_event(*shortstatekey, id)
+ })
+ .collect::<Result<_>>()?,
+ );
if incoming_pdu.state_key.is_some() {
info!("Preparing for stateres to derive new room state");
@@ -886,7 +888,7 @@ impl Service {
room_id: &RoomId,
room_version_id: &RoomVersionId,
incoming_state: HashMap<u64, Arc<EventId>>,
- ) -> Result<HashSet<CompressedStateEvent>> {
+ ) -> Result<Arc<HashSet<CompressedStateEvent>>> {
info!("Loading current room state ids");
let current_sstatehash = services()
.rooms
@@ -966,7 +968,7 @@ impl Service {
})
.collect::<Result<_>>()?;
- Ok(new_room_state)
+ Ok(Arc::new(new_room_state))
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs
index 21ad2f9..ca9430f 100644
--- a/src/service/rooms/state/mod.rs
+++ b/src/service/rooms/state/mod.rs
@@ -32,11 +32,11 @@ impl Service {
&self,
room_id: &RoomId,
shortstatehash: u64,
- statediffnew: HashSet<CompressedStateEvent>,
- _statediffremoved: HashSet<CompressedStateEvent>,
+ statediffnew: Arc<HashSet<CompressedStateEvent>>,
+ _statediffremoved: Arc<HashSet<CompressedStateEvent>>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
- for event_id in statediffnew.into_iter().filter_map(|new| {
+ for event_id in statediffnew.iter().filter_map(|new| {
services()
.rooms
.state_compressor
@@ -107,7 +107,7 @@ impl Service {
&self,
event_id: &EventId,
room_id: &RoomId,
- state_ids_compressed: HashSet<CompressedStateEvent>,
+ state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
) -> Result<u64> {
let shorteventid = services()
.rooms
@@ -152,9 +152,9 @@ impl Service {
.copied()
.collect();
- (statediffnew, statediffremoved)
+ (Arc::new(statediffnew), Arc::new(statediffremoved))
} else {
- (state_ids_compressed, HashSet::new())
+ (state_ids_compressed, Arc::new(HashSet::new()))
};
services().rooms.state_compressor.save_state_from_diff(
shortstatehash,
@@ -234,8 +234,8 @@ impl Service {
services().rooms.state_compressor.save_state_from_diff(
shortstatehash,
- statediffnew,
- statediffremoved,
+ Arc::new(statediffnew),
+ Arc::new(statediffremoved),
2,
states_parents,
)?;
@@ -396,7 +396,7 @@ impl Service {
.1;
Ok(full_state
- .into_iter()
+ .iter()
.filter_map(|compressed| {
services()
.rooms
diff --git a/src/service/rooms/state_compressor/data.rs b/src/service/rooms/state_compressor/data.rs
index ce164c6..d221d57 100644
--- a/src/service/rooms/state_compressor/data.rs
+++ b/src/service/rooms/state_compressor/data.rs
@@ -1,12 +1,12 @@
-use std::collections::HashSet;
+use std::{collections::HashSet, sync::Arc};
use super::CompressedStateEvent;
use crate::Result;
pub struct StateDiff {
pub parent: Option<u64>,
- pub added: HashSet<CompressedStateEvent>,
- pub removed: HashSet<CompressedStateEvent>,
+ pub added: Arc<HashSet<CompressedStateEvent>>,
+ pub removed: Arc<HashSet<CompressedStateEvent>>,
}
pub trait Data: Send + Sync {
diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs
index 356f32c..d29b020 100644
--- a/src/service/rooms/state_compressor/mod.rs
+++ b/src/service/rooms/state_compressor/mod.rs
@@ -20,10 +20,10 @@ pub struct Service {
LruCache<
u64,
Vec<(
- u64, // sstatehash
- HashSet<CompressedStateEvent>, // full state
- HashSet<CompressedStateEvent>, // added
- HashSet<CompressedStateEvent>, // removed
+ u64, // sstatehash
+ Arc<HashSet<CompressedStateEvent>>, // full state
+ Arc<HashSet<CompressedStateEvent>>, // added
+ Arc<HashSet<CompressedStateEvent>>, // removed
)>,
>,
>,
@@ -39,10 +39,10 @@ impl Service {
shortstatehash: u64,
) -> Result<
Vec<(
- u64, // sstatehash
- HashSet<CompressedStateEvent>, // full state
- HashSet<CompressedStateEvent>, // added
- HashSet<CompressedStateEvent>, // removed
+ u64, // sstatehash
+ Arc<HashSet<CompressedStateEvent>>, // full state
+ Arc<HashSet<CompressedStateEvent>>, // added
+ Arc<HashSet<CompressedStateEvent>>, // removed
)>,
> {
if let Some(r) = self
@@ -62,13 +62,19 @@ impl Service {
if let Some(parent) = parent {
let mut response = self.load_shortstatehash_info(parent)?;
- let mut state = response.last().unwrap().1.clone();
+ let mut state = (*response.last().unwrap().1).clone();
state.extend(added.iter().copied());
+ let removed = (*removed).clone();
for r in &removed {
state.remove(r);
}
- response.push((shortstatehash, state, added, removed));
+ response.push((shortstatehash, Arc::new(state), added, Arc::new(removed)));
+
+ self.stateinfo_cache
+ .lock()
+ .unwrap()
+ .insert(shortstatehash, response.clone());
Ok(response)
} else {
@@ -135,14 +141,14 @@ impl Service {
pub fn save_state_from_diff(
&self,
shortstatehash: u64,
- statediffnew: HashSet<CompressedStateEvent>,
- statediffremoved: HashSet<CompressedStateEvent>,
+ statediffnew: Arc<HashSet<CompressedStateEvent>>,
+ statediffremoved: Arc<HashSet<CompressedStateEvent>>,
diff_to_sibling: usize,
mut parent_states: Vec<(
- u64, // sstatehash
- HashSet<CompressedStateEvent>, // full state
- HashSet<CompressedStateEvent>, // added
- HashSet<CompressedStateEvent>, // removed
+ u64, // sstatehash
+ Arc<HashSet<CompressedStateEvent>>, // full state
+ Arc<HashSet<CompressedStateEvent>>, // added
+ Arc<HashSet<CompressedStateEvent>>, // removed
)>,
) -> Result<()> {
let diffsum = statediffnew.len() + statediffremoved.len();
@@ -152,29 +158,29 @@ impl Service {
// To many layers, we have to go deeper
let parent = parent_states.pop().unwrap();
- let mut parent_new = parent.2;
- let mut parent_removed = parent.3;
+ let mut parent_new = (*parent.2).clone();
+ let mut parent_removed = (*parent.3).clone();
- for removed in statediffremoved {
- if !parent_new.remove(&removed) {
+ for removed in statediffremoved.iter() {
+ if !parent_new.remove(removed) {
// It was not added in the parent and we removed it
- parent_removed.insert(removed);
+ parent_removed.insert(removed.clone());
}
// Else it was added in the parent and we removed it again. We can forget this change
}
- for new in statediffnew {
- if !parent_removed.remove(&new) {
+ for new in statediffnew.iter() {
+ if !parent_removed.remove(new) {
// It was not touched in the parent and we added it
- parent_new.insert(new);
+ parent_new.insert(new.clone());
}
// Else it was removed in the parent and we added it again. We can forget this change
}
self.save_state_from_diff(
shortstatehash,
- parent_new,
- parent_removed,
+ Arc::new(parent_new),
+ Arc::new(parent_removed),
diffsum,
parent_states,
)?;
@@ -205,29 +211,29 @@ impl Service {
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
// Diff too big, we replace above layer(s)
- let mut parent_new = parent.2;
- let mut parent_removed = parent.3;
+ let mut parent_new = (*parent.2).clone();
+ let mut parent_removed = (*parent.3).clone();
- for removed in statediffremoved {
- if !parent_new.remove(&removed) {
+ for removed in statediffremoved.iter() {
+ if !parent_new.remove(removed) {
// It was not added in the parent and we removed it
- parent_removed.insert(removed);
+ parent_removed.insert(removed.clone());
}
// Else it was added in the parent and we removed it again. We can forget this change
}
- for new in statediffnew {
- if !parent_removed.remove(&new) {
+ for new in statediffnew.iter() {
+ if !parent_removed.remove(new) {
// It was not touched in the parent and we added it
- parent_new.insert(new);
+ parent_new.insert(new.clone());
}
// Else it was removed in the parent and we added it again. We can forget this change
}
self.save_state_from_diff(
shortstatehash,
- parent_new,
- parent_removed,
+ Arc::new(parent_new),
+ Arc::new(parent_removed),
diffsum,
parent_states,
)?;
@@ -250,11 +256,11 @@ impl Service {
pub fn save_state(
&self,
room_id: &RoomId,
- new_state_ids_compressed: HashSet<CompressedStateEvent>,
+ new_state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
) -> Result<(
u64,
- HashSet<CompressedStateEvent>,
- HashSet<CompressedStateEvent>,
+ Arc<HashSet<CompressedStateEvent>>,
+ Arc<HashSet<CompressedStateEvent>>,
)> {
let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?;
@@ -271,7 +277,11 @@ impl Service {
.get_or_create_shortstatehash(&state_hash)?;
if Some(new_shortstatehash) == previous_shortstatehash {
- return Ok((new_shortstatehash, HashSet::new(), HashSet::new()));
+ return Ok((
+ new_shortstatehash,
+ Arc::new(HashSet::new()),
+ Arc::new(HashSet::new()),
+ ));
}
let states_parents = previous_shortstatehash
@@ -290,9 +300,9 @@ impl Service {
.copied()
.collect();
- (statediffnew, statediffremoved)
+ (Arc::new(statediffnew), Arc::new(statediffremoved))
} else {
- (new_state_ids_compressed, HashSet::new())
+ (new_state_ids_compressed, Arc::new(HashSet::new()))
};
if !already_existed {
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index 2356a00..56769d5 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -946,7 +946,7 @@ impl Service {
pdu: &PduEvent,
pdu_json: CanonicalJsonObject,
new_room_leaves: Vec<OwnedEventId>,
- state_ids_compressed: HashSet<CompressedStateEvent>,
+ state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
soft_fail: bool,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<Option<Vec<u8>>> {