summaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2022-10-05 12:45:54 +0200
committerNyaaori <+@nyaaori.cat>2022-10-10 14:02:00 +0200
commitface766e0f32481fd97a435f1ed8579d8cfc634c (patch)
tree6985fdee252323a858b6ff29b825d5b48baef61a /src/api
parent8708cd3b633d88d260982563f2e2826bc8b12038 (diff)
downloadconduit-face766e0f32481fd97a435f1ed8579d8cfc634c.zip
messing with trait objects
Diffstat (limited to 'src/api')
-rw-r--r--src/api/client_server/membership.rs6
-rw-r--r--src/api/client_server/room.rs4
-rw-r--r--src/api/client_server/sync.rs2
-rw-r--r--src/api/ruma_wrapper/axum.rs2
-rw-r--r--src/api/server_server.rs175
5 files changed, 13 insertions, 176 deletions
diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs
index 98931f2..720c1e6 100644
--- a/src/api/client_server/membership.rs
+++ b/src/api/client_server/membership.rs
@@ -481,7 +481,7 @@ async fn join_room_by_id_helper(
let (make_join_response, remote_server) = make_join_response_and_server?;
let room_version = match make_join_response.room_version {
- Some(room_version) if services().rooms.metadata.is_supported_version(&room_version) => room_version,
+ Some(room_version) if services().globals.supported_room_versions().contains(&room_version) => room_version,
_ => return Err(Error::BadServerResponse("Room version is not supported")),
};
@@ -568,7 +568,7 @@ async fn join_room_by_id_helper(
let mut state = HashMap::new();
let pub_key_map = RwLock::new(BTreeMap::new());
- server_server::fetch_join_signing_keys(
+ services().rooms.event_handler.fetch_join_signing_keys(
&send_join_response,
&room_version,
&pub_key_map,
@@ -1048,7 +1048,7 @@ async fn remote_leave_room(
let (make_leave_response, remote_server) = make_leave_response_and_server?;
let room_version_id = match make_leave_response.room_version {
- Some(version) if services().rooms.is_supported_version(&version) => version,
+ Some(version) if services().globals.supported_room_versions().contains(&version) => version,
_ => return Err(Error::BadServerResponse("Room version is not supported")),
};
diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs
index a7fa952..939fbaa 100644
--- a/src/api/client_server/room.rs
+++ b/src/api/client_server/room.rs
@@ -99,7 +99,7 @@ pub async fn create_room_route(
let room_version = match body.room_version.clone() {
Some(room_version) => {
- if services().rooms.is_supported_version(&services(), &room_version) {
+ if services().globals.supported_room_versions().contains(&room_version) {
room_version
} else {
return Err(Error::BadRequest(
@@ -470,7 +470,7 @@ pub async fn upgrade_room_route(
) -> Result<upgrade_room::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
- if !services().rooms.is_supported_version(&body.new_version) {
+ if !services().globals.supported_room_versions().contains(&body.new_version) {
return Err(Error::BadRequest(
ErrorKind::UnsupportedRoomVersion,
"This server does not support that room version.",
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index e38ea60..3489a9a 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -175,7 +175,7 @@ async fn sync_helper(
services().rooms.edus.presence.ping_presence(&sender_user)?;
// Setup watchers, so if there's no response, we can wait for them
- let watcher = services().watch(&sender_user, &sender_device);
+ let watcher = services().globals.db.watch(&sender_user, &sender_device);
let next_batch = services().globals.current_count()?;
let next_batch_string = next_batch.to_string();
diff --git a/src/api/ruma_wrapper/axum.rs b/src/api/ruma_wrapper/axum.rs
index babf2a7..d926b89 100644
--- a/src/api/ruma_wrapper/axum.rs
+++ b/src/api/ruma_wrapper/axum.rs
@@ -197,7 +197,7 @@ where
request_map.insert("content".to_owned(), json_body.clone());
};
- let keys_result = server_server::fetch_signing_keys(
+ let keys_result = services().rooms.event_handler.fetch_signing_keys(
&x_matrix.origin,
vec![x_matrix.key.to_owned()],
)
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index 9aa2beb..45d749d 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -664,7 +664,7 @@ pub async fn send_transaction_message_route(
Some(id) => id,
None => {
// Event is invalid
- resolved_map.insert(event_id, Err("Event needs a valid RoomId.".to_owned()));
+ resolved_map.insert(event_id, Err(Error::bad_database("Event needs a valid RoomId.")));
continue;
}
};
@@ -707,7 +707,7 @@ pub async fn send_transaction_message_route(
for pdu in &resolved_map {
if let Err(e) = pdu.1 {
- if e != "Room is unknown to this server." {
+ if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {:?}", pdu);
}
}
@@ -854,170 +854,7 @@ pub async fn send_transaction_message_route(
}
}
- Ok(send_transaction_message::v1::Response { pdus: resolved_map })
-}
-
-/// Search the DB for the signing keys of the given server, if we don't have them
-/// fetch them from the server and save to our DB.
-#[tracing::instrument(skip_all)]
-pub(crate) async fn fetch_signing_keys(
- origin: &ServerName,
- signature_ids: Vec<String>,
-) -> Result<BTreeMap<String, Base64>> {
- let contains_all_ids =
- |keys: &BTreeMap<String, Base64>| signature_ids.iter().all(|id| keys.contains_key(id));
-
- let permit = services()
- .globals
- .servername_ratelimiter
- .read()
- .unwrap()
- .get(origin)
- .map(|s| Arc::clone(s).acquire_owned());
-
- let permit = match permit {
- Some(p) => p,
- None => {
- let mut write = services().globals.servername_ratelimiter.write().unwrap();
- let s = Arc::clone(
- write
- .entry(origin.to_owned())
- .or_insert_with(|| Arc::new(Semaphore::new(1))),
- );
-
- s.acquire_owned()
- }
- }
- .await;
-
- let back_off = |id| match services()
- .globals
- .bad_signature_ratelimiter
- .write()
- .unwrap()
- .entry(id)
- {
- hash_map::Entry::Vacant(e) => {
- e.insert((Instant::now(), 1));
- }
- hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1),
- };
-
- if let Some((time, tries)) = services()
- .globals
- .bad_signature_ratelimiter
- .read()
- .unwrap()
- .get(&signature_ids)
- {
- // Exponential backoff
- let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
- if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
- min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
- }
-
- if time.elapsed() < min_elapsed_duration {
- debug!("Backing off from {:?}", signature_ids);
- return Err(Error::BadServerResponse("bad signature, still backing off"));
- }
- }
-
- trace!("Loading signing keys for {}", origin);
-
- let mut result: BTreeMap<_, _> = services()
- .globals
- .signing_keys_for(origin)?
- .into_iter()
- .map(|(k, v)| (k.to_string(), v.key))
- .collect();
-
- if contains_all_ids(&result) {
- return Ok(result);
- }
-
- debug!("Fetching signing keys for {} over federation", origin);
-
- if let Some(server_key) = services()
- .sending
- .send_federation_request(origin, get_server_keys::v2::Request::new())
- .await
- .ok()
- .and_then(|resp| resp.server_key.deserialize().ok())
- {
- services().globals.add_signing_key(origin, server_key.clone())?;
-
- result.extend(
- server_key
- .verify_keys
- .into_iter()
- .map(|(k, v)| (k.to_string(), v.key)),
- );
- result.extend(
- server_key
- .old_verify_keys
- .into_iter()
- .map(|(k, v)| (k.to_string(), v.key)),
- );
-
- if contains_all_ids(&result) {
- return Ok(result);
- }
- }
-
- for server in services().globals.trusted_servers() {
- debug!("Asking {} for {}'s signing key", server, origin);
- if let Some(server_keys) = services()
- .sending
- .send_federation_request(
- server,
- get_remote_server_keys::v2::Request::new(
- origin,
- MilliSecondsSinceUnixEpoch::from_system_time(
- SystemTime::now()
- .checked_add(Duration::from_secs(3600))
- .expect("SystemTime to large"),
- )
- .expect("time is valid"),
- ),
- )
- .await
- .ok()
- .map(|resp| {
- resp.server_keys
- .into_iter()
- .filter_map(|e| e.deserialize().ok())
- .collect::<Vec<_>>()
- })
- {
- trace!("Got signing keys: {:?}", server_keys);
- for k in server_keys {
- services().globals.add_signing_key(origin, k.clone())?;
- result.extend(
- k.verify_keys
- .into_iter()
- .map(|(k, v)| (k.to_string(), v.key)),
- );
- result.extend(
- k.old_verify_keys
- .into_iter()
- .map(|(k, v)| (k.to_string(), v.key)),
- );
- }
-
- if contains_all_ids(&result) {
- return Ok(result);
- }
- }
- }
-
- drop(permit);
-
- back_off(signature_ids);
-
- warn!("Failed to find public key for server: {}", origin);
- Err(Error::BadServerResponse(
- "Failed to find public key for server",
- ))
+ Ok(send_transaction_message::v1::Response { pdus: resolved_map.into_iter().map(|(e, r)| (e, r.map_err(|e| e.to_string()))).collect() })
}
#[tracing::instrument(skip(starting_events))]
@@ -1050,7 +887,7 @@ pub(crate) async fn get_auth_chain<'a>(
}
let chunk_key: Vec<u64> = chunk.iter().map(|(short, _)| short).copied().collect();
- if let Some(cached) = services().rooms.auth_chain.get_auth_chain_from_cache(&chunk_key)? {
+ if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&chunk_key)? {
hits += 1;
full_auth_chain.extend(cached.iter().copied());
continue;
@@ -1062,7 +899,7 @@ pub(crate) async fn get_auth_chain<'a>(
let mut misses2 = 0;
let mut i = 0;
for (sevent_id, event_id) in chunk {
- if let Some(cached) = services().rooms.auth_chain.get_auth_chain_from_cache(&[sevent_id])? {
+ if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&[sevent_id])? {
hits2 += 1;
chunk_cache.extend(cached.iter().copied());
} else {
@@ -1689,7 +1526,7 @@ pub async fn create_invite_route(
services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
- if !services().rooms.is_supported_version(&body.room_version) {
+ if !services().globals.supported_room_versions().contains(&body.room_version) {
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion {
room_version: body.room_version.clone(),