diff options
author | Timo Kösters <timo@koesters.xyz> | 2022-10-05 18:36:12 +0200 |
---|---|---|
committer | Nyaaori <+@nyaaori.cat> | 2022-10-10 14:02:00 +0200 |
commit | 44fe6d1554eaa0a15314686974ab01f48c836588 (patch) | |
tree | 742d3e844c32acc6fa1c6616d0ff440aff8a6e6c /src/api/server_server.rs | |
parent | cff52d7ebb5066f3d8e513488b84a431c0093e65 (diff) | |
download | conduit-44fe6d1554eaa0a15314686974ab01f48c836588.zip |
127 errors left
Diffstat (limited to 'src/api/server_server.rs')
-rw-r--r-- | src/api/server_server.rs | 133 |
1 files changed, 4 insertions, 129 deletions
diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 647f457..11f7ec3 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -857,131 +857,6 @@ pub async fn send_transaction_message_route( Ok(send_transaction_message::v1::Response { pdus: resolved_map.into_iter().map(|(e, r)| (e, r.map_err(|e| e.to_string()))).collect() }) } -#[tracing::instrument(skip(starting_events))] -pub(crate) async fn get_auth_chain<'a>( - room_id: &RoomId, - starting_events: Vec<Arc<EventId>>, -) -> Result<impl Iterator<Item = Arc<EventId>> + 'a> { - const NUM_BUCKETS: usize = 50; - - let mut buckets = vec![BTreeSet::new(); NUM_BUCKETS]; - - let mut i = 0; - for id in starting_events { - let short = services().rooms.short.get_or_create_shorteventid(&id)?; - let bucket_id = (short % NUM_BUCKETS as u64) as usize; - buckets[bucket_id].insert((short, id.clone())); - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } - - let mut full_auth_chain = HashSet::new(); - - let mut hits = 0; - let mut misses = 0; - for chunk in buckets { - if chunk.is_empty() { - continue; - } - - let chunk_key: Vec<u64> = chunk.iter().map(|(short, _)| short).copied().collect(); - if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&chunk_key)? { - hits += 1; - full_auth_chain.extend(cached.iter().copied()); - continue; - } - misses += 1; - - let mut chunk_cache = HashSet::new(); - let mut hits2 = 0; - let mut misses2 = 0; - let mut i = 0; - for (sevent_id, event_id) in chunk { - if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&[sevent_id])? { - hits2 += 1; - chunk_cache.extend(cached.iter().copied()); - } else { - misses2 += 1; - let auth_chain = Arc::new(get_auth_chain_inner(room_id, &event_id)?); - services().rooms - .auth_chain - .cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?; - println!( - "cache missed event {} with auth chain len {}", - event_id, - auth_chain.len() - ); - chunk_cache.extend(auth_chain.iter()); - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - }; - } - println!( - "chunk missed with len {}, event hits2: {}, misses2: {}", - chunk_cache.len(), - hits2, - misses2 - ); - let chunk_cache = Arc::new(chunk_cache); - services().rooms - .auth_chain.cache_auth_chain(chunk_key, Arc::clone(&chunk_cache))?; - full_auth_chain.extend(chunk_cache.iter()); - } - - println!( - "total: {}, chunk hits: {}, misses: {}", - full_auth_chain.len(), - hits, - misses - ); - - Ok(full_auth_chain - .into_iter() - .filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok())) -} - -#[tracing::instrument(skip(event_id))] -fn get_auth_chain_inner( - room_id: &RoomId, - event_id: &EventId, -) -> Result<HashSet<u64>> { - let mut todo = vec![Arc::from(event_id)]; - let mut found = HashSet::new(); - - while let Some(event_id) = todo.pop() { - match services().rooms.timeline.get_pdu(&event_id) { - Ok(Some(pdu)) => { - if pdu.room_id != room_id { - return Err(Error::BadRequest(ErrorKind::Forbidden, "Evil event in db")); - } - for auth_event in &pdu.auth_events { - let sauthevent = services() - .rooms.short - .get_or_create_shorteventid(auth_event)?; - - if !found.contains(&sauthevent) { - found.insert(sauthevent); - todo.push(auth_event.clone()); - } - } - } - Ok(None) => { - warn!("Could not find pdu mentioned in auth events: {}", event_id); - } - Err(e) => { - warn!("Could not load event in auth chain: {} {}", event_id, e); - } - } - } - - Ok(found) -} - /// # `GET /_matrix/federation/v1/event/{eventId}` /// /// Retrieves a single event from the server. @@ -1135,7 +1010,7 @@ pub async fn get_event_authorization_route( let room_id = <&RoomId>::try_from(room_id_str) .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - let auth_chain_ids = get_auth_chain(room_id, vec![Arc::from(&*body.event_id)]).await?; + let auth_chain_ids = services().rooms.auth_chain.get_auth_chain(room_id, vec![Arc::from(&*body.event_id)]).await?; Ok(get_event_authorization::v1::Response { auth_chain: auth_chain_ids @@ -1190,7 +1065,7 @@ pub async fn get_room_state_route( .collect(); let auth_chain_ids = - get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?; + services().rooms.auth_chain.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?; Ok(get_room_state::v1::Response { auth_chain: auth_chain_ids @@ -1246,7 +1121,7 @@ pub async fn get_room_state_ids_route( .collect(); let auth_chain_ids = - get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?; + services().rooms.auth_chain.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)]).await?; Ok(get_room_state_ids::v1::Response { auth_chain_ids: auth_chain_ids.map(|id| (*id).to_owned()).collect(), @@ -1449,7 +1324,7 @@ async fn create_join_event( drop(mutex_lock); let state_ids = services().rooms.state_accessor.state_full_ids(shortstatehash).await?; - let auth_chain_ids = get_auth_chain( + let auth_chain_ids = services().rooms.auth_chain.get_auth_chain( room_id, state_ids.iter().map(|(_, id)| id.clone()).collect(), ) |