diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index a81bcbd..6493087 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -14,10 +14,7 @@ use std::{ use prometheus::{IntGauge, Registry as MetricsRegistry}; use serde::Serialize; -use tokio::{ - sync::mpsc::{channel, Receiver, Sender}, - task::JoinHandle, -}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::prelude::*; use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; @@ -45,9 +42,13 @@ impl PlayerId { } } +/// Node-local identifier of a connection. It is used to address a connection within a player actor. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionId(pub AnonKey); +/// Representation of an authenticated client connection. +/// +/// The connection is used to send commands to the player actor and to receive updates that might be sent to the client. pub struct PlayerConnection { pub connection_id: ConnectionId, pub receiver: Receiver, @@ -55,42 +56,44 @@ pub struct PlayerConnection { } impl PlayerConnection { pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<()> { - self.player_handle.send_message(room_id, self.connection_id.clone(), body).await + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::SendMessage { room_id, body, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) } pub async fn join_room(&mut self, room_id: RoomId) -> Result { - self.player_handle.join_room(room_id, self.connection_id.clone()).await + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::JoinRoom { room_id, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) } pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { let (promise, deferred) = oneshot(); - let cmd = Cmd::ChangeTopic { + let cmd = ClientCommand::ChangeTopic { room_id, new_topic, promise, }; - self.player_handle.send(PlayerCommand::Cmd(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { let (promise, deferred) = oneshot(); - self.player_handle - .send(PlayerCommand::Cmd( - Cmd::LeaveRoom { room_id, promise }, - self.connection_id.clone(), - )) - .await; + let cmd = ClientCommand::LeaveRoom { room_id, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } pub async fn terminate(self) { - self.player_handle.send(PlayerCommand::TerminateConnection(self.connection_id)).await; + self.player_handle.send(ActorCommand::TerminateConnection(self.connection_id)).await; } pub async fn get_rooms(&self) -> Result> { let (promise, deferred) = oneshot(); - self.player_handle.send(PlayerCommand::GetRooms(promise)).await; + self.player_handle.send(ActorCommand::GetRooms(promise)).await; Ok(deferred.await?) } } @@ -98,13 +101,13 @@ impl PlayerConnection { /// Handle to a player actor. #[derive(Clone)] pub struct PlayerHandle { - tx: Sender, + tx: Sender, } impl PlayerHandle { pub async fn subscribe(&self) -> PlayerConnection { let (sender, receiver) = channel(32); let (promise, deferred) = oneshot(); - let cmd = PlayerCommand::AddConnection { sender, promise }; + let cmd = ActorCommand::AddConnection { sender, promise }; let _ = self.tx.send(cmd).await; let connection_id = deferred.await.unwrap(); PlayerConnection { @@ -114,45 +117,36 @@ impl PlayerHandle { } } - pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: Str) -> Result<()> { - let (promise, deferred) = oneshot(); - let cmd = Cmd::SendMessage { room_id, body, promise }; - let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; - Ok(deferred.await?) - } - - pub async fn join_room(&self, room_id: RoomId, connection_id: ConnectionId) -> Result { - let (promise, deferred) = oneshot(); - let cmd = Cmd::JoinRoom { room_id, promise }; - let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; - Ok(deferred.await?) - } - - async fn send(&self, command: PlayerCommand) { + async fn send(&self, command: ActorCommand) { + // TODO either handle the error or doc why it is safe to ignore let _ = self.tx.send(command).await; } pub async fn update(&self, update: Updates) { - self.send(PlayerCommand::Update(update)).await; + self.send(ActorCommand::Update(update)).await; } } -enum PlayerCommand { - /** Commands from connections */ +/// Messages sent to the player actor. +enum ActorCommand { + /// Establish a new connection. AddConnection { sender: Sender, promise: Promise, }, + /// Terminate an existing connection. TerminateConnection(ConnectionId), - Cmd(Cmd, ConnectionId), + /// Player-issued command. + ClientCommand(ClientCommand, ConnectionId), /// Query - responds with a list of rooms the player is a member of. GetRooms(Promise>), - /** Events from rooms */ + /// Update which is sent from a room the player is member of. Update(Updates), Stop, } -pub enum Cmd { +/// Client-issued command sent to the player actor. The actor will respond with by fulfilling the promise. +pub enum ClientCommand { JoinRoom { room_id: RoomId, promise: Promise, @@ -237,7 +231,7 @@ impl PlayerRegistry { pub async fn shutdown_all(&mut self) -> Result<()> { let mut inner = self.0.write().unwrap(); for (i, (k, j)) in inner.players.drain() { - k.send(PlayerCommand::Stop).await; + k.send(ActorCommand::Stop).await; drop(k); j.await?; log::debug!("Player stopped #{i:?}") @@ -260,7 +254,7 @@ struct Player { connections: AnonTable>, my_rooms: HashMap, banned_from: HashSet, - rx: Receiver, + rx: Receiver, handle: PlayerHandle, rooms: RoomRegistry, } @@ -285,24 +279,24 @@ impl Player { async fn main_loop(mut self) -> Self { while let Some(cmd) = self.rx.recv().await { match cmd { - PlayerCommand::AddConnection { sender, promise } => { + ActorCommand::AddConnection { sender, promise } => { let connection_id = self.connections.insert(sender); if let Err(connection_id) = promise.send(ConnectionId(connection_id)) { log::warn!("Connection {connection_id:?} terminated before finalization"); self.terminate_connection(connection_id); } } - PlayerCommand::TerminateConnection(connection_id) => { + ActorCommand::TerminateConnection(connection_id) => { self.terminate_connection(connection_id); } - PlayerCommand::GetRooms(promise) => { + ActorCommand::GetRooms(promise) => { let mut response = vec![]; for (_, handle) in &self.my_rooms { response.push(handle.get_room_info().await); } let _ = promise.send(response); } - PlayerCommand::Update(update) => { + ActorCommand::Update(update) => { log::info!( "Player received an update, broadcasting to {} connections", self.connections.len() @@ -318,8 +312,8 @@ impl Player { let _ = connection.send(update.clone()).await; } } - PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, - PlayerCommand::Stop => break, + ActorCommand::ClientCommand(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, + ActorCommand::Stop => break, } } log::debug!("Shutting down player actor #{:?}", self.player_id); @@ -332,9 +326,9 @@ impl Player { } } - async fn handle_cmd(&mut self, cmd: Cmd, connection_id: ConnectionId) { + async fn handle_cmd(&mut self, cmd: ClientCommand, connection_id: ConnectionId) { match cmd { - Cmd::JoinRoom { room_id, promise } => { + ClientCommand::JoinRoom { room_id, promise } => { if self.banned_from.contains(&room_id) { let _ = promise.send(JoinResult::Banned); return; @@ -357,7 +351,7 @@ impl Player { }; self.broadcast_update(update, connection_id).await; } - Cmd::LeaveRoom { room_id, promise } => { + ClientCommand::LeaveRoom { room_id, promise } => { let room = self.my_rooms.remove(&room_id); if let Some(room) = room { room.unsubscribe(&self.player_id).await; @@ -370,7 +364,7 @@ impl Player { }; self.broadcast_update(update, connection_id).await; } - Cmd::SendMessage { room_id, body, promise } => { + ClientCommand::SendMessage { room_id, body, promise } => { let room = self.rooms.get_room(&room_id).await; if let Some(room) = room { room.send_message(self.player_id.clone(), body.clone()).await; @@ -385,7 +379,7 @@ impl Player { }; self.broadcast_update(update, connection_id).await; } - Cmd::ChangeTopic { + ClientCommand::ChangeTopic { room_id, new_topic, promise, @@ -403,6 +397,10 @@ impl Player { } } + /// Broadcasts an update to all connections except the one with the given id. + /// + /// This is called after handling a client command. + /// Sending the update to the connection which sent the command is handled by the connection itself. async fn broadcast_update(&self, update: Updates, except: ConnectionId) { for (a, b) in &self.connections { if ConnectionId(a) == except { diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index 193098e..04fdbb1 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -31,7 +31,7 @@ impl RoomId { } } -/// Shared datastructure for storing metadata about rooms. +/// Shared data structure for storing metadata about rooms. #[derive(Clone)] pub struct RoomRegistry(Arc>); impl RoomRegistry { @@ -160,9 +160,13 @@ impl RoomHandle { } struct Room { + /// The numeric node-local id of the room as it is stored in the database. storage_id: u32, + /// The cluster-global id of the room. room_id: RoomId, + /// Player actors on the local node which are subscribed to this room's updates. subscriptions: HashMap, + /// The total number of messages. Used to calculate the id of the new message. message_count: u32, topic: Str, storage: Storage, @@ -191,6 +195,10 @@ impl Room { Ok(()) } + /// Broadcasts an update to all players except the one who caused the update. + /// + /// This is called after handling a client command. + /// Sending the update to the player who sent the command is handled by the player actor. async fn broadcast_update(&self, update: Updates, except: &PlayerId) { tracing::debug!("Broadcasting an update to {} subs", self.subscriptions.len()); for (player_id, sub) in &self.subscriptions { diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 27b1d69..ee47f9b 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -44,7 +44,7 @@ struct RegisteredUser { /** * Username is mostly unused in modern IRC. * - * [https://stackoverflow.com/questions/31666247/what-is-the-difference-between-the-nick-username-and-real-name-in-irc-and-wha] + * */ username: Str, realname: Str, diff --git a/crates/proto-irc/src/client.rs b/crates/proto-irc/src/client.rs index 676fd40..66cf107 100644 --- a/crates/proto-irc/src/client.rs +++ b/crates/proto-irc/src/client.rs @@ -7,42 +7,42 @@ use nonempty::NonEmpty; /// Client-to-server command. #[derive(Clone, Debug, PartialEq, Eq)] pub enum ClientMessage { - /// CAP. Capability-related commands. + /// `CAP`. Capability-related commands. Capability { subcommand: CapabilitySubcommand, }, - /// PING + /// `PING ` Ping { token: Str, }, - /// PONG + /// `PONG ` Pong { token: Str, }, - /// NICK + /// `NICK ` Nick { nickname: Str, }, - /// PASS + /// `PASS ` Pass { password: Str, }, - /// USER 0 * : + /// `USER 0 * :` User { username: Str, realname: Str, }, - /// JOIN + /// `JOIN ` Join(Chan), - /// MODE + /// `MODE ` Mode { target: Recipient, }, - /// WHO + /// `WHO ` Who { target: Recipient, // aka mask }, - /// TOPIC : + /// `TOPIC :` Topic { chan: Chan, topic: Str, @@ -51,12 +51,12 @@ pub enum ClientMessage { chan: Chan, message: Str, }, - /// PRIVMSG : + /// `PRIVMSG :` PrivateMessage { recipient: Recipient, body: Str, }, - /// QUIT : + /// `QUIT :` Quit { reason: Str, }, diff --git a/crates/proto-irc/src/lib.rs b/crates/proto-irc/src/lib.rs index fd54809..54ff676 100644 --- a/crates/proto-irc/src/lib.rs +++ b/crates/proto-irc/src/lib.rs @@ -36,9 +36,9 @@ fn params(input: &str) -> IResult<&str, &str> { #[derive(Clone, Debug, PartialEq, Eq)] pub enum Chan { - /// # — network-global channel, available from any server in the network. + /// `#` — network-global channel, available from any server in the network. Global(Str), - /// & — server-local channel, available only to connections to the same server. Rarely used in practice. + /// `&` — server-local channel, available only to connections to the same server. Rarely used in practice. Local(Str), } impl Chan {