summaryrefslogtreecommitdiff
path: root/src/database/key_value/rooms/timeline.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/database/key_value/rooms/timeline.rs')
-rw-r--r--src/database/key_value/rooms/timeline.rs281
1 files changed, 139 insertions, 142 deletions
diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs
index 336317d..5ce2136 100644
--- a/src/database/key_value/rooms/timeline.rs
+++ b/src/database/key_value/rooms/timeline.rs
@@ -7,30 +7,10 @@ use tracing::error;
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
-impl service::rooms::timeline::Data for KeyValueDatabase {
- fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
- let prefix = services()
- .rooms
- .short
- .get_shortroomid(room_id)?
- .expect("room exists")
- .to_be_bytes()
- .to_vec();
-
- // Look for PDUs in that room.
- self.pduid_pdu
- .iter_from(&prefix, false)
- .filter(|(k, _)| k.starts_with(&prefix))
- .map(|(_, pdu)| {
- serde_json::from_slice(&pdu)
- .map_err(|_| Error::bad_database("Invalid first PDU in db."))
- .map(Arc::new)
- })
- .next()
- .transpose()
- }
+use service::rooms::timeline::PduCount;
- fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<u64> {
+impl service::rooms::timeline::Data for KeyValueDatabase {
+ fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount> {
match self
.lasttimelinecount_cache
.lock()
@@ -39,20 +19,18 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
{
hash_map::Entry::Vacant(v) => {
if let Some(last_count) = self
- .pdus_until(sender_user, room_id, u64::MAX)?
- .filter_map(|r| {
+ .pdus_until(sender_user, room_id, PduCount::max())?
+ .find_map(|r| {
// Filter out buggy events
if r.is_err() {
error!("Bad pdu in pdus_since: {:?}", r);
}
r.ok()
})
- .map(|(pduid, _)| self.pdu_count(&pduid))
- .next()
{
- Ok(*v.insert(last_count?))
+ Ok(*v.insert(last_count.0))
} else {
- Ok(0)
+ Ok(PduCount::Normal(0))
}
}
hash_map::Entry::Occupied(o) => Ok(*o.get()),
@@ -60,29 +38,28 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
}
/// Returns the `count` of this pdu's id.
- fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> {
- self.eventid_pduid
+ fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<PduCount>> {
+ Ok(self
+ .eventid_pduid
.get(event_id.as_bytes())?
- .map(|pdu_id| self.pdu_count(&pdu_id))
- .transpose()
+ .map(|pdu_id| pdu_count(&pdu_id))
+ .transpose()?)
}
/// Returns the json of a pdu.
fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
- self.eventid_pduid
- .get(event_id.as_bytes())?
- .map_or_else(
- || self.eventid_outlierpdu.get(event_id.as_bytes()),
- |pduid| {
- Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
- Error::bad_database("Invalid pduid in eventid_pduid.")
- })?))
- },
- )?
- .map(|pdu| {
- serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
- })
- .transpose()
+ self.get_non_outlier_pdu_json(event_id)?.map_or_else(
+ || {
+ self.eventid_outlierpdu
+ .get(event_id.as_bytes())?
+ .map(|pdu| {
+ serde_json::from_slice(&pdu)
+ .map_err(|_| Error::bad_database("Invalid PDU in db."))
+ })
+ .transpose()
+ },
+ |x| Ok(Some(x)),
+ )
}
/// Returns the json of a pdu.
@@ -103,7 +80,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
/// Returns the pdu's id.
fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> {
- self.eventid_pduid.get(event_id.as_bytes())
+ Ok(self.eventid_pduid.get(event_id.as_bytes())?)
}
/// Returns the pdu.
@@ -133,22 +110,20 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
}
if let Some(pdu) = self
- .eventid_pduid
- .get(event_id.as_bytes())?
+ .get_non_outlier_pdu(event_id)?
.map_or_else(
- || self.eventid_outlierpdu.get(event_id.as_bytes()),
- |pduid| {
- Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
- Error::bad_database("Invalid pduid in eventid_pduid.")
- })?))
+ || {
+ self.eventid_outlierpdu
+ .get(event_id.as_bytes())?
+ .map(|pdu| {
+ serde_json::from_slice(&pdu)
+ .map_err(|_| Error::bad_database("Invalid PDU in db."))
+ })
+ .transpose()
},
+ |x| Ok(Some(x)),
)?
- .map(|pdu| {
- serde_json::from_slice(&pdu)
- .map_err(|_| Error::bad_database("Invalid PDU in db."))
- .map(Arc::new)
- })
- .transpose()?
+ .map(Arc::new)
{
self.pdu_cache
.lock()
@@ -182,12 +157,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
})
}
- /// Returns the `count` of this pdu's id.
- fn pdu_count(&self, pdu_id: &[u8]) -> Result<u64> {
- utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..])
- .map_err(|_| Error::bad_database("PDU has invalid count bytes."))
- }
-
fn append_pdu(
&self,
pdu_id: &[u8],
@@ -203,7 +172,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
self.lasttimelinecount_cache
.lock()
.unwrap()
- .insert(pdu.room_id.clone(), count);
+ .insert(pdu.room_id.clone(), PduCount::Normal(count));
self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id)?;
self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?;
@@ -211,57 +180,48 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
Ok(())
}
+ fn prepend_backfill_pdu(
+ &self,
+ pdu_id: &[u8],
+ event_id: &EventId,
+ json: &CanonicalJsonObject,
+ ) -> Result<()> {
+ self.pduid_pdu.insert(
+ pdu_id,
+ &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"),
+ )?;
+
+ self.eventid_pduid.insert(event_id.as_bytes(), pdu_id)?;
+ self.eventid_outlierpdu.remove(event_id.as_bytes())?;
+
+ Ok(())
+ }
+
/// Removes a pdu and creates a new one with the same id.
- fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> {
+ fn replace_pdu(
+ &self,
+ pdu_id: &[u8],
+ pdu_json: &CanonicalJsonObject,
+ pdu: &PduEvent,
+ ) -> Result<()> {
if self.pduid_pdu.get(pdu_id)?.is_some() {
self.pduid_pdu.insert(
pdu_id,
- &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"),
+ &serde_json::to_vec(pdu_json).expect("CanonicalJsonObject is always a valid"),
)?;
- Ok(())
} else {
- Err(Error::BadRequest(
+ return Err(Error::BadRequest(
ErrorKind::NotFound,
"PDU does not exist.",
- ))
+ ));
}
- }
-
- /// Returns an iterator over all events in a room that happened after the event with id `since`
- /// in chronological order.
- fn pdus_since<'a>(
- &'a self,
- user_id: &UserId,
- room_id: &RoomId,
- since: u64,
- ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> {
- let prefix = services()
- .rooms
- .short
- .get_shortroomid(room_id)?
- .expect("room exists")
- .to_be_bytes()
- .to_vec();
-
- // Skip the first pdu if it's exactly at since, because we sent that last time
- let mut first_pdu_id = prefix.clone();
- first_pdu_id.extend_from_slice(&(since + 1).to_be_bytes());
- let user_id = user_id.to_owned();
+ self.pdu_cache
+ .lock()
+ .unwrap()
+ .remove(&(*pdu.event_id).to_owned());
- Ok(Box::new(
- self.pduid_pdu
- .iter_from(&first_pdu_id, false)
- .take_while(move |(k, _)| k.starts_with(&prefix))
- .map(move |(pdu_id, v)| {
- let mut pdu = serde_json::from_slice::<PduEvent>(&v)
- .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
- if pdu.sender != user_id {
- pdu.remove_transaction_id()?;
- }
- Ok((pdu_id, pdu))
- }),
- ))
+ Ok(())
}
/// Returns an iterator over all events and their tokens in a room that happened before the
@@ -270,27 +230,15 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
&'a self,
user_id: &UserId,
room_id: &RoomId,
- until: u64,
- ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> {
- // Create the first part of the full pdu id
- let prefix = services()
- .rooms
- .short
- .get_shortroomid(room_id)?
- .expect("room exists")
- .to_be_bytes()
- .to_vec();
-
- let mut current = prefix.clone();
- current.extend_from_slice(&(until.saturating_sub(1)).to_be_bytes()); // -1 because we don't want event at `until`
-
- let current: &[u8] = &current;
+ until: PduCount,
+ ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
+ let (prefix, current) = count_to_id(&room_id, until, 1, true)?;
let user_id = user_id.to_owned();
Ok(Box::new(
self.pduid_pdu
- .iter_from(current, true)
+ .iter_from(&current, true)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
@@ -298,7 +246,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
- Ok((pdu_id, pdu))
+ pdu.add_age()?;
+ let count = pdu_count(&pdu_id)?;
+ Ok((count, pdu))
}),
))
}
@@ -307,27 +257,15 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
&'a self,
user_id: &UserId,
room_id: &RoomId,
- from: u64,
- ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> {
- // Create the first part of the full pdu id
- let prefix = services()
- .rooms
- .short
- .get_shortroomid(room_id)?
- .expect("room exists")
- .to_be_bytes()
- .to_vec();
-
- let mut current = prefix.clone();
- current.extend_from_slice(&(from + 1).to_be_bytes()); // +1 so we don't send the base event
-
- let current: &[u8] = &current;
+ from: PduCount,
+ ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
+ let (prefix, current) = count_to_id(&room_id, from, 1, false)?;
let user_id = user_id.to_owned();
Ok(Box::new(
self.pduid_pdu
- .iter_from(current, false)
+ .iter_from(&current, false)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
@@ -335,7 +273,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
- Ok((pdu_id, pdu))
+ pdu.add_age()?;
+ let count = pdu_count(&pdu_id)?;
+ Ok((count, pdu))
}),
))
}
@@ -368,3 +308,60 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
Ok(())
}
}
+
+/// Returns the `count` of this pdu's id.
+fn pdu_count(pdu_id: &[u8]) -> Result<PduCount> {
+ let last_u64 = utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..])
+ .map_err(|_| Error::bad_database("PDU has invalid count bytes."))?;
+ let second_last_u64 = utils::u64_from_bytes(
+ &pdu_id[pdu_id.len() - 2 * size_of::<u64>()..pdu_id.len() - size_of::<u64>()],
+ );
+
+ if matches!(second_last_u64, Ok(0)) {
+ Ok(PduCount::Backfilled(u64::MAX - last_u64))
+ } else {
+ Ok(PduCount::Normal(last_u64))
+ }
+}
+
+fn count_to_id(
+ room_id: &RoomId,
+ count: PduCount,
+ offset: u64,
+ subtract: bool,
+) -> Result<(Vec<u8>, Vec<u8>)> {
+ let prefix = services()
+ .rooms
+ .short
+ .get_shortroomid(room_id)?
+ .expect("room exists")
+ .to_be_bytes()
+ .to_vec();
+ let mut pdu_id = prefix.clone();
+ // +1 so we don't send the base event
+ let count_raw = match count {
+ PduCount::Normal(x) => {
+ if subtract {
+ x - offset
+ } else {
+ x + offset
+ }
+ }
+ PduCount::Backfilled(x) => {
+ pdu_id.extend_from_slice(&0_u64.to_be_bytes());
+ let num = u64::MAX - x;
+ if subtract {
+ if num > 0 {
+ num - offset
+ } else {
+ num
+ }
+ } else {
+ num + offset
+ }
+ }
+ };
+ pdu_id.extend_from_slice(&count_raw.to_be_bytes());
+
+ Ok((prefix, pdu_id))
+}