summaryrefslogtreecommitdiff
path: root/src/service/pdu.rs
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2022-12-21 15:51:47 +0100
committerTimo Kösters <timo@koesters.xyz>2022-12-21 15:51:47 +0100
commit53f14a2c4c216b529cc63137d8704573197aed19 (patch)
tree866d747ed9345f04e7f31c10f7f4e045af29be2b /src/service/pdu.rs
parentdf16012661b0ec958be41f5a52b4f9bb931dc3ff (diff)
parentd20f21ae32061ef81feb9786ca78fbb1345c9a3a (diff)
downloadconduit-53f14a2c4c216b529cc63137d8704573197aed19.zip
Merge remote-tracking branch 'origin/next'
Diffstat (limited to 'src/service/pdu.rs')
-rw-r--r--src/service/pdu.rs365
1 files changed, 365 insertions, 0 deletions
diff --git a/src/service/pdu.rs b/src/service/pdu.rs
new file mode 100644
index 0000000..554f3be
--- /dev/null
+++ b/src/service/pdu.rs
@@ -0,0 +1,365 @@
+use crate::Error;
+use ruma::{
+ events::{
+ room::member::RoomMemberEventContent, AnyEphemeralRoomEvent, AnyStateEvent,
+ AnyStrippedStateEvent, AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent,
+ RoomEventType, StateEvent,
+ },
+ serde::Raw,
+ state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
+ OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
+};
+use serde::{Deserialize, Serialize};
+use serde_json::{
+ json,
+ value::{to_raw_value, RawValue as RawJsonValue},
+};
+use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
+use tracing::warn;
+
+/// Content hashes of a PDU.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct EventHash {
+ /// The SHA-256 hash.
+ pub sha256: String,
+}
+
+#[derive(Clone, Deserialize, Serialize, Debug)]
+pub struct PduEvent {
+ pub event_id: Arc<EventId>,
+ pub room_id: OwnedRoomId,
+ pub sender: OwnedUserId,
+ pub origin_server_ts: UInt,
+ #[serde(rename = "type")]
+ pub kind: RoomEventType,
+ pub content: Box<RawJsonValue>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub state_key: Option<String>,
+ pub prev_events: Vec<Arc<EventId>>,
+ pub depth: UInt,
+ pub auth_events: Vec<Arc<EventId>>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub redacts: Option<Arc<EventId>>,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub unsigned: Option<Box<RawJsonValue>>,
+ pub hashes: EventHash,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub signatures: Option<Box<RawJsonValue>>, // BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, String>>
+}
+
+impl PduEvent {
+ #[tracing::instrument(skip(self))]
+ pub fn redact(&mut self, reason: &PduEvent) -> crate::Result<()> {
+ self.unsigned = None;
+
+ let allowed: &[&str] = match self.kind {
+ RoomEventType::RoomMember => &["join_authorised_via_users_server", "membership"],
+ RoomEventType::RoomCreate => &["creator"],
+ RoomEventType::RoomJoinRules => &["join_rule"],
+ RoomEventType::RoomPowerLevels => &[
+ "ban",
+ "events",
+ "events_default",
+ "kick",
+ "redact",
+ "state_default",
+ "users",
+ "users_default",
+ ],
+ RoomEventType::RoomHistoryVisibility => &["history_visibility"],
+ _ => &[],
+ };
+
+ let mut old_content: BTreeMap<String, serde_json::Value> =
+ serde_json::from_str(self.content.get())
+ .map_err(|_| Error::bad_database("PDU in db has invalid content."))?;
+
+ let mut new_content = serde_json::Map::new();
+
+ for key in allowed {
+ if let Some(value) = old_content.remove(*key) {
+ new_content.insert((*key).to_owned(), value);
+ }
+ }
+
+ self.unsigned = Some(to_raw_value(&json!({
+ "redacted_because": serde_json::to_value(reason).expect("to_value(PduEvent) always works")
+ })).expect("to string always works"));
+
+ self.content = to_raw_value(&new_content).expect("to string always works");
+
+ Ok(())
+ }
+
+ pub fn remove_transaction_id(&mut self) -> crate::Result<()> {
+ if let Some(unsigned) = &self.unsigned {
+ let mut unsigned: BTreeMap<String, Box<RawJsonValue>> =
+ serde_json::from_str(unsigned.get())
+ .map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?;
+ unsigned.remove("transaction_id");
+ self.unsigned = Some(to_raw_value(&unsigned).expect("unsigned is valid"));
+ }
+
+ Ok(())
+ }
+
+ #[tracing::instrument(skip(self))]
+ pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
+ let mut json = json!({
+ "content": self.content,
+ "type": self.kind,
+ "event_id": self.event_id,
+ "sender": self.sender,
+ "origin_server_ts": self.origin_server_ts,
+ "unsigned": self.unsigned,
+ });
+
+ if let Some(state_key) = &self.state_key {
+ json["state_key"] = json!(state_key);
+ }
+ if let Some(redacts) = &self.redacts {
+ json["redacts"] = json!(redacts);
+ }
+
+ serde_json::from_value(json).expect("Raw::from_value always works")
+ }
+
+ /// This only works for events that are also AnyRoomEvents.
+ #[tracing::instrument(skip(self))]
+ pub fn to_any_event(&self) -> Raw<AnyEphemeralRoomEvent> {
+ let mut json = json!({
+ "content": self.content,
+ "type": self.kind,
+ "event_id": self.event_id,
+ "sender": self.sender,
+ "origin_server_ts": self.origin_server_ts,
+ "unsigned": self.unsigned,
+ "room_id": self.room_id,
+ });
+
+ if let Some(state_key) = &self.state_key {
+ json["state_key"] = json!(state_key);
+ }
+ if let Some(redacts) = &self.redacts {
+ json["redacts"] = json!(redacts);
+ }
+
+ serde_json::from_value(json).expect("Raw::from_value always works")
+ }
+
+ #[tracing::instrument(skip(self))]
+ pub fn to_room_event(&self) -> Raw<AnyTimelineEvent> {
+ let mut json = json!({
+ "content": self.content,
+ "type": self.kind,
+ "event_id": self.event_id,
+ "sender": self.sender,
+ "origin_server_ts": self.origin_server_ts,
+ "unsigned": self.unsigned,
+ "room_id": self.room_id,
+ });
+
+ if let Some(state_key) = &self.state_key {
+ json["state_key"] = json!(state_key);
+ }
+ if let Some(redacts) = &self.redacts {
+ json["redacts"] = json!(redacts);
+ }
+
+ serde_json::from_value(json).expect("Raw::from_value always works")
+ }
+
+ #[tracing::instrument(skip(self))]
+ pub fn to_state_event(&self) -> Raw<AnyStateEvent> {
+ let json = json!({
+ "content": self.content,
+ "type": self.kind,
+ "event_id": self.event_id,
+ "sender": self.sender,
+ "origin_server_ts": self.origin_server_ts,
+ "unsigned": self.unsigned,
+ "room_id": self.room_id,
+ "state_key": self.state_key,
+ });
+
+ serde_json::from_value(json).expect("Raw::from_value always works")
+ }
+
+ #[tracing::instrument(skip(self))]
+ pub fn to_sync_state_event(&self) -> Raw<AnySyncStateEvent> {
+ let json = json!({
+ "content": self.content,
+ "type": self.kind,
+ "event_id": self.event_id,
+ "sender": self.sender,
+ "origin_server_ts": self.origin_server_ts,
+ "unsigned": self.unsigned,
+ "state_key": self.state_key,
+ });
+
+ serde_json::from_value(json).expect("Raw::from_value always works")
+ }
+
+ #[tracing::instrument(skip(self))]
+ pub fn to_stripped_state_event(&self) -> Raw<AnyStrippedStateEvent> {
+ let json = json!({
+ "content": self.content,
+ "type": self.kind,
+ "sender": self.sender,
+ "state_key": self.state_key,
+ });
+
+ serde_json::from_value(json).expect("Raw::from_value always works")
+ }
+
+ #[tracing::instrument(skip(self))]
+ pub fn to_member_event(&self) -> Raw<StateEvent<RoomMemberEventContent>> {
+ let json = json!({
+ "content": self.content,
+ "type": self.kind,
+ "event_id": self.event_id,
+ "sender": self.sender,
+ "origin_server_ts": self.origin_server_ts,
+ "redacts": self.redacts,
+ "unsigned": self.unsigned,
+ "room_id": self.room_id,
+ "state_key": self.state_key,
+ });
+
+ serde_json::from_value(json).expect("Raw::from_value always works")
+ }
+
+ /// This does not return a full `Pdu` it is only to satisfy ruma's types.
+ #[tracing::instrument]
+ pub fn convert_to_outgoing_federation_event(
+ mut pdu_json: CanonicalJsonObject,
+ ) -> Box<RawJsonValue> {
+ if let Some(unsigned) = pdu_json
+ .get_mut("unsigned")
+ .and_then(|val| val.as_object_mut())
+ {
+ unsigned.remove("transaction_id");
+ }
+
+ pdu_json.remove("event_id");
+
+ // TODO: another option would be to convert it to a canonical string to validate size
+ // and return a Result<Raw<...>>
+ // serde_json::from_str::<Raw<_>>(
+ // ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is valid serde_json::Value"),
+ // )
+ // .expect("Raw::from_value always works")
+
+ to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
+ }
+
+ pub fn from_id_val(
+ event_id: &EventId,
+ mut json: CanonicalJsonObject,
+ ) -> Result<Self, serde_json::Error> {
+ json.insert(
+ "event_id".to_owned(),
+ CanonicalJsonValue::String(event_id.as_str().to_owned()),
+ );
+
+ serde_json::from_value(serde_json::to_value(json).expect("valid JSON"))
+ }
+}
+
+impl state_res::Event for PduEvent {
+ type Id = Arc<EventId>;
+
+ fn event_id(&self) -> &Self::Id {
+ &self.event_id
+ }
+
+ fn room_id(&self) -> &RoomId {
+ &self.room_id
+ }
+
+ fn sender(&self) -> &UserId {
+ &self.sender
+ }
+
+ fn event_type(&self) -> &RoomEventType {
+ &self.kind
+ }
+
+ fn content(&self) -> &RawJsonValue {
+ &self.content
+ }
+
+ fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
+ MilliSecondsSinceUnixEpoch(self.origin_server_ts)
+ }
+
+ fn state_key(&self) -> Option<&str> {
+ self.state_key.as_deref()
+ }
+
+ fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + '_> {
+ Box::new(self.prev_events.iter())
+ }
+
+ fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + '_> {
+ Box::new(self.auth_events.iter())
+ }
+
+ fn redacts(&self) -> Option<&Self::Id> {
+ self.redacts.as_ref()
+ }
+}
+
+// These impl's allow us to dedup state snapshots when resolving state
+// for incoming events (federation/send/{txn}).
+impl Eq for PduEvent {}
+impl PartialEq for PduEvent {
+ fn eq(&self, other: &Self) -> bool {
+ self.event_id == other.event_id
+ }
+}
+impl PartialOrd for PduEvent {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ self.event_id.partial_cmp(&other.event_id)
+ }
+}
+impl Ord for PduEvent {
+ fn cmp(&self, other: &Self) -> Ordering {
+ self.event_id.cmp(&other.event_id)
+ }
+}
+
+/// Generates a correct eventId for the incoming pdu.
+///
+/// Returns a tuple of the new `EventId` and the PDU as a `BTreeMap<String, CanonicalJsonValue>`.
+pub(crate) fn gen_event_id_canonical_json(
+ pdu: &RawJsonValue,
+ room_version_id: &RoomVersionId,
+) -> crate::Result<(OwnedEventId, CanonicalJsonObject)> {
+ let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
+ warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
+ Error::BadServerResponse("Invalid PDU in server response")
+ })?;
+
+ let event_id = format!(
+ "${}",
+ // Anything higher than version3 behaves the same
+ ruma::signatures::reference_hash(&value, room_version_id)
+ .expect("ruma can calculate reference hashes")
+ )
+ .try_into()
+ .expect("ruma's reference hashes are valid event ids");
+
+ Ok((event_id, value))
+}
+
+/// Build the start of a PDU in order to add it to the Database.
+#[derive(Debug, Deserialize)]
+pub struct PduBuilder {
+ #[serde(rename = "type")]
+ pub event_type: RoomEventType,
+ pub content: Box<RawJsonValue>,
+ pub unsigned: Option<BTreeMap<String, serde_json::Value>>,
+ pub state_key: Option<String>,
+ pub redacts: Option<Arc<EventId>>,
+}