diff options
Diffstat (limited to 'src/database/key_value/rooms/timeline.rs')
-rw-r--r-- | src/database/key_value/rooms/timeline.rs | 281 |
1 files changed, 139 insertions, 142 deletions
diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 336317d..5ce2136 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -7,30 +7,10 @@ use tracing::error; use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; -impl service::rooms::timeline::Data for KeyValueDatabase { - fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> { - let prefix = services() - .rooms - .short - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - // Look for PDUs in that room. - self.pduid_pdu - .iter_from(&prefix, false) - .filter(|(k, _)| k.starts_with(&prefix)) - .map(|(_, pdu)| { - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid first PDU in db.")) - .map(Arc::new) - }) - .next() - .transpose() - } +use service::rooms::timeline::PduCount; - fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<u64> { +impl service::rooms::timeline::Data for KeyValueDatabase { + fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount> { match self .lasttimelinecount_cache .lock() @@ -39,20 +19,18 @@ impl service::rooms::timeline::Data for KeyValueDatabase { { hash_map::Entry::Vacant(v) => { if let Some(last_count) = self - .pdus_until(sender_user, room_id, u64::MAX)? - .filter_map(|r| { + .pdus_until(sender_user, room_id, PduCount::max())? + .find_map(|r| { // Filter out buggy events if r.is_err() { error!("Bad pdu in pdus_since: {:?}", r); } r.ok() }) - .map(|(pduid, _)| self.pdu_count(&pduid)) - .next() { - Ok(*v.insert(last_count?)) + Ok(*v.insert(last_count.0)) } else { - Ok(0) + Ok(PduCount::Normal(0)) } } hash_map::Entry::Occupied(o) => Ok(*o.get()), @@ -60,29 +38,28 @@ impl service::rooms::timeline::Data for KeyValueDatabase { } /// Returns the `count` of this pdu's id. - fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> { - self.eventid_pduid + fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<PduCount>> { + Ok(self + .eventid_pduid .get(event_id.as_bytes())? - .map(|pdu_id| self.pdu_count(&pdu_id)) - .transpose() + .map(|pdu_id| pdu_count(&pdu_id)) + .transpose()?) } /// Returns the json of a pdu. fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> { - self.eventid_pduid - .get(event_id.as_bytes())? - .map_or_else( - || self.eventid_outlierpdu.get(event_id.as_bytes()), - |pduid| { - Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| { - Error::bad_database("Invalid pduid in eventid_pduid.") - })?)) - }, - )? - .map(|pdu| { - serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) - }) - .transpose() + self.get_non_outlier_pdu_json(event_id)?.map_or_else( + || { + self.eventid_outlierpdu + .get(event_id.as_bytes())? + .map(|pdu| { + serde_json::from_slice(&pdu) + .map_err(|_| Error::bad_database("Invalid PDU in db.")) + }) + .transpose() + }, + |x| Ok(Some(x)), + ) } /// Returns the json of a pdu. @@ -103,7 +80,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { /// Returns the pdu's id. fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> { - self.eventid_pduid.get(event_id.as_bytes()) + Ok(self.eventid_pduid.get(event_id.as_bytes())?) } /// Returns the pdu. @@ -133,22 +110,20 @@ impl service::rooms::timeline::Data for KeyValueDatabase { } if let Some(pdu) = self - .eventid_pduid - .get(event_id.as_bytes())? + .get_non_outlier_pdu(event_id)? .map_or_else( - || self.eventid_outlierpdu.get(event_id.as_bytes()), - |pduid| { - Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| { - Error::bad_database("Invalid pduid in eventid_pduid.") - })?)) + || { + self.eventid_outlierpdu + .get(event_id.as_bytes())? + .map(|pdu| { + serde_json::from_slice(&pdu) + .map_err(|_| Error::bad_database("Invalid PDU in db.")) + }) + .transpose() }, + |x| Ok(Some(x)), )? - .map(|pdu| { - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid PDU in db.")) - .map(Arc::new) - }) - .transpose()? + .map(Arc::new) { self.pdu_cache .lock() @@ -182,12 +157,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase { }) } - /// Returns the `count` of this pdu's id. - fn pdu_count(&self, pdu_id: &[u8]) -> Result<u64> { - utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..]) - .map_err(|_| Error::bad_database("PDU has invalid count bytes.")) - } - fn append_pdu( &self, pdu_id: &[u8], @@ -203,7 +172,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { self.lasttimelinecount_cache .lock() .unwrap() - .insert(pdu.room_id.clone(), count); + .insert(pdu.room_id.clone(), PduCount::Normal(count)); self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id)?; self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?; @@ -211,57 +180,48 @@ impl service::rooms::timeline::Data for KeyValueDatabase { Ok(()) } + fn prepend_backfill_pdu( + &self, + pdu_id: &[u8], + event_id: &EventId, + json: &CanonicalJsonObject, + ) -> Result<()> { + self.pduid_pdu.insert( + pdu_id, + &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"), + )?; + + self.eventid_pduid.insert(event_id.as_bytes(), pdu_id)?; + self.eventid_outlierpdu.remove(event_id.as_bytes())?; + + Ok(()) + } + /// Removes a pdu and creates a new one with the same id. - fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> { + fn replace_pdu( + &self, + pdu_id: &[u8], + pdu_json: &CanonicalJsonObject, + pdu: &PduEvent, + ) -> Result<()> { if self.pduid_pdu.get(pdu_id)?.is_some() { self.pduid_pdu.insert( pdu_id, - &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"), + &serde_json::to_vec(pdu_json).expect("CanonicalJsonObject is always a valid"), )?; - Ok(()) } else { - Err(Error::BadRequest( + return Err(Error::BadRequest( ErrorKind::NotFound, "PDU does not exist.", - )) + )); } - } - - /// Returns an iterator over all events in a room that happened after the event with id `since` - /// in chronological order. - fn pdus_since<'a>( - &'a self, - user_id: &UserId, - room_id: &RoomId, - since: u64, - ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> { - let prefix = services() - .rooms - .short - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - // Skip the first pdu if it's exactly at since, because we sent that last time - let mut first_pdu_id = prefix.clone(); - first_pdu_id.extend_from_slice(&(since + 1).to_be_bytes()); - let user_id = user_id.to_owned(); + self.pdu_cache + .lock() + .unwrap() + .remove(&(*pdu.event_id).to_owned()); - Ok(Box::new( - self.pduid_pdu - .iter_from(&first_pdu_id, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::<PduEvent>(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - }), - )) + Ok(()) } /// Returns an iterator over all events and their tokens in a room that happened before the @@ -270,27 +230,15 @@ impl service::rooms::timeline::Data for KeyValueDatabase { &'a self, user_id: &UserId, room_id: &RoomId, - until: u64, - ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> { - // Create the first part of the full pdu id - let prefix = services() - .rooms - .short - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - let mut current = prefix.clone(); - current.extend_from_slice(&(until.saturating_sub(1)).to_be_bytes()); // -1 because we don't want event at `until` - - let current: &[u8] = ¤t; + until: PduCount, + ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> { + let (prefix, current) = count_to_id(&room_id, until, 1, true)?; let user_id = user_id.to_owned(); Ok(Box::new( self.pduid_pdu - .iter_from(current, true) + .iter_from(¤t, true) .take_while(move |(k, _)| k.starts_with(&prefix)) .map(move |(pdu_id, v)| { let mut pdu = serde_json::from_slice::<PduEvent>(&v) @@ -298,7 +246,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase { if pdu.sender != user_id { pdu.remove_transaction_id()?; } - Ok((pdu_id, pdu)) + pdu.add_age()?; + let count = pdu_count(&pdu_id)?; + Ok((count, pdu)) }), )) } @@ -307,27 +257,15 @@ impl service::rooms::timeline::Data for KeyValueDatabase { &'a self, user_id: &UserId, room_id: &RoomId, - from: u64, - ) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> { - // Create the first part of the full pdu id - let prefix = services() - .rooms - .short - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - let mut current = prefix.clone(); - current.extend_from_slice(&(from + 1).to_be_bytes()); // +1 so we don't send the base event - - let current: &[u8] = ¤t; + from: PduCount, + ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> { + let (prefix, current) = count_to_id(&room_id, from, 1, false)?; let user_id = user_id.to_owned(); Ok(Box::new( self.pduid_pdu - .iter_from(current, false) + .iter_from(¤t, false) .take_while(move |(k, _)| k.starts_with(&prefix)) .map(move |(pdu_id, v)| { let mut pdu = serde_json::from_slice::<PduEvent>(&v) @@ -335,7 +273,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase { if pdu.sender != user_id { pdu.remove_transaction_id()?; } - Ok((pdu_id, pdu)) + pdu.add_age()?; + let count = pdu_count(&pdu_id)?; + Ok((count, pdu)) }), )) } @@ -368,3 +308,60 @@ impl service::rooms::timeline::Data for KeyValueDatabase { Ok(()) } } + +/// Returns the `count` of this pdu's id. +fn pdu_count(pdu_id: &[u8]) -> Result<PduCount> { + let last_u64 = utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..]) + .map_err(|_| Error::bad_database("PDU has invalid count bytes."))?; + let second_last_u64 = utils::u64_from_bytes( + &pdu_id[pdu_id.len() - 2 * size_of::<u64>()..pdu_id.len() - size_of::<u64>()], + ); + + if matches!(second_last_u64, Ok(0)) { + Ok(PduCount::Backfilled(u64::MAX - last_u64)) + } else { + Ok(PduCount::Normal(last_u64)) + } +} + +fn count_to_id( + room_id: &RoomId, + count: PduCount, + offset: u64, + subtract: bool, +) -> Result<(Vec<u8>, Vec<u8>)> { + let prefix = services() + .rooms + .short + .get_shortroomid(room_id)? + .expect("room exists") + .to_be_bytes() + .to_vec(); + let mut pdu_id = prefix.clone(); + // +1 so we don't send the base event + let count_raw = match count { + PduCount::Normal(x) => { + if subtract { + x - offset + } else { + x + offset + } + } + PduCount::Backfilled(x) => { + pdu_id.extend_from_slice(&0_u64.to_be_bytes()); + let num = u64::MAX - x; + if subtract { + if num > 0 { + num - offset + } else { + num + } + } else { + num + offset + } + } + }; + pdu_id.extend_from_slice(&count_raw.to_be_bytes()); + + Ok((prefix, pdu_id)) +} |