summaryrefslogtreecommitdiff
path: root/src/api/server_server.rs
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2022-10-05 18:36:12 +0200
committerNyaaori <+@nyaaori.cat>2022-10-10 14:02:00 +0200
commit44fe6d1554eaa0a15314686974ab01f48c836588 (patch)
tree742d3e844c32acc6fa1c6616d0ff440aff8a6e6c /src/api/server_server.rs
parentcff52d7ebb5066f3d8e513488b84a431c0093e65 (diff)
downloadconduit-44fe6d1554eaa0a15314686974ab01f48c836588.zip
127 errors left
Diffstat (limited to 'src/api/server_server.rs')
-rw-r--r--src/api/server_server.rs133
1 files changed, 4 insertions, 129 deletions
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index 647f457..11f7ec3 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -857,131 +857,6 @@ pub async fn send_transaction_message_route(
Ok(send_transaction_message::v1::Response { pdus: resolved_map.into_iter().map(|(e, r)| (e, r.map_err(|e| e.to_string()))).collect() })
}
-#[tracing::instrument(skip(starting_events))]
-pub(crate) async fn get_auth_chain<'a>(
- room_id: &RoomId,
- starting_events: Vec<Arc<EventId>>,
-) -> Result<impl Iterator<Item = Arc<EventId>> + 'a> {
- const NUM_BUCKETS: usize = 50;
-
- let mut buckets = vec![BTreeSet::new(); NUM_BUCKETS];
-
- let mut i = 0;
- for id in starting_events {
- let short = services().rooms.short.get_or_create_shorteventid(&id)?;
- let bucket_id = (short % NUM_BUCKETS as u64) as usize;
- buckets[bucket_id].insert((short, id.clone()));
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
- }
- }
-
- let mut full_auth_chain = HashSet::new();
-
- let mut hits = 0;
- let mut misses = 0;
- for chunk in buckets {
- if chunk.is_empty() {
- continue;
- }
-
- let chunk_key: Vec<u64> = chunk.iter().map(|(short, _)| short).copied().collect();
- if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&chunk_key)? {
- hits += 1;
- full_auth_chain.extend(cached.iter().copied());
- continue;
- }
- misses += 1;
-
- let mut chunk_cache = HashSet::new();
- let mut hits2 = 0;
- let mut misses2 = 0;
- let mut i = 0;
- for (sevent_id, event_id) in chunk {
- if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&[sevent_id])? {
- hits2 += 1;
- chunk_cache.extend(cached.iter().copied());
- } else {
- misses2 += 1;
- let auth_chain = Arc::new(get_auth_chain_inner(room_id, &event_id)?);
- services().rooms
- .auth_chain
- .cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?;
- println!(
- "cache missed event {} with auth chain len {}",
- event_id,
- auth_chain.len()
- );
- chunk_cache.extend(auth_chain.iter());
-
- i += 1;
- if i % 100 == 0 {
- tokio::task::yield_now().await;
- }
- };
- }
- println!(
- "chunk missed with len {}, event hits2: {}, misses2: {}",
- chunk_cache.len(),
- hits2,
- misses2
- );
- let chunk_cache = Arc::new(chunk_cache);
- services().rooms
- .auth_chain.cache_auth_chain(chunk_key, Arc::clone(&chunk_cache))?;
- full_auth_chain.extend(chunk_cache.iter());
- }
-
- println!(
- "total: {}, chunk hits: {}, misses: {}",
- full_auth_chain.len(),
- hits,
- misses
- );
-
- Ok(full_auth_chain
- .into_iter()
- .filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok()))
-}
-
-#[tracing::instrument(skip(event_id))]
-fn get_auth_chain_inner(
- room_id: &RoomId,
- event_id: &EventId,
-) -> Result<HashSet<u64>> {
- let mut todo = vec![Arc::from(event_id)];
- let mut found = HashSet::new();
-
- while let Some(event_id) = todo.pop() {
- match services().rooms.timeline.get_pdu(&event_id) {
- Ok(Some(pdu)) => {
- if pdu.room_id != room_id {
- return Err(Error::BadRequest(ErrorKind::Forbidden, "Evil event in db"));
- }
- for auth_event in &pdu.auth_events {
- let sauthevent = services()
- .rooms.short
- .get_or_create_shorteventid(auth_event)?;
-
- if !found.contains(&sauthevent) {
- found.insert(sauthevent);
- todo.push(auth_event.clone());
- }
- }
- }
- Ok(None) => {
- warn!("Could not find pdu mentioned in auth events: {}", event_id);
- }
- Err(e) => {
- warn!("Could not load event in auth chain: {} {}", event_id, e);
- }
- }
- }
-
- Ok(found)
-}
-
/// # `GET /_matrix/federation/v1/event/{eventId}`
///
/// Retrieves a single event from the server.
@@ -1135,7 +1010,7 @@ pub async fn get_event_authorization_route(
let room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
- let auth_chain_ids = get_auth_chain(room_id, vec![Arc::from(&*body.event_id)]).await?;
+ let auth_chain_ids = services().rooms.auth_chain.get_auth_chain(room_id, vec![Arc::from(&*body.event_id)]).await?;
Ok(get_event_authorization::v1::Response {
auth_chain: auth_chain_ids
@@ -1190,7 +1065,7 @@ pub async fn get_room_state_route(
.collect();
let auth_chain_ids =
- get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?;
+ services().rooms.auth_chain.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?;
Ok(get_room_state::v1::Response {
auth_chain: auth_chain_ids
@@ -1246,7 +1121,7 @@ pub async fn get_room_state_ids_route(
.collect();
let auth_chain_ids =
- get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?;
+ services().rooms.auth_chain.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?;
Ok(get_room_state_ids::v1::Response {
auth_chain_ids: auth_chain_ids.map(|id| (*id).to_owned()).collect(),
@@ -1449,7 +1324,7 @@ async fn create_join_event(
drop(mutex_lock);
let state_ids = services().rooms.state_accessor.state_full_ids(shortstatehash).await?;
- let auth_chain_ids = get_auth_chain(
+ let auth_chain_ids = services().rooms.auth_chain.get_auth_chain(
room_id,
state_ids.iter().map(|(_, id)| id.clone()).collect(),
)