diff options
author | Timo Kösters <timo@koesters.xyz> | 2022-10-05 20:34:31 +0200 |
---|---|---|
committer | Nyaaori <+@nyaaori.cat> | 2022-10-10 14:02:01 +0200 |
commit | a4637e2ba1093065a6fda3fa2ad2b2b9f30eea63 (patch) | |
tree | 2d31313957d699875fc61f570686318b523ae0f1 /src/database/key_value | |
parent | 33a2b2b7729bb40253fd174d99ad773869b5ecfe (diff) | |
download | conduit-a4637e2ba1093065a6fda3fa2ad2b2b9f30eea63.zip |
cargo fmt
Diffstat (limited to 'src/database/key_value')
28 files changed, 531 insertions, 414 deletions
diff --git a/src/database/key_value/account_data.rs b/src/database/key_value/account_data.rs index 5674ac0..7d2a870 100644 --- a/src/database/key_value/account_data.rs +++ b/src/database/key_value/account_data.rs @@ -1,9 +1,15 @@ use std::collections::HashMap; -use ruma::{UserId, DeviceId, signatures::CanonicalJsonValue, api::client::{uiaa::UiaaInfo, error::ErrorKind}, events::{RoomAccountDataEventType, AnyEphemeralRoomEvent}, serde::Raw, RoomId}; -use serde::{Serialize, de::DeserializeOwned}; - -use crate::{Result, database::KeyValueDatabase, service, Error, utils, services}; +use ruma::{ + api::client::{error::ErrorKind, uiaa::UiaaInfo}, + events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, + serde::Raw, + signatures::CanonicalJsonValue, + DeviceId, RoomId, UserId, +}; +use serde::{de::DeserializeOwned, Serialize}; + +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::account_data::Data for KeyValueDatabase { /// Places one event in the account data of the user and removes the previous entry. diff --git a/src/database/key_value/appservice.rs b/src/database/key_value/appservice.rs index f427ba7..9a821a6 100644 --- a/src/database/key_value/appservice.rs +++ b/src/database/key_value/appservice.rs @@ -55,10 +55,13 @@ impl service::appservice::Data for KeyValueDatabase { } fn iter_ids<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<String>> + 'a>> { - Ok(Box::new(self.id_appserviceregistrations.iter().map(|(id, _)| { - utils::string_from_bytes(&id) - .map_err(|_| Error::bad_database("Invalid id bytes in id_appserviceregistrations.")) - }))) + Ok(Box::new(self.id_appserviceregistrations.iter().map( + |(id, _)| { + utils::string_from_bytes(&id).map_err(|_| { + Error::bad_database("Invalid id bytes in id_appserviceregistrations.") + }) + }, + ))) } fn all(&self) -> Result<Vec<(String, serde_yaml::Value)>> { diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs index 199cbf6..fafaf49 100644 --- a/src/database/key_value/globals.rs +++ b/src/database/key_value/globals.rs @@ -2,9 +2,13 @@ use std::collections::BTreeMap; use async_trait::async_trait; use futures_util::{stream::FuturesUnordered, StreamExt}; -use ruma::{signatures::Ed25519KeyPair, UserId, DeviceId, ServerName, api::federation::discovery::{ServerSigningKeys, VerifyKey}, ServerSigningKeyId, MilliSecondsSinceUnixEpoch}; +use ruma::{ + api::federation::discovery::{ServerSigningKeys, VerifyKey}, + signatures::Ed25519KeyPair, + DeviceId, MilliSecondsSinceUnixEpoch, ServerName, ServerSigningKeyId, UserId, +}; -use crate::{Result, service, database::KeyValueDatabase, Error, utils, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; pub const COUNTER: &[u8] = b"c"; @@ -35,28 +39,24 @@ impl service::globals::Data for KeyValueDatabase { // Return when *any* user changed his key // TODO: only send for user they share a room with - futures.push( - self.todeviceid_events - .watch_prefix(&userdeviceid_prefix), - ); + futures.push(self.todeviceid_events.watch_prefix(&userdeviceid_prefix)); futures.push(self.userroomid_joined.watch_prefix(&userid_prefix)); - futures.push( - self.userroomid_invitestate - .watch_prefix(&userid_prefix), - ); + futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix)); futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix)); futures.push( self.userroomid_notificationcount .watch_prefix(&userid_prefix), ); - futures.push( - self.userroomid_highlightcount - .watch_prefix(&userid_prefix), - ); + futures.push(self.userroomid_highlightcount.watch_prefix(&userid_prefix)); // Events for rooms we are in - for room_id in services().rooms.state_cache.rooms_joined(user_id).filter_map(|r| r.ok()) { + for room_id in services() + .rooms + .state_cache + .rooms_joined(user_id) + .filter_map(|r| r.ok()) + { let short_roomid = services() .rooms .short @@ -75,15 +75,9 @@ impl service::globals::Data for KeyValueDatabase { futures.push(self.pduid_pdu.watch_prefix(&short_roomid)); // EDUs - futures.push( - self.roomid_lasttypingupdate - .watch_prefix(&roomid_bytes), - ); + futures.push(self.roomid_lasttypingupdate.watch_prefix(&roomid_bytes)); - futures.push( - self.readreceiptid_readreceipt - .watch_prefix(&roomid_prefix), - ); + futures.push(self.readreceiptid_readreceipt.watch_prefix(&roomid_prefix)); // Key changes futures.push(self.keychangeid_userid.watch_prefix(&roomid_prefix)); @@ -110,10 +104,7 @@ impl service::globals::Data for KeyValueDatabase { futures.push(self.keychangeid_userid.watch_prefix(&userid_prefix)); // One time keys - futures.push( - self.userid_lastonetimekeyupdate - .watch_prefix(&userid_bytes), - ); + futures.push(self.userid_lastonetimekeyupdate.watch_prefix(&userid_bytes)); futures.push(Box::pin(services().globals.rotate.watch())); @@ -238,10 +229,7 @@ impl service::globals::Data for KeyValueDatabase { } fn bump_database_version(&self, new_version: u64) -> Result<()> { - self.global - .insert(b"version", &new_version.to_be_bytes())?; + self.global.insert(b"version", &new_version.to_be_bytes())?; Ok(()) } - - } diff --git a/src/database/key_value/key_backups.rs b/src/database/key_value/key_backups.rs index 8171451..0738f73 100644 --- a/src/database/key_value/key_backups.rs +++ b/src/database/key_value/key_backups.rs @@ -1,8 +1,15 @@ use std::collections::BTreeMap; -use ruma::{UserId, serde::Raw, api::client::{backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, error::ErrorKind}, RoomId}; +use ruma::{ + api::client::{ + backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, + error::ErrorKind, + }, + serde::Raw, + RoomId, UserId, +}; -use crate::{Result, service, database::KeyValueDatabase, services, Error, utils}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::key_backups::Data for KeyValueDatabase { fn create_backup( @@ -118,11 +125,7 @@ impl service::key_backups::Data for KeyValueDatabase { .transpose() } - fn get_backup( - &self, - user_id: &UserId, - version: &str, - ) -> Result<Option<Raw<BackupAlgorithm>>> { + fn get_backup(&self, user_id: &UserId, version: &str) -> Result<Option<Raw<BackupAlgorithm>>> { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(version.as_bytes()); @@ -322,12 +325,7 @@ impl service::key_backups::Data for KeyValueDatabase { Ok(()) } - fn delete_room_keys( - &self, - user_id: &UserId, - version: &str, - room_id: &RoomId, - ) -> Result<()> { + fn delete_room_keys(&self, user_id: &UserId, version: &str, room_id: &RoomId) -> Result<()> { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(version.as_bytes()); diff --git a/src/database/key_value/media.rs b/src/database/key_value/media.rs index f024487..de96ace 100644 --- a/src/database/key_value/media.rs +++ b/src/database/key_value/media.rs @@ -1,9 +1,16 @@ use ruma::api::client::error::ErrorKind; -use crate::{database::KeyValueDatabase, service, Error, utils, Result}; +use crate::{database::KeyValueDatabase, service, utils, Error, Result}; impl service::media::Data for KeyValueDatabase { - fn create_file_metadata(&self, mxc: String, width: u32, height: u32, content_disposition: Option<&str>, content_type: Option<&str>) -> Result<Vec<u8>> { + fn create_file_metadata( + &self, + mxc: String, + width: u32, + height: u32, + content_disposition: Option<&str>, + content_type: Option<&str>, + ) -> Result<Vec<u8>> { let mut key = mxc.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(&width.to_be_bytes()); @@ -28,14 +35,23 @@ impl service::media::Data for KeyValueDatabase { Ok(key) } - fn search_file_metadata(&self, mxc: String, width: u32, height: u32) -> Result<(Option<String>, Option<String>, Vec<u8>)> { + fn search_file_metadata( + &self, + mxc: String, + width: u32, + height: u32, + ) -> Result<(Option<String>, Option<String>, Vec<u8>)> { let mut prefix = mxc.as_bytes().to_vec(); prefix.push(0xff); prefix.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail prefix.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail prefix.push(0xff); - let (key, _) = self.mediaid_file.scan_prefix(prefix).next().ok_or(Error::BadRequest(ErrorKind::NotFound, "Media not found"))?; + let (key, _) = self + .mediaid_file + .scan_prefix(prefix) + .next() + .ok_or(Error::BadRequest(ErrorKind::NotFound, "Media not found"))?; let mut parts = key.rsplit(|&b| b == 0xff); @@ -57,9 +73,7 @@ impl service::media::Data for KeyValueDatabase { } else { Some( utils::string_from_bytes(content_disposition_bytes).map_err(|_| { - Error::bad_database( - "Content Disposition in mediaid_file is invalid unicode.", - ) + Error::bad_database("Content Disposition in mediaid_file is invalid unicode.") })?, ) }; diff --git a/src/database/key_value/pusher.rs b/src/database/key_value/pusher.rs index b05e47b..15f4e26 100644 --- a/src/database/key_value/pusher.rs +++ b/src/database/key_value/pusher.rs @@ -1,6 +1,9 @@ -use ruma::{UserId, api::client::push::{set_pusher, get_pushers}}; +use ruma::{ + api::client::push::{get_pushers, set_pusher}, + UserId, +}; -use crate::{service, database::KeyValueDatabase, Error, Result}; +use crate::{database::KeyValueDatabase, service, Error, Result}; impl service::pusher::Data for KeyValueDatabase { fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> { @@ -48,10 +51,7 @@ impl service::pusher::Data for KeyValueDatabase { .collect() } - fn get_pusher_senderkeys<'a>( - &'a self, - sender: &UserId, - ) -> Box<dyn Iterator<Item = Vec<u8>>> { + fn get_pusher_senderkeys<'a>(&'a self, sender: &UserId) -> Box<dyn Iterator<Item = Vec<u8>>> { let mut prefix = sender.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/key_value/rooms/alias.rs b/src/database/key_value/rooms/alias.rs index 0aa8dd4..112d6eb 100644 --- a/src/database/key_value/rooms/alias.rs +++ b/src/database/key_value/rooms/alias.rs @@ -1,13 +1,9 @@ -use ruma::{RoomId, RoomAliasId, api::client::error::ErrorKind}; +use ruma::{api::client::error::ErrorKind, RoomAliasId, RoomId}; -use crate::{service, database::KeyValueDatabase, utils, Error, services, Result}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::alias::Data for KeyValueDatabase { - fn set_alias( - &self, - alias: &RoomAliasId, - room_id: &RoomId - ) -> Result<()> { + fn set_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> Result<()> { self.alias_roomid .insert(alias.alias().as_bytes(), room_id.as_bytes())?; let mut aliasid = room_id.as_bytes().to_vec(); @@ -17,10 +13,7 @@ impl service::rooms::alias::Data for KeyValueDatabase { Ok(()) } - fn remove_alias( - &self, - alias: &RoomAliasId, - ) -> Result<()> { + fn remove_alias(&self, alias: &RoomAliasId) -> Result<()> { if let Some(room_id) = self.alias_roomid.get(alias.alias().as_bytes())? { let mut prefix = room_id.to_vec(); prefix.push(0xff); @@ -38,10 +31,7 @@ impl service::rooms::alias::Data for KeyValueDatabase { Ok(()) } - fn resolve_local_alias( - &self, - alias: &RoomAliasId - ) -> Result<Option<Box<RoomId>>> { + fn resolve_local_alias(&self, alias: &RoomAliasId) -> Result<Option<Box<RoomId>>> { self.alias_roomid .get(alias.alias().as_bytes())? .map(|bytes| { diff --git a/src/database/key_value/rooms/auth_chain.rs b/src/database/key_value/rooms/auth_chain.rs index 49d3956..60057ac 100644 --- a/src/database/key_value/rooms/auth_chain.rs +++ b/src/database/key_value/rooms/auth_chain.rs @@ -1,6 +1,6 @@ use std::{collections::HashSet, mem::size_of, sync::Arc}; -use crate::{service, database::KeyValueDatabase, Result, utils}; +use crate::{database::KeyValueDatabase, service, utils, Result}; impl service::rooms::auth_chain::Data for KeyValueDatabase { fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<HashSet<u64>>>> { @@ -12,14 +12,13 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase { // We only save auth chains for single events in the db if key.len() == 1 { // Check DB cache - let chain = self.shorteventid_authchain + let chain = self + .shorteventid_authchain .get(&key[0].to_be_bytes())? .map(|chain| { chain .chunks_exact(size_of::<u64>()) - .map(|chunk| { - utils::u64_from_bytes(chunk).expect("byte length is correct") - }) + .map(|chunk| utils::u64_from_bytes(chunk).expect("byte length is correct")) .collect() }); @@ -37,7 +36,6 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase { } Ok(None) - } fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: Arc<HashSet<u64>>) -> Result<()> { @@ -53,7 +51,10 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase { } // Cache in RAM - self.auth_chain_cache.lock().unwrap().insert(key, auth_chain); + self.auth_chain_cache + .lock() + .unwrap() + .insert(key, auth_chain); Ok(()) } diff --git a/src/database/key_value/rooms/directory.rs b/src/database/key_value/rooms/directory.rs index 727004e..661c202 100644 --- a/src/database/key_value/rooms/directory.rs +++ b/src/database/key_value/rooms/directory.rs @@ -1,6 +1,6 @@ use ruma::RoomId; -use crate::{service, database::KeyValueDatabase, utils, Error, Result}; +use crate::{database::KeyValueDatabase, service, utils, Error, Result}; impl service::rooms::directory::Data for KeyValueDatabase { fn set_public(&self, room_id: &RoomId) -> Result<()> { diff --git a/src/database/key_value/rooms/edus/mod.rs b/src/database/key_value/rooms/edus/mod.rs index b5007f8..6c65291 100644 --- a/src/database/key_value/rooms/edus/mod.rs +++ b/src/database/key_value/rooms/edus/mod.rs @@ -1,7 +1,7 @@ mod presence; -mod typing; mod read_receipt; +mod typing; -use crate::{service, database::KeyValueDatabase}; +use crate::{database::KeyValueDatabase, service}; impl service::rooms::edus::Data for KeyValueDatabase {} diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs index 1477c28..fdd51ce 100644 --- a/src/database/key_value/rooms/edus/presence.rs +++ b/src/database/key_value/rooms/edus/presence.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; -use ruma::{UserId, RoomId, events::presence::PresenceEvent, presence::PresenceState, UInt}; +use ruma::{events::presence::PresenceEvent, presence::PresenceState, RoomId, UInt, UserId}; -use crate::{service, database::KeyValueDatabase, utils, Error, services, Result}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::edus::presence::Data for KeyValueDatabase { fn update_presence( diff --git a/src/database/key_value/rooms/edus/read_receipt.rs b/src/database/key_value/rooms/edus/read_receipt.rs index a12e265..c78f0f5 100644 --- a/src/database/key_value/rooms/edus/read_receipt.rs +++ b/src/database/key_value/rooms/edus/read_receipt.rs @@ -1,8 +1,10 @@ use std::mem; -use ruma::{UserId, RoomId, events::receipt::ReceiptEvent, serde::Raw, signatures::CanonicalJsonObject}; +use ruma::{ + events::receipt::ReceiptEvent, serde::Raw, signatures::CanonicalJsonObject, RoomId, UserId, +}; -use crate::{database::KeyValueDatabase, service, utils, Error, services, Result}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { fn readreceipt_update( @@ -50,13 +52,15 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { &'a self, room_id: &RoomId, since: u64, - ) -> Box<dyn Iterator< - Item=Result<( - Box<UserId>, - u64, - Raw<ruma::events::AnySyncEphemeralRoomEvent>, - )>, - >> { + ) -> Box< + dyn Iterator< + Item = Result<( + Box<UserId>, + u64, + Raw<ruma::events::AnySyncEphemeralRoomEvent>, + )>, + >, + > { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); let prefix2 = prefix.clone(); @@ -64,42 +68,44 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { let mut first_possible_edu = prefix.clone(); first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since - Box::new(self.readreceiptid_readreceipt - .iter_from(&first_possible_edu, false) - .take_while(move |(k, _)| k.starts_with(&prefix2)) - .map(move |(k, v)| { - let count = - utils::u64_from_bytes(&k[prefix.len()..prefix.len() + mem::size_of::<u64>()]) - .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; - let user_id = UserId::parse( - utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..]) - .map_err(|_| { - Error::bad_database("Invalid readreceiptid userid bytes in db.") - })?, - ) + Box::new( + self.readreceiptid_readreceipt + .iter_from(&first_possible_edu, false) + .take_while(move |(k, _)| k.starts_with(&prefix2)) + .map(move |(k, v)| { + let count = utils::u64_from_bytes( + &k[prefix.len()..prefix.len() + mem::size_of::<u64>()], + ) + .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; + let user_id = UserId::parse( + utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..]) + .map_err(|_| { + Error::bad_database("Invalid readreceiptid userid bytes in db.") + })?, + ) .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; - let mut json = serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| { - Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") - })?; - json.remove("room_id"); - - Ok(( - user_id, - count, - Raw::from_json( - serde_json::value::to_raw_value(&json).expect("json is valid raw value"), - ), - )) - })) + let mut json = + serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| { + Error::bad_database( + "Read receipt in roomlatestid_roomlatest is invalid json.", + ) + })?; + json.remove("room_id"); + + Ok(( + user_id, + count, + Raw::from_json( + serde_json::value::to_raw_value(&json) + .expect("json is valid raw value"), + ), + )) + }), + ) } - fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()> { + fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { let mut key = room_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(user_id.as_bytes()); diff --git a/src/database/key_value/rooms/edus/typing.rs b/src/database/key_value/rooms/edus/typing.rs index b7d3596..7b211e7 100644 --- a/src/database/key_value/rooms/edus/typing.rs +++ b/src/database/key_value/rooms/edus/typing.rs @@ -1,16 +1,11 @@ use std::collections::HashSet; -use ruma::{UserId, RoomId}; +use ruma::{RoomId, UserId}; -use crate::{database::KeyValueDatabase, service, utils, Error, services, Result}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::edus::typing::Data for KeyValueDatabase { - fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()> { + fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -30,11 +25,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { Ok(()) } - fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()> { + fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -53,17 +44,16 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { } if found_outdated { - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?; + self.roomid_lasttypingupdate.insert( + room_id.as_bytes(), + &services().globals.next_count()?.to_be_bytes(), + )?; } Ok(()) } - fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result<u64> { + fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> { Ok(self .roomid_lasttypingupdate .get(room_id.as_bytes())? @@ -76,10 +66,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { .unwrap_or(0)) } - fn typings_all( - &self, - room_id: &RoomId, - ) -> Result<HashSet<Box<UserId>>> { + fn typings_all(&self, room_id: &RoomId) -> Result<HashSet<Box<UserId>>> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -89,7 +76,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| { Error::bad_database("User ID in typingid_userid is invalid unicode.") })?) - .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; + .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; user_ids.insert(user_id); } diff --git a/src/database/key_value/rooms/lazy_load.rs b/src/database/key_value/rooms/lazy_load.rs index 133e1d0..a19d52c 100644 --- a/src/database/key_value/rooms/lazy_load.rs +++ b/src/database/key_value/rooms/lazy_load.rs @@ -1,6 +1,6 @@ -use ruma::{UserId, DeviceId, RoomId}; +use ruma::{DeviceId, RoomId, UserId}; -use crate::{service, database::KeyValueDatabase, Result}; +use crate::{database::KeyValueDatabase, service, Result}; impl service::rooms::lazy_loading::Data for KeyValueDatabase { fn lazy_load_was_sent_before( diff --git a/src/database/key_value/rooms/metadata.rs b/src/database/key_value/rooms/metadata.rs index 72f6251..63a6b1a 100644 --- a/src/database/key_value/rooms/metadata.rs +++ b/src/database/key_value/rooms/metadata.rs @@ -1,6 +1,6 @@ use ruma::RoomId; -use crate::{service, database::KeyValueDatabase, Result, services}; +use crate::{database::KeyValueDatabase, service, services, Result}; impl service::rooms::metadata::Data for KeyValueDatabase { fn exists(&self, room_id: &RoomId) -> Result<bool> { diff --git a/src/database/key_value/rooms/outlier.rs b/src/database/key_value/rooms/outlier.rs index aa97544..2ecaadb 100644 --- a/src/database/key_value/rooms/outlier.rs +++ b/src/database/key_value/rooms/outlier.rs @@ -1,6 +1,6 @@ -use ruma::{EventId, signatures::CanonicalJsonObject}; +use ruma::{signatures::CanonicalJsonObject, EventId}; -use crate::{service, database::KeyValueDatabase, PduEvent, Error, Result}; +use crate::{database::KeyValueDatabase, service, Error, PduEvent, Result}; impl service::rooms::outlier::Data for KeyValueDatabase { fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> { diff --git a/src/database/key_value/rooms/pdu_metadata.rs b/src/database/key_value/rooms/pdu_metadata.rs index f3ac414..76ec734 100644 --- a/src/database/key_value/rooms/pdu_metadata.rs +++ b/src/database/key_value/rooms/pdu_metadata.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use ruma::{RoomId, EventId}; +use ruma::{EventId, RoomId}; -use crate::{service, database::KeyValueDatabase, Result}; +use crate::{database::KeyValueDatabase, service, Result}; impl service::rooms::pdu_metadata::Data for KeyValueDatabase { fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> { diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index 41df544..79e6a32 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -2,7 +2,7 @@ use std::mem::size_of; use ruma::RoomId; -use crate::{service, database::KeyValueDatabase, utils, Result, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Result}; impl service::rooms::search::Data for KeyValueDatabase { fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: String) -> Result<()> { @@ -27,7 +27,9 @@ impl service::rooms::search::Data for KeyValueDatabase { room_id: &RoomId, search_string: &str, ) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>>>, Vec<String>)>> { - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -63,10 +65,10 @@ impl service::rooms::search::Data for KeyValueDatabase { }; let mapped = common_elements.map(move |id| { - let mut pduid = prefix_clone.clone(); - pduid.extend_from_slice(&id); - pduid - }); + let mut pduid = prefix_clone.clone(); + pduid.extend_from_slice(&id); + pduid + }); Ok(Some((Box::new(mapped), words))) } diff --git a/src/database/key_value/rooms/short.rs b/src/database/key_value/rooms/short.rs index ecd12da..c022317 100644 --- a/src/database/key_value/rooms/short.rs +++ b/src/database/key_value/rooms/short.rs @@ -1,14 +1,11 @@ use std::sync::Arc; -use ruma::{EventId, events::StateEventType, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId}; -use crate::{Result, database::KeyValueDatabase, service, utils, Error, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::short::Data for KeyValueDatabase { - fn get_or_create_shorteventid( - &self, - event_id: &EventId, - ) -> Result<u64> { + fn get_or_create_shorteventid(&self, event_id: &EventId) -> Result<u64> { if let Some(short) = self.eventidshort_cache.lock().unwrap().get_mut(event_id) { return Ok(*short); } @@ -180,10 +177,7 @@ impl service::rooms::short::Data for KeyValueDatabase { } /// Returns (shortstatehash, already_existed) - fn get_or_create_shortstatehash( - &self, - state_hash: &[u8], - ) -> Result<(u64, bool)> { + fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> Result<(u64, bool)> { Ok(match self.statehash_shortstatehash.get(state_hash)? { Some(shortstatehash) => ( utils::u64_from_bytes(&shortstatehash) @@ -209,10 +203,7 @@ impl service::rooms::short::Data for KeyValueDatabase { .transpose() } - fn get_or_create_shortroomid( - &self, - room_id: &RoomId, - ) -> Result<u64> { + fn get_or_create_shortroomid(&self, room_id: &RoomId) -> Result<u64> { Ok(match self.roomid_shortroomid.get(room_id.as_bytes())? { Some(short) => utils::u64_from_bytes(&short) .map_err(|_| Error::bad_database("Invalid shortroomid in db."))?, diff --git a/src/database/key_value/rooms/state.rs b/src/database/key_value/rooms/state.rs index 90ac0d5..80a7458 100644 --- a/src/database/key_value/rooms/state.rs +++ b/src/database/key_value/rooms/state.rs @@ -1,10 +1,10 @@ -use ruma::{RoomId, EventId}; -use tokio::sync::MutexGuard; -use std::sync::Arc; +use ruma::{EventId, RoomId}; use std::collections::HashSet; use std::fmt::Debug; +use std::sync::Arc; +use tokio::sync::MutexGuard; -use crate::{service, database::KeyValueDatabase, utils, Error, Result}; +use crate::{database::KeyValueDatabase, service, utils, Error, Result}; impl service::rooms::state::Data for KeyValueDatabase { fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> { @@ -17,9 +17,12 @@ impl service::rooms::state::Data for KeyValueDatabase { }) } - fn set_room_state(&self, room_id: &RoomId, new_shortstatehash: u64, + fn set_room_state( + &self, + room_id: &RoomId, + new_shortstatehash: u64, _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex - ) -> Result<()> { + ) -> Result<()> { self.roomid_shortstatehash .insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?; Ok(()) diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 4d5bd4a..39c261f 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -1,13 +1,18 @@ -use std::{collections::{BTreeMap, HashMap}, sync::Arc}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; -use crate::{database::KeyValueDatabase, service, PduEvent, Error, utils, Result, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; -use ruma::{EventId, events::StateEventType, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId}; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> { - let full_state = services().rooms.state_compressor + let full_state = services() + .rooms + .state_compressor .load_shortstatehash_info(shortstatehash)? .pop() .expect("there is always one layer") @@ -15,7 +20,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { let mut result = BTreeMap::new(); let mut i = 0; for compressed in full_state.into_iter() { - let parsed = services().rooms.state_compressor.parse_compressed_state_event(compressed)?; + let parsed = services() + .rooms + .state_compressor + .parse_compressed_state_event(compressed)?; result.insert(parsed.0, parsed.1); i += 1; @@ -30,7 +38,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { &self, shortstatehash: u64, ) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> { - let full_state = services().rooms.state_compressor + let full_state = services() + .rooms + .state_compressor .load_shortstatehash_info(shortstatehash)? .pop() .expect("there is always one layer") @@ -39,7 +49,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { let mut result = HashMap::new(); let mut i = 0; for compressed in full_state { - let (_, eventid) = services().rooms.state_compressor.parse_compressed_state_event(compressed)?; + let (_, eventid) = services() + .rooms + .state_compressor + .parse_compressed_state_event(compressed)?; if let Some(pdu) = services().rooms.timeline.get_pdu(&eventid)? { result.insert( ( @@ -69,11 +82,17 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { event_type: &StateEventType, state_key: &str, ) -> Result<Option<Arc<EventId>>> { - let shortstatekey = match services().rooms.short.get_shortstatekey(event_type, state_key)? { + let shortstatekey = match services() + .rooms + .short + .get_shortstatekey(event_type, state_key)? + { Some(s) => s, None => return Ok(None), }; - let full_state = services().rooms.state_compressor + let full_state = services() + .rooms + .state_compressor .load_shortstatehash_info(shortstatehash)? .pop() .expect("there is always one layer") @@ -82,7 +101,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { .into_iter() .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) .and_then(|compressed| { - services().rooms.state_compressor.parse_compressed_state_event(compressed) + services() + .rooms + .state_compressor + .parse_compressed_state_event(compressed) .ok() .map(|(_, id)| id) })) @@ -96,7 +118,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { state_key: &str, ) -> Result<Option<Arc<PduEvent>>> { self.state_get_id(shortstatehash, event_type, state_key)? - .map_or(Ok(None), |event_id| services().rooms.timeline.get_pdu(&event_id)) + .map_or(Ok(None), |event_id| { + services().rooms.timeline.get_pdu(&event_id) + }) } /// Returns the state hash for this pdu. @@ -122,7 +146,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { &self, room_id: &RoomId, ) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> { - if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? { + if let Some(current_shortstatehash) = + services().rooms.state.get_room_shortstatehash(room_id)? + { self.state_full(current_shortstatehash).await } else { Ok(HashMap::new()) @@ -136,7 +162,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { event_type: &StateEventType, state_key: &str, ) -> Result<Option<Arc<EventId>>> { - if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? { + if let Some(current_shortstatehash) = + services().rooms.state.get_room_shortstatehash(room_id)? + { self.state_get_id(current_shortstatehash, event_type, state_key) } else { Ok(None) @@ -150,7 +178,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { event_type: &StateEventType, state_key: &str, ) -> Result<Option<Arc<PduEvent>>> { - if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? { + if let Some(current_shortstatehash) = + services().rooms.state.get_room_shortstatehash(room_id)? + { self.state_get(current_shortstatehash, event_type, state_key) } else { Ok(None) diff --git a/src/database/key_value/rooms/state_cache.rs b/src/database/key_value/rooms/state_cache.rs index 4043bc4..4ca6ac4 100644 --- a/src/database/key_value/rooms/state_cache.rs +++ b/src/database/key_value/rooms/state_cache.rs @@ -1,9 +1,13 @@ use std::{collections::HashSet, sync::Arc}; use regex::Regex; -use ruma::{UserId, RoomId, events::{AnyStrippedStateEvent, AnySyncStateEvent}, serde::Raw, ServerName}; +use ruma::{ + events::{AnyStrippedStateEvent, AnySyncStateEvent}, + serde::Raw, + RoomId, ServerName, UserId, +}; -use crate::{service, database::KeyValueDatabase, services, Result, Error, utils}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::state_cache::Data for KeyValueDatabase { fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { @@ -31,8 +35,13 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { Ok(()) } - - fn mark_as_invited(&self, user_id: &UserId, room_id: &RoomId, last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>) -> Result<()> { + + fn mark_as_invited( + &self, + user_id: &UserId, + room_id: &RoomId, + last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>, + ) -> Result<()> { let mut roomuser_id = room_id.as_bytes().to_vec(); roomuser_id.push(0xff); roomuser_id.extend_from_slice(user_id.as_bytes()); @@ -46,8 +55,10 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { &serde_json::to_vec(&last_state.unwrap_or_default()) .expect("state to bytes always works"), )?; - self.roomuserid_invitecount - .insert(&roomuser_id, &services().globals.next_count()?.to_be_bytes())?; + self.roomuserid_invitecount.insert( + &roomuser_id, + &services().globals.next_count()?.to_be_bytes(), + )?; self.userroomid_joined.remove(&userroom_id)?; self.roomuserid_joined.remove(&roomuser_id)?; self.userroomid_leftstate.remove(&userroom_id)?; @@ -69,8 +80,10 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { &userroom_id, &serde_json::to_vec(&Vec::<Raw<AnySyncStateEvent>>::new()).unwrap(), )?; // TODO - self.roomuserid_leftcount - .insert(&roomuser_id, &services().globals.next_count()?.to_be_bytes())?; + self.roomuserid_leftcount.insert( + &roomuser_id, + &services().globals.next_count()?.to_be_bytes(), + )?; self.userroomid_joined.remove(&userroom_id)?; self.roomuserid_joined.remove(&roomuser_id)?; self.userroomid_invitestate.remove(&userroom_id)?; @@ -324,21 +337,25 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.roomuseroncejoinedids - .scan_prefix(prefix) - .map(|(key, _)| { - UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.roomuseroncejoinedids + .scan_prefix(prefix) + .map(|(key, _)| { + UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database( + "User ID in room_useroncejoined is invalid unicode.", + ) + })?, ) - .map_err(|_| { - Error::bad_database("User ID in room_useroncejoined is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("User ID in room_useroncejoined is invalid.")) - })) + .map_err(|_| Error::bad_database("User ID in room_useroncejoined is invalid.")) + }), + ) } /// Returns an iterator over all invited members of a room. @@ -350,21 +367,23 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.roomuserid_invitecount - .scan_prefix(prefix) - .map(|(key, _)| { - UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.roomuserid_invitecount + .scan_prefix(prefix) + .map(|(key, _)| { + UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("User ID in roomuserid_invited is invalid unicode.") + })?, ) - .map_err(|_| { - Error::bad_database("User ID in roomuserid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("User ID in roomuserid_invited is invalid.")) - })) + .map_err(|_| Error::bad_database("User ID in roomuserid_invited is invalid.")) + }), + ) } #[tracing::instrument(skip(self))] @@ -403,21 +422,23 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { &'a self, user_id: &UserId, ) -> Box<dyn Iterator<Item = Result<Box<RoomId>>> + 'a> { - Box::new(self.userroomid_joined - .scan_prefix(user_id.as_bytes().to_vec()) - .map(|(key, _)| { - RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.userroomid_joined + .scan_prefix(user_id.as_bytes().to_vec()) + .map(|(key, _)| { + RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_joined is invalid unicode.") + })?, ) - .map_err(|_| { - Error::bad_database("Room ID in userroomid_joined is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_joined is invalid.")) - })) + .map_err(|_| Error::bad_database("Room ID in userroomid_joined is invalid.")) + }), + ) } /// Returns an iterator over all rooms a user was invited to. @@ -429,26 +450,31 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.userroomid_invitestate - .scan_prefix(prefix) - .map(|(key, state)| { - let room_id = RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.userroomid_invitestate + .scan_prefix(prefix) + .map(|(key, state)| { + let room_id = RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_invited is invalid unicode.") + })?, ) .map_err(|_| { - Error::bad_database("Room ID in userroomid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; + Error::bad_database("Room ID in userroomid_invited is invalid.") + })?; - let state = serde_json::from_slice(&state) - .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?; + let state = serde_json::from_slice(&state).map_err(|_| { + Error::bad_database("Invalid state in userroomid_invitestate.") + })?; - Ok((room_id, state)) - })) + Ok((room_id, state)) + }), + ) } #[tracing::instrument(skip(self))] @@ -502,26 +528,31 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); - Box::new(self.userroomid_leftstate - .scan_prefix(prefix) - .map(|(key, state)| { - let room_id = RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + Box::new( + self.userroomid_leftstate + .scan_prefix(prefix) + .map(|(key, state)| { + let room_id = RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_invited is invalid unicode.") + })?, ) .map_err(|_| { - Error::bad_database("Room ID in userroomid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; + Error::bad_database("Room ID in userroomid_invited is invalid.") + })?; - let state = serde_json::from_slice(&state) - .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?; + let state = serde_json::from_slice(&state).map_err(|_| { + Error::bad_database("Invalid state in userroomid_leftstate.") + })?; - Ok((room_id, state)) - })) + Ok((room_id, state)) + }), + ) } #[tracing::instrument(skip(self))] diff --git a/src/database/key_value/rooms/state_compressor.rs b/src/database/key_value/rooms/state_compressor.rs index aee1890..d0a9be4 100644 --- a/src/database/key_value/rooms/state_compressor.rs +++ b/src/database/key_value/rooms/state_compressor.rs @@ -1,6 +1,10 @@ use std::{collections::HashSet, mem::size_of}; -use crate::{service::{self, rooms::state_compressor::data::StateDiff}, database::KeyValueDatabase, Error, utils, Result}; +use crate::{ + database::KeyValueDatabase, + service::{self, rooms::state_compressor::data::StateDiff}, + utils, Error, Result, +}; impl service::rooms::state_compressor::Data for KeyValueDatabase { fn get_statediff(&self, shortstatehash: u64) -> Result<StateDiff> { @@ -10,11 +14,7 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase { .ok_or_else(|| Error::bad_database("State hash does not exist"))?; let parent = utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length"); - let parent = if parent != 0 { - Some(parent) - } else { - None - }; + let parent = if parent != 0 { Some(parent) } else { None }; let mut add_mode = true; let mut added = HashSet::new(); @@ -35,7 +35,11 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase { i += 2 * size_of::<u64>(); } - Ok(StateDiff { parent, added, removed }) + Ok(StateDiff { + parent, + added, + removed, + }) } fn save_statediff(&self, shortstatehash: u64, diff: StateDiff) -> Result<()> { diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 1723186..5d684a1 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -1,13 +1,17 @@ use std::{collections::hash_map, mem::size_of, sync::Arc}; -use ruma::{UserId, RoomId, api::client::error::ErrorKind, EventId, signatures::CanonicalJsonObject}; +use ruma::{ + api::client::error::ErrorKind, signatures::CanonicalJsonObject, EventId, RoomId, UserId, +}; use tracing::error; -use crate::{service, database::KeyValueDatabase, utils, Error, PduEvent, Result, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; impl service::rooms::timeline::Data for KeyValueDatabase { fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> { - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -82,10 +86,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { } /// Returns the json of a pdu. - fn get_non_outlier_pdu_json( - &self, - event_id: &EventId, - ) -> Result<Option<CanonicalJsonObject>> { + fn get_non_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> { self.eventid_pduid .get(event_id.as_bytes())? .map(|pduid| { @@ -187,10 +188,17 @@ impl service::rooms::timeline::Data for KeyValueDatabase { .map_err(|_| Error::bad_database("PDU has invalid count bytes.")) } - fn append_pdu(&self, pdu_id: &[u8], pdu: &PduEvent, json: &CanonicalJsonObject, count: u64) -> Result<()> { + fn append_pdu( + &self, + pdu_id: &[u8], + pdu: &PduEvent, + json: &CanonicalJsonObject, + count: u64, + ) -> Result<()> { self.pduid_pdu.insert( pdu_id, - &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"))?; + &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"), + )?; self.lasttimelinecount_cache .lock() @@ -209,7 +217,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase { if self.pduid_pdu.get(pdu_id)?.is_some() { self.pduid_pdu.insert( pdu_id, - &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"))?; + &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"), + )?; Ok(()) } else { Err(Error::BadRequest( @@ -227,7 +236,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase { room_id: &RoomId, since: u64, ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -239,18 +250,19 @@ impl service::rooms::timeline::Data for KeyValueDatabase { let user_id = user_id.to_owned(); - Ok(Box::new(self - .pduid_pdu - .iter_from(&first_pdu_id, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::<PduEvent>(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - }))) + Ok(Box::new( + self.pduid_pdu + .iter_from(&first_pdu_id, false) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pdu_id, v)| { + let mut pdu = serde_json::from_slice::<PduEvent>(&v) + .map_err(|_| Error::bad_database("PDU in db is invalid."))?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + Ok((pdu_id, pdu)) + }), + )) } /// Returns an iterator over all events and their tokens in a room that happened before the @@ -262,7 +274,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase { until: u64, ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { // Create the first part of the full pdu id - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -275,18 +289,19 @@ impl service::rooms::timeline::Data for KeyValueDatabase { let user_id = user_id.to_owned(); - Ok(Box::new(self - .pduid_pdu - .iter_from(current, true) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::<PduEvent>(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - }))) + Ok(Box::new( + self.pduid_pdu + .iter_from(current, true) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pdu_id, v)| { + let mut pdu = serde_json::from_slice::<PduEvent>(&v) + .map_err(|_| Error::bad_database("PDU in db is invalid."))?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + Ok((pdu_id, pdu)) + }), + )) } fn pdus_after<'a>( @@ -296,7 +311,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase { from: u64, ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> { // Create the first part of the full pdu id - let prefix = services().rooms.short + let prefix = services() + .rooms + .short .get_shortroomid(room_id)? .expect("room exists") .to_be_bytes() @@ -309,21 +326,27 @@ impl service::rooms::timeline::Data for KeyValueDatabase { let user_id = user_id.to_owned(); - Ok(Box::new(self - .pduid_pdu - .iter_from(current, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::<PduEvent>(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - }))) + Ok(Box::new( + self.pduid_pdu + .iter_from(current, false) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pdu_id, v)| { + let mut pdu = serde_json::from_slice::<PduEvent>(&v) + .map_err(|_| Error::bad_database("PDU in db is invalid."))?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + Ok((pdu_id, pdu)) + }), + )) } - fn increment_notification_counts(&self, room_id: &RoomId, notifies: Vec<Box<UserId>>, highlights: Vec<Box<UserId>>) -> Result<()> { + fn increment_notification_counts( + &self, + room_id: &RoomId, + notifies: Vec<Box<UserId>>, + highlights: Vec<Box<UserId>>, + ) -> Result<()> { let notifies_batch = Vec::new(); let highlights_batch = Vec::new(); for user in notifies { diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs index 3759bda..78c78e1 100644 --- a/src/database/key_value/rooms/user.rs +++ b/src/database/key_value/rooms/user.rs @@ -1,6 +1,6 @@ -use ruma::{UserId, RoomId}; +use ruma::{RoomId, UserId}; -use crate::{service, database::KeyValueDatabase, utils, Error, Result, services}; +use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::user::Data for KeyValueDatabase { fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { @@ -50,7 +50,11 @@ impl service::rooms::user::Data for KeyValueDatabase { token: u64, shortstatehash: u64, ) -> Result<()> { - let shortroomid = services().rooms.short.get_shortroomid(room_id)?.expect("room exists"); + let shortroomid = services() + .rooms + .short + .get_shortroomid(room_id)? + .expect("room exists"); let mut key = shortroomid.to_be_bytes().to_vec(); key.extend_from_slice(&token.to_be_bytes()); @@ -60,7 +64,11 @@ impl service::rooms::user::Data for KeyValueDatabase { } fn get_token_shortstatehash(&self, room_id: &RoomId, token: u64) -> Result<Option<u64>> { - let shortroomid = services().rooms.short.get_shortroomid(room_id)?.expect("room exists"); + let shortroomid = services() + .rooms + .short + .get_shortroomid(room_id)? + .expect("room exists"); let mut key = shortroomid.to_be_bytes().to_vec(); key.extend_from_slice(&token.to_be_bytes()); @@ -102,13 +110,15 @@ impl service::rooms::user::Data for KeyValueDatabase { }); // We use the default compare function because keys are sorted correctly (not reversed) - Ok(Box::new(Box::new(utils::common_elements(iterators, Ord::cmp) - .expect("users is not empty") - .map(|bytes| { - RoomId::parse(utils::string_from_bytes(&*bytes).map_err(|_| { - Error::bad_database("Invalid RoomId bytes in userroomid_joined") - })?) - .map_err(|_| Error::bad_database("Invalid RoomId in userroomid_joined.")) - })))) + Ok(Box::new(Box::new( + utils::common_elements(iterators, Ord::cmp) + .expect("users is not empty") + .map(|bytes| { + RoomId::parse(utils::string_from_bytes(&*bytes).map_err(|_| { + Error::bad_database("Invalid RoomId bytes in userroomid_joined") + })?) + .map_err(|_| Error::bad_database("Invalid RoomId in userroomid_joined.")) + }), + ))) } } diff --git a/src/database/key_value/transaction_ids.rs b/src/database/key_value/transaction_ids.rs index a63b3c5..2ea6ad4 100644 --- a/src/database/key_value/transaction_ids.rs +++ b/src/database/key_value/transaction_ids.rs @@ -1,6 +1,6 @@ -use ruma::{UserId, DeviceId, TransactionId}; +use ruma::{DeviceId, TransactionId, UserId}; -use crate::{service, database::KeyValueDatabase, Result}; +use crate::{database::KeyValueDatabase, service, Result}; impl service::transaction_ids::Data for KeyValueDatabase { fn add_txnid( diff --git a/src/database/key_value/uiaa.rs b/src/database/key_value/uiaa.rs index cf242de..8a9f176 100644 --- a/src/database/key_value/uiaa.rs +++ b/src/database/key_value/uiaa.rs @@ -1,4 +1,8 @@ -use ruma::{UserId, DeviceId, signatures::CanonicalJsonValue, api::client::{uiaa::UiaaInfo, error::ErrorKind}}; +use ruma::{ + api::client::{error::ErrorKind, uiaa::UiaaInfo}, + signatures::CanonicalJsonValue, + DeviceId, UserId, +}; use crate::{database::KeyValueDatabase, service, Error, Result}; diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs index 55a518d..15699a1 100644 --- a/src/database/key_value/users.rs +++ b/src/database/key_value/users.rs @@ -1,9 +1,20 @@ -use std::{mem::size_of, collections::BTreeMap}; - -use ruma::{api::client::{filter::IncomingFilterDefinition, error::ErrorKind, device::Device}, UserId, RoomAliasId, MxcUri, DeviceId, MilliSecondsSinceUnixEpoch, DeviceKeyId, encryption::{OneTimeKey, CrossSigningKey, DeviceKeys}, serde::Raw, events::{AnyToDeviceEvent, StateEventType}, DeviceKeyAlgorithm, UInt}; +use std::{collections::BTreeMap, mem::size_of}; + +use ruma::{ + api::client::{device::Device, error::ErrorKind, filter::IncomingFilterDefinition}, + encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, + events::{AnyToDeviceEvent, StateEventType}, + serde::Raw, + DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, MxcUri, RoomAliasId, + UInt, UserId, +}; use tracing::warn; -use crate::{service::{self, users::clean_signatures}, database::KeyValueDatabase, Error, utils, services, Result}; +use crate::{ + database::KeyValueDatabase, + service::{self, users::clean_signatures}, + services, utils, Error, Result, +}; impl service::users::Data for KeyValueDatabase { /// Check if a user has an account on this homeserver. @@ -274,18 +285,21 @@ impl service::users::Data for KeyValueDatabase { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); // All devices have metadata - Box::new(self.userdeviceid_metadata - .scan_prefix(prefix) - .map(|(bytes, _)| { - Ok(utils::string_from_bytes( - bytes - .rsplit(|&b| b == 0xff) - .next() - .ok_or_else(|| Error::bad_database("UserDevice ID in db is invalid."))?, - ) - .map_err(|_| Error::bad_database("Device ID in userdeviceid_metadata is invalid."))? - .into()) - })) + Box::new( + self.userdeviceid_metadata + .scan_prefix(prefix) + .map(|(bytes, _)| { + Ok(utils::string_from_bytes( + bytes.rsplit(|&b| b == 0xff).next().ok_or_else(|| { + Error::bad_database("UserDevice ID in db is invalid.") + })?, + ) + .map_err(|_| { + Error::bad_database("Device ID in userdeviceid_metadata is invalid.") + })? + .into()) + }), + ) } /// Replaces the access token of one device. @@ -341,8 +355,10 @@ impl service::users::Data for KeyValueDatabase { &serde_json::to_vec(&one_time_key_value).expect("OneTimeKey::to_vec always works"), )?; - self.userid_lastonetimekeyupdate - .insert(user_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?; + self.userid_lastonetimekeyupdate.insert( + user_id.as_bytes(), + &services().globals.next_count()?.to_be_bytes(), + )?; Ok(()) } @@ -372,8 +388,10 @@ impl service::users::Data for KeyValueDatabase { prefix.extend_from_slice(key_algorithm.as_ref().as_bytes()); prefix.push(b':'); - self.userid_lastonetimekeyupdate - .insert(user_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?; + self.userid_lastonetimekeyupdate.insert( + user_id.as_bytes(), + &services().globals.next_count()?.to_be_bytes(), + )?; self.onetimekeyid_onetimekeys .scan_prefix(prefix) @@ -617,38 +635,47 @@ impl service::users::Data for KeyValueDatabase { let to = to.unwrap_or(u64::MAX); - Box::new(self.keychangeid_userid - .iter_from(&start, false) - .take_while(move |(k, _)| { - k.starts_with(&prefix) - && if let Some(current) = k.splitn(2, |&b| b == 0xff).nth(1) { - if let Ok(c) = utils::u64_from_bytes(current) { - c <= to + Box::new( + self.keychangeid_userid + .iter_from(&start, false) + .take_while(move |(k, _)| { + k.starts_with(&prefix) + && if let Some(current) = k.splitn(2, |&b| b == 0xff).nth(1) { + if let Ok(c) = utils::u64_from_bytes(current) { + c <= to + } else { + warn!("BadDatabase: Could not parse keychangeid_userid bytes"); + false + } } else { - warn!("BadDatabase: Could not parse keychangeid_userid bytes"); + warn!("BadDatabase: Could not parse keychangeid_userid"); false } - } else { - warn!("BadDatabase: Could not parse keychangeid_userid"); - false - } - }) - .map(|(_, bytes)| { - UserId::parse(utils::string_from_bytes(&bytes).map_err(|_| { - Error::bad_database("User ID in devicekeychangeid_userid is invalid unicode.") - })?) - .map_err(|_| Error::bad_database("User ID in devicekeychangeid_userid is invalid.")) - })) + }) + .map(|(_, bytes)| { + UserId::parse(utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database( + "User ID in devicekeychangeid_userid is invalid unicode.", + ) + })?) + .map_err(|_| { + Error::bad_database("User ID in devicekeychangeid_userid is invalid.") + }) + }), + ) } - fn mark_device_key_update( - &self, - user_id: &UserId, - ) -> Result<()> { + fn mark_device_key_update(&self, user_id: &UserId) -> Result<()> { let count = services().globals.next_count()?.to_be_bytes(); - for room_id in services().rooms.state_cache.rooms_joined(user_id).filter_map(|r| r.ok()) { + for room_id in services() + .rooms + .state_cache + .rooms_joined(user_id) + .filter_map(|r| r.ok()) + { // Don't send key updates to unencrypted rooms - if services().rooms + if services() + .rooms .state_accessor .room_state_get(&room_id, &StateEventType::RoomEncryption, "")? .is_none() @@ -883,20 +910,19 @@ impl service::users::Data for KeyValueDatabase { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); - Box::new(self.userdeviceid_metadata - .scan_prefix(key) - .map(|(_, bytes)| { - serde_json::from_slice::<Device>(&bytes) - .map_err(|_| Error::bad_database("Device in userdeviceid_metadata is invalid.")) - })) + Box::new( + self.userdeviceid_metadata + .scan_prefix(key) + .map(|(_, bytes)| { + serde_json::from_slice::<Device>(&bytes).map_err(|_| { + Error::bad_database("Device in userdeviceid_metadata is invalid.") + }) + }), + ) } /// Creates a new sync filter. Returns the filter id. - fn create_filter( - &self, - user_id: &UserId, - filter: &IncomingFilterDefinition, - ) -> Result<String> { + fn create_filter(&self, user_id: &UserId, filter: &IncomingFilterDefinition) -> Result<String> { let filter_id = utils::random_string(4); let mut key = user_id.as_bytes().to_vec(); |