forked from lavina/lavina
1
0
Fork 0

graceful shutdown of irc socket listener

This commit is contained in:
Nikita Vilunov 2023-02-22 15:40:05 +01:00
parent 49a975e66e
commit bbd68853ae
2 changed files with 41 additions and 7 deletions

View File

@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry};
@ -5,7 +6,7 @@ use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot::channel; use tokio::sync::mpsc::channel;
use crate::core::player::*; use crate::core::player::*;
use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; use crate::core::room::{RoomId, RoomInfo, RoomRegistry};
@ -39,9 +40,10 @@ struct RegisteredUser {
async fn handle_socket( async fn handle_socket(
config: ServerConfig, config: ServerConfig,
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: SocketAddr, socket_addr: &SocketAddr,
players: PlayerRegistry, players: PlayerRegistry,
rooms: RoomRegistry, rooms: RoomRegistry,
terminator: Deferred<()>, // TODO use it to stop the connection gracefully
) -> Result<()> { ) -> Result<()> {
let (reader, writer) = stream.split(); let (reader, writer) = stream.split();
let mut reader: BufReader<ReadHalf> = BufReader::new(reader); let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
@ -244,7 +246,9 @@ async fn handle_registered_socket<'a>(
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()), sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::Error { reason: b"Leaving the server".to_vec() }, body: ServerMessageBody::Error {
reason: b"Leaving the server".to_vec(),
},
} }
.write_async(writer) .write_async(writer)
.await?; .await?;
@ -660,7 +664,8 @@ pub async fn launch(
metrics: MetricsRegistry, metrics: MetricsRegistry,
) -> Result<Terminator> { ) -> Result<Terminator> {
log::info!("Starting IRC projection"); log::info!("Starting IRC projection");
let (signal, mut rx) = channel(); let (signal, mut rx) = oneshot();
let (stopped_tx, mut stopped_rx) = channel(32);
let current_connections = let current_connections =
IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; IntGauge::new("irc_current_connections", "Open and alive TCP connections")?;
let total_connections = IntCounter::new( let total_connections = IntCounter::new(
@ -671,13 +676,19 @@ pub async fn launch(
metrics.register(Box::new(total_connections.clone()))?; metrics.register(Box::new(total_connections.clone()))?;
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started");
let handle = tokio::task::spawn(async move { let handle = tokio::task::spawn(async move {
// TODO probably should separate logic for accepting new connection and storing them
// into two tasks so that they don't block each other
let mut actors = HashMap::new();
loop { loop {
select! { select! {
biased; biased;
_ = &mut rx => break, _ = &mut rx => break,
stopped = stopped_rx.recv() => match stopped {
Some(stopped) => { let _ = actors.remove(&stopped); },
None => unreachable!(),
},
new_conn = listener.accept() => { new_conn = listener.accept() => {
match new_conn { match new_conn {
Ok((stream, socket_addr)) => { Ok((stream, socket_addr)) => {
@ -685,16 +696,29 @@ pub async fn launch(
total_connections.inc(); total_connections.inc();
current_connections.inc(); current_connections.inc();
log::debug!("Incoming connection from {socket_addr}"); log::debug!("Incoming connection from {socket_addr}");
if actors.contains_key(&socket_addr) {
log::warn!("Already contains connection form {socket_addr}");
// TODO kill the older connection and restart it
continue;
}
let players = players.clone(); let players = players.clone();
let rooms = rooms.clone(); let rooms = rooms.clone();
let current_connections_clone = current_connections.clone(); let current_connections_clone = current_connections.clone();
let (promise, deferred) = oneshot();
let stopped = stopped_tx.clone();
let handle = tokio::task::spawn(async move { let handle = tokio::task::spawn(async move {
match handle_socket(config, stream, socket_addr, players, rooms).await { match handle_socket(config, stream, &socket_addr, players, rooms, deferred).await {
Ok(_) => log::info!("Connection terminated"), Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"), Err(err) => log::warn!("Connection failed: {err}"),
} }
current_connections_clone.dec(); current_connections_clone.dec();
stopped.send(socket_addr).await?;
Ok(())
}); });
let terminator = Terminator::from_raw(promise, handle);
actors.insert(socket_addr, terminator);
}, },
Err(err) => log::warn!("Failed to accept new connection: {err}"), Err(err) => log::warn!("Failed to accept new connection: {err}"),
} }
@ -703,9 +727,19 @@ pub async fn launch(
} }
log::info!("Stopping IRC projection"); log::info!("Stopping IRC projection");
for (socket_addr, terminator) in actors {
log::debug!("Stopping IRC connection at {socket_addr}");
match terminator.terminate().await {
Ok(_) => log::debug!("Stopped IRC connection at {socket_addr}"),
Err(err) => {
log::warn!("IRC connection to {socket_addr} finished with error: {err}")
}
}
}
Ok(()) Ok(())
}); });
log::info!("Started IRC projection");
let terminator = Terminator::from_raw(signal, handle); let terminator = Terminator::from_raw(signal, handle);
Ok(terminator) Ok(terminator)
} }

View File

@ -62,7 +62,7 @@ pub enum ServerMessageBody {
Join(Chan), Join(Chan),
Part(Chan), Part(Chan),
Error { Error {
reason: ByteVec, reason: ByteVec,
}, },
N001Welcome { N001Welcome {
client: ByteVec, client: ByteVec,