summaryrefslogtreecommitdiff
path: root/src/database
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2022-10-05 20:34:31 +0200
committerNyaaori <+@nyaaori.cat>2022-10-10 14:02:01 +0200
commita4637e2ba1093065a6fda3fa2ad2b2b9f30eea63 (patch)
tree2d31313957d699875fc61f570686318b523ae0f1 /src/database
parent33a2b2b7729bb40253fd174d99ad773869b5ecfe (diff)
downloadconduit-a4637e2ba1093065a6fda3fa2ad2b2b9f30eea63.zip
cargo fmt
Diffstat (limited to 'src/database')
-rw-r--r--src/database/abstraction/rocksdb.rs2
-rw-r--r--src/database/key_value/account_data.rs14
-rw-r--r--src/database/key_value/appservice.rs11
-rw-r--r--src/database/key_value/globals.rs50
-rw-r--r--src/database/key_value/key_backups.rs24
-rw-r--r--src/database/key_value/media.rs28
-rw-r--r--src/database/key_value/pusher.rs12
-rw-r--r--src/database/key_value/rooms/alias.rs20
-rw-r--r--src/database/key_value/rooms/auth_chain.rs15
-rw-r--r--src/database/key_value/rooms/directory.rs2
-rw-r--r--src/database/key_value/rooms/edus/mod.rs4
-rw-r--r--src/database/key_value/rooms/edus/presence.rs4
-rw-r--r--src/database/key_value/rooms/edus/read_receipt.rs88
-rw-r--r--src/database/key_value/rooms/edus/typing.rs35
-rw-r--r--src/database/key_value/rooms/lazy_load.rs4
-rw-r--r--src/database/key_value/rooms/metadata.rs2
-rw-r--r--src/database/key_value/rooms/outlier.rs4
-rw-r--r--src/database/key_value/rooms/pdu_metadata.rs4
-rw-r--r--src/database/key_value/rooms/search.rs14
-rw-r--r--src/database/key_value/rooms/short.rs19
-rw-r--r--src/database/key_value/rooms/state.rs15
-rw-r--r--src/database/key_value/rooms/state_accessor.rs58
-rw-r--r--src/database/key_value/rooms/state_cache.rs195
-rw-r--r--src/database/key_value/rooms/state_compressor.rs18
-rw-r--r--src/database/key_value/rooms/timeline.rs123
-rw-r--r--src/database/key_value/rooms/user.rs34
-rw-r--r--src/database/key_value/transaction_ids.rs4
-rw-r--r--src/database/key_value/uiaa.rs6
-rw-r--r--src/database/key_value/users.rs138
-rw-r--r--src/database/mod.rs306
30 files changed, 691 insertions, 562 deletions
diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs
index 1388dc3..0727728 100644
--- a/src/database/abstraction/rocksdb.rs
+++ b/src/database/abstraction/rocksdb.rs
@@ -1,4 +1,4 @@
-use super::{super::Config, watchers::Watchers, KvTree, KeyValueDatabaseEngine};
+use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree};
use crate::{utils, Result};
use std::{
future::Future,
diff --git a/src/database/key_value/account_data.rs b/src/database/key_value/account_data.rs
index 5674ac0..7d2a870 100644
--- a/src/database/key_value/account_data.rs
+++ b/src/database/key_value/account_data.rs
@@ -1,9 +1,15 @@
use std::collections::HashMap;
-use ruma::{UserId, DeviceId, signatures::CanonicalJsonValue, api::client::{uiaa::UiaaInfo, error::ErrorKind}, events::{RoomAccountDataEventType, AnyEphemeralRoomEvent}, serde::Raw, RoomId};
-use serde::{Serialize, de::DeserializeOwned};
-
-use crate::{Result, database::KeyValueDatabase, service, Error, utils, services};
+use ruma::{
+ api::client::{error::ErrorKind, uiaa::UiaaInfo},
+ events::{AnyEphemeralRoomEvent, RoomAccountDataEventType},
+ serde::Raw,
+ signatures::CanonicalJsonValue,
+ DeviceId, RoomId, UserId,
+};
+use serde::{de::DeserializeOwned, Serialize};
+
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::account_data::Data for KeyValueDatabase {
/// Places one event in the account data of the user and removes the previous entry.
diff --git a/src/database/key_value/appservice.rs b/src/database/key_value/appservice.rs
index f427ba7..9a821a6 100644
--- a/src/database/key_value/appservice.rs
+++ b/src/database/key_value/appservice.rs
@@ -55,10 +55,13 @@ impl service::appservice::Data for KeyValueDatabase {
}
fn iter_ids<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<String>> + 'a>> {
- Ok(Box::new(self.id_appserviceregistrations.iter().map(|(id, _)| {
- utils::string_from_bytes(&id)
- .map_err(|_| Error::bad_database("Invalid id bytes in id_appserviceregistrations."))
- })))
+ Ok(Box::new(self.id_appserviceregistrations.iter().map(
+ |(id, _)| {
+ utils::string_from_bytes(&id).map_err(|_| {
+ Error::bad_database("Invalid id bytes in id_appserviceregistrations.")
+ })
+ },
+ )))
}
fn all(&self) -> Result<Vec<(String, serde_yaml::Value)>> {
diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs
index 199cbf6..fafaf49 100644
--- a/src/database/key_value/globals.rs
+++ b/src/database/key_value/globals.rs
@@ -2,9 +2,13 @@ use std::collections::BTreeMap;
use async_trait::async_trait;
use futures_util::{stream::FuturesUnordered, StreamExt};
-use ruma::{signatures::Ed25519KeyPair, UserId, DeviceId, ServerName, api::federation::discovery::{ServerSigningKeys, VerifyKey}, ServerSigningKeyId, MilliSecondsSinceUnixEpoch};
+use ruma::{
+ api::federation::discovery::{ServerSigningKeys, VerifyKey},
+ signatures::Ed25519KeyPair,
+ DeviceId, MilliSecondsSinceUnixEpoch, ServerName, ServerSigningKeyId, UserId,
+};
-use crate::{Result, service, database::KeyValueDatabase, Error, utils, services};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
pub const COUNTER: &[u8] = b"c";
@@ -35,28 +39,24 @@ impl service::globals::Data for KeyValueDatabase {
// Return when *any* user changed his key
// TODO: only send for user they share a room with
- futures.push(
- self.todeviceid_events
- .watch_prefix(&userdeviceid_prefix),
- );
+ futures.push(self.todeviceid_events.watch_prefix(&userdeviceid_prefix));
futures.push(self.userroomid_joined.watch_prefix(&userid_prefix));
- futures.push(
- self.userroomid_invitestate
- .watch_prefix(&userid_prefix),
- );
+ futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix));
futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix));
futures.push(
self.userroomid_notificationcount
.watch_prefix(&userid_prefix),
);
- futures.push(
- self.userroomid_highlightcount
- .watch_prefix(&userid_prefix),
- );
+ futures.push(self.userroomid_highlightcount.watch_prefix(&userid_prefix));
// Events for rooms we are in
- for room_id in services().rooms.state_cache.rooms_joined(user_id).filter_map(|r| r.ok()) {
+ for room_id in services()
+ .rooms
+ .state_cache
+ .rooms_joined(user_id)
+ .filter_map(|r| r.ok())
+ {
let short_roomid = services()
.rooms
.short
@@ -75,15 +75,9 @@ impl service::globals::Data for KeyValueDatabase {
futures.push(self.pduid_pdu.watch_prefix(&short_roomid));
// EDUs
- futures.push(
- self.roomid_lasttypingupdate
- .watch_prefix(&roomid_bytes),
- );
+ futures.push(self.roomid_lasttypingupdate.watch_prefix(&roomid_bytes));
- futures.push(
- self.readreceiptid_readreceipt
- .watch_prefix(&roomid_prefix),
- );
+ futures.push(self.readreceiptid_readreceipt.watch_prefix(&roomid_prefix));
// Key changes
futures.push(self.keychangeid_userid.watch_prefix(&roomid_prefix));
@@ -110,10 +104,7 @@ impl service::globals::Data for KeyValueDatabase {
futures.push(self.keychangeid_userid.watch_prefix(&userid_prefix));
// One time keys
- futures.push(
- self.userid_lastonetimekeyupdate
- .watch_prefix(&userid_bytes),
- );
+ futures.push(self.userid_lastonetimekeyupdate.watch_prefix(&userid_bytes));
futures.push(Box::pin(services().globals.rotate.watch()));
@@ -238,10 +229,7 @@ impl service::globals::Data for KeyValueDatabase {
}
fn bump_database_version(&self, new_version: u64) -> Result<()> {
- self.global
- .insert(b"version", &new_version.to_be_bytes())?;
+ self.global.insert(b"version", &new_version.to_be_bytes())?;
Ok(())
}
-
-
}
diff --git a/src/database/key_value/key_backups.rs b/src/database/key_value/key_backups.rs
index 8171451..0738f73 100644
--- a/src/database/key_value/key_backups.rs
+++ b/src/database/key_value/key_backups.rs
@@ -1,8 +1,15 @@
use std::collections::BTreeMap;
-use ruma::{UserId, serde::Raw, api::client::{backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, error::ErrorKind}, RoomId};
+use ruma::{
+ api::client::{
+ backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup},
+ error::ErrorKind,
+ },
+ serde::Raw,
+ RoomId, UserId,
+};
-use crate::{Result, service, database::KeyValueDatabase, services, Error, utils};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::key_backups::Data for KeyValueDatabase {
fn create_backup(
@@ -118,11 +125,7 @@ impl service::key_backups::Data for KeyValueDatabase {
.transpose()
}
- fn get_backup(
- &self,
- user_id: &UserId,
- version: &str,
- ) -> Result<Option<Raw<BackupAlgorithm>>> {
+ fn get_backup(&self, user_id: &UserId, version: &str) -> Result<Option<Raw<BackupAlgorithm>>> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(version.as_bytes());
@@ -322,12 +325,7 @@ impl service::key_backups::Data for KeyValueDatabase {
Ok(())
}
- fn delete_room_keys(
- &self,
- user_id: &UserId,
- version: &str,
- room_id: &RoomId,
- ) -> Result<()> {
+ fn delete_room_keys(&self, user_id: &UserId, version: &str, room_id: &RoomId) -> Result<()> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(version.as_bytes());
diff --git a/src/database/key_value/media.rs b/src/database/key_value/media.rs
index f024487..de96ace 100644
--- a/src/database/key_value/media.rs
+++ b/src/database/key_value/media.rs
@@ -1,9 +1,16 @@
use ruma::api::client::error::ErrorKind;
-use crate::{database::KeyValueDatabase, service, Error, utils, Result};
+use crate::{database::KeyValueDatabase, service, utils, Error, Result};
impl service::media::Data for KeyValueDatabase {
- fn create_file_metadata(&self, mxc: String, width: u32, height: u32, content_disposition: Option<&str>, content_type: Option<&str>) -> Result<Vec<u8>> {
+ fn create_file_metadata(
+ &self,
+ mxc: String,
+ width: u32,
+ height: u32,
+ content_disposition: Option<&str>,
+ content_type: Option<&str>,
+ ) -> Result<Vec<u8>> {
let mut key = mxc.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&width.to_be_bytes());
@@ -28,14 +35,23 @@ impl service::media::Data for KeyValueDatabase {
Ok(key)
}
- fn search_file_metadata(&self, mxc: String, width: u32, height: u32) -> Result<(Option<String>, Option<String>, Vec<u8>)> {
+ fn search_file_metadata(
+ &self,
+ mxc: String,
+ width: u32,
+ height: u32,
+ ) -> Result<(Option<String>, Option<String>, Vec<u8>)> {
let mut prefix = mxc.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail
prefix.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail
prefix.push(0xff);
- let (key, _) = self.mediaid_file.scan_prefix(prefix).next().ok_or(Error::BadRequest(ErrorKind::NotFound, "Media not found"))?;
+ let (key, _) = self
+ .mediaid_file
+ .scan_prefix(prefix)
+ .next()
+ .ok_or(Error::BadRequest(ErrorKind::NotFound, "Media not found"))?;
let mut parts = key.rsplit(|&b| b == 0xff);
@@ -57,9 +73,7 @@ impl service::media::Data for KeyValueDatabase {
} else {
Some(
utils::string_from_bytes(content_disposition_bytes).map_err(|_| {
- Error::bad_database(
- "Content Disposition in mediaid_file is invalid unicode.",
- )
+ Error::bad_database("Content Disposition in mediaid_file is invalid unicode.")
})?,
)
};
diff --git a/src/database/key_value/pusher.rs b/src/database/key_value/pusher.rs
index b05e47b..15f4e26 100644
--- a/src/database/key_value/pusher.rs
+++ b/src/database/key_value/pusher.rs
@@ -1,6 +1,9 @@
-use ruma::{UserId, api::client::push::{set_pusher, get_pushers}};
+use ruma::{
+ api::client::push::{get_pushers, set_pusher},
+ UserId,
+};
-use crate::{service, database::KeyValueDatabase, Error, Result};
+use crate::{database::KeyValueDatabase, service, Error, Result};
impl service::pusher::Data for KeyValueDatabase {
fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> {
@@ -48,10 +51,7 @@ impl service::pusher::Data for KeyValueDatabase {
.collect()
}
- fn get_pusher_senderkeys<'a>(
- &'a self,
- sender: &UserId,
- ) -> Box<dyn Iterator<Item = Vec<u8>>> {
+ fn get_pusher_senderkeys<'a>(&'a self, sender: &UserId) -> Box<dyn Iterator<Item = Vec<u8>>> {
let mut prefix = sender.as_bytes().to_vec();
prefix.push(0xff);
diff --git a/src/database/key_value/rooms/alias.rs b/src/database/key_value/rooms/alias.rs
index 0aa8dd4..112d6eb 100644
--- a/src/database/key_value/rooms/alias.rs
+++ b/src/database/key_value/rooms/alias.rs
@@ -1,13 +1,9 @@
-use ruma::{RoomId, RoomAliasId, api::client::error::ErrorKind};
+use ruma::{api::client::error::ErrorKind, RoomAliasId, RoomId};
-use crate::{service, database::KeyValueDatabase, utils, Error, services, Result};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::alias::Data for KeyValueDatabase {
- fn set_alias(
- &self,
- alias: &RoomAliasId,
- room_id: &RoomId
- ) -> Result<()> {
+ fn set_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> Result<()> {
self.alias_roomid
.insert(alias.alias().as_bytes(), room_id.as_bytes())?;
let mut aliasid = room_id.as_bytes().to_vec();
@@ -17,10 +13,7 @@ impl service::rooms::alias::Data for KeyValueDatabase {
Ok(())
}
- fn remove_alias(
- &self,
- alias: &RoomAliasId,
- ) -> Result<()> {
+ fn remove_alias(&self, alias: &RoomAliasId) -> Result<()> {
if let Some(room_id) = self.alias_roomid.get(alias.alias().as_bytes())? {
let mut prefix = room_id.to_vec();
prefix.push(0xff);
@@ -38,10 +31,7 @@ impl service::rooms::alias::Data for KeyValueDatabase {
Ok(())
}
- fn resolve_local_alias(
- &self,
- alias: &RoomAliasId
- ) -> Result<Option<Box<RoomId>>> {
+ fn resolve_local_alias(&self, alias: &RoomAliasId) -> Result<Option<Box<RoomId>>> {
self.alias_roomid
.get(alias.alias().as_bytes())?
.map(|bytes| {
diff --git a/src/database/key_value/rooms/auth_chain.rs b/src/database/key_value/rooms/auth_chain.rs
index 49d3956..60057ac 100644
--- a/src/database/key_value/rooms/auth_chain.rs
+++ b/src/database/key_value/rooms/auth_chain.rs
@@ -1,6 +1,6 @@
use std::{collections::HashSet, mem::size_of, sync::Arc};
-use crate::{service, database::KeyValueDatabase, Result, utils};
+use crate::{database::KeyValueDatabase, service, utils, Result};
impl service::rooms::auth_chain::Data for KeyValueDatabase {
fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<HashSet<u64>>>> {
@@ -12,14 +12,13 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase {
// We only save auth chains for single events in the db
if key.len() == 1 {
// Check DB cache
- let chain = self.shorteventid_authchain
+ let chain = self
+ .shorteventid_authchain
.get(&key[0].to_be_bytes())?
.map(|chain| {
chain
.chunks_exact(size_of::<u64>())
- .map(|chunk| {
- utils::u64_from_bytes(chunk).expect("byte length is correct")
- })
+ .map(|chunk| utils::u64_from_bytes(chunk).expect("byte length is correct"))
.collect()
});
@@ -37,7 +36,6 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase {
}
Ok(None)
-
}
fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: Arc<HashSet<u64>>) -> Result<()> {
@@ -53,7 +51,10 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase {
}
// Cache in RAM
- self.auth_chain_cache.lock().unwrap().insert(key, auth_chain);
+ self.auth_chain_cache
+ .lock()
+ .unwrap()
+ .insert(key, auth_chain);
Ok(())
}
diff --git a/src/database/key_value/rooms/directory.rs b/src/database/key_value/rooms/directory.rs
index 727004e..661c202 100644
--- a/src/database/key_value/rooms/directory.rs
+++ b/src/database/key_value/rooms/directory.rs
@@ -1,6 +1,6 @@
use ruma::RoomId;
-use crate::{service, database::KeyValueDatabase, utils, Error, Result};
+use crate::{database::KeyValueDatabase, service, utils, Error, Result};
impl service::rooms::directory::Data for KeyValueDatabase {
fn set_public(&self, room_id: &RoomId) -> Result<()> {
diff --git a/src/database/key_value/rooms/edus/mod.rs b/src/database/key_value/rooms/edus/mod.rs
index b5007f8..6c65291 100644
--- a/src/database/key_value/rooms/edus/mod.rs
+++ b/src/database/key_value/rooms/edus/mod.rs
@@ -1,7 +1,7 @@
mod presence;
-mod typing;
mod read_receipt;
+mod typing;
-use crate::{service, database::KeyValueDatabase};
+use crate::{database::KeyValueDatabase, service};
impl service::rooms::edus::Data for KeyValueDatabase {}
diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs
index 1477c28..fdd51ce 100644
--- a/src/database/key_value/rooms/edus/presence.rs
+++ b/src/database/key_value/rooms/edus/presence.rs
@@ -1,8 +1,8 @@
use std::collections::HashMap;
-use ruma::{UserId, RoomId, events::presence::PresenceEvent, presence::PresenceState, UInt};
+use ruma::{events::presence::PresenceEvent, presence::PresenceState, RoomId, UInt, UserId};
-use crate::{service, database::KeyValueDatabase, utils, Error, services, Result};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::edus::presence::Data for KeyValueDatabase {
fn update_presence(
diff --git a/src/database/key_value/rooms/edus/read_receipt.rs b/src/database/key_value/rooms/edus/read_receipt.rs
index a12e265..c78f0f5 100644
--- a/src/database/key_value/rooms/edus/read_receipt.rs
+++ b/src/database/key_value/rooms/edus/read_receipt.rs
@@ -1,8 +1,10 @@
use std::mem;
-use ruma::{UserId, RoomId, events::receipt::ReceiptEvent, serde::Raw, signatures::CanonicalJsonObject};
+use ruma::{
+ events::receipt::ReceiptEvent, serde::Raw, signatures::CanonicalJsonObject, RoomId, UserId,
+};
-use crate::{database::KeyValueDatabase, service, utils, Error, services, Result};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
fn readreceipt_update(
@@ -50,13 +52,15 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
&'a self,
room_id: &RoomId,
since: u64,
- ) -> Box<dyn Iterator<
- Item=Result<(
- Box<UserId>,
- u64,
- Raw<ruma::events::AnySyncEphemeralRoomEvent>,
- )>,
- >> {
+ ) -> Box<
+ dyn Iterator<
+ Item = Result<(
+ Box<UserId>,
+ u64,
+ Raw<ruma::events::AnySyncEphemeralRoomEvent>,
+ )>,
+ >,
+ > {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let prefix2 = prefix.clone();
@@ -64,42 +68,44 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
let mut first_possible_edu = prefix.clone();
first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
- Box::new(self.readreceiptid_readreceipt
- .iter_from(&first_possible_edu, false)
- .take_while(move |(k, _)| k.starts_with(&prefix2))
- .map(move |(k, v)| {
- let count =
- utils::u64_from_bytes(&k[prefix.len()..prefix.len() + mem::size_of::<u64>()])
- .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?;
- let user_id = UserId::parse(
- utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..])
- .map_err(|_| {
- Error::bad_database("Invalid readreceiptid userid bytes in db.")
- })?,
- )
+ Box::new(
+ self.readreceiptid_readreceipt
+ .iter_from(&first_possible_edu, false)
+ .take_while(move |(k, _)| k.starts_with(&prefix2))
+ .map(move |(k, v)| {
+ let count = utils::u64_from_bytes(
+ &k[prefix.len()..prefix.len() + mem::size_of::<u64>()],
+ )
+ .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?;
+ let user_id = UserId::parse(
+ utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..])
+ .map_err(|_| {
+ Error::bad_database("Invalid readreceiptid userid bytes in db.")
+ })?,
+ )
.map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?;
- let mut json = serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| {
- Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.")
- })?;
- json.remove("room_id");
-
- Ok((
- user_id,
- count,
- Raw::from_json(
- serde_json::value::to_raw_value(&json).expect("json is valid raw value"),
- ),
- ))
- }))
+ let mut json =
+ serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| {
+ Error::bad_database(
+ "Read receipt in roomlatestid_roomlatest is invalid json.",
+ )
+ })?;
+ json.remove("room_id");
+
+ Ok((
+ user_id,
+ count,
+ Raw::from_json(
+ serde_json::value::to_raw_value(&json)
+ .expect("json is valid raw value"),
+ ),
+ ))
+ }),
+ )
}
- fn private_read_set(
- &self,
- room_id: &RoomId,
- user_id: &UserId,
- count: u64,
- ) -> Result<()> {
+ fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
diff --git a/src/database/key_value/rooms/edus/typing.rs b/src/database/key_value/rooms/edus/typing.rs
index b7d3596..7b211e7 100644
--- a/src/database/key_value/rooms/edus/typing.rs
+++ b/src/database/key_value/rooms/edus/typing.rs
@@ -1,16 +1,11 @@
use std::collections::HashSet;
-use ruma::{UserId, RoomId};
+use ruma::{RoomId, UserId};
-use crate::{database::KeyValueDatabase, service, utils, Error, services, Result};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::edus::typing::Data for KeyValueDatabase {
- fn typing_add(
- &self,
- user_id: &UserId,
- room_id: &RoomId,
- timeout: u64,
- ) -> Result<()> {
+ fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@@ -30,11 +25,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase {
Ok(())
}
- fn typing_remove(
- &self,
- user_id: &UserId,
- room_id: &RoomId,
- ) -> Result<()> {
+ fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@@ -53,17 +44,16 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase {
}
if found_outdated {
- self.roomid_lasttypingupdate
- .insert(room_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?;
+ self.roomid_lasttypingupdate.insert(
+ room_id.as_bytes(),
+ &services().globals.next_count()?.to_be_bytes(),
+ )?;
}
Ok(())
}
- fn last_typing_update(
- &self,
- room_id: &RoomId,
- ) -> Result<u64> {
+ fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
Ok(self
.roomid_lasttypingupdate
.get(room_id.as_bytes())?
@@ -76,10 +66,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase {
.unwrap_or(0))
}
- fn typings_all(
- &self,
- room_id: &RoomId,
- ) -> Result<HashSet<Box<UserId>>> {
+ fn typings_all(&self, room_id: &RoomId) -> Result<HashSet<Box<UserId>>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@@ -89,7 +76,7 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase {
let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| {
Error::bad_database("User ID in typingid_userid is invalid unicode.")
})?)
- .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?;
+ .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?;
user_ids.insert(user_id);
}
diff --git a/src/database/key_value/rooms/lazy_load.rs b/src/database/key_value/rooms/lazy_load.rs
index 133e1d0..a19d52c 100644
--- a/src/database/key_value/rooms/lazy_load.rs
+++ b/src/database/key_value/rooms/lazy_load.rs
@@ -1,6 +1,6 @@
-use ruma::{UserId, DeviceId, RoomId};
+use ruma::{DeviceId, RoomId, UserId};
-use crate::{service, database::KeyValueDatabase, Result};
+use crate::{database::KeyValueDatabase, service, Result};
impl service::rooms::lazy_loading::Data for KeyValueDatabase {
fn lazy_load_was_sent_before(
diff --git a/src/database/key_value/rooms/metadata.rs b/src/database/key_value/rooms/metadata.rs
index 72f6251..63a6b1a 100644
--- a/src/database/key_value/rooms/metadata.rs
+++ b/src/database/key_value/rooms/metadata.rs
@@ -1,6 +1,6 @@
use ruma::RoomId;
-use crate::{service, database::KeyValueDatabase, Result, services};
+use crate::{database::KeyValueDatabase, service, services, Result};
impl service::rooms::metadata::Data for KeyValueDatabase {
fn exists(&self, room_id: &RoomId) -> Result<bool> {
diff --git a/src/database/key_value/rooms/outlier.rs b/src/database/key_value/rooms/outlier.rs
index aa97544..2ecaadb 100644
--- a/src/database/key_value/rooms/outlier.rs
+++ b/src/database/key_value/rooms/outlier.rs
@@ -1,6 +1,6 @@
-use ruma::{EventId, signatures::CanonicalJsonObject};
+use ruma::{signatures::CanonicalJsonObject, EventId};
-use crate::{service, database::KeyValueDatabase, PduEvent, Error, Result};
+use crate::{database::KeyValueDatabase, service, Error, PduEvent, Result};
impl service::rooms::outlier::Data for KeyValueDatabase {
fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
diff --git a/src/database/key_value/rooms/pdu_metadata.rs b/src/database/key_value/rooms/pdu_metadata.rs
index f3ac414..76ec734 100644
--- a/src/database/key_value/rooms/pdu_metadata.rs
+++ b/src/database/key_value/rooms/pdu_metadata.rs
@@ -1,8 +1,8 @@
use std::sync::Arc;
-use ruma::{RoomId, EventId};
+use ruma::{EventId, RoomId};
-use crate::{service, database::KeyValueDatabase, Result};
+use crate::{database::KeyValueDatabase, service, Result};
impl service::rooms::pdu_metadata::Data for KeyValueDatabase {
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> {
diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs
index 41df544..79e6a32 100644
--- a/src/database/key_value/rooms/search.rs
+++ b/src/database/key_value/rooms/search.rs
@@ -2,7 +2,7 @@ use std::mem::size_of;
use ruma::RoomId;
-use crate::{service, database::KeyValueDatabase, utils, Result, services};
+use crate::{database::KeyValueDatabase, service, services, utils, Result};
impl service::rooms::search::Data for KeyValueDatabase {
fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: String) -> Result<()> {
@@ -27,7 +27,9 @@ impl service::rooms::search::Data for KeyValueDatabase {
room_id: &RoomId,
search_string: &str,
) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>>>, Vec<String>)>> {
- let prefix = services().rooms.short
+ let prefix = services()
+ .rooms
+ .short
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
@@ -63,10 +65,10 @@ impl service::rooms::search::Data for KeyValueDatabase {
};
let mapped = common_elements.map(move |id| {
- let mut pduid = prefix_clone.clone();
- pduid.extend_from_slice(&id);
- pduid
- });
+ let mut pduid = prefix_clone.clone();
+ pduid.extend_from_slice(&id);
+ pduid
+ });
Ok(Some((Box::new(mapped), words)))
}
diff --git a/src/database/key_value/rooms/short.rs b/src/database/key_value/rooms/short.rs
index ecd12da..c022317 100644
--- a/src/database/key_value/rooms/short.rs
+++ b/src/database/key_value/rooms/short.rs
@@ -1,14 +1,11 @@
use std::sync::Arc;
-use ruma::{EventId, events::StateEventType, RoomId};
+use ruma::{events::StateEventType, EventId, RoomId};
-use crate::{Result, database::KeyValueDatabase, service, utils, Error, services};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::short::Data for KeyValueDatabase {
- fn get_or_create_shorteventid(
- &self,
- event_id: &EventId,
- ) -> Result<u64> {
+ fn get_or_create_shorteventid(&self, event_id: &EventId) -> Result<u64> {
if let Some(short) = self.eventidshort_cache.lock().unwrap().get_mut(event_id) {
return Ok(*short);
}
@@ -180,10 +177,7 @@ impl service::rooms::short::Data for KeyValueDatabase {
}
/// Returns (shortstatehash, already_existed)
- fn get_or_create_shortstatehash(
- &self,
- state_hash: &[u8],
- ) -> Result<(u64, bool)> {
+ fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> Result<(u64, bool)> {
Ok(match self.statehash_shortstatehash.get(state_hash)? {
Some(shortstatehash) => (
utils::u64_from_bytes(&shortstatehash)
@@ -209,10 +203,7 @@ impl service::rooms::short::Data for KeyValueDatabase {
.transpose()
}
- fn get_or_create_shortroomid(
- &self,
- room_id: &RoomId,
- ) -> Result<u64> {
+ fn get_or_create_shortroomid(&self, room_id: &RoomId) -> Result<u64> {
Ok(match self.roomid_shortroomid.get(room_id.as_bytes())? {
Some(short) => utils::u64_from_bytes(&short)
.map_err(|_| Error::bad_database("Invalid shortroomid in db."))?,
diff --git a/src/database/key_value/rooms/state.rs b/src/database/key_value/rooms/state.rs
index 90ac0d5..80a7458 100644
--- a/src/database/key_value/rooms/state.rs
+++ b/src/database/key_value/rooms/state.rs
@@ -1,10 +1,10 @@
-use ruma::{RoomId, EventId};
-use tokio::sync::MutexGuard;
-use std::sync::Arc;
+use ruma::{EventId, RoomId};
use std::collections::HashSet;
use std::fmt::Debug;
+use std::sync::Arc;
+use tokio::sync::MutexGuard;
-use crate::{service, database::KeyValueDatabase, utils, Error, Result};
+use crate::{database::KeyValueDatabase, service, utils, Error, Result};
impl service::rooms::state::Data for KeyValueDatabase {
fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> {
@@ -17,9 +17,12 @@ impl service::rooms::state::Data for KeyValueDatabase {
})
}
- fn set_room_state(&self, room_id: &RoomId, new_shortstatehash: u64,
+ fn set_room_state(
+ &self,
+ room_id: &RoomId,
+ new_shortstatehash: u64,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
- ) -> Result<()> {
+ ) -> Result<()> {
self.roomid_shortstatehash
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?;
Ok(())
diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs
index 4d5bd4a..39c261f 100644
--- a/src/database/key_value/rooms/state_accessor.rs
+++ b/src/database/key_value/rooms/state_accessor.rs
@@ -1,13 +1,18 @@
-use std::{collections::{BTreeMap, HashMap}, sync::Arc};
+use std::{
+ collections::{BTreeMap, HashMap},
+ sync::Arc,
+};
-use crate::{database::KeyValueDatabase, service, PduEvent, Error, utils, Result, services};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
use async_trait::async_trait;
-use ruma::{EventId, events::StateEventType, RoomId};
+use ruma::{events::StateEventType, EventId, RoomId};
#[async_trait]
impl service::rooms::state_accessor::Data for KeyValueDatabase {
async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
- let full_state = services().rooms.state_compressor
+ let full_state = services()
+ .rooms
+ .state_compressor
.load_shortstatehash_info(shortstatehash)?
.pop()
.expect("there is always one layer")
@@ -15,7 +20,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
let mut result = BTreeMap::new();
let mut i = 0;
for compressed in full_state.into_iter() {
- let parsed = services().rooms.state_compressor.parse_compressed_state_event(compressed)?;
+ let parsed = services()
+ .rooms
+ .state_compressor
+ .parse_compressed_state_event(compressed)?;
result.insert(parsed.0, parsed.1);
i += 1;
@@ -30,7 +38,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
&self,
shortstatehash: u64,
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
- let full_state = services().rooms.state_compressor
+ let full_state = services()
+ .rooms
+ .state_compressor
.load_shortstatehash_info(shortstatehash)?
.pop()
.expect("there is always one layer")
@@ -39,7 +49,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
let mut result = HashMap::new();
let mut i = 0;
for compressed in full_state {
- let (_, eventid) = services().rooms.state_compressor.parse_compressed_state_event(compressed)?;
+ let (_, eventid) = services()
+ .rooms
+ .state_compressor
+ .parse_compressed_state_event(compressed)?;
if let Some(pdu) = services().rooms.timeline.get_pdu(&eventid)? {
result.insert(
(
@@ -69,11 +82,17 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
event_type: &StateEventType,
state_key: &str,
) -> Result<Option<Arc<EventId>>> {
- let shortstatekey = match services().rooms.short.get_shortstatekey(event_type, state_key)? {
+ let shortstatekey = match services()
+ .rooms
+ .short
+ .get_shortstatekey(event_type, state_key)?
+ {
Some(s) => s,
None => return Ok(None),
};
- let full_state = services().rooms.state_compressor
+ let full_state = services()
+ .rooms
+ .state_compressor
.load_shortstatehash_info(shortstatehash)?
.pop()
.expect("there is always one layer")
@@ -82,7 +101,10 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
.into_iter()
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
.and_then(|compressed| {
- services().rooms.state_compressor.parse_compressed_state_event(compressed)
+ services()
+ .rooms
+ .state_compressor
+ .parse_compressed_state_event(compressed)
.ok()
.map(|(_, id)| id)
}))
@@ -96,7 +118,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
state_key: &str,
) -> Result<Option<Arc<PduEvent>>> {
self.state_get_id(shortstatehash, event_type, state_key)?
- .map_or(Ok(None), |event_id| services().rooms.timeline.get_pdu(&event_id))
+ .map_or(Ok(None), |event_id| {
+ services().rooms.timeline.get_pdu(&event_id)
+ })
}
/// Returns the state hash for this pdu.
@@ -122,7 +146,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
&self,
room_id: &RoomId,
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
- if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? {
+ if let Some(current_shortstatehash) =
+ services().rooms.state.get_room_shortstatehash(room_id)?
+ {
self.state_full(current_shortstatehash).await
} else {
Ok(HashMap::new())
@@ -136,7 +162,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
event_type: &StateEventType,
state_key: &str,
) -> Result<Option<Arc<EventId>>> {
- if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? {
+ if let Some(current_shortstatehash) =
+ services().rooms.state.get_room_shortstatehash(room_id)?
+ {
self.state_get_id(current_shortstatehash, event_type, state_key)
} else {
Ok(None)
@@ -150,7 +178,9 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
event_type: &StateEventType,
state_key: &str,
) -> Result<Option<Arc<PduEvent>>> {
- if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? {
+ if let Some(current_shortstatehash) =
+ services().rooms.state.get_room_shortstatehash(room_id)?
+ {
self.state_get(current_shortstatehash, event_type, state_key)
} else {
Ok(None)
diff --git a/src/database/key_value/rooms/state_cache.rs b/src/database/key_value/rooms/state_cache.rs
index 4043bc4..4ca6ac4 100644
--- a/src/database/key_value/rooms/state_cache.rs
+++ b/src/database/key_value/rooms/state_cache.rs
@@ -1,9 +1,13 @@
use std::{collections::HashSet, sync::Arc};
use regex::Regex;
-use ruma::{UserId, RoomId, events::{AnyStrippedStateEvent, AnySyncStateEvent}, serde::Raw, ServerName};
+use ruma::{
+ events::{AnyStrippedStateEvent, AnySyncStateEvent},
+ serde::Raw,
+ RoomId, ServerName, UserId,
+};
-use crate::{service, database::KeyValueDatabase, services, Result, Error, utils};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::state_cache::Data for KeyValueDatabase {
fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
@@ -31,8 +35,13 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
Ok(())
}
-
- fn mark_as_invited(&self, user_id: &UserId, room_id: &RoomId, last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>) -> Result<()> {
+
+ fn mark_as_invited(
+ &self,
+ user_id: &UserId,
+ room_id: &RoomId,
+ last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
+ ) -> Result<()> {
let mut roomuser_id = room_id.as_bytes().to_vec();
roomuser_id.push(0xff);
roomuser_id.extend_from_slice(user_id.as_bytes());
@@ -46,8 +55,10 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
&serde_json::to_vec(&last_state.unwrap_or_default())
.expect("state to bytes always works"),
)?;
- self.roomuserid_invitecount
- .insert(&roomuser_id, &services().globals.next_count()?.to_be_bytes())?;
+ self.roomuserid_invitecount.insert(
+ &roomuser_id,
+ &services().globals.next_count()?.to_be_bytes(),
+ )?;
self.userroomid_joined.remove(&userroom_id)?;
self.roomuserid_joined.remove(&roomuser_id)?;
self.userroomid_leftstate.remove(&userroom_id)?;
@@ -69,8 +80,10 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
&userroom_id,
&serde_json::to_vec(&Vec::<Raw<AnySyncStateEvent>>::new()).unwrap(),
)?; // TODO
- self.roomuserid_leftcount
- .insert(&roomuser_id, &services().globals.next_count()?.to_be_bytes())?;
+ self.roomuserid_leftcount.insert(
+ &roomuser_id,
+ &services().globals.next_count()?.to_be_bytes(),
+ )?;
self.userroomid_joined.remove(&userroom_id)?;
self.roomuserid_joined.remove(&roomuser_id)?;
self.userroomid_invitestate.remove(&userroom_id)?;
@@ -324,21 +337,25 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
- Box::new(self.roomuseroncejoinedids
- .scan_prefix(prefix)
- .map(|(key, _)| {
- UserId::parse(
- utils::string_from_bytes(
- key.rsplit(|&b| b == 0xff)
- .next()
- .expect("rsplit always returns an element"),
+ Box::new(
+ self.roomuseroncejoinedids
+ .scan_prefix(prefix)
+ .map(|(key, _)| {
+ UserId::parse(
+ utils::string_from_bytes(
+ key.rsplit(|&b| b == 0xff)
+ .next()
+ .expect("rsplit always returns an element"),
+ )
+ .map_err(|_| {
+ Error::bad_database(
+ "User ID in room_useroncejoined is invalid unicode.",
+ )
+ })?,
)
- .map_err(|_| {
- Error::bad_database("User ID in room_useroncejoined is invalid unicode.")
- })?,
- )
- .map_err(|_| Error::bad_database("User ID in room_useroncejoined is invalid."))
- }))
+ .map_err(|_| Error::bad_database("User ID in room_useroncejoined is invalid."))
+ }),
+ )
}
/// Returns an iterator over all invited members of a room.
@@ -350,21 +367,23 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
- Box::new(self.roomuserid_invitecount
- .scan_prefix(prefix)
- .map(|(key, _)| {
- UserId::parse(
- utils::string_from_bytes(
- key.rsplit(|&b| b == 0xff)
- .next()
- .expect("rsplit always returns an element"),
+ Box::new(
+ self.roomuserid_invitecount
+ .scan_prefix(prefix)
+ .map(|(key, _)| {
+ UserId::parse(
+ utils::string_from_bytes(
+ key.rsplit(|&b| b == 0xff)
+ .next()
+ .expect("rsplit always returns an element"),
+ )
+ .map_err(|_| {
+ Error::bad_database("User ID in roomuserid_invited is invalid unicode.")
+ })?,
)
- .map_err(|_| {
- Error::bad_database("User ID in roomuserid_invited is invalid unicode.")
- })?,
- )
- .map_err(|_| Error::bad_database("User ID in roomuserid_invited is invalid."))
- }))
+ .map_err(|_| Error::bad_database("User ID in roomuserid_invited is invalid."))
+ }),
+ )
}
#[tracing::instrument(skip(self))]
@@ -403,21 +422,23 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<Box<RoomId>>> + 'a> {
- Box::new(self.userroomid_joined
- .scan_prefix(user_id.as_bytes().to_vec())
- .map(|(key, _)| {
- RoomId::parse(
- utils::string_from_bytes(
- key.rsplit(|&b| b == 0xff)
- .next()
- .expect("rsplit always returns an element"),
+ Box::new(
+ self.userroomid_joined
+ .scan_prefix(user_id.as_bytes().to_vec())
+ .map(|(key, _)| {
+ RoomId::parse(
+ utils::string_from_bytes(
+ key.rsplit(|&b| b == 0xff)
+ .next()
+ .expect("rsplit always returns an element"),
+ )
+ .map_err(|_| {
+ Error::bad_database("Room ID in userroomid_joined is invalid unicode.")
+ })?,
)
- .map_err(|_| {
- Error::bad_database("Room ID in userroomid_joined is invalid unicode.")
- })?,
- )
- .map_err(|_| Error::bad_database("Room ID in userroomid_joined is invalid."))
- }))
+ .map_err(|_| Error::bad_database("Room ID in userroomid_joined is invalid."))
+ }),
+ )
}
/// Returns an iterator over all rooms a user was invited to.
@@ -429,26 +450,31 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
- Box::new(self.userroomid_invitestate
- .scan_prefix(prefix)
- .map(|(key, state)| {
- let room_id = RoomId::parse(
- utils::string_from_bytes(
- key.rsplit(|&b| b == 0xff)
- .next()
- .expect("rsplit always returns an element"),
+ Box::new(
+ self.userroomid_invitestate
+ .scan_prefix(prefix)
+ .map(|(key, state)| {
+ let room_id = RoomId::parse(
+ utils::string_from_bytes(
+ key.rsplit(|&b| b == 0xff)
+ .next()
+ .expect("rsplit always returns an element"),
+ )
+ .map_err(|_| {
+ Error::bad_database("Room ID in userroomid_invited is invalid unicode.")
+ })?,
)
.map_err(|_| {
- Error::bad_database("Room ID in userroomid_invited is invalid unicode.")
- })?,
- )
- .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?;
+ Error::bad_database("Room ID in userroomid_invited is invalid.")
+ })?;
- let state = serde_json::from_slice(&state)
- .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?;
+ let state = serde_json::from_slice(&state).map_err(|_| {
+ Error::bad_database("Invalid state in userroomid_invitestate.")
+ })?;
- Ok((room_id, state))
- }))
+ Ok((room_id, state))
+ }),
+ )
}
#[tracing::instrument(skip(self))]
@@ -502,26 +528,31 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
- Box::new(self.userroomid_leftstate
- .scan_prefix(prefix)
- .map(|(key, state)| {
- let room_id = RoomId::parse(
- utils::string_from_bytes(
- key.rsplit(|&b| b == 0xff)
- .next()
- .expect("rsplit always returns an element"),
+ Box::new(
+ self.userroomid_leftstate
+ .scan_prefix(prefix)
+ .map(|(key, state)| {
+ let room_id = RoomId::parse(
+ utils::string_from_bytes(
+ key.rsplit(|&b| b == 0xff)
+ .next()
+ .expect("rsplit always returns an element"),
+ )
+ .map_err(|_| {
+ Error::bad_database("Room ID in userroomid_invited is invalid unicode.")
+ })?,
)
.map_err(|_| {
- Error::bad_database("Room ID in userroomid_invited is invalid unicode.")
- })?,
- )
- .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?;
+ Error::bad_database("Room ID in userroomid_invited is invalid.")
+ })?;
- let state = serde_json::from_slice(&state)
- .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?;
+ let state = serde_json::from_slice(&state).map_err(|_| {
+ Error::bad_database("Invalid state in userroomid_leftstate.")
+ })?;
- Ok((room_id, state))
- }))
+ Ok((room_id, state))
+ }),
+ )
}
#[tracing::instrument(skip(self))]
diff --git a/src/database/key_value/rooms/state_compressor.rs b/src/database/key_value/rooms/state_compressor.rs
index aee1890..d0a9be4 100644
--- a/src/database/key_value/rooms/state_compressor.rs
+++ b/src/database/key_value/rooms/state_compressor.rs
@@ -1,6 +1,10 @@
use std::{collections::HashSet, mem::size_of};
-use crate::{service::{self, rooms::state_compressor::data::StateDiff}, database::KeyValueDatabase, Error, utils, Result};
+use crate::{
+ database::KeyValueDatabase,
+ service::{self, rooms::state_compressor::data::StateDiff},
+ utils, Error, Result,
+};
impl service::rooms::state_compressor::Data for KeyValueDatabase {
fn get_statediff(&self, shortstatehash: u64) -> Result<StateDiff> {
@@ -10,11 +14,7 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase {
.ok_or_else(|| Error::bad_database("State hash does not exist"))?;
let parent =
utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length");
- let parent = if parent != 0 {
- Some(parent)
- } else {
- None
- };
+ let parent = if parent != 0 { Some(parent) } else { None };
let mut add_mode = true;
let mut added = HashSet::new();
@@ -35,7 +35,11 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase {
i += 2 * size_of::<u64>();
}
- Ok(StateDiff { parent, added, removed })
+ Ok(StateDiff {
+ parent,
+ added,
+ removed,
+ })
}
fn save_statediff(&self, shortstatehash: u64, diff: StateDiff) -> Result<()> {
diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs
index 1723186..5d684a1 100644
--- a/src/database/key_value/rooms/timeline.rs
+++ b/src/database/key_value/rooms/timeline.rs
@@ -1,13 +1,17 @@
use std::{collections::hash_map, mem::size_of, sync::Arc};
-use ruma::{UserId, RoomId, api::client::error::ErrorKind, EventId, signatures::CanonicalJsonObject};
+use ruma::{
+ api::client::error::ErrorKind, signatures::CanonicalJsonObject, EventId, RoomId, UserId,
+};
use tracing::error;
-use crate::{service, database::KeyValueDatabase, utils, Error, PduEvent, Result, services};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
impl service::rooms::timeline::Data for KeyValueDatabase {
fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
- let prefix = services().rooms.short
+ let prefix = services()
+ .rooms
+ .short
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
@@ -82,10 +86,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
}
/// Returns the json of a pdu.
- fn get_non_outlier_pdu_json(
- &self,
- event_id: &EventId,
- ) -> Result<Option<CanonicalJsonObject>> {
+ fn get_non_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map(|pduid| {
@@ -187,10 +188,17 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))
}
- fn append_pdu(&self, pdu_id: &[u8], pdu: &PduEvent, json: &CanonicalJsonObject, count: u64) -> Result<()> {
+ fn append_pdu(
+ &self,
+ pdu_id: &[u8],
+ pdu: &PduEvent,
+ json: &CanonicalJsonObject,
+ count: u64,
+ ) -> Result<()> {
self.pduid_pdu.insert(
pdu_id,
- &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"))?;
+ &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"),
+ )?;
self.lasttimelinecount_cache
.lock()
@@ -209,7 +217,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
if self.pduid_pdu.get(pdu_id)?.is_some() {
self.pduid_pdu.insert(
pdu_id,
- &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"))?;
+ &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"),
+ )?;
Ok(())
} else {
Err(Error::BadRequest(
@@ -227,7 +236,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
room_id: &RoomId,
since: u64,
) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> {
- let prefix = services().rooms.short
+ let prefix = services()
+ .rooms
+ .short
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
@@ -239,18 +250,19 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
let user_id = user_id.to_owned();
- Ok(Box::new(self
- .pduid_pdu
- .iter_from(&first_pdu_id, false)
- .take_while(move |(k, _)| k.starts_with(&prefix))
- .map(move |(pdu_id, v)| {
- let mut pdu = serde_json::from_slice::<PduEvent>(&v)
- .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
- if pdu.sender != user_id {
- pdu.remove_transaction_id()?;
- }
- Ok((pdu_id, pdu))
- })))
+ Ok(Box::new(
+ self.pduid_pdu
+ .iter_from(&first_pdu_id, false)
+ .take_while(move |(k, _)| k.starts_with(&prefix))
+ .map(move |(pdu_id, v)| {
+ let mut pdu = serde_json::from_slice::<PduEvent>(&v)
+ .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
+ if pdu.sender != user_id {
+ pdu.remove_transaction_id()?;
+ }
+ Ok((pdu_id, pdu))
+ }),
+ ))
}
/// Returns an iterator over all events and their tokens in a room that happened before the
@@ -262,7 +274,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
until: u64,
) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> {
// Create the first part of the full pdu id
- let prefix = services().rooms.short
+ let prefix = services()
+ .rooms
+ .short
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
@@ -275,18 +289,19 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
let user_id = user_id.to_owned();
- Ok(Box::new(self
- .pduid_pdu
- .iter_from(current, true)
- .take_while(move |(k, _)| k.starts_with(&prefix))
- .map(move |(pdu_id, v)| {
- let mut pdu = serde_json::from_slice::<PduEvent>(&v)
- .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
- if pdu.sender != user_id {
- pdu.remove_transaction_id()?;
- }
- Ok((pdu_id, pdu))
- })))
+ Ok(Box::new(
+ self.pduid_pdu
+ .iter_from(current, true)
+ .take_while(move |(k, _)| k.starts_with(&prefix))
+ .map(move |(pdu_id, v)| {
+ let mut pdu = serde_json::from_slice::<PduEvent>(&v)
+ .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
+ if pdu.sender != user_id {
+ pdu.remove_transaction_id()?;
+ }
+ Ok((pdu_id, pdu))
+ }),
+ ))
}
fn pdus_after<'a>(
@@ -296,7 +311,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
from: u64,
) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>>>> {
// Create the first part of the full pdu id
- let prefix = services().rooms.short
+ let prefix = services()
+ .rooms
+ .short
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
@@ -309,21 +326,27 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
let user_id = user_id.to_owned();
- Ok(Box::new(self
- .pduid_pdu
- .iter_from(current, false)
- .take_while(move |(k, _)| k.starts_with(&prefix))
- .map(move |(pdu_id, v)| {
- let mut pdu = serde_json::from_slice::<PduEvent>(&v)
- .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
- if pdu.sender != user_id {
- pdu.remove_transaction_id()?;
- }
- Ok((pdu_id, pdu))
- })))
+ Ok(Box::new(
+ self.pduid_pdu
+ .iter_from(current, false)
+ .take_while(move |(k, _)| k.starts_with(&prefix))
+ .map(move |(pdu_id, v)| {
+ let mut pdu = serde_json::from_slice::<PduEvent>(&v)
+ .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
+ if pdu.sender != user_id {
+ pdu.remove_transaction_id()?;
+ }
+ Ok((pdu_id, pdu))
+ }),
+ ))
}
- fn increment_notification_counts(&self, room_id: &RoomId, notifies: Vec<Box<UserId>>, highlights: Vec<Box<UserId>>) -> Result<()> {
+ fn increment_notification_counts(
+ &self,
+ room_id: &RoomId,
+ notifies: Vec<Box<UserId>>,
+ highlights: Vec<Box<UserId>>,
+ ) -> Result<()> {
let notifies_batch = Vec::new();
let highlights_batch = Vec::new();
for user in notifies {
diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs
index 3759bda..78c78e1 100644
--- a/src/database/key_value/rooms/user.rs
+++ b/src/database/key_value/rooms/user.rs
@@ -1,6 +1,6 @@
-use ruma::{UserId, RoomId};
+use ruma::{RoomId, UserId};
-use crate::{service, database::KeyValueDatabase, utils, Error, Result, services};
+use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::user::Data for KeyValueDatabase {
fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
@@ -50,7 +50,11 @@ impl service::rooms::user::Data for KeyValueDatabase {
token: u64,
shortstatehash: u64,
) -> Result<()> {
- let shortroomid = services().rooms.short.get_shortroomid(room_id)?.expect("room exists");
+ let shortroomid = services()
+ .rooms
+ .short
+ .get_shortroomid(room_id)?
+ .expect("room exists");
let mut key = shortroomid.to_be_bytes().to_vec();
key.extend_from_slice(&token.to_be_bytes());
@@ -60,7 +64,11 @@ impl service::rooms::user::Data for KeyValueDatabase {
}
fn get_token_shortstatehash(&self, room_id: &RoomId, token: u64) -> Result<Option<u64>> {
- let shortroomid = services().rooms.short.get_shortroomid(room_id)?.expect("room exists");
+ let shortroomid = services()
+ .rooms
+ .short
+ .get_shortroomid(room_id)?
+ .expect("room exists");
let mut key = shortroomid.to_be_bytes().to_vec();
key.extend_from_slice(&token.to_be_bytes());
@@ -102,13 +110,15 @@ impl service::rooms::user::Data for KeyValueDatabase {
});
// We use the default compare function because keys are sorted correctly (not reversed)
- Ok(Box::new(Box::new(utils::common_elements(iterators, Ord::cmp)
- .expect("users is not empty")
- .map(|bytes| {
- RoomId::parse(utils::string_from_bytes(&*bytes).map_err(|_| {
- Error::bad_database("Invalid RoomId bytes in userroomid_joined")
- })?)
- .map_err(|_| Error::bad_database("Invalid RoomId in userroomid_joined."))
- }))))
+ Ok(Box::new(Box::new(
+ utils::common_elements(iterators, Ord::cmp)
+ .expect("users is not empty")
+ .map(|bytes| {
+ RoomId::parse(utils::string_from_bytes(&*bytes).map_err(|_| {
+ Error::bad_database("Invalid RoomId bytes in userroomid_joined")
+ })?)
+ .map_err(|_| Error::bad_database("Invalid RoomId in userroomid_joined."))
+ }),
+ )))
}
}
diff --git a/src/database/key_value/transaction_ids.rs b/src/database/key_value/transaction_ids.rs
index a63b3c5..2ea6ad4 100644
--- a/src/database/key_value/transaction_ids.rs
+++ b/src/database/key_value/transaction_ids.rs
@@ -1,6 +1,6 @@
-use ruma::{UserId, DeviceId, TransactionId};
+use ruma::{DeviceId, TransactionId, UserId};
-use crate::{service, database::KeyValueDatabase, Result};
+use crate::{database::KeyValueDatabase, service, Result};
impl service::transaction_ids::Data for KeyValueDatabase {
fn add_txnid(
diff --git a/src/database/key_value/uiaa.rs b/src/database/key_value/uiaa.rs
index cf242de..8a9f176 100644
--- a/src/database/key_value/uiaa.rs
+++ b/src/database/key_value/uiaa.rs
@@ -1,4 +1,8 @@
-use ruma::{UserId, DeviceId, signatures::CanonicalJsonValue, api::client::{uiaa::UiaaInfo, error::ErrorKind}};
+use ruma::{
+ api::client::{error::ErrorKind, uiaa::UiaaInfo},
+ signatures::CanonicalJsonValue,
+ DeviceId, UserId,
+};
use crate::{database::KeyValueDatabase, service, Error, Result};
diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs
index 55a518d..15699a1 100644
--- a/src/database/key_value/users.rs
+++ b/src/database/key_value/users.rs
@@ -1,9 +1,20 @@
-use std::{mem::size_of, collections::BTreeMap};
-
-use ruma::{api::client::{filter::IncomingFilterDefinition, error::ErrorKind, device::Device}, UserId, RoomAliasId, MxcUri, DeviceId, MilliSecondsSinceUnixEpoch, DeviceKeyId, encryption::{OneTimeKey, CrossSigningKey, DeviceKeys}, serde::Raw, events::{AnyToDeviceEvent, StateEventType}, DeviceKeyAlgorithm, UInt};
+use std::{collections::BTreeMap, mem::size_of};
+
+use ruma::{
+ api::client::{device::Device, error::ErrorKind, filter::IncomingFilterDefinition},
+ encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
+ events::{AnyToDeviceEvent, StateEventType},
+ serde::Raw,
+ DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, MxcUri, RoomAliasId,
+ UInt, UserId,
+};
use tracing::warn;
-use crate::{service::{self, users::clean_signatures}, database::KeyValueDatabase, Error, utils, services, Result};
+use crate::{
+ database::KeyValueDatabase,
+ service::{self, users::clean_signatures},
+ services, utils, Error, Result,
+};
impl service::users::Data for KeyValueDatabase {
/// Check if a user has an account on this homeserver.
@@ -274,18 +285,21 @@ impl service::users::Data for KeyValueDatabase {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
// All devices have metadata
- Box::new(self.userdeviceid_metadata
- .scan_prefix(prefix)
- .map(|(bytes, _)| {
- Ok(utils::string_from_bytes(
- bytes
- .rsplit(|&b| b == 0xff)
- .next()
- .ok_or_else(|| Error::bad_database("UserDevice ID in db is invalid."))?,
- )
- .map_err(|_| Error::bad_database("Device ID in userdeviceid_metadata is invalid."))?
- .into())
- }))
+ Box::new(
+ self.userdeviceid_metadata
+ .scan_prefix(prefix)
+ .map(|(bytes, _)| {
+ Ok(utils::string_from_bytes(
+ bytes.rsplit(|&b| b == 0xff).next().ok_or_else(|| {
+ Error::bad_database("UserDevice ID in db is invalid.")
+ })?,
+ )
+ .map_err(|_| {
+ Error::bad_database("Device ID in userdeviceid_metadata is invalid.")
+ })?
+ .into())
+ }),
+ )
}
/// Replaces the access token of one device.
@@ -341,8 +355,10 @@ impl service::users::Data for KeyValueDatabase {
&serde_json::to_vec(&one_time_key_value).expect("OneTimeKey::to_vec always works"),
)?;
- self.userid_lastonetimekeyupdate
- .insert(user_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?;
+ self.userid_lastonetimekeyupdate.insert(
+ user_id.as_bytes(),
+ &services().globals.next_count()?.to_be_bytes(),
+ )?;
Ok(())
}
@@ -372,8 +388,10 @@ impl service::users::Data for KeyValueDatabase {
prefix.extend_from_slice(key_algorithm.as_ref().as_bytes());
prefix.push(b':');
- self.userid_lastonetimekeyupdate
- .insert(user_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?;
+ self.userid_lastonetimekeyupdate.insert(
+ user_id.as_bytes(),
+ &services().globals.next_count()?.to_be_bytes(),
+ )?;
self.onetimekeyid_onetimekeys
.scan_prefix(prefix)
@@ -617,38 +635,47 @@ impl service::users::Data for KeyValueDatabase {
let to = to.unwrap_or(u64::MAX);
- Box::new(self.keychangeid_userid
- .iter_from(&start, false)
- .take_while(move |(k, _)| {
- k.starts_with(&prefix)
- && if let Some(current) = k.splitn(2, |&b| b == 0xff).nth(1) {
- if let Ok(c) = utils::u64_from_bytes(current) {
- c <= to
+ Box::new(
+ self.keychangeid_userid
+ .iter_from(&start, false)
+ .take_while(move |(k, _)| {
+ k.starts_with(&prefix)
+ && if let Some(current) = k.splitn(2, |&b| b == 0xff).nth(1) {
+ if let Ok(c) = utils::u64_from_bytes(current) {
+ c <= to
+ } else {
+ warn!("BadDatabase: Could not parse keychangeid_userid bytes");
+ false
+ }
} else {
- warn!("BadDatabase: Could not parse keychangeid_userid bytes");
+ warn!("BadDatabase: Could not parse keychangeid_userid");
false
}
- } else {
- warn!("BadDatabase: Could not parse keychangeid_userid");
- false
- }
- })
- .map(|(_, bytes)| {
- UserId::parse(utils::string_from_bytes(&bytes).map_err(|_| {
- Error::bad_database("User ID in devicekeychangeid_userid is invalid unicode.")
- })?)
- .map_err(|_| Error::bad_database("User ID in devicekeychangeid_userid is invalid."))
- }))
+ })
+ .map(|(_, bytes)| {
+ UserId::parse(utils::string_from_bytes(&bytes).map_err(|_| {
+ Error::bad_database(
+ "User ID in devicekeychangeid_userid is invalid unicode.",
+ )
+ })?)
+ .map_err(|_| {
+ Error::bad_database("User ID in devicekeychangeid_userid is invalid.")
+ })
+ }),
+ )
}
- fn mark_device_key_update(
- &self,
- user_id: &UserId,
- ) -> Result<()> {
+ fn mark_device_key_update(&self, user_id: &UserId) -> Result<()> {
let count = services().globals.next_count()?.to_be_bytes();
- for room_id in services().rooms.state_cache.rooms_joined(user_id).filter_map(|r| r.ok()) {
+ for room_id in services()
+ .rooms
+ .state_cache
+ .rooms_joined(user_id)
+ .filter_map(|r| r.ok())
+ {
// Don't send key updates to unencrypted rooms
- if services().rooms
+ if services()
+ .rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomEncryption, "")?
.is_none()
@@ -883,20 +910,19 @@ impl service::users::Data for KeyValueDatabase {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
- Box::new(self.userdeviceid_metadata
- .scan_prefix(key)
- .map(|(_, bytes)| {
- serde_json::from_slice::<Device>(&bytes)
- .map_err(|_| Error::bad_database("Device in userdeviceid_metadata is invalid."))
- }))
+ Box::new(
+ self.userdeviceid_metadata
+ .scan_prefix(key)
+ .map(|(_, bytes)| {
+ serde_json::from_slice::<Device>(&bytes).map_err(|_| {
+ Error::bad_database("Device in userdeviceid_metadata is invalid.")
+ })
+ }),
+ )
}
/// Creates a new sync filter. Returns the filter id.
- fn create_filter(
- &self,
- user_id: &UserId,
- filter: &IncomingFilterDefinition,
- ) -> Result<String> {
+ fn create_filter(&self, user_id: &UserId, filter: &IncomingFilterDefinition) -> Result<String> {
let filter_id = utils::random_string(4);
let mut key = user_id.as_bytes().to_vec();
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 6868467..8a7c78e 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,8 +1,16 @@
pub mod abstraction;
pub mod key_value;
-use crate::{utils, Config, Error, Result, service::{users, globals, uiaa, rooms::{self, state_compressor::CompressedStateEvent}, account_data, media, key_backups, transaction_ids, sending, appservice, pusher}, services, PduEvent, Services, SERVICES};
+use crate::{
+ service::{
+ account_data, appservice, globals, key_backups, media, pusher,
+ rooms::{self, state_compressor::CompressedStateEvent},
+ sending, transaction_ids, uiaa, users,
+ },
+ services, utils, Config, Error, PduEvent, Result, Services, SERVICES,
+};
use abstraction::KeyValueDatabaseEngine;
+use abstraction::KvTree;
use directories::ProjectDirs;
use futures_util::{stream::FuturesUnordered, StreamExt};
use lru_cache::LruCache;
@@ -12,7 +20,8 @@ use ruma::{
GlobalAccountDataEvent, GlobalAccountDataEventType, StateEventType,
},
push::Ruleset,
- DeviceId, EventId, RoomId, UserId, signatures::CanonicalJsonValue,
+ signatures::CanonicalJsonValue,
+ DeviceId, EventId, RoomId, UserId,
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
@@ -25,7 +34,6 @@ use std::{
};
use tokio::sync::{mpsc, OwnedRwLockReadGuard, RwLock as TokioRwLock, Semaphore};
use tracing::{debug, error, info, warn};
-use abstraction::KvTree;
pub struct KeyValueDatabase {
_db: Arc<dyn KeyValueDatabaseEngine>,
@@ -65,9 +73,9 @@ pub struct KeyValueDatabase {
pub(super) readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId
pub(super) roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count
pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
- pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count
+ pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count
pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count
- pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId
+ pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId
pub(super) userid_lastpresenceupdate: Arc<dyn KvTree>, // LastPresenceUpdate = Count
//pub rooms: rooms::Rooms,
@@ -279,127 +287,126 @@ impl KeyValueDatabase {
let db = Arc::new(Self {
_db: builder.clone(),
- userid_password: builder.open_tree("userid_password")?,
- userid_displayname: builder.open_tree("userid_displayname")?,
- userid_avatarurl: builder.open_tree("userid_avatarurl")?,
- userid_blurhash: builder.open_tree("userid_blurhash")?,
- userdeviceid_token: builder.open_tree("userdeviceid_token")?,
- userdeviceid_metadata: builder.open_tree("userdeviceid_metadata")?,
- userid_devicelistversion: builder.open_tree("userid_devicelistversion")?,
- token_userdeviceid: builder.open_tree("token_userdeviceid")?,
- onetimekeyid_onetimekeys: builder.open_tree("onetimekeyid_onetimekeys")?,
- userid_lastonetimekeyupdate: builder.open_tree("userid_lastonetimekeyupdate")?,
- keychangeid_userid: builder.open_tree("keychangeid_userid")?,
- keyid_key: builder.open_tree("keyid_key")?,
- userid_masterkeyid: builder.open_tree("userid_masterkeyid")?,
- userid_selfsigningkeyid: builder.open_tree("userid_selfsigningkeyid")?,
- userid_usersigningkeyid: builder.open_tree("userid_usersigningkeyid")?,
- userfilterid_filter: builder.open_tree("userfilterid_filter")?,
- todeviceid_events: builder.open_tree("todeviceid_events")?,
-
- userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?,
- userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()),
- readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?,
- roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt
- roomuserid_lastprivatereadupdate: builder
- .open_tree("roomuserid_lastprivatereadupdate")?,
- typingid_userid: builder.open_tree("typingid_userid")?,
- roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?,
- presenceid_presence: builder.open_tree("presenceid_presence")?,
- userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?,
- pduid_pdu: builder.open_tree("pduid_pdu")?,
- eventid_pduid: builder.open_tree("eventid_pduid")?,
- roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,
-
- alias_roomid: builder.open_tree("alias_roomid")?,
- aliasid_alias: builder.open_tree("aliasid_alias")?,
- publicroomids: builder.open_tree("publicroomids")?,
-
- tokenids: builder.open_tree("tokenids")?,
-
- roomserverids: builder.open_tree("roomserverids")?,
- serverroomids: builder.open_tree("serverroomids")?,
- userroomid_joined: builder.open_tree("userroomid_joined")?,
- roomuserid_joined: builder.open_tree("roomuserid_joined")?,
- roomid_joinedcount: builder.open_tree("roomid_joinedcount")?,
- roomid_invitedcount: builder.open_tree("roomid_invitedcount")?,
- roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?,
- userroomid_invitestate: builder.open_tree("userroomid_invitestate")?,
- roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?,
- userroomid_leftstate: builder.open_tree("userroomid_leftstate")?,
- roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?,
-
- disabledroomids: builder.open_tree("disabledroomids")?,
-
- lazyloadedids: builder.open_tree("lazyloadedids")?,
-
- userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?,
- userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?,
-
- statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
- shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?,
-
- shorteventid_authchain: builder.open_tree("shorteventid_authchain")?,
-
- roomid_shortroomid: builder.open_tree("roomid_shortroomid")?,
-
- shortstatehash_statediff: builder.open_tree("shortstatehash_statediff")?,
- eventid_shorteventid: builder.open_tree("eventid_shorteventid")?,
- shorteventid_eventid: builder.open_tree("shorteventid_eventid")?,
- shorteventid_shortstatehash: builder.open_tree("shorteventid_shortstatehash")?,
- roomid_shortstatehash: builder.open_tree("roomid_shortstatehash")?,
- roomsynctoken_shortstatehash: builder.open_tree("roomsynctoken_shortstatehash")?,
- statehash_shortstatehash: builder.open_tree("statehash_shortstatehash")?,
-
- eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
- softfailedeventids: builder.open_tree("softfailedeventids")?,
-
- referencedevents: builder.open_tree("referencedevents")?,
- roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
- roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?,
- mediaid_file: builder.open_tree("mediaid_file")?,
- backupid_algorithm: builder.open_tree("backupid_algorithm")?,
- backupid_etag: builder.open_tree("backupid_etag")?,
- backupkeyid_backup: builder.open_tree("backupkeyid_backup")?,
- userdevicetxnid_response: builder.open_tree("userdevicetxnid_response")?,
- servername_educount: builder.open_tree("servername_educount")?,
- servernameevent_data: builder.open_tree("servernameevent_data")?,
- servercurrentevent_data: builder.open_tree("servercurrentevent_data")?,
- id_appserviceregistrations: builder.open_tree("id_appserviceregistrations")?,
- senderkey_pusher: builder.open_tree("senderkey_pusher")?,
- global: builder.open_tree("global")?,
- server_signingkeys: builder.open_tree("server_signingkeys")?,
-
- cached_registrations: Arc::new(RwLock::new(HashMap::new())),
- pdu_cache: Mutex::new(LruCache::new(
- config
- .pdu_cache_capacity
- .try_into()
- .expect("pdu cache capacity fits into usize"),
- )),
- auth_chain_cache: Mutex::new(LruCache::new(
- (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
- )),
- shorteventid_cache: Mutex::new(LruCache::new(
- (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
- )),
- eventidshort_cache: Mutex::new(LruCache::new(
- (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
- )),
- shortstatekey_cache: Mutex::new(LruCache::new(
- (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
- )),
- statekeyshort_cache: Mutex::new(LruCache::new(
- (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
- )),
- our_real_users_cache: RwLock::new(HashMap::new()),
- appservice_in_room_cache: RwLock::new(HashMap::new()),
- lazy_load_waiting: Mutex::new(HashMap::new()),
- stateinfo_cache: Mutex::new(LruCache::new(
- (100.0 * config.conduit_cache_capacity_modifier) as usize,
- )),
- lasttimelinecount_cache: Mutex::new(HashMap::new()),
-
+ userid_password: builder.open_tree("userid_password")?,
+ userid_displayname: builder.open_tree("userid_displayname")?,
+ userid_avatarurl: builder.open_tree("userid_avatarurl")?,
+ userid_blurhash: builder.open_tree("userid_blurhash")?,
+ userdeviceid_token: builder.open_tree("userdeviceid_token")?,
+ userdeviceid_metadata: builder.open_tree("userdeviceid_metadata")?,
+ userid_devicelistversion: builder.open_tree("userid_devicelistversion")?,
+ token_userdeviceid: builder.open_tree("token_userdeviceid")?,
+ onetimekeyid_onetimekeys: builder.open_tree("onetimekeyid_onetimekeys")?,
+ userid_lastonetimekeyupdate: builder.open_tree("userid_lastonetimekeyupdate")?,
+ keychangeid_userid: builder.open_tree("keychangeid_userid")?,
+ keyid_key: builder.open_tree("keyid_key")?,
+ userid_masterkeyid: builder.open_tree("userid_masterkeyid")?,
+ userid_selfsigningkeyid: builder.open_tree("userid_selfsigningkeyid")?,
+ userid_usersigningkeyid: builder.open_tree("userid_usersigningkeyid")?,
+ userfilterid_filter: builder.open_tree("userfilterid_filter")?,
+ todeviceid_events: builder.open_tree("todeviceid_events")?,
+
+ userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?,
+ userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()),
+ readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?,
+ roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt
+ roomuserid_lastprivatereadupdate: builder
+ .open_tree("roomuserid_lastprivatereadupdate")?,
+ typingid_userid: builder.open_tree("typingid_userid")?,
+ roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?,
+ presenceid_presence: builder.open_tree("presenceid_presence")?,
+ userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?,
+ pduid_pdu: builder.open_tree("pduid_pdu")?,
+ eventid_pduid: builder.open_tree("eventid_pduid")?,
+ roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,
+
+ alias_roomid: builder.open_tree("alias_roomid")?,
+ aliasid_alias: builder.open_tree("aliasid_alias")?,
+ publicroomids: builder.open_tree("publicroomids")?,
+
+ tokenids: builder.open_tree("tokenids")?,
+
+ roomserverids: builder.open_tree("roomserverids")?,
+ serverroomids: builder.open_tree("serverroomids")?,
+ userroomid_joined: builder.open_tree("userroomid_joined")?,
+ roomuserid_joined: builder.open_tree("roomuserid_joined")?,
+ roomid_joinedcount: builder.open_tree("roomid_joinedcount")?,
+ roomid_invitedcount: builder.open_tree("roomid_invitedcount")?,
+ roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?,
+ userroomid_invitestate: builder.open_tree("userroomid_invitestate")?,
+ roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?,
+ userroomid_leftstate: builder.open_tree("userroomid_leftstate")?,
+ roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?,
+
+ disabledroomids: builder.open_tree("disabledroomids")?,
+
+ lazyloadedids: builder.open_tree("lazyloadedids")?,
+
+ userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?,
+ userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?,
+
+ statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
+ shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?,
+
+ shorteventid_authchain: builder.open_tree("shorteventid_authchain")?,
+
+ roomid_shortroomid: builder.open_tree("roomid_shortroomid")?,
+
+ shortstatehash_statediff: builder.open_tree("shortstatehash_statediff")?,
+ eventid_shorteventid: builder.open_tree("eventid_shorteventid")?,
+ shorteventid_eventid: builder.open_tree("shorteventid_eventid")?,
+ shorteventid_shortstatehash: builder.open_tree("shorteventid_shortstatehash")?,
+ roomid_shortstatehash: builder.open_tree("roomid_shortstatehash")?,
+ roomsynctoken_shortstatehash: builder.open_tree("roomsynctoken_shortstatehash")?,
+ statehash_shortstatehash: builder.open_tree("statehash_shortstatehash")?,
+
+ eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
+ softfailedeventids: builder.open_tree("softfailedeventids")?,
+
+ referencedevents: builder.open_tree("referencedevents")?,
+ roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
+ roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?,
+ mediaid_file: builder.open_tree("mediaid_file")?,
+ backupid_algorithm: builder.open_tree("backupid_algorithm")?,
+ backupid_etag: builder.open_tree("backupid_etag")?,
+ backupkeyid_backup: builder.open_tree("backupkeyid_backup")?,
+ userdevicetxnid_response: builder.open_tree("userdevicetxnid_response")?,
+ servername_educount: builder.open_tree("servername_educount")?,
+ servernameevent_data: builder.open_tree("servernameevent_data")?,
+ servercurrentevent_data: builder.open_tree("servercurrentevent_data")?,
+ id_appserviceregistrations: builder.open_tree("id_appserviceregistrations")?,
+ senderkey_pusher: builder.open_tree("senderkey_pusher")?,
+ global: builder.open_tree("global")?,
+ server_signingkeys: builder.open_tree("server_signingkeys")?,
+
+ cached_registrations: Arc::new(RwLock::new(HashMap::new())),
+ pdu_cache: Mutex::new(LruCache::new(
+ config
+ .pdu_cache_capacity
+ .try_into()
+ .expect("pdu cache capacity fits into usize"),
+ )),
+ auth_chain_cache: Mutex::new(LruCache::new(
+ (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
+ )),
+ shorteventid_cache: Mutex::new(LruCache::new(
+ (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
+ )),
+ eventidshort_cache: Mutex::new(LruCache::new(
+ (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
+ )),
+ shortstatekey_cache: Mutex::new(LruCache::new(
+ (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
+ )),
+ statekeyshort_cache: Mutex::new(LruCache::new(
+ (100_000.0 * config.conduit_cache_capacity_modifier) as usize,
+ )),
+ our_real_users_cache: RwLock::new(HashMap::new()),
+ appservice_in_room_cache: RwLock::new(HashMap::new()),
+ lazy_load_waiting: Mutex::new(HashMap::new()),
+ stateinfo_cache: Mutex::new(LruCache::new(
+ (100.0 * config.conduit_cache_capacity_modifier) as usize,
+ )),
+ lasttimelinecount_cache: Mutex::new(HashMap::new()),
});
let services_raw = Box::new(Services::build(Arc::clone(&db), config)?);
@@ -407,7 +414,6 @@ impl KeyValueDatabase {
// This is the first and only time we initialize the SERVICE static
*SERVICES.write().unwrap() = Some(Box::leak(services_raw));
-
// Matrix resource ownership is based on the server name; changing it
// requires recreating the database from scratch.
if services().users.count()? > 0 {
@@ -570,7 +576,10 @@ impl KeyValueDatabase {
let states_parents = last_roomsstatehash.map_or_else(
|| Ok(Vec::new()),
|&last_roomsstatehash| {
- services().rooms.state_compressor.load_shortstatehash_info(dbg!(last_roomsstatehash))
+ services()
+ .rooms
+ .state_compressor
+ .load_shortstatehash_info(dbg!(last_roomsstatehash))
},
)?;
@@ -643,14 +652,15 @@ impl KeyValueDatabase {
current_state = HashSet::new();
current_sstatehash = Some(sstatehash);
- let event_id = db
- .shorteventid_eventid
- .get(&seventid)
- .unwrap()
- .unwrap();
+ let event_id = db.shorteventid_eventid.get(&seventid).unwrap().unwrap();
let string = utils::string_from_bytes(&event_id).unwrap();
let event_id = <&EventId>::try_from(string.as_str()).unwrap();
- let pdu = services().rooms.timeline.get_pdu(event_id).unwrap().unwrap();
+ let pdu = services()
+ .rooms
+ .timeline
+ .get_pdu(event_id)
+ .unwrap()
+ .unwrap();
if Some(&pdu.room_id) != current_room.as_ref() {
current_room = Some(pdu.room_id.clone());
@@ -764,8 +774,7 @@ impl KeyValueDatabase {
.peekable();
while iter.peek().is_some() {
- db.tokenids
- .insert_batch(&mut iter.by_ref().take(1000))?;
+ db.tokenids.insert_batch(&mut iter.by_ref().take(1000))?;
println!("smaller batch done");
}
@@ -803,8 +812,7 @@ impl KeyValueDatabase {
// Force E2EE device list updates so we can send them over federation
for user_id in services().users.iter().filter_map(|r| r.ok()) {
- services().users
- .mark_device_key_update(&user_id)?;
+ services().users.mark_device_key_update(&user_id)?;
}
services().globals.bump_database_version(10)?;
@@ -825,7 +833,8 @@ impl KeyValueDatabase {
info!(
"Loaded {} database with version {}",
- services().globals.config.database_backend, latest_database_version
+ services().globals.config.database_backend,
+ latest_database_version
);
} else {
services()
@@ -837,7 +846,8 @@ impl KeyValueDatabase {
warn!(
"Created new {} database with version {}",
- services().globals.config.database_backend, latest_database_version
+ services().globals.config.database_backend,
+ latest_database_version
);
}
@@ -862,9 +872,7 @@ impl KeyValueDatabase {
}
};
- services()
- .sending
- .start_handler(sending_receiver);
+ services().sending.start_handler(sending_receiver);
Self::start_cleanup_task().await;
@@ -898,7 +906,8 @@ impl KeyValueDatabase {
use std::time::{Duration, Instant};
- let timer_interval = Duration::from_secs(services().globals.config.cleanup_second_interval as u64);
+ let timer_interval =
+ Duration::from_secs(services().globals.config.cleanup_second_interval as u64);
tokio::spawn(async move {
let mut i = interval(timer_interval);
@@ -937,8 +946,10 @@ fn set_emergency_access() -> Result<bool> {
let conduit_user = UserId::parse_with_server_name("conduit", services().globals.server_name())
.expect("@conduit:server_name is a valid UserId");
- services().users
- .set_password(&conduit_user, services().globals.emergency_password().as_deref())?;
+ services().users.set_password(
+ &conduit_user,
+ services().globals.emergency_password().as_deref(),
+ )?;
let (ruleset, res) = match services().globals.emergency_password() {
Some(_) => (Ruleset::server_default(&conduit_user), Ok(true)),
@@ -951,7 +962,8 @@ fn set_emergency_access() -> Result<bool> {
GlobalAccountDataEventType::PushRules.to_string().into(),
&serde_json::to_value(&GlobalAccountDataEvent {
content: PushRulesEventContent { global: ruleset },
- }).expect("to json value always works"),
+ })
+ .expect("to json value always works"),
)?;
res