summaryrefslogtreecommitdiff
path: root/src/database/key_value
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2023-06-25 19:31:40 +0200
committerTimo Kösters <timo@koesters.xyz>2023-06-25 19:40:33 +0200
commitc7e0ea525a3c6f66072c3518bb8c533c87f1e3db (patch)
tree49632e0ab354a704c1e59bee220e3a9569a1a6c3 /src/database/key_value
parentdef079267d3a4255df2c3dd38ed317ca65df5416 (diff)
downloadconduit-c7e0ea525a3c6f66072c3518bb8c533c87f1e3db.zip
feat: WIP relationships and threads
Diffstat (limited to 'src/database/key_value')
-rw-r--r--src/database/key_value/rooms/mod.rs1
-rw-r--r--src/database/key_value/rooms/pdu_metadata.rs7
-rw-r--r--src/database/key_value/rooms/threads.rs78
-rw-r--r--src/database/key_value/rooms/timeline.rs21
4 files changed, 102 insertions, 5 deletions
diff --git a/src/database/key_value/rooms/mod.rs b/src/database/key_value/rooms/mod.rs
index 406943e..e7b53d3 100644
--- a/src/database/key_value/rooms/mod.rs
+++ b/src/database/key_value/rooms/mod.rs
@@ -12,6 +12,7 @@ mod state;
mod state_accessor;
mod state_cache;
mod state_compressor;
+mod threads;
mod timeline;
mod user;
diff --git a/src/database/key_value/rooms/pdu_metadata.rs b/src/database/key_value/rooms/pdu_metadata.rs
index 76ec734..4b3f810 100644
--- a/src/database/key_value/rooms/pdu_metadata.rs
+++ b/src/database/key_value/rooms/pdu_metadata.rs
@@ -5,6 +5,13 @@ use ruma::{EventId, RoomId};
use crate::{database::KeyValueDatabase, service, Result};
impl service::rooms::pdu_metadata::Data for KeyValueDatabase {
+ fn add_relation(&self, from: u64, to: u64) -> Result<()> {
+ let mut key = from.to_be_bytes().to_vec();
+ key.extend_from_slice(&to.to_be_bytes());
+ self.fromto_relation.insert(&key, &[])?;
+ Ok(())
+ }
+
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> {
for prev in event_ids {
let mut key = room_id.as_bytes().to_vec();
diff --git a/src/database/key_value/rooms/threads.rs b/src/database/key_value/rooms/threads.rs
new file mode 100644
index 0000000..4be289b
--- /dev/null
+++ b/src/database/key_value/rooms/threads.rs
@@ -0,0 +1,78 @@
+use std::mem;
+
+use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
+
+use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
+
+impl service::rooms::threads::Data for KeyValueDatabase {
+ fn threads_until<'a>(
+ &'a self,
+ user_id: &'a UserId,
+ room_id: &'a RoomId,
+ until: u64,
+ include: &'a IncludeThreads,
+ ) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>> {
+ let prefix = services()
+ .rooms
+ .short
+ .get_shortroomid(room_id)?
+ .expect("room exists")
+ .to_be_bytes()
+ .to_vec();
+
+ let mut current = prefix.clone();
+ current.extend_from_slice(&(until - 1).to_be_bytes());
+
+ Ok(Box::new(
+ self.threadid_userids
+ .iter_from(&current, true)
+ .take_while(move |(k, _)| k.starts_with(&prefix))
+ .map(move |(pduid, users)| {
+ let count = utils::u64_from_bytes(&pduid[(mem::size_of::<u64>())..])
+ .map_err(|_| Error::bad_database("Invalid pduid in threadid_userids."))?;
+ let mut pdu = services()
+ .rooms
+ .timeline
+ .get_pdu_from_id(&pduid)?
+ .ok_or_else(|| {
+ Error::bad_database("Invalid pduid reference in threadid_userids")
+ })?;
+ if pdu.sender != user_id {
+ pdu.remove_transaction_id()?;
+ }
+ Ok((count, pdu))
+ }),
+ ))
+ }
+
+ fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()> {
+ let users = participants
+ .iter()
+ .map(|user| user.as_bytes())
+ .collect::<Vec<_>>()
+ .join(&[0xff][..]);
+
+ self.threadid_userids.insert(&root_id, &users)?;
+
+ Ok(())
+ }
+
+ fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>> {
+ if let Some(users) = self.threadid_userids.get(&root_id)? {
+ Ok(Some(
+ users
+ .split(|b| *b == 0xff)
+ .map(|bytes| {
+ UserId::parse(utils::string_from_bytes(bytes).map_err(|_| {
+ Error::bad_database("Invalid UserId bytes in threadid_userids.")
+ })?)
+ .map_err(|_| Error::bad_database("Invalid UserId in threadid_userids."))
+ })
+ .filter_map(|r| r.ok())
+ .collect(),
+ ))
+ } else {
+ Ok(None)
+ }
+ }
+}
diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs
index d9c4423..74e3e5c 100644
--- a/src/database/key_value/rooms/timeline.rs
+++ b/src/database/key_value/rooms/timeline.rs
@@ -198,19 +198,30 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
}
/// Removes a pdu and creates a new one with the same id.
- fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> {
+ fn replace_pdu(
+ &self,
+ pdu_id: &[u8],
+ pdu_json: &CanonicalJsonObject,
+ pdu: &PduEvent,
+ ) -> Result<()> {
if self.pduid_pdu.get(pdu_id)?.is_some() {
self.pduid_pdu.insert(
pdu_id,
- &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"),
+ &serde_json::to_vec(pdu_json).expect("CanonicalJsonObject is always a valid"),
)?;
- Ok(())
} else {
- Err(Error::BadRequest(
+ return Err(Error::BadRequest(
ErrorKind::NotFound,
"PDU does not exist.",
- ))
+ ));
}
+
+ self.pdu_cache
+ .lock()
+ .unwrap()
+ .remove(&(*pdu.event_id).to_owned());
+
+ Ok(())
}
/// Returns an iterator over all events and their tokens in a room that happened before the