diff options
author | Timo Kösters <timo@koesters.xyz> | 2022-10-05 12:45:54 +0200 |
---|---|---|
committer | Nyaaori <+@nyaaori.cat> | 2022-10-10 14:02:00 +0200 |
commit | face766e0f32481fd97a435f1ed8579d8cfc634c (patch) | |
tree | 6985fdee252323a858b6ff29b825d5b48baef61a /src/api | |
parent | 8708cd3b633d88d260982563f2e2826bc8b12038 (diff) | |
download | conduit-face766e0f32481fd97a435f1ed8579d8cfc634c.zip |
messing with trait objects
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/client_server/membership.rs | 6 | ||||
-rw-r--r-- | src/api/client_server/room.rs | 4 | ||||
-rw-r--r-- | src/api/client_server/sync.rs | 2 | ||||
-rw-r--r-- | src/api/ruma_wrapper/axum.rs | 2 | ||||
-rw-r--r-- | src/api/server_server.rs | 175 |
5 files changed, 13 insertions, 176 deletions
diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 98931f2..720c1e6 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -481,7 +481,7 @@ async fn join_room_by_id_helper( let (make_join_response, remote_server) = make_join_response_and_server?; let room_version = match make_join_response.room_version { - Some(room_version) if services().rooms.metadata.is_supported_version(&room_version) => room_version, + Some(room_version) if services().globals.supported_room_versions().contains(&room_version) => room_version, _ => return Err(Error::BadServerResponse("Room version is not supported")), }; @@ -568,7 +568,7 @@ async fn join_room_by_id_helper( let mut state = HashMap::new(); let pub_key_map = RwLock::new(BTreeMap::new()); - server_server::fetch_join_signing_keys( + services().rooms.event_handler.fetch_join_signing_keys( &send_join_response, &room_version, &pub_key_map, @@ -1048,7 +1048,7 @@ async fn remote_leave_room( let (make_leave_response, remote_server) = make_leave_response_and_server?; let room_version_id = match make_leave_response.room_version { - Some(version) if services().rooms.is_supported_version(&version) => version, + Some(version) if services().globals.supported_room_versions().contains(&version) => version, _ => return Err(Error::BadServerResponse("Room version is not supported")), }; diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs index a7fa952..939fbaa 100644 --- a/src/api/client_server/room.rs +++ b/src/api/client_server/room.rs @@ -99,7 +99,7 @@ pub async fn create_room_route( let room_version = match body.room_version.clone() { Some(room_version) => { - if services().rooms.is_supported_version(&services(), &room_version) { + if services().globals.supported_room_versions().contains(&room_version) { room_version } else { return Err(Error::BadRequest( @@ -470,7 +470,7 @@ pub async fn upgrade_room_route( ) -> Result<upgrade_room::v3::Response> { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if !services().rooms.is_supported_version(&body.new_version) { + if !services().globals.supported_room_versions().contains(&body.new_version) { return Err(Error::BadRequest( ErrorKind::UnsupportedRoomVersion, "This server does not support that room version.", diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index e38ea60..3489a9a 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -175,7 +175,7 @@ async fn sync_helper( services().rooms.edus.presence.ping_presence(&sender_user)?; // Setup watchers, so if there's no response, we can wait for them - let watcher = services().watch(&sender_user, &sender_device); + let watcher = services().globals.db.watch(&sender_user, &sender_device); let next_batch = services().globals.current_count()?; let next_batch_string = next_batch.to_string(); diff --git a/src/api/ruma_wrapper/axum.rs b/src/api/ruma_wrapper/axum.rs index babf2a7..d926b89 100644 --- a/src/api/ruma_wrapper/axum.rs +++ b/src/api/ruma_wrapper/axum.rs @@ -197,7 +197,7 @@ where request_map.insert("content".to_owned(), json_body.clone()); }; - let keys_result = server_server::fetch_signing_keys( + let keys_result = services().rooms.event_handler.fetch_signing_keys( &x_matrix.origin, vec![x_matrix.key.to_owned()], ) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 9aa2beb..45d749d 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -664,7 +664,7 @@ pub async fn send_transaction_message_route( Some(id) => id, None => { // Event is invalid - resolved_map.insert(event_id, Err("Event needs a valid RoomId.".to_owned())); + resolved_map.insert(event_id, Err(Error::bad_database("Event needs a valid RoomId."))); continue; } }; @@ -707,7 +707,7 @@ pub async fn send_transaction_message_route( for pdu in &resolved_map { if let Err(e) = pdu.1 { - if e != "Room is unknown to this server." { + if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) { warn!("Incoming PDU failed {:?}", pdu); } } @@ -854,170 +854,7 @@ pub async fn send_transaction_message_route( } } - Ok(send_transaction_message::v1::Response { pdus: resolved_map }) -} - -/// Search the DB for the signing keys of the given server, if we don't have them -/// fetch them from the server and save to our DB. -#[tracing::instrument(skip_all)] -pub(crate) async fn fetch_signing_keys( - origin: &ServerName, - signature_ids: Vec<String>, -) -> Result<BTreeMap<String, Base64>> { - let contains_all_ids = - |keys: &BTreeMap<String, Base64>| signature_ids.iter().all(|id| keys.contains_key(id)); - - let permit = services() - .globals - .servername_ratelimiter - .read() - .unwrap() - .get(origin) - .map(|s| Arc::clone(s).acquire_owned()); - - let permit = match permit { - Some(p) => p, - None => { - let mut write = services().globals.servername_ratelimiter.write().unwrap(); - let s = Arc::clone( - write - .entry(origin.to_owned()) - .or_insert_with(|| Arc::new(Semaphore::new(1))), - ); - - s.acquire_owned() - } - } - .await; - - let back_off = |id| match services() - .globals - .bad_signature_ratelimiter - .write() - .unwrap() - .entry(id) - { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - } - hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), - }; - - if let Some((time, tries)) = services() - .globals - .bad_signature_ratelimiter - .read() - .unwrap() - .get(&signature_ids) - { - // Exponential backoff - let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); - } - - if time.elapsed() < min_elapsed_duration { - debug!("Backing off from {:?}", signature_ids); - return Err(Error::BadServerResponse("bad signature, still backing off")); - } - } - - trace!("Loading signing keys for {}", origin); - - let mut result: BTreeMap<_, _> = services() - .globals - .signing_keys_for(origin)? - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect(); - - if contains_all_ids(&result) { - return Ok(result); - } - - debug!("Fetching signing keys for {} over federation", origin); - - if let Some(server_key) = services() - .sending - .send_federation_request(origin, get_server_keys::v2::Request::new()) - .await - .ok() - .and_then(|resp| resp.server_key.deserialize().ok()) - { - services().globals.add_signing_key(origin, server_key.clone())?; - - result.extend( - server_key - .verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - result.extend( - server_key - .old_verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - - if contains_all_ids(&result) { - return Ok(result); - } - } - - for server in services().globals.trusted_servers() { - debug!("Asking {} for {}'s signing key", server, origin); - if let Some(server_keys) = services() - .sending - .send_federation_request( - server, - get_remote_server_keys::v2::Request::new( - origin, - MilliSecondsSinceUnixEpoch::from_system_time( - SystemTime::now() - .checked_add(Duration::from_secs(3600)) - .expect("SystemTime to large"), - ) - .expect("time is valid"), - ), - ) - .await - .ok() - .map(|resp| { - resp.server_keys - .into_iter() - .filter_map(|e| e.deserialize().ok()) - .collect::<Vec<_>>() - }) - { - trace!("Got signing keys: {:?}", server_keys); - for k in server_keys { - services().globals.add_signing_key(origin, k.clone())?; - result.extend( - k.verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - result.extend( - k.old_verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - } - - if contains_all_ids(&result) { - return Ok(result); - } - } - } - - drop(permit); - - back_off(signature_ids); - - warn!("Failed to find public key for server: {}", origin); - Err(Error::BadServerResponse( - "Failed to find public key for server", - )) + Ok(send_transaction_message::v1::Response { pdus: resolved_map.into_iter().map(|(e, r)| (e, r.map_err(|e| e.to_string()))).collect() }) } #[tracing::instrument(skip(starting_events))] @@ -1050,7 +887,7 @@ pub(crate) async fn get_auth_chain<'a>( } let chunk_key: Vec<u64> = chunk.iter().map(|(short, _)| short).copied().collect(); - if let Some(cached) = services().rooms.auth_chain.get_auth_chain_from_cache(&chunk_key)? { + if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&chunk_key)? { hits += 1; full_auth_chain.extend(cached.iter().copied()); continue; @@ -1062,7 +899,7 @@ pub(crate) async fn get_auth_chain<'a>( let mut misses2 = 0; let mut i = 0; for (sevent_id, event_id) in chunk { - if let Some(cached) = services().rooms.auth_chain.get_auth_chain_from_cache(&[sevent_id])? { + if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&[sevent_id])? { hits2 += 1; chunk_cache.extend(cached.iter().copied()); } else { @@ -1689,7 +1526,7 @@ pub async fn create_invite_route( services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; - if !services().rooms.is_supported_version(&body.room_version) { + if !services().globals.supported_room_versions().contains(&body.room_version) { return Err(Error::BadRequest( ErrorKind::IncompatibleRoomVersion { room_version: body.room_version.clone(), |