diff --git a/Cargo.lock b/Cargo.lock index 69c4337..4bb1503 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,6 +56,18 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-scoped" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7a6a57c8aeb40da1ec037f5d455836852f7a57e69e1b1ad3d8f38ac1d6cadf" +dependencies = [ + "futures", + "pin-project", + "slab", + "tokio", +] + [[package]] name = "atoi" version = "2.0.0" @@ -377,6 +389,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -450,6 +477,7 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -754,6 +782,7 @@ version = "0.1.0" dependencies = [ "anyhow", "assert_matches", + "async-scoped", "derive_more", "figment", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index ddabb38..8950494 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ quick-xml = { version = "0.30.0", features = ["async-tokio"] } derive_more = "0.99.17" uuid = { version = "1.3.0", features = ["v4"] } sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "runtime-tokio-rustls", "migrate"] } +async-scoped = { version = "0.7.1", features = ["use-tokio"] } [dev-dependencies] assert_matches = "1.5.0" diff --git a/src/main.rs b/src/main.rs index a623441..fae4f28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,7 +57,10 @@ async fn main() -> Result<()> { let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?; - let irc = projections::irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; + + // unsafe: outer future is never dropped, scope is joined on `scope.collect` + let mut scope = unsafe { Scope::create() }; + let irc = projections::irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?; let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; tracing::info!("Started"); @@ -65,7 +68,10 @@ async fn main() -> Result<()> { tracing::info!("Begin shutdown"); xmpp.terminate().await?; - irc.terminate().await?; + let _ = irc.send(()); + let _ = scope.collect().await; + drop(scope); + telemetry_terminator.terminate().await?; players.shutdown_all().await?; drop(players); diff --git a/src/prelude.rs b/src/prelude.rs index 9488bc2..e01332d 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -23,3 +23,5 @@ macro_rules! ffail { } pub(crate) use ffail; + +pub type Scope<'a> = async_scoped::Scope<'a, (), async_scoped::Tokio>; diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index 6c36716..2f74919 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::net::SocketAddr; +use futures_util::FutureExt; use futures_util::future::join_all; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use serde::Deserialize; @@ -689,13 +690,14 @@ async fn produce_on_join_cmd_messages( Ok(()) } -pub async fn launch( - config: ServerConfig, - players: PlayerRegistry, - rooms: RoomRegistry, - metrics: MetricsRegistry, - storage: Storage, -) -> Result { +pub async fn launch<'a>( + config: &'a ServerConfig, + players: &'a PlayerRegistry, + rooms: &'a RoomRegistry, + metrics: &'a MetricsRegistry, + storage: &'a Storage, + scope: &mut Scope<'a>, +) -> Result> { log::info!("Starting IRC projection"); let (stopped_tx, mut stopped_rx) = channel(32); let current_connections = @@ -709,7 +711,8 @@ pub async fn launch( let listener = TcpListener::bind(config.listen_on).await?; - let terminator = Terminator::spawn(|mut rx| async move { + let (signal, mut rx) = oneshot(); + let future = async move { // TODO probably should separate logic for accepting new connection and storing them // into two tasks so that they don't block each other let mut actors = HashMap::new(); @@ -771,8 +774,10 @@ pub async fn launch( })).await; log::info!("Stopped IRC projection"); Ok(()) - }); + }; + + scope.spawn(future.map(|_: Result<()>| ())); log::info!("Started IRC projection"); - Ok(terminator) + Ok(signal) }