From a8d6a98a5bace258f1ec93eadbd814bfeaa495d5 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Tue, 14 Feb 2023 18:53:43 +0100 Subject: [PATCH] produce join messages on joins from other connections --- src/core/player.rs | 77 ++++++++++++++---- src/core/room.rs | 51 ++++++++++-- src/main.rs | 2 +- src/prelude.rs | 2 +- src/projections/irc/mod.rs | 150 ++++++++++++++++++++++-------------- src/projections/irc/test.rs | 45 ++++++++--- src/protos/irc/client.rs | 87 +++++++++++++++++++-- 7 files changed, 318 insertions(+), 96 deletions(-) diff --git a/src/core/player.rs b/src/core/player.rs index 6e87f92..42a1378 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -20,7 +20,7 @@ use tokio::{ }; use crate::{ - core::room::{RoomId, RoomRegistry, RoomInfo}, + core::room::{RoomId, RoomInfo, RoomRegistry}, prelude::*, util::table::{AnonTable, Key as AnonKey}, }; @@ -45,7 +45,9 @@ impl PlayerConnection { } pub async fn join_room(&mut self, room_id: RoomId) -> Result { - self.player_handle.join_room(room_id).await + self.player_handle + .join_room(room_id, self.connection_id.clone()) + .await } } @@ -69,12 +71,7 @@ impl PlayerHandle { } } - pub async fn send_message( - &self, - room_id: RoomId, - connection_id: ConnectionId, - body: String, - ) { + pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: String) { self.tx .send(PlayerCommand::SendMessage { room_id, @@ -84,9 +81,19 @@ impl PlayerHandle { .await; } - pub async fn join_room(&self, room_id: RoomId) -> Result { + pub async fn join_room( + &self, + room_id: RoomId, + connection_id: ConnectionId, + ) -> Result { let (promise, deferred) = oneshot(); - self.tx.send(PlayerCommand::JoinRoom { room_id, promise }).await; + self.tx + .send(PlayerCommand::JoinRoom { + room_id, + connection_id, + promise, + }) + .await; Ok(deferred.await?) } @@ -106,11 +113,17 @@ impl PlayerHandle { }) .await; } + + pub async fn send(&self, command: PlayerCommand) { + self.tx.send(command).await; + } } /// Player update event type which is sent to a connection handler. pub enum Updates { RoomJoined { + player_id: PlayerId, + connection_id: ConnectionId, room_id: RoomId, }, NewMessage { @@ -120,13 +133,15 @@ pub enum Updates { body: String, }, } -enum PlayerCommand { +pub enum PlayerCommand { + /** Commands from connections */ AddSocket { sender: Sender, promise: OneshotSender, }, JoinRoom { room_id: RoomId, + connection_id: ConnectionId, promise: Promise, }, SendMessage { @@ -134,12 +149,18 @@ enum PlayerCommand { connection_id: ConnectionId, body: String, }, + /** Events from rooms */ IncomingMessage { room_id: RoomId, connection_id: ConnectionId, author: PlayerId, body: String, }, + IncomingRoomJoined { + room_id: RoomId, + new_member_id: PlayerId, + connection_id: ConnectionId, + }, } /// Handle to a player registry — a shared data structure containing information about players. @@ -209,18 +230,27 @@ impl Player { let connection_id = self.sockets.insert(sender); promise.send(ConnectionId(connection_id)); } - PlayerCommand::JoinRoom { room_id, promise } => { + PlayerCommand::JoinRoom { + room_id, + connection_id, + promise, + } => { let mut room = rooms.get_or_create_room(room_id.clone()); - room.subscribe(player_id.clone(), handle.clone()).await; + room.subscribe(player_id.clone(), connection_id, handle.clone()) + .await; let members = room.get_members().await; - promise.send(RoomInfo { id: room_id, members, topic: b"some topic lol".to_vec() }); + promise.send(RoomInfo { + id: room_id, + members, + topic: b"some topic lol".to_vec(), + }); } PlayerCommand::SendMessage { room_id, connection_id, body, } => { - let room = rooms.get_room(room_id); + let room = rooms.get_room(&room_id); match room { Some(mut room) => { room.send_message(player_id.clone(), connection_id, body) @@ -253,6 +283,23 @@ impl Player { .await; } } + PlayerCommand::IncomingRoomJoined { + room_id, + new_member_id, + connection_id, + } => { + for socket in &self.sockets { + let room_id = room_id.clone(); + let connection_id = connection_id.clone(); + socket + .send(Updates::RoomJoined { + player_id: new_member_id.clone(), + connection_id, + room_id, + }) + .await; + } + } } } self diff --git a/src/core/room.rs b/src/core/room.rs index 1a29392..cb7706b 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -9,7 +9,7 @@ use prometheus::{IntGauge, Registry as MetricRegistry}; use tokio::sync::RwLock as AsyncRwLock; use crate::{ - core::player::{PlayerHandle, PlayerId}, + core::player::{PlayerCommand, PlayerHandle, PlayerId}, prelude::*, }; @@ -50,9 +50,9 @@ impl RoomRegistry { } } - pub fn get_room(&self, room_id: RoomId) -> Option { + pub fn get_room(&self, room_id: &RoomId) -> Option { let inner = self.0.read().unwrap(); - let res = inner.rooms.get(&room_id); + let res = inner.rooms.get(room_id); res.map(|r| r.clone()) } } @@ -65,9 +65,15 @@ struct RoomRegistryInner { #[derive(Clone)] pub struct RoomHandle(Arc>); impl RoomHandle { - pub async fn subscribe(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { + pub async fn subscribe( + &self, + player_id: PlayerId, + connection_id: ConnectionId, + player_handle: PlayerHandle, + ) { let mut lock = self.0.write().await; - lock.add_subscriber(player_id, player_handle); + lock.add_subscriber(player_id, connection_id, player_handle) + .await; } pub async fn send_message( @@ -82,7 +88,23 @@ impl RoomHandle { pub async fn get_members(&self) -> Vec { let lock = self.0.read().await; - lock.subscriptions.keys().map(|x| x.clone()).collect::>() + lock.subscriptions + .keys() + .map(|x| x.clone()) + .collect::>() + } + + pub async fn get_room_info(&self) -> RoomInfo { + let lock = self.0.read().await; + RoomInfo { + id: lock.room_id.clone(), + members: lock + .subscriptions + .keys() + .map(|x| x.clone()) + .collect::>(), + topic: b"some topic lol".to_vec(), + } } } @@ -91,9 +113,22 @@ struct Room { subscriptions: HashMap, } impl Room { - fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { + async fn add_subscriber( + &mut self, + player_id: PlayerId, + connection_id: ConnectionId, + player_handle: PlayerHandle, + ) { tracing::info!("Adding a subscriber to room"); - self.subscriptions.insert(player_id, player_handle); + self.subscriptions.insert(player_id.clone(), player_handle); + for (_, sub) in &self.subscriptions { + sub.send(PlayerCommand::IncomingRoomJoined { + room_id: self.room_id.clone(), + new_member_id: player_id.clone(), + connection_id: connection_id.clone(), + }) + .await; + } } async fn send_message(&self, player_id: PlayerId, connection_id: ConnectionId, body: String) { diff --git a/src/main.rs b/src/main.rs index 5ea1525..81749d8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,7 +43,7 @@ async fn main() -> Result<()> { let rooms = RoomRegistry::empty(&mut metrics)?; let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone()).await?; - let irc = projections::irc::launch(irc_config, players, metrics.clone()).await?; + let irc = projections::irc::launch(irc_config, players, rooms.clone(), metrics.clone()).await?; tracing::info!("Started"); sleep.await; diff --git a/src/prelude.rs b/src/prelude.rs index d9dd2c2..4bcc546 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -2,8 +2,8 @@ pub use std::future::Future; pub use tokio::pin; pub use tokio::select; -pub use tokio::task::JoinHandle; pub use tokio::sync::oneshot::{channel as oneshot, Receiver as Deferred, Sender as Promise}; +pub use tokio::task::JoinHandle; pub mod log { pub use tracing::{debug, error, info, warn}; diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index f68aae7..99b18d9 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot::channel; use crate::core::player::{ ConnectionId, PlayerConnection, PlayerHandle, PlayerId, PlayerRegistry, Updates, }; -use crate::core::room::RoomId; +use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; use crate::prelude::*; use crate::protos::irc::client::{client_message, ClientMessage}; use crate::protos::irc::server::{ServerMessage, ServerMessageBody}; @@ -38,6 +38,7 @@ async fn handle_socket( mut stream: TcpStream, socket_addr: SocketAddr, mut players: PlayerRegistry, + rooms: RoomRegistry, ) -> Result<()> { let (reader, writer) = stream.split(); let mut reader: BufReader = BufReader::new(reader); @@ -60,8 +61,16 @@ async fn handle_socket( handle_registration(&mut reader, &mut writer).await; match registered_user { Ok(user) => { - handle_registered_socket(config, socket_addr, players, &mut reader, &mut writer, user) - .await?; + handle_registered_socket( + config, + socket_addr, + players, + rooms, + &mut reader, + &mut writer, + user, + ) + .await?; } Err(_) => {} } @@ -135,6 +144,7 @@ async fn handle_registered_socket<'a>( config: ServerConfig, socket_addr: SocketAddr, mut players: PlayerRegistry, + rooms: RoomRegistry, reader: &mut BufReader>, writer: &mut BufWriter>, user: RegisteredUser, @@ -214,7 +224,25 @@ async fn handle_registered_socket<'a>( }, update = connection.receiver.recv() => { match update.unwrap() { - Updates::RoomJoined { room_id } => {}, + Updates::RoomJoined { player_id: author_id, connection_id, room_id } => { + if player_id == author_id { + if let Some(room) = rooms.get_room(&room_id) { + let room_info = room.get_room_info().await; + let chan = Chan::Global(room_id.0); + produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?; + writer.flush().await?; + } else { + log::warn!("Received join to a non-existant room"); + } + } else { + ServerMessage { + tags: vec![], + sender: Some(author_id.0.clone()), + body: ServerMessageBody::Join(Chan::Global(room_id.0)), + }.write_async(writer).await?; + writer.flush().await? + } + }, Updates::NewMessage { author_id, connection_id, room_id, body } => { if player_id != author_id || connection.connection_id != connection_id { ServerMessage { @@ -288,65 +316,74 @@ async fn handle_join( match chan { Chan::Global(ref room) => { let room_info = user_handle.join_room(RoomId(room.clone())).await?; - ServerMessage { - tags: vec![], - sender: Some(user.nickname.clone()), - body: ServerMessageBody::Join(chan.clone()), - } - .write_async(writer) - .await?; - ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N332Topic { - client: user.nickname.clone(), - chat: chan.clone(), - topic: room_info.topic, - }, - } - .write_async(writer) - .await?; - let mut members = if let Some(head) = room_info.members.first() { - head.0.clone() - } else { - user.nickname.clone() - }; - for i in &room_info.members[1..] { - members.push(b' '); - members.extend(&i.0); - } - ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N353NamesReply { - client: user.nickname.clone(), - chan: chan.clone(), - members, - }, - } - .write_async(writer) - .await?; - ServerMessage { - tags: vec![], - sender: Some(config.server_name.as_bytes().to_vec()), - body: ServerMessageBody::N366NamesReplyEnd { - client: user.nickname.clone(), - chan: chan.clone(), - }, - } - .write_async(writer) - .await?; - - writer.flush().await?; } Chan::Local(_) => {} }; Ok(()) } +async fn produce_on_join_cmd_messages( + config: &ServerConfig, + user: &RegisteredUser, + chan: &Chan, + room_info: &RoomInfo, + writer: &mut (impl AsyncWrite + Unpin), +) -> Result<()> { + ServerMessage { + tags: vec![], + sender: Some(user.nickname.clone()), + body: ServerMessageBody::Join(chan.clone()), + } + .write_async(writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N332Topic { + client: user.nickname.clone(), + chat: chan.clone(), + topic: room_info.topic.clone(), + }, + } + .write_async(writer) + .await?; + let mut members = if let Some(head) = room_info.members.first() { + head.0.clone() + } else { + user.nickname.clone() + }; + for i in &room_info.members[1..] { + members.push(b' '); + members.extend(&i.0); + } + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N353NamesReply { + client: user.nickname.clone(), + chan: chan.clone(), + members, + }, + } + .write_async(writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N366NamesReplyEnd { + client: user.nickname.clone(), + chan: chan.clone(), + }, + } + .write_async(writer) + .await?; + Ok(()) +} + pub async fn launch( config: ServerConfig, players: PlayerRegistry, + rooms: RoomRegistry, metrics: MetricsRegistry, ) -> Result { log::info!("Starting IRC projection"); @@ -375,10 +412,11 @@ pub async fn launch( total_connections.inc(); current_connections.inc(); log::debug!("Incoming connection from {socket_addr}"); - let players_clone = players.clone(); + let players = players.clone(); + let rooms = rooms.clone(); let current_connections_clone = current_connections.clone(); let handle = tokio::task::spawn(async move { - match handle_socket(config, stream, socket_addr, players_clone).await { + match handle_socket(config, stream, socket_addr, players, rooms).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } diff --git a/src/projections/irc/test.rs b/src/projections/irc/test.rs index b03d930..6e0e897 100644 --- a/src/projections/irc/test.rs +++ b/src/projections/irc/test.rs @@ -51,20 +51,41 @@ async fn registration(scope: &mut TestScope<'_>, nickname: &str) -> Result<()> { expect!(scope, ":irc.localhost NOTICE * :Welcome to my server!\n"); send!(scope, "NICK {nickname}\n"); send!(scope, "USER UserName 0 * :Real Name\n"); - expect!(scope, ":irc.localhost 001 {nickname} :Welcome to Kek Server\n"); - expect!(scope, ":irc.localhost 002 {nickname} :Welcome to Kek Server\n"); - expect!(scope, ":irc.localhost 003 {nickname} :Welcome to Kek Server\n"); + expect!( + scope, + ":irc.localhost 001 {nickname} :Welcome to Kek Server\n" + ); + expect!( + scope, + ":irc.localhost 002 {nickname} :Welcome to Kek Server\n" + ); + expect!( + scope, + ":irc.localhost 003 {nickname} :Welcome to Kek Server\n" + ); expect!(scope, ":irc.localhost 004 {nickname} irc.localhost kek-0.1.alpha.3 DGMQRSZagiloswz CFILPQbcefgijklmnopqrstvz bkloveqjfI\n"); - expect!(scope, ":irc.localhost 005 {nickname} CHANTYPES=# :are supported by this server\n"); + expect!( + scope, + ":irc.localhost 005 {nickname} CHANTYPES=# :are supported by this server\n" + ); Ok(()) } async fn join(scope: &mut TestScope<'_>, nickname: &str) -> Result<()> { send!(scope, "JOIN #channol\n"); expect!(scope, ":{nickname} JOIN #channol\n"); - expect!(scope, ":irc.localhost 332 {nickname} #channol :chan topic lol\n"); - expect!(scope, ":irc.localhost 353 {nickname} = #channol :{nickname}\n"); - expect!(scope, ":irc.localhost 366 {nickname} #channol :End of /NAMES list\n"); + expect!( + scope, + ":irc.localhost 332 {nickname} #channol :chan topic lol\n" + ); + expect!( + scope, + ":irc.localhost 353 {nickname} = #channol :{nickname}\n" + ); + expect!( + scope, + ":irc.localhost 366 {nickname} #channol :End of /NAMES list\n" + ); Ok(()) } @@ -90,7 +111,10 @@ async fn test_two_connections_one_player() -> Result<()> { join(&mut scope1, "NickName").await?; join(&mut scope2, "NickName").await?; send!(scope1, "PRIVMSG #channol :Chmoki vsem v etam chati!\n"); - expect!(scope2, ":NickName PRIVMSG #channol :Chmoki vsem v etam chati!\n"); + expect!( + scope2, + ":NickName PRIVMSG #channol :Chmoki vsem v etam chati!\n" + ); send!(scope2, "PRIVMSG #channol :I tebe privetiki\n"); expect!(scope1, ":NickName PRIVMSG #channol :I tebe privetiki\n"); @@ -109,7 +133,10 @@ async fn test_two_players() -> Result<()> { join(&mut scope1, "NickName1").await?; join(&mut scope2, "NickName2").await?; send!(scope1, "PRIVMSG #channol :Chmoki vsem v etam chati!\n"); - expect!(scope2, ":NickName1 PRIVMSG #channol :Chmoki vsem v etam chati!\n"); + expect!( + scope2, + ":NickName1 PRIVMSG #channol :Chmoki vsem v etam chati!\n" + ); send!(scope2, "PRIVMSG #channol :I tebe privetiki\n"); expect!(scope1, ":NickName2 PRIVMSG #channol :I tebe privetiki\n"); diff --git a/src/protos/irc/client.rs b/src/protos/irc/client.rs index c754e78..895f90a 100644 --- a/src/protos/irc/client.rs +++ b/src/protos/irc/client.rs @@ -4,13 +4,21 @@ use super::*; #[derive(Clone, Debug, PartialEq, Eq)] pub enum ClientMessage { /// CAP. Capability-related commands. - Capability { subcommand: CapabilitySubcommand }, + Capability { + subcommand: CapabilitySubcommand, + }, /// PING - Ping { token: ByteVec }, + Ping { + token: ByteVec, + }, /// PONG - Pong { token: ByteVec }, + Pong { + token: ByteVec, + }, /// NICK - Nick { nickname: ByteVec }, + Nick { + nickname: ByteVec, + }, /// USER 0 * : User { username: ByteVec, @@ -18,10 +26,28 @@ pub enum ClientMessage { }, /// JOIN Join(Chan), + /// MODE + Mode(Chan), // TODO support not only chan + /// WHO + Who(Chan), // TODO support not only chan + /// TOPIC : + Topic { + chan: Chan, + topic: ByteVec, + }, + Part { + chan: Chan, + message: ByteVec, + }, /// PRIVMSG : - PrivateMessage { recipient: Recipient, body: ByteVec }, + PrivateMessage { + recipient: Recipient, + body: ByteVec, + }, /// QUIT : - Quit { reason: ByteVec }, + Quit { + reason: ByteVec, + }, } pub fn client_message(input: &[u8]) -> IResult<&[u8], ClientMessage> { @@ -32,6 +58,10 @@ pub fn client_message(input: &[u8]) -> IResult<&[u8], ClientMessage> { client_message_nick, client_message_user, client_message_join, + client_message_mode, + client_message_who, + client_message_topic, + client_message_part, client_message_privmsg, client_message_quit, ))(input) @@ -103,6 +133,40 @@ fn client_message_join(input: &[u8]) -> IResult<&[u8], ClientMessage> { Ok((input, ClientMessage::Join(chan))) } +fn client_message_mode(input: &[u8]) -> IResult<&[u8], ClientMessage> { + let (input, _) = tag("MODE ")(input)?; + let (input, chan) = chan(input)?; + + Ok((input, ClientMessage::Mode(chan))) +} + +fn client_message_who(input: &[u8]) -> IResult<&[u8], ClientMessage> { + let (input, _) = tag("WHO ")(input)?; + let (input, chan) = chan(input)?; + + Ok((input, ClientMessage::Who(chan))) +} + +fn client_message_topic(input: &[u8]) -> IResult<&[u8], ClientMessage> { + let (input, _) = tag("TOPIC ")(input)?; + let (input, chan) = chan(input)?; + let (input, _) = tag(b" :")(input)?; + let (input, topic) = token(input)?; + + let topic = topic.to_vec(); + Ok((input, ClientMessage::Topic { chan, topic })) +} + +fn client_message_part(input: &[u8]) -> IResult<&[u8], ClientMessage> { + let (input, _) = tag("PART ")(input)?; + let (input, chan) = chan(input)?; + let (input, _) = tag(b" :")(input)?; + let (input, message) = token(input)?; + + let message = message.to_vec(); + Ok((input, ClientMessage::Part { chan, message })) +} + fn client_message_privmsg(input: &[u8]) -> IResult<&[u8], ClientMessage> { let (input, _) = tag("PRIVMSG ")(input)?; let (input, recipient) = recipient(input)?; @@ -219,6 +283,17 @@ mod test { realname: b"Real Name".to_vec(), }; + let result = client_message(input); + assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); + } + #[test] + fn test_client_message_part() { + let input = b"PART #chan :Pokasiki !!!"; + let expected = ClientMessage::Part { + chan: Chan::Global(b"chan".to_vec()), + message: b"Pokasiki !!!".to_vec(), + }; + let result = client_message(input); assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); }