diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index fbbbf1f..64a6063 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -664,7 +664,6 @@ pub async fn launch( metrics: MetricsRegistry, ) -> Result { log::info!("Starting IRC projection"); - let (signal, mut rx) = oneshot(); let (stopped_tx, mut stopped_rx) = channel(32); let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; @@ -677,7 +676,7 @@ pub async fn launch( let listener = TcpListener::bind(config.listen_on).await?; - let handle = tokio::task::spawn(async move { + let terminator = Terminator::spawn(|mut rx| async move { // TODO probably should separate logic for accepting new connection and storing them // into two tasks so that they don't block each other let mut actors = HashMap::new(); @@ -702,22 +701,22 @@ pub async fn launch( // TODO kill the older connection and restart it continue; } - let players = players.clone(); - let rooms = rooms.clone(); - let current_connections_clone = current_connections.clone(); - let (promise, deferred) = oneshot(); - let stopped = stopped_tx.clone(); - let handle = tokio::task::spawn(async move { - match handle_socket(config, stream, &socket_addr, players, rooms, deferred).await { - Ok(_) => log::info!("Connection terminated"), - Err(err) => log::warn!("Connection failed: {err}"), + let terminator = Terminator::spawn(|deferred| { + let players = players.clone(); + let rooms = rooms.clone(); + let current_connections_clone = current_connections.clone(); + let stopped_tx = stopped_tx.clone(); + async move { + match handle_socket(config, stream, &socket_addr, players, rooms, deferred).await { + Ok(_) => log::info!("Connection terminated"), + Err(err) => log::warn!("Connection failed: {err}"), + } + current_connections_clone.dec(); + stopped_tx.send(socket_addr).await?; + Ok(()) } - current_connections_clone.dec(); - stopped.send(socket_addr).await?; - Ok(()) }); - let terminator = Terminator::from_raw(promise, handle); actors.insert(socket_addr, terminator); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), @@ -740,6 +739,5 @@ pub async fn launch( }); log::info!("Started IRC projection"); - let terminator = Terminator::from_raw(signal, handle); Ok(terminator) } diff --git a/src/util/mod.rs b/src/util/mod.rs index 1686470..cdf334d 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -11,10 +11,6 @@ pub struct Terminator { completion: JoinHandle>, } impl Terminator { - pub fn from_raw(signal: Promise<()>, completion: JoinHandle>) -> Terminator { - Terminator { signal, completion } - } - pub async fn terminate(self) -> Result<()> { match self.signal.send(()) { Ok(()) => {} @@ -23,4 +19,16 @@ impl Terminator { self.completion.await??; Ok(()) } + + /// Used to spawn managed tasks with support for graceful shutdown + pub fn spawn(launcher: Fun) -> Terminator + where + Fun: FnOnce(Deferred<()>) -> Fut, + Fut: Future> + Send + 'static, + { + let (signal, rx) = oneshot(); + let future = launcher(rx); + let completion = tokio::task::spawn(future); + Terminator { signal, completion } + } } diff --git a/src/util/telemetry.rs b/src/util/telemetry.rs index 0eb3e42..c218e02 100644 --- a/src/util/telemetry.rs +++ b/src/util/telemetry.rs @@ -34,9 +34,7 @@ pub async fn launch( log::info!("Starting the telemetry service"); let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); - let (signal, rx) = channel(); - let handle = tokio::task::spawn(main_loop(listener, metrics, rooms, rx.map(|_| ()))); - let terminator = Terminator::from_raw(signal, handle); + let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, rx.map(|_| ()))); Ok(terminator) }