summaryrefslogtreecommitdiff
path: root/src/service/sending/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/sending/mod.rs')
-rw-r--r--src/service/sending/mod.rs38
1 files changed, 23 insertions, 15 deletions
diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs
index 1861feb..b441144 100644
--- a/src/service/sending/mod.rs
+++ b/src/service/sending/mod.rs
@@ -18,6 +18,8 @@ use crate::{
use federation::transactions::send_transaction_message;
use futures_util::{stream::FuturesUnordered, StreamExt};
+use base64::{engine::general_purpose, Engine as _};
+
use ruma::{
api::{
appservice,
@@ -40,7 +42,7 @@ use tokio::{
select,
sync::{mpsc, Mutex, Semaphore},
};
-use tracing::{error, warn};
+use tracing::{debug, error, warn};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind {
@@ -497,17 +499,14 @@ impl Service {
})?,
appservice::event::push_events::v1::Request {
events: pdu_jsons,
- txn_id: (&*base64::encode_config(
- calculate_hash(
- &events
- .iter()
- .map(|e| match e {
- SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
- })
- .collect::<Vec<_>>(),
- ),
- base64::URL_SAFE_NO_PAD,
- ))
+ txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
+ &events
+ .iter()
+ .map(|e| match e {
+ SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
+ })
+ .collect::<Vec<_>>(),
+ )))
.into(),
},
)
@@ -642,7 +641,7 @@ impl Service {
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
- transaction_id: (&*base64::encode_config(
+ transaction_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(
calculate_hash(
&events
.iter()
@@ -651,7 +650,6 @@ impl Service {
})
.collect::<Vec<_>>(),
),
- base64::URL_SAFE_NO_PAD,
))
.into(),
},
@@ -683,8 +681,18 @@ impl Service {
where
T: Debug,
{
+ debug!("Waiting for permit");
let permit = self.maximum_requests.acquire().await;
- let response = server_server::send_request(destination, request).await;
+ debug!("Got permit");
+ let response = tokio::time::timeout(
+ Duration::from_secs(2 * 60),
+ server_server::send_request(destination, request),
+ )
+ .await
+ .map_err(|_| {
+ warn!("Timeout waiting for server response of {destination}");
+ Error::BadServerResponse("Timeout waiting for server response")
+ })?;
drop(permit);
response