use std::collections::HashMap; use std::net::SocketAddr; use anyhow::{anyhow, Result}; use chrono::SecondsFormat; use futures_util::future::join_all; use nonempty::nonempty; use nonempty::NonEmpty; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use serde::Deserialize; use tokio::io::AsyncReadExt; use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::channel; use lavina_core::auth::Verdict; use lavina_core::player::*; use lavina_core::prelude::*; use lavina_core::room::{RoomId, RoomInfo}; use lavina_core::terminator::Terminator; use lavina_core::LavinaCore; use proto_irc::client::CapabilitySubcommand; use proto_irc::client::{client_message, ClientMessage}; use proto_irc::server::CapSubBody; use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; use proto_irc::user::PrefixedNick; use proto_irc::{Chan, Recipient, Tag}; use sasl::AuthBody; mod cap; use handler::Handler; mod whois; use crate::cap::Capabilities; mod handler; pub const APP_VERSION: &str = concat!("lavina", "_", env!("CARGO_PKG_VERSION")); #[derive(Deserialize, Debug, Clone)] pub struct ServerConfig { pub listen_on: SocketAddr, pub server_name: Str, } #[derive(Debug)] struct RegisteredUser { nickname: Str, /** * Username is mostly unused in modern IRC. * * */ username: Str, realname: Str, enabled_capabilities: Capabilities, } async fn handle_socket( config: ServerConfig, mut stream: TcpStream, socket_addr: &SocketAddr, core: LavinaCore, termination: Deferred<()>, // TODO use it to stop the connection gracefully ) -> Result<()> { log::info!("Received an IRC connection from {socket_addr}"); let (reader, writer) = stream.split(); let mut reader: BufReader = BufReader::new(reader); let mut writer = BufWriter::new(writer); pin!(termination); select! { biased; _ = &mut termination =>{ log::info!("Socket handling was terminated"); return Ok(()) }, registered_user = handle_registration(&mut reader, &mut writer, &core, &config) => match registered_user { Ok(user) => { log::debug!("User registered"); handle_registered_socket(config, &core, &mut reader, &mut writer, user).await?; } Err(err) => { log::debug!("Registration failed: {err}"); } } } stream.shutdown().await?; Ok(()) } struct RegistrationState { /// The last received `NICK` message. future_nickname: Option, /// The last received `USER` message. future_username: Option<(Str, Str)>, enabled_capabilities: Capabilities, /// `CAP LS` or `CAP REQ` was received, but not `CAP END`. cap_negotiation_in_progress: bool, /// The last received `PASS` message. pass: Option, authentication_started: bool, validated_user: Option, } impl RegistrationState { fn new() -> RegistrationState { RegistrationState { future_nickname: None, future_username: None, enabled_capabilities: Capabilities::None, cap_negotiation_in_progress: false, pass: None, authentication_started: false, validated_user: None, } } /// Handle an incoming message from the client during the registration process. /// /// Returns `Some` if the user is fully registered, `None` if the registration is still in progress. async fn handle_msg( &mut self, msg: ClientMessage, writer: &mut BufWriter>, core: &LavinaCore, config: &ServerConfig, ) -> Result> { match msg { ClientMessage::Pass { password } => { self.pass = Some(password); Ok(None) } ClientMessage::Capability { subcommand } => match subcommand { CapabilitySubcommand::List { code: _ } => { self.cap_negotiation_in_progress = true; ServerMessage { tags: vec![], sender: Some(config.server_name.clone().into()), body: ServerMessageBody::Cap { target: self.future_nickname.clone().unwrap_or_else(|| "*".into()), subcmd: CapSubBody::Ls("sasl=PLAIN server-time".into()), }, } .write_async(writer) .await?; writer.flush().await?; Ok(None) } CapabilitySubcommand::Req(caps) => { self.cap_negotiation_in_progress = true; let mut acked = vec![]; let mut naked = vec![]; for cap in caps { if &*cap.name == "sasl" { if cap.to_disable { self.enabled_capabilities &= !Capabilities::Sasl; } else { self.enabled_capabilities |= Capabilities::Sasl; } acked.push(cap); } else if &*cap.name == "server-time" { if cap.to_disable { self.enabled_capabilities &= !Capabilities::ServerTime; } else { self.enabled_capabilities |= Capabilities::ServerTime; } acked.push(cap); } else { naked.push(cap); } } let mut ack_body = String::new(); if let Some((first, tail)) = acked.split_first() { if first.to_disable { ack_body.push('-'); } ack_body += &*first.name; for cap in tail { ack_body.push(' '); if cap.to_disable { ack_body.push('-'); } ack_body += &*cap.name; } } ServerMessage { tags: vec![], sender: Some(config.server_name.clone().into()), body: ServerMessageBody::Cap { target: self.future_nickname.clone().unwrap_or_else(|| "*".into()), subcmd: CapSubBody::Ack(ack_body.into()), }, } .write_async(writer) .await?; writer.flush().await?; Ok(None) } CapabilitySubcommand::End => { let Some((ref username, ref realname)) = self.future_username else { self.cap_negotiation_in_progress = false; return Ok(None); }; let Some(nickname) = self.future_nickname.clone() else { self.cap_negotiation_in_progress = false; return Ok(None); }; let username = username.clone(); let realname = realname.clone(); let candidate_user = RegisteredUser { nickname: nickname.clone(), username, realname, enabled_capabilities: self.enabled_capabilities, }; self.finalize_auth(candidate_user, writer, core, config).await } }, ClientMessage::Nick { nickname } => { if self.cap_negotiation_in_progress { self.future_nickname = Some(nickname); Ok(None) } else if let Some((username, realname)) = &self.future_username.clone() { let candidate_user = RegisteredUser { nickname: nickname.clone(), username: username.clone(), realname: realname.clone(), enabled_capabilities: self.enabled_capabilities, }; self.finalize_auth(candidate_user, writer, core, config).await } else { self.future_nickname = Some(nickname); Ok(None) } } ClientMessage::User { username, realname } => { if self.cap_negotiation_in_progress { self.future_username = Some((username, realname)); Ok(None) } else if let Some(nickname) = self.future_nickname.clone() { let candidate_user = RegisteredUser { nickname: nickname.clone(), username, realname, enabled_capabilities: self.enabled_capabilities, }; self.finalize_auth(candidate_user, writer, core, config).await } else { self.future_username = Some((username, realname)); Ok(None) } } ClientMessage::Authenticate(body) => { if !self.authentication_started { tracing::debug!("Received authentication request"); if &*body == "PLAIN" { tracing::debug!("Authentication request with method PLAIN"); self.authentication_started = true; ServerMessage { tags: vec![], sender: Some(config.server_name.clone().into()), body: ServerMessageBody::Authenticate("+".into()), } .write_async(writer) .await?; writer.flush().await?; Ok(None) } else { let target = self.future_nickname.clone().unwrap_or_else(|| "*".into()); sasl_fail_message(config.server_name.clone(), target, "Unsupported mechanism".into()) .write_async(writer) .await?; writer.flush().await?; Ok(None) } } else { let body = AuthBody::from_str(body.as_bytes())?; if let Err(e) = auth_user(core, &body.login, &body.password).await { tracing::warn!("Authentication failed: {:?}", e); let target = self.future_nickname.clone().unwrap_or_else(|| "*".into()); sasl_fail_message(config.server_name.clone(), target, "Bad credentials".into()) .write_async(writer) .await?; writer.flush().await?; Ok(None) } else { let login: Str = body.login.into(); self.validated_user = Some(login.clone()); ServerMessage { tags: vec![], sender: Some(config.server_name.clone().into()), body: ServerMessageBody::N900LoggedIn { nick: login.clone(), address: login.clone(), account: login.clone(), message: format!("You are now logged in as {}", login).into(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone().into()), body: ServerMessageBody::N903SaslSuccess { nick: login.clone(), message: "SASL authentication successful".into(), }, } .write_async(writer) .await?; writer.flush().await?; Ok(None) } } // TODO handle abortion of authentication } _ => Ok(None), } } async fn finalize_auth( &mut self, candidate_user: RegisteredUser, writer: &mut BufWriter>, core: &LavinaCore, config: &ServerConfig, ) -> Result> { if self.enabled_capabilities.contains(Capabilities::Sasl) && self.validated_user.as_ref() == Some(&candidate_user.nickname) { Ok(Some(candidate_user)) } else { let Some(candidate_password) = &self.pass else { sasl_fail_message( config.server_name.clone(), candidate_user.nickname.clone(), "User credentials was not provided".into(), ) .write_async(writer) .await?; writer.flush().await?; return Ok(None); }; auth_user(core, &*candidate_user.nickname, &*candidate_password).await?; Ok(Some(candidate_user)) } } } async fn handle_registration<'a>( reader: &mut BufReader>, writer: &mut BufWriter>, core: &LavinaCore, config: &ServerConfig, ) -> Result { let mut buffer = vec![]; let mut state = RegistrationState::new(); let user = loop { let res = read_irc_message(reader, &mut buffer).await; tracing::trace!("Received message: {:?}", res); let len = match res { Ok(len) => len, Err(err) => { log::warn!("Failed to read from socket: {err}"); break Err(err.into()); } }; if len == 0 { log::info!("Terminating socket"); break Err(anyhow::Error::msg("EOF")); } let res = match std::str::from_utf8(&buffer[..len - 2]) { Ok(res) => res, Err(e) => break Err(e.into()), }; tracing::trace!("Incoming raw IRC message: '{res}'"); let parsed = client_message(res); let msg = match parsed { Ok(msg) => msg, Err(err) => { tracing::warn!("Failed to parse IRC message: {err}"); buffer.clear(); continue; } }; tracing::debug!("Incoming IRC message: {msg:?}"); if let Some(user) = state.handle_msg(msg, writer, core, config).await? { break Ok(user); } buffer.clear(); }?; // TODO properly implement session temination Ok(user) } fn sasl_fail_message(sender: Str, nick: Str, text: Str) -> ServerMessage { ServerMessage { tags: vec![], sender: Some(sender), body: ServerMessageBody::N904SaslFail { nick, text }, } } async fn auth_user(core: &LavinaCore, login: &str, plain_password: &str) -> Result<()> { let verdict = core.authenticate(login, plain_password).await?; // TODO properly map these onto protocol messages match verdict { Verdict::Authenticated => Ok(()), Verdict::UserNotFound => Err(anyhow!("no user found")), Verdict::InvalidPassword => Err(anyhow!("incorrect credentials")), } } async fn handle_registered_socket<'a>( config: ServerConfig, core: &LavinaCore, reader: &mut BufReader>, writer: &mut BufWriter>, user: RegisteredUser, ) -> Result<()> { let mut buffer = vec![]; log::info!("Handling registered user: {user:?}"); let player_id = PlayerId::from(user.nickname.clone())?; let mut connection = core.connect_to_player(&player_id).await; let text: Str = format!("Welcome to {} Server", &config.server_name).into(); ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N001Welcome { client: user.nickname.clone(), text: text.clone(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N002YourHost { client: user.nickname.clone(), text: text.clone(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N003Created { client: user.nickname.clone(), text: text.clone(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N004MyInfo { client: user.nickname.clone(), hostname: config.server_name.clone(), softname: APP_VERSION.into(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N005ISupport { client: user.nickname.clone(), params: "CHANTYPES=#".into(), }, } .write_async(writer) .await?; let rooms_list = connection.get_rooms().await?; for room in &rooms_list { produce_on_join_cmd_messages(&config, &user, &Chan::Global(room.id.as_inner().clone()), room, writer).await?; } writer.flush().await?; loop { select! { biased; len = read_irc_message(reader, &mut buffer) => { let len = len?; let len = if len == 0 { log::info!("EOF, Terminating socket"); break; } else { len }; let incoming = std::str::from_utf8(&buffer[0..len-2])?; if let HandleResult::Leave = handle_incoming_message(incoming, &config, &user, core, &mut connection, writer).await? { break; } buffer.clear(); }, update = connection.receiver.recv() => { match update { Some(ConnectionMessage::Update(update)) => { handle_update(&config, &user, &player_id, writer, core, update).await?; } Some(ConnectionMessage::Stop(_)) => { tracing::debug!("Connection is being terminated"); break; } None => { log::warn!("Player is terminated, must terminate the connection"); break; } } } } } ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::Error { reason: "Leaving the server".into(), }, } .write_async(writer) .await?; writer.flush().await?; connection.terminate().await; Ok(()) } // TODO this is public only for tests, perhaps move this into proto-irc // TODO limit buffer size in size to protect against dos attacks with large payloads pub async fn read_irc_message(reader: &mut BufReader>, buf: &mut Vec) -> Result { let mut size = 0; 'outer: loop { let res = reader.read_until(b'\r', buf).await?; size += res; let next = reader.read_u8().await?; buf.push(next); size += 1; if next != b'\n' { continue 'outer; } return Ok(size); } } async fn handle_update( config: &ServerConfig, user: &RegisteredUser, player_id: &PlayerId, writer: &mut (impl AsyncWrite + Unpin), core: &LavinaCore, update: Updates, ) -> Result<()> { log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); match update { Updates::RoomJoined { new_member_id, room_id } => { if player_id == &new_member_id { if let Some(room) = core.get_room(&room_id).await { let room_info = room.get_room_info().await; let chan = Chan::Global(room_id.as_inner().clone()); produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?; writer.flush().await?; } else { log::warn!("Received join to a non-existant room"); } } else { ServerMessage { tags: vec![], sender: Some(new_member_id.as_inner().clone()), body: ServerMessageBody::Join(Chan::Global(room_id.as_inner().clone())), } .write_async(writer) .await?; writer.flush().await? } } Updates::RoomLeft { room_id, former_member_id, } => { ServerMessage { tags: vec![], sender: Some(former_member_id.as_inner().clone()), body: ServerMessageBody::Part(Chan::Global(room_id.as_inner().clone())), } .write_async(writer) .await?; writer.flush().await? } Updates::NewMessage { author_id, room_id, body, created_at, } => { let mut tags = vec![]; if user.enabled_capabilities.contains(Capabilities::ServerTime) { let tag = Tag { key: "time".into(), value: Some(created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()), }; tags.push(tag); } ServerMessage { tags, sender: Some(author_id.as_inner().clone()), body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_inner().clone())), body: body.clone(), }, } .write_async(writer) .await?; writer.flush().await? } Updates::RoomTopicChanged { room_id, new_topic } => { ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N332Topic { client: user.nickname.clone(), chat: Chan::Global(room_id.as_inner().clone()), topic: new_topic, }, } .write_async(writer) .await?; writer.flush().await? } Updates::BannedFrom(room_id) => { // TODO think about the case when the user was banned, but was not in the room - no need to send PART in this case ServerMessage { tags: vec![], sender: Some(player_id.as_inner().clone()), body: ServerMessageBody::Part(Chan::Global(room_id.as_inner().clone())), } .write_async(writer) .await?; writer.flush().await? } Updates::NewDialogMessage { sender, receiver, body, created_at, } => { let mut tags = vec![]; if user.enabled_capabilities.contains(Capabilities::ServerTime) { let tag = Tag { key: "time".into(), value: Some(created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()), }; tags.push(tag); } ServerMessage { tags, sender: Some(sender.as_inner().clone()), body: ServerMessageBody::PrivateMessage { target: Recipient::Nick(receiver.as_inner().clone()), body: body.clone(), }, } .write_async(writer) .await?; writer.flush().await? } } Ok(()) } enum HandleResult { Continue, Leave, } #[tracing::instrument(skip_all, name = "handle_incoming_message")] async fn handle_incoming_message( buffer: &str, config: &ServerConfig, user: &RegisteredUser, core: &LavinaCore, user_handle: &mut PlayerConnection, writer: &mut (impl AsyncWrite + Unpin), ) -> Result { log::debug!("Incoming raw IRC message: '{buffer}'"); let parsed = client_message(buffer); log::debug!("Incoming IRC message: {parsed:?}"); match parsed { Ok(msg) => match msg { ClientMessage::Ping { token } => { ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::Pong { from: config.server_name.clone(), token, }, } .write_async(writer) .await?; writer.flush().await?; } ClientMessage::Join(ref chan) => { handle_join(&config, &user, user_handle, chan, writer).await?; } ClientMessage::Part { chan, message } => { handle_part(config, user, user_handle, &chan, writer).await?; } ClientMessage::PrivateMessage { recipient, body } => match recipient { Recipient::Chan(Chan::Global(chan)) => { let room_id = RoomId::from(chan)?; user_handle.send_message(room_id, body).await?; } Recipient::Nick(nick) => { let receiver = PlayerId::from(nick)?; user_handle.send_dialog_message(receiver, body).await?; } _ => log::warn!("Unsupported target type"), }, ClientMessage::Topic { chan, topic } => { match chan { Chan::Global(chan) => { let room_id = RoomId::from(chan)?; user_handle.change_topic(room_id.clone(), topic.clone()).await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N332Topic { client: user.nickname.clone(), chat: Chan::Global(room_id.as_inner().clone()), topic, }, } .write_async(writer) .await?; writer.flush().await?; } Chan::Local(_) => {} }; } ClientMessage::Who { target } => match &target { Recipient::Nick(nick) => { // TODO handle non-existing user ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: user_to_who_msg(config, user, nick), } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N315EndOfWho { client: user.nickname.clone(), mask: target.clone(), msg: "End of WHO list".into(), }, } .write_async(writer) .await?; writer.flush().await?; } Recipient::Chan(Chan::Global(chan)) => { let room = core.get_room(&RoomId::from(chan.clone())?).await; if let Some(room) = room { let room_info = room.get_room_info().await; for member in room_info.members { ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: user_to_who_msg(config, user, member.as_inner()), } .write_async(writer) .await?; } } ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N315EndOfWho { client: user.nickname.clone(), mask: target.clone(), msg: "End of WHO list".into(), }, } .write_async(writer) .await?; writer.flush().await?; } Recipient::Chan(Chan::Local(_)) => { log::warn!("Local chans not supported"); } }, ClientMessage::Whois { arg } => { arg.handle(handler::IrcConnection { server_name: config.server_name.clone(), client: user.nickname.clone(), writer, player_connection: user_handle, }) .await?; writer.flush().await?; } ClientMessage::Mode { target } => { match target { Recipient::Nick(nickname) => { if nickname == user.nickname { ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N221UserModeIs { client: user.nickname.clone(), modes: "+r".into(), }, } .write_async(writer) .await?; writer.flush().await?; } else { ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N502UsersDontMatch { client: user.nickname.clone(), message: "Cant change mode for other users".into(), }, } .write_async(writer) .await?; writer.flush().await?; } } Recipient::Chan(_) => { // TODO handle chan mode handling } } } ClientMessage::Quit { reason } => { log::info!("Received QUIT"); return Ok(HandleResult::Leave); } cmd => { log::warn!("Not implemented handler for client command: {cmd:?}"); } }, Err(err) => { log::warn!("Failed to parse IRC message: {err}"); } } Ok(HandleResult::Continue) } fn user_to_who_msg(config: &ServerConfig, requestor: &RegisteredUser, target_user_nickname: &Str) -> ServerMessageBody { // Username is equal to nickname let username = format!("~{target_user_nickname}").into(); // User's host is not public, replace it with `user/` pattern let host = format!("user/{target_user_nickname}").into(); ServerMessageBody::N352WhoReply { client: requestor.nickname.clone(), username, host, server: config.server_name.clone(), flags: AwayStatus::Here, nickname: target_user_nickname.clone(), hops: 0, // TODO Realname is not available yet, should be matched to a future core's player field realname: target_user_nickname.clone(), } } async fn handle_join( config: &ServerConfig, user: &RegisteredUser, user_handle: &mut PlayerConnection, chan: &Chan, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { match chan { Chan::Global(chan_name) => { let room_id = RoomId::from(chan_name.clone())?; if let JoinResult::Success(room_info) = user_handle.join_room(room_id).await? { produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?; } else { ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N474BannedFromChan { client: user.nickname.clone(), chan: chan.clone(), message: "U dun goofed".into(), }, } .write_async(writer) .await?; } writer.flush().await?; } Chan::Local(_) => {} }; Ok(()) } async fn handle_part( config: &ServerConfig, user: &RegisteredUser, user_handle: &mut PlayerConnection, chan: &Chan, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { if let Chan::Global(chan_name) = chan { let room_id = RoomId::from(chan_name.clone())?; user_handle.leave_room(room_id).await?; ServerMessage { tags: vec![], sender: Some(user.nickname.clone()), body: ServerMessageBody::Part(Chan::Global(chan_name.clone())), } .write_async(writer) .await?; writer.flush().await?; } else { log::warn!("Local chans unsupported"); } Ok(()) } async fn produce_on_join_cmd_messages( config: &ServerConfig, user: &RegisteredUser, chan: &Chan, room_info: &RoomInfo, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { ServerMessage { tags: vec![], sender: Some(user.nickname.clone()), body: ServerMessageBody::Join(chan.clone()), } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N332Topic { client: user.nickname.clone(), chat: chan.clone(), topic: room_info.topic.clone(), }, } .write_async(writer) .await?; let prefixed_members: Vec = room_info.members.iter().map(|member| PrefixedNick::from_str(member.clone().into_inner())).collect(); let non_empty_members: NonEmpty = NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N353NamesReply { client: user.nickname.clone(), chan: chan.clone(), members: non_empty_members.into(), }, } .write_async(writer) .await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N366NamesReplyEnd { client: user.nickname.clone(), chan: chan.clone(), }, } .write_async(writer) .await?; Ok(()) } pub struct RunningServer { pub addr: SocketAddr, terminator: Terminator, } impl RunningServer { pub async fn terminate(self) -> Result<()> { self.terminator.terminate().await } } pub async fn launch(config: ServerConfig, core: LavinaCore, metrics: MetricsRegistry) -> Result { log::info!("Starting IRC projection"); let (stopped_tx, mut stopped_rx) = channel(32); let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; let total_connections = IntCounter::new("irc_total_connections", "Total number of opened connections")?; metrics.register(Box::new(current_connections.clone()))?; metrics.register(Box::new(total_connections.clone()))?; let listener = TcpListener::bind(config.listen_on).await?; let addr = listener.local_addr()?; let terminator = Terminator::spawn(|mut rx| async move { // TODO probably should separate logic for accepting new connection and storing them // into two tasks so that they don't block each other let mut actors = HashMap::new(); loop { select! { biased; _ = &mut rx => break, stopped = stopped_rx.recv() => match stopped { Some(stopped) => { let _ = actors.remove(&stopped); }, None => unreachable!(), }, new_conn = listener.accept() => { match new_conn { Ok((stream, socket_addr)) => { let config = config.clone(); total_connections.inc(); current_connections.inc(); log::debug!("Incoming connection from {socket_addr}"); if actors.contains_key(&socket_addr) { log::warn!("Already contains connection form {socket_addr}"); // TODO kill the older connection and restart it continue; } let terminator = Terminator::spawn(|termination| { let core = core.clone(); let current_connections_clone = current_connections.clone(); let stopped_tx = stopped_tx.clone(); async move { match handle_socket(config, stream, &socket_addr, core, termination).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } current_connections_clone.dec(); stopped_tx.send(socket_addr).await?; Ok(()) } }); actors.insert(socket_addr, terminator); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), } }, } } log::info!("Stopping IRC projection"); join_all(actors.into_iter().map(|(socket_addr, terminator)| async move { log::debug!("Stopping IRC connection at {socket_addr}"); match terminator.terminate().await { Ok(_) => log::debug!("Stopped IRC connection at {socket_addr}"), Err(err) => { log::warn!("IRC connection to {socket_addr} finished with error: {err}") } } })) .await; log::info!("Stopped IRC projection"); Ok(()) }); log::info!("Started IRC projection"); Ok(RunningServer { addr, terminator }) }