use std::net::SocketAddr; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use serde::Deserialize; use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot::channel; use crate::core::player::{PlayerConnection, PlayerId, PlayerRegistry, Updates}; use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; use crate::prelude::*; use crate::protos::irc::client::{client_message, ClientMessage}; use crate::protos::irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; use crate::protos::irc::{Chan, Recipient}; use crate::util::Terminator; #[cfg(test)] mod test; #[derive(Deserialize, Debug, Clone)] pub struct ServerConfig { pub listen_on: SocketAddr, pub server_name: String, } #[derive(Debug)] struct RegisteredUser { nickname: Vec, /** * Username is mostly unused in modern IRC. * * [https://stackoverflow.com/questions/31666247/what-is-the-difference-between-the-nick-username-and-real-name-in-irc-and-wha] */ username: Vec, realname: Vec, } async fn handle_socket( config: ServerConfig, mut stream: TcpStream, socket_addr: SocketAddr, players: PlayerRegistry, rooms: RoomRegistry, ) -> Result<()> { let (reader, writer) = stream.split(); let mut reader: BufReader = BufReader::new(reader); let mut writer = BufWriter::new(writer); ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::Notice { first_target: b"*".to_vec(), rest_targets: vec![], text: b"Welcome to my server!".to_vec(), }, } .write_async(&mut writer) .await?; writer.flush().await?; let registered_user: Result = handle_registration(&mut reader, &mut writer).await; match registered_user { Ok(user) => { handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user) .await?; } Err(_) => {} } stream.shutdown().await?; Ok(()) } async fn handle_registration<'a>( reader: &mut BufReader>, writer: &mut BufWriter>, ) -> Result { let mut buffer = vec![]; let mut future_nickname: Option> = None; let mut future_username: Option<(Vec, Vec)> = None; loop { let res = reader.read_until(b'\n', &mut buffer).await; match res { Ok(len) => { if len == 0 { log::info!("Terminating socket"); break Err(anyhow::Error::msg("EOF")); } } Err(err) => { log::warn!("Failed to read from socket: {err}"); break Err(err.into()); } } let parsed = client_message(&buffer[..]); match parsed { Ok((_, msg)) => { log::info!("Incoming IRC message: {msg:?}"); match msg { ClientMessage::Nick { nickname } => { if let Some((username, realname)) = future_username { break Ok(RegisteredUser { nickname, username, realname, }); } else { future_nickname = Some(nickname); } } ClientMessage::User { username, realname } => { if let Some(nickname) = future_nickname { break Ok(RegisteredUser { nickname, username, realname, }); } else { future_username = Some((username, realname)); } } _ => {} } } Err(err) => { log::warn!("Failed to parse IRC message: {err}"); } } buffer.clear(); } } async fn handle_registered_socket<'a>( config: ServerConfig, mut players: PlayerRegistry, rooms: RoomRegistry, reader: &mut BufReader>, writer: &mut BufWriter>, user: RegisteredUser, ) -> Result<()> { let mut buffer = vec![]; log::info!("Handling registered user: {user:?}"); let player_id = PlayerId::from_bytes(user.nickname.clone())?; let mut connection = players.connect_to_player(player_id.clone()).await; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N001Welcome { client: user.nickname.clone(), text: b"Welcome to Kek Server".to_vec(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N002YourHost { client: user.nickname.clone(), text: b"Welcome to Kek Server".to_vec(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N003Created { client: user.nickname.clone(), text: b"Welcome to Kek Server".to_vec(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N004MyInfo { client: user.nickname.clone(), hostname: config.server_name.as_bytes().to_vec(), softname: b"kek-0.1.alpha.3".to_vec(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N005ISupport { client: user.nickname.clone(), params: b"CHANTYPES=#".to_vec(), }, } .write_async(writer) .await?; let rooms_list = connection.get_rooms().await?; for room in &rooms_list { produce_on_join_cmd_messages( &config, &user, &Chan::Global(room.id.as_bytes().clone()), room, writer, ) .await?; } writer.flush().await?; loop { select! { biased; len = reader.read_until(b'\n', &mut buffer) => { let len = len?; let len = if len == 0 { log::info!("EOF, Terminating socket"); break; } else { len }; handle_incoming_message(&buffer[0..len], &config, &user, &rooms, &mut connection, writer).await?; buffer.clear(); }, update = connection.receiver.recv() => { if let Some(update) = update { handle_update(&config, &user, &player_id, writer, &rooms, update).await?; } else { log::warn!("Player is terminated, must terminate the connection"); break; } } } } connection.terminate().await; Ok(()) } async fn handle_update( config: &ServerConfig, user: &RegisteredUser, player_id: &PlayerId, writer: &mut (impl AsyncWrite + Unpin), rooms: &RoomRegistry, update: Updates, ) -> Result<()> { match update { Updates::RoomJoined { new_member_id, room_id, } => { if player_id == &new_member_id { if let Some(room) = rooms.get_room(&room_id) { let room_info = room.get_room_info().await; let chan = Chan::Global(room_id.as_bytes().clone()); produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?; writer.flush().await?; } else { log::warn!("Received join to a non-existant room"); } } else { ServerMessage { tags: vec![], sender: Some(new_member_id.as_bytes().clone()), body: ServerMessageBody::Join(Chan::Global(room_id.as_bytes().clone())), } .write_async(writer) .await?; writer.flush().await? } } Updates::RoomLeft { room_id, former_member_id, } => { ServerMessage { tags: vec![], sender: Some(former_member_id.as_bytes().clone()), body: ServerMessageBody::Part(Chan::Global(room_id.as_bytes().clone())), } .write_async(writer) .await?; writer.flush().await? } Updates::NewMessage { author_id, room_id, body, } => { ServerMessage { tags: vec![], sender: Some(author_id.as_bytes().clone()), body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_bytes().clone())), body: body.as_bytes().to_vec(), }, } .write_async(writer) .await?; writer.flush().await? } Updates::RoomTopicChanged { room_id, new_topic } => { ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N332Topic { client: user.nickname.clone(), chat: Chan::Global(room_id.as_bytes().clone()), topic: new_topic, }, } .write_async(writer) .await?; writer.flush().await? } } Ok(()) } async fn handle_incoming_message( buffer: &[u8], config: &ServerConfig, user: &RegisteredUser, rooms: &RoomRegistry, user_handle: &mut PlayerConnection, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { let parsed = client_message(buffer); match parsed { Ok((_, msg)) => match msg { ClientMessage::Ping { token } => { ServerMessage { tags: vec![], sender: None, body: ServerMessageBody::Pong { from: config.server_name.as_bytes().to_vec(), token, }, } .write_async(writer) .await?; writer.flush().await?; } ClientMessage::Join(ref chan) => { handle_join(&config, &user, user_handle, chan, writer).await?; } ClientMessage::Part { chan, message } => { handle_part(config, user, user_handle, &chan, writer).await?; } ClientMessage::PrivateMessage { recipient, body } => match recipient { Recipient::Chan(Chan::Global(chan)) => match String::from_utf8(body) { Ok(body) => { let room_id = RoomId::from_bytes(chan)?; user_handle.send_message(room_id, body.clone()).await?; } Err(err) => log::warn!("failed to parse incoming message: {err}"), }, _ => log::warn!("Unsupported target type"), }, ClientMessage::Topic { chan, topic } => { match chan { Chan::Global(chan) => { let room_id = RoomId::from_bytes(chan)?; user_handle .change_topic(room_id.clone(), topic.clone()) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N332Topic { client: user.nickname.clone(), chat: Chan::Global(room_id.as_bytes().clone()), topic, }, } .write_async(writer) .await?; writer.flush().await?; } Chan::Local(_) => {} }; } ClientMessage::Who { target } => match &target { Recipient::Nick(nick) => { // TODO handle non-existing user let mut username = Vec::with_capacity(nick.len() + 1); username.push(b'~'); username.extend_from_slice(nick.as_slice()); let mut host = b"user/".to_vec(); host.extend_from_slice(nick.as_slice()); ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: user_to_who_msg(config, user, nick), } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N315EndOfWho { client: user.nickname.clone(), mask: target.clone(), msg: b"End of WHO list".to_vec(), }, } .write_async(writer) .await?; writer.flush().await?; } Recipient::Chan(Chan::Global(chan)) => { let room = rooms.get_room(&RoomId::from_bytes(chan.clone())?); if let Some(room) = room { let room_info = room.get_room_info().await; for member in room_info.members { ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: user_to_who_msg(config, user, member.as_bytes()), } .write_async(writer) .await?; } } ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N315EndOfWho { client: user.nickname.clone(), mask: target.clone(), msg: b"End of WHO list".to_vec(), }, } .write_async(writer) .await?; writer.flush().await?; } Recipient::Chan(Chan::Local(_)) => { log::warn!("Local chans not supported"); } }, ClientMessage::Mode { target } => { match target { Recipient::Nick(nickname) => { if nickname == user.nickname { ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N221UserModeIs { client: user.nickname.clone(), modes: b"+r".to_vec() }, } .write_async(writer) .await?; writer.flush().await?; } else { // TODO send 502 (not 401) if the user is not the sender } }, Recipient::Chan(_) => { // TODO handle chan mode handling }, } }, cmd => { log::warn!("Not implemented handler for client command: {cmd:?}"); } }, Err(err) => { log::warn!("Failed to parse IRC message: {err}"); } } Ok(()) } fn user_to_who_msg(config: &ServerConfig, requestor: &RegisteredUser, target_user_nickname: &ByteVec) -> ServerMessageBody { // Username is equal to nickname let mut username = Vec::with_capacity(target_user_nickname.len() + 1); username.push(b'~'); username.extend_from_slice(target_user_nickname.as_slice()); // User's host is not public, replace it with `user/` pattern let mut host = b"user/".to_vec(); host.extend_from_slice(target_user_nickname.as_slice()); ServerMessageBody::N352WhoReply { client: requestor.nickname.clone(), username, host, server: config.server_name.as_bytes().to_vec(), flags: AwayStatus::Here, nickname: target_user_nickname.clone(), hops: 0, // TODO Realname is not available yet, should be matched to a future core's player field realname: target_user_nickname.clone(), } } async fn handle_join( config: &ServerConfig, user: &RegisteredUser, user_handle: &mut PlayerConnection, chan: &Chan, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { match chan { Chan::Global(chan_name) => { let room_id = RoomId::from_bytes(chan_name.clone())?; let room_info = user_handle.join_room(room_id).await?; produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?; writer.flush().await?; } Chan::Local(_) => {} }; Ok(()) } async fn handle_part( config: &ServerConfig, user: &RegisteredUser, user_handle: &mut PlayerConnection, chan: &Chan, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { if let Chan::Global(chan_name) = chan { let room_id = RoomId::from_bytes(chan_name.clone())?; user_handle.leave_room(room_id).await?; ServerMessage { tags: vec![], sender: Some(user.nickname.clone()), body: ServerMessageBody::Part(Chan::Global(chan_name.clone())), } .write_async(writer) .await?; writer.flush().await?; } else { log::warn!("Local chans unsupported"); } Ok(()) } async fn produce_on_join_cmd_messages( config: &ServerConfig, user: &RegisteredUser, chan: &Chan, room_info: &RoomInfo, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { ServerMessage { tags: vec![], sender: Some(user.nickname.clone()), body: ServerMessageBody::Join(chan.clone()), } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N332Topic { client: user.nickname.clone(), chat: chan.clone(), topic: room_info.topic.clone(), }, } .write_async(writer) .await?; let mut members = if let Some(head) = room_info.members.first() { head.as_bytes().clone() } else { user.nickname.clone() }; for i in &room_info.members[1..] { members.push(b' '); members.extend(i.as_bytes()); } ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N353NamesReply { client: user.nickname.clone(), chan: chan.clone(), members, }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N366NamesReplyEnd { client: user.nickname.clone(), chan: chan.clone(), }, } .write_async(writer) .await?; Ok(()) } pub async fn launch( config: ServerConfig, players: PlayerRegistry, rooms: RoomRegistry, metrics: MetricsRegistry, ) -> Result { log::info!("Starting IRC projection"); let (signal, mut rx) = channel(); let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; let total_connections = IntCounter::new( "irc_total_connections", "Total number of opened connections", )?; metrics.register(Box::new(current_connections.clone()))?; metrics.register(Box::new(total_connections.clone()))?; let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); let handle = tokio::task::spawn(async move { loop { select! { biased; _ = &mut rx => break, new_conn = listener.accept() => { match new_conn { Ok((stream, socket_addr)) => { let config = config.clone(); total_connections.inc(); current_connections.inc(); log::debug!("Incoming connection from {socket_addr}"); let players = players.clone(); let rooms = rooms.clone(); let current_connections_clone = current_connections.clone(); let handle = tokio::task::spawn(async move { match handle_socket(config, stream, socket_addr, players, rooms).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } current_connections_clone.dec(); }); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), } }, } } log::info!("Stopping IRC projection"); Ok(()) }); let terminator = Terminator::from_raw(signal, handle); Ok(terminator) }