diff options
author | Jonathan de Jong <jonathan@automatia.nl> | 2023-01-26 18:19:39 +0100 |
---|---|---|
committer | Jonathan de Jong <jonathan@automatia.nl> | 2023-01-26 18:19:39 +0100 |
commit | b158896396bf33e088dd35ec651c52e6afe646e5 (patch) | |
tree | 660940ce37ec869a667d0477f8e8dde82193f450 /src | |
parent | 52018c3967185c04b32bca63752ad2bf61fd2ed9 (diff) | |
parent | f95dd4521caaa8dcbeadc463f535eb34186d6ed7 (diff) | |
download | conduit-b158896396bf33e088dd35ec651c52e6afe646e5.zip |
Merge remote-tracking branch 'origin/next' into complement-improvements
Diffstat (limited to 'src')
68 files changed, 1317 insertions, 815 deletions
diff --git a/src/api/client_server/account.rs b/src/api/client_server/account.rs index c9e3c9b..7459254 100644 --- a/src/api/client_server/account.rs +++ b/src/api/client_server/account.rs @@ -4,8 +4,8 @@ use ruma::{ api::client::{ account::{ change_password, deactivate, get_3pids, get_username_availability, register, - request_3pid_management_token_via_email, request_3pid_management_token_via_msisdn, whoami, - ThirdPartyIdRemovalStatus, + request_3pid_management_token_via_email, request_3pid_management_token_via_msisdn, + whoami, ThirdPartyIdRemovalStatus, }, error::ErrorKind, uiaa::{AuthFlow, AuthType, UiaaInfo}, @@ -30,7 +30,7 @@ const RANDOM_USER_ID_LENGTH: usize = 10; /// /// Note: This will not reserve the username, so the username might become invalid when trying to register pub async fn get_register_available_route( - body: Ruma<get_username_availability::v3::IncomingRequest>, + body: Ruma<get_username_availability::v3::Request>, ) -> Result<get_username_availability::v3::Response> { // Validate user id let user_id = UserId::parse_with_server_name( @@ -73,9 +73,7 @@ pub async fn get_register_available_route( /// - If type is not guest and no username is given: Always fails after UIAA check /// - Creates a new account and populates it with default account data /// - If `inhibit_login` is false: Creates a device and returns device id and access_token -pub async fn register_route( - body: Ruma<register::v3::IncomingRequest>, -) -> Result<register::v3::Response> { +pub async fn register_route(body: Ruma<register::v3::Request>) -> Result<register::v3::Response> { if !services().globals.allow_registration() && !body.from_appservice { return Err(Error::BadRequest( ErrorKind::Forbidden, @@ -227,8 +225,7 @@ pub async fn register_route( services() .admin .send_message(RoomMessageEventContent::notice_plain(format!( - "New user {} registered on this server.", - user_id + "New user {user_id} registered on this server." ))); // If this is the first real user, grant them admin privileges @@ -266,7 +263,7 @@ pub async fn register_route( /// - Forgets to-device events /// - Triggers device list updates pub async fn change_password_route( - body: Ruma<change_password::v3::IncomingRequest>, + body: Ruma<change_password::v3::Request>, ) -> Result<change_password::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); @@ -320,8 +317,7 @@ pub async fn change_password_route( services() .admin .send_message(RoomMessageEventContent::notice_plain(format!( - "User {} changed their password.", - sender_user + "User {sender_user} changed their password." ))); Ok(change_password::v3::Response {}) @@ -354,7 +350,7 @@ pub async fn whoami_route(body: Ruma<whoami::v3::Request>) -> Result<whoami::v3: /// - Triggers device list updates /// - Removes ability to log in again pub async fn deactivate_route( - body: Ruma<deactivate::v3::IncomingRequest>, + body: Ruma<deactivate::v3::Request>, ) -> Result<deactivate::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); @@ -398,8 +394,7 @@ pub async fn deactivate_route( services() .admin .send_message(RoomMessageEventContent::notice_plain(format!( - "User {} deactivated their account.", - sender_user + "User {sender_user} deactivated their account." ))); Ok(deactivate::v3::Response { @@ -426,7 +421,7 @@ pub async fn third_party_route( /// /// - 403 signals that The homeserver does not allow the third party identifier as a contact option. pub async fn request_3pid_management_token_via_email_route( - _body: Ruma<request_3pid_management_token_via_email::v3::IncomingRequest>, + _body: Ruma<request_3pid_management_token_via_email::v3::Request>, ) -> Result<request_3pid_management_token_via_email::v3::Response> { Err(Error::BadRequest( ErrorKind::ThreepidDenied, @@ -440,7 +435,7 @@ pub async fn request_3pid_management_token_via_email_route( /// /// - 403 signals that The homeserver does not allow the third party identifier as a contact option. pub async fn request_3pid_management_token_via_msisdn_route( - _body: Ruma<request_3pid_management_token_via_msisdn::v3::IncomingRequest>, + _body: Ruma<request_3pid_management_token_via_msisdn::v3::Request>, ) -> Result<request_3pid_management_token_via_msisdn::v3::Response> { Err(Error::BadRequest( ErrorKind::ThreepidDenied, diff --git a/src/api/client_server/alias.rs b/src/api/client_server/alias.rs index b28606c..ab51b50 100644 --- a/src/api/client_server/alias.rs +++ b/src/api/client_server/alias.rs @@ -9,14 +9,14 @@ use ruma::{ }, federation, }, - RoomAliasId, + OwnedRoomAliasId, }; /// # `PUT /_matrix/client/r0/directory/room/{roomAlias}` /// /// Creates a new room alias on this server. pub async fn create_alias_route( - body: Ruma<create_alias::v3::IncomingRequest>, + body: Ruma<create_alias::v3::Request>, ) -> Result<create_alias::v3::Response> { if body.room_alias.server_name() != services().globals.server_name() { return Err(Error::BadRequest( @@ -49,7 +49,7 @@ pub async fn create_alias_route( /// - TODO: additional access control checks /// - TODO: Update canonical alias event pub async fn delete_alias_route( - body: Ruma<delete_alias::v3::IncomingRequest>, + body: Ruma<delete_alias::v3::Request>, ) -> Result<delete_alias::v3::Response> { if body.room_alias.server_name() != services().globals.server_name() { return Err(Error::BadRequest( @@ -71,18 +71,22 @@ pub async fn delete_alias_route( /// /// - TODO: Suggest more servers to join via pub async fn get_alias_route( - body: Ruma<get_alias::v3::IncomingRequest>, + body: Ruma<get_alias::v3::Request>, ) -> Result<get_alias::v3::Response> { - get_alias_helper(&body.room_alias).await + get_alias_helper(body.body.room_alias).await } -pub(crate) async fn get_alias_helper(room_alias: &RoomAliasId) -> Result<get_alias::v3::Response> { +pub(crate) async fn get_alias_helper( + room_alias: OwnedRoomAliasId, +) -> Result<get_alias::v3::Response> { if room_alias.server_name() != services().globals.server_name() { let response = services() .sending .send_federation_request( room_alias.server_name(), - federation::query::get_room_information::v1::Request { room_alias }, + federation::query::get_room_information::v1::Request { + room_alias: room_alias.to_owned(), + }, ) .await?; @@ -93,7 +97,7 @@ pub(crate) async fn get_alias_helper(room_alias: &RoomAliasId) -> Result<get_ali } let mut room_id = None; - match services().rooms.alias.resolve_local_alias(room_alias)? { + match services().rooms.alias.resolve_local_alias(&room_alias)? { Some(r) => room_id = Some(r), None => { for (_id, registration) in services().appservice.all()? { @@ -115,7 +119,9 @@ pub(crate) async fn get_alias_helper(room_alias: &RoomAliasId) -> Result<get_ali .sending .send_appservice_request( registration, - appservice::query::query_room_alias::v1::Request { room_alias }, + appservice::query::query_room_alias::v1::Request { + room_alias: room_alias.clone(), + }, ) .await .is_ok() @@ -124,7 +130,7 @@ pub(crate) async fn get_alias_helper(room_alias: &RoomAliasId) -> Result<get_ali services() .rooms .alias - .resolve_local_alias(room_alias)? + .resolve_local_alias(&room_alias)? .ok_or_else(|| { Error::bad_config("Appservice lied to us. Room does not exist.") })?, diff --git a/src/api/client_server/backup.rs b/src/api/client_server/backup.rs index f3d5ddc..115cba7 100644 --- a/src/api/client_server/backup.rs +++ b/src/api/client_server/backup.rs @@ -28,7 +28,7 @@ pub async fn create_backup_version_route( /// /// Update information about an existing backup. Only `auth_data` can be modified. pub async fn update_backup_version_route( - body: Ruma<update_backup_version::v3::IncomingRequest>, + body: Ruma<update_backup_version::v3::Request>, ) -> Result<update_backup_version::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); services() @@ -66,7 +66,7 @@ pub async fn get_latest_backup_info_route( /// /// Get information about an existing backup. pub async fn get_backup_info_route( - body: Ruma<get_backup_info::v3::IncomingRequest>, + body: Ruma<get_backup_info::v3::Request>, ) -> Result<get_backup_info::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let algorithm = services() @@ -96,7 +96,7 @@ pub async fn get_backup_info_route( /// /// - Deletes both information about the backup, as well as all key data related to the backup pub async fn delete_backup_version_route( - body: Ruma<delete_backup_version::v3::IncomingRequest>, + body: Ruma<delete_backup_version::v3::Request>, ) -> Result<delete_backup_version::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -115,7 +115,7 @@ pub async fn delete_backup_version_route( /// - Adds the keys to the backup /// - Returns the new number of keys in this backup and the etag pub async fn add_backup_keys_route( - body: Ruma<add_backup_keys::v3::IncomingRequest>, + body: Ruma<add_backup_keys::v3::Request>, ) -> Result<add_backup_keys::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -162,7 +162,7 @@ pub async fn add_backup_keys_route( /// - Adds the keys to the backup /// - Returns the new number of keys in this backup and the etag pub async fn add_backup_keys_for_room_route( - body: Ruma<add_backup_keys_for_room::v3::IncomingRequest>, + body: Ruma<add_backup_keys_for_room::v3::Request>, ) -> Result<add_backup_keys_for_room::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -207,7 +207,7 @@ pub async fn add_backup_keys_for_room_route( /// - Adds the keys to the backup /// - Returns the new number of keys in this backup and the etag pub async fn add_backup_keys_for_session_route( - body: Ruma<add_backup_keys_for_session::v3::IncomingRequest>, + body: Ruma<add_backup_keys_for_session::v3::Request>, ) -> Result<add_backup_keys_for_session::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -246,7 +246,7 @@ pub async fn add_backup_keys_for_session_route( /// /// Retrieves all keys from the backup. pub async fn get_backup_keys_route( - body: Ruma<get_backup_keys::v3::IncomingRequest>, + body: Ruma<get_backup_keys::v3::Request>, ) -> Result<get_backup_keys::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -259,7 +259,7 @@ pub async fn get_backup_keys_route( /// /// Retrieves all keys from the backup for a given room. pub async fn get_backup_keys_for_room_route( - body: Ruma<get_backup_keys_for_room::v3::IncomingRequest>, + body: Ruma<get_backup_keys_for_room::v3::Request>, ) -> Result<get_backup_keys_for_room::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -274,7 +274,7 @@ pub async fn get_backup_keys_for_room_route( /// /// Retrieves a key from the backup. pub async fn get_backup_keys_for_session_route( - body: Ruma<get_backup_keys_for_session::v3::IncomingRequest>, + body: Ruma<get_backup_keys_for_session::v3::Request>, ) -> Result<get_backup_keys_for_session::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -293,7 +293,7 @@ pub async fn get_backup_keys_for_session_route( /// /// Delete the keys from the backup. pub async fn delete_backup_keys_route( - body: Ruma<delete_backup_keys::v3::IncomingRequest>, + body: Ruma<delete_backup_keys::v3::Request>, ) -> Result<delete_backup_keys::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -316,7 +316,7 @@ pub async fn delete_backup_keys_route( /// /// Delete the keys from the backup for a given room. pub async fn delete_backup_keys_for_room_route( - body: Ruma<delete_backup_keys_for_room::v3::IncomingRequest>, + body: Ruma<delete_backup_keys_for_room::v3::Request>, ) -> Result<delete_backup_keys_for_room::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -339,7 +339,7 @@ pub async fn delete_backup_keys_for_room_route( /// /// Delete a key from the backup. pub async fn delete_backup_keys_for_session_route( - body: Ruma<delete_backup_keys_for_session::v3::IncomingRequest>, + body: Ruma<delete_backup_keys_for_session::v3::Request>, ) -> Result<delete_backup_keys_for_session::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/capabilities.rs b/src/api/client_server/capabilities.rs index 31d42d2..233e3c9 100644 --- a/src/api/client_server/capabilities.rs +++ b/src/api/client_server/capabilities.rs @@ -8,7 +8,7 @@ use std::collections::BTreeMap; /// /// Get information on the supported feature set and other relevent capabilities of this server. pub async fn get_capabilities_route( - _body: Ruma<get_capabilities::v3::IncomingRequest>, + _body: Ruma<get_capabilities::v3::Request>, ) -> Result<get_capabilities::v3::Response> { let mut available = BTreeMap::new(); for room_version in &services().globals.unstable_room_versions { diff --git a/src/api/client_server/config.rs b/src/api/client_server/config.rs index dbd2b2c..12f9aea 100644 --- a/src/api/client_server/config.rs +++ b/src/api/client_server/config.rs @@ -17,7 +17,7 @@ use serde_json::{json, value::RawValue as RawJsonValue}; /// /// Sets some account data for the sender user. pub async fn set_global_account_data_route( - body: Ruma<set_global_account_data::v3::IncomingRequest>, + body: Ruma<set_global_account_data::v3::Request>, ) -> Result<set_global_account_data::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -43,7 +43,7 @@ pub async fn set_global_account_data_route( /// /// Sets some room account data for the sender user. pub async fn set_room_account_data_route( - body: Ruma<set_room_account_data::v3::IncomingRequest>, + body: Ruma<set_room_account_data::v3::Request>, ) -> Result<set_room_account_data::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -69,7 +69,7 @@ pub async fn set_room_account_data_route( /// /// Gets some account data for the sender user. pub async fn get_global_account_data_route( - body: Ruma<get_global_account_data::v3::IncomingRequest>, + body: Ruma<get_global_account_data::v3::Request>, ) -> Result<get_global_account_data::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -89,7 +89,7 @@ pub async fn get_global_account_data_route( /// /// Gets some room account data for the sender user. pub async fn get_room_account_data_route( - body: Ruma<get_room_account_data::v3::IncomingRequest>, + body: Ruma<get_room_account_data::v3::Request>, ) -> Result<get_room_account_data::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/context.rs b/src/api/client_server/context.rs index 2e0f257..1e62f91 100644 --- a/src/api/client_server/context.rs +++ b/src/api/client_server/context.rs @@ -13,7 +13,7 @@ use tracing::error; /// - Only works if the user is joined (TODO: always allow, but only show events if the user was /// joined, depending on history_visibility) pub async fn get_context_route( - body: Ruma<get_context::v3::IncomingRequest>, + body: Ruma<get_context::v3::Request>, ) -> Result<get_context::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/device.rs b/src/api/client_server/device.rs index d4c4178..aba061b 100644 --- a/src/api/client_server/device.rs +++ b/src/api/client_server/device.rs @@ -28,7 +28,7 @@ pub async fn get_devices_route( /// /// Get metadata on a single device of the sender user. pub async fn get_device_route( - body: Ruma<get_device::v3::IncomingRequest>, + body: Ruma<get_device::v3::Request>, ) -> Result<get_device::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -44,7 +44,7 @@ pub async fn get_device_route( /// /// Updates the metadata on a given device of the sender user. pub async fn update_device_route( - body: Ruma<update_device::v3::IncomingRequest>, + body: Ruma<update_device::v3::Request>, ) -> Result<update_device::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -72,7 +72,7 @@ pub async fn update_device_route( /// - Forgets to-device events /// - Triggers device list updates pub async fn delete_device_route( - body: Ruma<delete_device::v3::IncomingRequest>, + body: Ruma<delete_device::v3::Request>, ) -> Result<delete_device::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); @@ -126,7 +126,7 @@ pub async fn delete_device_route( /// - Forgets to-device events /// - Triggers device list updates pub async fn delete_devices_route( - body: Ruma<delete_devices::v3::IncomingRequest>, + body: Ruma<delete_devices::v3::Request>, ) -> Result<delete_devices::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/directory.rs b/src/api/client_server/directory.rs index a7381d8..e132210 100644 --- a/src/api/client_server/directory.rs +++ b/src/api/client_server/directory.rs @@ -11,10 +11,7 @@ use ruma::{ }, federation, }, - directory::{ - Filter, IncomingFilter, IncomingRoomNetwork, PublicRoomJoinRule, PublicRoomsChunk, - RoomNetwork, - }, + directory::{Filter, PublicRoomJoinRule, PublicRoomsChunk, RoomNetwork}, events::{ room::{ avatar::RoomAvatarEventContent, @@ -38,7 +35,7 @@ use tracing::{error, info, warn}; /// /// - Rooms are ordered by the number of joined members pub async fn get_public_rooms_filtered_route( - body: Ruma<get_public_rooms_filtered::v3::IncomingRequest>, + body: Ruma<get_public_rooms_filtered::v3::Request>, ) -> Result<get_public_rooms_filtered::v3::Response> { get_public_rooms_filtered_helper( body.server.as_deref(), @@ -56,14 +53,14 @@ pub async fn get_public_rooms_filtered_route( /// /// - Rooms are ordered by the number of joined members pub async fn get_public_rooms_route( - body: Ruma<get_public_rooms::v3::IncomingRequest>, + body: Ruma<get_public_rooms::v3::Request>, ) -> Result<get_public_rooms::v3::Response> { let response = get_public_rooms_filtered_helper( body.server.as_deref(), body.limit, body.since.as_deref(), - &IncomingFilter::default(), - &IncomingRoomNetwork::Matrix, + &Filter::default(), + &RoomNetwork::Matrix, ) .await?; @@ -81,16 +78,13 @@ pub async fn get_public_rooms_route( /// /// - TODO: Access control checks pub async fn set_room_visibility_route( - body: Ruma<set_room_visibility::v3::IncomingRequest>, + body: Ruma<set_room_visibility::v3::Request>, ) -> Result<set_room_visibility::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); if !services().rooms.metadata.exists(&body.room_id)? { // Return 404 if the room doesn't exist - return Err(Error::BadRequest( - ErrorKind::NotFound, - "Room not found", - )); + return Err(Error::BadRequest(ErrorKind::NotFound, "Room not found")); } match &body.visibility { @@ -114,15 +108,11 @@ pub async fn set_room_visibility_route( /// /// Gets the visibility of a given room in the room directory. pub async fn get_room_visibility_route( - body: Ruma<get_room_visibility::v3::IncomingRequest>, + body: Ruma<get_room_visibility::v3::Request>, ) -> Result<get_room_visibility::v3::Response> { - if !services().rooms.metadata.exists(&body.room_id)? { // Return 404 if the room doesn't exist - return Err(Error::BadRequest( - ErrorKind::NotFound, - "Room not found", - )); + return Err(Error::BadRequest(ErrorKind::NotFound, "Room not found")); } Ok(get_room_visibility::v3::Response { @@ -138,8 +128,8 @@ pub(crate) async fn get_public_rooms_filtered_helper( server: Option<&ServerName>, limit: Option<UInt>, since: Option<&str>, - filter: &IncomingFilter, - _network: &IncomingRoomNetwork, + filter: &Filter, + _network: &RoomNetwork, ) -> Result<get_public_rooms_filtered::v3::Response> { if let Some(other_server) = server.filter(|server| *server != services().globals.server_name().as_str()) @@ -150,9 +140,9 @@ pub(crate) async fn get_public_rooms_filtered_helper( other_server, federation::directory::get_public_rooms_filtered::v1::Request { limit, - since, + since: since.map(ToOwned::to_owned), filter: Filter { - generic_search_term: filter.generic_search_term.as_deref(), + generic_search_term: filter.generic_search_term.clone(), room_types: filter.room_types.clone(), }, room_network: RoomNetwork::Matrix, @@ -371,7 +361,7 @@ pub(crate) async fn get_public_rooms_filtered_helper( let prev_batch = if num_since == 0 { None } else { - Some(format!("p{}", num_since)) + Some(format!("p{num_since}")) }; let next_batch = if chunk.len() < limit as usize { diff --git a/src/api/client_server/filter.rs b/src/api/client_server/filter.rs index a0d5a19..e9a359d 100644 --- a/src/api/client_server/filter.rs +++ b/src/api/client_server/filter.rs @@ -10,7 +10,7 @@ use ruma::api::client::{ /// /// - A user can only access their own filters pub async fn get_filter_route( - body: Ruma<get_filter::v3::IncomingRequest>, + body: Ruma<get_filter::v3::Request>, ) -> Result<get_filter::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let filter = match services().users.get_filter(sender_user, &body.filter_id)? { @@ -25,7 +25,7 @@ pub async fn get_filter_route( /// /// Creates a new filter to be used by other endpoints. pub async fn create_filter_route( - body: Ruma<create_filter::v3::IncomingRequest>, + body: Ruma<create_filter::v3::Request>, ) -> Result<create_filter::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); Ok(create_filter::v3::Response::new( diff --git a/src/api/client_server/keys.rs b/src/api/client_server/keys.rs index b649166..ba89ece 100644 --- a/src/api/client_server/keys.rs +++ b/src/api/client_server/keys.rs @@ -65,9 +65,7 @@ pub async fn upload_keys_route( /// - Always fetches users from other servers over federation /// - Gets master keys, self-signing keys, user signing keys and device keys. /// - The master and self-signing keys contain signatures that the user is allowed to see -pub async fn get_keys_route( - body: Ruma<get_keys::v3::IncomingRequest>, -) -> Result<get_keys::v3::Response> { +pub async fn get_keys_route(body: Ruma<get_keys::v3::Request>) -> Result<get_keys::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let response = @@ -93,7 +91,7 @@ pub async fn claim_keys_route( /// /// - Requires UIAA to verify password pub async fn upload_signing_keys_route( - body: Ruma<upload_signing_keys::v3::IncomingRequest>, + body: Ruma<upload_signing_keys::v3::Request>, ) -> Result<upload_signing_keys::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); @@ -214,7 +212,7 @@ pub async fn upload_signatures_route( /// /// - TODO: left users pub async fn get_key_changes_route( - body: Ruma<get_key_changes::v3::IncomingRequest>, + body: Ruma<get_key_changes::v3::Request>, ) -> Result<get_key_changes::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/media.rs b/src/api/client_server/media.rs index ae023c9..3410cc0 100644 --- a/src/api/client_server/media.rs +++ b/src/api/client_server/media.rs @@ -27,7 +27,7 @@ pub async fn get_media_config_route( /// - Some metadata will be saved in the database /// - Media will be saved in the media/ directory pub async fn create_content_route( - body: Ruma<create_content::v3::IncomingRequest>, + body: Ruma<create_content::v3::Request>, ) -> Result<create_content::v3::Response> { let mxc = format!( "mxc://{}/{}", @@ -57,7 +57,7 @@ pub async fn create_content_route( pub async fn get_remote_content( mxc: &str, server_name: &ruma::ServerName, - media_id: &str, + media_id: String, ) -> Result<get_content::v3::Response, Error> { let content_response = services() .sending @@ -65,7 +65,7 @@ pub async fn get_remote_content( server_name, get_content::v3::Request { allow_remote: false, - server_name, + server_name: server_name.to_owned(), media_id, }, ) @@ -74,7 +74,7 @@ pub async fn get_remote_content( services() .media .create( - mxc.to_string(), + mxc.to_owned(), content_response.content_disposition.as_deref(), content_response.content_type.as_deref(), &content_response.file, @@ -90,7 +90,7 @@ pub async fn get_remote_content( /// /// - Only allows federation if `allow_remote` is true pub async fn get_content_route( - body: Ruma<get_content::v3::IncomingRequest>, + body: Ruma<get_content::v3::Request>, ) -> Result<get_content::v3::Response> { let mxc = format!("mxc://{}/{}", body.server_name, body.media_id); @@ -108,7 +108,7 @@ pub async fn get_content_route( }) } else if &*body.server_name != services().globals.server_name() && body.allow_remote { let remote_content_response = - get_remote_content(&mxc, &body.server_name, &body.media_id).await?; + get_remote_content(&mxc, &body.server_name, body.media_id.clone()).await?; Ok(remote_content_response) } else { Err(Error::BadRequest(ErrorKind::NotFound, "Media not found.")) @@ -121,7 +121,7 @@ pub async fn get_content_route( /// /// - Only allows federation if `allow_remote` is true pub async fn get_content_as_filename_route( - body: Ruma<get_content_as_filename::v3::IncomingRequest>, + body: Ruma<get_content_as_filename::v3::Request>, ) -> Result<get_content_as_filename::v3::Response> { let mxc = format!("mxc://{}/{}", body.server_name, body.media_id); @@ -139,7 +139,7 @@ pub async fn get_content_as_filename_route( }) } else if &*body.server_name != services().globals.server_name() && body.allow_remote { let remote_content_response = - get_remote_content(&mxc, &body.server_name, &body.media_id).await?; + get_remote_content(&mxc, &body.server_name, body.media_id.clone()).await?; Ok(get_content_as_filename::v3::Response { content_disposition: Some(format!("inline: filename={}", body.filename)), @@ -158,7 +158,7 @@ pub async fn get_content_as_filename_route( /// /// - Only allows federation if `allow_remote` is true pub async fn get_content_thumbnail_route( - body: Ruma<get_content_thumbnail::v3::IncomingRequest>, + body: Ruma<get_content_thumbnail::v3::Request>, ) -> Result<get_content_thumbnail::v3::Response> { let mxc = format!("mxc://{}/{}", body.server_name, body.media_id); @@ -192,8 +192,8 @@ pub async fn get_content_thumbnail_route( height: body.height, width: body.width, method: body.method.clone(), - server_name: &body.server_name, - media_id: &body.media_id, + server_name: body.server_name.clone(), + media_id: body.media_id.clone(), }, ) .await?; diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 7142b8e..61c67cb 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -5,19 +5,23 @@ use ruma::{ membership::{ ban_user, forget_room, get_member_events, invite_user, join_room_by_id, join_room_by_id_or_alias, joined_members, joined_rooms, kick_user, leave_room, - unban_user, IncomingThirdPartySigned, + unban_user, ThirdPartySigned, }, }, federation::{self, membership::create_invite}, }, canonical_json::to_canonical_value, events::{ - room::member::{MembershipState, RoomMemberEventContent}, + room::{ + join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent}, + member::{MembershipState, RoomMemberEventContent}, + power_levels::RoomPowerLevelsEventContent, + }, RoomEventType, StateEventType, }, serde::Base64, - CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, - OwnedUserId, RoomId, RoomVersionId, UserId, + state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, + OwnedServerName, OwnedUserId, RoomId, RoomVersionId, UserId, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ @@ -41,7 +45,7 @@ use super::get_alias_helper; /// - If the server knowns about this room: creates the join event and does auth rules locally /// - If the server does not know about the room: asks other servers over federation pub async fn join_room_by_id_route( - body: Ruma<join_room_by_id::v3::IncomingRequest>, + body: Ruma<join_room_by_id::v3::Request>, ) -> Result<join_room_by_id::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -65,6 +69,7 @@ pub async fn join_room_by_id_route( join_room_by_id_helper( body.sender_user.as_deref(), &body.room_id, + body.reason.clone(), &servers, body.third_party_signed.as_ref(), ) @@ -78,7 +83,7 @@ pub async fn join_room_by_id_route( /// - If the server knowns about this room: creates the join event and does auth rules locally /// - If the server does not know about the room: asks other servers over federation pub async fn join_room_by_id_or_alias_route( - body: Ruma<join_room_by_id_or_alias::v3::IncomingRequest>, + body: Ruma<join_room_by_id_or_alias::v3::Request>, ) -> Result<join_room_by_id_or_alias::v3::Response> { let sender_user = body.sender_user.as_deref().expect("user is authenticated"); let body = body.body; @@ -104,7 +109,7 @@ pub async fn join_room_by_id_or_alias_route( (servers, room_id) } Err(room_alias) => { - let response = get_alias_helper(&room_alias).await?; + let response = get_alias_helper(room_alias).await?; (response.servers.into_iter().collect(), response.room_id) } @@ -113,6 +118,7 @@ pub async fn join_room_by_id_or_alias_route( let join_room_response = join_room_by_id_helper( Some(sender_user), &room_id, + body.reason.clone(), &servers, body.third_party_signed.as_ref(), ) @@ -129,11 +135,11 @@ pub async fn join_room_by_id_or_alias_route( /// /// - This should always work if the user is currently joined. pub async fn leave_room_route( - body: Ruma<leave_room::v3::IncomingRequest>, + body: Ruma<leave_room::v3::Request>, ) -> Result<leave_room::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - leave_room(sender_user, &body.room_id).await?; + leave_room(sender_user, &body.room_id, body.reason.clone()).await?; Ok(leave_room::v3::Response::new()) } @@ -142,12 +148,19 @@ pub async fn leave_room_route( /// /// Tries to send an invite event into the room. pub async fn invite_user_route( - body: Ruma<invite_user::v3::IncomingRequest>, + body: Ruma<invite_user::v3::Request>, ) -> Result<invite_user::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if let invite_user::v3::IncomingInvitationRecipient::UserId { user_id } = &body.recipient { - invite_helper(sender_user, user_id, &body.room_id, false).await?; + if let invite_user::v3::InvitationRecipient::UserId { user_id } = &body.recipient { + invite_helper( + sender_user, + user_id, + &body.room_id, + body.reason.clone(), + false, + ) + .await?; Ok(invite_user::v3::Response {}) } else { Err(Error::BadRequest(ErrorKind::NotFound, "User not found.")) @@ -158,7 +171,7 @@ pub async fn invite_user_route( /// /// Tries to send a kick event into the room. pub async fn kick_user_route( - body: Ruma<kick_user::v3::IncomingRequest>, + body: Ruma<kick_user::v3::Request>, ) -> Result<kick_user::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -181,7 +194,7 @@ pub async fn kick_user_route( .map_err(|_| Error::bad_database("Invalid member event in database."))?; event.membership = MembershipState::Leave; - // TODO: reason + event.reason = body.reason.clone(); let mutex_state = Arc::clone( services() @@ -215,13 +228,9 @@ pub async fn kick_user_route( /// # `POST /_matrix/client/r0/rooms/{roomId}/ban` /// /// Tries to send a ban event into the room. -pub async fn ban_user_route( - body: Ruma<ban_user::v3::IncomingRequest>, -) -> Result<ban_user::v3::Response> { +pub async fn ban_user_route(body: Ruma<ban_user::v3::Request>) -> Result<ban_user::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - // TODO: reason - let event = services() .rooms .state_accessor @@ -238,7 +247,7 @@ pub async fn ban_user_route( is_direct: None, third_party_invite: None, blurhash: services().users.blurhash(&body.user_id)?, - reason: None, + reason: body.reason.clone(), join_authorized_via_users_server: None, }), |event| { @@ -284,7 +293,7 @@ pub async fn ban_user_route( /// /// Tries to send an unban event into the room. pub async fn unban_user_route( - body: Ruma<unban_user::v3::IncomingRequest>, + body: Ruma<unban_user::v3::Request>, ) -> Result<unban_user::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -307,6 +316,7 @@ pub async fn unban_user_route( .map_err(|_| Error::bad_database("Invalid member event in database."))?; event.membership = MembershipState::Leave; + event.reason = body.reason.clone(); let mutex_state = Arc::clone( services() @@ -346,7 +356,7 @@ pub async fn unban_user_route( /// Note: Other devices of the user have no way of knowing the room was forgotten, so this has to /// be called from every device pub async fn forget_room_route( - body: Ruma<forget_room::v3::IncomingRequest>, + body: Ruma<forget_room::v3::Request>, ) -> Result<forget_room::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -382,7 +392,7 @@ pub async fn joined_rooms_route( /// /// - Only works if the user is currently joined pub async fn get_member_events_route( - body: Ruma<get_member_events::v3::IncomingRequest>, + body: Ruma<get_member_events::v3::Request>, ) -> Result<get_member_events::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -418,7 +428,7 @@ pub async fn get_member_events_route( /// - The sender user must be in the room /// - TODO: An appservice just needs a puppet joined pub async fn joined_members_route( - body: Ruma<joined_members::v3::IncomingRequest>, + body: Ruma<joined_members::v3::Request>, ) -> Result<joined_members::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -458,8 +468,9 @@ pub async fn joined_members_route( async fn join_room_by_id_helper( sender_user: Option<&UserId>, room_id: &RoomId, + reason: Option<String>, servers: &[OwnedServerName], - _third_party_signed: Option<&IncomingThirdPartySigned>, + _third_party_signed: Option<&ThirdPartySigned>, ) -> Result<join_room_by_id::v3::Response> { let sender_user = sender_user.expect("user is authenticated"); @@ -480,33 +491,10 @@ async fn join_room_by_id_helper( .state_cache .server_in_room(services().globals.server_name(), room_id)? { - let mut make_join_response_and_server = Err(Error::BadServerResponse( - "No server available to assist in joining.", - )); - - for remote_server in servers { - let make_join_response = services() - .sending - .send_federation_request( - remote_server, - federation::membership::prepare_join_event::v1::Request { - room_id, - user_id: sender_user, - ver: &services().globals.supported_room_versions(), - }, - ) - .await; - - make_join_response_and_server = make_join_response.map(|r| (r, remote_server)); - - if make_join_response_and_server.is_ok() { - break; - } - } + let (make_join_response, remote_server) = + make_join_request(sender_user, room_id, servers).await?; - let (make_join_response, remote_server) = make_join_response_and_server?; - - let room_version = match make_join_response.room_version { + let room_version_id = match make_join_response.room_version { Some(room_version) if services() .globals @@ -554,7 +542,7 @@ async fn join_room_by_id_helper( is_direct: None, third_party_invite: None, blurhash: services().users.blurhash(sender_user)?, - reason: None, + reason, join_authorized_via_users_server, }) .expect("event is valid, we just created it"), @@ -568,14 +556,14 @@ async fn join_room_by_id_helper( services().globals.server_name().as_str(), services().globals.keypair(), &mut join_event_stub, - &room_version, + &room_version_id, ) .expect("event is valid, we just created it"); // Generate event id let event_id = format!( "${}", - ruma::signatures::reference_hash(&join_event_stub, &room_version) + ruma::signatures::reference_hash(&join_event_stub, &room_version_id) .expect("ruma can calculate reference hashes") ); let event_id = <&EventId>::try_from(event_id.as_str()) @@ -588,23 +576,67 @@ async fn join_room_by_id_helper( ); // It has enough fields to be called a proper event now - let join_event = join_event_stub; + let mut join_event = join_event_stub; let send_join_response = services() .sending .send_federation_request( - remote_server, + &remote_server, federation::membership::create_join_event::v2::Request { - room_id, - event_id, - pdu: &PduEvent::convert_to_outgoing_federation_event(join_event.clone()), + room_id: room_id.to_owned(), + event_id: event_id.to_owned(), + pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()), }, ) .await?; + if let Some(signed_raw) = &send_join_response.room_state.event { + let (signed_event_id, signed_value) = + match gen_event_id_canonical_json(signed_raw, &room_version_id) { + Ok(t) => t, + Err(_) => { + // Event could not be converted to canonical json + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Could not convert event to canonical json.", + )); + } + }; + + if signed_event_id != event_id { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Server sent event with wrong event id", + )); + } + + if let Ok(signature) = signed_value["signatures"] + .as_object() + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Server sent invalid signatures type", + )) + .and_then(|e| { + e.get(remote_server.as_str()).ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Server did not send its signature", + )) + }) + { + join_event + .get_mut("signatures") + .expect("we created a valid pdu") + .as_object_mut() + .expect("we created a valid pdu") + .insert(remote_server.to_string(), signature.clone()); + } else { + warn!("Server {} sent invalid sendjoin event", remote_server); + } + } + services().rooms.short.get_or_create_shortroomid(room_id)?; - let parsed_pdu = PduEvent::from_id_val(event_id, join_event.clone()) + let parsed_join_pdu = PduEvent::from_id_val(event_id, join_event.clone()) .map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?; let mut state = HashMap::new(); @@ -613,14 +645,14 @@ async fn join_room_by_id_helper( services() .rooms .event_handler - .fetch_join_signing_keys(&send_join_response, &room_version, &pub_key_map) + .fetch_join_signing_keys(&send_join_response, &room_version_id, &pub_key_map) .await?; for result in send_join_response .room_state .state .iter() - .map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map)) + .map(|pdu| validate_and_add_event_id(pdu, &room_version_id, &pub_key_map)) { let (event_id, value) = match result { Ok(t) => t, @@ -645,31 +677,11 @@ async fn join_room_by_id_helper( } } - let incoming_shortstatekey = services().rooms.short.get_or_create_shortstatekey( - &parsed_pdu.kind.to_string().into(), - parsed_pdu - .state_key - .as_ref() - .expect("Pdu is a membership state event"), - )?; - - state.insert(incoming_shortstatekey, parsed_pdu.event_id.clone()); - - let create_shortstatekey = services() - .rooms - .short - .get_shortstatekey(&StateEventType::RoomCreate, "")? - .expect("Room exists"); - - if state.get(&create_shortstatekey).is_none() { - return Err(Error::BadServerResponse("State contained no create event.")); - } - for result in send_join_response .room_state .auth_chain .iter() - .map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map)) + .map(|pdu| validate_and_add_event_id(pdu, &room_version_id, &pub_key_map)) { let (event_id, value) = match result { Ok(t) => t, @@ -682,6 +694,34 @@ async fn join_room_by_id_helper( .add_pdu_outlier(&event_id, &value)?; } + if !state_res::event_auth::auth_check( + &state_res::RoomVersion::new(&room_version_id).expect("room version is supported"), + &parsed_join_pdu, + None::<PduEvent>, // TODO: third party invite + |k, s| { + services() + .rooms + .timeline + .get_pdu( + state.get( + &services() + .rooms + .short + .get_or_create_shortstatekey(&k.to_string().into(), s) + .ok()?, + )?, + ) + .ok()? + }, + ) + .map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed"))? + { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Auth check failed", + )); + } + let (statehash_before_join, new, removed) = services().rooms.state_compressor.save_state( room_id, state @@ -705,12 +745,12 @@ async fn join_room_by_id_helper( // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - let statehash_after_join = services().rooms.state.append_to_state(&parsed_pdu)?; + let statehash_after_join = services().rooms.state.append_to_state(&parsed_join_pdu)?; services().rooms.timeline.append_pdu( - &parsed_pdu, + &parsed_join_pdu, join_event, - vec![(*parsed_pdu.event_id).to_owned()], + vec![(*parsed_join_pdu.event_id).to_owned()], &state_lock, )?; @@ -721,6 +761,95 @@ async fn join_room_by_id_helper( .state .set_room_state(room_id, statehash_after_join, &state_lock)?; } else { + let join_rules_event = services().rooms.state_accessor.room_state_get( + room_id, + &StateEventType::RoomJoinRules, + "", + )?; + let power_levels_event = services().rooms.state_accessor.room_state_get( + room_id, + &StateEventType::RoomPowerLevels, + "", + )?; + + let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event + .as_ref() + .map(|join_rules_event| { + serde_json::from_str(join_rules_event.content.get()).map_err(|e| { + warn!("Invalid join rules event: {}", e); + Error::bad_database("Invalid join rules event in db.") + }) + }) + .transpose()?; + let power_levels_event_content: Option<RoomPowerLevelsEventContent> = power_levels_event + .as_ref() + .map(|power_levels_event| { + serde_json::from_str(power_levels_event.content.get()).map_err(|e| { + warn!("Invalid power levels event: {}", e); + Error::bad_database("Invalid power levels event in db.") + }) + }) + .transpose()?; + + let restriction_rooms = match join_rules_event_content { + Some(RoomJoinRulesEventContent { + join_rule: JoinRule::Restricted(restricted), + }) + | Some(RoomJoinRulesEventContent { + join_rule: JoinRule::KnockRestricted(restricted), + }) => restricted + .allow + .into_iter() + .filter_map(|a| match a { + AllowRule::RoomMembership(r) => Some(r.room_id), + _ => None, + }) + .collect(), + _ => Vec::new(), + }; + + let authorized_user = restriction_rooms + .iter() + .find_map(|restriction_room_id| { + if !services() + .rooms + .state_cache + .is_joined(sender_user, restriction_room_id) + .ok()? + { + return None; + } + let authorized_user = power_levels_event_content + .as_ref() + .and_then(|c| { + c.users + .iter() + .filter(|(uid, i)| { + uid.server_name() == services().globals.server_name() + && **i > ruma::int!(0) + && services() + .rooms + .state_cache + .is_joined(uid, restriction_room_id) + .unwrap_or(false) + }) + .max_by_key(|(_, i)| *i) + .map(|(u, _)| u.to_owned()) + }) + .or_else(|| { + // TODO: Check here if user is actually allowed to invite. Currently the auth + // check will just fail in this case. + services() + .rooms + .state_cache + .room_members(restriction_room_id) + .filter_map(|r| r.ok()) + .find(|uid| uid.server_name() == services().globals.server_name()) + }); + Some(authorized_user) + }) + .flatten(); + let event = RoomMemberEventContent { membership: MembershipState::Join, displayname: services().users.displayname(sender_user)?, @@ -728,11 +857,12 @@ async fn join_room_by_id_helper( is_direct: None, third_party_invite: None, blurhash: services().users.blurhash(sender_user)?, - reason: None, - join_authorized_via_users_server: None, + reason: reason.clone(), + join_authorized_via_users_server: authorized_user, }; - services().rooms.timeline.build_and_append_pdu( + // Try normal join first + let error = match services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMember, content: to_raw_value(&event).expect("event is valid, we just created it"), @@ -743,14 +873,193 @@ async fn join_room_by_id_helper( sender_user, room_id, &state_lock, - )?; - } + ) { + Ok(_event_id) => return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())), + Err(e) => e, + }; - drop(state_lock); + if !restriction_rooms.is_empty() { + // We couldn't do the join locally, maybe federation can help to satisfy the restricted + // join requirements + let (make_join_response, remote_server) = + make_join_request(sender_user, room_id, servers).await?; + + let room_version_id = match make_join_response.room_version { + Some(room_version_id) + if services() + .globals + .supported_room_versions() + .contains(&room_version_id) => + { + room_version_id + } + _ => return Err(Error::BadServerResponse("Room version is not supported")), + }; + let mut join_event_stub: CanonicalJsonObject = + serde_json::from_str(make_join_response.event.get()).map_err(|_| { + Error::BadServerResponse("Invalid make_join event json received from server.") + })?; + let join_authorized_via_users_server = join_event_stub + .get("content") + .map(|s| { + s.as_object()? + .get("join_authorised_via_users_server")? + .as_str() + }) + .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()); + // TODO: Is origin needed? + join_event_stub.insert( + "origin".to_owned(), + CanonicalJsonValue::String(services().globals.server_name().as_str().to_owned()), + ); + join_event_stub.insert( + "origin_server_ts".to_owned(), + CanonicalJsonValue::Integer( + utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + ), + ); + join_event_stub.insert( + "content".to_owned(), + to_canonical_value(RoomMemberEventContent { + membership: MembershipState::Join, + displayname: services().users.displayname(sender_user)?, + avatar_url: services().users.avatar_url(sender_user)?, + is_direct: None, + third_party_invite: None, + blurhash: services().users.blurhash(sender_user)?, + reason, + join_authorized_via_users_server, + }) + .expect("event is valid, we just created it"), + ); + + // We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms + join_event_stub.remove("event_id"); + + // In order to create a compatible ref hash (EventID) the `hashes` field needs to be present + ruma::signatures::hash_and_sign_event( + services().globals.server_name().as_str(), + services().globals.keypair(), + &mut join_event_stub, + &room_version_id, + ) + .expect("event is valid, we just created it"); + + // Generate event id + let event_id = format!( + "${}", + ruma::signatures::reference_hash(&join_event_stub, &room_version_id) + .expect("ruma can calculate reference hashes") + ); + let event_id = <&EventId>::try_from(event_id.as_str()) + .expect("ruma's reference hashes are valid event ids"); + + // Add event_id back + join_event_stub.insert( + "event_id".to_owned(), + CanonicalJsonValue::String(event_id.as_str().to_owned()), + ); + + // It has enough fields to be called a proper event now + let join_event = join_event_stub; + + let send_join_response = services() + .sending + .send_federation_request( + &remote_server, + federation::membership::create_join_event::v2::Request { + room_id: room_id.to_owned(), + event_id: event_id.to_owned(), + pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()), + }, + ) + .await?; + + if let Some(signed_raw) = send_join_response.room_state.event { + let (signed_event_id, signed_value) = + match gen_event_id_canonical_json(&signed_raw, &room_version_id) { + Ok(t) => t, + Err(_) => { + // Event could not be converted to canonical json + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Could not convert event to canonical json.", + )); + } + }; + + if signed_event_id != event_id { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Server sent event with wrong event id", + )); + } + + drop(state_lock); + let pub_key_map = RwLock::new(BTreeMap::new()); + services() + .rooms + .event_handler + .handle_incoming_pdu( + &remote_server, + &signed_event_id, + room_id, + signed_value, + true, + &pub_key_map, + ) + .await?; + } else { + return Err(error); + } + } else { + return Err(error); + } + } Ok(join_room_by_id::v3::Response::new(room_id.to_owned())) } +async fn make_join_request( + sender_user: &UserId, + room_id: &RoomId, + servers: &[OwnedServerName], +) -> Result<( + federation::membership::prepare_join_event::v1::Response, + OwnedServerName, +)> { + let mut make_join_response_and_server = Err(Error::BadServerResponse( + "No server available to assist in joining.", + )); + + for remote_server in servers { + if remote_server == services().globals.server_name() { + continue; + } + let make_join_response = services() + .sending + .send_federation_request( + remote_server, + federation::membership::prepare_join_event::v1::Request { + room_id: room_id.to_owned(), + user_id: sender_user.to_owned(), + ver: services().globals.supported_room_versions(), + }, + ) + .await; + + make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone())); + + if make_join_response_and_server.is_ok() { + break; + } + } + + make_join_response_and_server +} + fn validate_and_add_event_id( pdu: &RawJsonValue, room_version: &RoomVersionId, @@ -823,10 +1132,11 @@ pub(crate) async fn invite_helper<'a>( sender_user: &UserId, user_id: &UserId, room_id: &RoomId, + reason: Option<String>, is_direct: bool, ) -> Result<()> { if user_id.server_name() != services().globals.server_name() { - let (pdu_json, invite_room_state) = { + let (pdu, pdu_json, invite_room_state) = { let mutex_state = Arc::clone( services() .globals @@ -845,7 +1155,7 @@ pub(crate) async fn invite_helper<'a>( membership: MembershipState::Invite, third_party_invite: None, blurhash: None, - reason: None, + reason, join_authorized_via_users_server: None, }) .expect("member event is valid value"); @@ -867,31 +1177,21 @@ pub(crate) async fn invite_helper<'a>( drop(state_lock); - (pdu_json, invite_room_state) + (pdu, pdu_json, invite_room_state) }; - // Generate event id - let expected_event_id = format!( - "${}", - ruma::signatures::reference_hash( - &pdu_json, - &services().rooms.state.get_room_version(room_id)? - ) - .expect("ruma can calculate reference hashes") - ); - let expected_event_id = <&EventId>::try_from(expected_event_id.as_str()) - .expect("ruma's reference hashes are valid event ids"); + let room_version_id = services().rooms.state.get_room_version(room_id)?; let response = services() .sending .send_federation_request( user_id.server_name(), create_invite::v2::Request { - room_id, - event_id: expected_event_id, - room_version: &services().rooms.state.get_room_version(room_id)?, - event: &PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()), - invite_room_state: &invite_room_state, + room_id: room_id.to_owned(), + event_id: (*pdu.event_id).to_owned(), + room_version: room_version_id.clone(), + event: PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()), + invite_room_state, }, ) .await?; @@ -899,7 +1199,8 @@ pub(crate) async fn invite_helper<'a>( let pub_key_map = RwLock::new(BTreeMap::new()); // We do not add the event_id field to the pdu here because of signature and hashes checks - let (event_id, value) = match gen_event_id_canonical_json(&response.event) { + let (event_id, value) = match gen_event_id_canonical_json(&response.event, &room_version_id) + { Ok(t) => t, Err(_) => { // Event could not be converted to canonical json @@ -910,7 +1211,7 @@ pub(crate) async fn invite_helper<'a>( } }; - if expected_event_id != event_id { + if *pdu.event_id != *event_id { warn!("Server {} changed invite event, that's not allowed in the spec: ours: {:?}, theirs: {:?}", user_id.server_name(), pdu_json, value); } @@ -978,7 +1279,7 @@ pub(crate) async fn invite_helper<'a>( is_direct: Some(is_direct), third_party_invite: None, blurhash: services().users.blurhash(user_id)?, - reason: None, + reason, join_authorized_via_users_server: None, }) .expect("event is valid, we just created it"), @@ -1017,13 +1318,13 @@ pub async fn leave_all_rooms(user_id: &UserId) -> Result<()> { Err(_) => continue, }; - let _ = leave_room(user_id, &room_id).await; + let _ = leave_room(user_id, &room_id, None).await; } Ok(()) } -pub async fn leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { +pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option<String>) -> Result<()> { // Ask a remote server if we don't have this room if !services().rooms.metadata.exists(room_id)? && room_id.server_name() != services().globals.server_name() @@ -1063,21 +1364,35 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { ); let state_lock = mutex_state.lock().await; - let mut event: RoomMemberEventContent = serde_json::from_str( - services() - .rooms - .state_accessor - .room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())? - .ok_or(Error::BadRequest( - ErrorKind::BadState, - "Cannot leave a room you are not a member of.", - ))? - .content - .get(), - ) - .map_err(|_| Error::bad_database("Invalid member event in database."))?; + let member_event = services().rooms.state_accessor.room_state_get( + room_id, + &StateEventType::RoomMember, + user_id.as_str(), + )?; + + // Fix for broken rooms + let member_event = match member_event { + None => { + error!("Trying to leave a room you are not a member of."); + + services().rooms.state_cache.update_membership( + room_id, + user_id, + MembershipState::Leave, + user_id, + None, + true, + )?; + return Ok(()); + } + Some(e) => e, + }; + + let mut event: RoomMemberEventContent = serde_json::from_str(member_event.content.get()) + .map_err(|_| Error::bad_database("Invalid member event in database."))?; event.membership = MembershipState::Leave; + event.reason = reason; services().rooms.timeline.build_and_append_pdu( PduBuilder { @@ -1124,7 +1439,10 @@ async fn remote_leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { .sending .send_federation_request( &remote_server, - federation::membership::prepare_leave_event::v1::Request { room_id, user_id }, + federation::membership::prepare_leave_event::v1::Request { + room_id: room_id.to_owned(), + user_id: user_id.to_owned(), + }, ) .await; @@ -1201,9 +1519,9 @@ async fn remote_leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { .send_federation_request( &remote_server, federation::membership::create_leave_event::v2::Request { - room_id, - event_id: &event_id, - pdu: &PduEvent::convert_to_outgoing_federation_event(leave_event.clone()), + room_id: room_id.to_owned(), + event_id, + pdu: PduEvent::convert_to_outgoing_federation_event(leave_event.clone()), }, ) .await?; diff --git a/src/api/client_server/message.rs b/src/api/client_server/message.rs index b04c262..6ad0751 100644 --- a/src/api/client_server/message.rs +++ b/src/api/client_server/message.rs @@ -19,7 +19,7 @@ use std::{ /// - The only requirement for the content is that it has to be valid json /// - Tries to send the event into the room, auth rules will determine if it is allowed pub async fn send_message_event_route( - body: Ruma<send_message_event::v3::IncomingRequest>, + body: Ruma<send_message_event::v3::Request>, ) -> Result<send_message_event::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_deref(); @@ -105,7 +105,7 @@ pub async fn send_message_event_route( /// - Only works if the user is joined (TODO: always allow, but only show events where the user was /// joined, depending on history_visibility) pub async fn get_message_events_route( - body: Ruma<get_message_events::v3::IncomingRequest>, + body: Ruma<get_message_events::v3::Request>, ) -> Result<get_message_events::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/mod.rs b/src/api/client_server/mod.rs index 65b7a10..6ed17e7 100644 --- a/src/api/client_server/mod.rs +++ b/src/api/client_server/mod.rs @@ -63,6 +63,6 @@ pub use user_directory::*; pub use voip::*; pub const DEVICE_ID_LENGTH: usize = 10; -pub const TOKEN_LENGTH: usize = 256; -pub const SESSION_ID_LENGTH: usize = 256; +pub const TOKEN_LENGTH: usize = 32; +pub const SESSION_ID_LENGTH: usize = 32; pub const AUTO_GEN_PASSWORD_LENGTH: usize = 15; diff --git a/src/api/client_server/presence.rs b/src/api/client_server/presence.rs index dfac3db..ef88d1a 100644 --- a/src/api/client_server/presence.rs +++ b/src/api/client_server/presence.rs @@ -6,7 +6,7 @@ use std::time::Duration; /// /// Sets the presence state of the sender user. pub async fn set_presence_route( - body: Ruma<set_presence::v3::IncomingRequest>, + body: Ruma<set_presence::v3::Request>, ) -> Result<set_presence::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -43,7 +43,7 @@ pub async fn set_presence_route( /// /// - Only works if you share a room with the user pub async fn get_presence_route( - body: Ruma<get_presence::v3::IncomingRequest>, + body: Ruma<get_presence::v3::Request>, ) -> Result<get_presence::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs index 5ace177..6400e89 100644 --- a/src/api/client_server/profile.rs +++ b/src/api/client_server/profile.rs @@ -20,7 +20,7 @@ use std::sync::Arc; /// /// - Also makes sure other users receive the update using presence EDUs pub async fn set_displayname_route( - body: Ruma<set_display_name::v3::IncomingRequest>, + body: Ruma<set_display_name::v3::Request>, ) -> Result<set_display_name::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -121,7 +121,7 @@ pub async fn set_displayname_route( /// /// - If user is on another server: Fetches displayname over federation pub async fn get_displayname_route( - body: Ruma<get_display_name::v3::IncomingRequest>, + body: Ruma<get_display_name::v3::Request>, ) -> Result<get_display_name::v3::Response> { if body.user_id.server_name() != services().globals.server_name() { let response = services() @@ -129,8 +129,8 @@ pub async fn get_displayname_route( .send_federation_request( body.user_id.server_name(), federation::query::get_profile_information::v1::Request { - user_id: &body.user_id, - field: Some(&ProfileField::DisplayName), + user_id: body.user_id.clone(), + field: Some(ProfileField::DisplayName), }, ) .await?; @@ -151,7 +151,7 @@ pub async fn get_displayname_route( /// /// - Also makes sure other users receive the update using presence EDUs pub async fn set_avatar_url_route( - body: Ruma<set_avatar_url::v3::IncomingRequest>, + body: Ruma<set_avatar_url::v3::Request>, ) -> Result<set_avatar_url::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -256,7 +256,7 @@ pub async fn set_avatar_url_route( /// /// - If user is on another server: Fetches avatar_url and blurhash over federation pub async fn get_avatar_url_route( - body: Ruma<get_avatar_url::v3::IncomingRequest>, + body: Ruma<get_avatar_url::v3::Request>, ) -> Result<get_avatar_url::v3::Response> { if body.user_id.server_name() != services().globals.server_name() { let response = services() @@ -264,8 +264,8 @@ pub async fn get_avatar_url_route( .send_federation_request( body.user_id.server_name(), federation::query::get_profile_information::v1::Request { - user_id: &body.user_id, - field: Some(&ProfileField::AvatarUrl), + user_id: body.user_id.clone(), + field: Some(ProfileField::AvatarUrl), }, ) .await?; @@ -288,7 +288,7 @@ pub async fn get_avatar_url_route( /// /// - If user is on another server: Fetches profile over federation pub async fn get_profile_route( - body: Ruma<get_profile::v3::IncomingRequest>, + body: Ruma<get_profile::v3::Request>, ) -> Result<get_profile::v3::Response> { if body.user_id.server_name() != services().globals.server_name() { let response = services() @@ -296,7 +296,7 @@ pub async fn get_profile_route( .send_federation_request( body.user_id.server_name(), federation::query::get_profile_information::v1::Request { - user_id: &body.user_id, + user_id: body.user_id.clone(), field: None, }, ) diff --git a/src/api/client_server/push.rs b/src/api/client_server/push.rs index 2301ddc..b044138 100644 --- a/src/api/client_server/push.rs +++ b/src/api/client_server/push.rs @@ -5,11 +5,11 @@ use ruma::{ push::{ delete_pushrule, get_pushers, get_pushrule, get_pushrule_actions, get_pushrule_enabled, get_pushrules_all, set_pusher, set_pushrule, set_pushrule_actions, - set_pushrule_enabled, RuleKind, + set_pushrule_enabled, RuleKind, RuleScope, }, }, events::{push_rules::PushRulesEvent, GlobalAccountDataEventType}, - push::{ConditionalPushRuleInit, PatternedPushRuleInit, SimplePushRuleInit}, + push::{ConditionalPushRuleInit, NewPushRule, PatternedPushRuleInit, SimplePushRuleInit}, }; /// # `GET /_matrix/client/r0/pushrules` @@ -45,7 +45,7 @@ pub async fn get_pushrules_all_route( /// /// Retrieves a single specified push rule for this user. pub async fn get_pushrule_route( - body: Ruma<get_pushrule::v3::IncomingRequest>, + body: Ruma<get_pushrule::v3::Request>, ) -> Result<get_pushrule::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -104,12 +104,12 @@ pub async fn get_pushrule_route( /// /// Creates a single specified push rule for this user. pub async fn set_pushrule_route( - body: Ruma<set_pushrule::v3::IncomingRequest>, + body: Ruma<set_pushrule::v3::Request>, ) -> Result<set_pushrule::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let body = body.body; - if body.scope != "global" { + if body.scope != RuleScope::Global { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Scopes other than 'global' are not supported.", @@ -132,66 +132,65 @@ pub async fn set_pushrule_route( .map_err(|_| Error::bad_database("Invalid account data event in db."))?; let global = &mut account_data.content.global; - match body.kind { - RuleKind::Override => { + match body.rule { + NewPushRule::Override(rule) => { global.override_.replace( ConditionalPushRuleInit { - actions: body.actions, + actions: rule.actions, default: false, enabled: true, - rule_id: body.rule_id, - conditions: body.conditions, + rule_id: rule.rule_id, + conditions: rule.conditions, } .into(), ); } - RuleKind::Underride => { + NewPushRule::Underride(rule) => { global.underride.replace( ConditionalPushRuleInit { - actions: body.actions, + actions: rule.actions, default: false, enabled: true, - rule_id: body.rule_id, - conditions: body.conditions, + rule_id: rule.rule_id, + conditions: rule.conditions, } .into(), ); } - RuleKind::Sender => { + NewPushRule::Sender(rule) => { global.sender.replace( SimplePushRuleInit { - actions: body.actions, + actions: rule.actions, default: false, enabled: true, - rule_id: body.rule_id, + rule_id: rule.rule_id, } .into(), ); } - RuleKind::Room => { + NewPushRule::Room(rule) => { global.room.replace( SimplePushRuleInit { - actions: body.actions, + actions: rule.actions, default: false, enabled: true, - rule_id: body.rule_id, + rule_id: rule.rule_id, } .into(), ); } - RuleKind::Content => { + NewPushRule::Content(rule) => { global.content.replace( PatternedPushRuleInit { - actions: body.actions, + actions: rule.actions, default: false, enabled: true, - rule_id: body.rule_id, - pattern: body.pattern.unwrap_or_default(), + rule_id: rule.rule_id, + pattern: rule.pattern, } .into(), ); } - _ => {} } services().account_data.update( @@ -208,11 +207,11 @@ pub async fn set_pushrule_route( /// /// Gets the actions of a single specified push rule for this user. pub async fn get_pushrule_actions_route( - body: Ruma<get_pushrule_actions::v3::IncomingRequest>, + body: Ruma<get_pushrule_actions::v3::Request>, ) -> Result<get_pushrule_actions::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if body.scope != "global" { + if body.scope != RuleScope::Global { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Scopes other than 'global' are not supported.", @@ -269,11 +268,11 @@ pub async fn get_pushrule_actions_route( /// /// Sets the actions of a single specified push rule for this user. pub async fn set_pushrule_actions_route( - body: Ruma<set_pushrule_actions::v3::IncomingRequest>, + body: Ruma<set_pushrule_actions::v3::Request>, ) -> Result<set_pushrule_actions::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if body.scope != "global" { + if body.scope != RuleScope::Global { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Scopes other than 'global' are not supported.", @@ -344,11 +343,11 @@ pub async fn set_pushrule_actions_route( /// /// Gets the enabled status of a single specified push rule for this user. pub async fn get_pushrule_enabled_route( - body: Ruma<get_pushrule_enabled::v3::IncomingRequest>, + body: Ruma<get_pushrule_enabled::v3::Request>, ) -> Result<get_pushrule_enabled::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if body.scope != "global" { + if body.scope != RuleScope::Global { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Scopes other than 'global' are not supported.", @@ -407,11 +406,11 @@ pub async fn get_pushrule_enabled_route( /// /// Sets the enabled status of a single specified push rule for this user. pub async fn set_pushrule_enabled_route( - body: Ruma<set_pushrule_enabled::v3::IncomingRequest>, + body: Ruma<set_pushrule_enabled::v3::Request>, ) -> Result<set_pushrule_enabled::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if body.scope != "global" { + if body.scope != RuleScope::Global { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Scopes other than 'global' are not supported.", @@ -487,11 +486,11 @@ pub async fn set_pushrule_enabled_route( /// /// Deletes a single specified push rule for this user. pub async fn delete_pushrule_route( - body: Ruma<delete_pushrule::v3::IncomingRequest>, + body: Ruma<delete_pushrule::v3::Request>, ) -> Result<delete_pushrule::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if body.scope != "global" { + if body.scope != RuleScope::Global { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Scopes other than 'global' are not supported.", @@ -575,9 +574,10 @@ pub async fn set_pushers_route( body: Ruma<set_pusher::v3::Request>, ) -> Result<set_pusher::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let pusher = body.pusher.clone(); - services().pusher.set_pusher(sender_user, pusher)?; + services() + .pusher + .set_pusher(sender_user, body.action.clone())?; Ok(set_pusher::v3::Response::default()) } diff --git a/src/api/client_server/read_marker.rs b/src/api/client_server/read_marker.rs index d529c6a..b12468a 100644 --- a/src/api/client_server/read_marker.rs +++ b/src/api/client_server/read_marker.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; /// - Updates fully-read account data event to `fully_read` /// - If `read_receipt` is set: Update private marker and public read receipt EDU pub async fn set_read_marker_route( - body: Ruma<set_read_marker::v3::IncomingRequest>, + body: Ruma<set_read_marker::v3::Request>, ) -> Result<set_read_marker::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -89,7 +89,7 @@ pub async fn set_read_marker_route( /// /// Sets private read marker and public read receipt EDU. pub async fn create_receipt_route( - body: Ruma<create_receipt::v3::IncomingRequest>, + body: Ruma<create_receipt::v3::Request>, ) -> Result<create_receipt::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/redact.rs b/src/api/client_server/redact.rs index ab586c0..a29a561 100644 --- a/src/api/client_server/redact.rs +++ b/src/api/client_server/redact.rs @@ -14,7 +14,7 @@ use serde_json::value::to_raw_value; /// /// - TODO: Handle txn id pub async fn redact_event_route( - body: Ruma<redact_event::v3::IncomingRequest>, + body: Ruma<redact_event::v3::Request>, ) -> Result<redact_event::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let body = body.body; diff --git a/src/api/client_server/report.rs b/src/api/client_server/report.rs index e45820e..ab5027c 100644 --- a/src/api/client_server/report.rs +++ b/src/api/client_server/report.rs @@ -10,7 +10,7 @@ use ruma::{ /// Reports an inappropriate event to homeserver admins /// pub async fn report_event_route( - body: Ruma<report_content::v3::IncomingRequest>, + body: Ruma<report_content::v3::Request>, ) -> Result<report_content::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs index 097f0e1..830e085 100644 --- a/src/api/client_server/room.rs +++ b/src/api/client_server/room.rs @@ -46,7 +46,7 @@ use tracing::{info, warn}; /// - Send events implied by `name` and `topic` /// - Send invite events pub async fn create_room_route( - body: Ruma<create_room::v3::IncomingRequest>, + body: Ruma<create_room::v3::Request>, ) -> Result<create_room::v3::Response> { use create_room::v3::RoomPreset; @@ -398,7 +398,7 @@ pub async fn create_room_route( // 8. Events implied by invite (and TODO: invite_3pid) drop(state_lock); for user_id in &body.invite { - let _ = invite_helper(sender_user, user_id, &room_id, body.is_direct).await; + let _ = invite_helper(sender_user, user_id, &room_id, None, body.is_direct).await; } // Homeserver specific stuff @@ -421,7 +421,7 @@ pub async fn create_room_route( /// /// - You have to currently be joined to the room (TODO: Respect history visibility) pub async fn get_room_event_route( - body: Ruma<get_room_event::v3::IncomingRequest>, + body: Ruma<get_room_event::v3::Request>, ) -> Result<get_room_event::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -452,7 +452,7 @@ pub async fn get_room_event_route( /// /// - Only users joined to the room are allowed to call this TODO: Allow any user to call it if history_visibility is world readable pub async fn get_room_aliases_route( - body: Ruma<aliases::v3::IncomingRequest>, + body: Ruma<aliases::v3::Request>, ) -> Result<aliases::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -488,7 +488,7 @@ pub async fn get_room_aliases_route( /// - Moves local aliases /// - Modifies old room power levels to prevent users from speaking pub async fn upgrade_room_route( - body: Ruma<upgrade_room::v3::IncomingRequest>, + body: Ruma<upgrade_room::v3::Request>, ) -> Result<upgrade_room::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/search.rs b/src/api/client_server/search.rs index 1ba9cdf..51255d5 100644 --- a/src/api/client_server/search.rs +++ b/src/api/client_server/search.rs @@ -15,7 +15,7 @@ use std::collections::BTreeMap; /// /// - Only works if the user is currently joined to the room (TODO: Respect history visibility) pub async fn search_events_route( - body: Ruma<search_events::v3::IncomingRequest>, + body: Ruma<search_events::v3::Request>, ) -> Result<search_events::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -103,7 +103,7 @@ pub async fn search_events_route( .take(limit) .collect(); - let next_batch = if results.len() < limit as usize { + let next_batch = if results.len() < limit { None } else { Some((skip + limit).to_string()) diff --git a/src/api/client_server/session.rs b/src/api/client_server/session.rs index 7c8c128..64c0072 100644 --- a/src/api/client_server/session.rs +++ b/src/api/client_server/session.rs @@ -4,7 +4,7 @@ use ruma::{ api::client::{ error::ErrorKind, session::{get_login_types, login, logout, logout_all}, - uiaa::IncomingUserIdentifier, + uiaa::UserIdentifier, }, UserId, }; @@ -22,7 +22,7 @@ struct Claims { /// Get the supported login types of this server. One of these should be used as the `type` field /// when logging in. pub async fn get_login_types_route( - _body: Ruma<get_login_types::v3::IncomingRequest>, + _body: Ruma<get_login_types::v3::Request>, ) -> Result<get_login_types::v3::Response> { Ok(get_login_types::v3::Response::new(vec![ get_login_types::v3::LoginType::Password(Default::default()), @@ -40,15 +40,15 @@ pub async fn get_login_types_route( /// /// Note: You can use [`GET /_matrix/client/r0/login`](fn.get_supported_versions_route.html) to see /// supported login types. -pub async fn login_route(body: Ruma<login::v3::IncomingRequest>) -> Result<login::v3::Response> { +pub async fn login_route(body: Ruma<login::v3::Request>) -> Result<login::v3::Response> { // Validate login method // TODO: Other login methods let user_id = match &body.login_info { - login::v3::IncomingLoginInfo::Password(login::v3::IncomingPassword { + login::v3::LoginInfo::Password(login::v3::Password { identifier, password, }) => { - let username = if let IncomingUserIdentifier::UserIdOrLocalpart(user_id) = identifier { + let username = if let UserIdentifier::UserIdOrLocalpart(user_id) = identifier { user_id.to_lowercase() } else { return Err(Error::BadRequest(ErrorKind::Forbidden, "Bad login type.")); @@ -84,7 +84,7 @@ pub async fn login_route(body: Ruma<login::v3::IncomingRequest>) -> Result<login user_id } - login::v3::IncomingLoginInfo::Token(login::v3::IncomingToken { token }) => { + login::v3::LoginInfo::Token(login::v3::Token { token }) => { if let Some(jwt_decoding_key) = services().globals.jwt_decoding_key() { let token = jsonwebtoken::decode::<Claims>( token, diff --git a/src/api/client_server/state.rs b/src/api/client_server/state.rs index 36466b8..d9c1464 100644 --- a/src/api/client_server/state.rs +++ b/src/api/client_server/state.rs @@ -25,7 +25,7 @@ use ruma::{ /// - Tries to send the event into the room, auth rules will determine if it is allowed /// - If event is new canonical_alias: Rejects if alias is incorrect pub async fn send_state_event_for_key_route( - body: Ruma<send_state_event::v3::IncomingRequest>, + body: Ruma<send_state_event::v3::Request>, ) -> Result<send_state_event::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -50,7 +50,7 @@ pub async fn send_state_event_for_key_route( /// - Tries to send the event into the room, auth rules will determine if it is allowed /// - If event is new canonical_alias: Rejects if alias is incorrect pub async fn send_state_event_for_empty_key_route( - body: Ruma<send_state_event::v3::IncomingRequest>, + body: Ruma<send_state_event::v3::Request>, ) -> Result<RumaResponse<send_state_event::v3::Response>> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -81,7 +81,7 @@ pub async fn send_state_event_for_empty_key_route( /// /// - If not joined: Only works if current room history visibility is world readable pub async fn get_state_events_route( - body: Ruma<get_state_events::v3::IncomingRequest>, + body: Ruma<get_state_events::v3::Request>, ) -> Result<get_state_events::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -133,7 +133,7 @@ pub async fn get_state_events_route( /// /// - If not joined: Only works if current room history visibility is world readable pub async fn get_state_events_for_key_route( - body: Ruma<get_state_events_for_key::v3::IncomingRequest>, + body: Ruma<get_state_events_for_key::v3::Request>, ) -> Result<get_state_events_for_key::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -188,7 +188,7 @@ pub async fn get_state_events_for_key_route( /// /// - If not joined: Only works if current room history visibility is world readable pub async fn get_state_events_for_empty_key_route( - body: Ruma<get_state_events_for_key::v3::IncomingRequest>, + body: Ruma<get_state_events_for_key::v3::Request>, ) -> Result<RumaResponse<get_state_events_for_key::v3::Response>> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 828ae19..568a23c 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -1,7 +1,7 @@ use crate::{services, Error, Result, Ruma, RumaResponse}; use ruma::{ api::client::{ - filter::{IncomingFilterDefinition, LazyLoadOptions}, + filter::{FilterDefinition, LazyLoadOptions}, sync::sync_events::{self, DeviceLists, UnreadNotificationsCount}, uiaa::UiaaResponse, }, @@ -55,7 +55,7 @@ use tracing::error; /// - Sync is handled in an async task, multiple requests from the same device with the same /// `since` will be cached pub async fn sync_events_route( - body: Ruma<sync_events::v3::IncomingRequest>, + body: Ruma<sync_events::v3::Request>, ) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> { let sender_user = body.sender_user.expect("user is authenticated"); let sender_device = body.sender_device.expect("user is authenticated"); @@ -124,7 +124,7 @@ pub async fn sync_events_route( async fn sync_helper_wrapper( sender_user: OwnedUserId, sender_device: OwnedDeviceId, - body: sync_events::v3::IncomingRequest, + body: sync_events::v3::Request, tx: Sender<Option<Result<sync_events::v3::Response>>>, ) { let since = body.since.clone(); @@ -157,12 +157,12 @@ async fn sync_helper_wrapper( async fn sync_helper( sender_user: OwnedUserId, sender_device: OwnedDeviceId, - body: sync_events::v3::IncomingRequest, + body: sync_events::v3::Request, // bool = caching allowed ) -> Result<(sync_events::v3::Response, bool), Error> { use sync_events::v3::{ - Ephemeral, GlobalAccountData, IncomingFilter, InviteState, InvitedRoom, JoinedRoom, - LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice, + Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, LeftRoom, + Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice, }; // TODO: match body.set_presence { @@ -176,9 +176,9 @@ async fn sync_helper( // Load filter let filter = match body.filter { - None => IncomingFilterDefinition::default(), - Some(IncomingFilter::FilterDefinition(filter)) => filter, - Some(IncomingFilter::FilterId(filter_id)) => services() + None => FilterDefinition::default(), + Some(Filter::FilterDefinition(filter)) => filter, + Some(Filter::FilterId(filter_id)) => services() .users .get_filter(&sender_user, &filter_id)? .unwrap_or_default(), @@ -282,9 +282,8 @@ async fn sync_helper( let send_notification_counts = !timeline_pdus.is_empty() || services() .rooms - .edus - .read_receipt - .last_privateread_update(&sender_user, &room_id)? + .user + .last_notification_read(&sender_user, &room_id)? > since; let mut timeline_users = HashSet::new(); @@ -389,13 +388,35 @@ async fn sync_helper( )) }; + let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash + .and_then(|shortstatehash| { + services() + .rooms + .state_accessor + .state_get( + shortstatehash, + &StateEventType::RoomMember, + sender_user.as_str(), + ) + .transpose() + }) + .transpose()? + .and_then(|pdu| { + serde_json::from_str(pdu.content.get()) + .map_err(|_| Error::bad_database("Invalid PDU in database.")) + .ok() + }); + + let joined_since_last_sync = + since_sender_member.map_or(true, |member| member.membership != MembershipState::Join); + let ( heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events, - ) = if since_shortstatehash.is_none() { + ) = if since_shortstatehash.is_none() || joined_since_last_sync { // Probably since = 0, we will do an initial sync let (joined_member_count, invited_member_count, heroes) = calculate_counts()?; @@ -488,23 +509,6 @@ async fn sync_helper( // Incremental /sync let since_shortstatehash = since_shortstatehash.unwrap(); - let since_sender_member: Option<RoomMemberEventContent> = services() - .rooms - .state_accessor - .state_get( - since_shortstatehash, - &StateEventType::RoomMember, - sender_user.as_str(), - )? - .and_then(|pdu| { - serde_json::from_str(pdu.content.get()) - .map_err(|_| Error::bad_database("Invalid PDU in database.")) - .ok() - }); - - let joined_since_last_sync = since_sender_member - .map_or(true, |member| member.membership != MembershipState::Join); - let mut state_events = Vec::new(); let mut lazy_loaded = HashSet::new(); @@ -869,7 +873,7 @@ async fn sync_helper( let since_state_ids = match since_shortstatehash { Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, - None => BTreeMap::new(), + None => HashMap::new(), }; let left_event_id = match services().rooms.state_accessor.room_state_get_id( @@ -905,7 +909,7 @@ async fn sync_helper( let leave_shortstatekey = services() .rooms .short - .get_or_create_shortstatekey(&StateEventType::RoomMember, &sender_user.as_str())?; + .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?; left_state_ids.insert(leave_shortstatekey, left_event_id); diff --git a/src/api/client_server/tag.rs b/src/api/client_server/tag.rs index c87e233..16f1600 100644 --- a/src/api/client_server/tag.rs +++ b/src/api/client_server/tag.rs @@ -14,7 +14,7 @@ use std::collections::BTreeMap; /// /// - Inserts the tag into the tag event of the room account data. pub async fn update_tag_route( - body: Ruma<create_tag::v3::IncomingRequest>, + body: Ruma<create_tag::v3::Request>, ) -> Result<create_tag::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -58,7 +58,7 @@ pub async fn update_tag_route( /// /// - Removes the tag from the tag event of the room account data. pub async fn delete_tag_route( - body: Ruma<delete_tag::v3::IncomingRequest>, + body: Ruma<delete_tag::v3::Request>, ) -> Result<delete_tag::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -98,9 +98,7 @@ pub async fn delete_tag_route( /// Returns tags on the room. /// /// - Gets the tag event of the room account data. -pub async fn get_tags_route( - body: Ruma<get_tags::v3::IncomingRequest>, -) -> Result<get_tags::v3::Response> { +pub async fn get_tags_route(body: Ruma<get_tags::v3::Request>) -> Result<get_tags::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let event = services().account_data.get( diff --git a/src/api/client_server/thirdparty.rs b/src/api/client_server/thirdparty.rs index 5665ad6..c2c1adf 100644 --- a/src/api/client_server/thirdparty.rs +++ b/src/api/client_server/thirdparty.rs @@ -7,7 +7,7 @@ use std::collections::BTreeMap; /// /// TODO: Fetches all metadata about protocols supported by the homeserver. pub async fn get_protocols_route( - _body: Ruma<get_protocols::v3::IncomingRequest>, + _body: Ruma<get_protocols::v3::Request>, ) -> Result<get_protocols::v3::Response> { // TODO Ok(get_protocols::v3::Response { diff --git a/src/api/client_server/to_device.rs b/src/api/client_server/to_device.rs index 139b845..26db4e4 100644 --- a/src/api/client_server/to_device.rs +++ b/src/api/client_server/to_device.rs @@ -14,7 +14,7 @@ use ruma::{ /// /// Send a to-device event to a set of client devices. pub async fn send_event_to_device_route( - body: Ruma<send_event_to_device::v3::IncomingRequest>, + body: Ruma<send_event_to_device::v3::Request>, ) -> Result<send_event_to_device::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_deref(); diff --git a/src/api/client_server/typing.rs b/src/api/client_server/typing.rs index ecc926f..43217e1 100644 --- a/src/api/client_server/typing.rs +++ b/src/api/client_server/typing.rs @@ -5,7 +5,7 @@ use ruma::api::client::{error::ErrorKind, typing::create_typing_event}; /// /// Sets the typing state of the sender user. pub async fn create_typing_event_route( - body: Ruma<create_typing_event::v3::IncomingRequest>, + body: Ruma<create_typing_event::v3::Request>, ) -> Result<create_typing_event::v3::Response> { use create_typing_event::v3::Typing; diff --git a/src/api/client_server/unversioned.rs b/src/api/client_server/unversioned.rs index 8a5c3d2..526598b 100644 --- a/src/api/client_server/unversioned.rs +++ b/src/api/client_server/unversioned.rs @@ -15,7 +15,7 @@ use crate::{Result, Ruma}; /// Note: Unstable features are used while developing new features. Clients should avoid using /// unstable features in their stable releases pub async fn get_supported_versions_route( - _body: Ruma<get_supported_versions::IncomingRequest>, + _body: Ruma<get_supported_versions::Request>, ) -> Result<get_supported_versions::Response> { let resp = get_supported_versions::Response { versions: vec![ diff --git a/src/api/client_server/user_directory.rs b/src/api/client_server/user_directory.rs index 518daa5..c30bac5 100644 --- a/src/api/client_server/user_directory.rs +++ b/src/api/client_server/user_directory.rs @@ -14,7 +14,7 @@ use ruma::{ /// - Hides any local users that aren't in any public rooms (i.e. those that have the join rule set to public) /// and don't share a room with the sender pub async fn search_users_route( - body: Ruma<search_users::v3::IncomingRequest>, + body: Ruma<search_users::v3::Request>, ) -> Result<search_users::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let limit = u64::from(body.limit) as usize; diff --git a/src/api/client_server/voip.rs b/src/api/client_server/voip.rs index 6b1ee40..4990c17 100644 --- a/src/api/client_server/voip.rs +++ b/src/api/client_server/voip.rs @@ -10,7 +10,7 @@ type HmacSha1 = Hmac<Sha1>; /// /// TODO: Returns information about the recommended turn server. pub async fn turn_server_route( - body: Ruma<get_turn_server_info::v3::IncomingRequest>, + body: Ruma<get_turn_server_info::v3::Request>, ) -> Result<get_turn_server_info::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); diff --git a/src/api/ruma_wrapper/axum.rs b/src/api/ruma_wrapper/axum.rs index 3870250..74f506f 100644 --- a/src/api/ruma_wrapper/axum.rs +++ b/src/api/ruma_wrapper/axum.rs @@ -311,8 +311,7 @@ impl Credentials for XMatrix { fn decode(value: &http::HeaderValue) -> Option<Self> { debug_assert!( value.as_bytes().starts_with(b"X-Matrix "), - "HeaderValue to decode should start with \"X-Matrix ..\", received = {:?}", - value, + "HeaderValue to decode should start with \"X-Matrix ..\", received = {value:?}", ); let parameters = str::from_utf8(&value.as_bytes()["X-Matrix ".len()..]) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 320e396..fc3e2c0 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -31,7 +31,7 @@ use ruma::{ EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, OutgoingResponse, SendAccessToken, }, - directory::{IncomingFilter, IncomingRoomNetwork}, + directory::{Filter, RoomNetwork}, events::{ receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, room::{ @@ -42,8 +42,8 @@ use ruma::{ }, serde::{Base64, JsonObject, Raw}, to_device::DeviceIdOrAllDevices, - CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, - OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, + CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, + OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ @@ -55,7 +55,7 @@ use std::{ time::{Duration, Instant, SystemTime}, }; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; /// Wraps either an literal IP address plus port, or a hostname plus complement /// (colon-plus-port if it was specified). @@ -84,8 +84,8 @@ pub enum FedDest { impl FedDest { fn into_https_string(self) -> String { match self { - Self::Literal(addr) => format!("https://{}", addr), - Self::Named(host, port) => format!("https://{}{}", host, port), + Self::Literal(addr) => format!("https://{addr}"), + Self::Named(host, port) => format!("https://{host}{port}"), } } @@ -294,13 +294,7 @@ where } else { Err(Error::FederationError( destination.to_owned(), - RumaError::try_from_http_response(http_response).map_err(|e| { - warn!( - "Invalid {} response from {} on: {} {}", - status, &destination, url, e - ); - Error::BadServerResponse("Server returned bad error response.") - })?, + RumaError::from_http_response(http_response), )) } } @@ -391,7 +385,7 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe } if let Some(port) = force_port { - FedDest::Named(delegated_hostname, format!(":{}", port)) + FedDest::Named(delegated_hostname, format!(":{port}")) } else { add_port_to_hostname(&delegated_hostname) } @@ -433,7 +427,7 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe } if let Some(port) = force_port { - FedDest::Named(hostname.clone(), format!(":{}", port)) + FedDest::Named(hostname.clone(), format!(":{port}")) } else { add_port_to_hostname(&hostname) } @@ -466,7 +460,7 @@ async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> { if let Ok(Some(host_port)) = services() .globals .dns_resolver() - .srv_lookup(format!("_matrix._tcp.{}", hostname)) + .srv_lookup(format!("_matrix._tcp.{hostname}")) .await .map(|srv| { srv.iter().next().map(|result| { @@ -488,10 +482,7 @@ async fn request_well_known(destination: &str) -> Option<String> { &services() .globals .default_client() - .get(&format!( - "https://{}/.well-known/matrix/server", - destination - )) + .get(&format!("https://{destination}/.well-known/matrix/server")) .send() .await .ok()? @@ -586,7 +577,7 @@ pub async fn get_server_keys_deprecated_route() -> impl IntoResponse { /// /// Lists the public rooms on this server. pub async fn get_public_rooms_filtered_route( - body: Ruma<get_public_rooms_filtered::v1::IncomingRequest>, + body: Ruma<get_public_rooms_filtered::v1::Request>, ) -> Result<get_public_rooms_filtered::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -613,7 +604,7 @@ pub async fn get_public_rooms_filtered_route( /// /// Lists the public rooms on this server. pub async fn get_public_rooms_route( - body: Ruma<get_public_rooms::v1::IncomingRequest>, + body: Ruma<get_public_rooms::v1::Request>, ) -> Result<get_public_rooms::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -623,8 +614,8 @@ pub async fn get_public_rooms_route( None, body.limit, body.since.as_deref(), - &IncomingFilter::default(), - &IncomingRoomNetwork::Matrix, + &Filter::default(), + &RoomNetwork::Matrix, ) .await?; @@ -640,7 +631,7 @@ pub async fn get_public_rooms_route( /// /// Push EDUs and PDUs to this server. pub async fn send_transaction_message_route( - body: Ruma<send_transaction_message::v1::IncomingRequest>, + body: Ruma<send_transaction_message::v1::Request>, ) -> Result<send_transaction_message::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -664,16 +655,11 @@ pub async fn send_transaction_message_route( // let mut auth_cache = EventMap::new(); for pdu in &body.pdus { - // We do not add the event_id field to the pdu here because of signature and hashes checks - let (event_id, value) = match gen_event_id_canonical_json(pdu) { - Ok(t) => t, - Err(_) => { - // Event could not be converted to canonical json - continue; - } - }; + let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { + warn!("Error parsing incoming event {:?}: {:?}", pdu, e); + Error::BadServerResponse("Invalid PDU in server response") + })?; - // 0. Check the server is in the room let room_id: OwnedRoomId = match value .get("room_id") .and_then(|id| RoomId::parse(id.as_str()?).ok()) @@ -681,14 +667,26 @@ pub async fn send_transaction_message_route( Some(id) => id, None => { // Event is invalid - resolved_map.insert( - event_id, - Err(Error::bad_database("Event needs a valid RoomId.")), - ); continue; } }; + let room_version_id = match services().rooms.state.get_room_version(&room_id) { + Ok(v) => v, + Err(_) => { + continue; + } + }; + + let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) { + Ok(t) => t, + Err(_) => { + // Event could not be converted to canonical json + continue; + } + }; + // We do not add the event_id field to the pdu here because of signature and hashes checks + services() .rooms .event_handler @@ -724,7 +722,7 @@ pub async fn send_transaction_message_route( drop(mutex_lock); let elapsed = start_time.elapsed(); - warn!( + debug!( "Handling transaction of event {} took {}m{}s", event_id, elapsed.as_secs() / 60, @@ -909,7 +907,7 @@ pub async fn send_transaction_message_route( /// /// - Only works if a user of this server is currently invited or joined the room pub async fn get_event_route( - body: Ruma<get_event::v1::IncomingRequest>, + body: Ruma<get_event::v1::Request>, ) -> Result<get_event::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -956,7 +954,7 @@ pub async fn get_event_route( /// /// Retrieves events that the sender is missing. pub async fn get_missing_events_route( - body: Ruma<get_missing_events::v1::IncomingRequest>, + body: Ruma<get_missing_events::v1::Request>, ) -> Result<get_missing_events::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -1035,7 +1033,7 @@ pub async fn get_missing_events_route( /// /// - This does not include the event itself pub async fn get_event_authorization_route( - body: Ruma<get_event_authorization::v1::IncomingRequest>, + body: Ruma<get_event_authorization::v1::Request>, ) -> Result<get_event_authorization::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -1094,7 +1092,7 @@ pub async fn get_event_authorization_route( /// /// Retrieves the current state of the room. pub async fn get_room_state_route( - body: Ruma<get_room_state::v1::IncomingRequest>, + body: Ruma<get_room_state::v1::Request>, ) -> Result<get_room_state::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -1174,7 +1172,7 @@ pub async fn get_room_state_route( /// /// Retrieves the current state of the room. pub async fn get_room_state_ids_route( - body: Ruma<get_room_state_ids::v1::IncomingRequest>, + body: Ruma<get_room_state_ids::v1::Request>, ) -> Result<get_room_state_ids::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -1235,7 +1233,7 @@ pub async fn get_room_state_ids_route( /// /// Creates a join template. pub async fn create_join_event_template_route( - body: Ruma<prepare_join_event::v1::IncomingRequest>, + body: Ruma<prepare_join_event::v1::Request>, ) -> Result<prepare_join_event::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -1407,7 +1405,8 @@ async fn create_join_event( // let mut auth_cache = EventMap::new(); // We do not add the event_id field to the pdu here because of signature and hashes checks - let (event_id, value) = match gen_event_id_canonical_json(pdu) { + let room_version_id = services().rooms.state.get_room_version(room_id)?; + let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) { Ok(t) => t, Err(_) => { // Event could not be converted to canonical json @@ -1478,6 +1477,7 @@ async fn create_join_event( .filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten()) .map(PduEvent::convert_to_outgoing_federation_event) .collect(), + event: None, // TODO: handle restricted joins }) } @@ -1485,7 +1485,7 @@ async fn create_join_event( /// /// Submits a signed join event. pub async fn create_join_event_v1_route( - body: Ruma<create_join_event::v1::IncomingRequest>, + body: Ruma<create_join_event::v1::Request>, ) -> Result<create_join_event::v1::Response> { let sender_servername = body .sender_servername @@ -1501,7 +1501,7 @@ pub async fn create_join_event_v1_route( /// /// Submits a signed join event. pub async fn create_join_event_v2_route( - body: Ruma<create_join_event::v2::IncomingRequest>, + body: Ruma<create_join_event::v2::Request>, ) -> Result<create_join_event::v2::Response> { let sender_servername = body .sender_servername @@ -1517,7 +1517,7 @@ pub async fn create_join_event_v2_route( /// /// Invites a remote user to a room. pub async fn create_invite_route( - body: Ruma<create_invite::v2::IncomingRequest>, + body: Ruma<create_invite::v2::Request>, ) -> Result<create_invite::v2::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -1609,8 +1609,12 @@ pub async fn create_invite_route( invite_state.push(pdu.to_stripped_state_event()); - // If the room already exists, the remote server will notify us about the join via /send - if !services().rooms.metadata.exists(&pdu.room_id)? { + // If we are active in the room, the remote server will notify us about the join via /send + if !services() + .rooms + .state_cache + .server_in_room(services().globals.server_name(), &body.room_id)? + { services().rooms.state_cache.update_membership( &body.room_id, &invited_user, @@ -1630,7 +1634,7 @@ pub async fn create_invite_route( /// /// Gets information on all devices of the user. pub async fn get_devices_route( - body: Ruma<get_devices::v1::IncomingRequest>, + body: Ruma<get_devices::v1::Request>, ) -> Result<get_devices::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -1677,7 +1681,7 @@ pub async fn get_devices_route( /// /// Resolve a room alias to a room id. pub async fn get_room_information_route( - body: Ruma<get_room_information::v1::IncomingRequest>, + body: Ruma<get_room_information::v1::Request>, ) -> Result<get_room_information::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); @@ -1702,7 +1706,7 @@ pub async fn get_room_information_route( /// /// Gets information on a profile. pub async fn get_profile_information_route( - body: Ruma<get_profile_information::v1::IncomingRequest>, + body: Ruma<get_profile_information::v1::Request>, ) -> Result<get_profile_information::v1::Response> { if !services().globals.allow_federation() { return Err(Error::bad_config("Federation is disabled.")); diff --git a/src/config/mod.rs b/src/config/mod.rs index 3c3a764..31a586f 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -40,6 +40,8 @@ pub struct Config { pub max_request_size: u32, #[serde(default = "default_max_concurrent_requests")] pub max_concurrent_requests: u16, + #[serde(default = "default_max_fetch_prev_events")] + pub max_fetch_prev_events: u16, #[serde(default = "false_fn")] pub allow_registration: bool, #[serde(default = "true_fn")] @@ -183,7 +185,7 @@ impl fmt::Display for Config { ("Turn TTL", &self.turn_ttl.to_string()), ("Turn URIs", { let mut lst = vec![]; - for item in self.turn_uris.to_vec().into_iter().enumerate() { + for item in self.turn_uris.iter().cloned().enumerate() { let (_, uri): (usize, String) = item; lst.push(uri); } @@ -191,13 +193,13 @@ impl fmt::Display for Config { }), ]; - let mut msg: String = "Active config values:\n\n".to_string(); + let mut msg: String = "Active config values:\n\n".to_owned(); for line in lines.into_iter().enumerate() { msg += &format!("{}: {}\n", line.1 .0, line.1 .1); } - write!(f, "{}", msg) + write!(f, "{msg}") } } @@ -222,7 +224,7 @@ fn default_database_backend() -> String { } fn default_db_cache_capacity_mb() -> f64 { - 10.0 + 1000.0 } fn default_conduit_cache_capacity_modifier() -> f64 { @@ -230,7 +232,7 @@ fn default_conduit_cache_capacity_modifier() -> f64 { } fn default_rocksdb_max_open_files() -> i32 { - 20 + 1000 } fn default_pdu_cache_capacity() -> u32 { @@ -249,6 +251,10 @@ fn default_max_concurrent_requests() -> u16 { 100 } +fn default_max_fetch_prev_events() -> u16 { + 100_u16 +} + fn default_log() -> String { "warn,state_res=warn,_=off,sled=off".to_owned() } diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 4961fd7..b69efb6 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -106,7 +106,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> { } fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> { - self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; + self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"), [])?; Ok(Arc::new(SqliteTable { engine: Arc::clone(self), @@ -135,7 +135,6 @@ type TupleOfBytes = (Vec<u8>, Vec<u8>); impl SqliteTable { fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> { - //dbg!(&self.name); Ok(guard .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())? .query_row([key], |row| row.get(0)) @@ -143,7 +142,6 @@ impl SqliteTable { } fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> { - //dbg!(&self.name); guard.execute( format!( "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", @@ -176,10 +174,7 @@ impl SqliteTable { statement .query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(move |r| { - //dbg!(&name); - r.unwrap() - }), + .map(move |r| r.unwrap()), ); Box::new(PreparedStatementIterator { @@ -276,10 +271,7 @@ impl KvTree for SqliteTable { statement .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(move |r| { - //dbg!(&name); - r.unwrap() - }), + .map(move |r| r.unwrap()), ); Box::new(PreparedStatementIterator { iterator, @@ -301,10 +293,7 @@ impl KvTree for SqliteTable { statement .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(move |r| { - //dbg!(&name); - r.unwrap() - }), + .map(move |r| r.unwrap()), ); Box::new(PreparedStatementIterator { diff --git a/src/database/key_value/pusher.rs b/src/database/key_value/pusher.rs index 3dfceb6..50a6fac 100644 --- a/src/database/key_value/pusher.rs +++ b/src/database/key_value/pusher.rs @@ -1,38 +1,36 @@ use ruma::{ - api::client::push::{get_pushers, set_pusher}, + api::client::push::{set_pusher, Pusher}, UserId, }; use crate::{database::KeyValueDatabase, service, utils, Error, Result}; impl service::pusher::Data for KeyValueDatabase { - fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> { - let mut key = sender.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(pusher.pushkey.as_bytes()); - - // There are 2 kinds of pushers but the spec says: null deletes the pusher. - if pusher.kind.is_none() { - return self - .senderkey_pusher - .remove(&key) - .map(|_| ()) - .map_err(Into::into); + fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::PusherAction) -> Result<()> { + match &pusher { + set_pusher::v3::PusherAction::Post(data) => { + let mut key = sender.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(data.pusher.ids.pushkey.as_bytes()); + self.senderkey_pusher.insert( + &key, + &serde_json::to_vec(&pusher).expect("Pusher is valid JSON value"), + )?; + Ok(()) + } + set_pusher::v3::PusherAction::Delete(ids) => { + let mut key = sender.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(ids.pushkey.as_bytes()); + self.senderkey_pusher + .remove(&key) + .map(|_| ()) + .map_err(Into::into) + } } - - self.senderkey_pusher.insert( - &key, - &serde_json::to_vec(&pusher).expect("Pusher is valid JSON value"), - )?; - - Ok(()) } - fn get_pusher( - &self, - sender: &UserId, - pushkey: &str, - ) -> Result<Option<get_pushers::v3::Pusher>> { + fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result<Option<Pusher>> { let mut senderkey = sender.as_bytes().to_vec(); senderkey.push(0xff); senderkey.extend_from_slice(pushkey.as_bytes()); @@ -46,7 +44,7 @@ impl service::pusher::Data for KeyValueDatabase { .transpose() } - fn get_pushers(&self, sender: &UserId) -> Result<Vec<get_pushers::v3::Pusher>> { + fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> { let mut prefix = sender.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/key_value/rooms/edus/typing.rs b/src/database/key_value/rooms/edus/typing.rs index 4a2f0f9..5709192 100644 --- a/src/database/key_value/rooms/edus/typing.rs +++ b/src/database/key_value/rooms/edus/typing.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::{collections::HashSet, mem}; use ruma::{OwnedUserId, RoomId, UserId}; @@ -53,6 +53,47 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase { Ok(()) } + fn typings_maintain(&self, room_id: &RoomId) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let current_timestamp = utils::millis_since_unix_epoch(); + + let mut found_outdated = false; + + // Find all outdated edus before inserting a new one + for outdated_edu in self + .typingid_userid + .scan_prefix(prefix) + .map(|(key, _)| { + Ok::<_, Error>(( + key.clone(), + utils::u64_from_bytes( + &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| { + Error::bad_database("RoomTyping has invalid timestamp or delimiters.") + })?[0..mem::size_of::<u64>()], + ) + .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, + )) + }) + .filter_map(|r| r.ok()) + .take_while(|&(_, timestamp)| timestamp < current_timestamp) + { + // This is an outdated edu (time > timestamp) + self.typingid_userid.remove(&outdated_edu.0)?; + found_outdated = true; + } + + if found_outdated { + self.roomid_lasttypingupdate.insert( + room_id.as_bytes(), + &services().globals.next_count()?.to_be_bytes(), + )?; + } + + Ok(()) + } + fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> { Ok(self .roomid_lasttypingupdate diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 70e59ac..0f0c0dc 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; @@ -9,7 +6,7 @@ use ruma::{events::StateEventType, EventId, RoomId}; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { - async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> { + async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> { let full_state = services() .rooms .state_compressor @@ -17,7 +14,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { .pop() .expect("there is always one layer") .1; - let mut result = BTreeMap::new(); + let mut result = HashMap::new(); let mut i = 0; for compressed in full_state.into_iter() { let parsed = services() diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs index 3d8d1c8..4c43572 100644 --- a/src/database/key_value/rooms/user.rs +++ b/src/database/key_value/rooms/user.rs @@ -7,12 +7,20 @@ impl service::rooms::user::Data for KeyValueDatabase { let mut userroom_id = user_id.as_bytes().to_vec(); userroom_id.push(0xff); userroom_id.extend_from_slice(room_id.as_bytes()); + let mut roomuser_id = room_id.as_bytes().to_vec(); + roomuser_id.push(0xff); + roomuser_id.extend_from_slice(user_id.as_bytes()); self.userroomid_notificationcount .insert(&userroom_id, &0_u64.to_be_bytes())?; self.userroomid_highlightcount .insert(&userroom_id, &0_u64.to_be_bytes())?; + self.roomuserid_lastnotificationread.insert( + &roomuser_id, + &services().globals.next_count()?.to_be_bytes(), + )?; + Ok(()) } @@ -44,6 +52,23 @@ impl service::rooms::user::Data for KeyValueDatabase { .unwrap_or(Ok(0)) } + fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + Ok(self + .roomuserid_lastnotificationread + .get(&key)? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.") + }) + }) + .transpose()? + .unwrap_or(0)) + } + fn associate_token_shortstatehash( &self, room_id: &RoomId, diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs index cd5a535..1cabab0 100644 --- a/src/database/key_value/users.rs +++ b/src/database/key_value/users.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeMap, mem::size_of}; use ruma::{ - api::client::{device::Device, error::ErrorKind, filter::IncomingFilterDefinition}, + api::client::{device::Device, error::ErrorKind, filter::FilterDefinition}, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::{AnyToDeviceEvent, StateEventType}, serde::Raw, @@ -899,7 +899,7 @@ impl service::users::Data for KeyValueDatabase { } /// Creates a new sync filter. Returns the filter id. - fn create_filter(&self, user_id: &UserId, filter: &IncomingFilterDefinition) -> Result<String> { + fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> { let filter_id = utils::random_string(4); let mut key = user_id.as_bytes().to_vec(); @@ -914,11 +914,7 @@ impl service::users::Data for KeyValueDatabase { Ok(filter_id) } - fn get_filter( - &self, - user_id: &UserId, - filter_id: &str, - ) -> Result<Option<IncomingFilterDefinition>> { + fn get_filter(&self, user_id: &UserId, filter_id: &str) -> Result<Option<FilterDefinition>> { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(filter_id.as_bytes()); diff --git a/src/database/mod.rs b/src/database/mod.rs index 15ee137..78bb358 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -7,7 +7,8 @@ use directories::ProjectDirs; use lru_cache::LruCache; use ruma::{ events::{ - push_rules::PushRulesEventContent, room::message::RoomMessageEventContent, + push_rules::{PushRulesEvent, PushRulesEventContent}, + room::message::RoomMessageEventContent, GlobalAccountDataEvent, GlobalAccountDataEventType, StateEventType, }, push::Ruleset, @@ -98,6 +99,7 @@ pub struct KeyValueDatabase { pub(super) userroomid_notificationcount: Arc<dyn KvTree>, // NotifyCount = u64 pub(super) userroomid_highlightcount: Arc<dyn KvTree>, // HightlightCount = u64 + pub(super) roomuserid_lastnotificationread: Arc<dyn KvTree>, // LastNotificationRead = u64 /// Remember the current state hash of a room. pub(super) roomid_shortstatehash: Arc<dyn KvTree>, @@ -256,7 +258,7 @@ impl KeyValueDatabase { }; if config.max_request_size < 1024 { - eprintln!("ERROR: Max request size is less than 1KB. Please increase it."); + error!(?config.max_request_size, "Max request size is less than 1KB. Please increase it."); } let db_raw = Box::new(Self { @@ -317,6 +319,7 @@ impl KeyValueDatabase { userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?, userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?, + roomuserid_lastnotificationread: builder.open_tree("userroomid_highlightcount")?, statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?, shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?, @@ -405,7 +408,7 @@ impl KeyValueDatabase { } // If the database has any data, perform data migrations before starting - let latest_database_version = 11; + let latest_database_version = 12; if services().users.count()? > 0 { // MIGRATIONS @@ -480,7 +483,7 @@ impl KeyValueDatabase { for user in services().rooms.state_cache.room_members(&room?) { let user = user?; if user.server_name() != services().globals.server_name() { - println!("Migration: Creating user {}", user); + info!(?user, "Migration: creating user"); services().users.create(&user, None)?; } } @@ -542,7 +545,6 @@ impl KeyValueDatabase { current_state: HashSet<_>, last_roomstates: &mut HashMap<_, _>| { counter += 1; - println!("counter: {}", counter); let last_roomsstatehash = last_roomstates.get(current_room); let states_parents = last_roomsstatehash.map_or_else( @@ -739,15 +741,13 @@ impl KeyValueDatabase { new_key.extend_from_slice(word); new_key.push(0xff); new_key.extend_from_slice(pdu_id_count); - println!("old {:?}", key); - println!("new {:?}", new_key); Some((new_key, Vec::new())) }) .peekable(); while iter.peek().is_some() { db.tokenids.insert_batch(&mut iter.by_ref().take(1000))?; - println!("smaller batch done"); + debug!("Inserted smaller batch"); } info!("Deleting starts"); @@ -757,7 +757,6 @@ impl KeyValueDatabase { .iter() .filter_map(|(key, _)| { if key.starts_with(b"!") { - println!("del {:?}", key); Some(key) } else { None @@ -766,7 +765,6 @@ impl KeyValueDatabase { .collect(); for key in batch2 { - println!("del"); db.tokenids.remove(&key)?; } @@ -801,7 +799,81 @@ impl KeyValueDatabase { warn!("Migration: 10 -> 11 finished"); } - assert_eq!(11, latest_database_version); + if services().globals.database_version()? < 12 { + for username in services().users.list_local_users().unwrap() { + let user = + UserId::parse_with_server_name(username, services().globals.server_name()) + .unwrap(); + + let raw_rules_list = services() + .account_data + .get( + None, + &user, + GlobalAccountDataEventType::PushRules.to_string().into(), + ) + .unwrap() + .expect("Username is invalid"); + + let mut account_data = + serde_json::from_str::<PushRulesEvent>(raw_rules_list.get()).unwrap(); + let rules_list = &mut account_data.content.global; + + //content rule + { + let content_rule_transformation = + [".m.rules.contains_user_name", ".m.rule.contains_user_name"]; + + let rule = rules_list.content.get(content_rule_transformation[0]); + if rule.is_some() { + let mut rule = rule.unwrap().clone(); + rule.rule_id = content_rule_transformation[1].to_owned(); + rules_list.content.remove(content_rule_transformation[0]); + rules_list.content.insert(rule); + } + } + + //underride rules + { + let underride_rule_transformation = [ + [".m.rules.call", ".m.rule.call"], + [".m.rules.room_one_to_one", ".m.rule.room_one_to_one"], + [ + ".m.rules.encrypted_room_one_to_one", + ".m.rule.encrypted_room_one_to_one", + ], + [".m.rules.message", ".m.rule.message"], + [".m.rules.encrypted", ".m.rule.encrypted"], + ]; + + for transformation in underride_rule_transformation { + let rule = rules_list.underride.get(transformation[0]); + if let Some(rule) = rule { + let mut rule = rule.clone(); + rule.rule_id = transformation[1].to_owned(); + rules_list.underride.remove(transformation[0]); + rules_list.underride.insert(rule); + } + } + } + + services().account_data.update( + None, + &user, + GlobalAccountDataEventType::PushRules.to_string().into(), + &serde_json::to_value(account_data).expect("to json value always works"), + )?; + } + + services().globals.bump_database_version(12)?; + + warn!("Migration: 11 -> 12 finished"); + } + + assert_eq!( + services().globals.database_version().unwrap(), + latest_database_version + ); info!( "Loaded {} database with version {}", @@ -868,7 +940,6 @@ impl KeyValueDatabase { #[cfg(unix)] use tokio::signal::unix::{signal, SignalKind}; - use tracing::info; use std::time::{Duration, Instant}; @@ -884,23 +955,23 @@ impl KeyValueDatabase { #[cfg(unix)] tokio::select! { _ = i.tick() => { - info!("cleanup: Timer ticked"); + debug!("cleanup: Timer ticked"); } _ = s.recv() => { - info!("cleanup: Received SIGHUP"); + debug!("cleanup: Received SIGHUP"); } }; #[cfg(not(unix))] { i.tick().await; - info!("cleanup: Timer ticked") + debug!("cleanup: Timer ticked") } let start = Instant::now(); if let Err(e) = services().globals.cleanup() { error!("cleanup: Errored: {}", e); } else { - info!("cleanup: Finished in {:?}", start.elapsed()); + debug!("cleanup: Finished in {:?}", start.elapsed()); } } }); @@ -23,7 +23,7 @@ pub use utils::error::{Error, Result}; pub static SERVICES: RwLock<Option<&'static Services>> = RwLock::new(None); -pub fn services<'a>() -> &'static Services { +pub fn services() -> &'static Services { SERVICES .read() .unwrap() diff --git a/src/main.rs b/src/main.rs index af19d2a..da80507 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,10 +24,15 @@ use figment::{ }; use http::{ header::{self, HeaderName}, - Method, Uri, + Method, StatusCode, Uri, +}; +use ruma::api::{ + client::{ + error::{Error as RumaError, ErrorBody, ErrorKind}, + uiaa::UiaaResponse, + }, + IncomingRequest, }; -use opentelemetry::trace::{FutureExt, Tracer}; -use ruma::api::{client::error::ErrorKind, IncomingRequest}; use tokio::signal; use tower::ServiceBuilder; use tower_http::{ @@ -35,7 +40,7 @@ use tower_http::{ trace::TraceLayer, ServiceBuilderExt as _, }; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use tracing_subscriber::{prelude::*, EnvFilter}; pub use conduit::*; // Re-export everything from the library crate @@ -63,65 +68,74 @@ async fn main() { let config = match raw_config.extract::<Config>() { Ok(s) => s, Err(e) => { - eprintln!("It looks like your config is invalid. The following error occured while parsing it: {}", e); + eprintln!("It looks like your config is invalid. The following error occurred: {e}"); std::process::exit(1); } }; config.warn_deprecated(); - if let Err(e) = KeyValueDatabase::load_or_create(config).await { - eprintln!( - "The database couldn't be loaded or created. The following error occured: {}", - e - ); - std::process::exit(1); - }; - - let config = &services().globals.config; - - let start = async { - run_server().await.unwrap(); - }; - if config.allow_jaeger { opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_auto_split_batch(true) + .with_service_name("conduit") .install_batch(opentelemetry::runtime::Tokio) .unwrap(); + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + let filter_layer = match EnvFilter::try_new(&config.log) { + Ok(s) => s, + Err(e) => { + eprintln!( + "It looks like your log config is invalid. The following error occurred: {e}" + ); + EnvFilter::try_new("warn").unwrap() + } + }; + + let subscriber = tracing_subscriber::Registry::default() + .with(filter_layer) + .with(telemetry); + tracing::subscriber::set_global_default(subscriber).unwrap(); + } else if config.tracing_flame { + let registry = tracing_subscriber::Registry::default(); + let (flame_layer, _guard) = + tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap(); + let flame_layer = flame_layer.with_empty_samples(false); - let span = tracer.start("conduit"); - start.with_current_context().await; - drop(span); + let filter_layer = EnvFilter::new("trace,h2=off"); - println!("exporting"); - opentelemetry::global::shutdown_tracer_provider(); + let subscriber = registry.with(filter_layer).with(flame_layer); + tracing::subscriber::set_global_default(subscriber).unwrap(); } else { let registry = tracing_subscriber::Registry::default(); - if config.tracing_flame { - let (flame_layer, _guard) = - tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap(); - let flame_layer = flame_layer.with_empty_samples(false); - - let filter_layer = EnvFilter::new("trace,h2=off"); - - let subscriber = registry.with(filter_layer).with(flame_layer); - tracing::subscriber::set_global_default(subscriber).unwrap(); - start.await; - } else { - let fmt_layer = tracing_subscriber::fmt::Layer::new(); - let filter_layer = match EnvFilter::try_new(&config.log) { - Ok(s) => s, - Err(e) => { - eprintln!("It looks like your log config is invalid. The following error occurred: {}", e); - EnvFilter::try_new("warn").unwrap() - } - }; + let fmt_layer = tracing_subscriber::fmt::Layer::new(); + let filter_layer = match EnvFilter::try_new(&config.log) { + Ok(s) => s, + Err(e) => { + eprintln!("It looks like your config is invalid. The following error occured while parsing it: {e}"); + EnvFilter::try_new("warn").unwrap() + } + }; - let subscriber = registry.with(filter_layer).with(fmt_layer); - tracing::subscriber::set_global_default(subscriber).unwrap(); - start.await; - } + let subscriber = registry.with(filter_layer).with(fmt_layer); + tracing::subscriber::set_global_default(subscriber).unwrap(); + } + + info!("Loading database"); + if let Err(error) = KeyValueDatabase::load_or_create(config).await { + error!(?error, "The database couldn't be loaded or created"); + + std::process::exit(1); + }; + let config = &services().globals.config; + + info!("Starting server"); + run_server().await.unwrap(); + + if config.allow_jaeger { + opentelemetry::global::shutdown_tracer_provider(); } } @@ -180,10 +194,20 @@ async fn run_server() -> io::Result<()> { match &config.tls { Some(tls) => { let conf = RustlsConfig::from_pem_file(&tls.certs, &tls.key).await?; - bind_rustls(addr, conf).handle(handle).serve(app).await?; + let server = bind_rustls(addr, conf).handle(handle).serve(app); + + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + + server.await? } None => { - bind(addr).handle(handle).serve(app).await?; + let server = bind(addr).handle(handle).serve(app); + + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + + server.await? } } @@ -191,21 +215,29 @@ async fn run_server() -> io::Result<()> { info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers..."); services().globals.rotate.fire(); + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]); + Ok(()) } async fn unrecognized_method<B>( req: axum::http::Request<B>, next: axum::middleware::Next<B>, -) -> std::result::Result<axum::response::Response, axum::http::StatusCode> { +) -> std::result::Result<axum::response::Response, StatusCode> { let method = req.method().clone(); let uri = req.uri().clone(); let inner = next.run(req).await; if inner.status() == axum::http::StatusCode::METHOD_NOT_ALLOWED { warn!("Method not allowed: {method} {uri}"); - return Ok( - Error::BadRequest(ErrorKind::Unrecognized, "Unrecognized request").into_response(), - ); + return Ok(RumaResponse(UiaaResponse::MatrixError(RumaError { + body: ErrorBody::Standard { + kind: ErrorKind::Unrecognized, + message: "M_UNRECOGNIZED: Unrecognized request".to_owned(), + }, + status_code: StatusCode::METHOD_NOT_ALLOWED, + })) + .into_response()); } Ok(inner) } @@ -464,7 +496,7 @@ macro_rules! impl_ruma_handler { let meta = Req::METADATA; let method_filter = method_to_filter(meta.method); - for path in IntoIterator::into_iter([meta.unstable_path, meta.r0_path, meta.stable_path]).flatten() { + for path in meta.history.all_paths() { let handler = self.clone(); router = router.route(path, on(method_filter, |$( $ty: $ty, )* req| async move { @@ -498,6 +530,6 @@ fn method_to_filter(method: Method) -> MethodFilter { Method::POST => MethodFilter::POST, Method::PUT => MethodFilter::PUT, Method::TRACE => MethodFilter::TRACE, - m => panic!("Unsupported HTTP method: {:?}", m), + m => panic!("Unsupported HTTP method: {m:?}"), } } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 5766b2f..77f351a 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -287,13 +287,11 @@ impl Service { Err(error) => { let markdown_message = format!( "Encountered an error while handling the command:\n\ - ```\n{}\n```", - error, + ```\n{error}\n```", ); let html_message = format!( "Encountered an error while handling the command:\n\ - <pre>\n{}\n</pre>", - error, + <pre>\n{error}\n</pre>", ); RoomMessageEventContent::text_html(markdown_message, html_message) @@ -338,17 +336,14 @@ impl Service { match parsed_config { Ok(yaml) => match services().appservice.register_appservice(yaml) { Ok(id) => RoomMessageEventContent::text_plain(format!( - "Appservice registered with ID: {}.", - id + "Appservice registered with ID: {id}." )), Err(e) => RoomMessageEventContent::text_plain(format!( - "Failed to register appservice: {}", - e + "Failed to register appservice: {e}" )), }, Err(e) => RoomMessageEventContent::text_plain(format!( - "Could not parse appservice config: {}", - e + "Could not parse appservice config: {e}" )), } } else { @@ -365,8 +360,7 @@ impl Service { { Ok(()) => RoomMessageEventContent::text_plain("Appservice unregistered."), Err(e) => RoomMessageEventContent::text_plain(format!( - "Failed to unregister appservice: {}", - e + "Failed to unregister appservice: {e}" )), }, AdminCommand::ListAppservices => { @@ -459,8 +453,7 @@ impl Service { .count(); let elapsed = start.elapsed(); RoomMessageEventContent::text_plain(format!( - "Loaded auth chain with length {} in {:?}", - count, elapsed + "Loaded auth chain with length {count} in {elapsed:?}" )) } else { RoomMessageEventContent::text_plain("Event not found.") @@ -474,30 +467,26 @@ impl Service { Ok(value) => { match ruma::signatures::reference_hash(&value, &RoomVersionId::V6) { Ok(hash) => { - let event_id = EventId::parse(format!("${}", hash)); + let event_id = EventId::parse(format!("${hash}")); match serde_json::from_value::<PduEvent>( serde_json::to_value(value).expect("value is json"), ) { Ok(pdu) => RoomMessageEventContent::text_plain(format!( - "EventId: {:?}\n{:#?}", - event_id, pdu + "EventId: {event_id:?}\n{pdu:#?}" )), Err(e) => RoomMessageEventContent::text_plain(format!( - "EventId: {:?}\nCould not parse event: {}", - event_id, e + "EventId: {event_id:?}\nCould not parse event: {e}" )), } } Err(e) => RoomMessageEventContent::text_plain(format!( - "Could not parse PDU JSON: {:?}", - e + "Could not parse PDU JSON: {e:?}" )), } } Err(e) => RoomMessageEventContent::text_plain(format!( - "Invalid json in command body: {}", - e + "Invalid json in command body: {e}" )), } } else { @@ -545,8 +534,7 @@ impl Service { AdminCommand::DatabaseMemoryUsage => match services().globals.db.memory_usage() { Ok(response) => RoomMessageEventContent::text_plain(response), Err(e) => RoomMessageEventContent::text_plain(format!( - "Failed to get database memory usage: {}", - e + "Failed to get database memory usage: {e}" )), }, AdminCommand::ShowConfig => { @@ -561,8 +549,7 @@ impl Service { Ok(id) => id, Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( - "The supplied username is not a valid username: {}", - e + "The supplied username is not a valid username: {e}" ))) } }; @@ -589,17 +576,16 @@ impl Service { .set_password(&user_id, Some(new_password.as_str())) { Ok(()) => RoomMessageEventContent::text_plain(format!( - "Successfully reset the password for user {}: {}", - user_id, new_password + "Successfully reset the password for user {user_id}: {new_password}" )), Err(e) => RoomMessageEventContent::text_plain(format!( - "Couldn't reset the password for user {}: {}", - user_id, e + "Couldn't reset the password for user {user_id}: {e}" )), } } AdminCommand::CreateUser { username, password } => { - let password = password.unwrap_or(utils::random_string(AUTO_GEN_PASSWORD_LENGTH)); + let password = + password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH)); // Validate user id let user_id = match UserId::parse_with_server_name( username.as_str().to_lowercase(), @@ -608,8 +594,7 @@ impl Service { Ok(id) => id, Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( - "The supplied username is not a valid username: {}", - e + "The supplied username is not a valid username: {e}" ))) } }; @@ -675,8 +660,7 @@ impl Service { let user_id = Arc::<UserId>::from(user_id); if services().users.exists(&user_id)? { RoomMessageEventContent::text_plain(format!( - "Making {} leave all rooms before deactivation...", - user_id + "Making {user_id} leave all rooms before deactivation..." )); services().users.deactivate_account(&user_id)?; @@ -686,13 +670,11 @@ impl Service { } RoomMessageEventContent::text_plain(format!( - "User {} has been deactivated", - user_id + "User {user_id} has been deactivated" )) } else { RoomMessageEventContent::text_plain(format!( - "User {} doesn't exist on this server", - user_id + "User {user_id} doesn't exist on this server" )) } } @@ -708,8 +690,7 @@ impl Service { Ok(user_id) => user_ids.push(user_id), Err(_) => { return Ok(RoomMessageEventContent::text_plain(format!( - "{} is not a valid username", - username + "{username} is not a valid username" ))) } } @@ -732,9 +713,8 @@ impl Service { } for &user_id in &user_ids { - match services().users.deactivate_account(user_id) { - Ok(_) => deactivation_count += 1, - Err(_) => {} + if services().users.deactivate_account(user_id).is_ok() { + deactivation_count += 1 } } @@ -746,8 +726,7 @@ impl Service { if admins.is_empty() { RoomMessageEventContent::text_plain(format!( - "Deactivated {} accounts.", - deactivation_count + "Deactivated {deactivation_count} accounts." )) } else { RoomMessageEventContent::text_plain(format!("Deactivated {} accounts.\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts", deactivation_count, admins.join(", "))) @@ -767,8 +746,8 @@ impl Service { fn usage_to_html(&self, text: &str, server_name: &ServerName) -> String { // Replace `@conduit:servername:-subcmdname` with `@conduit:servername: subcmdname` let text = text.replace( - &format!("@conduit:{}:-", server_name), - &format!("@conduit:{}: ", server_name), + &format!("@conduit:{server_name}:-"), + &format!("@conduit:{server_name}: "), ); // For the conduit admin room, subcommands become main commands diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index affc051..cd3be08 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -168,7 +168,7 @@ impl Service { .supported_room_versions() .contains(&s.config.default_room_version) { - error!("Room version in config isn't supported, falling back to default version"); + error!(config=?s.config.default_room_version, fallback=?crate::config::default_default_room_version(), "Room version in config isn't supported, falling back to default version"); s.config.default_room_version = crate::config::default_default_room_version(); }; @@ -222,6 +222,10 @@ impl Service { self.config.max_request_size } + pub fn max_fetch_prev_events(&self) -> u16 { + self.config.max_fetch_prev_events + } + pub fn allow_registration(&self) -> bool { self.config.allow_registration } @@ -341,6 +345,7 @@ impl Service { fn reqwest_client_builder(config: &Config) -> Result<reqwest::ClientBuilder> { let mut reqwest_client_builder = reqwest::Client::builder() + .pool_max_idle_per_host(0) .connect_timeout(Duration::from_secs(30)) .timeout(Duration::from_secs(60 * 3)); diff --git a/src/service/pdu.rs b/src/service/pdu.rs index 593a687..554f3be 100644 --- a/src/service/pdu.rs +++ b/src/service/pdu.rs @@ -1,4 +1,4 @@ -use crate::{services, Error}; +use crate::Error; use ruma::{ events::{ room::member::RoomMemberEventContent, AnyEphemeralRoomEvent, AnyStateEvent, @@ -7,7 +7,7 @@ use ruma::{ }, serde::Raw, state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, - OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId, + OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId, }; use serde::{Deserialize, Serialize}; use serde_json::{ @@ -334,23 +334,17 @@ impl Ord for PduEvent { /// Returns a tuple of the new `EventId` and the PDU as a `BTreeMap<String, CanonicalJsonValue>`. pub(crate) fn gen_event_id_canonical_json( pdu: &RawJsonValue, + room_version_id: &RoomVersionId, ) -> crate::Result<(OwnedEventId, CanonicalJsonObject)> { let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { warn!("Error parsing incoming event {:?}: {:?}", pdu, e); Error::BadServerResponse("Invalid PDU in server response") })?; - let room_id = value - .get("room_id") - .and_then(|id| RoomId::parse(id.as_str()?).ok()) - .ok_or_else(|| Error::bad_database("PDU in db has invalid room_id."))?; - - let room_version_id = services().rooms.state.get_room_version(&room_id); - let event_id = format!( "${}", // Anything higher than version3 behaves the same - ruma::signatures::reference_hash(&value, &room_version_id?) + ruma::signatures::reference_hash(&value, room_version_id) .expect("ruma can calculate reference hashes") ) .try_into() diff --git a/src/service/pusher/data.rs b/src/service/pusher/data.rs index e317121..2062f56 100644 --- a/src/service/pusher/data.rs +++ b/src/service/pusher/data.rs @@ -1,16 +1,15 @@ use crate::Result; use ruma::{ - api::client::push::{get_pushers, set_pusher}, + api::client::push::{set_pusher, Pusher}, UserId, }; pub trait Data: Send + Sync { - fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()>; + fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::PusherAction) -> Result<()>; - fn get_pusher(&self, sender: &UserId, pushkey: &str) - -> Result<Option<get_pushers::v3::Pusher>>; + fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result<Option<Pusher>>; - fn get_pushers(&self, sender: &UserId) -> Result<Vec<get_pushers::v3::Pusher>>; + fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>>; fn get_pushkeys<'a>(&'a self, sender: &UserId) -> Box<dyn Iterator<Item = Result<String>> + 'a>; diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 7fee276..ba096a2 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -6,7 +6,7 @@ use crate::{services, Error, PduEvent, Result}; use bytes::BytesMut; use ruma::{ api::{ - client::push::{get_pushers, set_pusher, PusherKind}, + client::push::{set_pusher, Pusher, PusherKind}, push_gateway::send_event_notification::{ self, v1::{Device, Notification, NotificationCounts, NotificationPriority}, @@ -23,30 +23,26 @@ use ruma::{ }; use std::{fmt::Debug, mem}; -use tracing::{error, info, warn}; +use tracing::{info, warn}; pub struct Service { pub db: &'static dyn Data, } impl Service { - pub fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> { + pub fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::PusherAction) -> Result<()> { self.db.set_pusher(sender, pusher) } - pub fn get_pusher( - &self, - sender: &UserId, - pushkey: &str, - ) -> Result<Option<get_pushers::v3::Pusher>> { + pub fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result<Option<Pusher>> { self.db.get_pusher(sender, pushkey) } - pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<get_pushers::v3::Pusher>> { + pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> { self.db.get_pushers(sender) } - pub fn get_pushkeys<'a>(&'a self, sender: &UserId) -> Box<dyn Iterator<Item = Result<String>>> { + pub fn get_pushkeys(&self, sender: &UserId) -> Box<dyn Iterator<Item = Result<String>>> { self.db.get_pushkeys(sender) } @@ -140,7 +136,7 @@ impl Service { &self, user: &UserId, unread: UInt, - pusher: &get_pushers::v3::Pusher, + pusher: &Pusher, ruleset: Ruleset, pdu: &PduEvent, ) -> Result<()> { @@ -221,91 +217,85 @@ impl Service { async fn send_notice( &self, unread: UInt, - pusher: &get_pushers::v3::Pusher, + pusher: &Pusher, tweaks: Vec<Tweak>, event: &PduEvent, ) -> Result<()> { // TODO: email - if pusher.kind == PusherKind::Email { - return Ok(()); - } - - // TODO: - // Two problems with this - // 1. if "event_id_only" is the only format kind it seems we should never add more info - // 2. can pusher/devices have conflicting formats - let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); - let url = if let Some(url) = &pusher.data.url { - url - } else { - error!("Http Pusher must have URL specified."); - return Ok(()); - }; - - let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone()); - let mut data_minus_url = pusher.data.clone(); - // The url must be stripped off according to spec - data_minus_url.url = None; - device.data = data_minus_url.into(); + match &pusher.kind { + PusherKind::Http(http) => { + // TODO: + // Two problems with this + // 1. if "event_id_only" is the only format kind it seems we should never add more info + // 2. can pusher/devices have conflicting formats + let event_id_only = http.format == Some(PushFormat::EventIdOnly); + + let mut device = Device::new(pusher.ids.app_id.clone(), pusher.ids.pushkey.clone()); + device.data.default_payload = http.default_payload.clone(); + device.data.format = http.format.clone(); + + // Tweaks are only added if the format is NOT event_id_only + if !event_id_only { + device.tweaks = tweaks.clone(); + } - // Tweaks are only added if the format is NOT event_id_only - if !event_id_only { - device.tweaks = tweaks.clone(); - } + let d = vec![device]; + let mut notifi = Notification::new(d); + + notifi.prio = NotificationPriority::Low; + notifi.event_id = Some((*event.event_id).to_owned()); + notifi.room_id = Some((*event.room_id).to_owned()); + // TODO: missed calls + notifi.counts = NotificationCounts::new(unread, uint!(0)); + + if event.kind == RoomEventType::RoomEncrypted + || tweaks + .iter() + .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) + { + notifi.prio = NotificationPriority::High + } - let d = &[device]; - let mut notifi = Notification::new(d); - - notifi.prio = NotificationPriority::Low; - notifi.event_id = Some(&event.event_id); - notifi.room_id = Some(&event.room_id); - // TODO: missed calls - notifi.counts = NotificationCounts::new(unread, uint!(0)); - - if event.kind == RoomEventType::RoomEncrypted - || tweaks - .iter() - .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) - { - notifi.prio = NotificationPriority::High - } + if event_id_only { + self.send_request(&http.url, send_event_notification::v1::Request::new(notifi)) + .await?; + } else { + notifi.sender = Some(event.sender.clone()); + notifi.event_type = Some(event.kind.clone()); + notifi.content = serde_json::value::to_raw_value(&event.content).ok(); + + if event.kind == RoomEventType::RoomMember { + notifi.user_is_target = + event.state_key.as_deref() == Some(event.sender.as_str()); + } + + notifi.sender_display_name = services().users.displayname(&event.sender)?; + + let room_name = if let Some(room_name_pdu) = services() + .rooms + .state_accessor + .room_state_get(&event.room_id, &StateEventType::RoomName, "")? + { + serde_json::from_str::<RoomNameEventContent>(room_name_pdu.content.get()) + .map_err(|_| { + Error::bad_database("Invalid room name event in database.") + })? + .name + } else { + None + }; + + notifi.room_name = room_name; + + self.send_request(&http.url, send_event_notification::v1::Request::new(notifi)) + .await?; + } - if event_id_only { - self.send_request(url, send_event_notification::v1::Request::new(notifi)) - .await?; - } else { - notifi.sender = Some(&event.sender); - notifi.event_type = Some(&event.kind); - let content = serde_json::value::to_raw_value(&event.content).ok(); - notifi.content = content.as_deref(); - - if event.kind == RoomEventType::RoomMember { - notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str()); + Ok(()) } - - let user_name = services().users.displayname(&event.sender)?; - notifi.sender_display_name = user_name.as_deref(); - - let room_name = if let Some(room_name_pdu) = services() - .rooms - .state_accessor - .room_state_get(&event.room_id, &StateEventType::RoomName, "")? - { - serde_json::from_str::<RoomNameEventContent>(room_name_pdu.content.get()) - .map_err(|_| Error::bad_database("Invalid room name event in database."))? - .name - } else { - None - }; - - notifi.room_name = room_name.as_deref(); - - self.send_request(url, send_event_notification::v1::Request::new(notifi)) - .await?; + // TODO: Handle email + PusherKind::Email(_) => Ok(()), + _ => Ok(()), } - - // TODO: email - - Ok(()) } } diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index d3b6e40..da1944e 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -6,7 +6,7 @@ use std::{ pub use data::Data; use ruma::{api::client::error::ErrorKind, EventId, RoomId}; -use tracing::log::warn; +use tracing::{debug, error, warn}; use crate::{services, Error, Result}; @@ -15,11 +15,7 @@ pub struct Service { } impl Service { - #[tracing::instrument(skip(self))] - pub fn get_cached_eventid_authchain<'a>( - &'a self, - key: &[u64], - ) -> Result<Option<Arc<HashSet<u64>>>> { + pub fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<HashSet<u64>>>> { self.db.get_cached_eventid_authchain(key) } @@ -89,10 +85,10 @@ impl Service { .rooms .auth_chain .cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?; - println!( - "cache missed event {} with auth chain len {}", - event_id, - auth_chain.len() + debug!( + event_id = ?event_id, + chain_length = ?auth_chain.len(), + "Cache missed event" ); chunk_cache.extend(auth_chain.iter()); @@ -102,11 +98,11 @@ impl Service { } }; } - println!( - "chunk missed with len {}, event hits2: {}, misses2: {}", - chunk_cache.len(), - hits2, - misses2 + debug!( + chunk_cache_length = ?chunk_cache.len(), + hits = ?hits2, + misses = ?misses2, + "Chunk missed", ); let chunk_cache = Arc::new(chunk_cache); services() @@ -116,11 +112,11 @@ impl Service { full_auth_chain.extend(chunk_cache.iter()); } - println!( - "total: {}, chunk hits: {}, misses: {}", - full_auth_chain.len(), - hits, - misses + debug!( + chain_length = ?full_auth_chain.len(), + hits = ?hits, + misses = ?misses, + "Auth chain stats", ); Ok(full_auth_chain @@ -152,10 +148,10 @@ impl Service { } } Ok(None) => { - warn!("Could not find pdu mentioned in auth events: {}", event_id); + warn!(?event_id, "Could not find pdu mentioned in auth events"); } - Err(e) => { - warn!("Could not load event in auth chain: {} {}", event_id, e); + Err(error) => { + error!(?event_id, ?error, "Could not load event in auth chain"); } } } diff --git a/src/service/rooms/edus/typing/data.rs b/src/service/rooms/edus/typing/data.rs index c4ad867..3b1eecf 100644 --- a/src/service/rooms/edus/typing/data.rs +++ b/src/service/rooms/edus/typing/data.rs @@ -10,6 +10,9 @@ pub trait Data: Send + Sync { /// Removes a user from typing before the timeout is reached. fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; + /// Makes sure that typing events with old timestamps get removed. + fn typings_maintain(&self, room_id: &RoomId) -> Result<()>; + /// Returns the count of the last typing update in this room. fn last_typing_update(&self, room_id: &RoomId) -> Result<u64>; diff --git a/src/service/rooms/edus/typing/mod.rs b/src/service/rooms/edus/typing/mod.rs index d05ec90..7d44f7d 100644 --- a/src/service/rooms/edus/typing/mod.rs +++ b/src/service/rooms/edus/typing/mod.rs @@ -21,54 +21,15 @@ impl Service { self.db.typing_remove(user_id, room_id) } - /* TODO: Do this in background thread? /// Makes sure that typing events with old timestamps get removed. - fn typings_maintain( - &self, - room_id: &RoomId, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let current_timestamp = utils::millis_since_unix_epoch(); - - let mut found_outdated = false; - - // Find all outdated edus before inserting a new one - for outdated_edu in self - .typingid_userid - .scan_prefix(prefix) - .map(|(key, _)| { - Ok::<_, Error>(( - key.clone(), - utils::u64_from_bytes( - &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| { - Error::bad_database("RoomTyping has invalid timestamp or delimiters.") - })?[0..mem::size_of::<u64>()], - ) - .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, - )) - }) - .filter_map(|r| r.ok()) - .take_while(|&(_, timestamp)| timestamp < current_timestamp) - { - // This is an outdated edu (time > timestamp) - self.typingid_userid.remove(&outdated_edu.0)?; - found_outdated = true; - } - - if found_outdated { - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; - } - - Ok(()) + fn typings_maintain(&self, room_id: &RoomId) -> Result<()> { + self.db.typings_maintain(room_id) } - */ /// Returns the count of the last typing update in this room. pub fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> { + self.typings_maintain(room_id)?; + self.db.last_typing_update(room_id) } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 477a971..bc67f7a 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -7,7 +7,7 @@ use ruma::{ RoomVersionId, }; use std::{ - collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet}, + collections::{hash_map, BTreeMap, HashMap, HashSet}, pin::Pin, sync::{Arc, RwLock, RwLockWriteGuard}, time::{Duration, Instant, SystemTime}, @@ -76,6 +76,7 @@ impl Service { is_timeline_event: bool, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, ) -> Result<Option<Vec<u8>>> { + // 0. Check the server is in the room if !services().rooms.metadata.exists(room_id)? { return Err(Error::BadRequest( ErrorKind::NotFound, @@ -101,6 +102,13 @@ impl Service { .room_state_get(room_id, &StateEventType::RoomCreate, "")? .ok_or_else(|| Error::bad_database("Failed to find create event in db."))?; + let create_event_content: RoomCreateEventContent = + serde_json::from_str(create_event.content.get()).map_err(|e| { + error!("Invalid create event: {}", e); + Error::BadDatabase("Invalid create event in db") + })?; + let room_version_id = &create_event_content.room_version; + let first_pdu_in_room = services() .rooms .timeline @@ -127,13 +135,15 @@ impl Service { origin, &create_event, room_id, + room_version_id, pub_key_map, incoming_pdu.prev_events.clone(), ) .await?; let mut errors = 0; - for prev_id in dbg!(sorted_prev_events) { + debug!(events = ?sorted_prev_events, "Got previous events"); + for prev_id in sorted_prev_events { // Check for disabled again because it might have changed if services().rooms.metadata.is_disabled(room_id)? { return Err(Error::BadRequest( @@ -303,7 +313,7 @@ impl Service { Ok(ruma::signatures::Verified::Signatures) => { // Redact warn!("Calculated hash does not match: {}", event_id); - match ruma::canonical_json::redact(&value, room_version_id) { + match ruma::canonical_json::redact(value, room_version_id, None) { Ok(obj) => obj, Err(_) => { return Err(Error::BadRequest( @@ -330,7 +340,7 @@ impl Service { // 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" // NOTE: Step 5 is not applied anymore because it failed too often - warn!("Fetching auth events for {}", incoming_pdu.event_id); + debug!(event_id = ?incoming_pdu.event_id, "Fetching auth events"); self.fetch_and_handle_outliers( origin, &incoming_pdu @@ -340,6 +350,7 @@ impl Service { .collect::<Vec<_>>(), create_event, room_id, + room_version_id, pub_key_map, ) .await; @@ -542,7 +553,7 @@ impl Service { let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); for (sstatehash, prev_event) in extremity_sstatehashes { - let mut leaf_state: BTreeMap<_, _> = services() + let mut leaf_state: HashMap<_, _> = services() .rooms .state_accessor .state_full_ids(sstatehash) @@ -627,8 +638,8 @@ impl Service { .send_federation_request( origin, get_room_state_ids::v1::Request { - room_id, - event_id: &incoming_pdu.event_id, + room_id: room_id.to_owned(), + event_id: (*incoming_pdu.event_id).to_owned(), }, ) .await @@ -644,11 +655,12 @@ impl Service { .collect::<Vec<_>>(), create_event, room_id, + room_version_id, pub_key_map, ) .await; - let mut state: BTreeMap<_, Arc<EventId>> = BTreeMap::new(); + let mut state: HashMap<_, Arc<EventId>> = HashMap::new(); for (pdu, _) in state_vec { let state_key = pdu.state_key.clone().ok_or_else(|| { Error::bad_database("Found non-state pdu in state events.") @@ -660,10 +672,10 @@ impl Service { )?; match state.entry(shortstatekey) { - btree_map::Entry::Vacant(v) => { + hash_map::Entry::Vacant(v) => { v.insert(Arc::from(&*pdu.event_id)); } - btree_map::Entry::Occupied(_) => return Err( + hash_map::Entry::Occupied(_) => return Err( Error::bad_database("State event's type and state_key combination exists multiple times."), ), } @@ -827,8 +839,8 @@ impl Service { info!("Preparing for stateres to derive new room state"); let mut extremity_sstatehashes = HashMap::new(); - info!("Loading extremities"); - for id in dbg!(&extremities) { + info!(?extremities, "Loading extremities"); + for id in &extremities { match services().rooms.timeline.get_pdu(id)? { Some(leaf_pdu) => { extremity_sstatehashes.insert( @@ -1024,6 +1036,7 @@ impl Service { events: &'a [Arc<EventId>], create_event: &'a PduEvent, room_id: &'a RoomId, + room_version_id: &'a RoomVersionId, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, ) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> { @@ -1099,14 +1112,16 @@ impl Service { .sending .send_federation_request( origin, - get_event::v1::Request { event_id: &next_id }, + get_event::v1::Request { + event_id: (*next_id).to_owned(), + }, ) .await { Ok(res) => { info!("Got {} over federation", next_id); let (calculated_event_id, value) = - match pdu::gen_event_id_canonical_json(&res.pdu) { + match pdu::gen_event_id_canonical_json(&res.pdu, room_version_id) { Ok(t) => t, Err(_) => { back_off((*next_id).to_owned()); @@ -1179,6 +1194,7 @@ impl Service { origin: &ServerName, create_event: &PduEvent, room_id: &RoomId, + room_version_id: &RoomVersionId, pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, initial_set: Vec<Arc<EventId>>, ) -> Result<( @@ -1204,12 +1220,13 @@ impl Service { &[prev_event_id.clone()], create_event, room_id, + room_version_id, pub_key_map, ) .await .pop() { - if amount > 100 { + if amount > services().globals.max_fetch_prev_events() { // Max limit reached warn!("Max prev event limit reached!"); graph.insert(prev_event_id.clone(), HashSet::new()); @@ -1256,7 +1273,6 @@ impl Service { // This return value is the key used for sorting events, // events are then sorted by power level, time, // and lexically by event_id. - println!("{}", event_id); Ok(( int!(0), MilliSecondsSinceUnixEpoch( @@ -1464,7 +1480,17 @@ impl Service { .write() .map_err(|_| Error::bad_database("RwLock is poisoned."))?; for k in keys.server_keys { - let k = k.deserialize().unwrap(); + let k = match k.deserialize() { + Ok(key) => key, + Err(e) => { + warn!( + "Received error {} while fetching keys from trusted server {}", + e, server + ); + warn!("{}", k.into_json()); + continue; + } + }; // TODO: Check signature from trusted server? servers.remove(&k.server_name); @@ -1664,7 +1690,7 @@ impl Service { .send_federation_request( server, get_remote_server_keys::v2::Request::new( - origin, + origin.to_owned(), MilliSecondsSinceUnixEpoch::from_system_time( SystemTime::now() .checked_add(Duration::from_secs(3600)) diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs index 82c0800..6eef38f 100644 --- a/src/service/rooms/search/data.rs +++ b/src/service/rooms/search/data.rs @@ -2,7 +2,7 @@ use crate::Result; use ruma::RoomId; pub trait Data: Send + Sync { - fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()>; + fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()>; fn search_pdus<'a>( &'a self, diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs index f52ea72..96116b0 100644 --- a/src/service/rooms/state/data.rs +++ b/src/service/rooms/state/data.rs @@ -22,7 +22,7 @@ pub trait Data: Send + Sync { fn get_forward_extremities(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>>; /// Replace the forward extremities of the room. - fn set_forward_extremities<'a>( + fn set_forward_extremities( &self, room_id: &RoomId, event_ids: Vec<OwnedEventId>, diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 0e45032..3072b80 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -343,7 +343,7 @@ impl Service { self.db.get_forward_extremities(room_id) } - pub fn set_forward_extremities<'a>( + pub fn set_forward_extremities( &self, room_id: &RoomId, event_ids: Vec<OwnedEventId>, diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 340b19c..f3ae3c2 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use ruma::{events::StateEventType, EventId, RoomId}; @@ -12,7 +9,7 @@ use crate::{PduEvent, Result}; pub trait Data: Send + Sync { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. - async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>>; + async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>>; async fn state_full( &self, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 1a9c4a9..87d9936 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -1,8 +1,5 @@ mod data; -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; pub use data::Data; use ruma::{events::StateEventType, EventId, RoomId}; @@ -16,7 +13,8 @@ pub struct Service { impl Service { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. - pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> { + #[tracing::instrument(skip(self))] + pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> { self.db.state_full_ids(shortstatehash).await } @@ -39,7 +37,6 @@ impl Service { } /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). - #[tracing::instrument(skip(self))] pub fn state_get( &self, shortstatehash: u64, diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs index 42de56d..d8bb4a4 100644 --- a/src/service/rooms/state_cache/data.rs +++ b/src/service/rooms/state_cache/data.rs @@ -37,7 +37,7 @@ pub trait Data: Send + Sync { room_id: &RoomId, ) -> Box<dyn Iterator<Item = Result<OwnedServerName>> + 'a>; - fn server_in_room<'a>(&'a self, server: &ServerName, room_id: &RoomId) -> Result<bool>; + fn server_in_room(&self, server: &ServerName, room_id: &RoomId) -> Result<bool>; /// Returns an iterator of all rooms a server participates in (as far as we know). fn server_rooms<'a>( diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 6c9bed3..32afdd4 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -164,7 +164,7 @@ impl Service { .content .ignored_users .iter() - .any(|user| user == sender) + .any(|(user, _details)| user == sender) }); if is_ignored { diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 619dca2..cc58e6f 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -22,6 +22,7 @@ use ruma::{ }, push::{Action, Ruleset, Tweak}, state_res, + state_res::Event, state_res::RoomVersion, uint, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, RoomAliasId, RoomId, UserId, @@ -378,7 +379,7 @@ impl Service { )?; let server_user = format!("@conduit:{}", services().globals.server_name()); - let to_conduit = body.starts_with(&format!("{}: ", server_user)); + let to_conduit = body.starts_with(&format!("{server_user}: ")); // This will evaluate to false if the emergency password is set up so that // the administrator can execute commands as conduit @@ -683,6 +684,92 @@ impl Service { let (pdu, pdu_json) = self.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)?; + let admin_room = services().rooms.alias.resolve_local_alias( + <&RoomAliasId>::try_from( + format!("#admins:{}", services().globals.server_name()).as_str(), + ) + .expect("#admins:server_name is a valid room alias"), + )?; + if admin_room.filter(|v| v == room_id).is_some() { + match pdu.event_type() { + RoomEventType::RoomEncryption => { + warn!("Encryption is not allowed in the admins room"); + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Encryption is not allowed in the admins room.", + )); + } + RoomEventType::RoomMember => { + #[derive(Deserialize)] + struct ExtractMembership { + membership: MembershipState, + } + + let target = pdu + .state_key() + .filter(|v| v.starts_with("@")) + .unwrap_or(sender.as_str()); + let server_name = services().globals.server_name(); + let server_user = format!("@conduit:{}", server_name); + let content = serde_json::from_str::<ExtractMembership>(pdu.content.get()) + .map_err(|_| Error::bad_database("Invalid content in pdu."))?; + + if content.membership == MembershipState::Leave { + if target == &server_user { + warn!("Conduit user cannot leave from admins room"); + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Conduit user cannot leave from admins room.", + )); + } + + let count = services() + .rooms + .state_cache + .room_members(room_id) + .filter_map(|m| m.ok()) + .filter(|m| m.server_name() == server_name) + .filter(|m| m != target) + .count(); + if count < 2 { + warn!("Last admin cannot leave from admins room"); + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Last admin cannot leave from admins room.", + )); + } + } + + if content.membership == MembershipState::Ban && pdu.state_key().is_some() { + if target == &server_user { + warn!("Conduit user cannot be banned in admins room"); + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Conduit user cannot be banned in admins room.", + )); + } + + let count = services() + .rooms + .state_cache + .room_members(room_id) + .filter_map(|m| m.ok()) + .filter(|m| m.server_name() == server_name) + .filter(|m| m != target) + .count(); + if count < 2 { + warn!("Last admin cannot be banned in admins room"); + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Last admin cannot be banned in admins room.", + )); + } + } + } + _ => {} + } + } + // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. let statehashid = services().rooms.state.append_to_state(&pdu)?; diff --git a/src/service/rooms/user/data.rs b/src/service/rooms/user/data.rs index 43c4c92..4b8a4ec 100644 --- a/src/service/rooms/user/data.rs +++ b/src/service/rooms/user/data.rs @@ -8,6 +8,9 @@ pub trait Data: Send + Sync { fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>; + // Returns the count at which the last reset_notification_counts was called + fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>; + fn associate_token_shortstatehash( &self, room_id: &RoomId, diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs index a765cfd..672e502 100644 --- a/src/service/rooms/user/mod.rs +++ b/src/service/rooms/user/mod.rs @@ -22,6 +22,10 @@ impl Service { self.db.highlight_count(user_id, room_id) } + pub fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> { + self.db.last_notification_read(user_id, room_id) + } + pub fn associate_token_shortstatehash( &self, room_id: &RoomId, @@ -36,10 +40,10 @@ impl Service { self.db.get_token_shortstatehash(room_id, token) } - pub fn get_shared_rooms<'a>( - &'a self, + pub fn get_shared_rooms( + &self, users: Vec<OwnedUserId>, - ) -> Result<impl Iterator<Item = Result<OwnedRoomId>> + 'a> { + ) -> Result<impl Iterator<Item = Result<OwnedRoomId>>> { self.db.get_shared_rooms(users) } } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index afa12fc..1861feb 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -496,7 +496,7 @@ impl Service { ) })?, appservice::event::push_events::v1::Request { - events: &pdu_jsons, + events: pdu_jsons, txn_id: (&*base64::encode_config( calculate_hash( &events @@ -638,9 +638,9 @@ impl Service { let response = server_server::send_request( server, send_transaction_message::v1::Request { - origin: services().globals.server_name(), - pdus: &pdu_jsons, - edus: &edu_jsons, + origin: services().globals.server_name().to_owned(), + pdus: pdu_jsons, + edus: edu_jsons, origin_server_ts: MilliSecondsSinceUnixEpoch::now(), transaction_id: (&*base64::encode_config( calculate_hash( diff --git a/src/service/uiaa/mod.rs b/src/service/uiaa/mod.rs index 672290c..147ce4d 100644 --- a/src/service/uiaa/mod.rs +++ b/src/service/uiaa/mod.rs @@ -5,7 +5,7 @@ pub use data::Data; use ruma::{ api::client::{ error::ErrorKind, - uiaa::{AuthType, IncomingAuthData, IncomingPassword, IncomingUserIdentifier, UiaaInfo}, + uiaa::{AuthData, AuthType, Password, UiaaInfo, UserIdentifier}, }, CanonicalJsonValue, DeviceId, UserId, }; @@ -44,7 +44,7 @@ impl Service { &self, user_id: &UserId, device_id: &DeviceId, - auth: &IncomingAuthData, + auth: &AuthData, uiaainfo: &UiaaInfo, ) -> Result<(bool, UiaaInfo)> { let mut uiaainfo = auth @@ -58,13 +58,13 @@ impl Service { match auth { // Find out what the user completed - IncomingAuthData::Password(IncomingPassword { + AuthData::Password(Password { identifier, password, .. }) => { let username = match identifier { - IncomingUserIdentifier::UserIdOrLocalpart(username) => username, + UserIdentifier::UserIdOrLocalpart(username) => username, _ => { return Err(Error::BadRequest( ErrorKind::Unrecognized, @@ -85,7 +85,7 @@ impl Service { argon2::verify_encoded(&hash, password.as_bytes()).unwrap_or(false); if !hash_matches { - uiaainfo.auth_error = Some(ruma::api::client::error::ErrorBody { + uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody { kind: ErrorKind::Forbidden, message: "Invalid username or password.".to_owned(), }); @@ -96,7 +96,7 @@ impl Service { // Password was correct! Let's add it to `completed` uiaainfo.completed.push(AuthType::Password); } - IncomingAuthData::Dummy(_) => { + AuthData::Dummy(_) => { uiaainfo.completed.push(AuthType::Dummy); } k => error!("type not supported: {:?}", k), diff --git a/src/service/users/data.rs b/src/service/users/data.rs index bc1db33..8553210 100644 --- a/src/service/users/data.rs +++ b/src/service/users/data.rs @@ -1,6 +1,6 @@ use crate::Result; use ruma::{ - api::client::{device::Device, filter::IncomingFilterDefinition}, + api::client::{device::Device, filter::FilterDefinition}, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::AnyToDeviceEvent, serde::Raw, @@ -191,11 +191,7 @@ pub trait Data: Send + Sync { ) -> Box<dyn Iterator<Item = Result<Device>> + 'a>; /// Creates a new sync filter. Returns the filter id. - fn create_filter(&self, user_id: &UserId, filter: &IncomingFilterDefinition) -> Result<String>; + fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String>; - fn get_filter( - &self, - user_id: &UserId, - filter_id: &str, - ) -> Result<Option<IncomingFilterDefinition>>; + fn get_filter(&self, user_id: &UserId, filter_id: &str) -> Result<Option<FilterDefinition>>; } diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 9dcfa8b..6be5c89 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, mem}; pub use data::Data; use ruma::{ - api::client::{device::Device, error::ErrorKind, filter::IncomingFilterDefinition}, + api::client::{device::Device, error::ErrorKind, filter::FilterDefinition}, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::AnyToDeviceEvent, serde::Raw, @@ -326,11 +326,7 @@ impl Service { } /// Creates a new sync filter. Returns the filter id. - pub fn create_filter( - &self, - user_id: &UserId, - filter: &IncomingFilterDefinition, - ) -> Result<String> { + pub fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> { self.db.create_filter(user_id, filter) } @@ -338,7 +334,7 @@ impl Service { &self, user_id: &UserId, filter_id: &str, - ) -> Result<Option<IncomingFilterDefinition>> { + ) -> Result<Option<FilterDefinition>> { self.db.get_filter(user_id, filter_id) } } diff --git a/src/utils/error.rs b/src/utils/error.rs index 9c8617f..4f044ca 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -3,7 +3,7 @@ use std::convert::Infallible; use http::StatusCode; use ruma::{ api::client::{ - error::{Error as RumaError, ErrorKind}, + error::{Error as RumaError, ErrorBody, ErrorKind}, uiaa::{UiaaInfo, UiaaResponse}, }, OwnedServerName, @@ -102,11 +102,14 @@ impl Error { if let Self::FederationError(origin, error) = self { let mut error = error.clone(); - error.message = format!("Answer from {}: {}", origin, error.message); + error.body = ErrorBody::Standard { + kind: Unknown, + message: format!("Answer from {origin}: {error}"), + }; return RumaResponse(UiaaResponse::MatrixError(error)); } - let message = format!("{}", self); + let message = format!("{self}"); use ErrorKind::*; let (kind, status_code) = match self { @@ -131,8 +134,7 @@ impl Error { warn!("{}: {}", status_code, message); RumaResponse(UiaaResponse::MatrixError(RumaError { - kind, - message, + body: ErrorBody::Standard { kind, message }, status_code, })) } |