forked from lavina/lavina
1
0
Fork 0

use scoped for irc projection

This commit is contained in:
Nikita Vilunov 2023-09-06 17:02:05 +02:00
parent 377d9c32d2
commit ee7f7b5ffc
5 changed files with 55 additions and 12 deletions

29
Cargo.lock generated
View File

@ -56,6 +56,18 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-scoped"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7a6a57c8aeb40da1ec037f5d455836852f7a57e69e1b1ad3d8f38ac1d6cadf"
dependencies = [
"futures",
"pin-project",
"slab",
"tokio",
]
[[package]] [[package]]
name = "atoi" name = "atoi"
version = "2.0.0" version = "2.0.0"
@ -377,6 +389,21 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "futures"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.28" version = "0.3.28"
@ -450,6 +477,7 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro", "futures-macro",
@ -754,6 +782,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"assert_matches", "assert_matches",
"async-scoped",
"derive_more", "derive_more",
"figment", "figment",
"futures-util", "futures-util",

View File

@ -25,6 +25,7 @@ quick-xml = { version = "0.30.0", features = ["async-tokio"] }
derive_more = "0.99.17" derive_more = "0.99.17"
uuid = { version = "1.3.0", features = ["v4"] } uuid = { version = "1.3.0", features = ["v4"] }
sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "runtime-tokio-rustls", "migrate"] } sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "runtime-tokio-rustls", "migrate"] }
async-scoped = { version = "0.7.1", features = ["use-tokio"] }
[dev-dependencies] [dev-dependencies]
assert_matches = "1.5.0" assert_matches = "1.5.0"

View File

@ -57,7 +57,10 @@ async fn main() -> Result<()> {
let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?;
let telemetry_terminator = let telemetry_terminator =
util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?; util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?;
let irc = projections::irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?;
// unsafe: outer future is never dropped, scope is joined on `scope.collect`
let mut scope = unsafe { Scope::create() };
let irc = projections::irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?;
let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?;
tracing::info!("Started"); tracing::info!("Started");
@ -65,7 +68,10 @@ async fn main() -> Result<()> {
tracing::info!("Begin shutdown"); tracing::info!("Begin shutdown");
xmpp.terminate().await?; xmpp.terminate().await?;
irc.terminate().await?; let _ = irc.send(());
let _ = scope.collect().await;
drop(scope);
telemetry_terminator.terminate().await?; telemetry_terminator.terminate().await?;
players.shutdown_all().await?; players.shutdown_all().await?;
drop(players); drop(players);

View File

@ -23,3 +23,5 @@ macro_rules! ffail {
} }
pub(crate) use ffail; pub(crate) use ffail;
pub type Scope<'a> = async_scoped::Scope<'a, (), async_scoped::Tokio>;

View File

@ -1,6 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures_util::FutureExt;
use futures_util::future::join_all; use futures_util::future::join_all;
use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry};
use serde::Deserialize; use serde::Deserialize;
@ -689,13 +690,14 @@ async fn produce_on_join_cmd_messages(
Ok(()) Ok(())
} }
pub async fn launch( pub async fn launch<'a>(
config: ServerConfig, config: &'a ServerConfig,
players: PlayerRegistry, players: &'a PlayerRegistry,
rooms: RoomRegistry, rooms: &'a RoomRegistry,
metrics: MetricsRegistry, metrics: &'a MetricsRegistry,
storage: Storage, storage: &'a Storage,
) -> Result<Terminator> { scope: &mut Scope<'a>,
) -> Result<Promise<()>> {
log::info!("Starting IRC projection"); log::info!("Starting IRC projection");
let (stopped_tx, mut stopped_rx) = channel(32); let (stopped_tx, mut stopped_rx) = channel(32);
let current_connections = let current_connections =
@ -709,7 +711,8 @@ pub async fn launch(
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
let terminator = Terminator::spawn(|mut rx| async move { let (signal, mut rx) = oneshot();
let future = async move {
// TODO probably should separate logic for accepting new connection and storing them // TODO probably should separate logic for accepting new connection and storing them
// into two tasks so that they don't block each other // into two tasks so that they don't block each other
let mut actors = HashMap::new(); let mut actors = HashMap::new();
@ -771,8 +774,10 @@ pub async fn launch(
})).await; })).await;
log::info!("Stopped IRC projection"); log::info!("Stopped IRC projection");
Ok(()) Ok(())
}); };
scope.spawn(future.map(|_: Result<()>| ()));
log::info!("Started IRC projection"); log::info!("Started IRC projection");
Ok(terminator) Ok(signal)
} }