use std::net::SocketAddr; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use serde::Deserialize; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot::channel; use crate::core::player::PlayerRegistry; use crate::prelude::*; use crate::protos::irc::client::{client_message, ClientMessage}; use crate::protos::irc::server::{ServerMessage, ServerMessageBody}; use crate::util::Terminator; #[derive(Deserialize, Debug, Clone)] pub struct ServerConfig { pub listen_on: SocketAddr, pub server_name: String, } #[derive(Debug)] struct RegisteredUser { nickname: Vec, username: Vec, realname: Vec, } async fn handle_socket( config: ServerConfig, mut stream: TcpStream, socket_addr: SocketAddr, mut players: PlayerRegistry, current_connections: IntGauge, ) { let (reader, writer) = stream.split(); let mut reader: BufReader = BufReader::new(reader); let mut writer = BufWriter::new(writer); { let notice = ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::Notice { first_target: b"*".to_vec(), rest_targets: vec![], text: b"Welcome to my server!".to_vec(), }, }; let mut buffer = vec![]; notice.write(&mut buffer).unwrap(); writer.write_all(buffer.as_slice()).await; writer.flush().await; } let mut buffer = vec![]; // let (player_id, mut player_handle) = players.create_player().await; // let mut sub = player_handle.subscribe().await; let mut future_nickname: Option> = None; let mut future_username: Option<(Vec, Vec)> = None; let registered_user: Result = loop { let res = reader.read_until(b'\n', &mut buffer).await; match res { Ok(len) => { if len == 0 { log::info!("Terminating socket"); break Err(anyhow::Error::msg("EOF")); } } Err(err) => { log::warn!("Failed to read from socket: {err}"); break Err(err.into()); } } let parsed = client_message(&buffer[..]); match parsed { Ok((rest, msg)) => { log::info!("Incoming IRC message: {msg:?}"); match msg { ClientMessage::Nick { nickname } => { if let Some((username, realname)) = future_username { break Ok(RegisteredUser { nickname, username, realname, }); } else { future_nickname = Some(nickname); } } ClientMessage::User { username, realname } => { if let Some(nickname) = future_nickname { break Ok(RegisteredUser { nickname, username, realname, }); } else { future_username = Some((username, realname)); } } _ => {} } } Err(err) => { log::warn!("Failed to parse IRC message: {err}"); } } buffer.clear(); }; match registered_user { Ok(user) => { handle_registered_socket(config, socket_addr, players, reader, writer, user).await } Err(_) => {} } current_connections.dec(); } async fn handle_registered_socket<'a>( config: ServerConfig, socket_addr: SocketAddr, mut players: PlayerRegistry, mut reader: BufReader>, mut writer: BufWriter>, user: RegisteredUser, ) { let mut buffer = vec![]; log::info!("Handling registered user: {user:?}"); { let notice = ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N001Welcome { client: user.nickname.clone(), text: b"Welcome to Kek Server".to_vec(), }, }; let mut buffer = vec![]; notice.write(&mut buffer).unwrap(); writer.write_all(buffer.as_slice()).await; writer.flush().await; } { let notice = ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N002YourHost { client: user.nickname.clone(), text: b"Welcome to Kek Server".to_vec(), }, }; let mut buffer = vec![]; notice.write(&mut buffer).unwrap(); writer.write_all(buffer.as_slice()).await; writer.flush().await; } { let notice = ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N003Created { client: user.nickname.clone(), text: b"Welcome to Kek Server".to_vec(), }, }; let mut buffer = vec![]; notice.write(&mut buffer).unwrap(); writer.write_all(buffer.as_slice()).await; writer.flush().await; } { let notice = ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N004MyInfo { client: user.nickname.clone(), hostname: config.server_name.as_bytes().to_vec(), softname: b"kek-0.1.alpha.3".to_vec(), }, }; let mut buffer = vec![]; notice.write(&mut buffer).unwrap(); writer.write_all(buffer.as_slice()).await; writer.flush().await; } { let notice = ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), body: ServerMessageBody::N005ISupport { client: user.nickname.clone(), params: b"CHANTYPES=#".to_vec(), }, }; let mut buffer = vec![]; notice.write(&mut buffer).unwrap(); writer.write_all(buffer.as_slice()).await; writer.flush().await; } loop { select! { biased; res = reader.read_until(b'\n', &mut buffer) => { match res { Ok(len) => if len == 0 { log::info!("Terminating socket"); break; }, Err(err) => log::warn!("Failed to read from socket: {err}"), } let parsed = client_message(&buffer[..]); match parsed { Ok((rest, msg)) => { match msg { ClientMessage::Ping { token } => { let response = ServerMessage { tags: vec![], sender: None, body: ServerMessageBody::Pong { from: config.server_name.as_bytes().to_vec(), token, } }; let mut buffer = vec![]; response.write(&mut buffer).unwrap(); writer.write_all(buffer.as_slice()).await; writer.flush().await; }, _ => {}, } }, Err(err) => { log::warn!("Failed to parse IRC message: {err}"); }, } buffer.clear(); }, } } } pub async fn launch( config: ServerConfig, players: PlayerRegistry, metrics: MetricsRegistry, ) -> Result { log::info!("Starting IRC projection"); let (signal, mut rx) = channel(); let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; let total_connections = IntCounter::new( "irc_total_connections", "Total number of opened connections", )?; metrics.register(Box::new(current_connections.clone()))?; metrics.register(Box::new(total_connections.clone()))?; let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); let handle = tokio::task::spawn(async move { loop { select! { biased; _ = &mut rx => break, new_conn = listener.accept() => { match new_conn { Ok((stream, socket_addr)) => { let config = config.clone(); total_connections.inc(); current_connections.inc(); log::debug!("Incoming connection from {socket_addr}"); let handle = tokio::task::spawn(handle_socket(config, stream, socket_addr, players.clone(), current_connections.clone())); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), } }, } } log::info!("Stopping IRC projection"); Ok(()) }); let terminator = Terminator::from_raw(signal, handle); Ok(terminator) }