diff --git a/src/core/player.rs b/src/core/player.rs index 865d87e..59ea849 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -31,14 +31,16 @@ pub struct PlayerId(ByteVec); impl PlayerId { pub fn from_bytes(bytes: ByteVec) -> Result { if bytes.len() > 32 { - return Err(anyhow::Error::msg("Nickname cannot be longer than 32 symbols")); + return Err(fail("Nickname cannot be longer than 32 symbols")); } if bytes.contains(&b' ') { return Err(anyhow::Error::msg("Nickname cannot contain spaces")); } Ok(PlayerId(bytes)) } - pub fn as_bytes(&self) -> &ByteVec { &self.0 } + pub fn as_bytes(&self) -> &ByteVec { + &self.0 + } } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -113,43 +115,15 @@ impl PlayerHandle { Ok(deferred.await?) } - pub async fn receive_message( - &self, - room_id: RoomId, - author: PlayerId, - connection_id: ConnectionId, - body: String, - ) { - self.tx - .send(PlayerCommand::IncomingMessage { - room_id, - author, - connection_id, - body, - }) - .await; - } - pub async fn send(&self, command: PlayerCommand) { self.tx.send(command).await; } + + pub async fn update(&self, update: Updates) { + self.send(PlayerCommand::Update(update)).await; + } } -/// Player update event type which is sent to a connection handler. -pub enum Updates { - RoomJoined { - player_id: PlayerId, - connection_id: ConnectionId, - room_id: RoomId, - }, - NewMessage { - author_id: PlayerId, - connection_id: ConnectionId, - room_id: RoomId, - body: String, - }, - IncomingUpdate(IncomingPlayerEvent), -} pub enum PlayerCommand { /** Commands from connections */ AddSocket { @@ -168,23 +142,27 @@ pub enum PlayerCommand { }, GetRooms(Promise>), /** Events from rooms */ - IncomingMessage { + Update(Updates), +} + +/// Player update event type which is sent to a player actor and from there to a connection handler. +#[derive(Clone)] +pub enum Updates { + RoomTopicChanged { + room_id: RoomId, + new_topic: ByteVec, + }, + NewMessage { room_id: RoomId, connection_id: ConnectionId, - author: PlayerId, + author_id: PlayerId, body: String, }, - IncomingRoomJoined { + RoomJoined { room_id: RoomId, new_member_id: PlayerId, connection_id: ConnectionId, }, - Event(IncomingPlayerEvent), -} - -#[derive(Clone)] -pub enum IncomingPlayerEvent { - IncomingRoomTopicChanged { room_id: RoomId, new_topic: ByteVec }, } /// Handle to a player registry — a shared data structure containing information about players. @@ -294,48 +272,13 @@ impl Player { } promise.send(response); } - PlayerCommand::IncomingMessage { - room_id, - author, - connection_id, - body, - } => { - tracing::info!( - "Handling incoming message, player_id={:?}", - player_id.clone() + PlayerCommand::Update(update) => { + log::info!( + "Player received an update, broadcasting to {} connections", + self.sockets.len() ); for socket in &self.sockets { - log::info!("Send message to socket"); - socket - .send(Updates::NewMessage { - author_id: author.clone(), - connection_id: connection_id.clone(), - room_id: room_id.clone(), - body: body.clone(), - }) - .await; - } - } - PlayerCommand::IncomingRoomJoined { - room_id, - new_member_id, - connection_id, - } => { - for socket in &self.sockets { - let room_id = room_id.clone(); - let connection_id = connection_id.clone(); - socket - .send(Updates::RoomJoined { - player_id: new_member_id.clone(), - connection_id, - room_id, - }) - .await; - } - } - PlayerCommand::Event(event) => { - for socket in &self.sockets { - socket.send(Updates::IncomingUpdate(event.clone())).await; + socket.send(update.clone()).await; } } } diff --git a/src/core/room.rs b/src/core/room.rs index 0ee8c47..9b9caca 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -9,11 +9,11 @@ use prometheus::{IntGauge, Registry as MetricRegistry}; use tokio::sync::RwLock as AsyncRwLock; use crate::{ - core::player::{PlayerCommand, PlayerHandle, PlayerId}, + core::player::{PlayerHandle, PlayerId}, prelude::*, }; -use super::player::{ConnectionId, IncomingPlayerEvent}; +use super::player::{ConnectionId, Updates}; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -21,14 +21,18 @@ pub struct RoomId(ByteVec); impl RoomId { pub fn from_bytes(bytes: ByteVec) -> Result { if bytes.len() > 32 { - return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); + return Err(anyhow::Error::msg( + "Room name cannot be longer than 32 symbols", + )); } if bytes.contains(&b' ') { return Err(anyhow::Error::msg("Room name cannot contain spaces")); } Ok(RoomId(bytes)) } - pub fn as_bytes(&self) -> &ByteVec { &self.0 } + pub fn as_bytes(&self) -> &ByteVec { + &self.0 + } } /// Shared datastructure for storing metadata about rooms. @@ -123,15 +127,12 @@ impl RoomHandle { pub async fn set_topic(&mut self, new_topic: ByteVec) { let mut lock = self.0.write().await; lock.topic = new_topic.clone(); - for (player_id, player_handle) in &lock.subscriptions { - let msg = player_handle - .send(PlayerCommand::Event( - IncomingPlayerEvent::IncomingRoomTopicChanged { - room_id: lock.room_id.clone(), - new_topic: new_topic.clone(), - }, - )) - .await; + for (_, player_handle) in &lock.subscriptions { + let update = Updates::RoomTopicChanged { + room_id: lock.room_id.clone(), + new_topic: new_topic.clone(), + }; + player_handle.update(update.clone()).await; } } } @@ -150,27 +151,27 @@ impl Room { ) { tracing::info!("Adding a subscriber to room"); self.subscriptions.insert(player_id.clone(), player_handle); + let update = Updates::RoomJoined { + room_id: self.room_id.clone(), + new_member_id: player_id.clone(), + connection_id: connection_id.clone(), + }; for (_, sub) in &self.subscriptions { - sub.send(PlayerCommand::IncomingRoomJoined { - room_id: self.room_id.clone(), - new_member_id: player_id.clone(), - connection_id: connection_id.clone(), - }) - .await; + sub.update(update.clone()).await; } } - async fn send_message(&self, player_id: PlayerId, connection_id: ConnectionId, body: String) { + async fn send_message(&self, author_id: PlayerId, connection_id: ConnectionId, body: String) { tracing::info!("Adding a message to room"); + let update = Updates::NewMessage { + room_id: self.room_id.clone(), + connection_id, + author_id, + body, + }; for (_, sub) in &self.subscriptions { log::info!("Sending a message from room to player"); - sub.receive_message( - self.room_id.clone(), - player_id.clone(), - connection_id.clone(), - body.clone(), - ) - .await; + sub.update(update.clone()).await; } } } diff --git a/src/prelude.rs b/src/prelude.rs index 4bcc546..1345d13 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -12,3 +12,7 @@ pub mod log { pub type Result = std::result::Result; pub type ByteVec = Vec; + +pub fn fail(msg: &'static str) -> anyhow::Error { + anyhow::Error::msg(msg) +} diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index 5b8019e..a9126ad 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -7,9 +7,7 @@ use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot::channel; -use crate::core::player::{ - IncomingPlayerEvent, PlayerCommand, PlayerConnection, PlayerId, PlayerRegistry, Updates, -}; +use crate::core::player::{PlayerCommand, PlayerConnection, PlayerId, PlayerRegistry, Updates}; use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; use crate::prelude::*; use crate::protos::irc::client::{client_message, ClientMessage}; @@ -239,8 +237,8 @@ async fn handle_registered_socket<'a>( }, update = connection.receiver.recv() => { match update.unwrap() { - Updates::RoomJoined { player_id: author_id, connection_id, room_id } => { - if player_id == author_id { + Updates::RoomJoined { new_member_id, connection_id, room_id } => { + if player_id == new_member_id { if let Some(room) = rooms.get_room(&room_id) { let room_info = room.get_room_info().await; let chan = Chan::Global(room_id.as_bytes().clone()); @@ -252,7 +250,7 @@ async fn handle_registered_socket<'a>( } else { ServerMessage { tags: vec![], - sender: Some(author_id.as_bytes().clone()), + sender: Some(new_member_id.as_bytes().clone()), body: ServerMessageBody::Join(Chan::Global(room_id.as_bytes().clone())), }.write_async(writer).await?; writer.flush().await? @@ -268,7 +266,7 @@ async fn handle_registered_socket<'a>( writer.flush().await? } }, - Updates::IncomingUpdate(IncomingPlayerEvent::IncomingRoomTopicChanged { room_id, new_topic }) => { + Updates::RoomTopicChanged { room_id, new_topic } => { ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()),