author    Timo Kösters <timo@koesters.xyz>  2023-07-03 17:43:27 +0000
committer Timo Kösters <timo@koesters.xyz>  2023-07-03 17:43:27 +0000
commit    f8a36e7554672b1541e7e6a0d6f62c02a6c8a30e (patch)
tree      0791b7eb0dc722852e6c0aa4dfddd80cb4ddd56d /src
parent    833c1505f1067c1b467fb59e0fd6ab8bdeee674e (diff)
parent    a2c3256cedeeb696b303b6ca9533acd73d110e1e (diff)
Merge branch 'memory' into 'next'
improvement: better memory usage and admin commands to analyze it

See merge request famedly/conduit!497
Diffstat (limited to 'src')
-rw-r--r--  src/api/client_server/sync.rs        600
-rw-r--r--  src/config/mod.rs                      2
-rw-r--r--  src/database/abstraction.rs            1
-rw-r--r--  src/database/abstraction/rocksdb.rs    6
-rw-r--r--  src/database/key_value/globals.rs     55
-rw-r--r--  src/service/admin/mod.rs              32
-rw-r--r--  src/service/globals/data.rs            3
-rw-r--r--  src/service/globals/mod.rs             4
-rw-r--r--  src/service/mod.rs                   107
9 files changed, 498 insertions, 312 deletions
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index b4baec1..dd75347 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -653,213 +653,133 @@ async fn load_joined_room(
.user
.get_token_shortstatehash(&room_id, since)?;
- // Calculates joined_member_count, invited_member_count and heroes
- let calculate_counts = || {
- let joined_member_count = services()
- .rooms
- .state_cache
- .room_joined_count(&room_id)?
- .unwrap_or(0);
- let invited_member_count = services()
- .rooms
- .state_cache
- .room_invited_count(&room_id)?
- .unwrap_or(0);
-
- // Recalculate heroes (first 5 members)
- let mut heroes = Vec::new();
-
- if joined_member_count + invited_member_count <= 5 {
- // Go through all PDUs and for each member event, check if the user is still joined or
- // invited until we have 5 or we reach the end
-
- for hero in services()
- .rooms
- .timeline
- .all_pdus(&sender_user, &room_id)?
- .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
- .filter(|(_, pdu)| pdu.kind == TimelineEventType::RoomMember)
- .map(|(_, pdu)| {
- let content: RoomMemberEventContent = serde_json::from_str(pdu.content.get())
- .map_err(|_| {
- Error::bad_database("Invalid member event in database.")
- })?;
-
- if let Some(state_key) = &pdu.state_key {
- let user_id = UserId::parse(state_key.clone())
- .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
-
- // The membership was and still is invite or join
- if matches!(
- content.membership,
- MembershipState::Join | MembershipState::Invite
- ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)?
- || services()
- .rooms
- .state_cache
- .is_invited(&user_id, &room_id)?)
- {
- Ok::<_, Error>(Some(state_key.clone()))
- } else {
- Ok(None)
- }
- } else {
- Ok(None)
- }
- })
- // Filter out buggy users
- .filter_map(|u| u.ok())
- // Filter for possible heroes
- .flatten()
- {
- if heroes.contains(&hero) || hero == sender_user.as_str() {
- continue;
- }
-
- heroes.push(hero);
- }
- }
-
- Ok::<_, Error>((
- Some(joined_member_count),
- Some(invited_member_count),
- heroes,
- ))
- };
-
- let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
- .and_then(|shortstatehash| {
- services()
- .rooms
- .state_accessor
- .state_get(
- shortstatehash,
- &StateEventType::RoomMember,
- sender_user.as_str(),
- )
- .transpose()
- })
- .transpose()?
- .and_then(|pdu| {
- serde_json::from_str(pdu.content.get())
- .map_err(|_| Error::bad_database("Invalid PDU in database."))
- .ok()
- });
-
- let joined_since_last_sync =
- since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
-
let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) =
- if since_shortstatehash.is_none() || joined_since_last_sync {
- // Probably since = 0, we will do an initial sync
-
- let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
-
- let current_state_ids = services()
- .rooms
- .state_accessor
- .state_full_ids(current_shortstatehash)
- .await?;
+ if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
+ // No state changes
+ (Vec::new(), None, None, false, Vec::new())
+ } else {
+ // Calculates joined_member_count, invited_member_count and heroes
+ let calculate_counts = || {
+ let joined_member_count = services()
+ .rooms
+ .state_cache
+ .room_joined_count(&room_id)?
+ .unwrap_or(0);
+ let invited_member_count = services()
+ .rooms
+ .state_cache
+ .room_invited_count(&room_id)?
+ .unwrap_or(0);
- let mut state_events = Vec::new();
- let mut lazy_loaded = HashSet::new();
+ // Recalculate heroes (first 5 members)
+ let mut heroes = Vec::new();
- let mut i = 0;
- for (shortstatekey, id) in current_state_ids {
- let (event_type, state_key) = services()
- .rooms
- .short
- .get_statekey_from_short(shortstatekey)?;
+ if joined_member_count + invited_member_count <= 5 {
+ // Go through all PDUs and for each member event, check if the user is still joined or
+ // invited until we have 5 or we reach the end
- if event_type != StateEventType::RoomMember {
- let pdu = match services().rooms.timeline.get_pdu(&id)? {
- Some(pdu) => pdu,
- None => {
- error!("Pdu in state not found: {}", id);
+ for hero in services()
+ .rooms
+ .timeline
+ .all_pdus(&sender_user, &room_id)?
+ .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
+ .filter(|(_, pdu)| pdu.kind == TimelineEventType::RoomMember)
+ .map(|(_, pdu)| {
+ let content: RoomMemberEventContent =
+ serde_json::from_str(pdu.content.get()).map_err(|_| {
+ Error::bad_database("Invalid member event in database.")
+ })?;
+
+ if let Some(state_key) = &pdu.state_key {
+ let user_id = UserId::parse(state_key.clone()).map_err(|_| {
+ Error::bad_database("Invalid UserId in member PDU.")
+ })?;
+
+ // The membership was and still is invite or join
+ if matches!(
+ content.membership,
+ MembershipState::Join | MembershipState::Invite
+ ) && (services()
+ .rooms
+ .state_cache
+ .is_joined(&user_id, &room_id)?
+ || services()
+ .rooms
+ .state_cache
+ .is_invited(&user_id, &room_id)?)
+ {
+ Ok::<_, Error>(Some(state_key.clone()))
+ } else {
+ Ok(None)
+ }
+ } else {
+ Ok(None)
+ }
+ })
+ // Filter out buggy users
+ .filter_map(|u| u.ok())
+ // Filter for possible heroes
+ .flatten()
+ {
+ if heroes.contains(&hero) || hero == sender_user.as_str() {
continue;
}
- };
- state_events.push(pdu);
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
+ heroes.push(hero);
}
- } else if !lazy_load_enabled
- || full_state
- || timeline_users.contains(&state_key)
- // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
- || *sender_user == state_key
- {
- let pdu = match services().rooms.timeline.get_pdu(&id)? {
- Some(pdu) => pdu,
- None => {
- error!("Pdu in state not found: {}", id);
- continue;
- }
- };
+ }
- // This check is in case a bad user ID made it into the database
- if let Ok(uid) = UserId::parse(&state_key) {
- lazy_loaded.insert(uid);
- }
- state_events.push(pdu);
+ Ok::<_, Error>((
+ Some(joined_member_count),
+ Some(invited_member_count),
+ heroes,
+ ))
+ };
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
- }
- }
- }
+ let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
+ .and_then(|shortstatehash| {
+ services()
+ .rooms
+ .state_accessor
+ .state_get(
+ shortstatehash,
+ &StateEventType::RoomMember,
+ sender_user.as_str(),
+ )
+ .transpose()
+ })
+ .transpose()?
+ .and_then(|pdu| {
+ serde_json::from_str(pdu.content.get())
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))
+ .ok()
+ });
- // Reset lazy loading because this is an initial sync
- services().rooms.lazy_loading.lazy_load_reset(
- &sender_user,
- &sender_device,
- &room_id,
- )?;
-
- // The state_events above should contain all timeline_users, let's mark them as lazy
- // loaded.
- services().rooms.lazy_loading.lazy_load_mark_sent(
- &sender_user,
- &sender_device,
- &room_id,
- lazy_loaded,
- next_batchcount,
- );
+ let joined_since_last_sync = since_sender_member
+ .map_or(true, |member| member.membership != MembershipState::Join);
- (
- heroes,
- joined_member_count,
- invited_member_count,
- true,
- state_events,
- )
- } else if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
- // No state changes
- (Vec::new(), None, None, false, Vec::new())
- } else {
- // Incremental /sync
- let since_shortstatehash = since_shortstatehash.unwrap();
+ if since_shortstatehash.is_none() || joined_since_last_sync {
+ // Probably since = 0, we will do an initial sync
- let mut state_events = Vec::new();
- let mut lazy_loaded = HashSet::new();
+ let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
- if since_shortstatehash != current_shortstatehash {
let current_state_ids = services()
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
- let since_state_ids = services()
- .rooms
- .state_accessor
- .state_full_ids(since_shortstatehash)
- .await?;
- for (key, id) in current_state_ids {
- if full_state || since_state_ids.get(&key) != Some(&id) {
+ let mut state_events = Vec::new();
+ let mut lazy_loaded = HashSet::new();
+
+ let mut i = 0;
+ for (shortstatekey, id) in current_state_ids {
+ let (event_type, state_key) = services()
+ .rooms
+ .short
+ .get_statekey_from_short(shortstatekey)?;
+
+ if event_type != StateEventType::RoomMember {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
@@ -867,146 +787,234 @@ async fn load_joined_room(
continue;
}
};
+ state_events.push(pdu);
- if pdu.kind == TimelineEventType::RoomMember {
- match UserId::parse(
- pdu.state_key
- .as_ref()
- .expect("State event has state key")
- .clone(),
- ) {
- Ok(state_key_userid) => {
- lazy_loaded.insert(state_key_userid);
- }
- Err(e) => error!("Invalid state key for member event: {}", e),
- }
+ i += 1;
+ if i % 100 == 0 {
+ tokio::task::yield_now().await;
}
+ } else if !lazy_load_enabled
+ || full_state
+ || timeline_users.contains(&state_key)
+ // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
+ || *sender_user == state_key
+ {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+ // This check is in case a bad user ID made it into the database
+ if let Ok(uid) = UserId::parse(&state_key) {
+ lazy_loaded.insert(uid);
+ }
state_events.push(pdu);
- tokio::task::yield_now().await;
+
+ i += 1;
+ if i % 100 == 0 {
+ tokio::task::yield_now().await;
+ }
}
}
- }
- for (_, event) in &timeline_pdus {
- if lazy_loaded.contains(&event.sender) {
- continue;
- }
+ // Reset lazy loading because this is an initial sync
+ services().rooms.lazy_loading.lazy_load_reset(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ )?;
- if !services().rooms.lazy_loading.lazy_load_was_sent_before(
+ // The state_events above should contain all timeline_users, let's mark them as lazy
+ // loaded.
+ services().rooms.lazy_loading.lazy_load_mark_sent(
&sender_user,
&sender_device,
&room_id,
- &event.sender,
- )? || lazy_load_send_redundant
- {
- if let Some(member_event) = services().rooms.state_accessor.room_state_get(
- &room_id,
- &StateEventType::RoomMember,
- event.sender.as_str(),
- )? {
- lazy_loaded.insert(event.sender.clone());
- state_events.push(member_event);
+ lazy_loaded,
+ next_batchcount,
+ );
+
+ (
+ heroes,
+ joined_member_count,
+ invited_member_count,
+ true,
+ state_events,
+ )
+ } else {
+ // Incremental /sync
+ let since_shortstatehash = since_shortstatehash.unwrap();
+
+ let mut state_events = Vec::new();
+ let mut lazy_loaded = HashSet::new();
+
+ if since_shortstatehash != current_shortstatehash {
+ let current_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(current_shortstatehash)
+ .await?;
+ let since_state_ids = services()
+ .rooms
+ .state_accessor
+ .state_full_ids(since_shortstatehash)
+ .await?;
+
+ for (key, id) in current_state_ids {
+ if full_state || since_state_ids.get(&key) != Some(&id) {
+ let pdu = match services().rooms.timeline.get_pdu(&id)? {
+ Some(pdu) => pdu,
+ None => {
+ error!("Pdu in state not found: {}", id);
+ continue;
+ }
+ };
+
+ if pdu.kind == TimelineEventType::RoomMember {
+ match UserId::parse(
+ pdu.state_key
+ .as_ref()
+ .expect("State event has state key")
+ .clone(),
+ ) {
+ Ok(state_key_userid) => {
+ lazy_loaded.insert(state_key_userid);
+ }
+ Err(e) => error!("Invalid state key for member event: {}", e),
+ }
+ }
+
+ state_events.push(pdu);
+ tokio::task::yield_now().await;
+ }
}
}
- }
- services().rooms.lazy_loading.lazy_load_mark_sent(
- &sender_user,
- &sender_device,
- &room_id,
- lazy_loaded,
- next_batchcount,
- );
+ for (_, event) in &timeline_pdus {
+ if lazy_loaded.contains(&event.sender) {
+ continue;
+ }
- let encrypted_room = services()
- .rooms
- .state_accessor
- .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
- .is_some();
+ if !services().rooms.lazy_loading.lazy_load_was_sent_before(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ &event.sender,
+ )? || lazy_load_send_redundant
+ {
+ if let Some(member_event) = services().rooms.state_accessor.room_state_get(
+ &room_id,
+ &StateEventType::RoomMember,
+ event.sender.as_str(),
+ )? {
+ lazy_loaded.insert(event.sender.clone());
+ state_events.push(member_event);
+ }
+ }
+ }
- let since_encryption = services().rooms.state_accessor.state_get(
- since_shortstatehash,
- &StateEventType::RoomEncryption,
- "",
- )?;
+ services().rooms.lazy_loading.lazy_load_mark_sent(
+ &sender_user,
+ &sender_device,
+ &room_id,
+ lazy_loaded,
+ next_batchcount,
+ );
- // Calculations:
- let new_encrypted_room = encrypted_room && since_encryption.is_none();
+ let encrypted_room = services()
+ .rooms
+ .state_accessor
+ .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
+ .is_some();
- let send_member_count = state_events
- .iter()
- .any(|event| event.kind == TimelineEventType::RoomMember);
+ let since_encryption = services().rooms.state_accessor.state_get(
+ since_shortstatehash,
+ &StateEventType::RoomEncryption,
+ "",
+ )?;
- if encrypted_room {
- for state_event in &state_events {
- if state_event.kind != TimelineEventType::RoomMember {
- continue;
- }
+ // Calculations:
+ let new_encrypted_room = encrypted_room && since_encryption.is_none();
- if let Some(state_key) = &state_event.state_key {
- let user_id = UserId::parse(state_key.clone())
- .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
+ let send_member_count = state_events
+ .iter()
+ .any(|event| event.kind == TimelineEventType::RoomMember);
- if user_id == sender_user {
+ if encrypted_room {
+ for state_event in &state_events {
+ if state_event.kind != TimelineEventType::RoomMember {
continue;
}
- let new_membership = serde_json::from_str::<RoomMemberEventContent>(
- state_event.content.get(),
- )
- .map_err(|_| Error::bad_database("Invalid PDU in database."))?
- .membership;
-
- match new_membership {
- MembershipState::Join => {
- // A new user joined an encrypted room
- if !share_encrypted_room(&sender_user, &user_id, &room_id)? {
- device_list_updates.insert(user_id);
- }
+ if let Some(state_key) = &state_event.state_key {
+ let user_id = UserId::parse(state_key.clone()).map_err(|_| {
+ Error::bad_database("Invalid UserId in member PDU.")
+ })?;
+
+ if user_id == sender_user {
+ continue;
}
- MembershipState::Leave => {
- // Write down users that have left encrypted rooms we are in
- left_encrypted_users.insert(user_id);
+
+ let new_membership = serde_json::from_str::<RoomMemberEventContent>(
+ state_event.content.get(),
+ )
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))?
+ .membership;
+
+ match new_membership {
+ MembershipState::Join => {
+ // A new user joined an encrypted room
+ if !share_encrypted_room(&sender_user, &user_id, &room_id)? {
+ device_list_updates.insert(user_id);
+ }
+ }
+ MembershipState::Leave => {
+ // Write down users that have left encrypted rooms we are in
+ left_encrypted_users.insert(user_id);
+ }
+ _ => {}
}
- _ => {}
}
}
}
- }
-
- if joined_since_last_sync && encrypted_room || new_encrypted_room {
- // If the user is in a new encrypted room, give them all joined users
- device_list_updates.extend(
- services()
- .rooms
- .state_cache
- .room_members(&room_id)
- .flatten()
- .filter(|user_id| {
- // Don't send key updates from the sender to the sender
- &sender_user != user_id
- })
- .filter(|user_id| {
- // Only send keys if the sender doesn't share an encrypted room with the target already
- !share_encrypted_room(&sender_user, user_id, &room_id).unwrap_or(false)
- }),
- );
- }
- let (joined_member_count, invited_member_count, heroes) = if send_member_count {
- calculate_counts()?
- } else {
- (None, None, Vec::new())
- };
+ if joined_since_last_sync && encrypted_room || new_encrypted_room {
+ // If the user is in a new encrypted room, give them all joined users
+ device_list_updates.extend(
+ services()
+ .rooms
+ .state_cache
+ .room_members(&room_id)
+ .flatten()
+ .filter(|user_id| {
+ // Don't send key updates from the sender to the sender
+ &sender_user != user_id
+ })
+ .filter(|user_id| {
+ // Only send keys if the sender doesn't share an encrypted room with the target already
+ !share_encrypted_room(&sender_user, user_id, &room_id)
+ .unwrap_or(false)
+ }),
+ );
+ }
- (
- heroes,
- joined_member_count,
- invited_member_count,
- joined_since_last_sync,
- state_events,
- )
+ let (joined_member_count, invited_member_count, heroes) = if send_member_count {
+ calculate_counts()?
+ } else {
+ (None, None, Vec::new())
+ };
+
+ (
+ heroes,
+ joined_member_count,
+ invited_member_count,
+ joined_since_last_sync,
+ state_events,
+ )
+ }
};
// Look for device list updates in this room
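The two hunks above reorder load_joined_room so that the cheap "nothing changed" test runs first, and the heavy work (member counts, heroes, full state loading) is only paid for inside the non-idle branch. A minimal sketch of the resulting control flow, where check_membership_since, initial_sync and incremental_sync are hypothetical placeholders for the inlined logic, not functions from this codebase:

    // Sketch only — branch order after this change:
    let room_data = if timeline_pdus.is_empty()
        && since_shortstatehash == Some(current_shortstatehash)
    {
        // Idle room: return empty data without computing counts or state.
        (Vec::new(), None, None, false, Vec::new())
    } else {
        // Membership lookups now happen only when something changed.
        let joined_since_last_sync = check_membership_since(since_shortstatehash)?;
        if since_shortstatehash.is_none() || joined_since_last_sync {
            initial_sync()?      // full state, member counts, heroes
        } else {
            incremental_sync()?  // state diff against `since_shortstatehash`
        }
    };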
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 31a586f..f922282 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -224,7 +224,7 @@ fn default_database_backend() -> String {
}
fn default_db_cache_capacity_mb() -> f64 {
- 1000.0
+ 300.0
}
fn default_conduit_cache_capacity_modifier() -> f64 {
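This lowers the default database cache budget from 1000 MB to 300 MB. Operators who prefer the old footprint can override the default in their config file; a sketch, assuming the key name follows the default_db_cache_capacity_mb() naming used in this module and the [global] section of Conduit's example config:

    # conduit.toml (illustrative override)
    [global]
    db_cache_capacity_mb = 1000.0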
diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs
index 93660f9..0a32105 100644
--- a/src/database/abstraction.rs
+++ b/src/database/abstraction.rs
@@ -38,6 +38,7 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
fn memory_usage(&self) -> Result<String> {
Ok("Current database engine does not support memory usage reporting.".to_owned())
}
+ fn clear_caches(&self) {}
}
pub trait KvTree: Send + Sync {
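Because clear_caches ships with an empty default body, existing KeyValueDatabaseEngine implementations keep compiling unchanged; only engines that actually hold in-process caches need to override it. A self-contained toy reproduction of the pattern (names invented for illustration):

    trait Engine {
        fn memory_usage(&self) -> String {
            "unsupported".to_owned()
        }
        fn clear_caches(&self) {} // no-op unless a backend overrides it
    }

    struct NoCacheBackend;
    impl Engine for NoCacheBackend {} // compiles with no extra code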
diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs
index 3e64e8b..f0b5f2a 100644
--- a/src/database/abstraction/rocksdb.rs
+++ b/src/database/abstraction/rocksdb.rs
@@ -45,6 +45,10 @@ fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::O
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.optimize_level_style_compaction(10 * 1024 * 1024);
+ // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
+ db_opts.set_max_background_jobs(6);
+ db_opts.set_bytes_per_sync(1048576);
+
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
db_opts.set_prefix_extractor(prefix_extractor);
@@ -121,6 +125,8 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
self.cache.get_pinned_usage() as f64 / 1024.0 / 1024.0,
))
}
+
+ fn clear_caches(&self) {}
}
impl RocksDbEngineTree<'_> {
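For reference, the two options added above do the following (same rust-rocksdb setters and values as in the diff; comments paraphrase the linked tuning guide):

    let mut db_opts = rocksdb::Options::default();
    // Share up to 6 background threads between flushes and compactions,
    // so writes are less likely to stall behind a single background job.
    db_opts.set_max_background_jobs(6);
    // Sync SST file writes to disk every 1 MiB rather than in one large
    // burst, smoothing out write-latency spikes.
    db_opts.set_bytes_per_sync(1_048_576);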
diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs
index 7b7675c..ab3dfe0 100644
--- a/src/database/key_value/globals.rs
+++ b/src/database/key_value/globals.rs
@@ -118,8 +118,59 @@ impl service::globals::Data for KeyValueDatabase {
self._db.cleanup()
}
- fn memory_usage(&self) -> Result<String> {
- self._db.memory_usage()
+ fn memory_usage(&self) -> String {
+ let pdu_cache = self.pdu_cache.lock().unwrap().len();
+ let shorteventid_cache = self.shorteventid_cache.lock().unwrap().len();
+ let auth_chain_cache = self.auth_chain_cache.lock().unwrap().len();
+ let eventidshort_cache = self.eventidshort_cache.lock().unwrap().len();
+ let statekeyshort_cache = self.statekeyshort_cache.lock().unwrap().len();
+ let our_real_users_cache = self.our_real_users_cache.read().unwrap().len();
+ let appservice_in_room_cache = self.appservice_in_room_cache.read().unwrap().len();
+ let lasttimelinecount_cache = self.lasttimelinecount_cache.lock().unwrap().len();
+
+ let mut response = format!(
+ "\
+pdu_cache: {pdu_cache}
+shorteventid_cache: {shorteventid_cache}
+auth_chain_cache: {auth_chain_cache}
+eventidshort_cache: {eventidshort_cache}
+statekeyshort_cache: {statekeyshort_cache}
+our_real_users_cache: {our_real_users_cache}
+appservice_in_room_cache: {appservice_in_room_cache}
+lasttimelinecount_cache: {lasttimelinecount_cache}\n"
+ );
+ if let Ok(db_stats) = self._db.memory_usage() {
+ response += &db_stats;
+ }
+
+ response
+ }
+
+ fn clear_caches(&self, amount: u32) {
+ if amount > 0 {
+ self.pdu_cache.lock().unwrap().clear();
+ }
+ if amount > 1 {
+ self.shorteventid_cache.lock().unwrap().clear();
+ }
+ if amount > 2 {
+ self.auth_chain_cache.lock().unwrap().clear();
+ }
+ if amount > 3 {
+ self.eventidshort_cache.lock().unwrap().clear();
+ }
+ if amount > 4 {
+ self.statekeyshort_cache.lock().unwrap().clear();
+ }
+ if amount > 5 {
+ self.our_real_users_cache.write().unwrap().clear();
+ }
+ if amount > 6 {
+ self.appservice_in_room_cache.write().unwrap().clear();
+ }
+ if amount > 7 {
+ self.lasttimelinecount_cache.lock().unwrap().clear();
+ }
}
fn load_keypair(&self) -> Result<Ed25519KeyPair> {
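The amount argument acts as a cut-off over an implicit cache index (pdu_cache = 0 up to lasttimelinecount_cache = 7): every cache whose index is below amount gets cleared. Illustrative call sites, assuming db implements the trait above:

    // Clears pdu_cache, shorteventid_cache and auth_chain_cache only:
    db.clear_caches(3);
    // Clears all eight database-level caches:
    db.clear_caches(8);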
diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs
index d37ec69..9250a3e 100644
--- a/src/service/admin/mod.rs
+++ b/src/service/admin/mod.rs
@@ -134,7 +134,13 @@ enum AdminCommand {
},
/// Print database memory usage statistics
- DatabaseMemoryUsage,
+ MemoryUsage,
+
+ /// Clears all of Conduit's database caches with index smaller than the amount
+ ClearDatabaseCaches { amount: u32 },
+
+ /// Clears all of Conduit's service caches with index smaller than the amount
+ ClearServiceCaches { amount: u32 },
/// Show configuration values
ShowConfig,
@@ -531,12 +537,24 @@ impl Service {
None => RoomMessageEventContent::text_plain("PDU not found."),
}
}
- AdminCommand::DatabaseMemoryUsage => match services().globals.db.memory_usage() {
- Ok(response) => RoomMessageEventContent::text_plain(response),
- Err(e) => RoomMessageEventContent::text_plain(format!(
- "Failed to get database memory usage: {e}"
- )),
- },
+ AdminCommand::MemoryUsage => {
+ let response1 = services().memory_usage();
+ let response2 = services().globals.db.memory_usage();
+
+ RoomMessageEventContent::text_plain(format!(
+ "Services:\n{response1}\n\nDatabase:\n{response2}"
+ ))
+ }
+ AdminCommand::ClearDatabaseCaches { amount } => {
+ services().globals.db.clear_caches(amount);
+
+ RoomMessageEventContent::text_plain("Done.")
+ }
+ AdminCommand::ClearServiceCaches { amount } => {
+ services().clear_caches(amount);
+
+ RoomMessageEventContent::text_plain("Done.")
+ }
AdminCommand::ShowConfig => {
// Construct and send the response
RoomMessageEventContent::text_plain(format!("{}", services().globals.config))
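Assuming clap's usual kebab-case derivation for the variant names and Conduit's admin-room invocation style, the renamed and new commands would be issued roughly like this (server name illustrative):

    @conduit:example.com: memory-usage
    @conduit:example.com: clear-database-caches 4
    @conduit:example.com: clear-service-caches 2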
diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs
index 04371a0..171b3fe 100644
--- a/src/service/globals/data.rs
+++ b/src/service/globals/data.rs
@@ -15,7 +15,8 @@ pub trait Data: Send + Sync {
fn current_count(&self) -> Result<u64>;
async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()>;
fn cleanup(&self) -> Result<()>;
- fn memory_usage(&self) -> Result<String>;
+ fn memory_usage(&self) -> String;
+ fn clear_caches(&self, amount: u32);
fn load_keypair(&self) -> Result<Ed25519KeyPair>;
fn remove_keypair(&self) -> Result<()>;
fn add_signing_key(
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index 9206d43..e4affde 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -214,10 +214,6 @@ impl Service {
self.db.cleanup()
}
- pub fn memory_usage(&self) -> Result<String> {
- self.db.memory_usage()
- }
-
pub fn server_name(&self) -> &ServerName {
self.config.server_name.as_ref()
}
diff --git a/src/service/mod.rs b/src/service/mod.rs
index dfdc5a6..56aed7f 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -90,7 +90,7 @@ impl Services {
state_compressor: rooms::state_compressor::Service {
db,
stateinfo_cache: Mutex::new(LruCache::new(
- (300.0 * config.conduit_cache_capacity_modifier) as usize,
+ (100.0 * config.conduit_cache_capacity_modifier) as usize,
)),
},
timeline: rooms::timeline::Service {
@@ -115,4 +115,109 @@ impl Services {
globals: globals::Service::load(db, config)?,
})
}
+ fn memory_usage(&self) -> String {
+ let lazy_load_waiting = self
+ .rooms
+ .lazy_loading
+ .lazy_load_waiting
+ .lock()
+ .unwrap()
+ .len();
+ let server_visibility_cache = self
+ .rooms
+ .state_accessor
+ .server_visibility_cache
+ .lock()
+ .unwrap()
+ .len();
+ let user_visibility_cache = self
+ .rooms
+ .state_accessor
+ .user_visibility_cache
+ .lock()
+ .unwrap()
+ .len();
+ let stateinfo_cache = self
+ .rooms
+ .state_compressor
+ .stateinfo_cache
+ .lock()
+ .unwrap()
+ .len();
+ let lasttimelinecount_cache = self
+ .rooms
+ .timeline
+ .lasttimelinecount_cache
+ .lock()
+ .unwrap()
+ .len();
+ let roomid_spacechunk_cache = self
+ .rooms
+ .spaces
+ .roomid_spacechunk_cache
+ .lock()
+ .unwrap()
+ .len();
+
+ format!(
+ "\
+lazy_load_waiting: {lazy_load_waiting}
+server_visibility_cache: {server_visibility_cache}
+user_visibility_cache: {user_visibility_cache}
+stateinfo_cache: {stateinfo_cache}
+lasttimelinecount_cache: {lasttimelinecount_cache}
+roomid_spacechunk_cache: {roomid_spacechunk_cache}\
+ "
+ )
+ }
+ fn clear_caches(&self, amount: u32) {
+ if amount > 0 {
+ self.rooms
+ .lazy_loading
+ .lazy_load_waiting
+ .lock()
+ .unwrap()
+ .clear();
+ }
+ if amount > 1 {
+ self.rooms
+ .state_accessor
+ .server_visibility_cache
+ .lock()
+ .unwrap()
+ .clear();
+ }
+ if amount > 2 {
+ self.rooms
+ .state_accessor
+ .user_visibility_cache
+ .lock()
+ .unwrap()
+ .clear();
+ }
+ if amount > 3 {
+ self.rooms
+ .state_compressor
+ .stateinfo_cache
+ .lock()
+ .unwrap()
+ .clear();
+ }
+ if amount > 4 {
+ self.rooms
+ .timeline
+ .lasttimelinecount_cache
+ .lock()
+ .unwrap()
+ .clear();
+ }
+ if amount > 5 {
+ self.rooms
+ .spaces
+ .roomid_spacechunk_cache
+ .lock()
+ .unwrap()
+ .clear();
+ }
+ }
}
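Services::clear_caches mirrors the tiering of the database-side clear_caches, just over the six service-level caches. Summarizing the thresholds in the code above, with an assumed call through the Services handle:

    // amount > 0 → lazy_load_waiting
    // amount > 1 → server_visibility_cache
    // amount > 2 → user_visibility_cache
    // amount > 3 → stateinfo_cache
    // amount > 4 → lasttimelinecount_cache
    // amount > 5 → roomid_spacechunk_cache
    services().clear_caches(6); // clears all six service caches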