From 6d330c0fcd093493a8859e345d53aefdfbf658f6 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Mon, 13 Feb 2023 19:58:05 +0100 Subject: [PATCH] split irc connection handler into functions --- src/projections/irc.rs | 259 +++++++++++++++++++++++------------------ 1 file changed, 146 insertions(+), 113 deletions(-) diff --git a/src/projections/irc.rs b/src/projections/irc.rs index a669ec5..465a516 100644 --- a/src/projections/irc.rs +++ b/src/projections/irc.rs @@ -2,12 +2,12 @@ use std::net::SocketAddr; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use serde::Deserialize; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot::channel; -use crate::core::player::{PlayerId, PlayerRegistry, Updates}; +use crate::core::player::{ConnectionId, PlayerHandle, PlayerId, PlayerRegistry, Updates}; use crate::core::room::RoomId; use crate::prelude::*; use crate::protos::irc::client::{client_message, ClientMessage}; @@ -51,15 +51,30 @@ async fn handle_socket( .await?; writer.flush().await?; - let mut buffer = vec![]; + let registered_user: Result = + handle_registration(&mut reader, &mut writer).await; + match registered_user { + Ok(user) => { + handle_registered_socket(config, socket_addr, players, &mut reader, &mut writer, user) + .await?; + } + Err(_) => {} + } - // let (player_id, mut player_handle) = players.create_player().await; - // let mut sub = player_handle.subscribe().await; + stream.shutdown().await?; + Ok(()) +} + +async fn handle_registration<'a>( + reader: &mut BufReader>, + writer: &mut BufWriter>, +) -> Result { + let mut buffer = vec![]; let mut future_nickname: Option> = None; let mut future_username: Option<(Vec, Vec)> = None; - let registered_user: Result = loop { + loop { let res = reader.read_until(b'\n', &mut buffer).await; match res { Ok(len) => { @@ -108,24 +123,15 @@ async fn handle_socket( } } buffer.clear(); - }; - match registered_user { - Ok(user) => { - handle_registered_socket(config, socket_addr, players, reader, writer, user).await?; - } - Err(_) => {} } - - stream.shutdown().await?; - Ok(()) } async fn handle_registered_socket<'a>( config: ServerConfig, socket_addr: SocketAddr, mut players: PlayerRegistry, - mut reader: BufReader>, - mut writer: BufWriter>, + reader: &mut BufReader>, + writer: &mut BufWriter>, user: RegisteredUser, ) -> Result<()> { let mut buffer = vec![]; @@ -144,7 +150,7 @@ async fn handle_registered_socket<'a>( text: b"Welcome to Kek Server".to_vec(), }, } - .write_async(&mut writer) + .write_async(writer) .await?; ServerMessage { tags: vec![], @@ -154,7 +160,7 @@ async fn handle_registered_socket<'a>( text: b"Welcome to Kek Server".to_vec(), }, } - .write_async(&mut writer) + .write_async(writer) .await?; ServerMessage { tags: vec![], @@ -164,7 +170,7 @@ async fn handle_registered_socket<'a>( text: b"Welcome to Kek Server".to_vec(), }, } - .write_async(&mut writer) + .write_async(writer) .await?; ServerMessage { tags: vec![], @@ -175,7 +181,7 @@ async fn handle_registered_socket<'a>( softname: b"kek-0.1.alpha.3".to_vec(), }, } - .write_async(&mut writer) + .write_async(writer) .await?; ServerMessage { tags: vec![], @@ -185,103 +191,22 @@ async fn handle_registered_socket<'a>( params: b"CHANTYPES=#".to_vec(), }, } - .write_async(&mut writer) + .write_async(writer) .await?; writer.flush().await?; loop { select! { biased; - res = reader.read_until(b'\n', &mut buffer) => { - match res { - Ok(len) => if len == 0 { - log::info!("Terminating socket"); - break; - }, - Err(err) => log::warn!("Failed to read from socket: {err}"), - } - let parsed = client_message(&buffer[..]); - match parsed { - Ok((rest, msg)) => { - match msg { - ClientMessage::Ping { token } => { - ServerMessage { - tags: vec![], - sender: None, - body: ServerMessageBody::Pong { - from: config.server_name.as_bytes().to_vec(), - token, - } - }.write_async(&mut writer).await?; - writer.flush().await?; - }, - ClientMessage::Join(chan) => { - match chan { - Chan::Global(ref room) => { - user_handle.join_room(RoomId(room.clone())).await; - ServerMessage { - tags: vec![], - sender: Some(user.nickname.clone()), - body: ServerMessageBody::Join(chan.clone()), - } - .write_async(&mut writer) - .await?; - ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N332Topic { - client: user.nickname.clone(), - chat: chan.clone(), - topic: b"chan topic lol".to_vec(), - }, - } - .write_async(&mut writer) - .await?; - ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N353NamesReply { - client: user.nickname.clone(), - chan: chan.clone(), - members: user.nickname.clone(), - }, - } - .write_async(&mut writer) - .await?; - ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N366NamesReplyEnd { - client: user.nickname.clone(), - chan: chan.clone(), - }, - } - .write_async(&mut writer) - .await?; - - writer.flush().await?; - }, - Chan::Local(_) => {}, - }; - }, - ClientMessage::PrivateMessage { recipient, body } => { - match recipient { - Recipient::Chan(Chan::Global(room)) => { - match String::from_utf8(body) { - Ok(body) => user_handle.send_message(RoomId(room.clone()), my_connection_id.clone(), body.clone()).await, - Err(err) => log::warn!("failed to parse incoming message: {err}"), - } - }, - _ => log::warn!("Unsupported target type"), - } - }, - _ => {}, - } - }, - Err(err) => { - log::warn!("Failed to parse IRC message: {err}"); - }, - } + len = reader.read_until(b'\n', &mut buffer) => { + let len = len?; + let len = if len == 0 { + log::info!("EOF, Terminating socket"); + break; + } else { + len + }; + handle_incoming_message(&buffer[0..len], &config, &user, &mut user_handle, &my_connection_id, writer).await?; buffer.clear(); }, update = connnection.recv() => { @@ -293,7 +218,7 @@ async fn handle_registered_socket<'a>( tags: vec![], sender: Some(author_id.0.clone()), body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.0)), body: body.as_bytes().to_vec() } - }.write_async(&mut writer).await?; + }.write_async(writer).await?; writer.flush().await? } }, @@ -304,6 +229,114 @@ async fn handle_registered_socket<'a>( Ok(()) } +async fn handle_incoming_message( + buffer: &[u8], + config: &ServerConfig, + user: &RegisteredUser, + user_handle: &mut PlayerHandle, + my_connection_id: &ConnectionId, + writer: &mut (impl AsyncWrite + Unpin), +) -> Result<()> { + let parsed = client_message(buffer); + match parsed { + Ok((rest, msg)) => match msg { + ClientMessage::Ping { token } => { + ServerMessage { + tags: vec![], + sender: None, + body: ServerMessageBody::Pong { + from: config.server_name.as_bytes().to_vec(), + token, + }, + } + .write_async(writer) + .await?; + writer.flush().await?; + } + ClientMessage::Join(ref chan) => { + handle_join(&config, &user, user_handle, chan, writer).await?; + } + ClientMessage::PrivateMessage { recipient, body } => match recipient { + Recipient::Chan(Chan::Global(room)) => match String::from_utf8(body) { + Ok(body) => { + user_handle + .send_message( + RoomId(room.clone()), + my_connection_id.clone(), + body.clone(), + ) + .await + } + Err(err) => log::warn!("failed to parse incoming message: {err}"), + }, + _ => log::warn!("Unsupported target type"), + }, + _ => {} + }, + Err(err) => { + log::warn!("Failed to parse IRC message: {err}"); + } + } + Ok(()) +} + +async fn handle_join( + config: &ServerConfig, + user: &RegisteredUser, + user_handle: &mut PlayerHandle, + chan: &Chan, + writer: &mut (impl AsyncWrite + Unpin), +) -> Result<()> { + match chan { + Chan::Global(ref room) => { + user_handle.join_room(RoomId(room.clone())).await; + ServerMessage { + tags: vec![], + sender: Some(user.nickname.clone()), + body: ServerMessageBody::Join(chan.clone()), + } + .write_async(writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N332Topic { + client: user.nickname.clone(), + chat: chan.clone(), + topic: b"chan topic lol".to_vec(), + }, + } + .write_async(writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N353NamesReply { + client: user.nickname.clone(), + chan: chan.clone(), + members: user.nickname.clone(), + }, + } + .write_async(writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N366NamesReplyEnd { + client: user.nickname.clone(), + chan: chan.clone(), + }, + } + .write_async(writer) + .await?; + + writer.flush().await?; + } + Chan::Local(_) => {} + }; + Ok(()) +} + pub async fn launch( config: ServerConfig, players: PlayerRegistry,