diff options
author | Timo Kösters <timo@koesters.xyz> | 2022-10-08 13:02:52 +0200 |
---|---|---|
committer | Nyaaori <+@nyaaori.cat> | 2022-10-10 14:02:02 +0200 |
commit | d5b4754cf47982c91898bde9a9bb61a8cbf6ab40 (patch) | |
tree | f5285a3c888a4be1f669565915681d1ee5ce63e9 /src/database | |
parent | f47a5cd5d5ce20b5996d28e37415771ee6d8a34b (diff) | |
download | conduit-d5b4754cf47982c91898bde9a9bb61a8cbf6ab40.zip |
0 errors left!
Diffstat (limited to 'src/database')
-rw-r--r-- | src/database/key_value/media.rs | 4 | ||||
-rw-r--r-- | src/database/key_value/mod.rs | 2 | ||||
-rw-r--r-- | src/database/key_value/pusher.rs | 21 | ||||
-rw-r--r-- | src/database/key_value/rooms/alias.rs | 6 | ||||
-rw-r--r-- | src/database/key_value/rooms/directory.rs | 2 | ||||
-rw-r--r-- | src/database/key_value/rooms/edus/read_receipt.rs | 2 | ||||
-rw-r--r-- | src/database/key_value/rooms/metadata.rs | 14 | ||||
-rw-r--r-- | src/database/key_value/rooms/search.rs | 4 | ||||
-rw-r--r-- | src/database/key_value/rooms/timeline.rs | 10 | ||||
-rw-r--r-- | src/database/key_value/rooms/user.rs | 2 | ||||
-rw-r--r-- | src/database/key_value/sending.rs | 203 | ||||
-rw-r--r-- | src/database/key_value/users.rs | 55 | ||||
-rw-r--r-- | src/database/mod.rs | 36 |
13 files changed, 280 insertions, 81 deletions
diff --git a/src/database/key_value/media.rs b/src/database/key_value/media.rs index de96ace..6abe5ba 100644 --- a/src/database/key_value/media.rs +++ b/src/database/key_value/media.rs @@ -43,8 +43,8 @@ impl service::media::Data for KeyValueDatabase { ) -> Result<(Option<String>, Option<String>, Vec<u8>)> { let mut prefix = mxc.as_bytes().to_vec(); prefix.push(0xff); - prefix.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail - prefix.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail + prefix.extend_from_slice(&width.to_be_bytes()); + prefix.extend_from_slice(&height.to_be_bytes()); prefix.push(0xff); let (key, _) = self diff --git a/src/database/key_value/mod.rs b/src/database/key_value/mod.rs index efb8550..c4496af 100644 --- a/src/database/key_value/mod.rs +++ b/src/database/key_value/mod.rs @@ -7,7 +7,7 @@ mod media; //mod pdu; mod pusher; mod rooms; -//mod sending; +mod sending; mod transaction_ids; mod uiaa; mod users; diff --git a/src/database/key_value/pusher.rs b/src/database/key_value/pusher.rs index 15f4e26..1468a55 100644 --- a/src/database/key_value/pusher.rs +++ b/src/database/key_value/pusher.rs @@ -3,7 +3,7 @@ use ruma::{ UserId, }; -use crate::{database::KeyValueDatabase, service, Error, Result}; +use crate::{database::KeyValueDatabase, service, Error, Result, utils}; impl service::pusher::Data for KeyValueDatabase { fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> { @@ -28,9 +28,13 @@ impl service::pusher::Data for KeyValueDatabase { Ok(()) } - fn get_pusher(&self, senderkey: &[u8]) -> Result<Option<get_pushers::v3::Pusher>> { + fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result<Option<get_pushers::v3::Pusher>> { + let mut senderkey = sender.as_bytes().to_vec(); + senderkey.push(0xff); + senderkey.extend_from_slice(pushkey.as_bytes()); + self.senderkey_pusher - .get(senderkey)? + .get(&senderkey)? .map(|push| { serde_json::from_slice(&*push) .map_err(|_| Error::bad_database("Invalid Pusher in db.")) @@ -51,10 +55,17 @@ impl service::pusher::Data for KeyValueDatabase { .collect() } - fn get_pusher_senderkeys<'a>(&'a self, sender: &UserId) -> Box<dyn Iterator<Item = Vec<u8>>> { + fn get_pushkeys<'a>(&'a self, sender: &UserId) -> Box<dyn Iterator<Item = Result<String>> + 'a> { let mut prefix = sender.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.senderkey_pusher.scan_prefix(prefix).map(|(k, _)| k)) + Box::new(self.senderkey_pusher.scan_prefix(prefix).map(|(k, _)| { + let mut parts = k.splitn(2, |&b| b == 0xff); + let _senderkey = parts.next(); + let push_key = parts.next().ok_or_else(|| Error::bad_database("Invalid senderkey_pusher in db"))?; + let push_key_string = utils::string_from_bytes(push_key).map_err(|_| Error::bad_database("Invalid pusher bytes in senderkey_pusher"))?; + + Ok(push_key_string) + })) } } diff --git a/src/database/key_value/rooms/alias.rs b/src/database/key_value/rooms/alias.rs index 112d6eb..f3de89d 100644 --- a/src/database/key_value/rooms/alias.rs +++ b/src/database/key_value/rooms/alias.rs @@ -43,10 +43,10 @@ impl service::rooms::alias::Data for KeyValueDatabase { .transpose() } - fn local_aliases_for_room( - &self, + fn local_aliases_for_room<'a>( + &'a self, room_id: &RoomId, - ) -> Box<dyn Iterator<Item = Result<Box<RoomAliasId>>>> { + ) -> Box<dyn Iterator<Item = Result<Box<RoomAliasId>>> + 'a> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/key_value/rooms/directory.rs b/src/database/key_value/rooms/directory.rs index 661c202..212ced9 100644 --- a/src/database/key_value/rooms/directory.rs +++ b/src/database/key_value/rooms/directory.rs @@ -15,7 +15,7 @@ impl service::rooms::directory::Data for KeyValueDatabase { Ok(self.publicroomids.get(room_id.as_bytes())?.is_some()) } - fn public_rooms(&self) -> Box<dyn Iterator<Item = Result<Box<RoomId>>>> { + fn public_rooms<'a>(&'a self) -> Box<dyn Iterator<Item = Result<Box<RoomId>>> + 'a> { Box::new(self.publicroomids.iter().map(|(bytes, _)| { RoomId::parse( utils::string_from_bytes(&bytes).map_err(|_| { diff --git a/src/database/key_value/rooms/edus/read_receipt.rs b/src/database/key_value/rooms/edus/read_receipt.rs index c78f0f5..19c1ced 100644 --- a/src/database/key_value/rooms/edus/read_receipt.rs +++ b/src/database/key_value/rooms/edus/read_receipt.rs @@ -59,7 +59,7 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { u64, Raw<ruma::events::AnySyncEphemeralRoomEvent>, )>, - >, + > + 'a, > { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/key_value/rooms/metadata.rs b/src/database/key_value/rooms/metadata.rs index 63a6b1a..2ec18be 100644 --- a/src/database/key_value/rooms/metadata.rs +++ b/src/database/key_value/rooms/metadata.rs @@ -1,6 +1,6 @@ use ruma::RoomId; -use crate::{database::KeyValueDatabase, service, services, Result}; +use crate::{database::KeyValueDatabase, service, services, Result, utils, Error}; impl service::rooms::metadata::Data for KeyValueDatabase { fn exists(&self, room_id: &RoomId) -> Result<bool> { @@ -18,6 +18,18 @@ impl service::rooms::metadata::Data for KeyValueDatabase { .is_some()) } + fn iter_ids<'a>(&'a self) -> Box<dyn Iterator<Item = Result<Box<RoomId>>> + 'a> { + Box::new(self.roomid_shortroomid.iter().map(|(bytes, _)| { + RoomId::parse( + utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Room ID in publicroomids is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Room ID in roomid_shortroomid is invalid.")) + })) + + } + fn is_disabled(&self, room_id: &RoomId) -> Result<bool> { Ok(self.disabledroomids.get(room_id.as_bytes())?.is_some()) } diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index 79e6a32..8aa7a63 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -5,7 +5,7 @@ use ruma::RoomId; use crate::{database::KeyValueDatabase, service, services, utils, Result}; impl service::rooms::search::Data for KeyValueDatabase { - fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: String) -> Result<()> { + fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()> { let mut batch = message_body .split_terminator(|c: char| !c.is_alphanumeric()) .filter(|s| !s.is_empty()) @@ -26,7 +26,7 @@ impl service::rooms::search::Data for KeyValueDatabase { &'a self, room_id: &RoomId, search_string: &str, - ) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>>>, Vec<String>)>> { + ) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>>+ 'a>, Vec<String>)>> { let prefix = services() .rooms .short diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 5d684a1..1660a9e 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -235,7 +235,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { user_id: &UserId, room_id: &RoomId, since: u64, - ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { + ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> { let prefix = services() .rooms .short @@ -272,7 +272,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { user_id: &UserId, room_id: &RoomId, until: u64, - ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { + ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> { // Create the first part of the full pdu id let prefix = services() .rooms @@ -309,7 +309,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { user_id: &UserId, room_id: &RoomId, from: u64, - ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { + ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> { // Create the first part of the full pdu id let prefix = services() .rooms @@ -347,8 +347,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase { notifies: Vec<Box<UserId>>, highlights: Vec<Box<UserId>>, ) -> Result<()> { - let notifies_batch = Vec::new(); - let highlights_batch = Vec::new(); + let mut notifies_batch = Vec::new(); + let mut highlights_batch = Vec::new(); for user in notifies { let mut userroom_id = user.as_bytes().to_vec(); userroom_id.push(0xff); diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs index 78c78e1..9230e61 100644 --- a/src/database/key_value/rooms/user.rs +++ b/src/database/key_value/rooms/user.rs @@ -86,7 +86,7 @@ impl service::rooms::user::Data for KeyValueDatabase { fn get_shared_rooms<'a>( &'a self, users: Vec<Box<UserId>>, - ) -> Result<Box<dyn Iterator<Item = Result<Box<RoomId>>>>> { + ) -> Result<Box<dyn Iterator<Item = Result<Box<RoomId>>> + 'a>> { let iterators = users.into_iter().map(move |user_id| { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/key_value/sending.rs b/src/database/key_value/sending.rs new file mode 100644 index 0000000..d84bd49 --- /dev/null +++ b/src/database/key_value/sending.rs @@ -0,0 +1,203 @@ +use ruma::{ServerName, UserId}; + +use crate::{ + database::KeyValueDatabase, + service::{ + self, + sending::{OutgoingKind, SendingEventType}, + }, + utils, Error, Result, +}; + +impl service::sending::Data for KeyValueDatabase { + fn active_requests<'a>( + &'a self, + ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a> { + Box::new( + self.servercurrentevent_data + .iter() + .map(|(key, v)| parse_servercurrentevent(&key, v).map(|(k, e)| (key, k, e))), + ) + } + + fn active_requests_for<'a>( + &'a self, + outgoing_kind: &OutgoingKind, + ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> { + let prefix = outgoing_kind.get_prefix(); + Box::new( + self.servercurrentevent_data + .scan_prefix(prefix) + .map(|(key, v)| parse_servercurrentevent(&key, v).map(|(_, e)| (key, e))), + ) + } + + fn delete_active_request(&self, key: Vec<u8>) -> Result<()> { + self.servercurrentevent_data.remove(&key) + } + + fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { + let prefix = outgoing_kind.get_prefix(); + for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) { + self.servercurrentevent_data.remove(&key)?; + } + + Ok(()) + } + + fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { + let prefix = outgoing_kind.get_prefix(); + for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) { + self.servercurrentevent_data.remove(&key).unwrap(); + } + + for (key, _) in self.servernameevent_data.scan_prefix(prefix.clone()) { + self.servernameevent_data.remove(&key).unwrap(); + } + + Ok(()) + } + + fn queue_requests( + &self, + requests: &[(&OutgoingKind, SendingEventType)], + ) -> Result<Vec<Vec<u8>>> { + let mut batch = Vec::new(); + let mut keys = Vec::new(); + for (outgoing_kind, event) in requests { + let mut key = outgoing_kind.get_prefix(); + key.push(0xff); + key.extend_from_slice(if let SendingEventType::Pdu(value) = &event { + &**value + } else { + &[] + }); + let value = if let SendingEventType::Edu(value) = &event { + &**value + } else { + &[] + }; + batch.push((key.clone(), value.to_owned())); + keys.push(key); + } + self.servernameevent_data + .insert_batch(&mut batch.into_iter())?; + Ok(keys) + } + + fn queued_requests<'a>( + &'a self, + outgoing_kind: &OutgoingKind, + ) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> { + let prefix = outgoing_kind.get_prefix(); + return Box::new( + self.servernameevent_data + .scan_prefix(prefix.clone()) + .map(|(k, v)| parse_servercurrentevent(&k, v).map(|(_, ev)| (ev, k))), + ); + } + + fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()> { + for (e, key) in events { + let value = if let SendingEventType::Edu(value) = &e { + &**value + } else { + &[] + }; + self.servercurrentevent_data.insert(key, value)?; + self.servernameevent_data.remove(key)?; + } + + Ok(()) + } + + fn set_latest_educount(&self, server_name: &ServerName, last_count: u64) -> Result<()> { + self.servername_educount + .insert(server_name.as_bytes(), &last_count.to_be_bytes()) + } + + fn get_latest_educount(&self, server_name: &ServerName) -> Result<u64> { + self.servername_educount + .get(server_name.as_bytes())? + .map_or(Ok(0), |bytes| { + utils::u64_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Invalid u64 in servername_educount.")) + }) + } +} + +#[tracing::instrument(skip(key))] +fn parse_servercurrentevent( + key: &[u8], + value: Vec<u8>, +) -> Result<(OutgoingKind, SendingEventType)> { + // Appservices start with a plus + Ok::<_, Error>(if key.starts_with(b"+") { + let mut parts = key[1..].splitn(2, |&b| b == 0xff); + + let server = parts.next().expect("splitn always returns one element"); + let event = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; + let server = utils::string_from_bytes(server).map_err(|_| { + Error::bad_database("Invalid server bytes in server_currenttransaction") + })?; + + ( + OutgoingKind::Appservice(server), + if value.is_empty() { + SendingEventType::Pdu(event.to_vec()) + } else { + SendingEventType::Edu(value) + }, + ) + } else if key.starts_with(b"$") { + let mut parts = key[1..].splitn(3, |&b| b == 0xff); + + let user = parts.next().expect("splitn always returns one element"); + let user_string = utils::string_from_bytes(&user) + .map_err(|_| Error::bad_database("Invalid user string in servercurrentevent"))?; + let user_id = UserId::parse(user_string) + .map_err(|_| Error::bad_database("Invalid user id in servercurrentevent"))?; + + let pushkey = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; + let pushkey_string = utils::string_from_bytes(pushkey) + .map_err(|_| Error::bad_database("Invalid pushkey in servercurrentevent"))?; + + let event = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; + ( + OutgoingKind::Push(user_id, pushkey_string), + if value.is_empty() { + SendingEventType::Pdu(event.to_vec()) + } else { + // I'm pretty sure this should never be called + SendingEventType::Edu(value) + }, + ) + } else { + let mut parts = key.splitn(2, |&b| b == 0xff); + + let server = parts.next().expect("splitn always returns one element"); + let event = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; + let server = utils::string_from_bytes(server).map_err(|_| { + Error::bad_database("Invalid server bytes in server_currenttransaction") + })?; + + ( + OutgoingKind::Normal(ServerName::parse(server).map_err(|_| { + Error::bad_database("Invalid server string in server_currenttransaction") + })?), + if value.is_empty() { + SendingEventType::Pdu(event.to_vec()) + } else { + SendingEventType::Edu(value) + }, + ) + }) +} diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs index 791e249..86689f8 100644 --- a/src/database/key_value/users.rs +++ b/src/database/key_value/users.rs @@ -67,7 +67,7 @@ impl service::users::Data for KeyValueDatabase { } /// Returns an iterator over all users on this homeserver. - fn iter(&self) -> Box<dyn Iterator<Item = Result<Box<UserId>>>> { + fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Result<Box<UserId>>> + 'a> { Box::new(self.userid_password.iter().map(|(bytes, _)| { UserId::parse(utils::string_from_bytes(&bytes).map_err(|_| { Error::bad_database("User ID in userid_password is invalid unicode.") @@ -83,33 +83,11 @@ impl service::users::Data for KeyValueDatabase { let users: Vec<String> = self .userid_password .iter() - .filter_map(|(username, pw)| self.get_username_with_valid_password(&username, &pw)) + .filter_map(|(username, pw)| get_username_with_valid_password(&username, &pw)) .collect(); Ok(users) } - /// Will only return with Some(username) if the password was not empty and the - /// username could be successfully parsed. - /// If utils::string_from_bytes(...) returns an error that username will be skipped - /// and the error will be logged. - fn get_username_with_valid_password(&self, username: &[u8], password: &[u8]) -> Option<String> { - // A valid password is not empty - if password.is_empty() { - None - } else { - match utils::string_from_bytes(username) { - Ok(u) => Some(u), - Err(e) => { - warn!( - "Failed to parse username while calling get_local_users(): {}", - e.to_string() - ); - None - } - } - } - } - /// Returns the password hash for the given user. fn password_hash(&self, user_id: &UserId) -> Result<Option<String>> { self.userid_password @@ -281,7 +259,7 @@ impl service::users::Data for KeyValueDatabase { fn all_device_ids<'a>( &'a self, user_id: &UserId, - ) -> Box<dyn Iterator<Item = Result<Box<DeviceId>>>> { + ) -> Box<dyn Iterator<Item = Result<Box<DeviceId>>> + 'a> { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); // All devices have metadata @@ -626,7 +604,7 @@ impl service::users::Data for KeyValueDatabase { user_or_room_id: &str, from: u64, to: Option<u64>, - ) -> Box<dyn Iterator<Item = Result<Box<UserId>>>> { + ) -> Box<dyn Iterator<Item = Result<Box<UserId>>> + 'a> { let mut prefix = user_or_room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -906,7 +884,7 @@ impl service::users::Data for KeyValueDatabase { fn all_devices_metadata<'a>( &'a self, user_id: &UserId, - ) -> Box<dyn Iterator<Item = Result<Device>>> { + ) -> Box<dyn Iterator<Item = Result<Device>> + 'a> { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); @@ -956,3 +934,26 @@ impl service::users::Data for KeyValueDatabase { } } } + +/// Will only return with Some(username) if the password was not empty and the +/// username could be successfully parsed. +/// If utils::string_from_bytes(...) returns an error that username will be skipped +/// and the error will be logged. +fn get_username_with_valid_password(username: &[u8], password: &[u8]) -> Option<String> { + // A valid password is not empty + if password.is_empty() { + None + } else { + match utils::string_from_bytes(username) { + Ok(u) => Some(u), + Err(e) => { + warn!( + "Failed to parse username while calling get_local_users(): {}", + e.to_string() + ); + None + } + } + } +} + diff --git a/src/database/mod.rs b/src/database/mod.rs index c4e64af..191cd62 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -166,19 +166,6 @@ pub struct KeyValueDatabase { pub(super) shortstatekey_cache: Mutex<LruCache<u64, (StateEventType, String)>>, pub(super) our_real_users_cache: RwLock<HashMap<Box<RoomId>, Arc<HashSet<Box<UserId>>>>>, pub(super) appservice_in_room_cache: RwLock<HashMap<Box<RoomId>, HashMap<String, bool>>>, - pub(super) lazy_load_waiting: - Mutex<HashMap<(Box<UserId>, Box<DeviceId>, Box<RoomId>, u64), HashSet<Box<UserId>>>>, - pub(super) stateinfo_cache: Mutex< - LruCache< - u64, - Vec<( - u64, // sstatehash - HashSet<CompressedStateEvent>, // full state - HashSet<CompressedStateEvent>, // added - HashSet<CompressedStateEvent>, // removed - )>, - >, - >, pub(super) lasttimelinecount_cache: Mutex<HashMap<Box<RoomId>, u64>>, } @@ -279,10 +266,7 @@ impl KeyValueDatabase { eprintln!("ERROR: Max request size is less than 1KB. Please increase it."); } - let (admin_sender, admin_receiver) = mpsc::unbounded_channel(); - let (sending_sender, sending_receiver) = mpsc::unbounded_channel(); - - let db = Arc::new(Self { + let db_raw = Box::new(Self { _db: builder.clone(), userid_password: builder.open_tree("userid_password")?, userid_displayname: builder.open_tree("userid_displayname")?, @@ -399,14 +383,12 @@ impl KeyValueDatabase { )), our_real_users_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()), - lazy_load_waiting: Mutex::new(HashMap::new()), - stateinfo_cache: Mutex::new(LruCache::new( - (100.0 * config.conduit_cache_capacity_modifier) as usize, - )), lasttimelinecount_cache: Mutex::new(HashMap::new()), }); - let services_raw = Box::new(Services::build(Arc::clone(&db), config)?); + let db = Box::leak(db_raw); + + let services_raw = Box::new(Services::build(db, config)?); // This is the first and only time we initialize the SERVICE static *SERVICES.write().unwrap() = Some(Box::leak(services_raw)); @@ -851,8 +833,6 @@ impl KeyValueDatabase { // This data is probably outdated db.presenceid_presence.clear()?; - services().admin.start_handler(admin_receiver); - // Set emergency access for the conduit user match set_emergency_access() { Ok(pwd_set) => { @@ -869,19 +849,11 @@ impl KeyValueDatabase { } }; - services().sending.start_handler(sending_receiver); - Self::start_cleanup_task().await; Ok(()) } - #[cfg(feature = "conduit_bin")] - pub async fn on_shutdown() { - info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers..."); - services().globals.rotate.fire(); - } - #[tracing::instrument(skip(self))] pub fn flush(&self) -> Result<()> { let start = std::time::Instant::now(); |