diff --git a/src/core/player.rs b/src/core/player.rs index c9cd050..b7ba5ba 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -15,34 +15,51 @@ use std::{ use prometheus::{IntGauge, Registry as MetricsRegistry}; use tokio::{ sync::mpsc::{channel, Receiver, Sender}, + sync::oneshot::{channel as oneshot, Sender as OneshotSender}, task::JoinHandle, }; use crate::{ core::room::{RoomId, RoomRegistry}, prelude::*, - util::table::AnonTable, + util::table::{AnonTable, Key as AnonKey}, }; /// Opaque player identifier. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PlayerId(pub ByteVec); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ConnectionId(pub AnonKey); + /// Handle to a player actor. #[derive(Clone)] pub struct PlayerHandle { tx: Sender, } impl PlayerHandle { - pub async fn subscribe(&mut self) -> Receiver { + pub async fn subscribe(&mut self) -> (ConnectionId, Receiver) { let (sender, rx) = channel(32); - self.tx.send(PlayerCommand::AddSocket { sender }).await; - rx + let (promise, deferred) = oneshot(); + self.tx + .send(PlayerCommand::AddSocket { sender, promise }) + .await; + let connection_id = deferred.await.unwrap(); + (connection_id, rx) } - pub async fn send_message(&mut self, room_id: RoomId, body: String) { + pub async fn send_message( + &mut self, + room_id: RoomId, + connection_id: ConnectionId, + body: String, + ) { self.tx - .send(PlayerCommand::SendMessage { room_id, body }) + .send(PlayerCommand::SendMessage { + room_id, + connection_id, + body, + }) .await; } @@ -50,11 +67,18 @@ impl PlayerHandle { self.tx.send(PlayerCommand::JoinRoom { room_id }).await; } - pub async fn receive_message(&mut self, room_id: RoomId, author: PlayerId, body: String) { + pub async fn receive_message( + &mut self, + room_id: RoomId, + author: PlayerId, + connection_id: ConnectionId, + body: String, + ) { self.tx .send(PlayerCommand::IncomingMessage { room_id, author, + connection_id, body, }) .await; @@ -63,22 +87,32 @@ impl PlayerHandle { /// Player update event type which is sent to a connection handler. pub enum Updates { - RoomJoined { room_id: RoomId }, - NewMessage { room_id: RoomId, body: String }, + RoomJoined { + room_id: RoomId, + }, + NewMessage { + author_id: PlayerId, + connection_id: ConnectionId, + room_id: RoomId, + body: String, + }, } enum PlayerCommand { AddSocket { sender: Sender, + promise: OneshotSender, }, JoinRoom { room_id: RoomId, }, SendMessage { room_id: RoomId, + connection_id: ConnectionId, body: String, }, IncomingMessage { room_id: RoomId, + connection_id: ConnectionId, author: PlayerId, body: String, }, @@ -108,10 +142,14 @@ impl PlayerRegistry { sockets: AnonTable::new(), }; let mut inner = self.0.write().unwrap(); - let (handle, fiber) = player.launch(id.clone(), inner.room_registry.clone()); - inner.players.insert(id, (handle.clone(), fiber)); - inner.metric_active_players.inc(); - handle + if let Some((handle, _)) = inner.players.get(&id) { + handle.clone() + } else { + let (handle, fiber) = player.launch(id.clone(), inner.room_registry.clone()); + inner.players.insert(id, (handle.clone(), fiber)); + inner.metric_active_players.inc(); + handle + } } } @@ -138,18 +176,24 @@ impl Player { let fiber = tokio::task::spawn(async move { while let Some(cmd) = rx.recv().await { match cmd { - PlayerCommand::AddSocket { sender } => { - self.sockets.insert(sender); + PlayerCommand::AddSocket { sender, promise } => { + let connection_id = self.sockets.insert(sender); + promise.send(ConnectionId(connection_id)); } PlayerCommand::JoinRoom { room_id } => { let mut room = rooms.get_or_create_room(room_id); room.subscribe(player_id.clone(), handle.clone()).await; } - PlayerCommand::SendMessage { room_id, body } => { + PlayerCommand::SendMessage { + room_id, + connection_id, + body, + } => { let room = rooms.get_room(room_id); match room { Some(mut room) => { - room.send_message(player_id.clone(), body).await; + room.send_message(player_id.clone(), connection_id, body) + .await; } None => { tracing::info!("no room found"); @@ -159,12 +203,16 @@ impl Player { PlayerCommand::IncomingMessage { room_id, author, + connection_id, body, } => { tracing::info!("Handling incoming message"); for socket in &self.sockets { + log::info!("Send message to socket"); socket .send(Updates::NewMessage { + author_id: author.clone(), + connection_id: connection_id.clone(), room_id: room_id.clone(), body: body.clone(), }) diff --git a/src/core/room.rs b/src/core/room.rs index 46d4929..74c7eea 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -13,6 +13,8 @@ use crate::{ prelude::*, }; +use super::player::ConnectionId; + /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RoomId(pub ByteVec); @@ -37,10 +39,14 @@ impl RoomRegistry { subscriptions: HashMap::new(), }; let mut inner = self.0.write().unwrap(); - let (room_handle, fiber) = room.launch(room_id.clone()); - inner.rooms.insert(room_id, (room_handle.clone(), fiber)); - inner.metric_active_rooms.inc(); - room_handle + if let Some((room_handle, _)) = inner.rooms.get(&room_id) { + room_handle.clone() + } else { + let (room_handle, fiber) = room.launch(room_id.clone()); + inner.rooms.insert(room_id, (room_handle.clone(), fiber)); + inner.metric_active_rooms.inc(); + room_handle + } } pub fn get_room(&self, room_id: RoomId) -> Option { @@ -73,9 +79,18 @@ impl RoomHandle { }; } - pub async fn send_message(&mut self, player_id: PlayerId, body: String) { + pub async fn send_message( + &mut self, + player_id: PlayerId, + connection_id: ConnectionId, + body: String, + ) { self.tx - .send(RoomCommand::SendMessage { player_id, body }) + .send(RoomCommand::SendMessage { + player_id, + connection_id, + body, + }) .await; } } @@ -87,6 +102,7 @@ enum RoomCommand { }, SendMessage { player_id: PlayerId, + connection_id: ConnectionId, body: String, }, } @@ -105,11 +121,21 @@ impl Room { tracing::info!("Adding a subscriber to room"); self.subscriptions.insert(player_id, player); } - RoomCommand::SendMessage { player_id, body } => { + RoomCommand::SendMessage { + player_id, + connection_id, + body, + } => { tracing::info!("Adding a message to room"); for (_, sub) in &mut self.subscriptions { - sub.receive_message(room_id.clone(), player_id.clone(), body.clone()) - .await; + log::info!("Sending a message from room to player"); + sub.receive_message( + room_id.clone(), + player_id.clone(), + connection_id.clone(), + body.clone(), + ) + .await; } } } diff --git a/src/projections/irc.rs b/src/projections/irc.rs index 1459d69..a669ec5 100644 --- a/src/projections/irc.rs +++ b/src/projections/irc.rs @@ -134,7 +134,7 @@ async fn handle_registered_socket<'a>( let mut user_handle = players .get_or_create_player(PlayerId(user.nickname.clone())) .await; - let mut connnection = user_handle.subscribe().await; + let (my_connection_id, mut connnection) = user_handle.subscribe().await; ServerMessage { tags: vec![], @@ -217,8 +217,49 @@ async fn handle_registered_socket<'a>( }, ClientMessage::Join(chan) => { match chan { - Chan::Global(room) => { + Chan::Global(ref room) => { user_handle.join_room(RoomId(room.clone())).await; + ServerMessage { + tags: vec![], + sender: Some(user.nickname.clone()), + body: ServerMessageBody::Join(chan.clone()), + } + .write_async(&mut writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N332Topic { + client: user.nickname.clone(), + chat: chan.clone(), + topic: b"chan topic lol".to_vec(), + }, + } + .write_async(&mut writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N353NamesReply { + client: user.nickname.clone(), + chan: chan.clone(), + members: user.nickname.clone(), + }, + } + .write_async(&mut writer) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N366NamesReplyEnd { + client: user.nickname.clone(), + chan: chan.clone(), + }, + } + .write_async(&mut writer) + .await?; + + writer.flush().await?; }, Chan::Local(_) => {}, }; @@ -227,7 +268,7 @@ async fn handle_registered_socket<'a>( match recipient { Recipient::Chan(Chan::Global(room)) => { match String::from_utf8(body) { - Ok(body) => user_handle.send_message(RoomId(room.clone()), body.clone()).await, + Ok(body) => user_handle.send_message(RoomId(room.clone()), my_connection_id.clone(), body.clone()).await, Err(err) => log::warn!("failed to parse incoming message: {err}"), } }, @@ -246,13 +287,15 @@ async fn handle_registered_socket<'a>( update = connnection.recv() => { match update.unwrap() { Updates::RoomJoined { room_id } => {}, - Updates::NewMessage { room_id, body } => { - ServerMessage { - tags: vec![], - sender: None, - body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.0)), body: body.as_bytes().to_vec() } - }.write_async(&mut writer).await?; - writer.flush().await? + Updates::NewMessage { author_id, connection_id, room_id, body } => { + if my_connection_id != connection_id { + ServerMessage { + tags: vec![], + sender: Some(author_id.0.clone()), + body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.0)), body: body.as_bytes().to_vec() } + }.write_async(&mut writer).await?; + writer.flush().await? + } }, } } diff --git a/src/protos/irc/server.rs b/src/protos/irc/server.rs index 1f6765b..1e343c6 100644 --- a/src/protos/irc/server.rs +++ b/src/protos/irc/server.rs @@ -59,6 +59,7 @@ pub enum ServerMessageBody { target: Recipient, body: ByteVec, }, + Join(Chan), N001Welcome { client: ByteVec, text: ByteVec, @@ -81,6 +82,20 @@ pub enum ServerMessageBody { client: ByteVec, params: ByteVec, // TODO make this a datatype }, + N332Topic { + client: ByteVec, + chat: Chan, + topic: ByteVec, + }, + N353NamesReply { + client: ByteVec, + chan: Chan, + members: ByteVec, // TODO make this a non-empty list with prefixes + }, + N366NamesReplyEnd { + client: ByteVec, + chan: Chan, + }, } impl ServerMessageBody { @@ -112,6 +127,10 @@ impl ServerMessageBody { writer.write_all(b" :").await?; writer.write_all(&body).await?; } + ServerMessageBody::Join(chan) => { + writer.write_all(b"JOIN ").await?; + chan.write_async(writer).await?; + } ServerMessageBody::N001Welcome { client, text } => { writer.write_all(b"001 ").await?; writer.write_all(&client).await?; @@ -153,6 +172,37 @@ impl ServerMessageBody { writer.write_all(¶ms).await?; writer.write_all(b" :are supported by this server").await?; } + ServerMessageBody::N332Topic { + client, + chat, + topic, + } => { + writer.write_all(b"332 ").await?; + writer.write_all(&client).await?; + writer.write_all(b" ").await?; + chat.write_async(writer).await?; + writer.write_all(b" :").await?; + writer.write_all(&topic).await?; + } + ServerMessageBody::N353NamesReply { + client, + chan, + members, + } => { + writer.write_all(b"353 ").await?; + writer.write_all(&client).await?; + writer.write_all(b" = ").await?; + chan.write_async(writer).await?; + writer.write_all(b" :").await?; + writer.write_all(&members).await?; + } + ServerMessageBody::N366NamesReplyEnd { client, chan } => { + writer.write_all(b"366 ").await?; + writer.write_all(&client).await?; + writer.write_all(b" ").await?; + chan.write_async(writer).await?; + writer.write_all(b" :End of /NAMES list").await?; + } } Ok(()) } diff --git a/src/util/table.rs b/src/util/table.rs index ff3a9af..c2bd0cc 100644 --- a/src/util/table.rs +++ b/src/util/table.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -#[derive(PartialEq, Eq, Debug, Clone, Copy)] +#[derive(PartialEq, Eq, Debug, Clone, Copy, Hash)] pub struct Key(u32); /// Hash map with auto-generated surrogate key.