diff options
author | Timo Kösters <timo@koesters.xyz> | 2022-10-05 20:34:31 +0200 |
---|---|---|
committer | Nyaaori <+@nyaaori.cat> | 2022-10-10 14:02:01 +0200 |
commit | a4637e2ba1093065a6fda3fa2ad2b2b9f30eea63 (patch) | |
tree | 2d31313957d699875fc61f570686318b523ae0f1 /src/database | |
parent | 33a2b2b7729bb40253fd174d99ad773869b5ecfe (diff) | |
download | conduit-a4637e2ba1093065a6fda3fa2ad2b2b9f30eea63.zip |
cargo fmt
Diffstat (limited to 'src/database')
30 files changed, 691 insertions, 562 deletions
diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index 1388dc3..0727728 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -1,4 +1,4 @@ -use super::{super::Config, watchers::Watchers, KvTree, KeyValueDatabaseEngine}; +use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree}; use crate::{utils, Result}; use std::{ future::Future, diff --git a/src/database/key_value/account_data.rs b/src/database/key_value/account_data.rs index 5674ac0..7d2a870 100644 --- a/src/database/key_value/account_data.rs +++ b/src/database/key_value/account_data.rs @@ -1,9 +1,15 @@ use std::collections::HashMap; -use ruma::{UserId, DeviceId, signatures::CanonicalJsonValue, api::client::{uiaa::UiaaInfo, error::ErrorKind}, events::{RoomAccountDataEventType, AnyEphemeralRoomEvent}, serde::Raw, RoomId}; -use serde::{Serialize, de::DeserializeOwned}; - -use crate::{Result, database::KeyValueDatabase, service, Error, utils, services}; +use ruma::{ + api::client::{error::ErrorKind, uiaa::UiaaInfo}, + events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, + serde::Raw, + signatures::CanonicalJsonValue, + DeviceId, RoomId, UserId, +}; +use serde::{de::DeserializeOwned, Serialize}; + +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::account_data::Data for KeyValueDatabase { /// Places one event in the account data of the user and removes the previous entry. diff --git a/src/database/key_value/appservice.rs b/src/database/key_value/appservice.rs index f427ba7..9a821a6 100644 --- a/src/database/key_value/appservice.rs +++ b/src/database/key_value/appservice.rs @@ -55,10 +55,13 @@ impl service::appservice::Data for KeyValueDatabase { } fn iter_ids<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<String>> + 'a>> { - Ok(Box::new(self.id_appserviceregistrations.iter().map(|(id, _)| { - utils::string_from_bytes(&id) - .map_err(|_| Error::bad_database("Invalid id bytes in id_appserviceregistrations.")) - }))) + Ok(Box::new(self.id_appserviceregistrations.iter().map( + |(id, _)| { + utils::string_from_bytes(&id).map_err(|_| { + Error::bad_database("Invalid id bytes in id_appserviceregistrations.") + }) + }, + ))) } fn all(&self) -> Result<Vec<(String, serde_yaml::Value)>> { diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs index 199cbf6..fafaf49 100644 --- a/src/database/key_value/globals.rs +++ b/src/database/key_value/globals.rs @@ -2,9 +2,13 @@ use std::collections::BTreeMap; use async_trait::async_trait; use futures_util::{stream::FuturesUnordered, StreamExt}; -use ruma::{signatures::Ed25519KeyPair, UserId, DeviceId, ServerName, api::federation::discovery::{ServerSigningKeys, VerifyKey}, ServerSigningKeyId, MilliSecondsSinceUnixEpoch}; +use ruma::{ + api::federation::discovery::{ServerSigningKeys, VerifyKey}, + signatures::Ed25519KeyPair, + DeviceId, MilliSecondsSinceUnixEpoch, ServerName, ServerSigningKeyId, UserId, +}; -use crate::{Result, service, database::KeyValueDatabase, Error, utils, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; pub const COUNTER: &[u8] = b"c"; @@ -35,28 +39,24 @@ impl service::globals::Data for KeyValueDatabase { // Return when *any* user changed his key // TODO: only send for user they share a room with - futures.push( - self.todeviceid_events - .watch_prefix(&userdeviceid_prefix), - ); + futures.push(self.todeviceid_events.watch_prefix(&userdeviceid_prefix)); futures.push(self.userroomid_joined.watch_prefix(&userid_prefix)); - futures.push( - self.userroomid_invitestate - .watch_prefix(&userid_prefix), - ); + futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix)); futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix)); futures.push( self.userroomid_notificationcount .watch_prefix(&userid_prefix), ); - futures.push( - self.userroomid_highlightcount - .watch_prefix(&userid_prefix), - ); + futures.push(self.userroomid_highlightcount.watch_prefix(&userid_prefix)); // Events for rooms we are in - for room_id in services().rooms.state_cache.rooms_joined(user_id).filter_map(|r| r.ok()) { + for room_id in services() + .rooms + .state_cache + .rooms_joined(user_id) + .filter_map(|r| r.ok()) + { let short_roomid = services() .rooms .short @@ -75,15 +75,9 @@ impl service::globals::Data for KeyValueDatabase { futures.push(self.pduid_pdu.watch_prefix(&short_roomid)); // EDUs - futures.push( - self.roomid_lasttypingupdate - .watch_prefix(&roomid_bytes), - ); + futures.push(self.roomid_lasttypingupdate.watch_prefix(&roomid_bytes)); - futures.push( - self.readreceiptid_readreceipt - .watch_prefix(&roomid_prefix), - ); + futures.push(self.readreceiptid_readreceipt.watch_prefix(&roomid_prefix)); // Key changes futures.push(self.keychangeid_userid.watch_prefix(&roomid_prefix)); @@ -110,10 +104,7 @@ impl service::globals::Data for KeyValueDatabase { futures.push(self.keychangeid_userid.watch_prefix(&userid_prefix)); // One time keys - futures.push( - self.userid_lastonetimekeyupdate - .watch_prefix(&userid_bytes), - ); + futures.push(self.userid_lastonetimekeyupdate.watch_prefix(&userid_bytes)); futures.push(Box::pin(services().globals.rotate.watch())); @@ -238,10 +229,7 @@ impl service::globals::Data for KeyValueDatabase { } fn bump_database_version(&self, new_version: u64) -> Result<()> { - self.global - .insert(b"version", &new_version.to_be_bytes())?; + self.global.insert(b"version", &new_version.to_be_bytes())?; Ok(()) } - - } diff --git a/src/database/key_value/key_backups.rs b/src/database/key_value/key_backups.rs index 8171451..0738f73 100644 --- a/src/database/key_value/key_backups.rs +++ b/src/database/key_value/key_backups.rs @@ -1,8 +1,15 @@ use std::collections::BTreeMap; -use ruma::{UserId, serde::Raw, api::client::{backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, error::ErrorKind}, RoomId}; +use ruma::{ + api::client::{ + backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, + error::ErrorKind, + }, + serde::Raw, + RoomId, UserId, +}; -use crate::{Result, service, database::KeyValueDatabase, services, Error, utils}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::key_backups::Data for KeyValueDatabase { fn create_backup( @@ -118,11 +125,7 @@ impl service::key_backups::Data for KeyValueDatabase { .transpose() } - fn get_backup( - &self, - user_id: &UserId, - version: &str, - ) -> Result<Option<Raw<BackupAlgorithm>>> { + fn get_backup(&self, user_id: &UserId, version: &str) -> Result<Option<Raw<BackupAlgorithm>>> { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(version.as_bytes()); @@ -322,12 +325,7 @@ impl service::key_backups::Data for KeyValueDatabase { Ok(()) } - fn delete_room_keys( - &self, - user_id: &UserId, - version: &str, - room_id: &RoomId, - ) -> Result<()> { + fn delete_room_keys(&self, user_id: &UserId, version: &str, room_id: &RoomId) -> Result<()> { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(version.as_bytes()); diff --git a/src/database/key_value/media.rs b/src/database/key_value/media.rs index f024487..de96ace 100644 --- a/src/database/key_value/media.rs +++ b/src/database/key_value/media.rs @@ -1,9 +1,16 @@ use ruma::api::client::error::ErrorKind; -use crate::{database::KeyValueDatabase, service, Error, utils, Result}; +use crate::{database::KeyValueDatabase, service, utils, Error, Result}; impl service::media::Data for KeyValueDatabase { - fn create_file_metadata(&self, mxc: String, width: u32, height: u32, content_disposition: Option<&str>, content_type: Option<&str>) -> Result<Vec<u8>> { + fn create_file_metadata( + &self, + mxc: String, + width: u32, + height: u32, + content_disposition: Option<&str>, + content_type: Option<&str>, + ) -> Result<Vec<u8>> { let mut key = mxc.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(&width.to_be_bytes()); @@ -28,14 +35,23 @@ impl service::media::Data for KeyValueDatabase { Ok(key) } - fn search_file_metadata(&self, mxc: String, width: u32, height: u32) -> Result<(Option<String>, Option<String>, Vec<u8>)> { + fn search_file_metadata( + &self, + mxc: String, + width: u32, + height: u32, + ) -> Result<(Option<String>, Option<String>, Vec<u8>)> { let mut prefix = mxc.as_bytes().to_vec(); prefix.push(0xff); prefix.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail prefix.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail prefix.push(0xff); - let (key, _) = self.mediaid_file.scan_prefix(prefix).next().ok_or(Error::BadRequest(ErrorKind::NotFound, "Media not found"))?; + let (key, _) = self + .mediaid_file + .scan_prefix(prefix) + .next() + .ok_or(Error::BadRequest(ErrorKind::NotFound, "Media not found"))?; let mut parts = key.rsplit(|&b| b == 0xff); @@ -57,9 +73,7 @@ impl service::media::Data for KeyValueDatabase { } else { Some( utils::string_from_bytes(content_disposition_bytes).map_err(|_| { - Error::bad_database( - "Content Disposition in mediaid_file is invalid unicode.", - ) + Error::bad_database("Content Disposition in mediaid_file is invalid unicode.") })?, ) }; diff --git a/src/database/key_value/pusher.rs b/src/database/key_value/pusher.rs index b05e47b..15f4e26 100644 --- a/src/database/key_value/pusher.rs +++ b/src/database/key_value/pusher.rs @@ -1,6 +1,9 @@ -use ruma::{UserId, api::client::push::{set_pusher, get_pushers}}; +use ruma::{ + api::client::push::{get_pushers, set_pusher}, + UserId, +}; -use crate::{service, database::KeyValueDatabase, Error, Result}; +use crate::{database::KeyValueDatabase, service, Error, Result}; impl service::pusher::Data for KeyValueDatabase { fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> { @@ -48,10 +51,7 @@ impl service::pusher::Data for KeyValueDatabase { .collect() } - fn get_pusher_senderkeys<'a>( - &'a self, - sender: &UserId, - ) -> Box<dyn Iterator<Item = Vec<u8>>> { + fn get_pusher_senderkeys<'a>(&'a self, sender: &UserId) -> Box<dyn Iterator<Item = Vec<u8>>> { let mut prefix = sender.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/key_value/rooms/alias.rs b/src/database/key_value/rooms/alias.rs index 0aa8dd4..112d6eb 100644 --- a/src/database/key_value/rooms/alias.rs +++ b/src/database/key_value/rooms/alias.rs @@ -1,13 +1,9 @@ -use ruma::{RoomId, RoomAliasId, api::client::error::ErrorKind}; +use ruma::{api::client::error::ErrorKind, RoomAliasId, RoomId}; -use crate::{service, database::KeyValueDatabase, utils, Error, services, Result}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::alias::Data for KeyValueDatabase { - fn set_alias( - &self, - alias: &RoomAliasId, - room_id: &RoomId - ) -> Result<()> { + fn set_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> Result<()> { self.alias_roomid .insert(alias.alias().as_bytes(), room_id.as_bytes())?; let mut aliasid = room_id.as_bytes().to_vec(); @@ -17,10 +13,7 @@ impl service::rooms::alias::Data for KeyValueDatabase { Ok(()) } - fn remove_alias( - &self, - alias: &RoomAliasId, - ) -> Result<()> { + fn remove_alias(&self, alias: &RoomAliasId) -> Result<()> { if let Some(room_id) = self.alias_roomid.get(alias.alias().as_bytes())? { let mut prefix = room_id.to_vec(); prefix.push(0xff); @@ -38,10 +31,7 @@ impl service::rooms::alias::Data for KeyValueDatabase { Ok(()) } - fn resolve_local_alias( - &self, - alias: &RoomAliasId - ) -> Result<Option<Box<RoomId>>> { + fn resolve_local_alias(&self, alias: &RoomAliasId) -> Result<Option<Box<RoomId>>> { self.alias_roomid .get(alias.alias().as_bytes())? .map(|bytes| { diff --git a/src/database/key_value/rooms/auth_chain.rs b/src/database/key_value/rooms/auth_chain.rs index 49d3956..60057ac 100644 --- a/src/database/key_value/rooms/auth_chain.rs +++ b/src/database/key_value/rooms/auth_chain.rs @@ -1,6 +1,6 @@ use std::{collections::HashSet, mem::size_of, sync::Arc}; -use crate::{service, database::KeyValueDatabase, Result, utils}; +use crate::{database::KeyValueDatabase, service, utils, Result}; impl service::rooms::auth_chain::Data for KeyValueDatabase { fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<HashSet<u64>>>> { @@ -12,14 +12,13 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase { // We only save auth chains for single events in the db if key.len() == 1 { // Check DB cache - let chain = self.shorteventid_authchain + let chain = self + .shorteventid_authchain .get(&key[0].to_be_bytes())? .map(|chain| { chain .chunks_exact(size_of::<u64>()) - .map(|chunk| { - utils::u64_from_bytes(chunk).expect("byte length is correct") - }) + .map(|chunk| utils::u64_from_bytes(chunk).expect("byte length is correct")) .collect() }); @@ -37,7 +36,6 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase { } Ok(None) - } fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: Arc<HashSet<u64>>) -> Result<()> { @@ -53,7 +51,10 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase { } // Cache in RAM - self.auth_chain_cache.lock().unwrap().insert(key, auth_chain); + self.auth_chain_cache + .lock() + .unwrap() + .insert(key, auth_chain); Ok(()) } diff --git a/src/database/key_value/rooms/directory.rs b/src/database/key_value/rooms/directory.rs index 727004e..661c202 100644 --- a/src/database/key_value/rooms/directory.rs +++ b/src/database/key_value/rooms/directory.rs @@ -1,6 +1,6 @@ use ruma::RoomId; -use crate::{service, database::KeyValueDatabase, utils, Error, Result}; +use crate::{database::KeyValueDatabase, service, utils, Error, Result}; impl service::rooms::directory::Data for KeyValueDatabase { fn set_public(&self, room_id: &RoomId) -> Result<()> { diff --git a/src/database/key_value/rooms/edus/mod.rs b/src/database/key_value/rooms/edus/mod.rs index b5007f8..6c65291 100644 --- a/src/database/key_value/rooms/edus/mod.rs +++ b/src/database/key_value/rooms/edus/mod.rs @@ -1,7 +1,7 @@ mod presence; -mod typing; mod read_receipt; +mod typing; -use crate::{service, database::KeyValueDatabase}; +use crate::{database::KeyValueDatabase, service}; impl service::rooms::edus::Data for KeyValueDatabase {} diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs index 1477c28..fdd51ce 100644 --- a/src/database/key_value/rooms/edus/presence.rs +++ b/src/database/key_value/rooms/edus/presence.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; -use ruma::{UserId, RoomId, events::presence::PresenceEvent, presence::PresenceState, UInt}; +use ruma::{events::presence::PresenceEvent, presence::PresenceState, RoomId, UInt, UserId}; -use crate::{service, database::KeyValueDatabase, utils, Error, services, Result}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::edus::presence::Data for KeyValueDatabase { fn update_presence( diff --git a/src/database/key_value/rooms/edus/read_receipt.rs b/src/database/key_value/rooms/edus/read_receipt.rs index a12e265..c78f0f5 100644 --- a/src/database/key_value/rooms/edus/read_receipt.rs +++ b/src/database/key_value/rooms/edus/read_receipt.rs @@ -1,8 +1,10 @@ use std::mem; -use ruma::{UserId, RoomId, events::receipt::ReceiptEvent, serde::Raw, signatures::CanonicalJsonObject}; +use ruma::{ + events::receipt::ReceiptEvent, serde::Raw, signatures::CanonicalJsonObject, RoomId, UserId, +}; -use crate::{database::KeyValueDatabase, service, utils, Error, services, Result}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { fn readreceipt_update( @@ -50,13 +52,15 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { &'a self, room_id: &RoomId, since: u64, - ) -> Box<dyn Iterator< - Item=Result<( - Box<UserId>, - u64, - Raw<ruma::events::AnySyncEphemeralRoomEvent>, - )>, - >> { + ) -> Box< + dyn Iterator< + Item = Result<( + Box<UserId>, + u64, + Raw<ruma::events::AnySyncEphemeralRoomEvent>, + )>, + >, + > { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); let prefix2 = prefix.clone(); @@ -64,42 +68,44 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { let mut first_possible_edu = prefix.clone(); first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since - Box::new(self.readreceiptid_readreceipt - .iter_from(&first_possible_edu, false) - .take_while(move |(k, _)| k.starts_with(&prefix2)) - .map(move |(k, v)| { - let count = - utils::u64_from_bytes(&k[prefix.len()..prefix.len() + mem::size_of::<u64>()]) - .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; - let user_id = UserId::parse( - utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..]) - .map_err(|_| { - Error::bad_database("Invalid readreceiptid userid bytes in db.") - })?, - ) + Box::new( + self.readreceiptid_readreceipt + .iter_from(&first_possible_edu, false) + .take_while(move |(k, _)| k.starts_with(&prefix2)) + .map(move |(k, v)| { + let count = utils::u64_from_bytes( + &k[prefix.len()..prefix.len() + mem::size_of::<u64>()], + ) + .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; + let user_id = UserId::parse( + utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..]) + .map_err(|_| { + Error::bad_database("Invalid readreceiptid userid bytes in db.") + })?, + ) .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; - let mut json = serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| { - Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") - })?; - json.remove("room_id"); - - Ok(( - user_id, - count, - Raw::from_json( - serde_json::value::to_raw_value(&json).expect("json is valid raw value"), - ), - )) - })) + let mut json = + serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| { + Error::bad_database( + "Read receipt in roomlatestid_roomlatest is invalid json.", + ) + })?; + json.remove("room_id"); + + Ok(( + user_id, + count, + Raw::from_json( + serde_json::value::to_raw_value(&json) + .expect("json is valid raw value"), + ), + )) + }), + ) } - fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()> { + fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { let mut key = room_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(user_id.as_bytes()); diff --git a/src/database/key_value/rooms/edus/typing.rs b/src/database/key_value/rooms/edus/typing.rs index b7d3596..7b211e7 100644 --- a/src/database/key_value/rooms/edus/typing.rs +++ b/src/database/key_value/rooms/edus/typing.rs @@ -1,16 +1,11 @@ use std::collections::HashSet; -use ruma::{UserId, RoomId}; +use ruma::{RoomId, UserId}; -use crate::{database::KeyValueDatabase, service, utils, Error, services, Result}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::edus::typing::Data for KeyValueDatabase { - fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()> { + fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -30,11 +25,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { Ok(()) } - fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()> { + fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -53,17 +44,16 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { } if found_outdated { - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?; + self.roomid_lasttypingupdate.insert( + room_id.as_bytes(), + &services().globals.next_count()?.to_be_bytes(), + )?; } Ok(()) } - fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result<u64> { + fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> { Ok(self .roomid_lasttypingupdate .get(room_id.as_bytes())? @@ -76,10 +66,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { .unwrap_or(0)) } - fn typings_all( - &self, - room_id: &RoomId, - ) -> Result<HashSet<Box<UserId>>> { + fn typings_all(&self, room_id: &RoomId) -> Result<HashSet<Box<UserId>>> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -89,7 +76,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| { Error::bad_database("User ID in typingid_userid is invalid unicode.") })?) - .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; + .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; user_ids.insert(user_id); } diff --git a/src/database/key_value/rooms/lazy_load.rs b/src/database/key_value/rooms/lazy_load.rs index 133e1d0..a19d52c 100644 --- a/src/database/key_value/rooms/lazy_load.rs +++ b/src/database/key_value/rooms/lazy_load.rs @@ -1,6 +1,6 @@ -use ruma::{UserId, DeviceId, RoomId}; +use ruma::{DeviceId, RoomId, UserId}; -use crate::{service, database::KeyValueDatabase, Result}; +use crate::{database::KeyValueDatabase, service, Result}; impl service::rooms::lazy_loading::Data for KeyValueDatabase { fn lazy_load_was_sent_before( diff --git a/src/database/key_value/rooms/metadata.rs b/src/database/key_value/rooms/metadata.rs index 72f6251..63a6b1a 100644 --- a/src/database/key_value/rooms/metadata.rs +++ b/src/database/key_value/rooms/metadata.rs @@ -1,6 +1,6 @@ use ruma::RoomId; -use crate::{service, database::KeyValueDatabase, Result, services}; +use crate::{database::KeyValueDatabase, service, services, Result}; impl service::rooms::metadata::Data for KeyValueDatabase { fn exists(&self, room_id: &RoomId) -> Result<bool> { diff --git a/src/database/key_value/rooms/outlier.rs b/src/database/key_value/rooms/outlier.rs index aa97544..2ecaadb 100644 --- a/src/database/key_value/rooms/outlier.rs +++ b/src/database/key_value/rooms/outlier.rs @@ -1,6 +1,6 @@ -use ruma::{EventId, signatures::CanonicalJsonObject}; +use ruma::{signatures::CanonicalJsonObject, EventId}; -use crate::{service, database::KeyValueDatabase, PduEvent, Error, Result}; +use crate::{database::KeyValueDatabase, service, Error, PduEvent, Result}; impl service::rooms::outlier::Data for KeyValueDatabase { fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> { diff --git a/src/database/key_value/rooms/pdu_metadata.rs b/src/database/key_value/rooms/pdu_metadata.rs index f3ac414..76ec734 100644 --- a/src/database/key_value/rooms/pdu_metadata.rs +++ b/src/database/key_value/rooms/pdu_metadata.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use ruma::{RoomId, EventId}; +use ruma::{EventId, RoomId}; -use crate::{service, database::KeyValueDatabase, Result}; +use crate::{database::KeyValueDatabase, service, Result}; impl service::rooms::pdu_metadata::Data for KeyValueDatabase { fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> { diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index 41df544..79e6a32 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -2,7 +2,7 @@ use std::mem::size_of; use ruma::RoomId; -use crate::{service, database::KeyValueDatabase, utils, Result, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Result}; impl service::rooms::search::Data for KeyValueDatabase { fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: String) -> Result<()> { @@ -27,7 +27,9 @@ impl service::rooms::search::Data for KeyValueDatabase { room_id: &RoomId, search_string: &str, ) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>>>, Vec<String>)>> { - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -63,10 +65,10 @@ impl service::rooms::search::Data for KeyValueDatabase { }; let mapped = common_elements.map(move |id| { - let mut pduid = prefix_clone.clone(); - pduid.extend_from_slice(&id); - pduid - }); + let mut pduid = prefix_clone.clone(); + pduid.extend_from_slice(&id); + pduid + }); Ok(Some((Box::new(mapped), words))) } diff --git a/src/database/key_value/rooms/short.rs b/src/database/key_value/rooms/short.rs index ecd12da..c022317 100644 --- a/src/database/key_value/rooms/short.rs +++ b/src/database/key_value/rooms/short.rs @@ -1,14 +1,11 @@ use std::sync::Arc; -use ruma::{EventId, events::StateEventType, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId}; -use crate::{Result, database::KeyValueDatabase, service, utils, Error, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::short::Data for KeyValueDatabase { - fn get_or_create_shorteventid( - &self, - event_id: &EventId, - ) -> Result<u64> { + fn get_or_create_shorteventid(&self, event_id: &EventId) -> Result<u64> { if let Some(short) = self.eventidshort_cache.lock().unwrap().get_mut(event_id) { return Ok(*short); } @@ -180,10 +177,7 @@ impl service::rooms::short::Data for KeyValueDatabase { } /// Returns (shortstatehash, already_existed) - fn get_or_create_shortstatehash( - &self, - state_hash: &[u8], - ) -> Result<(u64, bool)> { + fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> Result<(u64, bool)> { Ok(match self.statehash_shortstatehash.get(state_hash)? { Some(shortstatehash) => ( utils::u64_from_bytes(&shortstatehash) @@ -209,10 +203,7 @@ impl service::rooms::short::Data for KeyValueDatabase { .transpose() } - fn get_or_create_shortroomid( - &self, - room_id: &RoomId, - ) -> Result<u64> { + fn get_or_create_shortroomid(&self, room_id: &RoomId) -> Result<u64> { Ok(match self.roomid_shortroomid.get(room_id.as_bytes())? { Some(short) => utils::u64_from_bytes(&short) .map_err(|_| Error::bad_database("Invalid shortroomid in db."))?, diff --git a/src/database/key_value/rooms/state.rs b/src/database/key_value/rooms/state.rs index 90ac0d5..80a7458 100644 --- a/src/database/key_value/rooms/state.rs +++ b/src/database/key_value/rooms/state.rs @@ -1,10 +1,10 @@ -use ruma::{RoomId, EventId}; -use tokio::sync::MutexGuard; -use std::sync::Arc; +use ruma::{EventId, RoomId}; use std::collections::HashSet; use std::fmt::Debug; +use std::sync::Arc; +use tokio::sync::MutexGuard; -use crate::{service, database::KeyValueDatabase, utils, Error, Result}; +use crate::{database::KeyValueDatabase, service, utils, Error, Result}; impl service::rooms::state::Data for KeyValueDatabase { fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> { @@ -17,9 +17,12 @@ impl service::rooms::state::Data for KeyValueDatabase { }) } - fn set_room_state(&self, room_id: &RoomId, new_shortstatehash: u64, + fn set_room_state( + &self, + room_id: &RoomId, + new_shortstatehash: u64, _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex - ) -> Result<()> { + ) -> Result<()> { self.roomid_shortstatehash .insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?; Ok(()) diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 4d5bd4a..39c261f 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -1,13 +1,18 @@ -use std::{collections::{BTreeMap, HashMap}, sync::Arc}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; -use crate::{database::KeyValueDatabase, service, PduEvent, Error, utils, Result, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; -use ruma::{EventId, events::StateEventType, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId}; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> { - let full_state = services().rooms.state_compressor + let full_state = services() + .rooms + .state_compressor .load_shortstatehash_info(shortstatehash)? .pop() .expect("there is always one layer") @@ -15,7 +20,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { let mut result = BTreeMap::new(); let mut i = 0; for compressed in full_state.into_iter() { - let parsed = services().rooms.state_compressor.parse_compressed_state_event(compressed)?; + let parsed = services() + .rooms + .state_compressor + .parse_compressed_state_event(compressed)?; result.insert(parsed.0, parsed.1); i += 1; @@ -30,7 +38,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { &self, shortstatehash: u64, ) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> { - let full_state = services().rooms.state_compressor + let full_state = services() + .rooms + .state_compressor .load_shortstatehash_info(shortstatehash)? .pop() .expect("there is always one layer") @@ -39,7 +49,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { let mut result = HashMap::new(); let mut i = 0; for compressed in full_state { - let (_, eventid) = services().rooms.state_compressor.parse_compressed_state_event(compressed)?; + let (_, eventid) = services() + .rooms + .state_compressor + .parse_compressed_state_event(compressed)?; if let Some(pdu) = services().rooms.timeline.get_pdu(&eventid)? { result.insert( ( @@ -69,11 +82,17 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { event_type: &StateEventType, state_key: &str, ) -> Result<Option<Arc<EventId>>> { - let shortstatekey = match services().rooms.short.get_shortstatekey(event_type, state_key)? { + let shortstatekey = match services() + .rooms + .short + .get_shortstatekey(event_type, state_key)? + { Some(s) => s, None => return Ok(None), }; - let full_state = services().rooms.state_compressor + let full_state = services() + .rooms + .state_compressor .load_shortstatehash_info(shortstatehash)? .pop() .expect("there is always one layer") @@ -82,7 +101,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { .into_iter() .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) .and_then(|compressed| { - services().rooms.state_compressor.parse_compressed_state_event(compressed) + services() + .rooms + .state_compressor + .parse_compressed_state_event(compressed) .ok() .map(|(_, id)| id) })) @@ -96,7 +118,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { state_key: &str, ) -> Result<Option<Arc<PduEvent>>> { self.state_get_id(shortstatehash, event_type, state_key)? - .map_or(Ok(None), |event_id| services().rooms.timeline.get_pdu(&event_id)) + .map_or(Ok(None), |event_id| { + services().rooms.timeline.get_pdu(&event_id) + }) } /// Returns the state hash for this pdu. @@ -122,7 +146,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { &self, room_id: &RoomId, ) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> { - if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? { + if let Some(current_shortstatehash) = + services().rooms.state.get_room_shortstatehash(room_id)? + { self.state_full(current_shortstatehash).await } else { Ok(HashMap::new()) @@ -136,7 +162,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { event_type: &StateEventType, state_key: &str, ) -> Result<Option<Arc<EventId>>> { - if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? { + if let Some(current_shortstatehash) = + services().rooms.state.get_room_shortstatehash(room_id)? + { self.state_get_id(current_shortstatehash, event_type, state_key) } else { Ok(None) @@ -150,7 +178,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { event_type: &StateEventType, state_key: &str, ) -> Result<Option<Arc<PduEvent>>> { - if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? { + if let Some(current_shortstatehash) = + services().rooms.state.get_room_shortstatehash(room_id)? + { self.state_get(current_shortstatehash, event_type, state_key) } else { Ok(None) diff --git a/src/database/key_value/rooms/state_cache.rs b/src/database/key_value/rooms/state_cache.rs index 4043bc4..4ca6ac4 100644 --- a/src/database/key_value/rooms/state_cache.rs +++ b/src/database/key_value/rooms/state_cache.rs @@ -1,9 +1,13 @@ use std::{collections::HashSet, sync::Arc}; use regex::Regex; -use ruma::{UserId, RoomId, events::{AnyStrippedStateEvent, AnySyncStateEvent}, serde::Raw, ServerName}; +use ruma::{ + events::{AnyStrippedStateEvent, AnySyncStateEvent}, + serde::Raw, + RoomId, ServerName, UserId, +}; -use crate::{service, database::KeyValueDatabase, services, Result, Error, utils}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::state_cache::Data for KeyValueDatabase { fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { @@ -31,8 +35,13 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { Ok(()) } - - fn mark_as_invited(&self, user_id: &UserId, room_id: &RoomId, last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>) -> Result<()> { + + fn mark_as_invited( + &self, + user_id: &UserId, + room_id: &RoomId, + last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>, + ) -> Result<()> { let mut roomuser_id = room_id.as_bytes().to_vec(); roomuser_id.push(0xff); roomuser_id.extend_from_slice(user_id.as_bytes()); @@ -46,8 +55,10 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { &serde_json::to_vec(&last_state.unwrap_or_default()) .expect("state to bytes always works"), )?; - self.roomuserid_invitecount - .insert(&roomuser_id, &services().globals.next_count()?.to_be_bytes())?; + self.roomuserid_invitecount.insert( + &roomuser_id, + &services().globals.next_count()?.to_be_bytes(), + )?; self.userroomid_joined.remove(&userroom_id)?; self.roomuserid_joined.remove(&roomuser_id)?; self.userroomid_leftstate.remove(&userroom_id)?; @@ -69,8 +80,10 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { &userroom_id, &serde_json::to_vec(&Vec::<Raw<AnySyncStateEvent>>::new()).unwrap(), )?; // TODO - self.roomuserid_leftcount - .insert(&roomuser_id, &services().globals.next_count()?.to_be_bytes())?; + self.roomuserid_leftcount.insert( + &roomuser_id, + &services().globals.next_count()?.to_be_bytes(), + )?; self.userroomid_joined.remove(&userroom_id)?; self.roomuserid_joined.remove(&roomuser_id)?; self.userroomid_invitestate.remove(&userroom_id)?; @@ -324,21 +337,25 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.roomuseroncejoinedids - .scan_prefix(prefix) - .map(|(key, _)| { - UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.roomuseroncejoinedids + .scan_prefix(prefix) + .map(|(key, _)| { + UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database( + "User ID in room_useroncejoined is invalid unicode.", + ) + })?, ) - .map_err(|_| { - Error::bad_database("User ID in room_useroncejoined is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("User ID in room_useroncejoined is invalid.")) - })) + .map_err(|_| Error::bad_database("User ID in room_useroncejoined is invalid.")) + }), + ) } /// Returns an iterator over all invited members of a room. @@ -350,21 +367,23 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.roomuserid_invitecount - .scan_prefix(prefix) - .map(|(key, _)| { - UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.roomuserid_invitecount + .scan_prefix(prefix) + .map(|(key, _)| { + UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("User ID in roomuserid_invited is invalid unicode.") + })?, ) - .map_err(|_| { - Error::bad_database("User ID in roomuserid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("User ID in roomuserid_invited is invalid.")) - })) + .map_err(|_| Error::bad_database("User ID in roomuserid_invited is invalid.")) + }), + ) } #[tracing::instrument(skip(self))] @@ -403,21 +422,23 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { &'a self, user_id: &UserId, ) -> Box<dyn Iterator<Item = Result<Box<RoomId>>> + 'a> { - Box::new(self.userroomid_joined - .scan_prefix(user_id.as_bytes().to_vec()) - .map(|(key, _)| { - RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.userroomid_joined + .scan_prefix(user_id.as_bytes().to_vec()) + .map(|(key, _)| { + RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_joined is invalid unicode.") + })?, ) - .map_err(|_| { - Error::bad_database("Room ID in userroomid_joined is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_joined is invalid.")) - })) + .map_err(|_| Error::bad_database("Room ID in userroomid_joined is invalid.")) + }), + ) } /// Returns an iterator over all rooms a user was invited to. @@ -429,26 +450,31 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.userroomid_invitestate - .scan_prefix(prefix) - .map(|(key, state)| { - let room_id = RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.userroomid_invitestate + .scan_prefix(prefix) + .map(|(key, state)| { + let room_id = RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_invited is invalid unicode.") + })?, ) .map_err(|_| { - Error::bad_database("Room ID in userroomid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; + Error::bad_database("Room ID in userroomid_invited is invalid.") + })?; - let state = serde_json::from_slice(&state) - .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?; + let state = serde_json::from_slice(&state).map_err(|_| { + Error::bad_database("Invalid state in userroomid_invitestate.") + })?; - Ok((room_id, state)) - })) + Ok((room_id, state)) + }), + ) } #[tracing::instrument(skip(self))] @@ -502,26 +528,31 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.userroomid_leftstate - .scan_prefix(prefix) - .map(|(key, state)| { - let room_id = RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.userroomid_leftstate + .scan_prefix(prefix) + .map(|(key, state)| { + let room_id = RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_invited is invalid unicode.") + })?, ) .map_err(|_| { - Error::bad_database("Room ID in userroomid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; + Error::bad_database("Room ID in userroomid_invited is invalid.") + })?; - let state = serde_json::from_slice(&state) - .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?; + let state = serde_json::from_slice(&state).map_err(|_| { + Error::bad_database("Invalid state in userroomid_leftstate.") + })?; - Ok((room_id, state)) - })) + Ok((room_id, state)) + }), + ) } #[tracing::instrument(skip(self))] diff --git a/src/database/key_value/rooms/state_compressor.rs b/src/database/key_value/rooms/state_compressor.rs index aee1890..d0a9be4 100644 --- a/src/database/key_value/rooms/state_compressor.rs +++ b/src/database/key_value/rooms/state_compressor.rs @@ -1,6 +1,10 @@ use std::{collections::HashSet, mem::size_of}; -use crate::{service::{self, rooms::state_compressor::data::StateDiff}, database::KeyValueDatabase, Error, utils, Result}; +use crate::{ + database::KeyValueDatabase, + service::{self, rooms::state_compressor::data::StateDiff}, + utils, Error, Result, +}; impl service::rooms::state_compressor::Data for KeyValueDatabase { fn get_statediff(&self, shortstatehash: u64) -> Result<StateDiff> { @@ -10,11 +14,7 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase { .ok_or_else(|| Error::bad_database("State hash does not exist"))?; let parent = utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length"); - let parent = if parent != 0 { - Some(parent) - } else { - None - }; + let parent = if parent != 0 { Some(parent) } else { None }; let mut add_mode = true; let mut added = HashSet::new(); @@ -35,7 +35,11 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase { i += 2 * size_of::<u64>(); } - Ok(StateDiff { parent, added, removed }) + Ok(StateDiff { + parent, + added, + removed, + }) } fn save_statediff(&self, shortstatehash: u64, diff: StateDiff) -> Result<()> { diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 1723186..5d684a1 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -1,13 +1,17 @@ use std::{collections::hash_map, mem::size_of, sync::Arc}; -use ruma::{UserId, RoomId, api::client::error::ErrorKind, EventId, signatures::CanonicalJsonObject}; +use ruma::{ + api::client::error::ErrorKind, signatures::CanonicalJsonObject, EventId, RoomId, UserId, +}; use tracing::error; -use crate::{service, database::KeyValueDatabase, utils, Error, PduEvent, Result, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; impl service::rooms::timeline::Data for KeyValueDatabase { fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> { - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -82,10 +86,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { } /// Returns the json of a pdu. - fn get_non_outlier_pdu_json( - &self, - event_id: &EventId, - ) -> Result<Option<CanonicalJsonObject>> { + fn get_non_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> { self.eventid_pduid .get(event_id.as_bytes())? .map(|pduid| { @@ -187,10 +188,17 @@ impl service::rooms::timeline::Data for KeyValueDatabase { .map_err(|_| Error::bad_database("PDU has invalid count bytes.")) } - fn append_pdu(&self, pdu_id: &[u8], pdu: &PduEvent, json: &CanonicalJsonObject, count: u64) -> Result<()> { + fn append_pdu( + &self, + pdu_id: &[u8], + pdu: &PduEvent, + json: &CanonicalJsonObject, + count: u64, + ) -> Result<()> { self.pduid_pdu.insert( pdu_id, - &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"))?; + &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"), + )?; self.lasttimelinecount_cache .lock() @@ -209,7 +217,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase { if self.pduid_pdu.get(pdu_id)?.is_some() { self.pduid_pdu.insert( pdu_id, - &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"))?; + &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"), + )?; Ok(()) } else { Err(Error::BadRequest( @@ -227,7 +236,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase { room_id: &RoomId, since: u64, ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -239,18 +250,19 @@ impl service::rooms::timeline::Data for KeyValueDatabase { let user_id = user_id.to_owned(); - Ok(Box::new(self - .pduid_pdu - .iter_from(&first_pdu_id, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::<PduEvent>(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - }))) + Ok(Box::new( + self.pduid_pdu + .iter_from(&first_pdu_id, false) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pdu_id, v)| { + let mut pdu = serde_json::from_slice::<PduEvent>(&v) + .map_err(|_| Error::bad_database("PDU in db is invalid."))?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + Ok((pdu_id, pdu)) + }), + )) } /// Returns an iterator over all events and their tokens in a room that happened before the @@ -262,7 +274,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase { until: u64, ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { // Create the first part of the full pdu id - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -275,18 +289,19 @@ impl service::rooms::timeline::Data for KeyValueDatabase { let user_id = user_id.to_owned(); - Ok(Box::new(self - .pduid_pdu - .iter_from(current, true) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::<PduEvent>(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - }))) + Ok(Box::new( + self.pduid_pdu + .iter_from(current, true) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pdu_id, v)| { + let mut pdu = serde_json::from_slice::<PduEvent>(&v) + .map_err(|_| Error::bad_database("PDU in db is invalid."))?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + Ok((pdu_id, pdu)) + }), + )) } fn pdus_after<'a>( @@ -296,7 +311,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase { from: u64, ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { // Create the first part of the full pdu id - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -309,21 +326,27 @@ impl service::rooms::timeline::Data for KeyValueDatabase { let user_id = user_id.to_owned(); - Ok(Box::new(self - .pduid_pdu - .iter_from(current, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::<PduEvent>(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - }))) + Ok(Box::new( + self.pduid_pdu + .iter_from(current, false) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pdu_id, v)| { + let mut pdu = serde_json::from_slice::<PduEvent>(&v) + .map_err(|_| Error::bad_database("PDU in db is invalid."))?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + Ok((pdu_id, pdu)) + }), + )) } - fn increment_notification_counts(&self, room_id: &RoomId, notifies: Vec<Box<UserId>>, highlights: Vec<Box<UserId>>) -> Result<()> { + fn increment_notification_counts( + &self, + room_id: &RoomId, + notifies: Vec<Box<UserId>>, + highlights: Vec<Box<UserId>>, + ) -> Result<()> { let notifies_batch = Vec::new(); let highlights_batch = Vec::new(); for user in notifies { diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs index 3759bda..78c78e1 100644 --- a/src/database/key_value/rooms/user.rs +++ b/src/database/key_value/rooms/user.rs @@ -1,6 +1,6 @@ -use ruma::{UserId, RoomId}; +use ruma::{RoomId, UserId}; -use crate::{service, database::KeyValueDatabase, utils, Error, Result, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::user::Data for KeyValueDatabase { fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { @@ -50,7 +50,11 @@ impl service::rooms::user::Data for KeyValueDatabase { token: u64, shortstatehash: u64, ) -> Result<()> { - let shortroomid = services().rooms.short.get_shortroomid(room_id)?.expect("room exists"); + let shortroomid = services() + .rooms + .short + .get_shortroomid(room_id)? + .expect("room exists"); let mut key = shortroomid.to_be_bytes().to_vec(); key.extend_from_slice(&token.to_be_bytes()); @@ -60,7 +64,11 @@ impl service::rooms::user::Data for KeyValueDatabase { } fn get_token_shortstatehash(&self, room_id: &RoomId, token: u64) -> Result<Option<u64>> { - let shortroomid = services().rooms.short.get_shortroomid(room_id)?.expect("room exists"); + let shortroomid = services() + .rooms + .short + .get_shortroomid(room_id)? + .expect("room exists"); let mut key = shortroomid.to_be_bytes().to_vec(); key.extend_from_slice(&token.to_be_bytes()); @@ -102,13 +110,15 @@ impl service::rooms::user::Data for KeyValueDatabase { }); // We use the default compare function because keys are sorted correctly (not reversed) - Ok(Box::new(Box::new(utils::common_elements(iterators, Ord::cmp) - .expect("users is not empty") - .map(|bytes| { - RoomId::parse(utils::string_from_bytes(&*bytes).map_err(|_| { - Error::bad_database("Invalid RoomId bytes in userroomid_joined") - })?) - .map_err(|_| Error::bad_database("Invalid RoomId in userroomid_joined.")) - })))) + Ok(Box::new(Box::new( + utils::common_elements(iterators, Ord::cmp) + .expect("users is not empty") + .map(|bytes| { + RoomId::parse(utils::string_from_bytes(&*bytes).map_err(|_| { + Error::bad_database("Invalid RoomId bytes in userroomid_joined") + })?) + .map_err(|_| Error::bad_database("Invalid RoomId in userroomid_joined.")) + }), + ))) } } diff --git a/src/database/key_value/transaction_ids.rs b/src/database/key_value/transaction_ids.rs index a63b3c5..2ea6ad4 100644 --- a/src/database/key_value/transaction_ids.rs +++ b/src/database/key_value/transaction_ids.rs @@ -1,6 +1,6 @@ -use ruma::{UserId, DeviceId, TransactionId}; +use ruma::{DeviceId, TransactionId, UserId}; -use crate::{service, database::KeyValueDatabase, Result}; +use crate::{database::KeyValueDatabase, service, Result}; impl service::transaction_ids::Data for KeyValueDatabase { fn add_txnid( diff --git a/src/database/key_value/uiaa.rs b/src/database/key_value/uiaa.rs index cf242de..8a9f176 100644 --- a/src/database/key_value/uiaa.rs +++ b/src/database/key_value/uiaa.rs @@ -1,4 +1,8 @@ -use ruma::{UserId, DeviceId, signatures::CanonicalJsonValue, api::client::{uiaa::UiaaInfo, error::ErrorKind}}; +use ruma::{ + api::client::{error::ErrorKind, uiaa::UiaaInfo}, + signatures::CanonicalJsonValue, + DeviceId, UserId, +}; use crate::{database::KeyValueDatabase, service, Error, Result}; diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs index 55a518d..15699a1 100644 --- a/src/database/key_value/users.rs +++ b/src/database/key_value/users.rs @@ -1,9 +1,20 @@ -use std::{mem::size_of, collections::BTreeMap}; - -use ruma::{api::client::{filter::IncomingFilterDefinition, error::ErrorKind, device::Device}, UserId, RoomAliasId, MxcUri, DeviceId, MilliSecondsSinceUnixEpoch, DeviceKeyId, encryption::{OneTimeKey, CrossSigningKey, DeviceKeys}, serde::Raw, events::{AnyToDeviceEvent, StateEventType}, DeviceKeyAlgorithm, UInt}; +use std::{collections::BTreeMap, mem::size_of}; + +use ruma::{ + api::client::{device::Device, error::ErrorKind, filter::IncomingFilterDefinition}, + encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, + events::{AnyToDeviceEvent, StateEventType}, + serde::Raw, + DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, MxcUri, RoomAliasId, + UInt, UserId, +}; use tracing::warn; -use crate::{service::{self, users::clean_signatures}, database::KeyValueDatabase, Error, utils, services, Result}; +use crate::{ + database::KeyValueDatabase, + service::{self, users::clean_signatures}, + services, utils, Error, Result, +}; impl service::users::Data for KeyValueDatabase { /// Check if a user has an account on this homeserver. @@ -274,18 +285,21 @@ impl service::users::Data for KeyValueDatabase { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); // All devices have metadata - Box::new(self.userdeviceid_metadata - .scan_prefix(prefix) - .map(|(bytes, _)| { - Ok(utils::string_from_bytes( - bytes - .rsplit(|&b| b == 0xff) - .next() - .ok_or_else(|| Error::bad_database("UserDevice ID in db is invalid."))?, - ) - .map_err(|_| Error::bad_database("Device ID in userdeviceid_metadata is invalid."))? - .into()) - })) + Box::new( + self.userdeviceid_metadata + .scan_prefix(prefix) + .map(|(bytes, _)| { + Ok(utils::string_from_bytes( + bytes.rsplit(|&b| b == 0xff).next().ok_or_else(|| { + Error::bad_database("UserDevice ID in db is invalid.") + })?, + ) + .map_err(|_| { + Error::bad_database("Device ID in userdeviceid_metadata is invalid.") + })? + .into()) + }), + ) } /// Replaces the access token of one device. @@ -341,8 +355,10 @@ impl service::users::Data for KeyValueDatabase { &serde_json::to_vec(&one_time_key_value).expect("OneTimeKey::to_vec always works"), )?; - self.userid_lastonetimekeyupdate - .insert(user_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?; + self.userid_lastonetimekeyupdate.insert( + user_id.as_bytes(), + &services().globals.next_count()?.to_be_bytes(), + )?; Ok(()) } @@ -372,8 +388,10 @@ impl service::users::Data for KeyValueDatabase { prefix.extend_from_slice(key_algorithm.as_ref().as_bytes()); prefix.push(b':'); - self.userid_lastonetimekeyupdate - .insert(user_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?; + self.userid_lastonetimekeyupdate.insert( + user_id.as_bytes(), + &services().globals.next_count()?.to_be_bytes(), + )?; self.onetimekeyid_onetimekeys .scan_prefix(prefix) @@ -617,38 +635,47 @@ impl service::users::Data for KeyValueDatabase { let to = to.unwrap_or(u64::MAX); - Box::new(self.keychangeid_userid - .iter_from(&start, false) - .take_while(move |(k, _)| { - k.starts_with(&prefix) - && if let Some(current) = k.splitn(2, |&b| b == 0xff).nth(1) { - if let Ok(c) = utils::u64_from_bytes(current) { - c <= to + Box::new( + self.keychangeid_userid + .iter_from(&start, false) + .take_while(move |(k, _)| { + k.starts_with(&prefix) + && if let Some(current) = k.splitn(2, |&b| b == 0xff).nth(1) { + if let Ok(c) = utils::u64_from_bytes(current) { + c <= to + } else { + warn!("BadDatabase: Could not parse keychangeid_userid bytes"); + false + } } else { - warn!("BadDatabase: Could not parse keychangeid_userid bytes"); + warn!("BadDatabase: Could not parse keychangeid_userid"); false } - } else { - warn!("BadDatabase: Could not parse keychangeid_userid"); - false - } - }) - .map(|(_, bytes)| { - UserId::parse(utils::string_from_bytes(&bytes).map_err(|_| { - Error::bad_database("User ID in devicekeychangeid_userid is invalid unicode.") - })?) - .map_err(|_| Error::bad_database("User ID in devicekeychangeid_userid is invalid.")) - })) + }) + .map(|(_, bytes)| { + UserId::parse(utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database( + "User ID in devicekeychangeid_userid is invalid unicode.", + ) + })?) + .map_err(|_| { + Error::bad_database("User ID in devicekeychangeid_userid is invalid.") + }) + }), + ) } - fn mark_device_key_update( - &self, - user_id: &UserId, - ) -> Result<()> { + fn mark_device_key_update(&self, user_id: &UserId) -> Result<()> { let count = services().globals.next_count()?.to_be_bytes(); - for room_id in services().rooms.state_cache.rooms_joined(user_id).filter_map(|r| r.ok()) { + for room_id in services() + .rooms + .state_cache + .rooms_joined(user_id) + .filter_map(|r| r.ok()) + { // Don't send key updates to unencrypted rooms - if services().rooms + if services() + .rooms .state_accessor .room_state_get(&room_id, &StateEventType::RoomEncryption, "")? .is_none() @@ -883,20 +910,19 @@ impl service::users::Data for KeyValueDatabase { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); - Box::new(self.userdeviceid_metadata - .scan_prefix(key) - .map(|(_, bytes)| { - serde_json::from_slice::<Device>(&bytes) - .map_err(|_| Error::bad_database("Device in userdeviceid_metadata is invalid.")) - })) + Box::new( + self.userdeviceid_metadata + .scan_prefix(key) + .map(|(_, bytes)| { + serde_json::from_slice::<Device>(&bytes).map_err(|_| { + Error::bad_database("Device in userdeviceid_metadata is invalid.") + }) + }), + ) } /// Creates a new sync filter. Returns the filter id. - fn create_filter( - &self, - user_id: &UserId, - filter: &IncomingFilterDefinition, - ) -> Result<String> { + fn create_filter(&self, user_id: &UserId, filter: &IncomingFilterDefinition) -> Result<String> { let filter_id = utils::random_string(4); let mut key = user_id.as_bytes().to_vec(); diff --git a/src/database/mod.rs b/src/database/mod.rs index 6868467..8a7c78e 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,8 +1,16 @@ pub mod abstraction; pub mod key_value; -use crate::{utils, Config, Error, Result, service::{users, globals, uiaa, rooms::{self, state_compressor::CompressedStateEvent}, account_data, media, key_backups, transaction_ids, sending, appservice, pusher}, services, PduEvent, Services, SERVICES}; +use crate::{ + service::{ + account_data, appservice, globals, key_backups, media, pusher, + rooms::{self, state_compressor::CompressedStateEvent}, + sending, transaction_ids, uiaa, users, + }, + services, utils, Config, Error, PduEvent, Result, Services, SERVICES, +}; use abstraction::KeyValueDatabaseEngine; +use abstraction::KvTree; use directories::ProjectDirs; use futures_util::{stream::FuturesUnordered, StreamExt}; use lru_cache::LruCache; @@ -12,7 +20,8 @@ use ruma::{ GlobalAccountDataEvent, GlobalAccountDataEventType, StateEventType, }, push::Ruleset, - DeviceId, EventId, RoomId, UserId, signatures::CanonicalJsonValue, + signatures::CanonicalJsonValue, + DeviceId, EventId, RoomId, UserId, }; use std::{ collections::{BTreeMap, HashMap, HashSet}, @@ -25,7 +34,6 @@ use std::{ }; use tokio::sync::{mpsc, OwnedRwLockReadGuard, RwLock as TokioRwLock, Semaphore}; use tracing::{debug, error, info, warn}; -use abstraction::KvTree; pub struct KeyValueDatabase { _db: Arc<dyn KeyValueDatabaseEngine>, @@ -65,9 +73,9 @@ pub struct KeyValueDatabase { pub(super) readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId pub(super) roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count - pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count + pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count - pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId + pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId pub(super) userid_lastpresenceupdate: Arc<dyn KvTree>, // LastPresenceUpdate = Count //pub rooms: rooms::Rooms, @@ -279,127 +287,126 @@ impl KeyValueDatabase { let db = Arc::new(Self { _db: builder.clone(), - userid_password: builder.open_tree("userid_password")?, - userid_displayname: builder.open_tree("userid_displayname")?, - userid_avatarurl: builder.open_tree("userid_avatarurl")?, - userid_blurhash: builder.open_tree("userid_blurhash")?, - userdeviceid_token: builder.open_tree("userdeviceid_token")?, - userdeviceid_metadata: builder.open_tree("userdeviceid_metadata")?, - userid_devicelistversion: builder.open_tree("userid_devicelistversion")?, - token_userdeviceid: builder.open_tree("token_userdeviceid")?, - onetimekeyid_onetimekeys: builder.open_tree("onetimekeyid_onetimekeys")?, - userid_lastonetimekeyupdate: builder.open_tree("userid_lastonetimekeyupdate")?, - keychangeid_userid: builder.open_tree("keychangeid_userid")?, - keyid_key: builder.open_tree("keyid_key")?, - userid_masterkeyid: builder.open_tree("userid_masterkeyid")?, - userid_selfsigningkeyid: builder.open_tree("userid_selfsigningkeyid")?, - userid_usersigningkeyid: builder.open_tree("userid_usersigningkeyid")?, - userfilterid_filter: builder.open_tree("userfilterid_filter")?, - todeviceid_events: builder.open_tree("todeviceid_events")?, - - userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?, - userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()), - readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?, - roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt - roomuserid_lastprivatereadupdate: builder - .open_tree("roomuserid_lastprivatereadupdate")?, - typingid_userid: builder.open_tree("typingid_userid")?, - roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?, - presenceid_presence: builder.open_tree("presenceid_presence")?, - userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?, - pduid_pdu: builder.open_tree("pduid_pdu")?, - eventid_pduid: builder.open_tree("eventid_pduid")?, - roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, - - alias_roomid: builder.open_tree("alias_roomid")?, - aliasid_alias: builder.open_tree("aliasid_alias")?, - publicroomids: builder.open_tree("publicroomids")?, - - tokenids: builder.open_tree("tokenids")?, - - roomserverids: builder.open_tree("roomserverids")?, - serverroomids: builder.open_tree("serverroomids")?, - userroomid_joined: builder.open_tree("userroomid_joined")?, - roomuserid_joined: builder.open_tree("roomuserid_joined")?, - roomid_joinedcount: builder.open_tree("roomid_joinedcount")?, - roomid_invitedcount: builder.open_tree("roomid_invitedcount")?, - roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?, - userroomid_invitestate: builder.open_tree("userroomid_invitestate")?, - roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?, - userroomid_leftstate: builder.open_tree("userroomid_leftstate")?, - roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?, - - disabledroomids: builder.open_tree("disabledroomids")?, - - lazyloadedids: builder.open_tree("lazyloadedids")?, - - userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?, - userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?, - - statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?, - shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?, - - shorteventid_authchain: builder.open_tree("shorteventid_authchain")?, - - roomid_shortroomid: builder.open_tree("roomid_shortroomid")?, - - shortstatehash_statediff: builder.open_tree("shortstatehash_statediff")?, - eventid_shorteventid: builder.open_tree("eventid_shorteventid")?, - shorteventid_eventid: builder.open_tree("shorteventid_eventid")?, - shorteventid_shortstatehash: builder.open_tree("shorteventid_shortstatehash")?, - roomid_shortstatehash: builder.open_tree("roomid_shortstatehash")?, - roomsynctoken_shortstatehash: builder.open_tree("roomsynctoken_shortstatehash")?, - statehash_shortstatehash: builder.open_tree("statehash_shortstatehash")?, - - eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, - softfailedeventids: builder.open_tree("softfailedeventids")?, - - referencedevents: builder.open_tree("referencedevents")?, - roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, - roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?, - mediaid_file: builder.open_tree("mediaid_file")?, - backupid_algorithm: builder.open_tree("backupid_algorithm")?, - backupid_etag: builder.open_tree("backupid_etag")?, - backupkeyid_backup: builder.open_tree("backupkeyid_backup")?, - userdevicetxnid_response: builder.open_tree("userdevicetxnid_response")?, - servername_educount: builder.open_tree("servername_educount")?, - servernameevent_data: builder.open_tree("servernameevent_data")?, - servercurrentevent_data: builder.open_tree("servercurrentevent_data")?, - id_appserviceregistrations: builder.open_tree("id_appserviceregistrations")?, - senderkey_pusher: builder.open_tree("senderkey_pusher")?, - global: builder.open_tree("global")?, - server_signingkeys: builder.open_tree("server_signingkeys")?, - - cached_registrations: Arc::new(RwLock::new(HashMap::new())), - pdu_cache: Mutex::new(LruCache::new( - config - .pdu_cache_capacity - .try_into() - .expect("pdu cache capacity fits into usize"), - )), - auth_chain_cache: Mutex::new(LruCache::new( - (100_000.0 * config.conduit_cache_capacity_modifier) as usize, - )), - shorteventid_cache: Mutex::new(LruCache::new( - (100_000.0 * config.conduit_cache_capacity_modifier) as usize, - )), - eventidshort_cache: Mutex::new(LruCache::new( - (100_000.0 * config.conduit_cache_capacity_modifier) as usize, - )), - shortstatekey_cache: Mutex::new(LruCache::new( - (100_000.0 * config.conduit_cache_capacity_modifier) as usize, - )), - statekeyshort_cache: Mutex::new(LruCache::new( - (100_000.0 * config.conduit_cache_capacity_modifier) as usize, - )), - our_real_users_cache: RwLock::new(HashMap::new()), - appservice_in_room_cache: RwLock::new(HashMap::new()), - lazy_load_waiting: Mutex::new(HashMap::new()), - stateinfo_cache: Mutex::new(LruCache::new( - (100.0 * config.conduit_cache_capacity_modifier) as usize, - )), - lasttimelinecount_cache: Mutex::new(HashMap::new()), - + userid_password: builder.open_tree("userid_password")?, + userid_displayname: builder.open_tree("userid_displayname")?, + userid_avatarurl: builder.open_tree("userid_avatarurl")?, + userid_blurhash: builder.open_tree("userid_blurhash")?, + userdeviceid_token: builder.open_tree("userdeviceid_token")?, + userdeviceid_metadata: builder.open_tree("userdeviceid_metadata")?, + userid_devicelistversion: builder.open_tree("userid_devicelistversion")?, + token_userdeviceid: builder.open_tree("token_userdeviceid")?, + onetimekeyid_onetimekeys: builder.open_tree("onetimekeyid_onetimekeys")?, + userid_lastonetimekeyupdate: builder.open_tree("userid_lastonetimekeyupdate")?, + keychangeid_userid: builder.open_tree("keychangeid_userid")?, + keyid_key: builder.open_tree("keyid_key")?, + userid_masterkeyid: builder.open_tree("userid_masterkeyid")?, + userid_selfsigningkeyid: builder.open_tree("userid_selfsigningkeyid")?, + userid_usersigningkeyid: builder.open_tree("userid_usersigningkeyid")?, + userfilterid_filter: builder.open_tree("userfilterid_filter")?, + todeviceid_events: builder.open_tree("todeviceid_events")?, + + userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?, + userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()), + readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?, + roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt + roomuserid_lastprivatereadupdate: builder + .open_tree("roomuserid_lastprivatereadupdate")?, + typingid_userid: builder.open_tree("typingid_userid")?, + roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?, + presenceid_presence: builder.open_tree("presenceid_presence")?, + userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?, + pduid_pdu: builder.open_tree("pduid_pdu")?, + eventid_pduid: builder.open_tree("eventid_pduid")?, + roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, + + alias_roomid: builder.open_tree("alias_roomid")?, + aliasid_alias: builder.open_tree("aliasid_alias")?, + publicroomids: builder.open_tree("publicroomids")?, + + tokenids: builder.open_tree("tokenids")?, + + roomserverids: builder.open_tree("roomserverids")?, + serverroomids: builder.open_tree("serverroomids")?, + userroomid_joined: builder.open_tree("userroomid_joined")?, + roomuserid_joined: builder.open_tree("roomuserid_joined")?, + roomid_joinedcount: builder.open_tree("roomid_joinedcount")?, + roomid_invitedcount: builder.open_tree("roomid_invitedcount")?, + roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?, + userroomid_invitestate: builder.open_tree("userroomid_invitestate")?, + roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?, + userroomid_leftstate: builder.open_tree("userroomid_leftstate")?, + roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?, + + disabledroomids: builder.open_tree("disabledroomids")?, + + lazyloadedids: builder.open_tree("lazyloadedids")?, + + userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?, + userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?, + + statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?, + shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?, + + shorteventid_authchain: builder.open_tree("shorteventid_authchain")?, + + roomid_shortroomid: builder.open_tree("roomid_shortroomid")?, + + shortstatehash_statediff: builder.open_tree("shortstatehash_statediff")?, + eventid_shorteventid: builder.open_tree("eventid_shorteventid")?, + shorteventid_eventid: builder.open_tree("shorteventid_eventid")?, + shorteventid_shortstatehash: builder.open_tree("shorteventid_shortstatehash")?, + roomid_shortstatehash: builder.open_tree("roomid_shortstatehash")?, + roomsynctoken_shortstatehash: builder.open_tree("roomsynctoken_shortstatehash")?, + statehash_shortstatehash: builder.open_tree("statehash_shortstatehash")?, + + eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, + softfailedeventids: builder.open_tree("softfailedeventids")?, + + referencedevents: builder.open_tree("referencedevents")?, + roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, + roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?, + mediaid_file: builder.open_tree("mediaid_file")?, + backupid_algorithm: builder.open_tree("backupid_algorithm")?, + backupid_etag: builder.open_tree("backupid_etag")?, + backupkeyid_backup: builder.open_tree("backupkeyid_backup")?, + userdevicetxnid_response: builder.open_tree("userdevicetxnid_response")?, + servername_educount: builder.open_tree("servername_educount")?, + servernameevent_data: builder.open_tree("servernameevent_data")?, + servercurrentevent_data: builder.open_tree("servercurrentevent_data")?, + id_appserviceregistrations: builder.open_tree("id_appserviceregistrations")?, + senderkey_pusher: builder.open_tree("senderkey_pusher")?, + global: builder.open_tree("global")?, + server_signingkeys: builder.open_tree("server_signingkeys")?, + + cached_registrations: Arc::new(RwLock::new(HashMap::new())), + pdu_cache: Mutex::new(LruCache::new( + config + .pdu_cache_capacity + .try_into() + .expect("pdu cache capacity fits into usize"), + )), + auth_chain_cache: Mutex::new(LruCache::new( + (100_000.0 * config.conduit_cache_capacity_modifier) as usize, + )), + shorteventid_cache: Mutex::new(LruCache::new( + (100_000.0 * config.conduit_cache_capacity_modifier) as usize, + )), + eventidshort_cache: Mutex::new(LruCache::new( + (100_000.0 * config.conduit_cache_capacity_modifier) as usize, + )), + shortstatekey_cache: Mutex::new(LruCache::new( + (100_000.0 * config.conduit_cache_capacity_modifier) as usize, + )), + statekeyshort_cache: Mutex::new(LruCache::new( + (100_000.0 * config.conduit_cache_capacity_modifier) as usize, + )), + our_real_users_cache: RwLock::new(HashMap::new()), + appservice_in_room_cache: RwLock::new(HashMap::new()), + lazy_load_waiting: Mutex::new(HashMap::new()), + stateinfo_cache: Mutex::new(LruCache::new( + (100.0 * config.conduit_cache_capacity_modifier) as usize, + )), + lasttimelinecount_cache: Mutex::new(HashMap::new()), }); let services_raw = Box::new(Services::build(Arc::clone(&db), config)?); @@ -407,7 +414,6 @@ impl KeyValueDatabase { // This is the first and only time we initialize the SERVICE static *SERVICES.write().unwrap() = Some(Box::leak(services_raw)); - // Matrix resource ownership is based on the server name; changing it // requires recreating the database from scratch. if services().users.count()? > 0 { @@ -570,7 +576,10 @@ impl KeyValueDatabase { let states_parents = last_roomsstatehash.map_or_else( || Ok(Vec::new()), |&last_roomsstatehash| { - services().rooms.state_compressor.load_shortstatehash_info(dbg!(last_roomsstatehash)) + services() + .rooms + .state_compressor + .load_shortstatehash_info(dbg!(last_roomsstatehash)) }, )?; @@ -643,14 +652,15 @@ impl KeyValueDatabase { current_state = HashSet::new(); current_sstatehash = Some(sstatehash); - let event_id = db - .shorteventid_eventid - .get(&seventid) - .unwrap() - .unwrap(); + let event_id = db.shorteventid_eventid.get(&seventid).unwrap().unwrap(); let string = utils::string_from_bytes(&event_id).unwrap(); let event_id = <&EventId>::try_from(string.as_str()).unwrap(); - let pdu = services().rooms.timeline.get_pdu(event_id).unwrap().unwrap(); + let pdu = services() + .rooms + .timeline + .get_pdu(event_id) + .unwrap() + .unwrap(); if Some(&pdu.room_id) != current_room.as_ref() { current_room = Some(pdu.room_id.clone()); @@ -764,8 +774,7 @@ impl KeyValueDatabase { .peekable(); while iter.peek().is_some() { - db.tokenids - .insert_batch(&mut iter.by_ref().take(1000))?; + db.tokenids.insert_batch(&mut iter.by_ref().take(1000))?; println!("smaller batch done"); } @@ -803,8 +812,7 @@ impl KeyValueDatabase { // Force E2EE device list updates so we can send them over federation for user_id in services().users.iter().filter_map(|r| r.ok()) { - services().users - .mark_device_key_update(&user_id)?; + services().users.mark_device_key_update(&user_id)?; } services().globals.bump_database_version(10)?; @@ -825,7 +833,8 @@ impl KeyValueDatabase { info!( "Loaded {} database with version {}", - services().globals.config.database_backend, latest_database_version + services().globals.config.database_backend, + latest_database_version ); } else { services() @@ -837,7 +846,8 @@ impl KeyValueDatabase { warn!( "Created new {} database with version {}", - services().globals.config.database_backend, latest_database_version + services().globals.config.database_backend, + latest_database_version ); } @@ -862,9 +872,7 @@ impl KeyValueDatabase { } }; - services() - .sending - .start_handler(sending_receiver); + services().sending.start_handler(sending_receiver); Self::start_cleanup_task().await; @@ -898,7 +906,8 @@ impl KeyValueDatabase { use std::time::{Duration, Instant}; - let timer_interval = Duration::from_secs(services().globals.config.cleanup_second_interval as u64); + let timer_interval = + Duration::from_secs(services().globals.config.cleanup_second_interval as u64); tokio::spawn(async move { let mut i = interval(timer_interval); @@ -937,8 +946,10 @@ fn set_emergency_access() -> Result<bool> { let conduit_user = UserId::parse_with_server_name("conduit", services().globals.server_name()) .expect("@conduit:server_name is a valid UserId"); - services().users - .set_password(&conduit_user, services().globals.emergency_password().as_deref())?; + services().users.set_password( + &conduit_user, + services().globals.emergency_password().as_deref(), + )?; let (ruleset, res) = match services().globals.emergency_password() { Some(_) => (Ruleset::server_default(&conduit_user), Ok(true)), @@ -951,7 +962,8 @@ fn set_emergency_access() -> Result<bool> { GlobalAccountDataEventType::PushRules.to_string().into(), &serde_json::to_value(&GlobalAccountDataEvent { content: PushRulesEventContent { global: ruleset }, - }).expect("to json value always works"), + }) + .expect("to json value always works"), )?; res |