remove Terminator::from_raw

This commit is contained in:
Nikita Vilunov 2023-02-22 16:05:28 +01:00
parent bbd68853ae
commit 0adc19558d
3 changed files with 27 additions and 23 deletions

View File

@ -664,7 +664,6 @@ pub async fn launch(
metrics: MetricsRegistry, metrics: MetricsRegistry,
) -> Result<Terminator> { ) -> Result<Terminator> {
log::info!("Starting IRC projection"); log::info!("Starting IRC projection");
let (signal, mut rx) = oneshot();
let (stopped_tx, mut stopped_rx) = channel(32); let (stopped_tx, mut stopped_rx) = channel(32);
let current_connections = let current_connections =
IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; IntGauge::new("irc_current_connections", "Open and alive TCP connections")?;
@ -677,7 +676,7 @@ pub async fn launch(
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
let handle = tokio::task::spawn(async move { let terminator = Terminator::spawn(|mut rx| async move {
// TODO probably should separate logic for accepting new connection and storing them // TODO probably should separate logic for accepting new connection and storing them
// into two tasks so that they don't block each other // into two tasks so that they don't block each other
let mut actors = HashMap::new(); let mut actors = HashMap::new();
@ -702,22 +701,22 @@ pub async fn launch(
// TODO kill the older connection and restart it // TODO kill the older connection and restart it
continue; continue;
} }
let players = players.clone();
let rooms = rooms.clone();
let current_connections_clone = current_connections.clone();
let (promise, deferred) = oneshot(); let terminator = Terminator::spawn(|deferred| {
let stopped = stopped_tx.clone(); let players = players.clone();
let handle = tokio::task::spawn(async move { let rooms = rooms.clone();
match handle_socket(config, stream, &socket_addr, players, rooms, deferred).await { let current_connections_clone = current_connections.clone();
Ok(_) => log::info!("Connection terminated"), let stopped_tx = stopped_tx.clone();
Err(err) => log::warn!("Connection failed: {err}"), async move {
match handle_socket(config, stream, &socket_addr, players, rooms, deferred).await {
Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"),
}
current_connections_clone.dec();
stopped_tx.send(socket_addr).await?;
Ok(())
} }
current_connections_clone.dec();
stopped.send(socket_addr).await?;
Ok(())
}); });
let terminator = Terminator::from_raw(promise, handle);
actors.insert(socket_addr, terminator); actors.insert(socket_addr, terminator);
}, },
Err(err) => log::warn!("Failed to accept new connection: {err}"), Err(err) => log::warn!("Failed to accept new connection: {err}"),
@ -740,6 +739,5 @@ pub async fn launch(
}); });
log::info!("Started IRC projection"); log::info!("Started IRC projection");
let terminator = Terminator::from_raw(signal, handle);
Ok(terminator) Ok(terminator)
} }

View File

@ -11,10 +11,6 @@ pub struct Terminator {
completion: JoinHandle<Result<()>>, completion: JoinHandle<Result<()>>,
} }
impl Terminator { impl Terminator {
pub fn from_raw(signal: Promise<()>, completion: JoinHandle<Result<()>>) -> Terminator {
Terminator { signal, completion }
}
pub async fn terminate(self) -> Result<()> { pub async fn terminate(self) -> Result<()> {
match self.signal.send(()) { match self.signal.send(()) {
Ok(()) => {} Ok(()) => {}
@ -23,4 +19,16 @@ impl Terminator {
self.completion.await??; self.completion.await??;
Ok(()) Ok(())
} }
/// Used to spawn managed tasks with support for graceful shutdown
pub fn spawn<Fun, Fut>(launcher: Fun) -> Terminator
where
Fun: FnOnce(Deferred<()>) -> Fut,
Fut: Future<Output = Result<()>> + Send + 'static,
{
let (signal, rx) = oneshot();
let future = launcher(rx);
let completion = tokio::task::spawn(future);
Terminator { signal, completion }
}
} }

View File

@ -34,9 +34,7 @@ pub async fn launch(
log::info!("Starting the telemetry service"); log::info!("Starting the telemetry service");
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started"); log::debug!("Listener started");
let (signal, rx) = channel(); let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, rx.map(|_| ())));
let handle = tokio::task::spawn(main_loop(listener, metrics, rooms, rx.map(|_| ())));
let terminator = Terminator::from_raw(signal, handle);
Ok(terminator) Ok(terminator)
} }