summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2023-02-18 13:20:20 +0100
committerTimo Kösters <timo@koesters.xyz>2023-03-13 10:39:16 +0100
commit23b18d71eef78544148746d30d8b9630998a3a22 (patch)
tree075cdba0d27e630f430efbea31277dfddbee5c73 /src
parent84cfed52317a64627b33d759479a7ace7d58c337 (diff)
downloadconduit-23b18d71eef78544148746d30d8b9630998a3a22.zip
feat: handle backfill requests
Based on https://gitlab.com/famedly/conduit/-/merge_requests/421
Diffstat (limited to 'src')
-rw-r--r--src/api/server_server.rs83
-rw-r--r--src/main.rs1
-rw-r--r--src/service/mod.rs7
-rw-r--r--src/service/rooms/state_accessor/mod.rs114
4 files changed, 199 insertions, 6 deletions
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index fc3e2c0..11a6cbf 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -12,6 +12,7 @@ use ruma::{
client::error::{Error as RumaError, ErrorKind},
federation::{
authorization::get_event_authorization,
+ backfill::get_backfill,
device::get_devices::{self, v1::UserDevice},
directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
@@ -42,8 +43,9 @@ use ruma::{
},
serde::{Base64, JsonObject, Raw},
to_device::DeviceIdOrAllDevices,
- CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
- OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
+ uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
+ OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId,
+ ServerName,
};
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use std::{
@@ -950,6 +952,83 @@ pub async fn get_event_route(
})
}
+/// # `GET /_matrix/federation/v1/backfill/<room_id>`
+///
+/// Retrieves events from before the sender joined the room, if the room's
+/// history visibility allows.
+pub async fn get_backfill_route(
+ body: Ruma<get_backfill::v1::Request>,
+) -> Result<get_backfill::v1::Response> {
+ if !services().globals.allow_federation() {
+ return Err(Error::bad_config("Federation is disabled."));
+ }
+
+ let sender_servername = body
+ .sender_servername
+ .as_ref()
+ .expect("server is authenticated");
+
+ info!("Got backfill request from: {}", sender_servername);
+
+ if !services()
+ .rooms
+ .state_cache
+ .server_in_room(sender_servername, &body.room_id)?
+ {
+ return Err(Error::BadRequest(
+ ErrorKind::Forbidden,
+ "Server is not in room.",
+ ));
+ }
+
+ services()
+ .rooms
+ .event_handler
+ .acl_check(sender_servername, &body.room_id)?;
+
+ let until = body
+ .v
+ .iter()
+ .map(|eventid| services().rooms.timeline.get_pdu_count(eventid))
+ .filter_map(|r| r.ok().flatten())
+ .max()
+ .ok_or(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "No known eventid in v",
+ ))?;
+
+ let limit = body.limit.min(uint!(100));
+
+ let all_events = services()
+ .rooms
+ .timeline
+ .pdus_until(&user_id!("@doesntmatter:conduit.rs"), &body.room_id, until)?
+ .take(limit.try_into().unwrap());
+
+ let events = all_events
+ .filter_map(|r| r.ok())
+ .filter(|(_, e)| {
+ matches!(
+ services().rooms.state_accessor.server_can_see_event(
+ sender_servername,
+ &e.room_id,
+ &e.event_id,
+ ),
+ Ok(true),
+ )
+ })
+ .map(|(pdu_id, _)| services().rooms.timeline.get_pdu_json_from_id(&pdu_id))
+ .filter_map(|r| r.ok().flatten())
+ .map(|pdu| PduEvent::convert_to_outgoing_federation_event(pdu))
+ .collect();
+
+ Ok(get_backfill::v1::Response {
+ origin: services().globals.server_name().to_owned(),
+ origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
+ pdus: events,
+ })
+}
+
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
///
/// Retrieves events that the sender is missing.
diff --git a/src/main.rs b/src/main.rs
index da80507..fe6cfc0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -390,6 +390,7 @@ fn routes() -> Router {
.ruma_route(server_server::get_public_rooms_filtered_route)
.ruma_route(server_server::send_transaction_message_route)
.ruma_route(server_server::get_event_route)
+ .ruma_route(server_server::get_backfill_route)
.ruma_route(server_server::get_missing_events_route)
.ruma_route(server_server::get_event_authorization_route)
.ruma_route(server_server::get_room_state_route)
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 385dcc6..07d80a1 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -77,7 +77,12 @@ impl Services {
search: rooms::search::Service { db },
short: rooms::short::Service { db },
state: rooms::state::Service { db },
- state_accessor: rooms::state_accessor::Service { db },
+ state_accessor: rooms::state_accessor::Service {
+ db,
+ server_visibility_cache: Mutex::new(LruCache::new(
+ (100.0 * config.conduit_cache_capacity_modifier) as usize,
+ )),
+ },
state_cache: rooms::state_cache::Service { db },
state_compressor: rooms::state_compressor::Service {
db,
diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs
index 87d9936..e940ffa 100644
--- a/src/service/rooms/state_accessor/mod.rs
+++ b/src/service/rooms/state_accessor/mod.rs
@@ -1,13 +1,28 @@
mod data;
-use std::{collections::HashMap, sync::Arc};
+use std::{
+ collections::HashMap,
+ sync::{Arc, Mutex},
+};
pub use data::Data;
-use ruma::{events::StateEventType, EventId, RoomId};
+use lru_cache::LruCache;
+use ruma::{
+ events::{
+ room::{
+ history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
+ member::{MembershipState, RoomMemberEventContent},
+ },
+ StateEventType,
+ },
+ EventId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
+};
+use tracing::error;
-use crate::{PduEvent, Result};
+use crate::{services, Error, PduEvent, Result};
pub struct Service {
pub db: &'static dyn Data,
+ pub server_visibility_cache: Mutex<LruCache<(OwnedServerName, u64), bool>>,
}
impl Service {
@@ -46,6 +61,99 @@ impl Service {
self.db.state_get(shortstatehash, event_type, state_key)
}
+ /// Get membership for given user in state
+ fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result<MembershipState> {
+ self.state_get(
+ shortstatehash,
+ &StateEventType::RoomMember,
+ user_id.as_str(),
+ )?
+ .map_or(Ok(MembershipState::Leave), |s| {
+ serde_json::from_str(s.content.get())
+ .map(|c: RoomMemberEventContent| c.membership)
+ .map_err(|_| Error::bad_database("Invalid room membership event in database."))
+ })
+ }
+
+ /// The user was a joined member at this state (potentially in the past)
+ fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool {
+ self.user_membership(shortstatehash, user_id)
+ .map(|s| s == MembershipState::Join)
+ .unwrap_or_default() // Return sensible default, i.e. false
+ }
+
+ /// The user was an invited or joined room member at this state (potentially
+ /// in the past)
+ fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> bool {
+ self.user_membership(shortstatehash, user_id)
+ .map(|s| s == MembershipState::Join || s == MembershipState::Invite)
+ .unwrap_or_default() // Return sensible default, i.e. false
+ }
+
+ /// Whether a server is allowed to see an event through federation, based on
+ /// the room's history_visibility at that event's state.
+ #[tracing::instrument(skip(self))]
+ pub fn server_can_see_event(
+ &self,
+ origin: &ServerName,
+ room_id: &RoomId,
+ event_id: &EventId,
+ ) -> Result<bool> {
+ let shortstatehash = match self.pdu_shortstatehash(event_id)? {
+ Some(shortstatehash) => shortstatehash,
+ None => return Ok(false),
+ };
+
+ if let Some(visibility) = self
+ .server_visibility_cache
+ .lock()
+ .unwrap()
+ .get_mut(&(origin.to_owned(), shortstatehash))
+ {
+ return Ok(*visibility);
+ }
+
+ let history_visibility = self
+ .state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")?
+ .map_or(Ok(HistoryVisibility::Shared), |s| {
+ serde_json::from_str(s.content.get())
+ .map(|c: RoomHistoryVisibilityEventContent| c.history_visibility)
+ .map_err(|_| {
+ Error::bad_database("Invalid history visibility event in database.")
+ })
+ })?;
+
+ let mut current_server_members = services()
+ .rooms
+ .state_cache
+ .room_members(room_id)
+ .filter_map(|r| r.ok())
+ .filter(|member| member.server_name() == origin);
+
+ let visibility = match history_visibility {
+ HistoryVisibility::WorldReadable | HistoryVisibility::Shared => true,
+ HistoryVisibility::Invited => {
+ // Allow if any member on requesting server was AT LEAST invited, else deny
+ current_server_members.any(|member| self.user_was_invited(shortstatehash, &member))
+ }
+ HistoryVisibility::Joined => {
+ // Allow if any member on requested server was joined, else deny
+ current_server_members.any(|member| self.user_was_joined(shortstatehash, &member))
+ }
+ _ => {
+ error!("Unknown history visibility {history_visibility}");
+ false
+ }
+ };
+
+ self.server_visibility_cache
+ .lock()
+ .unwrap()
+ .insert((origin.to_owned(), shortstatehash), visibility);
+
+ Ok(visibility)
+ }
+
/// Returns the state hash for this pdu.
pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
self.db.pdu_shortstatehash(event_id)