From 05f8c5e5025115fcadec7542a3f2445ad8d8287e Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Tue, 14 Feb 2023 23:22:04 +0100 Subject: [PATCH] rework commands and updates. updates from rooms are send only to users other than the initiator. updates from player are send only to connections other than the one the command was sent from. --- src/core/player.rs | 180 ++++++++++++++++++++++++------------- src/core/room.rs | 63 +++++-------- src/projections/irc/mod.rs | 48 ++++++---- src/util/table.rs | 16 ++-- 4 files changed, 180 insertions(+), 127 deletions(-) diff --git a/src/core/player.rs b/src/core/player.rs index cf1969e..8d8f10b 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -15,12 +15,11 @@ use std::{ use prometheus::{IntGauge, Registry as MetricsRegistry}; use tokio::{ sync::mpsc::{channel, Receiver, Sender}, - sync::oneshot::{channel as oneshot, Sender as OneshotSender}, task::JoinHandle, }; use crate::{ - core::room::{RoomId, RoomInfo, RoomRegistry}, + core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}, prelude::*, util::table::{AnonTable, Key as AnonKey}, }; @@ -52,7 +51,7 @@ pub struct PlayerConnection { player_handle: PlayerHandle, } impl PlayerConnection { - pub async fn send_message(&mut self, room_id: RoomId, body: String) { + pub async fn send_message(&mut self, room_id: RoomId, body: String) -> Result<()> { self.player_handle .send_message(room_id, self.connection_id.clone(), body) .await @@ -64,6 +63,19 @@ impl PlayerConnection { .await } + pub async fn change_topic(&mut self, room_id: RoomId, new_topic: ByteVec) -> Result<()> { + let (promise, deferred) = oneshot(); + let cmd = Cmd::ChangeTopic { + room_id, + new_topic, + promise, + }; + self.player_handle + .send(PlayerCommand::Cmd(cmd, self.connection_id.clone())) + .await; + Ok(deferred.await?) + } + pub async fn send(&self, command: PlayerCommand) { self.player_handle.send(command).await; } @@ -89,14 +101,20 @@ impl PlayerHandle { } } - pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: String) { - self.tx - .send(PlayerCommand::SendMessage { - room_id, - connection_id, - body, - }) - .await; + pub async fn send_message( + &self, + room_id: RoomId, + connection_id: ConnectionId, + body: String, + ) -> Result<()> { + let (promise, deferred) = oneshot(); + let cmd = Cmd::SendMessage { + room_id, + body, + promise, + }; + self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; + Ok(deferred.await?) } pub async fn join_room( @@ -105,17 +123,15 @@ impl PlayerHandle { connection_id: ConnectionId, ) -> Result { let (promise, deferred) = oneshot(); - self.tx - .send(PlayerCommand::JoinRoom { - room_id, - connection_id, - promise, - }) - .await; + let cmd = Cmd::JoinRoom { + room_id, + promise: promise, + }; + self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; Ok(deferred.await?) } - pub async fn send(&self, command: PlayerCommand) { + async fn send(&self, command: PlayerCommand) { self.tx.send(command).await; } @@ -128,21 +144,30 @@ pub enum PlayerCommand { /** Commands from connections */ AddConnection { sender: Sender, - promise: OneshotSender, + promise: Promise, }, + Cmd(Cmd, ConnectionId), + /// Query - responds with a list of rooms the player is a member of. + GetRooms(Promise>), + /** Events from rooms */ + Update(Updates), +} + +pub enum Cmd { JoinRoom { room_id: RoomId, - connection_id: ConnectionId, promise: Promise, }, SendMessage { room_id: RoomId, - connection_id: ConnectionId, body: String, + promise: Promise<()>, + }, + ChangeTopic { + room_id: RoomId, + new_topic: ByteVec, + promise: Promise<()>, }, - GetRooms(Promise>), - /** Events from rooms */ - Update(Updates), } /// Player update event type which is sent to a player actor and from there to a connection handler. @@ -154,14 +179,12 @@ pub enum Updates { }, NewMessage { room_id: RoomId, - connection_id: ConnectionId, author_id: PlayerId, body: String, }, RoomJoined { room_id: RoomId, new_member_id: PlayerId, - connection_id: ConnectionId, }, } @@ -225,7 +248,7 @@ impl Player { let (tx, mut rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); - let mut my_rooms = HashMap::new(); + let mut my_rooms: HashMap = HashMap::new(); let fiber = tokio::task::spawn(async move { while let Some(cmd) = rx.recv().await { match cmd { @@ -233,38 +256,6 @@ impl Player { let connection_id = self.connections.insert(sender); promise.send(ConnectionId(connection_id)); } - PlayerCommand::JoinRoom { - room_id, - connection_id, - promise, - } => { - let mut room = rooms.get_or_create_room(room_id.clone()); - room.subscribe(player_id.clone(), connection_id, handle.clone()) - .await; - my_rooms.insert(room_id.clone(), room.clone()); - let members = room.get_members().await; - promise.send(RoomInfo { - id: room_id, - members, - topic: b"some topic lol".to_vec(), - }); - } - PlayerCommand::SendMessage { - room_id, - connection_id, - body, - } => { - let room = rooms.get_room(&room_id); - match room { - Some(mut room) => { - room.send_message(player_id.clone(), connection_id, body) - .await; - } - None => { - tracing::info!("no room found"); - } - } - } PlayerCommand::GetRooms(promise) => { let mut response = vec![]; for (_, handle) in &my_rooms { @@ -277,10 +268,77 @@ impl Player { "Player received an update, broadcasting to {} connections", self.connections.len() ); - for connection in &self.connections { + for (_, connection) in &self.connections { connection.send(update.clone()).await; } } + PlayerCommand::Cmd(cmd, connection_id) => match cmd { + Cmd::JoinRoom { room_id, promise } => { + let mut room = rooms.get_or_create_room(room_id.clone()); + room.subscribe(player_id.clone(), handle.clone()).await; + my_rooms.insert(room_id.clone(), room.clone()); + let members = room.get_members().await; + promise.send(RoomInfo { + id: room_id.clone(), + members, + topic: b"some topic lol".to_vec(), + }); + let update = Updates::RoomJoined { + room_id, + new_member_id: player_id.clone(), + }; + for (a, b) in &self.connections { + if ConnectionId(a) == connection_id { + continue; + } + b.send(update.clone()).await; + } + } + Cmd::SendMessage { + room_id, + body, + promise, + } => { + let room = rooms.get_room(&room_id); + if let Some(room) = room { + room.send_message(player_id.clone(), body.clone()).await; + } else { + tracing::info!("no room found"); + } + promise.send(()); + let update = Updates::NewMessage { + room_id, + author_id: player_id.clone(), + body, + }; + for (a, b) in &self.connections { + if ConnectionId(a) == connection_id { + continue; + } + b.send(update.clone()).await; + } + } + Cmd::ChangeTopic { + room_id, + new_topic, + promise, + } => { + let room = rooms.get_room(&room_id); + if let Some(mut room) = room { + room.set_topic(player_id.clone(), new_topic.clone()).await; + } else { + tracing::info!("no room found"); + } + promise.send(()); + let update = Updates::RoomTopicChanged { room_id, new_topic }; + for (a, b) in &self.connections { + if ConnectionId(a) == connection_id { + continue; + } + b.send(update.clone()).await; + } + } + }, } } self diff --git a/src/core/room.rs b/src/core/room.rs index 9b9caca..cccb129 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -9,12 +9,10 @@ use prometheus::{IntGauge, Registry as MetricRegistry}; use tokio::sync::RwLock as AsyncRwLock; use crate::{ - core::player::{PlayerHandle, PlayerId}, + core::player::{PlayerHandle, PlayerId, Updates}, prelude::*, }; -use super::player::{ConnectionId, Updates}; - /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RoomId(ByteVec); @@ -82,25 +80,14 @@ struct RoomRegistryInner { #[derive(Clone)] pub struct RoomHandle(Arc>); impl RoomHandle { - pub async fn subscribe( - &self, - player_id: PlayerId, - connection_id: ConnectionId, - player_handle: PlayerHandle, - ) { + pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) { let mut lock = self.0.write().await; - lock.add_subscriber(player_id, connection_id, player_handle) - .await; + lock.add_subscriber(player_id, player_handle).await; } - pub async fn send_message( - &self, - player_id: PlayerId, - connection_id: ConnectionId, - body: String, - ) { + pub async fn send_message(&self, player_id: PlayerId, body: String) { let lock = self.0.read().await; - lock.send_message(player_id, connection_id, body).await; + lock.send_message(player_id, body).await; } pub async fn get_members(&self) -> Vec { @@ -124,16 +111,14 @@ impl RoomHandle { } } - pub async fn set_topic(&mut self, new_topic: ByteVec) { + pub async fn set_topic(&mut self, changer_id: PlayerId, new_topic: ByteVec) { let mut lock = self.0.write().await; lock.topic = new_topic.clone(); - for (_, player_handle) in &lock.subscriptions { - let update = Updates::RoomTopicChanged { - room_id: lock.room_id.clone(), - new_topic: new_topic.clone(), - }; - player_handle.update(update.clone()).await; - } + let update = Updates::RoomTopicChanged { + room_id: lock.room_id.clone(), + new_topic: new_topic.clone(), + }; + lock.broadcast_update(update, &changer_id).await; } } @@ -143,33 +128,31 @@ struct Room { topic: ByteVec, } impl Room { - async fn add_subscriber( - &mut self, - player_id: PlayerId, - connection_id: ConnectionId, - player_handle: PlayerHandle, - ) { + async fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { tracing::info!("Adding a subscriber to room"); self.subscriptions.insert(player_id.clone(), player_handle); let update = Updates::RoomJoined { room_id: self.room_id.clone(), new_member_id: player_id.clone(), - connection_id: connection_id.clone(), }; - for (_, sub) in &self.subscriptions { - sub.update(update.clone()).await; - } + self.broadcast_update(update, &player_id).await; } - async fn send_message(&self, author_id: PlayerId, connection_id: ConnectionId, body: String) { + async fn send_message(&self, author_id: PlayerId, body: String) { tracing::info!("Adding a message to room"); let update = Updates::NewMessage { room_id: self.room_id.clone(), - connection_id, - author_id, + author_id: author_id.clone(), body, }; - for (_, sub) in &self.subscriptions { + self.broadcast_update(update, &author_id).await; + } + + async fn broadcast_update(&self, update: Updates, except: &PlayerId) { + for (player_id, sub) in &self.subscriptions { + if player_id == except { + continue; + } log::info!("Sending a message from room to player"); sub.update(update.clone()).await; } diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index ff24623..ce62fa7 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -232,12 +232,12 @@ async fn handle_registered_socket<'a>( } else { len }; - handle_incoming_message(&buffer[0..len], &config, &user, &rooms, &mut connection, writer).await?; + handle_incoming_message(&buffer[0..len], &config, &user, &rooms, &mut connection, &player_id, writer).await?; buffer.clear(); }, update = connection.receiver.recv() => { match update.unwrap() { - Updates::RoomJoined { new_member_id, connection_id, room_id } => { + Updates::RoomJoined { new_member_id, room_id } => { if player_id == new_member_id { if let Some(room) = rooms.get_room(&room_id) { let room_info = room.get_room_info().await; @@ -256,15 +256,13 @@ async fn handle_registered_socket<'a>( writer.flush().await? } }, - Updates::NewMessage { author_id, connection_id, room_id, body } => { - if player_id != author_id || connection.connection_id != connection_id { - ServerMessage { - tags: vec![], - sender: Some(author_id.as_bytes().clone()), - body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_bytes().clone())), body: body.as_bytes().to_vec() } - }.write_async(writer).await?; - writer.flush().await? - } + Updates::NewMessage { author_id, room_id, body } => { + ServerMessage { + tags: vec![], + sender: Some(author_id.as_bytes().clone()), + body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_bytes().clone())), body: body.as_bytes().to_vec() } + }.write_async(writer).await?; + writer.flush().await? }, Updates::RoomTopicChanged { room_id, new_topic } => { ServerMessage { @@ -291,6 +289,7 @@ async fn handle_incoming_message( user: &RegisteredUser, rooms: &RoomRegistry, user_handle: &mut PlayerConnection, + player_id: &PlayerId, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { let parsed = client_message(buffer); @@ -316,7 +315,7 @@ async fn handle_incoming_message( Recipient::Chan(Chan::Global(chan)) => match String::from_utf8(body) { Ok(body) => { let room_id = RoomId::from_bytes(chan)?; - user_handle.send_message(room_id, body.clone()).await + user_handle.send_message(room_id, body.clone()).await?; } Err(err) => log::warn!("failed to parse incoming message: {err}"), }, @@ -326,14 +325,25 @@ async fn handle_incoming_message( match chan { Chan::Global(chan) => { let room_id = RoomId::from_bytes(chan)?; - let room = rooms.get_room(&room_id); - if let Some(mut room) = room { - room.set_topic(topic).await; + user_handle + .change_topic(room_id.clone(), topic.clone()) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N332Topic { + client: user.nickname.clone(), + chat: Chan::Global(room_id.as_bytes().clone()), + topic, + }, } + .write_async(writer) + .await?; + writer.flush().await?; } Chan::Local(_) => {} }; - }, + } cmd => { log::warn!("Not implemented handler for client command: {cmd:?}"); } @@ -353,9 +363,11 @@ async fn handle_join( writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { match chan { - Chan::Global(chan) => { - let room_id = RoomId::from_bytes(chan.clone())?; + Chan::Global(chan_name) => { + let room_id = RoomId::from_bytes(chan_name.clone())?; let room_info = user_handle.join_room(room_id).await?; + produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?; + writer.flush().await?; } Chan::Local(_) => {} }; diff --git a/src/util/table.rs b/src/util/table.rs index c2bd0cc..2d93866 100644 --- a/src/util/table.rs +++ b/src/util/table.rs @@ -44,15 +44,15 @@ impl AnonTable { pub struct AnonTableIterator<'a, V>(<&'a HashMap as IntoIterator>::IntoIter); impl<'a, V> Iterator for AnonTableIterator<'a, V> { - type Item = &'a V; + type Item = (Key, &'a V); - fn next(&mut self) -> Option<&'a V> { - self.0.next().map(|a| a.1) + fn next(&mut self) -> Option<(Key, &'a V)> { + self.0.next().map(|(k, v)| (Key(*k), v)) } } impl<'a, V> IntoIterator for &'a AnonTable { - type Item = &'a V; + type Item = (Key, &'a V); type IntoIter = AnonTableIterator<'a, V>; @@ -63,15 +63,15 @@ impl<'a, V> IntoIterator for &'a AnonTable { pub struct AnonTableMutIterator<'a, V>(<&'a mut HashMap as IntoIterator>::IntoIter); impl<'a, V> Iterator for AnonTableMutIterator<'a, V> { - type Item = &'a mut V; + type Item = (Key, &'a mut V); - fn next(&mut self) -> Option<&'a mut V> { - self.0.next().map(|a| a.1) + fn next(&mut self) -> Option<(Key, &'a mut V)> { + self.0.next().map(|(k, v)| (Key(*k), v)) } } impl<'a, V> IntoIterator for &'a mut AnonTable { - type Item = &'a mut V; + type Item = (Key, &'a mut V); type IntoIter = AnonTableMutIterator<'a, V>;