summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJonathan de Jong <jonathan@automatia.nl>2023-01-26 18:19:39 +0100
committerJonathan de Jong <jonathan@automatia.nl>2023-01-26 18:19:39 +0100
commitb158896396bf33e088dd35ec651c52e6afe646e5 (patch)
tree660940ce37ec869a667d0477f8e8dde82193f450 /src
parent52018c3967185c04b32bca63752ad2bf61fd2ed9 (diff)
parentf95dd4521caaa8dcbeadc463f535eb34186d6ed7 (diff)
downloadconduit-b158896396bf33e088dd35ec651c52e6afe646e5.zip
Merge remote-tracking branch 'origin/next' into complement-improvements
Diffstat (limited to 'src')
-rw-r--r--src/api/client_server/account.rs27
-rw-r--r--src/api/client_server/alias.rs26
-rw-r--r--src/api/client_server/backup.rs24
-rw-r--r--src/api/client_server/capabilities.rs2
-rw-r--r--src/api/client_server/config.rs8
-rw-r--r--src/api/client_server/context.rs2
-rw-r--r--src/api/client_server/device.rs8
-rw-r--r--src/api/client_server/directory.rs38
-rw-r--r--src/api/client_server/filter.rs4
-rw-r--r--src/api/client_server/keys.rs8
-rw-r--r--src/api/client_server/media.rs22
-rw-r--r--src/api/client_server/membership.rs584
-rw-r--r--src/api/client_server/message.rs4
-rw-r--r--src/api/client_server/mod.rs4
-rw-r--r--src/api/client_server/presence.rs4
-rw-r--r--src/api/client_server/profile.rs20
-rw-r--r--src/api/client_server/push.rs74
-rw-r--r--src/api/client_server/read_marker.rs4
-rw-r--r--src/api/client_server/redact.rs2
-rw-r--r--src/api/client_server/report.rs2
-rw-r--r--src/api/client_server/room.rs10
-rw-r--r--src/api/client_server/search.rs4
-rw-r--r--src/api/client_server/session.rs12
-rw-r--r--src/api/client_server/state.rs10
-rw-r--r--src/api/client_server/sync.rs68
-rw-r--r--src/api/client_server/tag.rs8
-rw-r--r--src/api/client_server/thirdparty.rs2
-rw-r--r--src/api/client_server/to_device.rs2
-rw-r--r--src/api/client_server/typing.rs2
-rw-r--r--src/api/client_server/unversioned.rs2
-rw-r--r--src/api/client_server/user_directory.rs2
-rw-r--r--src/api/client_server/voip.rs2
-rw-r--r--src/api/ruma_wrapper/axum.rs3
-rw-r--r--src/api/server_server.rs112
-rw-r--r--src/config/mod.rs16
-rw-r--r--src/database/abstraction/sqlite.rs19
-rw-r--r--src/database/key_value/pusher.rs50
-rw-r--r--src/database/key_value/rooms/edus/typing.rs43
-rw-r--r--src/database/key_value/rooms/state_accessor.rs9
-rw-r--r--src/database/key_value/rooms/user.rs25
-rw-r--r--src/database/key_value/users.rs10
-rw-r--r--src/database/mod.rs103
-rw-r--r--src/lib.rs2
-rw-r--r--src/main.rs142
-rw-r--r--src/service/admin/mod.rs77
-rw-r--r--src/service/globals/mod.rs7
-rw-r--r--src/service/pdu.rs14
-rw-r--r--src/service/pusher/data.rs9
-rw-r--r--src/service/pusher/mod.rs166
-rw-r--r--src/service/rooms/auth_chain/mod.rs42
-rw-r--r--src/service/rooms/edus/typing/data.rs3
-rw-r--r--src/service/rooms/edus/typing/mod.rs47
-rw-r--r--src/service/rooms/event_handler/mod.rs62
-rw-r--r--src/service/rooms/search/data.rs2
-rw-r--r--src/service/rooms/state/data.rs2
-rw-r--r--src/service/rooms/state/mod.rs2
-rw-r--r--src/service/rooms/state_accessor/data.rs7
-rw-r--r--src/service/rooms/state_accessor/mod.rs9
-rw-r--r--src/service/rooms/state_cache/data.rs2
-rw-r--r--src/service/rooms/state_cache/mod.rs2
-rw-r--r--src/service/rooms/timeline/mod.rs89
-rw-r--r--src/service/rooms/user/data.rs3
-rw-r--r--src/service/rooms/user/mod.rs10
-rw-r--r--src/service/sending/mod.rs8
-rw-r--r--src/service/uiaa/mod.rs12
-rw-r--r--src/service/users/data.rs10
-rw-r--r--src/service/users/mod.rs10
-rw-r--r--src/utils/error.rs12
68 files changed, 1317 insertions, 815 deletions
diff --git a/src/api/client_server/account.rs b/src/api/client_server/account.rs
index c9e3c9b..7459254 100644
--- a/src/api/client_server/account.rs
+++ b/src/api/client_server/account.rs
@@ -4,8 +4,8 @@ use ruma::{
api::client::{
account::{
change_password, deactivate, get_3pids, get_username_availability, register,
- request_3pid_management_token_via_email, request_3pid_management_token_via_msisdn, whoami,
- ThirdPartyIdRemovalStatus,
+ request_3pid_management_token_via_email, request_3pid_management_token_via_msisdn,
+ whoami, ThirdPartyIdRemovalStatus,
},
error::ErrorKind,
uiaa::{AuthFlow, AuthType, UiaaInfo},
@@ -30,7 +30,7 @@ const RANDOM_USER_ID_LENGTH: usize = 10;
///
/// Note: This will not reserve the username, so the username might become invalid when trying to register
pub async fn get_register_available_route(
- body: Ruma<get_username_availability::v3::IncomingRequest>,
+ body: Ruma<get_username_availability::v3::Request>,
) -> Result<get_username_availability::v3::Response> {
// Validate user id
let user_id = UserId::parse_with_server_name(
@@ -73,9 +73,7 @@ pub async fn get_register_available_route(
/// - If type is not guest and no username is given: Always fails after UIAA check
/// - Creates a new account and populates it with default account data
/// - If `inhibit_login` is false: Creates a device and returns device id and access_token
-pub async fn register_route(
- body: Ruma<register::v3::IncomingRequest>,
-) -> Result<register::v3::Response> {
+pub async fn register_route(body: Ruma<register::v3::Request>) -> Result<register::v3::Response> {
if !services().globals.allow_registration() && !body.from_appservice {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
@@ -227,8 +225,7 @@ pub async fn register_route(
services()
.admin
.send_message(RoomMessageEventContent::notice_plain(format!(
- "New user {} registered on this server.",
- user_id
+ "New user {user_id} registered on this server."
)));
// If this is the first real user, grant them admin privileges
@@ -266,7 +263,7 @@ pub async fn register_route(
/// - Forgets to-device events
/// - Triggers device list updates
pub async fn change_password_route(
- body: Ruma<change_password::v3::IncomingRequest>,
+ body: Ruma<change_password::v3::Request>,
) -> Result<change_password::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
@@ -320,8 +317,7 @@ pub async fn change_password_route(
services()
.admin
.send_message(RoomMessageEventContent::notice_plain(format!(
- "User {} changed their password.",
- sender_user
+ "User {sender_user} changed their password."
)));
Ok(change_password::v3::Response {})
@@ -354,7 +350,7 @@ pub async fn whoami_route(body: Ruma<whoami::v3::Request>) -> Result<whoami::v3:
/// - Triggers device list updates
/// - Removes ability to log in again
pub async fn deactivate_route(
- body: Ruma<deactivate::v3::IncomingRequest>,
+ body: Ruma<deactivate::v3::Request>,
) -> Result<deactivate::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
@@ -398,8 +394,7 @@ pub async fn deactivate_route(
services()
.admin
.send_message(RoomMessageEventContent::notice_plain(format!(
- "User {} deactivated their account.",
- sender_user
+ "User {sender_user} deactivated their account."
)));
Ok(deactivate::v3::Response {
@@ -426,7 +421,7 @@ pub async fn third_party_route(
///
/// - 403 signals that The homeserver does not allow the third party identifier as a contact option.
pub async fn request_3pid_management_token_via_email_route(
- _body: Ruma<request_3pid_management_token_via_email::v3::IncomingRequest>,
+ _body: Ruma<request_3pid_management_token_via_email::v3::Request>,
) -> Result<request_3pid_management_token_via_email::v3::Response> {
Err(Error::BadRequest(
ErrorKind::ThreepidDenied,
@@ -440,7 +435,7 @@ pub async fn request_3pid_management_token_via_email_route(
///
/// - 403 signals that The homeserver does not allow the third party identifier as a contact option.
pub async fn request_3pid_management_token_via_msisdn_route(
- _body: Ruma<request_3pid_management_token_via_msisdn::v3::IncomingRequest>,
+ _body: Ruma<request_3pid_management_token_via_msisdn::v3::Request>,
) -> Result<request_3pid_management_token_via_msisdn::v3::Response> {
Err(Error::BadRequest(
ErrorKind::ThreepidDenied,
diff --git a/src/api/client_server/alias.rs b/src/api/client_server/alias.rs
index b28606c..ab51b50 100644
--- a/src/api/client_server/alias.rs
+++ b/src/api/client_server/alias.rs
@@ -9,14 +9,14 @@ use ruma::{
},
federation,
},
- RoomAliasId,
+ OwnedRoomAliasId,
};
/// # `PUT /_matrix/client/r0/directory/room/{roomAlias}`
///
/// Creates a new room alias on this server.
pub async fn create_alias_route(
- body: Ruma<create_alias::v3::IncomingRequest>,
+ body: Ruma<create_alias::v3::Request>,
) -> Result<create_alias::v3::Response> {
if body.room_alias.server_name() != services().globals.server_name() {
return Err(Error::BadRequest(
@@ -49,7 +49,7 @@ pub async fn create_alias_route(
/// - TODO: additional access control checks
/// - TODO: Update canonical alias event
pub async fn delete_alias_route(
- body: Ruma<delete_alias::v3::IncomingRequest>,
+ body: Ruma<delete_alias::v3::Request>,
) -> Result<delete_alias::v3::Response> {
if body.room_alias.server_name() != services().globals.server_name() {
return Err(Error::BadRequest(
@@ -71,18 +71,22 @@ pub async fn delete_alias_route(
///
/// - TODO: Suggest more servers to join via
pub async fn get_alias_route(
- body: Ruma<get_alias::v3::IncomingRequest>,
+ body: Ruma<get_alias::v3::Request>,
) -> Result<get_alias::v3::Response> {
- get_alias_helper(&body.room_alias).await
+ get_alias_helper(body.body.room_alias).await
}
-pub(crate) async fn get_alias_helper(room_alias: &RoomAliasId) -> Result<get_alias::v3::Response> {
+pub(crate) async fn get_alias_helper(
+ room_alias: OwnedRoomAliasId,
+) -> Result<get_alias::v3::Response> {
if room_alias.server_name() != services().globals.server_name() {
let response = services()
.sending
.send_federation_request(
room_alias.server_name(),
- federation::query::get_room_information::v1::Request { room_alias },
+ federation::query::get_room_information::v1::Request {
+ room_alias: room_alias.to_owned(),
+ },
)
.await?;
@@ -93,7 +97,7 @@ pub(crate) async fn get_alias_helper(room_alias: &RoomAliasId) -> Result<get_ali
}
let mut room_id = None;
- match services().rooms.alias.resolve_local_alias(room_alias)? {
+ match services().rooms.alias.resolve_local_alias(&room_alias)? {
Some(r) => room_id = Some(r),
None => {
for (_id, registration) in services().appservice.all()? {
@@ -115,7 +119,9 @@ pub(crate) async fn get_alias_helper(room_alias: &RoomAliasId) -> Result<get_ali
.sending
.send_appservice_request(
registration,
- appservice::query::query_room_alias::v1::Request { room_alias },
+ appservice::query::query_room_alias::v1::Request {
+ room_alias: room_alias.clone(),
+ },
)
.await
.is_ok()
@@ -124,7 +130,7 @@ pub(crate) async fn get_alias_helper(room_alias: &RoomAliasId) -> Result<get_ali
services()
.rooms
.alias
- .resolve_local_alias(room_alias)?
+ .resolve_local_alias(&room_alias)?
.ok_or_else(|| {
Error::bad_config("Appservice lied to us. Room does not exist.")
})?,
diff --git a/src/api/client_server/backup.rs b/src/api/client_server/backup.rs
index f3d5ddc..115cba7 100644
--- a/src/api/client_server/backup.rs
+++ b/src/api/client_server/backup.rs
@@ -28,7 +28,7 @@ pub async fn create_backup_version_route(
///
/// Update information about an existing backup. Only `auth_data` can be modified.
pub async fn update_backup_version_route(
- body: Ruma<update_backup_version::v3::IncomingRequest>,
+ body: Ruma<update_backup_version::v3::Request>,
) -> Result<update_backup_version::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
services()
@@ -66,7 +66,7 @@ pub async fn get_latest_backup_info_route(
///
/// Get information about an existing backup.
pub async fn get_backup_info_route(
- body: Ruma<get_backup_info::v3::IncomingRequest>,
+ body: Ruma<get_backup_info::v3::Request>,
) -> Result<get_backup_info::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let algorithm = services()
@@ -96,7 +96,7 @@ pub async fn get_backup_info_route(
///
/// - Deletes both information about the backup, as well as all key data related to the backup
pub async fn delete_backup_version_route(
- body: Ruma<delete_backup_version::v3::IncomingRequest>,
+ body: Ruma<delete_backup_version::v3::Request>,
) -> Result<delete_backup_version::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -115,7 +115,7 @@ pub async fn delete_backup_version_route(
/// - Adds the keys to the backup
/// - Returns the new number of keys in this backup and the etag
pub async fn add_backup_keys_route(
- body: Ruma<add_backup_keys::v3::IncomingRequest>,
+ body: Ruma<add_backup_keys::v3::Request>,
) -> Result<add_backup_keys::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -162,7 +162,7 @@ pub async fn add_backup_keys_route(
/// - Adds the keys to the backup
/// - Returns the new number of keys in this backup and the etag
pub async fn add_backup_keys_for_room_route(
- body: Ruma<add_backup_keys_for_room::v3::IncomingRequest>,
+ body: Ruma<add_backup_keys_for_room::v3::Request>,
) -> Result<add_backup_keys_for_room::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -207,7 +207,7 @@ pub async fn add_backup_keys_for_room_route(
/// - Adds the keys to the backup
/// - Returns the new number of keys in this backup and the etag
pub async fn add_backup_keys_for_session_route(
- body: Ruma<add_backup_keys_for_session::v3::IncomingRequest>,
+ body: Ruma<add_backup_keys_for_session::v3::Request>,
) -> Result<add_backup_keys_for_session::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -246,7 +246,7 @@ pub async fn add_backup_keys_for_session_route(
///
/// Retrieves all keys from the backup.
pub async fn get_backup_keys_route(
- body: Ruma<get_backup_keys::v3::IncomingRequest>,
+ body: Ruma<get_backup_keys::v3::Request>,
) -> Result<get_backup_keys::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -259,7 +259,7 @@ pub async fn get_backup_keys_route(
///
/// Retrieves all keys from the backup for a given room.
pub async fn get_backup_keys_for_room_route(
- body: Ruma<get_backup_keys_for_room::v3::IncomingRequest>,
+ body: Ruma<get_backup_keys_for_room::v3::Request>,
) -> Result<get_backup_keys_for_room::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -274,7 +274,7 @@ pub async fn get_backup_keys_for_room_route(
///
/// Retrieves a key from the backup.
pub async fn get_backup_keys_for_session_route(
- body: Ruma<get_backup_keys_for_session::v3::IncomingRequest>,
+ body: Ruma<get_backup_keys_for_session::v3::Request>,
) -> Result<get_backup_keys_for_session::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -293,7 +293,7 @@ pub async fn get_backup_keys_for_session_route(
///
/// Delete the keys from the backup.
pub async fn delete_backup_keys_route(
- body: Ruma<delete_backup_keys::v3::IncomingRequest>,
+ body: Ruma<delete_backup_keys::v3::Request>,
) -> Result<delete_backup_keys::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -316,7 +316,7 @@ pub async fn delete_backup_keys_route(
///
/// Delete the keys from the backup for a given room.
pub async fn delete_backup_keys_for_room_route(
- body: Ruma<delete_backup_keys_for_room::v3::IncomingRequest>,
+ body: Ruma<delete_backup_keys_for_room::v3::Request>,
) -> Result<delete_backup_keys_for_room::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -339,7 +339,7 @@ pub async fn delete_backup_keys_for_room_route(
///
/// Delete a key from the backup.
pub async fn delete_backup_keys_for_session_route(
- body: Ruma<delete_backup_keys_for_session::v3::IncomingRequest>,
+ body: Ruma<delete_backup_keys_for_session::v3::Request>,
) -> Result<delete_backup_keys_for_session::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/capabilities.rs b/src/api/client_server/capabilities.rs
index 31d42d2..233e3c9 100644
--- a/src/api/client_server/capabilities.rs
+++ b/src/api/client_server/capabilities.rs
@@ -8,7 +8,7 @@ use std::collections::BTreeMap;
///
/// Get information on the supported feature set and other relevent capabilities of this server.
pub async fn get_capabilities_route(
- _body: Ruma<get_capabilities::v3::IncomingRequest>,
+ _body: Ruma<get_capabilities::v3::Request>,
) -> Result<get_capabilities::v3::Response> {
let mut available = BTreeMap::new();
for room_version in &services().globals.unstable_room_versions {
diff --git a/src/api/client_server/config.rs b/src/api/client_server/config.rs
index dbd2b2c..12f9aea 100644
--- a/src/api/client_server/config.rs
+++ b/src/api/client_server/config.rs
@@ -17,7 +17,7 @@ use serde_json::{json, value::RawValue as RawJsonValue};
///
/// Sets some account data for the sender user.
pub async fn set_global_account_data_route(
- body: Ruma<set_global_account_data::v3::IncomingRequest>,
+ body: Ruma<set_global_account_data::v3::Request>,
) -> Result<set_global_account_data::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -43,7 +43,7 @@ pub async fn set_global_account_data_route(
///
/// Sets some room account data for the sender user.
pub async fn set_room_account_data_route(
- body: Ruma<set_room_account_data::v3::IncomingRequest>,
+ body: Ruma<set_room_account_data::v3::Request>,
) -> Result<set_room_account_data::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -69,7 +69,7 @@ pub async fn set_room_account_data_route(
///
/// Gets some account data for the sender user.
pub async fn get_global_account_data_route(
- body: Ruma<get_global_account_data::v3::IncomingRequest>,
+ body: Ruma<get_global_account_data::v3::Request>,
) -> Result<get_global_account_data::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -89,7 +89,7 @@ pub async fn get_global_account_data_route(
///
/// Gets some room account data for the sender user.
pub async fn get_room_account_data_route(
- body: Ruma<get_room_account_data::v3::IncomingRequest>,
+ body: Ruma<get_room_account_data::v3::Request>,
) -> Result<get_room_account_data::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/context.rs b/src/api/client_server/context.rs
index 2e0f257..1e62f91 100644
--- a/src/api/client_server/context.rs
+++ b/src/api/client_server/context.rs
@@ -13,7 +13,7 @@ use tracing::error;
/// - Only works if the user is joined (TODO: always allow, but only show events if the user was
/// joined, depending on history_visibility)
pub async fn get_context_route(
- body: Ruma<get_context::v3::IncomingRequest>,
+ body: Ruma<get_context::v3::Request>,
) -> Result<get_context::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/device.rs b/src/api/client_server/device.rs
index d4c4178..aba061b 100644
--- a/src/api/client_server/device.rs
+++ b/src/api/client_server/device.rs
@@ -28,7 +28,7 @@ pub async fn get_devices_route(
///
/// Get metadata on a single device of the sender user.
pub async fn get_device_route(
- body: Ruma<get_device::v3::IncomingRequest>,
+ body: Ruma<get_device::v3::Request>,
) -> Result<get_device::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -44,7 +44,7 @@ pub async fn get_device_route(
///
/// Updates the metadata on a given device of the sender user.
pub async fn update_device_route(
- body: Ruma<update_device::v3::IncomingRequest>,
+ body: Ruma<update_device::v3::Request>,
) -> Result<update_device::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -72,7 +72,7 @@ pub async fn update_device_route(
/// - Forgets to-device events
/// - Triggers device list updates
pub async fn delete_device_route(
- body: Ruma<delete_device::v3::IncomingRequest>,
+ body: Ruma<delete_device::v3::Request>,
) -> Result<delete_device::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
@@ -126,7 +126,7 @@ pub async fn delete_device_route(
/// - Forgets to-device events
/// - Triggers device list updates
pub async fn delete_devices_route(
- body: Ruma<delete_devices::v3::IncomingRequest>,
+ body: Ruma<delete_devices::v3::Request>,
) -> Result<delete_devices::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/directory.rs b/src/api/client_server/directory.rs
index a7381d8..e132210 100644
--- a/src/api/client_server/directory.rs
+++ b/src/api/client_server/directory.rs
@@ -11,10 +11,7 @@ use ruma::{
},
federation,
},
- directory::{
- Filter, IncomingFilter, IncomingRoomNetwork, PublicRoomJoinRule, PublicRoomsChunk,
- RoomNetwork,
- },
+ directory::{Filter, PublicRoomJoinRule, PublicRoomsChunk, RoomNetwork},
events::{
room::{
avatar::RoomAvatarEventContent,
@@ -38,7 +35,7 @@ use tracing::{error, info, warn};
///
/// - Rooms are ordered by the number of joined members
pub async fn get_public_rooms_filtered_route(
- body: Ruma<get_public_rooms_filtered::v3::IncomingRequest>,
+ body: Ruma<get_public_rooms_filtered::v3::Request>,
) -> Result<get_public_rooms_filtered::v3::Response> {
get_public_rooms_filtered_helper(
body.server.as_deref(),
@@ -56,14 +53,14 @@ pub async fn get_public_rooms_filtered_route(
///
/// - Rooms are ordered by the number of joined members
pub async fn get_public_rooms_route(
- body: Ruma<get_public_rooms::v3::IncomingRequest>,
+ body: Ruma<get_public_rooms::v3::Request>,
) -> Result<get_public_rooms::v3::Response> {
let response = get_public_rooms_filtered_helper(
body.server.as_deref(),
body.limit,
body.since.as_deref(),
- &IncomingFilter::default(),
- &IncomingRoomNetwork::Matrix,
+ &Filter::default(),
+ &RoomNetwork::Matrix,
)
.await?;
@@ -81,16 +78,13 @@ pub async fn get_public_rooms_route(
///
/// - TODO: Access control checks
pub async fn set_room_visibility_route(
- body: Ruma<set_room_visibility::v3::IncomingRequest>,
+ body: Ruma<set_room_visibility::v3::Request>,
) -> Result<set_room_visibility::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if !services().rooms.metadata.exists(&body.room_id)? {
// Return 404 if the room doesn't exist
- return Err(Error::BadRequest(
- ErrorKind::NotFound,
- "Room not found",
- ));
+ return Err(Error::BadRequest(ErrorKind::NotFound, "Room not found"));
}
match &body.visibility {
@@ -114,15 +108,11 @@ pub async fn set_room_visibility_route(
///
/// Gets the visibility of a given room in the room directory.
pub async fn get_room_visibility_route(
- body: Ruma<get_room_visibility::v3::IncomingRequest>,
+ body: Ruma<get_room_visibility::v3::Request>,
) -> Result<get_room_visibility::v3::Response> {
-
if !services().rooms.metadata.exists(&body.room_id)? {
// Return 404 if the room doesn't exist
- return Err(Error::BadRequest(
- ErrorKind::NotFound,
- "Room not found",
- ));
+ return Err(Error::BadRequest(ErrorKind::NotFound, "Room not found"));
}
Ok(get_room_visibility::v3::Response {
@@ -138,8 +128,8 @@ pub(crate) async fn get_public_rooms_filtered_helper(
server: Option<&ServerName>,
limit: Option<UInt>,
since: Option<&str>,
- filter: &IncomingFilter,
- _network: &IncomingRoomNetwork,
+ filter: &Filter,
+ _network: &RoomNetwork,
) -> Result<get_public_rooms_filtered::v3::Response> {
if let Some(other_server) =
server.filter(|server| *server != services().globals.server_name().as_str())
@@ -150,9 +140,9 @@ pub(crate) async fn get_public_rooms_filtered_helper(
other_server,
federation::directory::get_public_rooms_filtered::v1::Request {
limit,
- since,
+ since: since.map(ToOwned::to_owned),
filter: Filter {
- generic_search_term: filter.generic_search_term.as_deref(),
+ generic_search_term: filter.generic_search_term.clone(),
room_types: filter.room_types.clone(),
},
room_network: RoomNetwork::Matrix,
@@ -371,7 +361,7 @@ pub(crate) async fn get_public_rooms_filtered_helper(
let prev_batch = if num_since == 0 {
None
} else {
- Some(format!("p{}", num_since))
+ Some(format!("p{num_since}"))
};
let next_batch = if chunk.len() < limit as usize {
diff --git a/src/api/client_server/filter.rs b/src/api/client_server/filter.rs
index a0d5a19..e9a359d 100644
--- a/src/api/client_server/filter.rs
+++ b/src/api/client_server/filter.rs
@@ -10,7 +10,7 @@ use ruma::api::client::{
///
/// - A user can only access their own filters
pub async fn get_filter_route(
- body: Ruma<get_filter::v3::IncomingRequest>,
+ body: Ruma<get_filter::v3::Request>,
) -> Result<get_filter::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let filter = match services().users.get_filter(sender_user, &body.filter_id)? {
@@ -25,7 +25,7 @@ pub async fn get_filter_route(
///
/// Creates a new filter to be used by other endpoints.
pub async fn create_filter_route(
- body: Ruma<create_filter::v3::IncomingRequest>,
+ body: Ruma<create_filter::v3::Request>,
) -> Result<create_filter::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
Ok(create_filter::v3::Response::new(
diff --git a/src/api/client_server/keys.rs b/src/api/client_server/keys.rs
index b649166..ba89ece 100644
--- a/src/api/client_server/keys.rs
+++ b/src/api/client_server/keys.rs
@@ -65,9 +65,7 @@ pub async fn upload_keys_route(
/// - Always fetches users from other servers over federation
/// - Gets master keys, self-signing keys, user signing keys and device keys.
/// - The master and self-signing keys contain signatures that the user is allowed to see
-pub async fn get_keys_route(
- body: Ruma<get_keys::v3::IncomingRequest>,
-) -> Result<get_keys::v3::Response> {
+pub async fn get_keys_route(body: Ruma<get_keys::v3::Request>) -> Result<get_keys::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let response =
@@ -93,7 +91,7 @@ pub async fn claim_keys_route(
///
/// - Requires UIAA to verify password
pub async fn upload_signing_keys_route(
- body: Ruma<upload_signing_keys::v3::IncomingRequest>,
+ body: Ruma<upload_signing_keys::v3::Request>,
) -> Result<upload_signing_keys::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
@@ -214,7 +212,7 @@ pub async fn upload_signatures_route(
///
/// - TODO: left users
pub async fn get_key_changes_route(
- body: Ruma<get_key_changes::v3::IncomingRequest>,
+ body: Ruma<get_key_changes::v3::Request>,
) -> Result<get_key_changes::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/media.rs b/src/api/client_server/media.rs
index ae023c9..3410cc0 100644
--- a/src/api/client_server/media.rs
+++ b/src/api/client_server/media.rs
@@ -27,7 +27,7 @@ pub async fn get_media_config_route(
/// - Some metadata will be saved in the database
/// - Media will be saved in the media/ directory
pub async fn create_content_route(
- body: Ruma<create_content::v3::IncomingRequest>,
+ body: Ruma<create_content::v3::Request>,
) -> Result<create_content::v3::Response> {
let mxc = format!(
"mxc://{}/{}",
@@ -57,7 +57,7 @@ pub async fn create_content_route(
pub async fn get_remote_content(
mxc: &str,
server_name: &ruma::ServerName,
- media_id: &str,
+ media_id: String,
) -> Result<get_content::v3::Response, Error> {
let content_response = services()
.sending
@@ -65,7 +65,7 @@ pub async fn get_remote_content(
server_name,
get_content::v3::Request {
allow_remote: false,
- server_name,
+ server_name: server_name.to_owned(),
media_id,
},
)
@@ -74,7 +74,7 @@ pub async fn get_remote_content(
services()
.media
.create(
- mxc.to_string(),
+ mxc.to_owned(),
content_response.content_disposition.as_deref(),
content_response.content_type.as_deref(),
&content_response.file,
@@ -90,7 +90,7 @@ pub async fn get_remote_content(
///
/// - Only allows federation if `allow_remote` is true
pub async fn get_content_route(
- body: Ruma<get_content::v3::IncomingRequest>,
+ body: Ruma<get_content::v3::Request>,
) -> Result<get_content::v3::Response> {
let mxc = format!("mxc://{}/{}", body.server_name, body.media_id);
@@ -108,7 +108,7 @@ pub async fn get_content_route(
})
} else if &*body.server_name != services().globals.server_name() && body.allow_remote {
let remote_content_response =
- get_remote_content(&mxc, &body.server_name, &body.media_id).await?;
+ get_remote_content(&mxc, &body.server_name, body.media_id.clone()).await?;
Ok(remote_content_response)
} else {
Err(Error::BadRequest(ErrorKind::NotFound, "Media not found."))
@@ -121,7 +121,7 @@ pub async fn get_content_route(
///
/// - Only allows federation if `allow_remote` is true
pub async fn get_content_as_filename_route(
- body: Ruma<get_content_as_filename::v3::IncomingRequest>,
+ body: Ruma<get_content_as_filename::v3::Request>,
) -> Result<get_content_as_filename::v3::Response> {
let mxc = format!("mxc://{}/{}", body.server_name, body.media_id);
@@ -139,7 +139,7 @@ pub async fn get_content_as_filename_route(
})
} else if &*body.server_name != services().globals.server_name() && body.allow_remote {
let remote_content_response =
- get_remote_content(&mxc, &body.server_name, &body.media_id).await?;
+ get_remote_content(&mxc, &body.server_name, body.media_id.clone()).await?;
Ok(get_content_as_filename::v3::Response {
content_disposition: Some(format!("inline: filename={}", body.filename)),
@@ -158,7 +158,7 @@ pub async fn get_content_as_filename_route(
///
/// - Only allows federation if `allow_remote` is true
pub async fn get_content_thumbnail_route(
- body: Ruma<get_content_thumbnail::v3::IncomingRequest>,
+ body: Ruma<get_content_thumbnail::v3::Request>,
) -> Result<get_content_thumbnail::v3::Response> {
let mxc = format!("mxc://{}/{}", body.server_name, body.media_id);
@@ -192,8 +192,8 @@ pub async fn get_content_thumbnail_route(
height: body.height,
width: body.width,
method: body.method.clone(),
- server_name: &body.server_name,
- media_id: &body.media_id,
+ server_name: body.server_name.clone(),
+ media_id: body.media_id.clone(),
},
)
.await?;
diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs
index 7142b8e..61c67cb 100644
--- a/src/api/client_server/membership.rs
+++ b/src/api/client_server/membership.rs
@@ -5,19 +5,23 @@ use ruma::{
membership::{
ban_user, forget_room, get_member_events, invite_user, join_room_by_id,
join_room_by_id_or_alias, joined_members, joined_rooms, kick_user, leave_room,
- unban_user, IncomingThirdPartySigned,
+ unban_user, ThirdPartySigned,
},
},
federation::{self, membership::create_invite},
},
canonical_json::to_canonical_value,
events::{
- room::member::{MembershipState, RoomMemberEventContent},
+ room::{
+ join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent},
+ member::{MembershipState, RoomMemberEventContent},
+ power_levels::RoomPowerLevelsEventContent,
+ },
RoomEventType, StateEventType,
},
serde::Base64,
- CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName,
- OwnedUserId, RoomId, RoomVersionId, UserId,
+ state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
+ OwnedServerName, OwnedUserId, RoomId, RoomVersionId, UserId,
};
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use std::{
@@ -41,7 +45,7 @@ use super::get_alias_helper;
/// - If the server knowns about this room: creates the join event and does auth rules locally
/// - If the server does not know about the room: asks other servers over federation
pub async fn join_room_by_id_route(
- body: Ruma<join_room_by_id::v3::IncomingRequest>,
+ body: Ruma<join_room_by_id::v3::Request>,
) -> Result<join_room_by_id::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -65,6 +69,7 @@ pub async fn join_room_by_id_route(
join_room_by_id_helper(
body.sender_user.as_deref(),
&body.room_id,
+ body.reason.clone(),
&servers,
body.third_party_signed.as_ref(),
)
@@ -78,7 +83,7 @@ pub async fn join_room_by_id_route(
/// - If the server knowns about this room: creates the join event and does auth rules locally
/// - If the server does not know about the room: asks other servers over federation
pub async fn join_room_by_id_or_alias_route(
- body: Ruma<join_room_by_id_or_alias::v3::IncomingRequest>,
+ body: Ruma<join_room_by_id_or_alias::v3::Request>,
) -> Result<join_room_by_id_or_alias::v3::Response> {
let sender_user = body.sender_user.as_deref().expect("user is authenticated");
let body = body.body;
@@ -104,7 +109,7 @@ pub async fn join_room_by_id_or_alias_route(
(servers, room_id)
}
Err(room_alias) => {
- let response = get_alias_helper(&room_alias).await?;
+ let response = get_alias_helper(room_alias).await?;
(response.servers.into_iter().collect(), response.room_id)
}
@@ -113,6 +118,7 @@ pub async fn join_room_by_id_or_alias_route(
let join_room_response = join_room_by_id_helper(
Some(sender_user),
&room_id,
+ body.reason.clone(),
&servers,
body.third_party_signed.as_ref(),
)
@@ -129,11 +135,11 @@ pub async fn join_room_by_id_or_alias_route(
///
/// - This should always work if the user is currently joined.
pub async fn leave_room_route(
- body: Ruma<leave_room::v3::IncomingRequest>,
+ body: Ruma<leave_room::v3::Request>,
) -> Result<leave_room::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- leave_room(sender_user, &body.room_id).await?;
+ leave_room(sender_user, &body.room_id, body.reason.clone()).await?;
Ok(leave_room::v3::Response::new())
}
@@ -142,12 +148,19 @@ pub async fn leave_room_route(
///
/// Tries to send an invite event into the room.
pub async fn invite_user_route(
- body: Ruma<invite_user::v3::IncomingRequest>,
+ body: Ruma<invite_user::v3::Request>,
) -> Result<invite_user::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- if let invite_user::v3::IncomingInvitationRecipient::UserId { user_id } = &body.recipient {
- invite_helper(sender_user, user_id, &body.room_id, false).await?;
+ if let invite_user::v3::InvitationRecipient::UserId { user_id } = &body.recipient {
+ invite_helper(
+ sender_user,
+ user_id,
+ &body.room_id,
+ body.reason.clone(),
+ false,
+ )
+ .await?;
Ok(invite_user::v3::Response {})
} else {
Err(Error::BadRequest(ErrorKind::NotFound, "User not found."))
@@ -158,7 +171,7 @@ pub async fn invite_user_route(
///
/// Tries to send a kick event into the room.
pub async fn kick_user_route(
- body: Ruma<kick_user::v3::IncomingRequest>,
+ body: Ruma<kick_user::v3::Request>,
) -> Result<kick_user::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -181,7 +194,7 @@ pub async fn kick_user_route(
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = MembershipState::Leave;
- // TODO: reason
+ event.reason = body.reason.clone();
let mutex_state = Arc::clone(
services()
@@ -215,13 +228,9 @@ pub async fn kick_user_route(
/// # `POST /_matrix/client/r0/rooms/{roomId}/ban`
///
/// Tries to send a ban event into the room.
-pub async fn ban_user_route(
- body: Ruma<ban_user::v3::IncomingRequest>,
-) -> Result<ban_user::v3::Response> {
+pub async fn ban_user_route(body: Ruma<ban_user::v3::Request>) -> Result<ban_user::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- // TODO: reason
-
let event = services()
.rooms
.state_accessor
@@ -238,7 +247,7 @@ pub async fn ban_user_route(
is_direct: None,
third_party_invite: None,
blurhash: services().users.blurhash(&body.user_id)?,
- reason: None,
+ reason: body.reason.clone(),
join_authorized_via_users_server: None,
}),
|event| {
@@ -284,7 +293,7 @@ pub async fn ban_user_route(
///
/// Tries to send an unban event into the room.
pub async fn unban_user_route(
- body: Ruma<unban_user::v3::IncomingRequest>,
+ body: Ruma<unban_user::v3::Request>,
) -> Result<unban_user::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -307,6 +316,7 @@ pub async fn unban_user_route(
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = MembershipState::Leave;
+ event.reason = body.reason.clone();
let mutex_state = Arc::clone(
services()
@@ -346,7 +356,7 @@ pub async fn unban_user_route(
/// Note: Other devices of the user have no way of knowing the room was forgotten, so this has to
/// be called from every device
pub async fn forget_room_route(
- body: Ruma<forget_room::v3::IncomingRequest>,
+ body: Ruma<forget_room::v3::Request>,
) -> Result<forget_room::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -382,7 +392,7 @@ pub async fn joined_rooms_route(
///
/// - Only works if the user is currently joined
pub async fn get_member_events_route(
- body: Ruma<get_member_events::v3::IncomingRequest>,
+ body: Ruma<get_member_events::v3::Request>,
) -> Result<get_member_events::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -418,7 +428,7 @@ pub async fn get_member_events_route(
/// - The sender user must be in the room
/// - TODO: An appservice just needs a puppet joined
pub async fn joined_members_route(
- body: Ruma<joined_members::v3::IncomingRequest>,
+ body: Ruma<joined_members::v3::Request>,
) -> Result<joined_members::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -458,8 +468,9 @@ pub async fn joined_members_route(
async fn join_room_by_id_helper(
sender_user: Option<&UserId>,
room_id: &RoomId,
+ reason: Option<String>,
servers: &[OwnedServerName],
- _third_party_signed: Option<&IncomingThirdPartySigned>,
+ _third_party_signed: Option<&ThirdPartySigned>,
) -> Result<join_room_by_id::v3::Response> {
let sender_user = sender_user.expect("user is authenticated");
@@ -480,33 +491,10 @@ async fn join_room_by_id_helper(
.state_cache
.server_in_room(services().globals.server_name(), room_id)?
{
- let mut make_join_response_and_server = Err(Error::BadServerResponse(
- "No server available to assist in joining.",
- ));
-
- for remote_server in servers {
- let make_join_response = services()
- .sending
- .send_federation_request(
- remote_server,
- federation::membership::prepare_join_event::v1::Request {
- room_id,
- user_id: sender_user,
- ver: &services().globals.supported_room_versions(),
- },
- )
- .await;
-
- make_join_response_and_server = make_join_response.map(|r| (r, remote_server));
-
- if make_join_response_and_server.is_ok() {
- break;
- }
- }
+ let (make_join_response, remote_server) =
+ make_join_request(sender_user, room_id, servers).await?;
- let (make_join_response, remote_server) = make_join_response_and_server?;
-
- let room_version = match make_join_response.room_version {
+ let room_version_id = match make_join_response.room_version {
Some(room_version)
if services()
.globals
@@ -554,7 +542,7 @@ async fn join_room_by_id_helper(
is_direct: None,
third_party_invite: None,
blurhash: services().users.blurhash(sender_user)?,
- reason: None,
+ reason,
join_authorized_via_users_server,
})
.expect("event is valid, we just created it"),
@@ -568,14 +556,14 @@ async fn join_room_by_id_helper(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut join_event_stub,
- &room_version,
+ &room_version_id,
)
.expect("event is valid, we just created it");
// Generate event id
let event_id = format!(
"${}",
- ruma::signatures::reference_hash(&join_event_stub, &room_version)
+ ruma::signatures::reference_hash(&join_event_stub, &room_version_id)
.expect("ruma can calculate reference hashes")
);
let event_id = <&EventId>::try_from(event_id.as_str())
@@ -588,23 +576,67 @@ async fn join_room_by_id_helper(
);
// It has enough fields to be called a proper event now
- let join_event = join_event_stub;
+ let mut join_event = join_event_stub;
let send_join_response = services()
.sending
.send_federation_request(
- remote_server,
+ &remote_server,
federation::membership::create_join_event::v2::Request {
- room_id,
- event_id,
- pdu: &PduEvent::convert_to_outgoing_federation_event(join_event.clone()),
+ room_id: room_id.to_owned(),
+ event_id: event_id.to_owned(),
+ pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()),
},
)
.await?;
+ if let Some(signed_raw) = &send_join_response.room_state.event {
+ let (signed_event_id, signed_value) =
+ match gen_event_id_canonical_json(signed_raw, &room_version_id) {
+ Ok(t) => t,
+ Err(_) => {
+ // Event could not be converted to canonical json
+ return Err(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Could not convert event to canonical json.",
+ ));
+ }
+ };
+
+ if signed_event_id != event_id {
+ return Err(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Server sent event with wrong event id",
+ ));
+ }
+
+ if let Ok(signature) = signed_value["signatures"]
+ .as_object()
+ .ok_or(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Server sent invalid signatures type",
+ ))
+ .and_then(|e| {
+ e.get(remote_server.as_str()).ok_or(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Server did not send its signature",
+ ))
+ })
+ {
+ join_event
+ .get_mut("signatures")
+ .expect("we created a valid pdu")
+ .as_object_mut()
+ .expect("we created a valid pdu")
+ .insert(remote_server.to_string(), signature.clone());
+ } else {
+ warn!("Server {} sent invalid sendjoin event", remote_server);
+ }
+ }
+
services().rooms.short.get_or_create_shortroomid(room_id)?;
- let parsed_pdu = PduEvent::from_id_val(event_id, join_event.clone())
+ let parsed_join_pdu = PduEvent::from_id_val(event_id, join_event.clone())
.map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?;
let mut state = HashMap::new();
@@ -613,14 +645,14 @@ async fn join_room_by_id_helper(
services()
.rooms
.event_handler
- .fetch_join_signing_keys(&send_join_response, &room_version, &pub_key_map)
+ .fetch_join_signing_keys(&send_join_response, &room_version_id, &pub_key_map)
.await?;
for result in send_join_response
.room_state
.state
.iter()
- .map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map))
+ .map(|pdu| validate_and_add_event_id(pdu, &room_version_id, &pub_key_map))
{
let (event_id, value) = match result {
Ok(t) => t,
@@ -645,31 +677,11 @@ async fn join_room_by_id_helper(
}
}
- let incoming_shortstatekey = services().rooms.short.get_or_create_shortstatekey(
- &parsed_pdu.kind.to_string().into(),
- parsed_pdu
- .state_key
- .as_ref()
- .expect("Pdu is a membership state event"),
- )?;
-
- state.insert(incoming_shortstatekey, parsed_pdu.event_id.clone());
-
- let create_shortstatekey = services()
- .rooms
- .short
- .get_shortstatekey(&StateEventType::RoomCreate, "")?
- .expect("Room exists");
-
- if state.get(&create_shortstatekey).is_none() {
- return Err(Error::BadServerResponse("State contained no create event."));
- }
-
for result in send_join_response
.room_state
.auth_chain
.iter()
- .map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map))
+ .map(|pdu| validate_and_add_event_id(pdu, &room_version_id, &pub_key_map))
{
let (event_id, value) = match result {
Ok(t) => t,
@@ -682,6 +694,34 @@ async fn join_room_by_id_helper(
.add_pdu_outlier(&event_id, &value)?;
}
+ if !state_res::event_auth::auth_check(
+ &state_res::RoomVersion::new(&room_version_id).expect("room version is supported"),
+ &parsed_join_pdu,
+ None::<PduEvent>, // TODO: third party invite
+ |k, s| {
+ services()
+ .rooms
+ .timeline
+ .get_pdu(
+ state.get(
+ &services()
+ .rooms
+ .short
+ .get_or_create_shortstatekey(&k.to_string().into(), s)
+ .ok()?,
+ )?,
+ )
+ .ok()?
+ },
+ )
+ .map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed"))?
+ {
+ return Err(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Auth check failed",
+ ));
+ }
+
let (statehash_before_join, new, removed) = services().rooms.state_compressor.save_state(
room_id,
state
@@ -705,12 +745,12 @@ async fn join_room_by_id_helper(
// We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail.
- let statehash_after_join = services().rooms.state.append_to_state(&parsed_pdu)?;
+ let statehash_after_join = services().rooms.state.append_to_state(&parsed_join_pdu)?;
services().rooms.timeline.append_pdu(
- &parsed_pdu,
+ &parsed_join_pdu,
join_event,
- vec![(*parsed_pdu.event_id).to_owned()],
+ vec![(*parsed_join_pdu.event_id).to_owned()],
&state_lock,
)?;
@@ -721,6 +761,95 @@ async fn join_room_by_id_helper(
.state
.set_room_state(room_id, statehash_after_join, &state_lock)?;
} else {
+ let join_rules_event = services().rooms.state_accessor.room_state_get(
+ room_id,
+ &StateEventType::RoomJoinRules,
+ "",
+ )?;
+ let power_levels_event = services().rooms.state_accessor.room_state_get(
+ room_id,
+ &StateEventType::RoomPowerLevels,
+ "",
+ )?;
+
+ let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
+ .as_ref()
+ .map(|join_rules_event| {
+ serde_json::from_str(join_rules_event.content.get()).map_err(|e| {
+ warn!("Invalid join rules event: {}", e);
+ Error::bad_database("Invalid join rules event in db.")
+ })
+ })
+ .transpose()?;
+ let power_levels_event_content: Option<RoomPowerLevelsEventContent> = power_levels_event
+ .as_ref()
+ .map(|power_levels_event| {
+ serde_json::from_str(power_levels_event.content.get()).map_err(|e| {
+ warn!("Invalid power levels event: {}", e);
+ Error::bad_database("Invalid power levels event in db.")
+ })
+ })
+ .transpose()?;
+
+ let restriction_rooms = match join_rules_event_content {
+ Some(RoomJoinRulesEventContent {
+ join_rule: JoinRule::Restricted(restricted),
+ })
+ | Some(RoomJoinRulesEventContent {
+ join_rule: JoinRule::KnockRestricted(restricted),
+ }) => restricted
+ .allow
+ .into_iter()
+ .filter_map(|a| match a {
+ AllowRule::RoomMembership(r) => Some(r.room_id),
+ _ => None,
+ })
+ .collect(),
+ _ => Vec::new(),
+ };
+
+ let authorized_user = restriction_rooms
+ .iter()
+ .find_map(|restriction_room_id| {
+ if !services()
+ .rooms
+ .state_cache
+ .is_joined(sender_user, restriction_room_id)
+ .ok()?
+ {
+ return None;
+ }
+ let authorized_user = power_levels_event_content
+ .as_ref()
+ .and_then(|c| {
+ c.users
+ .iter()
+ .filter(|(uid, i)| {
+ uid.server_name() == services().globals.server_name()
+ && **i > ruma::int!(0)
+ && services()
+ .rooms
+ .state_cache
+ .is_joined(uid, restriction_room_id)
+ .unwrap_or(false)
+ })
+ .max_by_key(|(_, i)| *i)
+ .map(|(u, _)| u.to_owned())
+ })
+ .or_else(|| {
+ // TODO: Check here if user is actually allowed to invite. Currently the auth
+ // check will just fail in this case.
+ services()
+ .rooms
+ .state_cache
+ .room_members(restriction_room_id)
+ .filter_map(|r| r.ok())
+ .find(|uid| uid.server_name() == services().globals.server_name())
+ });
+ Some(authorized_user)
+ })
+ .flatten();
+
let event = RoomMemberEventContent {
membership: MembershipState::Join,
displayname: services().users.displayname(sender_user)?,
@@ -728,11 +857,12 @@ async fn join_room_by_id_helper(
is_direct: None,
third_party_invite: None,
blurhash: services().users.blurhash(sender_user)?,
- reason: None,
- join_authorized_via_users_server: None,
+ reason: reason.clone(),
+ join_authorized_via_users_server: authorized_user,
};
- services().rooms.timeline.build_and_append_pdu(
+ // Try normal join first
+ let error = match services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomMember,
content: to_raw_value(&event).expect("event is valid, we just created it"),
@@ -743,14 +873,193 @@ async fn join_room_by_id_helper(
sender_user,
room_id,
&state_lock,
- )?;
- }
+ ) {
+ Ok(_event_id) => return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())),
+ Err(e) => e,
+ };
- drop(state_lock);
+ if !restriction_rooms.is_empty() {
+ // We couldn't do the join locally, maybe federation can help to satisfy the restricted
+ // join requirements
+ let (make_join_response, remote_server) =
+ make_join_request(sender_user, room_id, servers).await?;
+
+ let room_version_id = match make_join_response.room_version {
+ Some(room_version_id)
+ if services()
+ .globals
+ .supported_room_versions()
+ .contains(&room_version_id) =>
+ {
+ room_version_id
+ }
+ _ => return Err(Error::BadServerResponse("Room version is not supported")),
+ };
+ let mut join_event_stub: CanonicalJsonObject =
+ serde_json::from_str(make_join_response.event.get()).map_err(|_| {
+ Error::BadServerResponse("Invalid make_join event json received from server.")
+ })?;
+ let join_authorized_via_users_server = join_event_stub
+ .get("content")
+ .map(|s| {
+ s.as_object()?
+ .get("join_authorised_via_users_server")?
+ .as_str()
+ })
+ .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());
+ // TODO: Is origin needed?
+ join_event_stub.insert(
+ "origin".to_owned(),
+ CanonicalJsonValue::String(services().globals.server_name().as_str().to_owned()),
+ );
+ join_event_stub.insert(
+ "origin_server_ts".to_owned(),
+ CanonicalJsonValue::Integer(
+ utils::millis_since_unix_epoch()
+ .try_into()
+ .expect("Timestamp is valid js_int value"),
+ ),
+ );
+ join_event_stub.insert(
+ "content".to_owned(),
+ to_canonical_value(RoomMemberEventContent {
+ membership: MembershipState::Join,
+ displayname: services().users.displayname(sender_user)?,
+ avatar_url: services().users.avatar_url(sender_user)?,
+ is_direct: None,
+ third_party_invite: None,
+ blurhash: services().users.blurhash(sender_user)?,
+ reason,
+ join_authorized_via_users_server,
+ })
+ .expect("event is valid, we just created it"),
+ );
+
+ // We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms
+ join_event_stub.remove("event_id");
+
+ // In order to create a compatible ref hash (EventID) the `hashes` field needs to be present
+ ruma::signatures::hash_and_sign_event(
+ services().globals.server_name().as_str(),
+ services().globals.keypair(),
+ &mut join_event_stub,
+ &room_version_id,
+ )
+ .expect("event is valid, we just created it");
+
+ // Generate event id
+ let event_id = format!(
+ "${}",
+ ruma::signatures::reference_hash(&join_event_stub, &room_version_id)
+ .expect("ruma can calculate reference hashes")
+ );
+ let event_id = <&EventId>::try_from(event_id.as_str())
+ .expect("ruma's reference hashes are valid event ids");
+
+ // Add event_id back
+ join_event_stub.insert(
+ "event_id".to_owned(),
+ CanonicalJsonValue::String(event_id.as_str().to_owned()),
+ );
+
+ // It has enough fields to be called a proper event now
+ let join_event = join_event_stub;
+
+ let send_join_response = services()
+ .sending
+ .send_federation_request(
+ &remote_server,
+ federation::membership::create_join_event::v2::Request {
+ room_id: room_id.to_owned(),
+ event_id: event_id.to_owned(),
+ pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()),
+ },
+ )
+ .await?;
+
+ if let Some(signed_raw) = send_join_response.room_state.event {
+ let (signed_event_id, signed_value) =
+ match gen_event_id_canonical_json(&signed_raw, &room_version_id) {
+ Ok(t) => t,
+ Err(_) => {
+ // Event could not be converted to canonical json
+ return Err(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Could not convert event to canonical json.",
+ ));
+ }
+ };
+
+ if signed_event_id != event_id {
+ return Err(Error::BadRequest(
+ ErrorKind::InvalidParam,
+ "Server sent event with wrong event id",
+ ));
+ }
+
+ drop(state_lock);
+ let pub_key_map = RwLock::new(BTreeMap::new());
+ services()
+ .rooms
+ .event_handler
+ .handle_incoming_pdu(
+ &remote_server,
+ &signed_event_id,
+ room_id,
+ signed_value,
+ true,
+ &pub_key_map,
+ )
+ .await?;
+ } else {
+ return Err(error);
+ }
+ } else {
+ return Err(error);
+ }
+ }
Ok(join_room_by_id::v3::Response::new(room_id.to_owned()))
}
+async fn make_join_request(
+ sender_user: &UserId,
+ room_id: &RoomId,
+ servers: &[OwnedServerName],
+) -> Result<(
+ federation::membership::prepare_join_event::v1::Response,
+ OwnedServerName,
+)> {
+ let mut make_join_response_and_server = Err(Error::BadServerResponse(
+ "No server available to assist in joining.",
+ ));
+
+ for remote_server in servers {
+ if remote_server == services().globals.server_name() {
+ continue;
+ }
+ let make_join_response = services()
+ .sending
+ .send_federation_request(
+ remote_server,
+ federation::membership::prepare_join_event::v1::Request {
+ room_id: room_id.to_owned(),
+ user_id: sender_user.to_owned(),
+ ver: services().globals.supported_room_versions(),
+ },
+ )
+ .await;
+
+ make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone()));
+
+ if make_join_response_and_server.is_ok() {
+ break;
+ }
+ }
+
+ make_join_response_and_server
+}
+
fn validate_and_add_event_id(
pdu: &RawJsonValue,
room_version: &RoomVersionId,
@@ -823,10 +1132,11 @@ pub(crate) async fn invite_helper<'a>(
sender_user: &UserId,
user_id: &UserId,
room_id: &RoomId,
+ reason: Option<String>,
is_direct: bool,
) -> Result<()> {
if user_id.server_name() != services().globals.server_name() {
- let (pdu_json, invite_room_state) = {
+ let (pdu, pdu_json, invite_room_state) = {
let mutex_state = Arc::clone(
services()
.globals
@@ -845,7 +1155,7 @@ pub(crate) async fn invite_helper<'a>(
membership: MembershipState::Invite,
third_party_invite: None,
blurhash: None,
- reason: None,
+ reason,
join_authorized_via_users_server: None,
})
.expect("member event is valid value");
@@ -867,31 +1177,21 @@ pub(crate) async fn invite_helper<'a>(
drop(state_lock);
- (pdu_json, invite_room_state)
+ (pdu, pdu_json, invite_room_state)
};
- // Generate event id
- let expected_event_id = format!(
- "${}",
- ruma::signatures::reference_hash(
- &pdu_json,
- &services().rooms.state.get_room_version(room_id)?
- )
- .expect("ruma can calculate reference hashes")
- );
- let expected_event_id = <&EventId>::try_from(expected_event_id.as_str())
- .expect("ruma's reference hashes are valid event ids");
+ let room_version_id = services().rooms.state.get_room_version(room_id)?;
let response = services()
.sending
.send_federation_request(
user_id.server_name(),
create_invite::v2::Request {
- room_id,
- event_id: expected_event_id,
- room_version: &services().rooms.state.get_room_version(room_id)?,
- event: &PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()),
- invite_room_state: &invite_room_state,
+ room_id: room_id.to_owned(),
+ event_id: (*pdu.event_id).to_owned(),
+ room_version: room_version_id.clone(),
+ event: PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()),
+ invite_room_state,
},
)
.await?;
@@ -899,7 +1199,8 @@ pub(crate) async fn invite_helper<'a>(
let pub_key_map = RwLock::new(BTreeMap::new());
// We do not add the event_id field to the pdu here because of signature and hashes checks
- let (event_id, value) = match gen_event_id_canonical_json(&response.event) {
+ let (event_id, value) = match gen_event_id_canonical_json(&response.event, &room_version_id)
+ {
Ok(t) => t,
Err(_) => {
// Event could not be converted to canonical json
@@ -910,7 +1211,7 @@ pub(crate) async fn invite_helper<'a>(
}
};
- if expected_event_id != event_id {
+ if *pdu.event_id != *event_id {
warn!("Server {} changed invite event, that's not allowed in the spec: ours: {:?}, theirs: {:?}", user_id.server_name(), pdu_json, value);
}
@@ -978,7 +1279,7 @@ pub(crate) async fn invite_helper<'a>(
is_direct: Some(is_direct),
third_party_invite: None,
blurhash: services().users.blurhash(user_id)?,
- reason: None,
+ reason,
join_authorized_via_users_server: None,
})
.expect("event is valid, we just created it"),
@@ -1017,13 +1318,13 @@ pub async fn leave_all_rooms(user_id: &UserId) -> Result<()> {
Err(_) => continue,
};
- let _ = leave_room(user_id, &room_id).await;
+ let _ = leave_room(user_id, &room_id, None).await;
}
Ok(())
}
-pub async fn leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> {
+pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option<String>) -> Result<()> {
// Ask a remote server if we don't have this room
if !services().rooms.metadata.exists(room_id)?
&& room_id.server_name() != services().globals.server_name()
@@ -1063,21 +1364,35 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> {
);
let state_lock = mutex_state.lock().await;
- let mut event: RoomMemberEventContent = serde_json::from_str(
- services()
- .rooms
- .state_accessor
- .room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())?
- .ok_or(Error::BadRequest(
- ErrorKind::BadState,
- "Cannot leave a room you are not a member of.",
- ))?
- .content
- .get(),
- )
- .map_err(|_| Error::bad_database("Invalid member event in database."))?;
+ let member_event = services().rooms.state_accessor.room_state_get(
+ room_id,
+ &StateEventType::RoomMember,
+ user_id.as_str(),
+ )?;
+
+ // Fix for broken rooms
+ let member_event = match member_event {
+ None => {
+ error!("Trying to leave a room you are not a member of.");
+
+ services().rooms.state_cache.update_membership(
+ room_id,
+ user_id,
+ MembershipState::Leave,
+ user_id,
+ None,
+ true,
+ )?;
+ return Ok(());
+ }
+ Some(e) => e,
+ };
+
+ let mut event: RoomMemberEventContent = serde_json::from_str(member_event.content.get())
+ .map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = MembershipState::Leave;
+ event.reason = reason;
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
@@ -1124,7 +1439,10 @@ async fn remote_leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> {
.sending
.send_federation_request(
&remote_server,
- federation::membership::prepare_leave_event::v1::Request { room_id, user_id },
+ federation::membership::prepare_leave_event::v1::Request {
+ room_id: room_id.to_owned(),
+ user_id: user_id.to_owned(),
+ },
)
.await;
@@ -1201,9 +1519,9 @@ async fn remote_leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> {
.send_federation_request(
&remote_server,
federation::membership::create_leave_event::v2::Request {
- room_id,
- event_id: &event_id,
- pdu: &PduEvent::convert_to_outgoing_federation_event(leave_event.clone()),
+ room_id: room_id.to_owned(),
+ event_id,
+ pdu: PduEvent::convert_to_outgoing_federation_event(leave_event.clone()),
},
)
.await?;
diff --git a/src/api/client_server/message.rs b/src/api/client_server/message.rs
index b04c262..6ad0751 100644
--- a/src/api/client_server/message.rs
+++ b/src/api/client_server/message.rs
@@ -19,7 +19,7 @@ use std::{
/// - The only requirement for the content is that it has to be valid json
/// - Tries to send the event into the room, auth rules will determine if it is allowed
pub async fn send_message_event_route(
- body: Ruma<send_message_event::v3::IncomingRequest>,
+ body: Ruma<send_message_event::v3::Request>,
) -> Result<send_message_event::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_deref();
@@ -105,7 +105,7 @@ pub async fn send_message_event_route(
/// - Only works if the user is joined (TODO: always allow, but only show events where the user was
/// joined, depending on history_visibility)
pub async fn get_message_events_route(
- body: Ruma<get_message_events::v3::IncomingRequest>,
+ body: Ruma<get_message_events::v3::Request>,
) -> Result<get_message_events::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/mod.rs b/src/api/client_server/mod.rs
index 65b7a10..6ed17e7 100644
--- a/src/api/client_server/mod.rs
+++ b/src/api/client_server/mod.rs
@@ -63,6 +63,6 @@ pub use user_directory::*;
pub use voip::*;
pub const DEVICE_ID_LENGTH: usize = 10;
-pub const TOKEN_LENGTH: usize = 256;
-pub const SESSION_ID_LENGTH: usize = 256;
+pub const TOKEN_LENGTH: usize = 32;
+pub const SESSION_ID_LENGTH: usize = 32;
pub const AUTO_GEN_PASSWORD_LENGTH: usize = 15;
diff --git a/src/api/client_server/presence.rs b/src/api/client_server/presence.rs
index dfac3db..ef88d1a 100644
--- a/src/api/client_server/presence.rs
+++ b/src/api/client_server/presence.rs
@@ -6,7 +6,7 @@ use std::time::Duration;
///
/// Sets the presence state of the sender user.
pub async fn set_presence_route(
- body: Ruma<set_presence::v3::IncomingRequest>,
+ body: Ruma<set_presence::v3::Request>,
) -> Result<set_presence::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -43,7 +43,7 @@ pub async fn set_presence_route(
///
/// - Only works if you share a room with the user
pub async fn get_presence_route(
- body: Ruma<get_presence::v3::IncomingRequest>,
+ body: Ruma<get_presence::v3::Request>,
) -> Result<get_presence::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs
index 5ace177..6400e89 100644
--- a/src/api/client_server/profile.rs
+++ b/src/api/client_server/profile.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
///
/// - Also makes sure other users receive the update using presence EDUs
pub async fn set_displayname_route(
- body: Ruma<set_display_name::v3::IncomingRequest>,
+ body: Ruma<set_display_name::v3::Request>,
) -> Result<set_display_name::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -121,7 +121,7 @@ pub async fn set_displayname_route(
///
/// - If user is on another server: Fetches displayname over federation
pub async fn get_displayname_route(
- body: Ruma<get_display_name::v3::IncomingRequest>,
+ body: Ruma<get_display_name::v3::Request>,
) -> Result<get_display_name::v3::Response> {
if body.user_id.server_name() != services().globals.server_name() {
let response = services()
@@ -129,8 +129,8 @@ pub async fn get_displayname_route(
.send_federation_request(
body.user_id.server_name(),
federation::query::get_profile_information::v1::Request {
- user_id: &body.user_id,
- field: Some(&ProfileField::DisplayName),
+ user_id: body.user_id.clone(),
+ field: Some(ProfileField::DisplayName),
},
)
.await?;
@@ -151,7 +151,7 @@ pub async fn get_displayname_route(
///
/// - Also makes sure other users receive the update using presence EDUs
pub async fn set_avatar_url_route(
- body: Ruma<set_avatar_url::v3::IncomingRequest>,
+ body: Ruma<set_avatar_url::v3::Request>,
) -> Result<set_avatar_url::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -256,7 +256,7 @@ pub async fn set_avatar_url_route(
///
/// - If user is on another server: Fetches avatar_url and blurhash over federation
pub async fn get_avatar_url_route(
- body: Ruma<get_avatar_url::v3::IncomingRequest>,
+ body: Ruma<get_avatar_url::v3::Request>,
) -> Result<get_avatar_url::v3::Response> {
if body.user_id.server_name() != services().globals.server_name() {
let response = services()
@@ -264,8 +264,8 @@ pub async fn get_avatar_url_route(
.send_federation_request(
body.user_id.server_name(),
federation::query::get_profile_information::v1::Request {
- user_id: &body.user_id,
- field: Some(&ProfileField::AvatarUrl),
+ user_id: body.user_id.clone(),
+ field: Some(ProfileField::AvatarUrl),
},
)
.await?;
@@ -288,7 +288,7 @@ pub async fn get_avatar_url_route(
///
/// - If user is on another server: Fetches profile over federation
pub async fn get_profile_route(
- body: Ruma<get_profile::v3::IncomingRequest>,
+ body: Ruma<get_profile::v3::Request>,
) -> Result<get_profile::v3::Response> {
if body.user_id.server_name() != services().globals.server_name() {
let response = services()
@@ -296,7 +296,7 @@ pub async fn get_profile_route(
.send_federation_request(
body.user_id.server_name(),
federation::query::get_profile_information::v1::Request {
- user_id: &body.user_id,
+ user_id: body.user_id.clone(),
field: None,
},
)
diff --git a/src/api/client_server/push.rs b/src/api/client_server/push.rs
index 2301ddc..b044138 100644
--- a/src/api/client_server/push.rs
+++ b/src/api/client_server/push.rs
@@ -5,11 +5,11 @@ use ruma::{
push::{
delete_pushrule, get_pushers, get_pushrule, get_pushrule_actions, get_pushrule_enabled,
get_pushrules_all, set_pusher, set_pushrule, set_pushrule_actions,
- set_pushrule_enabled, RuleKind,
+ set_pushrule_enabled, RuleKind, RuleScope,
},
},
events::{push_rules::PushRulesEvent, GlobalAccountDataEventType},
- push::{ConditionalPushRuleInit, PatternedPushRuleInit, SimplePushRuleInit},
+ push::{ConditionalPushRuleInit, NewPushRule, PatternedPushRuleInit, SimplePushRuleInit},
};
/// # `GET /_matrix/client/r0/pushrules`
@@ -45,7 +45,7 @@ pub async fn get_pushrules_all_route(
///
/// Retrieves a single specified push rule for this user.
pub async fn get_pushrule_route(
- body: Ruma<get_pushrule::v3::IncomingRequest>,
+ body: Ruma<get_pushrule::v3::Request>,
) -> Result<get_pushrule::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -104,12 +104,12 @@ pub async fn get_pushrule_route(
///
/// Creates a single specified push rule for this user.
pub async fn set_pushrule_route(
- body: Ruma<set_pushrule::v3::IncomingRequest>,
+ body: Ruma<set_pushrule::v3::Request>,
) -> Result<set_pushrule::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let body = body.body;
- if body.scope != "global" {
+ if body.scope != RuleScope::Global {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Scopes other than 'global' are not supported.",
@@ -132,66 +132,65 @@ pub async fn set_pushrule_route(
.map_err(|_| Error::bad_database("Invalid account data event in db."))?;
let global = &mut account_data.content.global;
- match body.kind {
- RuleKind::Override => {
+ match body.rule {
+ NewPushRule::Override(rule) => {
global.override_.replace(
ConditionalPushRuleInit {
- actions: body.actions,
+ actions: rule.actions,
default: false,
enabled: true,
- rule_id: body.rule_id,
- conditions: body.conditions,
+ rule_id: rule.rule_id,
+ conditions: rule.conditions,
}
.into(),
);
}
- RuleKind::Underride => {
+ NewPushRule::Underride(rule) => {
global.underride.replace(
ConditionalPushRuleInit {
- actions: body.actions,
+ actions: rule.actions,
default: false,
enabled: true,
- rule_id: body.rule_id,
- conditions: body.conditions,
+ rule_id: rule.rule_id,
+ conditions: rule.conditions,
}
.into(),
);
}
- RuleKind::Sender => {
+ NewPushRule::Sender(rule) => {
global.sender.replace(
SimplePushRuleInit {
- actions: body.actions,
+ actions: rule.actions,
default: false,
enabled: true,
- rule_id: body.rule_id,
+ rule_id: rule.rule_id,
}
.into(),
);
}
- RuleKind::Room => {
+ NewPushRule::Room(rule) => {
global.room.replace(
SimplePushRuleInit {
- actions: body.actions,
+ actions: rule.actions,
default: false,
enabled: true,
- rule_id: body.rule_id,
+ rule_id: rule.rule_id,
}
.into(),
);
}
- RuleKind::Content => {
+ NewPushRule::Content(rule) => {
global.content.replace(
PatternedPushRuleInit {
- actions: body.actions,
+ actions: rule.actions,
default: false,
enabled: true,
- rule_id: body.rule_id,
- pattern: body.pattern.unwrap_or_default(),
+ rule_id: rule.rule_id,
+ pattern: rule.pattern,
}
.into(),
);
}
- _ => {}
}
services().account_data.update(
@@ -208,11 +207,11 @@ pub async fn set_pushrule_route(
///
/// Gets the actions of a single specified push rule for this user.
pub async fn get_pushrule_actions_route(
- body: Ruma<get_pushrule_actions::v3::IncomingRequest>,
+ body: Ruma<get_pushrule_actions::v3::Request>,
) -> Result<get_pushrule_actions::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- if body.scope != "global" {
+ if body.scope != RuleScope::Global {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Scopes other than 'global' are not supported.",
@@ -269,11 +268,11 @@ pub async fn get_pushrule_actions_route(
///
/// Sets the actions of a single specified push rule for this user.
pub async fn set_pushrule_actions_route(
- body: Ruma<set_pushrule_actions::v3::IncomingRequest>,
+ body: Ruma<set_pushrule_actions::v3::Request>,
) -> Result<set_pushrule_actions::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- if body.scope != "global" {
+ if body.scope != RuleScope::Global {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Scopes other than 'global' are not supported.",
@@ -344,11 +343,11 @@ pub async fn set_pushrule_actions_route(
///
/// Gets the enabled status of a single specified push rule for this user.
pub async fn get_pushrule_enabled_route(
- body: Ruma<get_pushrule_enabled::v3::IncomingRequest>,
+ body: Ruma<get_pushrule_enabled::v3::Request>,
) -> Result<get_pushrule_enabled::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- if body.scope != "global" {
+ if body.scope != RuleScope::Global {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Scopes other than 'global' are not supported.",
@@ -407,11 +406,11 @@ pub async fn get_pushrule_enabled_route(
///
/// Sets the enabled status of a single specified push rule for this user.
pub async fn set_pushrule_enabled_route(
- body: Ruma<set_pushrule_enabled::v3::IncomingRequest>,
+ body: Ruma<set_pushrule_enabled::v3::Request>,
) -> Result<set_pushrule_enabled::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- if body.scope != "global" {
+ if body.scope != RuleScope::Global {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Scopes other than 'global' are not supported.",
@@ -487,11 +486,11 @@ pub async fn set_pushrule_enabled_route(
///
/// Deletes a single specified push rule for this user.
pub async fn delete_pushrule_route(
- body: Ruma<delete_pushrule::v3::IncomingRequest>,
+ body: Ruma<delete_pushrule::v3::Request>,
) -> Result<delete_pushrule::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- if body.scope != "global" {
+ if body.scope != RuleScope::Global {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Scopes other than 'global' are not supported.",
@@ -575,9 +574,10 @@ pub async fn set_pushers_route(
body: Ruma<set_pusher::v3::Request>,
) -> Result<set_pusher::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- let pusher = body.pusher.clone();
- services().pusher.set_pusher(sender_user, pusher)?;
+ services()
+ .pusher
+ .set_pusher(sender_user, body.action.clone())?;
Ok(set_pusher::v3::Response::default())
}
diff --git a/src/api/client_server/read_marker.rs b/src/api/client_server/read_marker.rs
index d529c6a..b12468a 100644
--- a/src/api/client_server/read_marker.rs
+++ b/src/api/client_server/read_marker.rs
@@ -16,7 +16,7 @@ use std::collections::BTreeMap;
/// - Updates fully-read account data event to `fully_read`
/// - If `read_receipt` is set: Update private marker and public read receipt EDU
pub async fn set_read_marker_route(
- body: Ruma<set_read_marker::v3::IncomingRequest>,
+ body: Ruma<set_read_marker::v3::Request>,
) -> Result<set_read_marker::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -89,7 +89,7 @@ pub async fn set_read_marker_route(
///
/// Sets private read marker and public read receipt EDU.
pub async fn create_receipt_route(
- body: Ruma<create_receipt::v3::IncomingRequest>,
+ body: Ruma<create_receipt::v3::Request>,
) -> Result<create_receipt::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/redact.rs b/src/api/client_server/redact.rs
index ab586c0..a29a561 100644
--- a/src/api/client_server/redact.rs
+++ b/src/api/client_server/redact.rs
@@ -14,7 +14,7 @@ use serde_json::value::to_raw_value;
///
/// - TODO: Handle txn id
pub async fn redact_event_route(
- body: Ruma<redact_event::v3::IncomingRequest>,
+ body: Ruma<redact_event::v3::Request>,
) -> Result<redact_event::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let body = body.body;
diff --git a/src/api/client_server/report.rs b/src/api/client_server/report.rs
index e45820e..ab5027c 100644
--- a/src/api/client_server/report.rs
+++ b/src/api/client_server/report.rs
@@ -10,7 +10,7 @@ use ruma::{
/// Reports an inappropriate event to homeserver admins
///
pub async fn report_event_route(
- body: Ruma<report_content::v3::IncomingRequest>,
+ body: Ruma<report_content::v3::Request>,
) -> Result<report_content::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs
index 097f0e1..830e085 100644
--- a/src/api/client_server/room.rs
+++ b/src/api/client_server/room.rs
@@ -46,7 +46,7 @@ use tracing::{info, warn};
/// - Send events implied by `name` and `topic`
/// - Send invite events
pub async fn create_room_route(
- body: Ruma<create_room::v3::IncomingRequest>,
+ body: Ruma<create_room::v3::Request>,
) -> Result<create_room::v3::Response> {
use create_room::v3::RoomPreset;
@@ -398,7 +398,7 @@ pub async fn create_room_route(
// 8. Events implied by invite (and TODO: invite_3pid)
drop(state_lock);
for user_id in &body.invite {
- let _ = invite_helper(sender_user, user_id, &room_id, body.is_direct).await;
+ let _ = invite_helper(sender_user, user_id, &room_id, None, body.is_direct).await;
}
// Homeserver specific stuff
@@ -421,7 +421,7 @@ pub async fn create_room_route(
///
/// - You have to currently be joined to the room (TODO: Respect history visibility)
pub async fn get_room_event_route(
- body: Ruma<get_room_event::v3::IncomingRequest>,
+ body: Ruma<get_room_event::v3::Request>,
) -> Result<get_room_event::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -452,7 +452,7 @@ pub async fn get_room_event_route(
///
/// - Only users joined to the room are allowed to call this TODO: Allow any user to call it if history_visibility is world readable
pub async fn get_room_aliases_route(
- body: Ruma<aliases::v3::IncomingRequest>,
+ body: Ruma<aliases::v3::Request>,
) -> Result<aliases::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -488,7 +488,7 @@ pub async fn get_room_aliases_route(
/// - Moves local aliases
/// - Modifies old room power levels to prevent users from speaking
pub async fn upgrade_room_route(
- body: Ruma<upgrade_room::v3::IncomingRequest>,
+ body: Ruma<upgrade_room::v3::Request>,
) -> Result<upgrade_room::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/search.rs b/src/api/client_server/search.rs
index 1ba9cdf..51255d5 100644
--- a/src/api/client_server/search.rs
+++ b/src/api/client_server/search.rs
@@ -15,7 +15,7 @@ use std::collections::BTreeMap;
///
/// - Only works if the user is currently joined to the room (TODO: Respect history visibility)
pub async fn search_events_route(
- body: Ruma<search_events::v3::IncomingRequest>,
+ body: Ruma<search_events::v3::Request>,
) -> Result<search_events::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -103,7 +103,7 @@ pub async fn search_events_route(
.take(limit)
.collect();
- let next_batch = if results.len() < limit as usize {
+ let next_batch = if results.len() < limit {
None
} else {
Some((skip + limit).to_string())
diff --git a/src/api/client_server/session.rs b/src/api/client_server/session.rs
index 7c8c128..64c0072 100644
--- a/src/api/client_server/session.rs
+++ b/src/api/client_server/session.rs
@@ -4,7 +4,7 @@ use ruma::{
api::client::{
error::ErrorKind,
session::{get_login_types, login, logout, logout_all},
- uiaa::IncomingUserIdentifier,
+ uiaa::UserIdentifier,
},
UserId,
};
@@ -22,7 +22,7 @@ struct Claims {
/// Get the supported login types of this server. One of these should be used as the `type` field
/// when logging in.
pub async fn get_login_types_route(
- _body: Ruma<get_login_types::v3::IncomingRequest>,
+ _body: Ruma<get_login_types::v3::Request>,
) -> Result<get_login_types::v3::Response> {
Ok(get_login_types::v3::Response::new(vec![
get_login_types::v3::LoginType::Password(Default::default()),
@@ -40,15 +40,15 @@ pub async fn get_login_types_route(
///
/// Note: You can use [`GET /_matrix/client/r0/login`](fn.get_supported_versions_route.html) to see
/// supported login types.
-pub async fn login_route(body: Ruma<login::v3::IncomingRequest>) -> Result<login::v3::Response> {
+pub async fn login_route(body: Ruma<login::v3::Request>) -> Result<login::v3::Response> {
// Validate login method
// TODO: Other login methods
let user_id = match &body.login_info {
- login::v3::IncomingLoginInfo::Password(login::v3::IncomingPassword {
+ login::v3::LoginInfo::Password(login::v3::Password {
identifier,
password,
}) => {
- let username = if let IncomingUserIdentifier::UserIdOrLocalpart(user_id) = identifier {
+ let username = if let UserIdentifier::UserIdOrLocalpart(user_id) = identifier {
user_id.to_lowercase()
} else {
return Err(Error::BadRequest(ErrorKind::Forbidden, "Bad login type."));
@@ -84,7 +84,7 @@ pub async fn login_route(body: Ruma<login::v3::IncomingRequest>) -> Result<login
user_id
}
- login::v3::IncomingLoginInfo::Token(login::v3::IncomingToken { token }) => {
+ login::v3::LoginInfo::Token(login::v3::Token { token }) => {
if let Some(jwt_decoding_key) = services().globals.jwt_decoding_key() {
let token = jsonwebtoken::decode::<Claims>(
token,
diff --git a/src/api/client_server/state.rs b/src/api/client_server/state.rs
index 36466b8..d9c1464 100644
--- a/src/api/client_server/state.rs
+++ b/src/api/client_server/state.rs
@@ -25,7 +25,7 @@ use ruma::{
/// - Tries to send the event into the room, auth rules will determine if it is allowed
/// - If event is new canonical_alias: Rejects if alias is incorrect
pub async fn send_state_event_for_key_route(
- body: Ruma<send_state_event::v3::IncomingRequest>,
+ body: Ruma<send_state_event::v3::Request>,
) -> Result<send_state_event::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -50,7 +50,7 @@ pub async fn send_state_event_for_key_route(
/// - Tries to send the event into the room, auth rules will determine if it is allowed
/// - If event is new canonical_alias: Rejects if alias is incorrect
pub async fn send_state_event_for_empty_key_route(
- body: Ruma<send_state_event::v3::IncomingRequest>,
+ body: Ruma<send_state_event::v3::Request>,
) -> Result<RumaResponse<send_state_event::v3::Response>> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -81,7 +81,7 @@ pub async fn send_state_event_for_empty_key_route(
///
/// - If not joined: Only works if current room history visibility is world readable
pub async fn get_state_events_route(
- body: Ruma<get_state_events::v3::IncomingRequest>,
+ body: Ruma<get_state_events::v3::Request>,
) -> Result<get_state_events::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -133,7 +133,7 @@ pub async fn get_state_events_route(
///
/// - If not joined: Only works if current room history visibility is world readable
pub async fn get_state_events_for_key_route(
- body: Ruma<get_state_events_for_key::v3::IncomingRequest>,
+ body: Ruma<get_state_events_for_key::v3::Request>,
) -> Result<get_state_events_for_key::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -188,7 +188,7 @@ pub async fn get_state_events_for_key_route(
///
/// - If not joined: Only works if current room history visibility is world readable
pub async fn get_state_events_for_empty_key_route(
- body: Ruma<get_state_events_for_key::v3::IncomingRequest>,
+ body: Ruma<get_state_events_for_key::v3::Request>,
) -> Result<RumaResponse<get_state_events_for_key::v3::Response>> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 828ae19..568a23c 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -1,7 +1,7 @@
use crate::{services, Error, Result, Ruma, RumaResponse};
use ruma::{
api::client::{
- filter::{IncomingFilterDefinition, LazyLoadOptions},
+ filter::{FilterDefinition, LazyLoadOptions},
sync::sync_events::{self, DeviceLists, UnreadNotificationsCount},
uiaa::UiaaResponse,
},
@@ -55,7 +55,7 @@ use tracing::error;
/// - Sync is handled in an async task, multiple requests from the same device with the same
/// `since` will be cached
pub async fn sync_events_route(
- body: Ruma<sync_events::v3::IncomingRequest>,
+ body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
@@ -124,7 +124,7 @@ pub async fn sync_events_route(
async fn sync_helper_wrapper(
sender_user: OwnedUserId,
sender_device: OwnedDeviceId,
- body: sync_events::v3::IncomingRequest,
+ body: sync_events::v3::Request,
tx: Sender<Option<Result<sync_events::v3::Response>>>,
) {
let since = body.since.clone();
@@ -157,12 +157,12 @@ async fn sync_helper_wrapper(
async fn sync_helper(
sender_user: OwnedUserId,
sender_device: OwnedDeviceId,
- body: sync_events::v3::IncomingRequest,
+ body: sync_events::v3::Request,
// bool = caching allowed
) -> Result<(sync_events::v3::Response, bool), Error> {
use sync_events::v3::{
- Ephemeral, GlobalAccountData, IncomingFilter, InviteState, InvitedRoom, JoinedRoom,
- LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
+ Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, LeftRoom,
+ Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
};
// TODO: match body.set_presence {
@@ -176,9 +176,9 @@ async fn sync_helper(
// Load filter
let filter = match body.filter {
- None => IncomingFilterDefinition::default(),
- Some(IncomingFilter::FilterDefinition(filter)) => filter,
- Some(IncomingFilter::FilterId(filter_id)) => services()
+ None => FilterDefinition::default(),
+ Some(Filter::FilterDefinition(filter)) => filter,
+ Some(Filter::FilterId(filter_id)) => services()
.users
.get_filter(&sender_user, &filter_id)?
.unwrap_or_default(),
@@ -282,9 +282,8 @@ async fn sync_helper(
let send_notification_counts = !timeline_pdus.is_empty()
|| services()
.rooms
- .edus
- .read_receipt
- .last_privateread_update(&sender_user, &room_id)?
+ .user
+ .last_notification_read(&sender_user, &room_id)?
> since;
let mut timeline_users = HashSet::new();
@@ -389,13 +388,35 @@ async fn sync_helper(
))
};
+ let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
+ .and_then(|shortstatehash| {
+ services()
+ .rooms
+ .state_accessor
+ .state_get(
+ shortstatehash,
+ &StateEventType::RoomMember,
+ sender_user.as_str(),
+ )
+ .transpose()
+ })
+ .transpose()?
+ .and_then(|pdu| {
+ serde_json::from_str(pdu.content.get())
+ .map_err(|_| Error::bad_database("Invalid PDU in database."))
+ .ok()
+ });
+
+ let joined_since_last_sync =
+ since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
+
let (
heroes,
joined_member_count,
invited_member_count,
joined_since_last_sync,
state_events,
- ) = if since_shortstatehash.is_none() {
+ ) = if since_shortstatehash.is_none() || joined_since_last_sync {
// Probably since = 0, we will do an initial sync
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
@@ -488,23 +509,6 @@ async fn sync_helper(
// Incremental /sync
let since_shortstatehash = since_shortstatehash.unwrap();
- let since_sender_member: Option<RoomMemberEventContent> = services()
- .rooms
- .state_accessor
- .state_get(
- since_shortstatehash,
- &StateEventType::RoomMember,
- sender_user.as_str(),
- )?
- .and_then(|pdu| {
- serde_json::from_str(pdu.content.get())
- .map_err(|_| Error::bad_database("Invalid PDU in database."))
- .ok()
- });
-
- let joined_since_last_sync = since_sender_member
- .map_or(true, |member| member.membership != MembershipState::Join);
-
let mut state_events = Vec::new();
let mut lazy_loaded = HashSet::new();
@@ -869,7 +873,7 @@ async fn sync_helper(
let since_state_ids = match since_shortstatehash {
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
- None => BTreeMap::new(),
+ None => HashMap::new(),
};
let left_event_id = match services().rooms.state_accessor.room_state_get_id(
@@ -905,7 +909,7 @@ async fn sync_helper(
let leave_shortstatekey = services()
.rooms
.short
- .get_or_create_shortstatekey(&StateEventType::RoomMember, &sender_user.as_str())?;
+ .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
left_state_ids.insert(leave_shortstatekey, left_event_id);
diff --git a/src/api/client_server/tag.rs b/src/api/client_server/tag.rs
index c87e233..16f1600 100644
--- a/src/api/client_server/tag.rs
+++ b/src/api/client_server/tag.rs
@@ -14,7 +14,7 @@ use std::collections::BTreeMap;
///
/// - Inserts the tag into the tag event of the room account data.
pub async fn update_tag_route(
- body: Ruma<create_tag::v3::IncomingRequest>,
+ body: Ruma<create_tag::v3::Request>,
) -> Result<create_tag::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -58,7 +58,7 @@ pub async fn update_tag_route(
///
/// - Removes the tag from the tag event of the room account data.
pub async fn delete_tag_route(
- body: Ruma<delete_tag::v3::IncomingRequest>,
+ body: Ruma<delete_tag::v3::Request>,
) -> Result<delete_tag::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@@ -98,9 +98,7 @@ pub async fn delete_tag_route(
/// Returns tags on the room.
///
/// - Gets the tag event of the room account data.
-pub async fn get_tags_route(
- body: Ruma<get_tags::v3::IncomingRequest>,
-) -> Result<get_tags::v3::Response> {
+pub async fn get_tags_route(body: Ruma<get_tags::v3::Request>) -> Result<get_tags::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let event = services().account_data.get(
diff --git a/src/api/client_server/thirdparty.rs b/src/api/client_server/thirdparty.rs
index 5665ad6..c2c1adf 100644
--- a/src/api/client_server/thirdparty.rs
+++ b/src/api/client_server/thirdparty.rs
@@ -7,7 +7,7 @@ use std::collections::BTreeMap;
///
/// TODO: Fetches all metadata about protocols supported by the homeserver.
pub async fn get_protocols_route(
- _body: Ruma<get_protocols::v3::IncomingRequest>,
+ _body: Ruma<get_protocols::v3::Request>,
) -> Result<get_protocols::v3::Response> {
// TODO
Ok(get_protocols::v3::Response {
diff --git a/src/api/client_server/to_device.rs b/src/api/client_server/to_device.rs
index 139b845..26db4e4 100644
--- a/src/api/client_server/to_device.rs
+++ b/src/api/client_server/to_device.rs
@@ -14,7 +14,7 @@ use ruma::{
///
/// Send a to-device event to a set of client devices.
pub async fn send_event_to_device_route(
- body: Ruma<send_event_to_device::v3::IncomingRequest>,
+ body: Ruma<send_event_to_device::v3::Request>,
) -> Result<send_event_to_device::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_deref();
diff --git a/src/api/client_server/typing.rs b/src/api/client_server/typing.rs
index ecc926f..43217e1 100644
--- a/src/api/client_server/typing.rs
+++ b/src/api/client_server/typing.rs
@@ -5,7 +5,7 @@ use ruma::api::client::{error::ErrorKind, typing::create_typing_event};
///
/// Sets the typing state of the sender user.
pub async fn create_typing_event_route(
- body: Ruma<create_typing_event::v3::IncomingRequest>,
+ body: Ruma<create_typing_event::v3::Request>,
) -> Result<create_typing_event::v3::Response> {
use create_typing_event::v3::Typing;
diff --git a/src/api/client_server/unversioned.rs b/src/api/client_server/unversioned.rs
index 8a5c3d2..526598b 100644
--- a/src/api/client_server/unversioned.rs
+++ b/src/api/client_server/unversioned.rs
@@ -15,7 +15,7 @@ use crate::{Result, Ruma};
/// Note: Unstable features are used while developing new features. Clients should avoid using
/// unstable features in their stable releases
pub async fn get_supported_versions_route(
- _body: Ruma<get_supported_versions::IncomingRequest>,
+ _body: Ruma<get_supported_versions::Request>,
) -> Result<get_supported_versions::Response> {
let resp = get_supported_versions::Response {
versions: vec![
diff --git a/src/api/client_server/user_directory.rs b/src/api/client_server/user_directory.rs
index 518daa5..c30bac5 100644
--- a/src/api/client_server/user_directory.rs
+++ b/src/api/client_server/user_directory.rs
@@ -14,7 +14,7 @@ use ruma::{
/// - Hides any local users that aren't in any public rooms (i.e. those that have the join rule set to public)
/// and don't share a room with the sender
pub async fn search_users_route(
- body: Ruma<search_users::v3::IncomingRequest>,
+ body: Ruma<search_users::v3::Request>,
) -> Result<search_users::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let limit = u64::from(body.limit) as usize;
diff --git a/src/api/client_server/voip.rs b/src/api/client_server/voip.rs
index 6b1ee40..4990c17 100644
--- a/src/api/client_server/voip.rs
+++ b/src/api/client_server/voip.rs
@@ -10,7 +10,7 @@ type HmacSha1 = Hmac<Sha1>;
///
/// TODO: Returns information about the recommended turn server.
pub async fn turn_server_route(
- body: Ruma<get_turn_server_info::v3::IncomingRequest>,
+ body: Ruma<get_turn_server_info::v3::Request>,
) -> Result<get_turn_server_info::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
diff --git a/src/api/ruma_wrapper/axum.rs b/src/api/ruma_wrapper/axum.rs
index 3870250..74f506f 100644
--- a/src/api/ruma_wrapper/axum.rs
+++ b/src/api/ruma_wrapper/axum.rs
@@ -311,8 +311,7 @@ impl Credentials for XMatrix {
fn decode(value: &http::HeaderValue) -> Option<Self> {
debug_assert!(
value.as_bytes().starts_with(b"X-Matrix "),
- "HeaderValue to decode should start with \"X-Matrix ..\", received = {:?}",
- value,
+ "HeaderValue to decode should start with \"X-Matrix ..\", received = {value:?}",
);
let parameters = str::from_utf8(&value.as_bytes()["X-Matrix ".len()..])
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index 320e396..fc3e2c0 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -31,7 +31,7 @@ use ruma::{
EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, OutgoingResponse,
SendAccessToken,
},
- directory::{IncomingFilter, IncomingRoomNetwork},
+ directory::{Filter, RoomNetwork},
events::{
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
room::{
@@ -42,8 +42,8 @@ use ruma::{
},
serde::{Base64, JsonObject, Raw},
to_device::DeviceIdOrAllDevices,
- CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
- OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
+ CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
+ OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
};
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use std::{
@@ -55,7 +55,7 @@ use std::{
time::{Duration, Instant, SystemTime},
};
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};
/// Wraps either an literal IP address plus port, or a hostname plus complement
/// (colon-plus-port if it was specified).
@@ -84,8 +84,8 @@ pub enum FedDest {
impl FedDest {
fn into_https_string(self) -> String {
match self {
- Self::Literal(addr) => format!("https://{}", addr),
- Self::Named(host, port) => format!("https://{}{}", host, port),
+ Self::Literal(addr) => format!("https://{addr}"),
+ Self::Named(host, port) => format!("https://{host}{port}"),
}
}
@@ -294,13 +294,7 @@ where
} else {
Err(Error::FederationError(
destination.to_owned(),
- RumaError::try_from_http_response(http_response).map_err(|e| {
- warn!(
- "Invalid {} response from {} on: {} {}",
- status, &destination, url, e
- );
- Error::BadServerResponse("Server returned bad error response.")
- })?,
+ RumaError::from_http_response(http_response),
))
}
}
@@ -391,7 +385,7 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
}
if let Some(port) = force_port {
- FedDest::Named(delegated_hostname, format!(":{}", port))
+ FedDest::Named(delegated_hostname, format!(":{port}"))
} else {
add_port_to_hostname(&delegated_hostname)
}
@@ -433,7 +427,7 @@ async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDe
}
if let Some(port) = force_port {
- FedDest::Named(hostname.clone(), format!(":{}", port))
+ FedDest::Named(hostname.clone(), format!(":{port}"))
} else {
add_port_to_hostname(&hostname)
}
@@ -466,7 +460,7 @@ async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
if let Ok(Some(host_port)) = services()
.globals
.dns_resolver()
- .srv_lookup(format!("_matrix._tcp.{}", hostname))
+ .srv_lookup(format!("_matrix._tcp.{hostname}"))
.await
.map(|srv| {
srv.iter().next().map(|result| {
@@ -488,10 +482,7 @@ async fn request_well_known(destination: &str) -> Option<String> {
&services()
.globals
.default_client()
- .get(&format!(
- "https://{}/.well-known/matrix/server",
- destination
- ))
+ .get(&format!("https://{destination}/.well-known/matrix/server"))
.send()
.await
.ok()?
@@ -586,7 +577,7 @@ pub async fn get_server_keys_deprecated_route() -> impl IntoResponse {
///
/// Lists the public rooms on this server.
pub async fn get_public_rooms_filtered_route(
- body: Ruma<get_public_rooms_filtered::v1::IncomingRequest>,
+ body: Ruma<get_public_rooms_filtered::v1::Request>,
) -> Result<get_public_rooms_filtered::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -613,7 +604,7 @@ pub async fn get_public_rooms_filtered_route(
///
/// Lists the public rooms on this server.
pub async fn get_public_rooms_route(
- body: Ruma<get_public_rooms::v1::IncomingRequest>,
+ body: Ruma<get_public_rooms::v1::Request>,
) -> Result<get_public_rooms::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -623,8 +614,8 @@ pub async fn get_public_rooms_route(
None,
body.limit,
body.since.as_deref(),
- &IncomingFilter::default(),
- &IncomingRoomNetwork::Matrix,
+ &Filter::default(),
+ &RoomNetwork::Matrix,
)
.await?;
@@ -640,7 +631,7 @@ pub async fn get_public_rooms_route(
///
/// Push EDUs and PDUs to this server.
pub async fn send_transaction_message_route(
- body: Ruma<send_transaction_message::v1::IncomingRequest>,
+ body: Ruma<send_transaction_message::v1::Request>,
) -> Result<send_transaction_message::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -664,16 +655,11 @@ pub async fn send_transaction_message_route(
// let mut auth_cache = EventMap::new();
for pdu in &body.pdus {
- // We do not add the event_id field to the pdu here because of signature and hashes checks
- let (event_id, value) = match gen_event_id_canonical_json(pdu) {
- Ok(t) => t,
- Err(_) => {
- // Event could not be converted to canonical json
- continue;
- }
- };
+ let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
+ warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
+ Error::BadServerResponse("Invalid PDU in server response")
+ })?;
- // 0. Check the server is in the room
let room_id: OwnedRoomId = match value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
@@ -681,14 +667,26 @@ pub async fn send_transaction_message_route(
Some(id) => id,
None => {
// Event is invalid
- resolved_map.insert(
- event_id,
- Err(Error::bad_database("Event needs a valid RoomId.")),
- );
continue;
}
};
+ let room_version_id = match services().rooms.state.get_room_version(&room_id) {
+ Ok(v) => v,
+ Err(_) => {
+ continue;
+ }
+ };
+
+ let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
+ Ok(t) => t,
+ Err(_) => {
+ // Event could not be converted to canonical json
+ continue;
+ }
+ };
+ // We do not add the event_id field to the pdu here because of signature and hashes checks
+
services()
.rooms
.event_handler
@@ -724,7 +722,7 @@ pub async fn send_transaction_message_route(
drop(mutex_lock);
let elapsed = start_time.elapsed();
- warn!(
+ debug!(
"Handling transaction of event {} took {}m{}s",
event_id,
elapsed.as_secs() / 60,
@@ -909,7 +907,7 @@ pub async fn send_transaction_message_route(
///
/// - Only works if a user of this server is currently invited or joined the room
pub async fn get_event_route(
- body: Ruma<get_event::v1::IncomingRequest>,
+ body: Ruma<get_event::v1::Request>,
) -> Result<get_event::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -956,7 +954,7 @@ pub async fn get_event_route(
///
/// Retrieves events that the sender is missing.
pub async fn get_missing_events_route(
- body: Ruma<get_missing_events::v1::IncomingRequest>,
+ body: Ruma<get_missing_events::v1::Request>,
) -> Result<get_missing_events::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -1035,7 +1033,7 @@ pub async fn get_missing_events_route(
///
/// - This does not include the event itself
pub async fn get_event_authorization_route(
- body: Ruma<get_event_authorization::v1::IncomingRequest>,
+ body: Ruma<get_event_authorization::v1::Request>,
) -> Result<get_event_authorization::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -1094,7 +1092,7 @@ pub async fn get_event_authorization_route(
///
/// Retrieves the current state of the room.
pub async fn get_room_state_route(
- body: Ruma<get_room_state::v1::IncomingRequest>,
+ body: Ruma<get_room_state::v1::Request>,
) -> Result<get_room_state::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -1174,7 +1172,7 @@ pub async fn get_room_state_route(
///
/// Retrieves the current state of the room.
pub async fn get_room_state_ids_route(
- body: Ruma<get_room_state_ids::v1::IncomingRequest>,
+ body: Ruma<get_room_state_ids::v1::Request>,
) -> Result<get_room_state_ids::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -1235,7 +1233,7 @@ pub async fn get_room_state_ids_route(
///
/// Creates a join template.
pub async fn create_join_event_template_route(
- body: Ruma<prepare_join_event::v1::IncomingRequest>,
+ body: Ruma<prepare_join_event::v1::Request>,
) -> Result<prepare_join_event::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -1407,7 +1405,8 @@ async fn create_join_event(
// let mut auth_cache = EventMap::new();
// We do not add the event_id field to the pdu here because of signature and hashes checks
- let (event_id, value) = match gen_event_id_canonical_json(pdu) {
+ let room_version_id = services().rooms.state.get_room_version(room_id)?;
+ let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
Ok(t) => t,
Err(_) => {
// Event could not be converted to canonical json
@@ -1478,6 +1477,7 @@ async fn create_join_event(
.filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
+ event: None, // TODO: handle restricted joins
})
}
@@ -1485,7 +1485,7 @@ async fn create_join_event(
///
/// Submits a signed join event.
pub async fn create_join_event_v1_route(
- body: Ruma<create_join_event::v1::IncomingRequest>,
+ body: Ruma<create_join_event::v1::Request>,
) -> Result<create_join_event::v1::Response> {
let sender_servername = body
.sender_servername
@@ -1501,7 +1501,7 @@ pub async fn create_join_event_v1_route(
///
/// Submits a signed join event.
pub async fn create_join_event_v2_route(
- body: Ruma<create_join_event::v2::IncomingRequest>,
+ body: Ruma<create_join_event::v2::Request>,
) -> Result<create_join_event::v2::Response> {
let sender_servername = body
.sender_servername
@@ -1517,7 +1517,7 @@ pub async fn create_join_event_v2_route(
///
/// Invites a remote user to a room.
pub async fn create_invite_route(
- body: Ruma<create_invite::v2::IncomingRequest>,
+ body: Ruma<create_invite::v2::Request>,
) -> Result<create_invite::v2::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -1609,8 +1609,12 @@ pub async fn create_invite_route(
invite_state.push(pdu.to_stripped_state_event());
- // If the room already exists, the remote server will notify us about the join via /send
- if !services().rooms.metadata.exists(&pdu.room_id)? {
+ // If we are active in the room, the remote server will notify us about the join via /send
+ if !services()
+ .rooms
+ .state_cache
+ .server_in_room(services().globals.server_name(), &body.room_id)?
+ {
services().rooms.state_cache.update_membership(
&body.room_id,
&invited_user,
@@ -1630,7 +1634,7 @@ pub async fn create_invite_route(
///
/// Gets information on all devices of the user.
pub async fn get_devices_route(
- body: Ruma<get_devices::v1::IncomingRequest>,
+ body: Ruma<get_devices::v1::Request>,
) -> Result<get_devices::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -1677,7 +1681,7 @@ pub async fn get_devices_route(
///
/// Resolve a room alias to a room id.
pub async fn get_room_information_route(
- body: Ruma<get_room_information::v1::IncomingRequest>,
+ body: Ruma<get_room_information::v1::Request>,
) -> Result<get_room_information::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
@@ -1702,7 +1706,7 @@ pub async fn get_room_information_route(
///
/// Gets information on a profile.
pub async fn get_profile_information_route(
- body: Ruma<get_profile_information::v1::IncomingRequest>,
+ body: Ruma<get_profile_information::v1::Request>,
) -> Result<get_profile_information::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 3c3a764..31a586f 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -40,6 +40,8 @@ pub struct Config {
pub max_request_size: u32,
#[serde(default = "default_max_concurrent_requests")]
pub max_concurrent_requests: u16,
+ #[serde(default = "default_max_fetch_prev_events")]
+ pub max_fetch_prev_events: u16,
#[serde(default = "false_fn")]
pub allow_registration: bool,
#[serde(default = "true_fn")]
@@ -183,7 +185,7 @@ impl fmt::Display for Config {
("Turn TTL", &self.turn_ttl.to_string()),
("Turn URIs", {
let mut lst = vec![];
- for item in self.turn_uris.to_vec().into_iter().enumerate() {
+ for item in self.turn_uris.iter().cloned().enumerate() {
let (_, uri): (usize, String) = item;
lst.push(uri);
}
@@ -191,13 +193,13 @@ impl fmt::Display for Config {
}),
];
- let mut msg: String = "Active config values:\n\n".to_string();
+ let mut msg: String = "Active config values:\n\n".to_owned();
for line in lines.into_iter().enumerate() {
msg += &format!("{}: {}\n", line.1 .0, line.1 .1);
}
- write!(f, "{}", msg)
+ write!(f, "{msg}")
}
}
@@ -222,7 +224,7 @@ fn default_database_backend() -> String {
}
fn default_db_cache_capacity_mb() -> f64 {
- 10.0
+ 1000.0
}
fn default_conduit_cache_capacity_modifier() -> f64 {
@@ -230,7 +232,7 @@ fn default_conduit_cache_capacity_modifier() -> f64 {
}
fn default_rocksdb_max_open_files() -> i32 {
- 20
+ 1000
}
fn default_pdu_cache_capacity() -> u32 {
@@ -249,6 +251,10 @@ fn default_max_concurrent_requests() -> u16 {
100
}
+fn default_max_fetch_prev_events() -> u16 {
+ 100_u16
+}
+
fn default_log() -> String {
"warn,state_res=warn,_=off,sled=off".to_owned()
}
diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs
index 4961fd7..b69efb6 100644
--- a/src/database/abstraction/sqlite.rs
+++ b/src/database/abstraction/sqlite.rs
@@ -106,7 +106,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
}
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
- self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;
+ self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"), [])?;
Ok(Arc::new(SqliteTable {
engine: Arc::clone(self),
@@ -135,7 +135,6 @@ type TupleOfBytes = (Vec<u8>, Vec<u8>);
impl SqliteTable {
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
- //dbg!(&self.name);
Ok(guard
.prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
.query_row([key], |row| row.get(0))
@@ -143,7 +142,6 @@ impl SqliteTable {
}
fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
- //dbg!(&self.name);
guard.execute(
format!(
"INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)",
@@ -176,10 +174,7 @@ impl SqliteTable {
statement
.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
- .map(move |r| {
- //dbg!(&name);
- r.unwrap()
- }),
+ .map(move |r| r.unwrap()),
);
Box::new(PreparedStatementIterator {
@@ -276,10 +271,7 @@ impl KvTree for SqliteTable {
statement
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
- .map(move |r| {
- //dbg!(&name);
- r.unwrap()
- }),
+ .map(move |r| r.unwrap()),
);
Box::new(PreparedStatementIterator {
iterator,
@@ -301,10 +293,7 @@ impl KvTree for SqliteTable {
statement
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
- .map(move |r| {
- //dbg!(&name);
- r.unwrap()
- }),
+ .map(move |r| r.unwrap()),
);
Box::new(PreparedStatementIterator {
diff --git a/src/database/key_value/pusher.rs b/src/database/key_value/pusher.rs
index 3dfceb6..50a6fac 100644
--- a/src/database/key_value/pusher.rs
+++ b/src/database/key_value/pusher.rs
@@ -1,38 +1,36 @@
use ruma::{
- api::client::push::{get_pushers, set_pusher},
+ api::client::push::{set_pusher, Pusher},
UserId,
};
use crate::{database::KeyValueDatabase, service, utils, Error, Result};
impl service::pusher::Data for KeyValueDatabase {
- fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> {
- let mut key = sender.as_bytes().to_vec();
- key.push(0xff);
- key.extend_from_slice(pusher.pushkey.as_bytes());
-
- // There are 2 kinds of pushers but the spec says: null deletes the pusher.
- if pusher.kind.is_none() {
- return self
- .senderkey_pusher
- .remove(&key)
- .map(|_| ())
- .map_err(Into::into);
+ fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::PusherAction) -> Result<()> {
+ match &pusher {
+ set_pusher::v3::PusherAction::Post(data) => {
+ let mut key = sender.as_bytes().to_vec();
+ key.push(0xff);
+ key.extend_from_slice(data.pusher.ids.pushkey.as_bytes());
+ self.senderkey_pusher.insert(
+ &key,
+ &serde_json::to_vec(&pusher).expect("Pusher is valid JSON value"),
+ )?;
+ Ok(())
+ }
+ set_pusher::v3::PusherAction::Delete(ids) => {
+ let mut key = sender.as_bytes().to_vec();
+ key.push(0xff);
+ key.extend_from_slice(ids.pushkey.as_bytes());
+ self.senderkey_pusher
+ .remove(&key)
+ .map(|_| ())
+ .map_err(Into::into)
+ }
}
-
- self.senderkey_pusher.insert(
- &key,
- &serde_json::to_vec(&pusher).expect("Pusher is valid JSON value"),
- )?;
-
- Ok(())
}
- fn get_pusher(
- &self,
- sender: &UserId,
- pushkey: &str,
- ) -> Result<Option<get_pushers::v3::Pusher>> {
+ fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result<Option<Pusher>> {
let mut senderkey = sender.as_bytes().to_vec();
senderkey.push(0xff);
senderkey.extend_from_slice(pushkey.as_bytes());
@@ -46,7 +44,7 @@ impl service::pusher::Data for KeyValueDatabase {
.transpose()
}
- fn get_pushers(&self, sender: &UserId) -> Result<Vec<get_pushers::v3::Pusher>> {
+ fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> {
let mut prefix = sender.as_bytes().to_vec();
prefix.push(0xff);
diff --git a/src/database/key_value/rooms/edus/typing.rs b/src/database/key_value/rooms/edus/typing.rs
index 4a2f0f9..5709192 100644
--- a/src/database/key_value/rooms/edus/typing.rs
+++ b/src/database/key_value/rooms/edus/typing.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::{collections::HashSet, mem};
use ruma::{OwnedUserId, RoomId, UserId};
@@ -53,6 +53,47 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase {
Ok(())
}
+ fn typings_maintain(&self, room_id: &RoomId) -> Result<()> {
+ let mut prefix = room_id.as_bytes().to_vec();
+ prefix.push(0xff);
+
+ let current_timestamp = utils::millis_since_unix_epoch();
+
+ let mut found_outdated = false;
+
+ // Find all outdated edus before inserting a new one
+ for outdated_edu in self
+ .typingid_userid
+ .scan_prefix(prefix)
+ .map(|(key, _)| {
+ Ok::<_, Error>((
+ key.clone(),
+ utils::u64_from_bytes(
+ &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| {
+ Error::bad_database("RoomTyping has invalid timestamp or delimiters.")
+ })?[0..mem::size_of::<u64>()],
+ )
+ .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?,
+ ))
+ })
+ .filter_map(|r| r.ok())
+ .take_while(|&(_, timestamp)| timestamp < current_timestamp)
+ {
+ // This is an outdated edu (time > timestamp)
+ self.typingid_userid.remove(&outdated_edu.0)?;
+ found_outdated = true;
+ }
+
+ if found_outdated {
+ self.roomid_lasttypingupdate.insert(
+ room_id.as_bytes(),
+ &services().globals.next_count()?.to_be_bytes(),
+ )?;
+ }
+
+ Ok(())
+ }
+
fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
Ok(self
.roomid_lasttypingupdate
diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs
index 70e59ac..0f0c0dc 100644
--- a/src/database/key_value/rooms/state_accessor.rs
+++ b/src/database/key_value/rooms/state_accessor.rs
@@ -1,7 +1,4 @@
-use std::{
- collections::{BTreeMap, HashMap},
- sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
use async_trait::async_trait;
@@ -9,7 +6,7 @@ use ruma::{events::StateEventType, EventId, RoomId};
#[async_trait]
impl service::rooms::state_accessor::Data for KeyValueDatabase {
- async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
+ async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
let full_state = services()
.rooms
.state_compressor
@@ -17,7 +14,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
.pop()
.expect("there is always one layer")
.1;
- let mut result = BTreeMap::new();
+ let mut result = HashMap::new();
let mut i = 0;
for compressed in full_state.into_iter() {
let parsed = services()
diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs
index 3d8d1c8..4c43572 100644
--- a/src/database/key_value/rooms/user.rs
+++ b/src/database/key_value/rooms/user.rs
@@ -7,12 +7,20 @@ impl service::rooms::user::Data for KeyValueDatabase {
let mut userroom_id = user_id.as_bytes().to_vec();
userroom_id.push(0xff);
userroom_id.extend_from_slice(room_id.as_bytes());
+ let mut roomuser_id = room_id.as_bytes().to_vec();
+ roomuser_id.push(0xff);
+ roomuser_id.extend_from_slice(user_id.as_bytes());
self.userroomid_notificationcount
.insert(&userroom_id, &0_u64.to_be_bytes())?;
self.userroomid_highlightcount
.insert(&userroom_id, &0_u64.to_be_bytes())?;
+ self.roomuserid_lastnotificationread.insert(
+ &roomuser_id,
+ &services().globals.next_count()?.to_be_bytes(),
+ )?;
+
Ok(())
}
@@ -44,6 +52,23 @@ impl service::rooms::user::Data for KeyValueDatabase {
.unwrap_or(Ok(0))
}
+ fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
+ let mut key = room_id.as_bytes().to_vec();
+ key.push(0xff);
+ key.extend_from_slice(user_id.as_bytes());
+
+ Ok(self
+ .roomuserid_lastnotificationread
+ .get(&key)?
+ .map(|bytes| {
+ utils::u64_from_bytes(&bytes).map_err(|_| {
+ Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
+ })
+ })
+ .transpose()?
+ .unwrap_or(0))
+ }
+
fn associate_token_shortstatehash(
&self,
room_id: &RoomId,
diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs
index cd5a535..1cabab0 100644
--- a/src/database/key_value/users.rs
+++ b/src/database/key_value/users.rs
@@ -1,7 +1,7 @@
use std::{collections::BTreeMap, mem::size_of};
use ruma::{
- api::client::{device::Device, error::ErrorKind, filter::IncomingFilterDefinition},
+ api::client::{device::Device, error::ErrorKind, filter::FilterDefinition},
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::{AnyToDeviceEvent, StateEventType},
serde::Raw,
@@ -899,7 +899,7 @@ impl service::users::Data for KeyValueDatabase {
}
/// Creates a new sync filter. Returns the filter id.
- fn create_filter(&self, user_id: &UserId, filter: &IncomingFilterDefinition) -> Result<String> {
+ fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> {
let filter_id = utils::random_string(4);
let mut key = user_id.as_bytes().to_vec();
@@ -914,11 +914,7 @@ impl service::users::Data for KeyValueDatabase {
Ok(filter_id)
}
- fn get_filter(
- &self,
- user_id: &UserId,
- filter_id: &str,
- ) -> Result<Option<IncomingFilterDefinition>> {
+ fn get_filter(&self, user_id: &UserId, filter_id: &str) -> Result<Option<FilterDefinition>> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(filter_id.as_bytes());
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 15ee137..78bb358 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -7,7 +7,8 @@ use directories::ProjectDirs;
use lru_cache::LruCache;
use ruma::{
events::{
- push_rules::PushRulesEventContent, room::message::RoomMessageEventContent,
+ push_rules::{PushRulesEvent, PushRulesEventContent},
+ room::message::RoomMessageEventContent,
GlobalAccountDataEvent, GlobalAccountDataEventType, StateEventType,
},
push::Ruleset,
@@ -98,6 +99,7 @@ pub struct KeyValueDatabase {
pub(super) userroomid_notificationcount: Arc<dyn KvTree>, // NotifyCount = u64
pub(super) userroomid_highlightcount: Arc<dyn KvTree>, // HightlightCount = u64
+ pub(super) roomuserid_lastnotificationread: Arc<dyn KvTree>, // LastNotificationRead = u64
/// Remember the current state hash of a room.
pub(super) roomid_shortstatehash: Arc<dyn KvTree>,
@@ -256,7 +258,7 @@ impl KeyValueDatabase {
};
if config.max_request_size < 1024 {
- eprintln!("ERROR: Max request size is less than 1KB. Please increase it.");
+ error!(?config.max_request_size, "Max request size is less than 1KB. Please increase it.");
}
let db_raw = Box::new(Self {
@@ -317,6 +319,7 @@ impl KeyValueDatabase {
userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?,
userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?,
+ roomuserid_lastnotificationread: builder.open_tree("userroomid_highlightcount")?,
statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?,
@@ -405,7 +408,7 @@ impl KeyValueDatabase {
}
// If the database has any data, perform data migrations before starting
- let latest_database_version = 11;
+ let latest_database_version = 12;
if services().users.count()? > 0 {
// MIGRATIONS
@@ -480,7 +483,7 @@ impl KeyValueDatabase {
for user in services().rooms.state_cache.room_members(&room?) {
let user = user?;
if user.server_name() != services().globals.server_name() {
- println!("Migration: Creating user {}", user);
+ info!(?user, "Migration: creating user");
services().users.create(&user, None)?;
}
}
@@ -542,7 +545,6 @@ impl KeyValueDatabase {
current_state: HashSet<_>,
last_roomstates: &mut HashMap<_, _>| {
counter += 1;
- println!("counter: {}", counter);
let last_roomsstatehash = last_roomstates.get(current_room);
let states_parents = last_roomsstatehash.map_or_else(
@@ -739,15 +741,13 @@ impl KeyValueDatabase {
new_key.extend_from_slice(word);
new_key.push(0xff);
new_key.extend_from_slice(pdu_id_count);
- println!("old {:?}", key);
- println!("new {:?}", new_key);
Some((new_key, Vec::new()))
})
.peekable();
while iter.peek().is_some() {
db.tokenids.insert_batch(&mut iter.by_ref().take(1000))?;
- println!("smaller batch done");
+ debug!("Inserted smaller batch");
}
info!("Deleting starts");
@@ -757,7 +757,6 @@ impl KeyValueDatabase {
.iter()
.filter_map(|(key, _)| {
if key.starts_with(b"!") {
- println!("del {:?}", key);
Some(key)
} else {
None
@@ -766,7 +765,6 @@ impl KeyValueDatabase {
.collect();
for key in batch2 {
- println!("del");
db.tokenids.remove(&key)?;
}
@@ -801,7 +799,81 @@ impl KeyValueDatabase {
warn!("Migration: 10 -> 11 finished");
}
- assert_eq!(11, latest_database_version);
+ if services().globals.database_version()? < 12 {
+ for username in services().users.list_local_users().unwrap() {
+ let user =
+ UserId::parse_with_server_name(username, services().globals.server_name())
+ .unwrap();
+
+ let raw_rules_list = services()
+ .account_data
+ .get(
+ None,
+ &user,
+ GlobalAccountDataEventType::PushRules.to_string().into(),
+ )
+ .unwrap()
+ .expect("Username is invalid");
+
+ let mut account_data =
+ serde_json::from_str::<PushRulesEvent>(raw_rules_list.get()).unwrap();
+ let rules_list = &mut account_data.content.global;
+
+ //content rule
+ {
+ let content_rule_transformation =
+ [".m.rules.contains_user_name", ".m.rule.contains_user_name"];
+
+ let rule = rules_list.content.get(content_rule_transformation[0]);
+ if rule.is_some() {
+ let mut rule = rule.unwrap().clone();
+ rule.rule_id = content_rule_transformation[1].to_owned();
+ rules_list.content.remove(content_rule_transformation[0]);
+ rules_list.content.insert(rule);
+ }
+ }
+
+ //underride rules
+ {
+ let underride_rule_transformation = [
+ [".m.rules.call", ".m.rule.call"],
+ [".m.rules.room_one_to_one", ".m.rule.room_one_to_one"],
+ [
+ ".m.rules.encrypted_room_one_to_one",
+ ".m.rule.encrypted_room_one_to_one",
+ ],
+ [".m.rules.message", ".m.rule.message"],
+ [".m.rules.encrypted", ".m.rule.encrypted"],
+ ];
+
+ for transformation in underride_rule_transformation {
+ let rule = rules_list.underride.get(transformation[0]);
+ if let Some(rule) = rule {
+ let mut rule = rule.clone();
+ rule.rule_id = transformation[1].to_owned();
+ rules_list.underride.remove(transformation[0]);
+ rules_list.underride.insert(rule);
+ }
+ }
+ }
+
+ services().account_data.update(
+ None,
+ &user,
+ GlobalAccountDataEventType::PushRules.to_string().into(),
+ &serde_json::to_value(account_data).expect("to json value always works"),
+ )?;
+ }
+
+ services().globals.bump_database_version(12)?;
+
+ warn!("Migration: 11 -> 12 finished");
+ }
+
+ assert_eq!(
+ services().globals.database_version().unwrap(),
+ latest_database_version
+ );
info!(
"Loaded {} database with version {}",
@@ -868,7 +940,6 @@ impl KeyValueDatabase {
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
- use tracing::info;
use std::time::{Duration, Instant};
@@ -884,23 +955,23 @@ impl KeyValueDatabase {
#[cfg(unix)]
tokio::select! {
_ = i.tick() => {
- info!("cleanup: Timer ticked");
+ debug!("cleanup: Timer ticked");
}
_ = s.recv() => {
- info!("cleanup: Received SIGHUP");
+ debug!("cleanup: Received SIGHUP");
}
};
#[cfg(not(unix))]
{
i.tick().await;
- info!("cleanup: Timer ticked")
+ debug!("cleanup: Timer ticked")
}
let start = Instant::now();
if let Err(e) = services().globals.cleanup() {
error!("cleanup: Errored: {}", e);
} else {
- info!("cleanup: Finished in {:?}", start.elapsed());
+ debug!("cleanup: Finished in {:?}", start.elapsed());
}
}
});
diff --git a/src/lib.rs b/src/lib.rs
index 3d7f7ae..dc6a9d2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -23,7 +23,7 @@ pub use utils::error::{Error, Result};
pub static SERVICES: RwLock<Option<&'static Services>> = RwLock::new(None);
-pub fn services<'a>() -> &'static Services {
+pub fn services() -> &'static Services {
SERVICES
.read()
.unwrap()
diff --git a/src/main.rs b/src/main.rs
index af19d2a..da80507 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -24,10 +24,15 @@ use figment::{
};
use http::{
header::{self, HeaderName},
- Method, Uri,
+ Method, StatusCode, Uri,
+};
+use ruma::api::{
+ client::{
+ error::{Error as RumaError, ErrorBody, ErrorKind},
+ uiaa::UiaaResponse,
+ },
+ IncomingRequest,
};
-use opentelemetry::trace::{FutureExt, Tracer};
-use ruma::api::{client::error::ErrorKind, IncomingRequest};
use tokio::signal;
use tower::ServiceBuilder;
use tower_http::{
@@ -35,7 +40,7 @@ use tower_http::{
trace::TraceLayer,
ServiceBuilderExt as _,
};
-use tracing::{info, warn};
+use tracing::{error, info, warn};
use tracing_subscriber::{prelude::*, EnvFilter};
pub use conduit::*; // Re-export everything from the library crate
@@ -63,65 +68,74 @@ async fn main() {
let config = match raw_config.extract::<Config>() {
Ok(s) => s,
Err(e) => {
- eprintln!("It looks like your config is invalid. The following error occured while parsing it: {}", e);
+ eprintln!("It looks like your config is invalid. The following error occurred: {e}");
std::process::exit(1);
}
};
config.warn_deprecated();
- if let Err(e) = KeyValueDatabase::load_or_create(config).await {
- eprintln!(
- "The database couldn't be loaded or created. The following error occured: {}",
- e
- );
- std::process::exit(1);
- };
-
- let config = &services().globals.config;
-
- let start = async {
- run_server().await.unwrap();
- };
-
if config.allow_jaeger {
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_agent_pipeline()
+ .with_auto_split_batch(true)
+ .with_service_name("conduit")
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
+ let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
+
+ let filter_layer = match EnvFilter::try_new(&config.log) {
+ Ok(s) => s,
+ Err(e) => {
+ eprintln!(
+ "It looks like your log config is invalid. The following error occurred: {e}"
+ );
+ EnvFilter::try_new("warn").unwrap()
+ }
+ };
+
+ let subscriber = tracing_subscriber::Registry::default()
+ .with(filter_layer)
+ .with(telemetry);
+ tracing::subscriber::set_global_default(subscriber).unwrap();
+ } else if config.tracing_flame {
+ let registry = tracing_subscriber::Registry::default();
+ let (flame_layer, _guard) =
+ tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
+ let flame_layer = flame_layer.with_empty_samples(false);
- let span = tracer.start("conduit");
- start.with_current_context().await;
- drop(span);
+ let filter_layer = EnvFilter::new("trace,h2=off");
- println!("exporting");
- opentelemetry::global::shutdown_tracer_provider();
+ let subscriber = registry.with(filter_layer).with(flame_layer);
+ tracing::subscriber::set_global_default(subscriber).unwrap();
} else {
let registry = tracing_subscriber::Registry::default();
- if config.tracing_flame {
- let (flame_layer, _guard) =
- tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
- let flame_layer = flame_layer.with_empty_samples(false);
-
- let filter_layer = EnvFilter::new("trace,h2=off");
-
- let subscriber = registry.with(filter_layer).with(flame_layer);
- tracing::subscriber::set_global_default(subscriber).unwrap();
- start.await;
- } else {
- let fmt_layer = tracing_subscriber::fmt::Layer::new();
- let filter_layer = match EnvFilter::try_new(&config.log) {
- Ok(s) => s,
- Err(e) => {
- eprintln!("It looks like your log config is invalid. The following error occurred: {}", e);
- EnvFilter::try_new("warn").unwrap()
- }
- };
+ let fmt_layer = tracing_subscriber::fmt::Layer::new();
+ let filter_layer = match EnvFilter::try_new(&config.log) {
+ Ok(s) => s,
+ Err(e) => {
+ eprintln!("It looks like your config is invalid. The following error occured while parsing it: {e}");
+ EnvFilter::try_new("warn").unwrap()
+ }
+ };
- let subscriber = registry.with(filter_layer).with(fmt_layer);
- tracing::subscriber::set_global_default(subscriber).unwrap();
- start.await;
- }
+ let subscriber = registry.with(filter_layer).with(fmt_layer);
+ tracing::subscriber::set_global_default(subscriber).unwrap();
+ }
+
+ info!("Loading database");
+ if let Err(error) = KeyValueDatabase::load_or_create(config).await {
+ error!(?error, "The database couldn't be loaded or created");
+
+ std::process::exit(1);
+ };
+ let config = &services().globals.config;
+
+ info!("Starting server");
+ run_server().await.unwrap();
+
+ if config.allow_jaeger {
+ opentelemetry::global::shutdown_tracer_provider();
}
}
@@ -180,10 +194,20 @@ async fn run_server() -> io::Result<()> {
match &config.tls {
Some(tls) => {
let conf = RustlsConfig::from_pem_file(&tls.certs, &tls.key).await?;
- bind_rustls(addr, conf).handle(handle).serve(app).await?;
+ let server = bind_rustls(addr, conf).handle(handle).serve(app);
+
+ #[cfg(feature = "systemd")]
+ let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]);
+
+ server.await?
}
None => {
- bind(addr).handle(handle).serve(app).await?;
+ let server = bind(addr).handle(handle).serve(app);
+
+ #[cfg(feature = "systemd")]
+ let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]);
+
+ server.await?
}
}
@@ -191,21 +215,29 @@ async fn run_server() -> io::Result<()> {
info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers...");
services().globals.rotate.fire();
+ #[cfg(feature = "systemd")]
+ let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]);
+
Ok(())
}
async fn unrecognized_method<B>(
req: axum::http::Request<B>,
next: axum::middleware::Next<B>,
-) -> std::result::Result<axum::response::Response, axum::http::StatusCode> {
+) -> std::result::Result<axum::response::Response, StatusCode> {
let method = req.method().clone();
let uri = req.uri().clone();
let inner = next.run(req).await;
if inner.status() == axum::http::StatusCode::METHOD_NOT_ALLOWED {
warn!("Method not allowed: {method} {uri}");
- return Ok(
- Error::BadRequest(ErrorKind::Unrecognized, "Unrecognized request").into_response(),
- );
+ return Ok(RumaResponse(UiaaResponse::MatrixError(RumaError {
+ body: ErrorBody::Standard {
+ kind: ErrorKind::Unrecognized,
+ message: "M_UNRECOGNIZED: Unrecognized request".to_owned(),
+ },
+ status_code: StatusCode::METHOD_NOT_ALLOWED,
+ }))
+ .into_response());
}
Ok(inner)
}
@@ -464,7 +496,7 @@ macro_rules! impl_ruma_handler {
let meta = Req::METADATA;
let method_filter = method_to_filter(meta.method);
- for path in IntoIterator::into_iter([meta.unstable_path, meta.r0_path, meta.stable_path]).flatten() {
+ for path in meta.history.all_paths() {
let handler = self.clone();
router = router.route(path, on(method_filter, |$( $ty: $ty, )* req| async move {
@@ -498,6 +530,6 @@ fn method_to_filter(method: Method) -> MethodFilter {
Method::POST => MethodFilter::POST,
Method::PUT => MethodFilter::PUT,
Method::TRACE => MethodFilter::TRACE,
- m => panic!("Unsupported HTTP method: {:?}", m),
+ m => panic!("Unsupported HTTP method: {m:?}"),
}
}
diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs
index 5766b2f..77f351a 100644
--- a/src/service/admin/mod.rs
+++ b/src/service/admin/mod.rs
@@ -287,13 +287,11 @@ impl Service {
Err(error) => {
let markdown_message = format!(
"Encountered an error while handling the command:\n\
- ```\n{}\n```",
- error,
+ ```\n{error}\n```",
);
let html_message = format!(
"Encountered an error while handling the command:\n\
- <pre>\n{}\n</pre>",
- error,
+ <pre>\n{error}\n</pre>",
);
RoomMessageEventContent::text_html(markdown_message, html_message)
@@ -338,17 +336,14 @@ impl Service {
match parsed_config {
Ok(yaml) => match services().appservice.register_appservice(yaml) {
Ok(id) => RoomMessageEventContent::text_plain(format!(
- "Appservice registered with ID: {}.",
- id
+ "Appservice registered with ID: {id}."
)),
Err(e) => RoomMessageEventContent::text_plain(format!(
- "Failed to register appservice: {}",
- e
+ "Failed to register appservice: {e}"
)),
},
Err(e) => RoomMessageEventContent::text_plain(format!(
- "Could not parse appservice config: {}",
- e
+ "Could not parse appservice config: {e}"
)),
}
} else {
@@ -365,8 +360,7 @@ impl Service {
{
Ok(()) => RoomMessageEventContent::text_plain("Appservice unregistered."),
Err(e) => RoomMessageEventContent::text_plain(format!(
- "Failed to unregister appservice: {}",
- e
+ "Failed to unregister appservice: {e}"
)),
},
AdminCommand::ListAppservices => {
@@ -459,8 +453,7 @@ impl Service {
.count();
let elapsed = start.elapsed();
RoomMessageEventContent::text_plain(format!(
- "Loaded auth chain with length {} in {:?}",
- count, elapsed
+ "Loaded auth chain with length {count} in {elapsed:?}"
))
} else {
RoomMessageEventContent::text_plain("Event not found.")
@@ -474,30 +467,26 @@ impl Service {
Ok(value) => {
match ruma::signatures::reference_hash(&value, &RoomVersionId::V6) {
Ok(hash) => {
- let event_id = EventId::parse(format!("${}", hash));
+ let event_id = EventId::parse(format!("${hash}"));
match serde_json::from_value::<PduEvent>(
serde_json::to_value(value).expect("value is json"),
) {
Ok(pdu) => RoomMessageEventContent::text_plain(format!(
- "EventId: {:?}\n{:#?}",
- event_id, pdu
+ "EventId: {event_id:?}\n{pdu:#?}"
)),
Err(e) => RoomMessageEventContent::text_plain(format!(
- "EventId: {:?}\nCould not parse event: {}",
- event_id, e
+ "EventId: {event_id:?}\nCould not parse event: {e}"
)),
}
}
Err(e) => RoomMessageEventContent::text_plain(format!(
- "Could not parse PDU JSON: {:?}",
- e
+ "Could not parse PDU JSON: {e:?}"
)),
}
}
Err(e) => RoomMessageEventContent::text_plain(format!(
- "Invalid json in command body: {}",
- e
+ "Invalid json in command body: {e}"
)),
}
} else {
@@ -545,8 +534,7 @@ impl Service {
AdminCommand::DatabaseMemoryUsage => match services().globals.db.memory_usage() {
Ok(response) => RoomMessageEventContent::text_plain(response),
Err(e) => RoomMessageEventContent::text_plain(format!(
- "Failed to get database memory usage: {}",
- e
+ "Failed to get database memory usage: {e}"
)),
},
AdminCommand::ShowConfig => {
@@ -561,8 +549,7 @@ impl Service {
Ok(id) => id,
Err(e) => {
return Ok(RoomMessageEventContent::text_plain(format!(
- "The supplied username is not a valid username: {}",
- e
+ "The supplied username is not a valid username: {e}"
)))
}
};
@@ -589,17 +576,16 @@ impl Service {
.set_password(&user_id, Some(new_password.as_str()))
{
Ok(()) => RoomMessageEventContent::text_plain(format!(
- "Successfully reset the password for user {}: {}",
- user_id, new_password
+ "Successfully reset the password for user {user_id}: {new_password}"
)),
Err(e) => RoomMessageEventContent::text_plain(format!(
- "Couldn't reset the password for user {}: {}",
- user_id, e
+ "Couldn't reset the password for user {user_id}: {e}"
)),
}
}
AdminCommand::CreateUser { username, password } => {
- let password = password.unwrap_or(utils::random_string(AUTO_GEN_PASSWORD_LENGTH));
+ let password =
+ password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH));
// Validate user id
let user_id = match UserId::parse_with_server_name(
username.as_str().to_lowercase(),
@@ -608,8 +594,7 @@ impl Service {
Ok(id) => id,
Err(e) => {
return Ok(RoomMessageEventContent::text_plain(format!(
- "The supplied username is not a valid username: {}",
- e
+ "The supplied username is not a valid username: {e}"
)))
}
};
@@ -675,8 +660,7 @@ impl Service {
let user_id = Arc::<UserId>::from(user_id);
if services().users.exists(&user_id)? {
RoomMessageEventContent::text_plain(format!(
- "Making {} leave all rooms before deactivation...",
- user_id
+ "Making {user_id} leave all rooms before deactivation..."
));
services().users.deactivate_account(&user_id)?;
@@ -686,13 +670,11 @@ impl Service {
}
RoomMessageEventContent::text_plain(format!(
- "User {} has been deactivated",
- user_id
+ "User {user_id} has been deactivated"
))
} else {
RoomMessageEventContent::text_plain(format!(
- "User {} doesn't exist on this server",
- user_id
+ "User {user_id} doesn't exist on this server"
))
}
}
@@ -708,8 +690,7 @@ impl Service {
Ok(user_id) => user_ids.push(user_id),
Err(_) => {
return Ok(RoomMessageEventContent::text_plain(format!(
- "{} is not a valid username",
- username
+ "{username} is not a valid username"
)))
}
}
@@ -732,9 +713,8 @@ impl Service {
}
for &user_id in &user_ids {
- match services().users.deactivate_account(user_id) {
- Ok(_) => deactivation_count += 1,
- Err(_) => {}
+ if services().users.deactivate_account(user_id).is_ok() {
+ deactivation_count += 1
}
}
@@ -746,8 +726,7 @@ impl Service {
if admins.is_empty() {
RoomMessageEventContent::text_plain(format!(
- "Deactivated {} accounts.",
- deactivation_count
+ "Deactivated {deactivation_count} accounts."
))
} else {
RoomMessageEventContent::text_plain(format!("Deactivated {} accounts.\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts", deactivation_count, admins.join(", ")))
@@ -767,8 +746,8 @@ impl Service {
fn usage_to_html(&self, text: &str, server_name: &ServerName) -> String {
// Replace `@conduit:servername:-subcmdname` with `@conduit:servername: subcmdname`
let text = text.replace(
- &format!("@conduit:{}:-", server_name),
- &format!("@conduit:{}: ", server_name),
+ &format!("@conduit:{server_name}:-"),
+ &format!("@conduit:{server_name}: "),
);
// For the conduit admin room, subcommands become main commands
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index affc051..cd3be08 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -168,7 +168,7 @@ impl Service {
.supported_room_versions()
.contains(&s.config.default_room_version)
{
- error!("Room version in config isn't supported, falling back to default version");
+ error!(config=?s.config.default_room_version, fallback=?crate::config::default_default_room_version(), "Room version in config isn't supported, falling back to default version");
s.config.default_room_version = crate::config::default_default_room_version();
};
@@ -222,6 +222,10 @@ impl Service {
self.config.max_request_size
}
+ pub fn max_fetch_prev_events(&self) -> u16 {
+ self.config.max_fetch_prev_events
+ }
+
pub fn allow_registration(&self) -> bool {
self.config.allow_registration
}
@@ -341,6 +345,7 @@ impl Service {
fn reqwest_client_builder(config: &Config) -> Result<reqwest::ClientBuilder> {
let mut reqwest_client_builder = reqwest::Client::builder()
+ .pool_max_idle_per_host(0)
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(60 * 3));
diff --git a/src/service/pdu.rs b/src/service/pdu.rs
index 593a687..554f3be 100644
--- a/src/service/pdu.rs
+++ b/src/service/pdu.rs
@@ -1,4 +1,4 @@
-use crate::{services, Error};
+use crate::Error;
use ruma::{
events::{
room::member::RoomMemberEventContent, AnyEphemeralRoomEvent, AnyStateEvent,
@@ -7,7 +7,7 @@ use ruma::{
},
serde::Raw,
state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
- OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
+ OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
};
use serde::{Deserialize, Serialize};
use serde_json::{
@@ -334,23 +334,17 @@ impl Ord for PduEvent {
/// Returns a tuple of the new `EventId` and the PDU as a `BTreeMap<String, CanonicalJsonValue>`.
pub(crate) fn gen_event_id_canonical_json(
pdu: &RawJsonValue,
+ room_version_id: &RoomVersionId,
) -> crate::Result<(OwnedEventId, CanonicalJsonObject)> {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
- let room_id = value
- .get("room_id")
- .and_then(|id| RoomId::parse(id.as_str()?).ok())
- .ok_or_else(|| Error::bad_database("PDU in db has invalid room_id."))?;
-
- let room_version_id = services().rooms.state.get_room_version(&room_id);
-
let event_id = format!(
"${}",
// Anything higher than version3 behaves the same
- ruma::signatures::reference_hash(&value, &room_version_id?)
+ ruma::signatures::reference_hash(&value, room_version_id)
.expect("ruma can calculate reference hashes")
)
.try_into()
diff --git a/src/service/pusher/data.rs b/src/service/pusher/data.rs
index e317121..2062f56 100644
--- a/src/service/pusher/data.rs
+++ b/src/service/pusher/data.rs
@@ -1,16 +1,15 @@
use crate::Result;
use ruma::{
- api::client::push::{get_pushers, set_pusher},
+ api::client::push::{set_pusher, Pusher},
UserId,
};
pub trait Data: Send + Sync {
- fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()>;
+ fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::PusherAction) -> Result<()>;
- fn get_pusher(&self, sender: &UserId, pushkey: &str)
- -> Result<Option<get_pushers::v3::Pusher>>;
+ fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result<Option<Pusher>>;
- fn get_pushers(&self, sender: &UserId) -> Result<Vec<get_pushers::v3::Pusher>>;
+ fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>>;
fn get_pushkeys<'a>(&'a self, sender: &UserId)
-> Box<dyn Iterator<Item = Result<String>> + 'a>;
diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs
index 7fee276..ba096a2 100644
--- a/src/service/pusher/mod.rs
+++ b/src/service/pusher/mod.rs
@@ -6,7 +6,7 @@ use crate::{services, Error, PduEvent, Result};
use bytes::BytesMut;
use ruma::{
api::{
- client::push::{get_pushers, set_pusher, PusherKind},
+ client::push::{set_pusher, Pusher, PusherKind},
push_gateway::send_event_notification::{
self,
v1::{Device, Notification, NotificationCounts, NotificationPriority},
@@ -23,30 +23,26 @@ use ruma::{
};
use std::{fmt::Debug, mem};
-use tracing::{error, info, warn};
+use tracing::{info, warn};
pub struct Service {
pub db: &'static dyn Data,
}
impl Service {
- pub fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()> {
+ pub fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::PusherAction) -> Result<()> {
self.db.set_pusher(sender, pusher)
}
- pub fn get_pusher(
- &self,
- sender: &UserId,
- pushkey: &str,
- ) -> Result<Option<get_pushers::v3::Pusher>> {
+ pub fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result<Option<Pusher>> {
self.db.get_pusher(sender, pushkey)
}
- pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<get_pushers::v3::Pusher>> {
+ pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> {
self.db.get_pushers(sender)
}
- pub fn get_pushkeys<'a>(&'a self, sender: &UserId) -> Box<dyn Iterator<Item = Result<String>>> {
+ pub fn get_pushkeys(&self, sender: &UserId) -> Box<dyn Iterator<Item = Result<String>>> {
self.db.get_pushkeys(sender)
}
@@ -140,7 +136,7 @@ impl Service {
&self,
user: &UserId,
unread: UInt,
- pusher: &get_pushers::v3::Pusher,
+ pusher: &Pusher,
ruleset: Ruleset,
pdu: &PduEvent,
) -> Result<()> {
@@ -221,91 +217,85 @@ impl Service {
async fn send_notice(
&self,
unread: UInt,
- pusher: &get_pushers::v3::Pusher,
+ pusher: &Pusher,
tweaks: Vec<Tweak>,
event: &PduEvent,
) -> Result<()> {
// TODO: email
- if pusher.kind == PusherKind::Email {
- return Ok(());
- }
-
- // TODO:
- // Two problems with this
- // 1. if "event_id_only" is the only format kind it seems we should never add more info
- // 2. can pusher/devices have conflicting formats
- let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
- let url = if let Some(url) = &pusher.data.url {
- url
- } else {
- error!("Http Pusher must have URL specified.");
- return Ok(());
- };
-
- let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone());
- let mut data_minus_url = pusher.data.clone();
- // The url must be stripped off according to spec
- data_minus_url.url = None;
- device.data = data_minus_url.into();
+ match &pusher.kind {
+ PusherKind::Http(http) => {
+ // TODO:
+ // Two problems with this
+ // 1. if "event_id_only" is the only format kind it seems we should never add more info
+ // 2. can pusher/devices have conflicting formats
+ let event_id_only = http.format == Some(PushFormat::EventIdOnly);
+
+ let mut device = Device::new(pusher.ids.app_id.clone(), pusher.ids.pushkey.clone());
+ device.data.default_payload = http.default_payload.clone();
+ device.data.format = http.format.clone();
+
+ // Tweaks are only added if the format is NOT event_id_only
+ if !event_id_only {
+ device.tweaks = tweaks.clone();
+ }
- // Tweaks are only added if the format is NOT event_id_only
- if !event_id_only {
- device.tweaks = tweaks.clone();
- }
+ let d = vec![device];
+ let mut notifi = Notification::new(d);
+
+ notifi.prio = NotificationPriority::Low;
+ notifi.event_id = Some((*event.event_id).to_owned());
+ notifi.room_id = Some((*event.room_id).to_owned());
+ // TODO: missed calls
+ notifi.counts = NotificationCounts::new(unread, uint!(0));
+
+ if event.kind == RoomEventType::RoomEncrypted
+ || tweaks
+ .iter()
+ .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
+ {
+ notifi.prio = NotificationPriority::High
+ }
- let d = &[device];
- let mut notifi = Notification::new(d);
-
- notifi.prio = NotificationPriority::Low;
- notifi.event_id = Some(&event.event_id);
- notifi.room_id = Some(&event.room_id);
- // TODO: missed calls
- notifi.counts = NotificationCounts::new(unread, uint!(0));
-
- if event.kind == RoomEventType::RoomEncrypted
- || tweaks
- .iter()
- .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
- {
- notifi.prio = NotificationPriority::High
- }
+ if event_id_only {
+ self.send_request(&http.url, send_event_notification::v1::Request::new(notifi))
+ .await?;
+ } else {
+ notifi.sender = Some(event.sender.clone());
+ notifi.event_type = Some(event.kind.clone());
+ notifi.content = serde_json::value::to_raw_value(&event.content).ok();
+
+ if event.kind == RoomEventType::RoomMember {
+ notifi.user_is_target =
+ event.state_key.as_deref() == Some(event.sender.as_str());
+ }
+
+ notifi.sender_display_name = services().users.displayname(&event.sender)?;
+
+ let room_name = if let Some(room_name_pdu) = services()
+ .rooms
+ .state_accessor
+ .room_state_get(&event.room_id, &StateEventType::RoomName, "")?
+ {
+ serde_json::from_str::<RoomNameEventContent>(room_name_pdu.content.get())
+ .map_err(|_| {
+ Error::bad_database("Invalid room name event in database.")
+ })?
+ .name
+ } else {
+ None
+ };
+
+ notifi.room_name = room_name;
+
+ self.send_request(&http.url, send_event_notification::v1::Request::new(notifi))
+ .await?;
+ }
- if event_id_only {
- self.send_request(url, send_event_notification::v1::Request::new(notifi))
- .await?;
- } else {
- notifi.sender = Some(&event.sender);
- notifi.event_type = Some(&event.kind);
- let content = serde_json::value::to_raw_value(&event.content).ok();
- notifi.content = content.as_deref();
-
- if event.kind == RoomEventType::RoomMember {
- notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str());
+ Ok(())
}
-
- let user_name = services().users.displayname(&event.sender)?;
- notifi.sender_display_name = user_name.as_deref();
-
- let room_name = if let Some(room_name_pdu) = services()
- .rooms
- .state_accessor
- .room_state_get(&event.room_id, &StateEventType::RoomName, "")?
- {
- serde_json::from_str::<RoomNameEventContent>(room_name_pdu.content.get())
- .map_err(|_| Error::bad_database("Invalid room name event in database."))?
- .name
- } else {
- None
- };
-
- notifi.room_name = room_name.as_deref();
-
- self.send_request(url, send_event_notification::v1::Request::new(notifi))
- .await?;
+ // TODO: Handle email
+ PusherKind::Email(_) => Ok(()),
+ _ => Ok(()),
}
-
- // TODO: email
-
- Ok(())
}
}
diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs
index d3b6e40..da1944e 100644
--- a/src/service/rooms/auth_chain/mod.rs
+++ b/src/service/rooms/auth_chain/mod.rs
@@ -6,7 +6,7 @@ use std::{
pub use data::Data;
use ruma::{api::client::error::ErrorKind, EventId, RoomId};
-use tracing::log::warn;
+use tracing::{debug, error, warn};
use crate::{services, Error, Result};
@@ -15,11 +15,7 @@ pub struct Service {
}
impl Service {
- #[tracing::instrument(skip(self))]
- pub fn get_cached_eventid_authchain<'a>(
- &'a self,
- key: &[u64],
- ) -> Result<Option<Arc<HashSet<u64>>>> {
+ pub fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<HashSet<u64>>>> {
self.db.get_cached_eventid_authchain(key)
}
@@ -89,10 +85,10 @@ impl Service {
.rooms
.auth_chain
.cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?;
- println!(
- "cache missed event {} with auth chain len {}",
- event_id,
- auth_chain.len()
+ debug!(
+ event_id = ?event_id,
+ chain_length = ?auth_chain.len(),
+ "Cache missed event"
);
chunk_cache.extend(auth_chain.iter());
@@ -102,11 +98,11 @@ impl Service {
}
};
}
- println!(
- "chunk missed with len {}, event hits2: {}, misses2: {}",
- chunk_cache.len(),
- hits2,
- misses2
+ debug!(
+ chunk_cache_length = ?chunk_cache.len(),
+ hits = ?hits2,
+ misses = ?misses2,
+ "Chunk missed",
);
let chunk_cache = Arc::new(chunk_cache);
services()
@@ -116,11 +112,11 @@ impl Service {
full_auth_chain.extend(chunk_cache.iter());
}
- println!(
- "total: {}, chunk hits: {}, misses: {}",
- full_auth_chain.len(),
- hits,
- misses
+ debug!(
+ chain_length = ?full_auth_chain.len(),
+ hits = ?hits,
+ misses = ?misses,
+ "Auth chain stats",
);
Ok(full_auth_chain
@@ -152,10 +148,10 @@ impl Service {
}
}
Ok(None) => {
- warn!("Could not find pdu mentioned in auth events: {}", event_id);
+ warn!(?event_id, "Could not find pdu mentioned in auth events");
}
- Err(e) => {
- warn!("Could not load event in auth chain: {} {}", event_id, e);
+ Err(error) => {
+ error!(?event_id, ?error, "Could not load event in auth chain");
}
}
}
diff --git a/src/service/rooms/edus/typing/data.rs b/src/service/rooms/edus/typing/data.rs
index c4ad867..3b1eecf 100644
--- a/src/service/rooms/edus/typing/data.rs
+++ b/src/service/rooms/edus/typing/data.rs
@@ -10,6 +10,9 @@ pub trait Data: Send + Sync {
/// Removes a user from typing before the timeout is reached.
fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
+ /// Makes sure that typing events with old timestamps get removed.
+ fn typings_maintain(&self, room_id: &RoomId) -> Result<()>;
+
/// Returns the count of the last typing update in this room.
fn last_typing_update(&self, room_id: &RoomId) -> Result<u64>;
diff --git a/src/service/rooms/edus/typing/mod.rs b/src/service/rooms/edus/typing/mod.rs
index d05ec90..7d44f7d 100644
--- a/src/service/rooms/edus/typing/mod.rs
+++ b/src/service/rooms/edus/typing/mod.rs
@@ -21,54 +21,15 @@ impl Service {
self.db.typing_remove(user_id, room_id)
}
- /* TODO: Do this in background thread?
/// Makes sure that typing events with old timestamps get removed.
- fn typings_maintain(
- &self,
- room_id: &RoomId,
- globals: &super::super::globals::Globals,
- ) -> Result<()> {
- let mut prefix = room_id.as_bytes().to_vec();
- prefix.push(0xff);
-
- let current_timestamp = utils::millis_since_unix_epoch();
-
- let mut found_outdated = false;
-
- // Find all outdated edus before inserting a new one
- for outdated_edu in self
- .typingid_userid
- .scan_prefix(prefix)
- .map(|(key, _)| {
- Ok::<_, Error>((
- key.clone(),
- utils::u64_from_bytes(
- &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| {
- Error::bad_database("RoomTyping has invalid timestamp or delimiters.")
- })?[0..mem::size_of::<u64>()],
- )
- .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?,
- ))
- })
- .filter_map(|r| r.ok())
- .take_while(|&(_, timestamp)| timestamp < current_timestamp)
- {
- // This is an outdated edu (time > timestamp)
- self.typingid_userid.remove(&outdated_edu.0)?;
- found_outdated = true;
- }
-
- if found_outdated {
- self.roomid_lasttypingupdate
- .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?;
- }
-
- Ok(())
+ fn typings_maintain(&self, room_id: &RoomId) -> Result<()> {
+ self.db.typings_maintain(room_id)
}
- */
/// Returns the count of the last typing update in this room.
pub fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
+ self.typings_maintain(room_id)?;
+
self.db.last_typing_update(room_id)
}
diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs
index 477a971..bc67f7a 100644
--- a/src/service/rooms/event_handler/mod.rs
+++ b/src/service/rooms/event_handler/mod.rs
@@ -7,7 +7,7 @@ use ruma::{
RoomVersionId,
};
use std::{
- collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet},
+ collections::{hash_map, BTreeMap, HashMap, HashSet},
pin::Pin,
sync::{Arc, RwLock, RwLockWriteGuard},
time::{Duration, Instant, SystemTime},
@@ -76,6 +76,7 @@ impl Service {
is_timeline_event: bool,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<Option<Vec<u8>>> {
+ // 0. Check the server is in the room
if !services().rooms.metadata.exists(room_id)? {
return Err(Error::BadRequest(
ErrorKind::NotFound,
@@ -101,6 +102,13 @@ impl Service {
.room_state_get(room_id, &StateEventType::RoomCreate, "")?
.ok_or_else(|| Error::bad_database("Failed to find create event in db."))?;
+ let create_event_content: RoomCreateEventContent =
+ serde_json::from_str(create_event.content.get()).map_err(|e| {
+ error!("Invalid create event: {}", e);
+ Error::BadDatabase("Invalid create event in db")
+ })?;
+ let room_version_id = &create_event_content.room_version;
+
let first_pdu_in_room = services()
.rooms
.timeline
@@ -127,13 +135,15 @@ impl Service {
origin,
&create_event,
room_id,
+ room_version_id,
pub_key_map,
incoming_pdu.prev_events.clone(),
)
.await?;
let mut errors = 0;
- for prev_id in dbg!(sorted_prev_events) {
+ debug!(events = ?sorted_prev_events, "Got previous events");
+ for prev_id in sorted_prev_events {
// Check for disabled again because it might have changed
if services().rooms.metadata.is_disabled(room_id)? {
return Err(Error::BadRequest(
@@ -303,7 +313,7 @@ impl Service {
Ok(ruma::signatures::Verified::Signatures) => {
// Redact
warn!("Calculated hash does not match: {}", event_id);
- match ruma::canonical_json::redact(&value, room_version_id) {
+ match ruma::canonical_json::redact(value, room_version_id, None) {
Ok(obj) => obj,
Err(_) => {
return Err(Error::BadRequest(
@@ -330,7 +340,7 @@ impl Service {
// 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// NOTE: Step 5 is not applied anymore because it failed too often
- warn!("Fetching auth events for {}", incoming_pdu.event_id);
+ debug!(event_id = ?incoming_pdu.event_id, "Fetching auth events");
self.fetch_and_handle_outliers(
origin,
&incoming_pdu
@@ -340,6 +350,7 @@ impl Service {
.collect::<Vec<_>>(),
create_event,
room_id,
+ room_version_id,
pub_key_map,
)
.await;
@@ -542,7 +553,7 @@ impl Service {
let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
for (sstatehash, prev_event) in extremity_sstatehashes {
- let mut leaf_state: BTreeMap<_, _> = services()
+ let mut leaf_state: HashMap<_, _> = services()
.rooms
.state_accessor
.state_full_ids(sstatehash)
@@ -627,8 +638,8 @@ impl Service {
.send_federation_request(
origin,
get_room_state_ids::v1::Request {
- room_id,
- event_id: &incoming_pdu.event_id,
+ room_id: room_id.to_owned(),
+ event_id: (*incoming_pdu.event_id).to_owned(),
},
)
.await
@@ -644,11 +655,12 @@ impl Service {
.collect::<Vec<_>>(),
create_event,
room_id,
+ room_version_id,
pub_key_map,
)
.await;
- let mut state: BTreeMap<_, Arc<EventId>> = BTreeMap::new();
+ let mut state: HashMap<_, Arc<EventId>> = HashMap::new();
for (pdu, _) in state_vec {
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Found non-state pdu in state events.")
@@ -660,10 +672,10 @@ impl Service {
)?;
match state.entry(shortstatekey) {
- btree_map::Entry::Vacant(v) => {
+ hash_map::Entry::Vacant(v) => {
v.insert(Arc::from(&*pdu.event_id));
}
- btree_map::Entry::Occupied(_) => return Err(
+ hash_map::Entry::Occupied(_) => return Err(
Error::bad_database("State event's type and state_key combination exists multiple times."),
),
}
@@ -827,8 +839,8 @@ impl Service {
info!("Preparing for stateres to derive new room state");
let mut extremity_sstatehashes = HashMap::new();
- info!("Loading extremities");
- for id in dbg!(&extremities) {
+ info!(?extremities, "Loading extremities");
+ for id in &extremities {
match services().rooms.timeline.get_pdu(id)? {
Some(leaf_pdu) => {
extremity_sstatehashes.insert(
@@ -1024,6 +1036,7 @@ impl Service {
events: &'a [Arc<EventId>],
create_event: &'a PduEvent,
room_id: &'a RoomId,
+ room_version_id: &'a RoomVersionId,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>>
{
@@ -1099,14 +1112,16 @@ impl Service {
.sending
.send_federation_request(
origin,
- get_event::v1::Request { event_id: &next_id },
+ get_event::v1::Request {
+ event_id: (*next_id).to_owned(),
+ },
)
.await
{
Ok(res) => {
info!("Got {} over federation", next_id);
let (calculated_event_id, value) =
- match pdu::gen_event_id_canonical_json(&res.pdu) {
+ match pdu::gen_event_id_canonical_json(&res.pdu, room_version_id) {
Ok(t) => t,
Err(_) => {
back_off((*next_id).to_owned());
@@ -1179,6 +1194,7 @@ impl Service {
origin: &ServerName,
create_event: &PduEvent,
room_id: &RoomId,
+ room_version_id: &RoomVersionId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
initial_set: Vec<Arc<EventId>>,
) -> Result<(
@@ -1204,12 +1220,13 @@ impl Service {
&[prev_event_id.clone()],
create_event,
room_id,
+ room_version_id,
pub_key_map,
)
.await
.pop()
{
- if amount > 100 {
+ if amount > services().globals.max_fetch_prev_events() {
// Max limit reached
warn!("Max prev event limit reached!");
graph.insert(prev_event_id.clone(), HashSet::new());
@@ -1256,7 +1273,6 @@ impl Service {
// This return value is the key used for sorting events,
// events are then sorted by power level, time,
// and lexically by event_id.
- println!("{}", event_id);
Ok((
int!(0),
MilliSecondsSinceUnixEpoch(
@@ -1464,7 +1480,17 @@ impl Service {
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?;
for k in keys.server_keys {
- let k = k.deserialize().unwrap();
+ let k = match k.deserialize() {
+ Ok(key) => key,
+ Err(e) => {
+ warn!(
+ "Received error {} while fetching keys from trusted server {}",
+ e, server
+ );
+ warn!("{}", k.into_json());
+ continue;
+ }
+ };
// TODO: Check signature from trusted server?
servers.remove(&k.server_name);
@@ -1664,7 +1690,7 @@ impl Service {
.send_federation_request(
server,
get_remote_server_keys::v2::Request::new(
- origin,
+ origin.to_owned(),
MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now()
.checked_add(Duration::from_secs(3600))
diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs
index 82c0800..6eef38f 100644
--- a/src/service/rooms/search/data.rs
+++ b/src/service/rooms/search/data.rs
@@ -2,7 +2,7 @@ use crate::Result;
use ruma::RoomId;
pub trait Data: Send + Sync {
- fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()>;
+ fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()>;
fn search_pdus<'a>(
&'a self,
diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs
index f52ea72..96116b0 100644
--- a/src/service/rooms/state/data.rs
+++ b/src/service/rooms/state/data.rs
@@ -22,7 +22,7 @@ pub trait Data: Send + Sync {
fn get_forward_extremities(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>>;
/// Replace the forward extremities of the room.
- fn set_forward_extremities<'a>(
+ fn set_forward_extremities(
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs
index 0e45032..3072b80 100644
--- a/src/service/rooms/state/mod.rs
+++ b/src/service/rooms/state/mod.rs
@@ -343,7 +343,7 @@ impl Service {
self.db.get_forward_extremities(room_id)
}
- pub fn set_forward_extremities<'a>(
+ pub fn set_forward_extremities(
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs
index 340b19c..f3ae3c2 100644
--- a/src/service/rooms/state_accessor/data.rs
+++ b/src/service/rooms/state_accessor/data.rs
@@ -1,7 +1,4 @@
-use std::{
- collections::{BTreeMap, HashMap},
- sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use ruma::{events::StateEventType, EventId, RoomId};
@@ -12,7 +9,7 @@ use crate::{PduEvent, Result};
pub trait Data: Send + Sync {
/// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash.
- async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>>;
+ async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>>;
async fn state_full(
&self,
diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs
index 1a9c4a9..87d9936 100644
--- a/src/service/rooms/state_accessor/mod.rs
+++ b/src/service/rooms/state_accessor/mod.rs
@@ -1,8 +1,5 @@
mod data;
-use std::{
- collections::{BTreeMap, HashMap},
- sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
pub use data::Data;
use ruma::{events::StateEventType, EventId, RoomId};
@@ -16,7 +13,8 @@ pub struct Service {
impl Service {
/// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash.
- pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
+ #[tracing::instrument(skip(self))]
+ pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
self.db.state_full_ids(shortstatehash).await
}
@@ -39,7 +37,6 @@ impl Service {
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
- #[tracing::instrument(skip(self))]
pub fn state_get(
&self,
shortstatehash: u64,
diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs
index 42de56d..d8bb4a4 100644
--- a/src/service/rooms/state_cache/data.rs
+++ b/src/service/rooms/state_cache/data.rs
@@ -37,7 +37,7 @@ pub trait Data: Send + Sync {
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedServerName>> + 'a>;
- fn server_in_room<'a>(&'a self, server: &ServerName, room_id: &RoomId) -> Result<bool>;
+ fn server_in_room(&self, server: &ServerName, room_id: &RoomId) -> Result<bool>;
/// Returns an iterator of all rooms a server participates in (as far as we know).
fn server_rooms<'a>(
diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs
index 6c9bed3..32afdd4 100644
--- a/src/service/rooms/state_cache/mod.rs
+++ b/src/service/rooms/state_cache/mod.rs
@@ -164,7 +164,7 @@ impl Service {
.content
.ignored_users
.iter()
- .any(|user| user == sender)
+ .any(|(user, _details)| user == sender)
});
if is_ignored {
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index 619dca2..cc58e6f 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -22,6 +22,7 @@ use ruma::{
},
push::{Action, Ruleset, Tweak},
state_res,
+ state_res::Event,
state_res::RoomVersion,
uint, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
OwnedServerName, RoomAliasId, RoomId, UserId,
@@ -378,7 +379,7 @@ impl Service {
)?;
let server_user = format!("@conduit:{}", services().globals.server_name());
- let to_conduit = body.starts_with(&format!("{}: ", server_user));
+ let to_conduit = body.starts_with(&format!("{server_user}: "));
// This will evaluate to false if the emergency password is set up so that
// the administrator can execute commands as conduit
@@ -683,6 +684,92 @@ impl Service {
let (pdu, pdu_json) =
self.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)?;
+ let admin_room = services().rooms.alias.resolve_local_alias(
+ <&RoomAliasId>::try_from(
+ format!("#admins:{}", services().globals.server_name()).as_str(),
+ )
+ .expect("#admins:server_name is a valid room alias"),
+ )?;
+ if admin_room.filter(|v| v == room_id).is_some() {
+ match pdu.event_type() {
+ RoomEventType::RoomEncryption => {
+ warn!("Encryption is not allowed in the admins room");
+ return Err(Error::BadRequest(
+ ErrorKind::Forbidden,
+ "Encryption is not allowed in the admins room.",
+ ));
+ }
+ RoomEventType::RoomMember => {
+ #[derive(Deserialize)]
+ struct ExtractMembership {
+ membership: MembershipState,
+ }
+
+ let target = pdu
+ .state_key()
+ .filter(|v| v.starts_with("@"))
+ .unwrap_or(sender.as_str());
+ let server_name = services().globals.server_name();
+ let server_user = format!("@conduit:{}", server_name);
+ let content = serde_json::from_str::<ExtractMembership>(pdu.content.get())
+ .map_err(|_| Error::bad_database("Invalid content in pdu."))?;
+
+ if content.membership == MembershipState::Leave {
+ if target == &server_user {
+ warn!("Conduit user cannot leave from admins room");
+ return Err(Error::BadRequest(
+ ErrorKind::Forbidden,
+ "Conduit user cannot leave from admins room.",
+ ));
+ }
+
+ let count = services()
+ .rooms
+ .state_cache
+ .room_members(room_id)
+ .filter_map(|m| m.ok())
+ .filter(|m| m.server_name() == server_name)
+ .filter(|m| m != target)
+ .count();
+ if count < 2 {
+ warn!("Last admin cannot leave from admins room");
+ return Err(Error::BadRequest(
+ ErrorKind::Forbidden,
+ "Last admin cannot leave from admins room.",
+ ));
+ }
+ }
+
+ if content.membership == MembershipState::Ban && pdu.state_key().is_some() {
+ if target == &server_user {
+ warn!("Conduit user cannot be banned in admins room");
+ return Err(Error::BadRequest(
+ ErrorKind::Forbidden,
+ "Conduit user cannot be banned in admins room.",
+ ));
+ }
+
+ let count = services()
+ .rooms
+ .state_cache
+ .room_members(room_id)
+ .filter_map(|m| m.ok())
+ .filter(|m| m.server_name() == server_name)
+ .filter(|m| m != target)
+ .count();
+ if count < 2 {
+ warn!("Last admin cannot be banned in admins room");
+ return Err(Error::BadRequest(
+ ErrorKind::Forbidden,
+ "Last admin cannot be banned in admins room.",
+ ));
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+
// We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = services().rooms.state.append_to_state(&pdu)?;
diff --git a/src/service/rooms/user/data.rs b/src/service/rooms/user/data.rs
index 43c4c92..4b8a4ec 100644
--- a/src/service/rooms/user/data.rs
+++ b/src/service/rooms/user/data.rs
@@ -8,6 +8,9 @@ pub trait Data: Send + Sync {
fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
+ // Returns the count at which the last reset_notification_counts was called
+ fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
+
fn associate_token_shortstatehash(
&self,
room_id: &RoomId,
diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs
index a765cfd..672e502 100644
--- a/src/service/rooms/user/mod.rs
+++ b/src/service/rooms/user/mod.rs
@@ -22,6 +22,10 @@ impl Service {
self.db.highlight_count(user_id, room_id)
}
+ pub fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
+ self.db.last_notification_read(user_id, room_id)
+ }
+
pub fn associate_token_shortstatehash(
&self,
room_id: &RoomId,
@@ -36,10 +40,10 @@ impl Service {
self.db.get_token_shortstatehash(room_id, token)
}
- pub fn get_shared_rooms<'a>(
- &'a self,
+ pub fn get_shared_rooms(
+ &self,
users: Vec<OwnedUserId>,
- ) -> Result<impl Iterator<Item = Result<OwnedRoomId>> + 'a> {
+ ) -> Result<impl Iterator<Item = Result<OwnedRoomId>>> {
self.db.get_shared_rooms(users)
}
}
diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs
index afa12fc..1861feb 100644
--- a/src/service/sending/mod.rs
+++ b/src/service/sending/mod.rs
@@ -496,7 +496,7 @@ impl Service {
)
})?,
appservice::event::push_events::v1::Request {
- events: &pdu_jsons,
+ events: pdu_jsons,
txn_id: (&*base64::encode_config(
calculate_hash(
&events
@@ -638,9 +638,9 @@ impl Service {
let response = server_server::send_request(
server,
send_transaction_message::v1::Request {
- origin: services().globals.server_name(),
- pdus: &pdu_jsons,
- edus: &edu_jsons,
+ origin: services().globals.server_name().to_owned(),
+ pdus: pdu_jsons,
+ edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: (&*base64::encode_config(
calculate_hash(
diff --git a/src/service/uiaa/mod.rs b/src/service/uiaa/mod.rs
index 672290c..147ce4d 100644
--- a/src/service/uiaa/mod.rs
+++ b/src/service/uiaa/mod.rs
@@ -5,7 +5,7 @@ pub use data::Data;
use ruma::{
api::client::{
error::ErrorKind,
- uiaa::{AuthType, IncomingAuthData, IncomingPassword, IncomingUserIdentifier, UiaaInfo},
+ uiaa::{AuthData, AuthType, Password, UiaaInfo, UserIdentifier},
},
CanonicalJsonValue, DeviceId, UserId,
};
@@ -44,7 +44,7 @@ impl Service {
&self,
user_id: &UserId,
device_id: &DeviceId,
- auth: &IncomingAuthData,
+ auth: &AuthData,
uiaainfo: &UiaaInfo,
) -> Result<(bool, UiaaInfo)> {
let mut uiaainfo = auth
@@ -58,13 +58,13 @@ impl Service {
match auth {
// Find out what the user completed
- IncomingAuthData::Password(IncomingPassword {
+ AuthData::Password(Password {
identifier,
password,
..
}) => {
let username = match identifier {
- IncomingUserIdentifier::UserIdOrLocalpart(username) => username,
+ UserIdentifier::UserIdOrLocalpart(username) => username,
_ => {
return Err(Error::BadRequest(
ErrorKind::Unrecognized,
@@ -85,7 +85,7 @@ impl Service {
argon2::verify_encoded(&hash, password.as_bytes()).unwrap_or(false);
if !hash_matches {
- uiaainfo.auth_error = Some(ruma::api::client::error::ErrorBody {
+ uiaainfo.auth_error = Some(ruma::api::client::error::StandardErrorBody {
kind: ErrorKind::Forbidden,
message: "Invalid username or password.".to_owned(),
});
@@ -96,7 +96,7 @@ impl Service {
// Password was correct! Let's add it to `completed`
uiaainfo.completed.push(AuthType::Password);
}
- IncomingAuthData::Dummy(_) => {
+ AuthData::Dummy(_) => {
uiaainfo.completed.push(AuthType::Dummy);
}
k => error!("type not supported: {:?}", k),
diff --git a/src/service/users/data.rs b/src/service/users/data.rs
index bc1db33..8553210 100644
--- a/src/service/users/data.rs
+++ b/src/service/users/data.rs
@@ -1,6 +1,6 @@
use crate::Result;
use ruma::{
- api::client::{device::Device, filter::IncomingFilterDefinition},
+ api::client::{device::Device, filter::FilterDefinition},
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::AnyToDeviceEvent,
serde::Raw,
@@ -191,11 +191,7 @@ pub trait Data: Send + Sync {
) -> Box<dyn Iterator<Item = Result<Device>> + 'a>;
/// Creates a new sync filter. Returns the filter id.
- fn create_filter(&self, user_id: &UserId, filter: &IncomingFilterDefinition) -> Result<String>;
+ fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String>;
- fn get_filter(
- &self,
- user_id: &UserId,
- filter_id: &str,
- ) -> Result<Option<IncomingFilterDefinition>>;
+ fn get_filter(&self, user_id: &UserId, filter_id: &str) -> Result<Option<FilterDefinition>>;
}
diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs
index 9dcfa8b..6be5c89 100644
--- a/src/service/users/mod.rs
+++ b/src/service/users/mod.rs
@@ -3,7 +3,7 @@ use std::{collections::BTreeMap, mem};
pub use data::Data;
use ruma::{
- api::client::{device::Device, error::ErrorKind, filter::IncomingFilterDefinition},
+ api::client::{device::Device, error::ErrorKind, filter::FilterDefinition},
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::AnyToDeviceEvent,
serde::Raw,
@@ -326,11 +326,7 @@ impl Service {
}
/// Creates a new sync filter. Returns the filter id.
- pub fn create_filter(
- &self,
- user_id: &UserId,
- filter: &IncomingFilterDefinition,
- ) -> Result<String> {
+ pub fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> {
self.db.create_filter(user_id, filter)
}
@@ -338,7 +334,7 @@ impl Service {
&self,
user_id: &UserId,
filter_id: &str,
- ) -> Result<Option<IncomingFilterDefinition>> {
+ ) -> Result<Option<FilterDefinition>> {
self.db.get_filter(user_id, filter_id)
}
}
diff --git a/src/utils/error.rs b/src/utils/error.rs
index 9c8617f..4f044ca 100644
--- a/src/utils/error.rs
+++ b/src/utils/error.rs
@@ -3,7 +3,7 @@ use std::convert::Infallible;
use http::StatusCode;
use ruma::{
api::client::{
- error::{Error as RumaError, ErrorKind},
+ error::{Error as RumaError, ErrorBody, ErrorKind},
uiaa::{UiaaInfo, UiaaResponse},
},
OwnedServerName,
@@ -102,11 +102,14 @@ impl Error {
if let Self::FederationError(origin, error) = self {
let mut error = error.clone();
- error.message = format!("Answer from {}: {}", origin, error.message);
+ error.body = ErrorBody::Standard {
+ kind: Unknown,
+ message: format!("Answer from {origin}: {error}"),
+ };
return RumaResponse(UiaaResponse::MatrixError(error));
}
- let message = format!("{}", self);
+ let message = format!("{self}");
use ErrorKind::*;
let (kind, status_code) = match self {
@@ -131,8 +134,7 @@ impl Error {
warn!("{}: {}", status_code, message);
RumaResponse(UiaaResponse::MatrixError(RumaError {
- kind,
- message,
+ body: ErrorBody::Standard { kind, message },
status_code,
}))
}