From 16523331847d91e7d2386f9a5b2a19917e34d1d8 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Wed, 6 Sep 2023 17:20:48 +0200 Subject: [PATCH] use scoped for xmpp --- src/main.rs | 4 ++-- src/projections/xmpp/mod.rs | 23 ++++++++++++++--------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index fae4f28..dadb8ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -61,13 +61,13 @@ async fn main() -> Result<()> { // unsafe: outer future is never dropped, scope is joined on `scope.collect` let mut scope = unsafe { Scope::create() }; let irc = projections::irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?; - let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; + let xmpp = projections::xmpp::launch(xmpp_config, &players, &rooms, &metrics, &mut scope).await?; tracing::info!("Started"); sleep.await; tracing::info!("Begin shutdown"); - xmpp.terminate().await?; + let _ = xmpp.send(()); let _ = irc.send(()); let _ = scope.collect().await; drop(scope); diff --git a/src/projections/xmpp/mod.rs b/src/projections/xmpp/mod.rs index a02ceca..9260c04 100644 --- a/src/projections/xmpp/mod.rs +++ b/src/projections/xmpp/mod.rs @@ -7,6 +7,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use futures_util::FutureExt; use futures_util::future::join_all; use prometheus::Registry as MetricsRegistry; use quick_xml::events::{BytesDecl, Event}; @@ -53,12 +54,13 @@ struct Authenticated { xmpp_muc_name: Resource, } -pub async fn launch( +pub async fn launch<'a>( config: ServerConfig, - players: PlayerRegistry, - rooms: RoomRegistry, - metrics: MetricsRegistry, -) -> Result { + players: &'a PlayerRegistry, + rooms: &'a RoomRegistry, + metrics: &'a MetricsRegistry, + scope: &mut Scope<'a>, +) -> Result> { log::info!("Starting XMPP projection"); let certs = certs(&mut SyncBufReader::new(File::open(config.cert)?))?; @@ -75,13 +77,15 @@ pub async fn launch( }); let listener = TcpListener::bind(config.listen_on).await?; - let terminator = Terminator::spawn(|mut termination| async move { + + let (signal, mut rx) = oneshot(); + let future = async move { let (stopped_tx, mut stopped_rx) = channel(32); let mut actors = HashMap::new(); loop { select! { biased; - _ = &mut termination => break, + _ = &mut rx => break, stopped = stopped_rx.recv() => match stopped { Some(stopped) => { let _ = actors.remove(&stopped); }, None => unreachable!(), @@ -136,9 +140,10 @@ pub async fn launch( .await; log::info!("Stopped XMPP projection"); Ok(()) - }); + }; + scope.spawn(future.map(|_: Result<()>| ())); log::info!("Started XMPP projection"); - Ok(terminator) + Ok(signal) } async fn handle_socket(