summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTimo Kösters <timo@koesters.xyz>2023-03-18 08:58:20 +0100
committerTimo Kösters <timo@koesters.xyz>2023-03-18 08:58:20 +0100
commit2a7c4693b8680ab8f00e990e1c779a8d36967520 (patch)
tree30db5cf8d113e4ba67a6b08602012192e78d78b7 /src
parentda3871f39a3a0a7b5b34e2110e41717c33754a73 (diff)
downloadconduit-2a7c4693b8680ab8f00e990e1c779a8d36967520.zip
fix: don't accept new requests when shutting down
Diffstat (limited to 'src')
-rw-r--r--src/main.rs17
-rw-r--r--src/service/globals/mod.rs15
2 files changed, 22 insertions, 10 deletions
diff --git a/src/main.rs b/src/main.rs
index a51416d..59e82a7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -7,7 +7,7 @@
#![allow(clippy::suspicious_else_formatting)]
#![deny(clippy::dbg_macro)]
-use std::{future::Future, io, net::SocketAddr, time::Duration};
+use std::{future::Future, io, net::SocketAddr, sync::atomic, time::Duration};
use axum::{
extract::{DefaultBodyLimit, FromRequest, MatchedPath},
@@ -212,13 +212,6 @@ async fn run_server() -> io::Result<()> {
}
}
- // On shutdown
- info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers...");
- services().globals.rotate.fire();
-
- #[cfg(feature = "systemd")]
- let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]);
-
Ok(())
}
@@ -226,6 +219,9 @@ async fn spawn_task<B: Send + 'static>(
req: axum::http::Request<B>,
next: axum::middleware::Next<B>,
) -> std::result::Result<axum::response::Response, StatusCode> {
+ if services().globals.shutdown.load(atomic::Ordering::Relaxed) {
+ return Err(StatusCode::SERVICE_UNAVAILABLE);
+ }
tokio::spawn(next.run(req))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
@@ -452,6 +448,11 @@ async fn shutdown_signal(handle: ServerHandle) {
warn!("Received {}, shutting down...", sig);
handle.graceful_shutdown(Some(Duration::from_secs(30)));
+
+ services().globals.shutdown();
+
+ #[cfg(feature = "systemd")]
+ let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]);
}
async fn not_found(uri: Uri) -> impl IntoResponse {
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index cd3be08..9206d43 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -6,7 +6,7 @@ use ruma::{
use crate::api::server_server::FedDest;
-use crate::{Config, Error, Result};
+use crate::{services, Config, Error, Result};
use ruma::{
api::{
client::sync::sync_events,
@@ -14,6 +14,7 @@ use ruma::{
},
DeviceId, RoomVersionId, ServerName, UserId,
};
+use std::sync::atomic::{self, AtomicBool};
use std::{
collections::{BTreeMap, HashMap},
fs,
@@ -24,7 +25,7 @@ use std::{
time::{Duration, Instant},
};
use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore};
-use tracing::error;
+use tracing::{error, info};
use trust_dns_resolver::TokioAsyncResolver;
type WellKnownMap = HashMap<OwnedServerName, (FedDest, String)>;
@@ -58,6 +59,8 @@ pub struct Service {
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
pub stateres_mutex: Arc<Mutex<()>>,
pub rotate: RotationHandler,
+
+ pub shutdown: AtomicBool,
}
/// Handles "rotation" of long-polling requests. "Rotation" in this context is similar to "rotation" of log files and the like.
@@ -160,6 +163,7 @@ impl Service {
stateres_mutex: Arc::new(Mutex::new(())),
sync_receivers: RwLock::new(HashMap::new()),
rotate: RotationHandler::new(),
+ shutdown: AtomicBool::new(false),
};
fs::create_dir_all(s.get_media_folder())?;
@@ -341,6 +345,13 @@ impl Service {
r.push(base64::encode_config(key, base64::URL_SAFE_NO_PAD));
r
}
+
+ pub fn shutdown(&self) {
+ self.shutdown.store(true, atomic::Ordering::Relaxed);
+ // On shutdown
+ info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers...");
+ services().globals.rotate.fire();
+ }
}
fn reqwest_client_builder(config: &Config) -> Result<reqwest::ClientBuilder> {