summaryrefslogtreecommitdiff
path: root/src/database/key_value/rooms/threads.rs
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2023-06-25 19:31:40 +0200
committerTimo Kösters <timo@koesters.xyz>2023-06-25 19:40:33 +0200
commitc7e0ea525a3c6f66072c3518bb8c533c87f1e3db (patch)
tree49632e0ab354a704c1e59bee220e3a9569a1a6c3 /src/database/key_value/rooms/threads.rs
parentdef079267d3a4255df2c3dd38ed317ca65df5416 (diff)
downloadconduit-c7e0ea525a3c6f66072c3518bb8c533c87f1e3db.zip
feat: WIP relationships and threads
Diffstat (limited to 'src/database/key_value/rooms/threads.rs')
-rw-r--r--src/database/key_value/rooms/threads.rs78
1 files changed, 78 insertions, 0 deletions
diff --git a/src/database/key_value/rooms/threads.rs b/src/database/key_value/rooms/threads.rs
new file mode 100644
index 0000000..4be289b
--- /dev/null
+++ b/src/database/key_value/rooms/threads.rs
@@ -0,0 +1,78 @@
+use std::mem;
+
+use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
+
+use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
+
+impl service::rooms::threads::Data for KeyValueDatabase {
+ fn threads_until<'a>(
+ &'a self,
+ user_id: &'a UserId,
+ room_id: &'a RoomId,
+ until: u64,
+ include: &'a IncludeThreads,
+ ) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>> {
+ let prefix = services()
+ .rooms
+ .short
+ .get_shortroomid(room_id)?
+ .expect("room exists")
+ .to_be_bytes()
+ .to_vec();
+
+ let mut current = prefix.clone();
+ current.extend_from_slice(&(until - 1).to_be_bytes());
+
+ Ok(Box::new(
+ self.threadid_userids
+ .iter_from(&current, true)
+ .take_while(move |(k, _)| k.starts_with(&prefix))
+ .map(move |(pduid, users)| {
+ let count = utils::u64_from_bytes(&pduid[(mem::size_of::<u64>())..])
+ .map_err(|_| Error::bad_database("Invalid pduid in threadid_userids."))?;
+ let mut pdu = services()
+ .rooms
+ .timeline
+ .get_pdu_from_id(&pduid)?
+ .ok_or_else(|| {
+ Error::bad_database("Invalid pduid reference in threadid_userids")
+ })?;
+ if pdu.sender != user_id {
+ pdu.remove_transaction_id()?;
+ }
+ Ok((count, pdu))
+ }),
+ ))
+ }
+
+ fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()> {
+ let users = participants
+ .iter()
+ .map(|user| user.as_bytes())
+ .collect::<Vec<_>>()
+ .join(&[0xff][..]);
+
+ self.threadid_userids.insert(&root_id, &users)?;
+
+ Ok(())
+ }
+
+ fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>> {
+ if let Some(users) = self.threadid_userids.get(&root_id)? {
+ Ok(Some(
+ users
+ .split(|b| *b == 0xff)
+ .map(|bytes| {
+ UserId::parse(utils::string_from_bytes(bytes).map_err(|_| {
+ Error::bad_database("Invalid UserId bytes in threadid_userids.")
+ })?)
+ .map_err(|_| Error::bad_database("Invalid UserId in threadid_userids."))
+ })
+ .filter_map(|r| r.ok())
+ .collect(),
+ ))
+ } else {
+ Ok(None)
+ }
+ }
+}