forked from lavina/lavina
1
0
Fork 0

use scoped for xmpp

This commit is contained in:
Nikita Vilunov 2023-09-06 17:20:48 +02:00
parent ee7f7b5ffc
commit 1652333184
2 changed files with 16 additions and 11 deletions

View File

@ -61,13 +61,13 @@ async fn main() -> Result<()> {
// unsafe: outer future is never dropped, scope is joined on `scope.collect` // unsafe: outer future is never dropped, scope is joined on `scope.collect`
let mut scope = unsafe { Scope::create() }; let mut scope = unsafe { Scope::create() };
let irc = projections::irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?; let irc = projections::irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?;
let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; let xmpp = projections::xmpp::launch(xmpp_config, &players, &rooms, &metrics, &mut scope).await?;
tracing::info!("Started"); tracing::info!("Started");
sleep.await; sleep.await;
tracing::info!("Begin shutdown"); tracing::info!("Begin shutdown");
xmpp.terminate().await?; let _ = xmpp.send(());
let _ = irc.send(()); let _ = irc.send(());
let _ = scope.collect().await; let _ = scope.collect().await;
drop(scope); drop(scope);

View File

@ -7,6 +7,7 @@ use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use futures_util::FutureExt;
use futures_util::future::join_all; use futures_util::future::join_all;
use prometheus::Registry as MetricsRegistry; use prometheus::Registry as MetricsRegistry;
use quick_xml::events::{BytesDecl, Event}; use quick_xml::events::{BytesDecl, Event};
@ -53,12 +54,13 @@ struct Authenticated {
xmpp_muc_name: Resource, xmpp_muc_name: Resource,
} }
pub async fn launch( pub async fn launch<'a>(
config: ServerConfig, config: ServerConfig,
players: PlayerRegistry, players: &'a PlayerRegistry,
rooms: RoomRegistry, rooms: &'a RoomRegistry,
metrics: MetricsRegistry, metrics: &'a MetricsRegistry,
) -> Result<Terminator> { scope: &mut Scope<'a>,
) -> Result<Promise<()>> {
log::info!("Starting XMPP projection"); log::info!("Starting XMPP projection");
let certs = certs(&mut SyncBufReader::new(File::open(config.cert)?))?; let certs = certs(&mut SyncBufReader::new(File::open(config.cert)?))?;
@ -75,13 +77,15 @@ pub async fn launch(
}); });
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
let terminator = Terminator::spawn(|mut termination| async move {
let (signal, mut rx) = oneshot();
let future = async move {
let (stopped_tx, mut stopped_rx) = channel(32); let (stopped_tx, mut stopped_rx) = channel(32);
let mut actors = HashMap::new(); let mut actors = HashMap::new();
loop { loop {
select! { select! {
biased; biased;
_ = &mut termination => break, _ = &mut rx => break,
stopped = stopped_rx.recv() => match stopped { stopped = stopped_rx.recv() => match stopped {
Some(stopped) => { let _ = actors.remove(&stopped); }, Some(stopped) => { let _ = actors.remove(&stopped); },
None => unreachable!(), None => unreachable!(),
@ -136,9 +140,10 @@ pub async fn launch(
.await; .await;
log::info!("Stopped XMPP projection"); log::info!("Stopped XMPP projection");
Ok(()) Ok(())
}); };
scope.spawn(future.map(|_: Result<()>| ()));
log::info!("Started XMPP projection"); log::info!("Started XMPP projection");
Ok(terminator) Ok(signal)
} }
async fn handle_socket( async fn handle_socket(