summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2023-07-06 10:32:25 +0200
committerTimo Kösters <timo@koesters.xyz>2023-07-06 10:32:25 +0200
commite4f769963fb880d6131ae940f89ab9b1193c5d32 (patch)
treed0a39619779232353e7a56a4c05459bc06ca577a /src
parentf8a36e7554672b1541e7e6a0d6f62c02a6c8a30e (diff)
downloadconduit-e4f769963fb880d6131ae940f89ab9b1193c5d32.zip
feat: very simple sliding sync implementation
Diffstat (limited to 'src')
-rw-r--r--src/api/client_server/directory.rs12
-rw-r--r--src/api/client_server/sync.rs270
-rw-r--r--src/api/client_server/unversioned.rs20
-rw-r--r--src/config/mod.rs1
-rw-r--r--src/main.rs1
-rw-r--r--src/service/globals/mod.rs4
-rw-r--r--src/service/pusher/mod.rs16
-rw-r--r--src/service/rooms/spaces/mod.rs10
-rw-r--r--src/service/rooms/state_accessor/mod.rs13
9 files changed, 269 insertions, 78 deletions
diff --git a/src/api/client_server/directory.rs b/src/api/client_server/directory.rs
index e132210..df1ac40 100644
--- a/src/api/client_server/directory.rs
+++ b/src/api/client_server/directory.rs
@@ -203,17 +203,7 @@ pub(crate) async fn get_public_rooms_filtered_helper(
Error::bad_database("Invalid canonical alias event in database.")
})
})?,
- name: services()
- .rooms
- .state_accessor
- .room_state_get(&room_id, &StateEventType::RoomName, "")?
- .map_or(Ok(None), |s| {
- serde_json::from_str(s.content.get())
- .map(|c: RoomNameEventContent| c.name)
- .map_err(|_| {
- Error::bad_database("Invalid room name event in database.")
- })
- })?,
+ name: services().rooms.state_accessor.get_name(&room_id)?,
num_joined_members: services()
.rooms
.state_cache
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index dd75347..bc89a4c 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -1,4 +1,6 @@
-use crate::{service::rooms::timeline::PduCount, services, Error, Result, Ruma, RumaResponse};
+use crate::{
+ service::rooms::timeline::PduCount, services, Error, PduEvent, Result, Ruma, RumaResponse,
+};
use ruma::{
api::client::{
filter::{FilterDefinition, LazyLoadOptions},
@@ -8,6 +10,7 @@ use ruma::{
Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
},
+ v4::SlidingOp,
DeviceLists, UnreadNotificationsCount,
},
uiaa::UiaaResponse,
@@ -17,10 +20,10 @@ use ruma::{
StateEventType, TimelineEventType,
},
serde::Raw,
- DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UserId,
+ uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
};
use std::{
- collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
+ collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
time::Duration,
};
@@ -199,7 +202,7 @@ async fn sync_helper(
let mut joined_rooms = BTreeMap::new();
let since = body
.since
- .clone()
+ .as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let sincecount = PduCount::Normal(since);
@@ -581,43 +584,7 @@ async fn load_joined_room(
drop(insert_lock);
}
- let timeline_pdus;
- let limited;
- if services()
- .rooms
- .timeline
- .last_timeline_count(&sender_user, &room_id)?
- > sincecount
- {
- let mut non_timeline_pdus = services()
- .rooms
- .timeline
- .pdus_until(&sender_user, &room_id, PduCount::max())?
- .filter_map(|r| {
- // Filter out buggy events
- if r.is_err() {
- error!("Bad pdu in pdus_since: {:?}", r);
- }
- r.ok()
- })
- .take_while(|(pducount, _)| pducount > &sincecount);
-
- // Take the last 10 events for the timeline
- timeline_pdus = non_timeline_pdus
- .by_ref()
- .take(10)
- .collect::<Vec<_>>()
- .into_iter()
- .rev()
- .collect::<Vec<_>>();
-
- // They /sync response doesn't always return all messages, so we say the output is
- // limited unless there are events in non_timeline_pdus
- limited = non_timeline_pdus.next().is_some();
- } else {
- timeline_pdus = Vec::new();
- limited = false;
- }
+ let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?;
let send_notification_counts = !timeline_pdus.is_empty()
|| services()
@@ -1132,6 +1099,52 @@ async fn load_joined_room(
})
}
+fn load_timeline(
+ sender_user: &UserId,
+ room_id: &RoomId,
+ sincecount: PduCount,
+ limit: u64,
+) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
+ let timeline_pdus;
+ let limited;
+ if services()
+ .rooms
+ .timeline
+ .last_timeline_count(&sender_user, &room_id)?
+ > sincecount
+ {
+ let mut non_timeline_pdus = services()
+ .rooms
+ .timeline
+ .pdus_until(&sender_user, &room_id, PduCount::max())?
+ .filter_map(|r| {
+ // Filter out buggy events
+ if r.is_err() {
+ error!("Bad pdu in pdus_since: {:?}", r);
+ }
+ r.ok()
+ })
+ .take_while(|(pducount, _)| pducount > &sincecount);
+
+ // Take the last events for the timeline
+ timeline_pdus = non_timeline_pdus
+ .by_ref()
+ .take(limit as usize)
+ .collect::<Vec<_>>()
+ .into_iter()
+ .rev()
+ .collect::<Vec<_>>();
+
+ // They /sync response doesn't always return all messages, so we say the output is
+ // limited unless there are events in non_timeline_pdus
+ limited = non_timeline_pdus.next().is_some();
+ } else {
+ timeline_pdus = Vec::new();
+ limited = false;
+ }
+ Ok((timeline_pdus, limited))
+}
+
fn share_encrypted_room(
sender_user: &UserId,
user_id: &UserId,
@@ -1155,3 +1168,178 @@ fn share_encrypted_room(
})
.any(|encrypted| encrypted))
}
+
+pub async fn sync_events_v4_route(
+ body: Ruma<sync_events::v4::Request>,
+) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
+ let sender_user = body.sender_user.expect("user is authenticated");
+ let sender_device = body.sender_device.expect("user is authenticated");
+ let body = dbg!(body.body);
+
+ // Setup watchers, so if there's no response, we can wait for them
+ let watcher = services().globals.watch(&sender_user, &sender_device);
+
+ let next_batch = services().globals.current_count()?;
+
+ let since = body
+ .pos
+ .as_ref()
+ .and_then(|string| string.parse().ok())
+ .unwrap_or(0);
+ let sincecount = PduCount::Normal(since);
+
+ let initial = since == 0;
+
+ let all_joined_rooms = services()
+ .rooms
+ .state_cache
+ .rooms_joined(&sender_user)
+ .filter_map(|r| r.ok())
+ .collect::<Vec<_>>();
+
+ let mut lists = BTreeMap::new();
+ let mut todo_rooms = BTreeMap::new(); // and required state
+
+ for (list_id, list) in body.lists {
+ if list.filters.and_then(|f| f.is_invite).unwrap_or(false) {
+ continue;
+ }
+
+ lists.insert(
+ list_id,
+ sync_events::v4::SyncList {
+ ops: list
+ .ranges
+ .into_iter()
+ .map(|mut r| {
+ r.0 =
+ r.0.clamp(uint!(0), UInt::from(all_joined_rooms.len() as u32 - 1));
+ r.1 =
+ r.1.clamp(r.0, UInt::from(all_joined_rooms.len() as u32 - 1));
+ let room_ids = all_joined_rooms
+ [(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
+ .to_vec();
+ todo_rooms.extend(room_ids.iter().cloned().map(|r| {
+ let limit = list
+ .room_details
+ .timeline_limit
+ .map_or(10, u64::from)
+ .min(100);
+ (r, (list.room_details.required_state.clone(), limit))
+ }));
+ sync_events::v4::SyncOp {
+ op: SlidingOp::Sync,
+ range: Some(r.clone()),
+ index: None,
+ room_ids,
+ room_id: None,
+ }
+ })
+ .collect(),
+ count: UInt::from(all_joined_rooms.len() as u32),
+ },
+ );
+ }
+
+ let mut rooms = BTreeMap::new();
+ for (room_id, (required_state_request, timeline_limit)) in todo_rooms {
+ let (timeline_pdus, limited) =
+ load_timeline(&sender_user, &room_id, sincecount, timeline_limit)?;
+
+ let room_events: Vec<_> = timeline_pdus
+ .iter()
+ .map(|(_, pdu)| pdu.to_sync_room_event())
+ .collect();
+
+ let required_state = required_state_request
+ .iter()
+ .map(|state| {
+ services()
+ .rooms
+ .state_accessor
+ .room_state_get(&room_id, &state.0, &state.1)
+ })
+ .filter_map(|r| r.ok())
+ .filter_map(|o| o)
+ .map(|state| state.to_sync_state_event())
+ .collect();
+
+ rooms.insert(
+ room_id.clone(),
+ sync_events::v4::SlidingSyncRoom {
+ name: services().rooms.state_accessor.get_name(&room_id)?,
+ initial: Some(initial),
+ is_dm: None,
+ invite_state: None,
+ unread_notifications: UnreadNotificationsCount {
+ highlight_count: None,
+ notification_count: None,
+ },
+ timeline: room_events,
+ required_state,
+ prev_batch: None,
+ limited,
+ joined_count: Some(
+ (services()
+ .rooms
+ .state_cache
+ .room_joined_count(&room_id)?
+ .unwrap_or(0) as u32)
+ .into(),
+ ),
+ invited_count: Some(
+ (services()
+ .rooms
+ .state_cache
+ .room_invited_count(&room_id)?
+ .unwrap_or(0) as u32)
+ .into(),
+ ),
+ num_live: None,
+ },
+ );
+ }
+
+ if rooms
+ .iter()
+ .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
+ {
+ // Hang a few seconds so requests are not spammed
+ // Stop hanging if new info arrives
+ let mut duration = body.timeout.unwrap_or(Duration::from_secs(30));
+ if duration.as_secs() > 30 {
+ duration = Duration::from_secs(30);
+ }
+ let _ = tokio::time::timeout(duration, watcher).await;
+ }
+
+ Ok(dbg!(sync_events::v4::Response {
+ initial: initial,
+ txn_id: body.txn_id.clone(),
+ pos: next_batch.to_string(),
+ lists,
+ rooms,
+ extensions: sync_events::v4::Extensions {
+ to_device: None,
+ e2ee: sync_events::v4::E2EE {
+ device_lists: DeviceLists {
+ changed: Vec::new(),
+ left: Vec::new(),
+ },
+ device_one_time_keys_count: BTreeMap::new(),
+ device_unused_fallback_key_types: None,
+ },
+ account_data: sync_events::v4::AccountData {
+ global: Vec::new(),
+ rooms: BTreeMap::new(),
+ },
+ receipts: sync_events::v4::Receipts {
+ rooms: BTreeMap::new(),
+ },
+ typing: sync_events::v4::Typing {
+ rooms: BTreeMap::new(),
+ },
+ },
+ delta_token: None,
+ }))
+}
diff --git a/src/api/client_server/unversioned.rs b/src/api/client_server/unversioned.rs
index b4f03f4..797b952 100644
--- a/src/api/client_server/unversioned.rs
+++ b/src/api/client_server/unversioned.rs
@@ -1,8 +1,9 @@
use std::{collections::BTreeMap, iter::FromIterator};
-use ruma::api::client::discovery::get_supported_versions;
+use axum::{response::IntoResponse, Json};
+use ruma::api::client::{discovery::get_supported_versions, error::ErrorKind};
-use crate::{Result, Ruma};
+use crate::{services, Error, Result, Ruma};
/// # `GET /_matrix/client/versions`
///
@@ -31,3 +32,18 @@ pub async fn get_supported_versions_route(
Ok(resp)
}
+
+/// # `GET /.well-known/matrix/client`
+pub async fn well_known_client_route(
+ _body: Ruma<get_supported_versions::Request>,
+) -> Result<impl IntoResponse> {
+ let client_url = match services().globals.well_known_client() {
+ Some(url) => url.clone(),
+ None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
+ };
+
+ Ok(Json(serde_json::json!({
+ "m.homeserver": {"base_url": client_url},
+ "org.matrix.msc3575.proxy": {"url": client_url}
+ })))
+}
diff --git a/src/config/mod.rs b/src/config/mod.rs
index f922282..4dad9f7 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -54,6 +54,7 @@ pub struct Config {
pub allow_unstable_room_versions: bool,
#[serde(default = "default_default_room_version")]
pub default_room_version: RoomVersionId,
+ pub well_known_client: Option<String>,
#[serde(default = "false_fn")]
pub allow_jaeger: bool,
#[serde(default = "false_fn")]
diff --git a/src/main.rs b/src/main.rs
index ea5572e..579eeb1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -357,6 +357,7 @@ fn routes() -> Router {
.put(client_server::send_state_event_for_empty_key_route),
)
.ruma_route(client_server::sync_events_route)
+ .ruma_route(client_server::sync_events_v4_route)
.ruma_route(client_server::get_context_route)
.ruma_route(client_server::get_message_events_route)
.ruma_route(client_server::search_events_route)
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index e4affde..5326b7a 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -342,6 +342,10 @@ impl Service {
r
}
+ pub fn well_known_client(&self) -> &Option<String> {
+ &self.config.well_known_client
+ }
+
pub fn shutdown(&self) {
self.shutdown.store(true, atomic::Ordering::Relaxed);
// On shutdown
diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs
index d4acaa5..5e4281d 100644
--- a/src/service/pusher/mod.rs
+++ b/src/service/pusher/mod.rs
@@ -270,21 +270,7 @@ impl Service {
notifi.sender_display_name = services().users.displayname(&event.sender)?;
- let room_name = if let Some(room_name_pdu) = services()
- .rooms
- .state_accessor
- .room_state_get(&event.room_id, &StateEventType::RoomName, "")?
- {
- serde_json::from_str::<RoomNameEventContent>(room_name_pdu.content.get())
- .map_err(|_| {
- Error::bad_database("Invalid room name event in database.")
- })?
- .name
- } else {
- None
- };
-
- notifi.room_name = room_name;
+ notifi.room_name = services().rooms.state_accessor.get_name(&event.room_id)?;
self.send_request(&http.url, send_event_notification::v1::Request::new(notifi))
.await?;
diff --git a/src/service/rooms/spaces/mod.rs b/src/service/rooms/spaces/mod.rs
index 76ba6c5..380f86c 100644
--- a/src/service/rooms/spaces/mod.rs
+++ b/src/service/rooms/spaces/mod.rs
@@ -273,15 +273,7 @@ impl Service {
Error::bad_database("Invalid canonical alias event in database.")
})
})?,
- name: services()
- .rooms
- .state_accessor
- .room_state_get(&room_id, &StateEventType::RoomName, "")?
- .map_or(Ok(None), |s| {
- serde_json::from_str(s.content.get())
- .map(|c: RoomNameEventContent| c.name)
- .map_err(|_| Error::bad_database("Invalid room name event in database."))
- })?,
+ name: services().rooms.state_accessor.get_name(&room_id)?,
num_joined_members: services()
.rooms
.state_cache
diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs
index a25a8b5..9d071a5 100644
--- a/src/service/rooms/state_accessor/mod.rs
+++ b/src/service/rooms/state_accessor/mod.rs
@@ -11,6 +11,7 @@ use ruma::{
room::{
history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
member::{MembershipState, RoomMemberEventContent},
+ name::RoomNameEventContent,
},
StateEventType,
},
@@ -269,4 +270,16 @@ impl Service {
) -> Result<Option<Arc<PduEvent>>> {
self.db.room_state_get(room_id, event_type, state_key)
}
+
+ pub fn get_name(&self, room_id: &RoomId) -> Result<Option<String>> {
+ services()
+ .rooms
+ .state_accessor
+ .room_state_get(&room_id, &StateEventType::RoomName, "")?
+ .map_or(Ok(None), |s| {
+ serde_json::from_str(s.content.get())
+ .map(|c: RoomNameEventContent| c.name)
+ .map_err(|_| Error::bad_database("Invalid room name event in database."))
+ })
+ }
}