Skip to content
Snippets Groups Projects
Commit e4dc4a1b authored by Jason Volk's avatar Jason Volk
Browse files

fix graceful shutdown on unix socket


Signed-off-by: default avatarJason Volk <jason@zemos.net>
parent d2fb6d04
No related branches found
No related tags found
1 merge request!520Panic / Error Handling
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
use std::{ use std::{
net::{self, IpAddr, Ipv4Addr}, net::{self, IpAddr, Ipv4Addr},
path::Path, path::Path,
sync::Arc, sync::{atomic::Ordering, Arc},
}; };
use axum::{ use axum::{
...@@ -21,12 +21,14 @@ ...@@ -21,12 +21,14 @@
net::{unix::SocketAddr, UnixListener, UnixStream}, net::{unix::SocketAddr, UnixListener, UnixStream},
sync::broadcast::{self}, sync::broadcast::{self},
task::JoinSet, task::JoinSet,
time::{sleep, Duration},
}; };
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
type MakeService = IntoMakeServiceWithConnectInfo<Router, net::SocketAddr>; type MakeService = IntoMakeServiceWithConnectInfo<Router, net::SocketAddr>;
static NULL_ADDR: net::SocketAddr = net::SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); const NULL_ADDR: net::SocketAddr = net::SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
const FINI_POLL_INTERVAL: Duration = Duration::from_millis(750);
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(super) async fn serve(server: &Arc<Server>, app: Router, mut shutdown: broadcast::Receiver<()>) -> Result<()> { pub(super) async fn serve(server: &Arc<Server>, app: Router, mut shutdown: broadcast::Receiver<()>) -> Result<()> {
...@@ -47,7 +49,7 @@ pub(super) async fn serve(server: &Arc<Server>, app: Router, mut shutdown: broad ...@@ -47,7 +49,7 @@ pub(super) async fn serve(server: &Arc<Server>, app: Router, mut shutdown: broad
} }
} }
fini(listener, tasks).await; fini(server, listener, tasks).await;
Ok(()) Ok(())
} }
...@@ -111,15 +113,26 @@ async fn init(server: &Arc<Server>) -> Result<UnixListener> { ...@@ -111,15 +113,26 @@ async fn init(server: &Arc<Server>) -> Result<UnixListener> {
return Err!("Failed to set socket {path:?} permissions: {e}"); return Err!("Failed to set socket {path:?} permissions: {e}");
} }
info!("Listening at {:?}", path); info!("Listening at {path:?}");
Ok(listener.unwrap()) Ok(listener.unwrap())
} }
async fn fini(listener: UnixListener, mut tasks: JoinSet<()>) { async fn fini(server: &Arc<Server>, listener: UnixListener, mut tasks: JoinSet<()>) {
let local = listener.local_addr(); let local = listener.local_addr();
debug!("Closing listener at {local:?} ...");
drop(listener); drop(listener);
debug!("Waiting for requests to finish...");
while server.metrics.requests_spawn_active.load(Ordering::Relaxed) > 0 {
tokio::select! {
_ = tasks.join_next() => {}
() = sleep(FINI_POLL_INTERVAL) => {}
}
}
debug!("Shutting down...");
tasks.shutdown().await; tasks.shutdown().await;
if let Ok(local) = local { if let Ok(local) = local {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment