From d5d0c6e73ebc5b95c49f80cbad4d6f610659e03e Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Fri, 10 Feb 2023 22:27:29 +0100 Subject: [PATCH] rewrite server message writes as async, introduce error handling --- src/projections/irc.rs | 172 +++++++++++++++++---------------------- src/protos/irc/server.rs | 87 ++++++++++---------- 2 files changed, 122 insertions(+), 137 deletions(-) diff --git a/src/projections/irc.rs b/src/projections/irc.rs index 94bb654..8480d91 100644 --- a/src/projections/irc.rs +++ b/src/projections/irc.rs @@ -31,27 +31,23 @@ async fn handle_socket( mut stream: TcpStream, socket_addr: SocketAddr, mut players: PlayerRegistry, - current_connections: IntGauge, -) { +) -> Result<()> { let (reader, writer) = stream.split(); let mut reader: BufReader = BufReader::new(reader); let mut writer = BufWriter::new(writer); - { - let notice = ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::Notice { - first_target: b"*".to_vec(), - rest_targets: vec![], - text: b"Welcome to my server!".to_vec(), - }, - }; - let mut buffer = vec![]; - notice.write(&mut buffer).unwrap(); - writer.write_all(buffer.as_slice()).await; - writer.flush().await; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::Notice { + first_target: b"*".to_vec(), + rest_targets: vec![], + text: b"Welcome to my server!".to_vec(), + }, } + .write_async(&mut writer) + .await?; + writer.flush().await?; let mut buffer = vec![]; @@ -113,12 +109,13 @@ async fn handle_socket( }; match registered_user { Ok(user) => { - handle_registered_socket(config, socket_addr, players, reader, writer, user).await + handle_registered_socket(config, socket_addr, players, reader, writer, user).await?; } Err(_) => {} } - current_connections.dec(); + stream.shutdown().await?; + Ok(()) } async fn handle_registered_socket<'a>( @@ -128,85 +125,62 @@ async fn handle_registered_socket<'a>( mut reader: BufReader>, mut writer: BufWriter>, user: RegisteredUser, -) { +) -> Result<()> { let mut buffer = vec![]; log::info!("Handling registered user: {user:?}"); - { - let notice = ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N001Welcome { - client: user.nickname.clone(), - text: b"Welcome to Kek Server".to_vec(), - }, - }; - let mut buffer = vec![]; - notice.write(&mut buffer).unwrap(); - writer.write_all(buffer.as_slice()).await; - writer.flush().await; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N001Welcome { + client: user.nickname.clone(), + text: b"Welcome to Kek Server".to_vec(), + }, } - - { - let notice = ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N002YourHost { - client: user.nickname.clone(), - text: b"Welcome to Kek Server".to_vec(), - }, - }; - let mut buffer = vec![]; - notice.write(&mut buffer).unwrap(); - writer.write_all(buffer.as_slice()).await; - writer.flush().await; + .write_async(&mut writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N002YourHost { + client: user.nickname.clone(), + text: b"Welcome to Kek Server".to_vec(), + }, } - - { - let notice = ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N003Created { - client: user.nickname.clone(), - text: b"Welcome to Kek Server".to_vec(), - }, - }; - let mut buffer = vec![]; - notice.write(&mut buffer).unwrap(); - writer.write_all(buffer.as_slice()).await; - writer.flush().await; + .write_async(&mut writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N003Created { + client: user.nickname.clone(), + text: b"Welcome to Kek Server".to_vec(), + }, } - - { - let notice = ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N004MyInfo { - client: user.nickname.clone(), - hostname: config.server_name.as_bytes().to_vec(), - softname: b"kek-0.1.alpha.3".to_vec(), - }, - }; - let mut buffer = vec![]; - notice.write(&mut buffer).unwrap(); - writer.write_all(buffer.as_slice()).await; - writer.flush().await; + .write_async(&mut writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N004MyInfo { + client: user.nickname.clone(), + hostname: config.server_name.as_bytes().to_vec(), + softname: b"kek-0.1.alpha.3".to_vec(), + }, } - - { - let notice = ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N005ISupport { - client: user.nickname.clone(), - params: b"CHANTYPES=#".to_vec(), - }, - }; - let mut buffer = vec![]; - notice.write(&mut buffer).unwrap(); - writer.write_all(buffer.as_slice()).await; - writer.flush().await; + .write_async(&mut writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N005ISupport { + client: user.nickname.clone(), + params: b"CHANTYPES=#".to_vec(), + }, } + .write_async(&mut writer) + .await?; + writer.flush().await?; loop { select! { @@ -224,18 +198,15 @@ async fn handle_registered_socket<'a>( Ok((rest, msg)) => { match msg { ClientMessage::Ping { token } => { - let response = ServerMessage { + ServerMessage { tags: vec![], sender: None, body: ServerMessageBody::Pong { from: config.server_name.as_bytes().to_vec(), token, } - }; - let mut buffer = vec![]; - response.write(&mut buffer).unwrap(); - writer.write_all(buffer.as_slice()).await; - writer.flush().await; + }.write_async(&mut writer).await?; + writer.flush().await?; }, _ => {}, } @@ -248,6 +219,7 @@ async fn handle_registered_socket<'a>( }, } } + Ok(()) } pub async fn launch( @@ -281,7 +253,15 @@ pub async fn launch( total_connections.inc(); current_connections.inc(); log::debug!("Incoming connection from {socket_addr}"); - let handle = tokio::task::spawn(handle_socket(config, stream, socket_addr, players.clone(), current_connections.clone())); + let players_clone = players.clone(); + let current_connections_clone = current_connections.clone(); + let handle = tokio::task::spawn(async move { + match handle_socket(config, stream, socket_addr, players_clone).await { + Ok(_) => log::info!("Connection terminated"), + Err(err) => log::warn!("Connection failed: {err}"), + } + current_connections_clone.dec(); + }); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), } diff --git a/src/protos/irc/server.rs b/src/protos/irc/server.rs index 03fb9b0..b99df48 100644 --- a/src/protos/irc/server.rs +++ b/src/protos/irc/server.rs @@ -1,3 +1,6 @@ +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; + use super::*; /// Server-to-client message. @@ -11,17 +14,17 @@ pub struct ServerMessage { } impl ServerMessage { - pub fn write(&self, writer: &mut impl Write) -> std::io::Result<()> { + pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> { match &self.sender { Some(ref sender) => { - writer.write(b":")?; - writer.write(sender.as_slice())?; - writer.write(b" ")?; + writer.write_all(b":").await?; + writer.write_all(sender.as_slice()).await?; + writer.write_all(b" ").await?; } None => {} } - self.body.write(writer)?; - writer.write(b"\n")?; + self.body.write_async(writer).await?; + writer.write_all(b"\n").await?; Ok(()) } } @@ -77,66 +80,68 @@ pub enum ServerMessageBody { } impl ServerMessageBody { - pub fn write(&self, writer: &mut impl Write) -> std::io::Result<()> { + pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> { match self { ServerMessageBody::Notice { first_target, rest_targets, text, } => { - writer.write(b"NOTICE ")?; - writer.write(&first_target)?; - writer.write(b" :")?; - writer.write(&text)?; + writer.write_all(b"NOTICE ").await?; + writer.write_all(&first_target).await?; + writer.write_all(b" :").await?; + writer.write_all(&text).await?; } ServerMessageBody::Ping { token } => { - writer.write(b"PING ")?; - writer.write(&token)?; + writer.write_all(b"PING ").await?; + writer.write_all(&token).await?; } ServerMessageBody::Pong { from, token } => { - writer.write(b"PONG ")?; - writer.write(&from)?; - writer.write(b" :")?; - writer.write(&token)?; + writer.write_all(b"PONG ").await?; + writer.write_all(&from).await?; + writer.write_all(b" :").await?; + writer.write_all(&token).await?; } ServerMessageBody::N001Welcome { client, text } => { - writer.write(b"001 ")?; - writer.write(&client)?; - writer.write(b" :")?; - writer.write(text)?; + writer.write_all(b"001 ").await?; + writer.write_all(&client).await?; + writer.write_all(b" :").await?; + writer.write_all(text).await?; } ServerMessageBody::N002YourHost { client, text } => { - writer.write(b"002 ")?; - writer.write(&client)?; - writer.write(b" :")?; - writer.write(text)?; + writer.write_all(b"002 ").await?; + writer.write_all(&client).await?; + writer.write_all(b" :").await?; + writer.write_all(text).await?; } ServerMessageBody::N003Created { client, text } => { - writer.write(b"003 ")?; - writer.write(&client)?; - writer.write(b" :")?; - writer.write(text)?; + writer.write_all(b"003 ").await?; + writer.write_all(&client).await?; + writer.write_all(b" :").await?; + writer.write_all(text).await?; } ServerMessageBody::N004MyInfo { client, hostname, softname, } => { - writer.write(b"004 ")?; - writer.write(&client)?; - writer.write(b" ")?; - writer.write(&hostname)?; - writer.write(b" ")?; - writer.write(&softname)?; - writer.write(b" DGMQRSZagiloswz CFILPQbcefgijklmnopqrstvz bkloveqjfI")?; + writer.write_all(b"004 ").await?; + writer.write_all(&client).await?; + writer.write_all(b" ").await?; + writer.write_all(&hostname).await?; + writer.write_all(b" ").await?; + writer.write_all(&softname).await?; + writer + .write_all(b" DGMQRSZagiloswz CFILPQbcefgijklmnopqrstvz bkloveqjfI") + .await?; // TODO remove hardcoded modes } ServerMessageBody::N005ISupport { client, params } => { - writer.write(b"005 ")?; - writer.write(&client)?; - writer.write(b" ")?; - writer.write(¶ms)?; - writer.write(b" :are supported by this server")?; + writer.write_all(b"005 ").await?; + writer.write_all(&client).await?; + writer.write_all(b" ").await?; + writer.write_all(¶ms).await?; + writer.write_all(b" :are supported by this server").await?; } } Ok(())