summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2023-07-31 16:18:23 +0200
committerTimo Kösters <timo@koesters.xyz>2023-07-31 16:18:23 +0200
commitacfe381dd3064512272f8f47ea4dd388c04f1c39 (patch)
treef6723660a778885fedfa0f3cfca2b2337b212056
parentafd8112e25a86918c7f9ac657523698b2e0315f4 (diff)
downloadconduit-acfe381dd3064512272f8f47ea4dd388c04f1c39.zip
fix: threads get updated properly
Workaround for element web while waiting for https://github.com/matrix-org/matrix-js-sdk/pull/3635
-rw-r--r--src/api/client_server/membership.rs2
-rw-r--r--src/api/client_server/room.rs3
-rw-r--r--src/api/client_server/sync.rs3
-rw-r--r--src/api/server_server.rs4
-rw-r--r--src/database/key_value/rooms/timeline.rs2
-rw-r--r--src/main.rs6
-rw-r--r--src/service/pdu.rs13
-rw-r--r--src/service/rooms/event_handler/mod.rs42
-rw-r--r--src/utils/error.rs4
9 files changed, 50 insertions, 29 deletions
diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs
index c9357b2..4a1f374 100644
--- a/src/api/client_server/membership.rs
+++ b/src/api/client_server/membership.rs
@@ -674,7 +674,7 @@ async fn join_room_by_id_helper(
};
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
- warn!("{:?}: {}", value, e);
+ warn!("Invalid PDU in send_join response: {} {:?}", e, value);
Error::BadServerResponse("Invalid PDU in send_join response.")
})?;
diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs
index 56bdf03..420dd50 100644
--- a/src/api/client_server/room.rs
+++ b/src/api/client_server/room.rs
@@ -445,6 +445,9 @@ pub async fn get_room_event_route(
));
}
+ let mut event = (*event).clone();
+ event.add_age()?;
+
Ok(get_room_event::v3::Response {
event: event.to_room_event(),
})
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 527625a..7c6002e 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -20,8 +20,9 @@ use ruma::{
StateEventType, TimelineEventType,
},
serde::Raw,
- uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
+ uint, DeviceId, OwnedDeviceId, OwnedEventId, OwnedUserId, RoomId, UInt, UserId,
};
+use serde::Deserialize;
use std::{
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index ca5b69d..2220c4d 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -813,7 +813,7 @@ pub async fn send_transaction_message_route(
.readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
- info!("No known event ids in read receipt: {:?}", user_updates);
+ debug!("No known event ids in read receipt: {:?}", user_updates);
}
}
}
@@ -1011,7 +1011,7 @@ pub async fn get_backfill_route(
.as_ref()
.expect("server is authenticated");
- info!("Got backfill request from: {}", sender_servername);
+ debug!("Got backfill request from: {}", sender_servername);
if !services()
.rooms
diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs
index 74e3e5c..5ce2136 100644
--- a/src/database/key_value/rooms/timeline.rs
+++ b/src/database/key_value/rooms/timeline.rs
@@ -246,6 +246,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
+ pdu.add_age()?;
let count = pdu_count(&pdu_id)?;
Ok((count, pdu))
}),
@@ -272,6 +273,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
+ pdu.add_age()?;
let count = pdu_count(&pdu_id)?;
Ok((count, pdu))
}),
diff --git a/src/main.rs b/src/main.rs
index 9b7528c..1975038 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -85,6 +85,8 @@ async fn main() {
config.warn_deprecated();
+ let log = format!("{},ruma_state_res=error,_=off,sled=off", config.log);
+
if config.allow_jaeger {
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_agent_pipeline()
@@ -94,7 +96,7 @@ async fn main() {
.unwrap();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
- let filter_layer = match EnvFilter::try_new(&config.log) {
+ let filter_layer = match EnvFilter::try_new(&log) {
Ok(s) => s,
Err(e) => {
eprintln!(
@@ -121,7 +123,7 @@ async fn main() {
} else {
let registry = tracing_subscriber::Registry::default();
let fmt_layer = tracing_subscriber::fmt::Layer::new();
- let filter_layer = match EnvFilter::try_new(&config.log) {
+ let filter_layer = match EnvFilter::try_new(&log) {
Ok(s) => s,
Err(e) => {
eprintln!("It looks like your config is invalid. The following error occured while parsing it: {e}");
diff --git a/src/service/pdu.rs b/src/service/pdu.rs
index d24e174..4a170bc 100644
--- a/src/service/pdu.rs
+++ b/src/service/pdu.rs
@@ -103,6 +103,19 @@ impl PduEvent {
Ok(())
}
+ pub fn add_age(&mut self) -> crate::Result<()> {
+ let mut unsigned: BTreeMap<String, Box<RawJsonValue>> = self
+ .unsigned
+ .as_ref()
+ .map_or_else(|| Ok(BTreeMap::new()), |u| serde_json::from_str(u.get()))
+ .map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?;
+
+ unsigned.insert("age".to_owned(), to_raw_value(&1).unwrap());
+ self.unsigned = Some(to_raw_value(&unsigned).expect("unsigned is valid"));
+
+ Ok(())
+ }
+
#[tracing::instrument(skip(self))]
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
let mut json = json!({
diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs
index 89ac72e..c6e433c 100644
--- a/src/service/rooms/event_handler/mod.rs
+++ b/src/service/rooms/event_handler/mod.rs
@@ -357,7 +357,7 @@ impl Service {
.await;
// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
- info!(
+ debug!(
"Auth check for {} based on auth events",
incoming_pdu.event_id
);
@@ -419,7 +419,7 @@ impl Service {
));
}
- info!("Validation successful.");
+ debug!("Validation successful.");
// 7. Persist the event as an outlier.
services()
@@ -427,7 +427,7 @@ impl Service {
.outlier
.add_pdu_outlier(&incoming_pdu.event_id, &val)?;
- info!("Added pdu as outlier.");
+ debug!("Added pdu as outlier.");
Ok((Arc::new(incoming_pdu), val))
})
@@ -476,7 +476,7 @@ impl Service {
// TODO: if we know the prev_events of the incoming event we can avoid the request and build
// the state from a known point and resolve if > 1 prev_event
- info!("Requesting state at event");
+ debug!("Requesting state at event");
let mut state_at_incoming_event = None;
if incoming_pdu.prev_events.len() == 1 {
@@ -499,7 +499,7 @@ impl Service {
};
if let Some(Ok(mut state)) = state {
- info!("Using cached state");
+ debug!("Using cached state");
let prev_pdu = services()
.rooms
.timeline
@@ -523,7 +523,7 @@ impl Service {
state_at_incoming_event = Some(state);
}
} else {
- info!("Calculating state at event using state res");
+ debug!("Calculating state at event using state res");
let mut extremity_sstatehashes = HashMap::new();
let mut okay = true;
@@ -632,7 +632,7 @@ impl Service {
}
if state_at_incoming_event.is_none() {
- info!("Calling /state_ids");
+ debug!("Calling /state_ids");
// Call /state_ids to find out what the state at this pdu is. We trust the server's
// response to some extend, but we still do a lot of checks on the events
match services()
@@ -647,7 +647,7 @@ impl Service {
.await
{
Ok(res) => {
- info!("Fetching state events at event.");
+ debug!("Fetching state events at event.");
let state_vec = self
.fetch_and_handle_outliers(
origin,
@@ -710,7 +710,7 @@ impl Service {
let state_at_incoming_event =
state_at_incoming_event.expect("we always set this to some above");
- info!("Starting auth check");
+ debug!("Starting auth check");
// 11. Check the auth of the event passes based on the state of the event
let check_result = state_res::event_auth::auth_check(
&room_version,
@@ -734,7 +734,7 @@ impl Service {
"Event has failed auth check with state at the event.",
));
}
- info!("Auth check succeeded");
+ debug!("Auth check succeeded");
// Soft fail check before doing state res
let auth_events = services().rooms.state.get_auth_events(
@@ -769,7 +769,7 @@ impl Service {
// Now we calculate the set of extremities this room has after the incoming event has been
// applied. We start with the previous extremities (aka leaves)
- info!("Calculating extremities");
+ debug!("Calculating extremities");
let mut extremities = services().rooms.state.get_forward_extremities(room_id)?;
// Remove any forward extremities that are referenced by this incoming event's prev_events
@@ -790,7 +790,7 @@ impl Service {
)
});
- info!("Compressing state at event");
+ debug!("Compressing state at event");
let state_ids_compressed = Arc::new(
state_at_incoming_event
.iter()
@@ -804,7 +804,7 @@ impl Service {
);
if incoming_pdu.state_key.is_some() {
- info!("Preparing for stateres to derive new room state");
+ debug!("Preparing for stateres to derive new room state");
// We also add state after incoming event to the fork states
let mut state_after = state_at_incoming_event.clone();
@@ -822,7 +822,7 @@ impl Service {
.await?;
// Set the new room state to the resolved state
- info!("Forcing new room state");
+ debug!("Forcing new room state");
let (sstatehash, new, removed) = services()
.rooms
@@ -837,7 +837,7 @@ impl Service {
}
// 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it
- info!("Starting soft fail auth check");
+ debug!("Starting soft fail auth check");
if soft_fail {
services().rooms.timeline.append_incoming_pdu(
@@ -861,7 +861,7 @@ impl Service {
));
}
- info!("Appending pdu to timeline");
+ debug!("Appending pdu to timeline");
extremities.insert(incoming_pdu.event_id.clone());
// Now that the event has passed all auth it is added into the timeline.
@@ -877,7 +877,7 @@ impl Service {
&state_lock,
)?;
- info!("Appended incoming pdu");
+ debug!("Appended incoming pdu");
// Event has passed all auth/stateres checks
drop(state_lock);
@@ -890,7 +890,7 @@ impl Service {
room_version_id: &RoomVersionId,
incoming_state: HashMap<u64, Arc<EventId>>,
) -> Result<Arc<HashSet<CompressedStateEvent>>> {
- info!("Loading current room state ids");
+ debug!("Loading current room state ids");
let current_sstatehash = services()
.rooms
.state
@@ -917,7 +917,7 @@ impl Service {
);
}
- info!("Loading fork states");
+ debug!("Loading fork states");
let fork_states: Vec<_> = fork_states
.into_iter()
@@ -935,7 +935,7 @@ impl Service {
})
.collect();
- info!("Resolving state");
+ debug!("Resolving state");
let lock = services().globals.stateres_mutex.lock();
let state = match state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| {
@@ -953,7 +953,7 @@ impl Service {
drop(lock);
- info!("State resolution done. Compressing state");
+ debug!("State resolution done. Compressing state");
let new_room_state = state
.into_iter()
diff --git a/src/utils/error.rs b/src/utils/error.rs
index 4f044ca..5ffb38c 100644
--- a/src/utils/error.rs
+++ b/src/utils/error.rs
@@ -9,7 +9,7 @@ use ruma::{
OwnedServerName,
};
use thiserror::Error;
-use tracing::{error, warn};
+use tracing::{error, info};
#[cfg(feature = "persy")]
use persy::PersyError;
@@ -131,7 +131,7 @@ impl Error {
_ => (Unknown, StatusCode::INTERNAL_SERVER_ERROR),
};
- warn!("{}: {}", status_code, message);
+ info!("Returning an error: {}: {}", status_code, message);
RumaResponse(UiaaResponse::MatrixError(RumaError {
body: ErrorBody::Standard { kind, message },