From bbd68853ae021579baefd623d513b07d84f2f0a2 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Wed, 22 Feb 2023 15:40:05 +0100 Subject: [PATCH] graceful shutdown of irc socket listener --- src/projections/irc/mod.rs | 46 +++++++++++++++++++++++++++++++++----- src/protos/irc/server.rs | 2 +- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index 4df1e57..fbbbf1f 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::net::SocketAddr; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; @@ -5,7 +6,7 @@ use serde::Deserialize; use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::oneshot::channel; +use tokio::sync::mpsc::channel; use crate::core::player::*; use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; @@ -39,9 +40,10 @@ struct RegisteredUser { async fn handle_socket( config: ServerConfig, mut stream: TcpStream, - socket_addr: SocketAddr, + socket_addr: &SocketAddr, players: PlayerRegistry, rooms: RoomRegistry, + terminator: Deferred<()>, // TODO use it to stop the connection gracefully ) -> Result<()> { let (reader, writer) = stream.split(); let mut reader: BufReader = BufReader::new(reader); @@ -244,7 +246,9 @@ async fn handle_registered_socket<'a>( ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::Error { reason: b"Leaving the server".to_vec() }, + body: ServerMessageBody::Error { + reason: b"Leaving the server".to_vec(), + }, } .write_async(writer) .await?; @@ -660,7 +664,8 @@ pub async fn launch( metrics: MetricsRegistry, ) -> Result { log::info!("Starting IRC projection"); - let (signal, mut rx) = channel(); + let (signal, mut rx) = oneshot(); + let (stopped_tx, mut stopped_rx) = channel(32); let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; let total_connections = IntCounter::new( @@ -671,13 +676,19 @@ pub async fn launch( metrics.register(Box::new(total_connections.clone()))?; let listener = TcpListener::bind(config.listen_on).await?; - log::debug!("Listener started"); let handle = tokio::task::spawn(async move { + // TODO probably should separate logic for accepting new connection and storing them + // into two tasks so that they don't block each other + let mut actors = HashMap::new(); loop { select! { biased; _ = &mut rx => break, + stopped = stopped_rx.recv() => match stopped { + Some(stopped) => { let _ = actors.remove(&stopped); }, + None => unreachable!(), + }, new_conn = listener.accept() => { match new_conn { Ok((stream, socket_addr)) => { @@ -685,16 +696,29 @@ pub async fn launch( total_connections.inc(); current_connections.inc(); log::debug!("Incoming connection from {socket_addr}"); + + if actors.contains_key(&socket_addr) { + log::warn!("Already contains connection form {socket_addr}"); + // TODO kill the older connection and restart it + continue; + } let players = players.clone(); let rooms = rooms.clone(); let current_connections_clone = current_connections.clone(); + + let (promise, deferred) = oneshot(); + let stopped = stopped_tx.clone(); let handle = tokio::task::spawn(async move { - match handle_socket(config, stream, socket_addr, players, rooms).await { + match handle_socket(config, stream, &socket_addr, players, rooms, deferred).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } current_connections_clone.dec(); + stopped.send(socket_addr).await?; + Ok(()) }); + let terminator = Terminator::from_raw(promise, handle); + actors.insert(socket_addr, terminator); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), } @@ -703,9 +727,19 @@ pub async fn launch( } log::info!("Stopping IRC projection"); + for (socket_addr, terminator) in actors { + log::debug!("Stopping IRC connection at {socket_addr}"); + match terminator.terminate().await { + Ok(_) => log::debug!("Stopped IRC connection at {socket_addr}"), + Err(err) => { + log::warn!("IRC connection to {socket_addr} finished with error: {err}") + } + } + } Ok(()) }); + log::info!("Started IRC projection"); let terminator = Terminator::from_raw(signal, handle); Ok(terminator) } diff --git a/src/protos/irc/server.rs b/src/protos/irc/server.rs index fa104ba..107d740 100644 --- a/src/protos/irc/server.rs +++ b/src/protos/irc/server.rs @@ -62,7 +62,7 @@ pub enum ServerMessageBody { Join(Chan), Part(Chan), Error { - reason: ByteVec, + reason: ByteVec, }, N001Welcome { client: ByteVec,