diff options
author | Timo Kösters <timo@koesters.xyz> | 2023-06-25 19:31:40 +0200 |
---|---|---|
committer | Timo Kösters <timo@koesters.xyz> | 2023-06-25 19:40:33 +0200 |
commit | c7e0ea525a3c6f66072c3518bb8c533c87f1e3db (patch) | |
tree | 49632e0ab354a704c1e59bee220e3a9569a1a6c3 /src/database | |
parent | def079267d3a4255df2c3dd38ed317ca65df5416 (diff) | |
download | conduit-c7e0ea525a3c6f66072c3518bb8c533c87f1e3db.zip |
feat: WIP relationships and threads
Diffstat (limited to 'src/database')
-rw-r--r-- | src/database/key_value/rooms/mod.rs | 1 | ||||
-rw-r--r-- | src/database/key_value/rooms/pdu_metadata.rs | 7 | ||||
-rw-r--r-- | src/database/key_value/rooms/threads.rs | 78 | ||||
-rw-r--r-- | src/database/key_value/rooms/timeline.rs | 21 | ||||
-rw-r--r-- | src/database/mod.rs | 7 |
5 files changed, 109 insertions, 5 deletions
diff --git a/src/database/key_value/rooms/mod.rs b/src/database/key_value/rooms/mod.rs index 406943e..e7b53d3 100644 --- a/src/database/key_value/rooms/mod.rs +++ b/src/database/key_value/rooms/mod.rs @@ -12,6 +12,7 @@ mod state; mod state_accessor; mod state_cache; mod state_compressor; +mod threads; mod timeline; mod user; diff --git a/src/database/key_value/rooms/pdu_metadata.rs b/src/database/key_value/rooms/pdu_metadata.rs index 76ec734..4b3f810 100644 --- a/src/database/key_value/rooms/pdu_metadata.rs +++ b/src/database/key_value/rooms/pdu_metadata.rs @@ -5,6 +5,13 @@ use ruma::{EventId, RoomId}; use crate::{database::KeyValueDatabase, service, Result}; impl service::rooms::pdu_metadata::Data for KeyValueDatabase { + fn add_relation(&self, from: u64, to: u64) -> Result<()> { + let mut key = from.to_be_bytes().to_vec(); + key.extend_from_slice(&to.to_be_bytes()); + self.fromto_relation.insert(&key, &[])?; + Ok(()) + } + fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> { for prev in event_ids { let mut key = room_id.as_bytes().to_vec(); diff --git a/src/database/key_value/rooms/threads.rs b/src/database/key_value/rooms/threads.rs new file mode 100644 index 0000000..4be289b --- /dev/null +++ b/src/database/key_value/rooms/threads.rs @@ -0,0 +1,78 @@ +use std::mem; + +use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId}; + +use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; + +impl service::rooms::threads::Data for KeyValueDatabase { + fn threads_until<'a>( + &'a self, + user_id: &'a UserId, + room_id: &'a RoomId, + until: u64, + include: &'a IncludeThreads, + ) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>> { + let prefix = services() + .rooms + .short + .get_shortroomid(room_id)? + .expect("room exists") + .to_be_bytes() + .to_vec(); + + let mut current = prefix.clone(); + current.extend_from_slice(&(until - 1).to_be_bytes()); + + Ok(Box::new( + self.threadid_userids + .iter_from(¤t, true) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pduid, users)| { + let count = utils::u64_from_bytes(&pduid[(mem::size_of::<u64>())..]) + .map_err(|_| Error::bad_database("Invalid pduid in threadid_userids."))?; + let mut pdu = services() + .rooms + .timeline + .get_pdu_from_id(&pduid)? + .ok_or_else(|| { + Error::bad_database("Invalid pduid reference in threadid_userids") + })?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + Ok((count, pdu)) + }), + )) + } + + fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()> { + let users = participants + .iter() + .map(|user| user.as_bytes()) + .collect::<Vec<_>>() + .join(&[0xff][..]); + + self.threadid_userids.insert(&root_id, &users)?; + + Ok(()) + } + + fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>> { + if let Some(users) = self.threadid_userids.get(&root_id)? { + Ok(Some( + users + .split(|b| *b == 0xff) + .map(|bytes| { + UserId::parse(utils::string_from_bytes(bytes).map_err(|_| { + Error::bad_database("Invalid UserId bytes in threadid_userids.") + })?) + .map_err(|_| Error::bad_database("Invalid UserId in threadid_userids.")) + }) + .filter_map(|r| r.ok()) + .collect(), + )) + } else { + Ok(None) + } + } +} diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index d9c4423..74e3e5c 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -198,19 +198,30 @@ impl service::rooms::timeline::Data for KeyValueDatabase { } /// Removes a pdu and creates a new one with the same id. - fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> { + fn replace_pdu( + &self, + pdu_id: &[u8], + pdu_json: &CanonicalJsonObject, + pdu: &PduEvent, + ) -> Result<()> { if self.pduid_pdu.get(pdu_id)?.is_some() { self.pduid_pdu.insert( pdu_id, - &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"), + &serde_json::to_vec(pdu_json).expect("CanonicalJsonObject is always a valid"), )?; - Ok(()) } else { - Err(Error::BadRequest( + return Err(Error::BadRequest( ErrorKind::NotFound, "PDU does not exist.", - )) + )); } + + self.pdu_cache + .lock() + .unwrap() + .remove(&(*pdu.event_id).to_owned()); + + Ok(()) } /// Returns an iterator over all events and their tokens in a room that happened before the diff --git a/src/database/mod.rs b/src/database/mod.rs index 1415f68..b864ceb 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -80,6 +80,8 @@ pub struct KeyValueDatabase { pub(super) aliasid_alias: Arc<dyn KvTree>, // AliasId = RoomId + Count pub(super) publicroomids: Arc<dyn KvTree>, + pub(super) threadid_userids: Arc<dyn KvTree>, // ThreadId = RoomId + Count + pub(super) tokenids: Arc<dyn KvTree>, // TokenId = ShortRoomId + Token + PduIdCount /// Participating servers in a room. @@ -128,6 +130,8 @@ pub struct KeyValueDatabase { pub(super) eventid_outlierpdu: Arc<dyn KvTree>, pub(super) softfailedeventids: Arc<dyn KvTree>, + /// ShortEventId + ShortEventId -> (). + pub(super) fromto_relation: Arc<dyn KvTree>, /// RoomId + EventId -> Parent PDU EventId. pub(super) referencedevents: Arc<dyn KvTree>, @@ -302,6 +306,8 @@ impl KeyValueDatabase { aliasid_alias: builder.open_tree("aliasid_alias")?, publicroomids: builder.open_tree("publicroomids")?, + threadid_userids: builder.open_tree("threadid_userids")?, + tokenids: builder.open_tree("tokenids")?, roomserverids: builder.open_tree("roomserverids")?, @@ -342,6 +348,7 @@ impl KeyValueDatabase { eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, softfailedeventids: builder.open_tree("softfailedeventids")?, + fromto_relation: builder.open_tree("fromto_relation")?, referencedevents: builder.open_tree("referencedevents")?, roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?, |