From d436631450d639a418c4896bbcf92c7dc526a1fd Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Tue, 26 Mar 2024 16:26:31 +0000 Subject: [PATCH] improve docs and split command handlers into methods (#40) --- .run/Run lavina.run.xml | 21 +++ crates/lavina-core/src/player.rs | 282 +++++++++++++++++-------------- crates/lavina-core/src/room.rs | 10 +- crates/projection-irc/src/lib.rs | 2 +- crates/proto-irc/src/client.rs | 24 +-- crates/proto-irc/src/lib.rs | 4 +- 6 files changed, 201 insertions(+), 142 deletions(-) create mode 100644 .run/Run lavina.run.xml diff --git a/.run/Run lavina.run.xml b/.run/Run lavina.run.xml new file mode 100644 index 0000000..eea015c --- /dev/null +++ b/.run/Run lavina.run.xml @@ -0,0 +1,21 @@ + + + + \ No newline at end of file diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index a81bcbd..eec22f8 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -14,10 +14,7 @@ use std::{ use prometheus::{IntGauge, Registry as MetricsRegistry}; use serde::Serialize; -use tokio::{ - sync::mpsc::{channel, Receiver, Sender}, - task::JoinHandle, -}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::prelude::*; use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; @@ -45,52 +42,65 @@ impl PlayerId { } } +/// Node-local identifier of a connection. It is used to address a connection within a player actor. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionId(pub AnonKey); +/// Representation of an authenticated client connection. +/// The public API available to projections through which all client actions are executed. +/// +/// The connection is used to send commands to the player actor and to receive updates that might be sent to the client. pub struct PlayerConnection { pub connection_id: ConnectionId, pub receiver: Receiver, player_handle: PlayerHandle, } impl PlayerConnection { + /// Handled in [Player::send_message]. pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<()> { - self.player_handle.send_message(room_id, self.connection_id.clone(), body).await + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::SendMessage { room_id, body, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) } + /// Handled in [Player::join_room]. pub async fn join_room(&mut self, room_id: RoomId) -> Result { - self.player_handle.join_room(room_id, self.connection_id.clone()).await + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::JoinRoom { room_id, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) } + /// Handled in [Player::change_topic]. pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { let (promise, deferred) = oneshot(); - let cmd = Cmd::ChangeTopic { + let cmd = ClientCommand::ChangeTopic { room_id, new_topic, promise, }; - self.player_handle.send(PlayerCommand::Cmd(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } + /// Handled in [Player::leave_room]. pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { let (promise, deferred) = oneshot(); - self.player_handle - .send(PlayerCommand::Cmd( - Cmd::LeaveRoom { room_id, promise }, - self.connection_id.clone(), - )) - .await; + let cmd = ClientCommand::LeaveRoom { room_id, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } pub async fn terminate(self) { - self.player_handle.send(PlayerCommand::TerminateConnection(self.connection_id)).await; + self.player_handle.send(ActorCommand::TerminateConnection(self.connection_id)).await; } + /// Handled in [Player::get_rooms]. pub async fn get_rooms(&self) -> Result> { let (promise, deferred) = oneshot(); - self.player_handle.send(PlayerCommand::GetRooms(promise)).await; + let cmd = ClientCommand::GetRooms { promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } } @@ -98,13 +108,13 @@ impl PlayerConnection { /// Handle to a player actor. #[derive(Clone)] pub struct PlayerHandle { - tx: Sender, + tx: Sender, } impl PlayerHandle { pub async fn subscribe(&self) -> PlayerConnection { let (sender, receiver) = channel(32); let (promise, deferred) = oneshot(); - let cmd = PlayerCommand::AddConnection { sender, promise }; + let cmd = ActorCommand::AddConnection { sender, promise }; let _ = self.tx.send(cmd).await; let connection_id = deferred.await.unwrap(); PlayerConnection { @@ -114,45 +124,34 @@ impl PlayerHandle { } } - pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: Str) -> Result<()> { - let (promise, deferred) = oneshot(); - let cmd = Cmd::SendMessage { room_id, body, promise }; - let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; - Ok(deferred.await?) - } - - pub async fn join_room(&self, room_id: RoomId, connection_id: ConnectionId) -> Result { - let (promise, deferred) = oneshot(); - let cmd = Cmd::JoinRoom { room_id, promise }; - let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; - Ok(deferred.await?) - } - - async fn send(&self, command: PlayerCommand) { + async fn send(&self, command: ActorCommand) { + // TODO either handle the error or doc why it is safe to ignore let _ = self.tx.send(command).await; } pub async fn update(&self, update: Updates) { - self.send(PlayerCommand::Update(update)).await; + self.send(ActorCommand::Update(update)).await; } } -enum PlayerCommand { - /** Commands from connections */ +/// Messages sent to the player actor. +enum ActorCommand { + /// Establish a new connection. AddConnection { sender: Sender, promise: Promise, }, + /// Terminate an existing connection. TerminateConnection(ConnectionId), - Cmd(Cmd, ConnectionId), - /// Query - responds with a list of rooms the player is a member of. - GetRooms(Promise>), - /** Events from rooms */ + /// Player-issued command. + ClientCommand(ClientCommand, ConnectionId), + /// Update which is sent from a room the player is member of. Update(Updates), Stop, } -pub enum Cmd { +/// Client-issued command sent to the player actor. The actor will respond with by fulfilling the promise. +pub enum ClientCommand { JoinRoom { room_id: RoomId, promise: Promise, @@ -171,6 +170,9 @@ pub enum Cmd { new_topic: Str, promise: Promise<()>, }, + GetRooms { + promise: Promise>, + }, } pub enum JoinResult { @@ -237,7 +239,7 @@ impl PlayerRegistry { pub async fn shutdown_all(&mut self) -> Result<()> { let mut inner = self.0.write().unwrap(); for (i, (k, j)) in inner.players.drain() { - k.send(PlayerCommand::Stop).await; + k.send(ActorCommand::Stop).await; drop(k); j.await?; log::debug!("Player stopped #{i:?}") @@ -260,7 +262,7 @@ struct Player { connections: AnonTable>, my_rooms: HashMap, banned_from: HashSet, - rx: Receiver, + rx: Receiver, handle: PlayerHandle, rooms: RoomRegistry, } @@ -285,124 +287,152 @@ impl Player { async fn main_loop(mut self) -> Self { while let Some(cmd) = self.rx.recv().await { match cmd { - PlayerCommand::AddConnection { sender, promise } => { + ActorCommand::AddConnection { sender, promise } => { let connection_id = self.connections.insert(sender); if let Err(connection_id) = promise.send(ConnectionId(connection_id)) { log::warn!("Connection {connection_id:?} terminated before finalization"); self.terminate_connection(connection_id); } } - PlayerCommand::TerminateConnection(connection_id) => { + ActorCommand::TerminateConnection(connection_id) => { self.terminate_connection(connection_id); } - PlayerCommand::GetRooms(promise) => { - let mut response = vec![]; - for (_, handle) in &self.my_rooms { - response.push(handle.get_room_info().await); - } - let _ = promise.send(response); - } - PlayerCommand::Update(update) => { - log::info!( - "Player received an update, broadcasting to {} connections", - self.connections.len() - ); - match update { - Updates::BannedFrom(ref room_id) => { - self.banned_from.insert(room_id.clone()); - self.my_rooms.remove(room_id); - } - _ => {} - } - for (_, connection) in &self.connections { - let _ = connection.send(update.clone()).await; - } - } - PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, - PlayerCommand::Stop => break, + ActorCommand::Update(update) => self.handle_update(update).await, + ActorCommand::ClientCommand(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, + ActorCommand::Stop => break, } } log::debug!("Shutting down player actor #{:?}", self.player_id); self } + /// Handle an incoming update by changing the internal state and broadcasting it to all connections if necessary. + async fn handle_update(&mut self, update: Updates) { + log::info!( + "Player received an update, broadcasting to {} connections", + self.connections.len() + ); + match update { + Updates::BannedFrom(ref room_id) => { + self.banned_from.insert(room_id.clone()); + self.my_rooms.remove(room_id); + } + _ => {} + } + for (_, connection) in &self.connections { + let _ = connection.send(update.clone()).await; + } + } + fn terminate_connection(&mut self, connection_id: ConnectionId) { if let None = self.connections.pop(connection_id.0) { log::warn!("Connection {connection_id:?} already terminated"); } } - async fn handle_cmd(&mut self, cmd: Cmd, connection_id: ConnectionId) { + /// Dispatches a client command to the appropriate handler. + async fn handle_cmd(&mut self, cmd: ClientCommand, connection_id: ConnectionId) { match cmd { - Cmd::JoinRoom { room_id, promise } => { - if self.banned_from.contains(&room_id) { - let _ = promise.send(JoinResult::Banned); - return; - } - - let room = match self.rooms.get_or_create_room(room_id.clone()).await { - Ok(room) => room, - Err(e) => { - log::error!("Failed to get or create room: {e}"); - return; - } - }; - room.subscribe(self.player_id.clone(), self.handle.clone()).await; - self.my_rooms.insert(room_id.clone(), room.clone()); - let room_info = room.get_room_info().await; - let _ = promise.send(JoinResult::Success(room_info)); - let update = Updates::RoomJoined { - room_id, - new_member_id: self.player_id.clone(), - }; - self.broadcast_update(update, connection_id).await; + ClientCommand::JoinRoom { room_id, promise } => { + let result = self.join_room(connection_id, room_id).await; + let _ = promise.send(result); } - Cmd::LeaveRoom { room_id, promise } => { - let room = self.my_rooms.remove(&room_id); - if let Some(room) = room { - room.unsubscribe(&self.player_id).await; - let room_info = room.get_room_info().await; - } + ClientCommand::LeaveRoom { room_id, promise } => { + self.leave_room(connection_id, room_id).await; let _ = promise.send(()); - let update = Updates::RoomLeft { - room_id, - former_member_id: self.player_id.clone(), - }; - self.broadcast_update(update, connection_id).await; } - Cmd::SendMessage { room_id, body, promise } => { - let room = self.rooms.get_room(&room_id).await; - if let Some(room) = room { - room.send_message(self.player_id.clone(), body.clone()).await; - } else { - tracing::info!("no room found"); - } + ClientCommand::SendMessage { room_id, body, promise } => { + self.send_message(connection_id, room_id, body).await; let _ = promise.send(()); - let update = Updates::NewMessage { - room_id, - author_id: self.player_id.clone(), - body, - }; - self.broadcast_update(update, connection_id).await; } - Cmd::ChangeTopic { + ClientCommand::ChangeTopic { room_id, new_topic, promise, } => { - let room = self.rooms.get_room(&room_id).await; - if let Some(mut room) = room { - room.set_topic(self.player_id.clone(), new_topic.clone()).await; - } else { - tracing::info!("no room found"); - } + self.change_topic(connection_id, room_id, new_topic).await; let _ = promise.send(()); - let update = Updates::RoomTopicChanged { room_id, new_topic }; - self.broadcast_update(update, connection_id).await; + } + ClientCommand::GetRooms { promise } => { + let result = self.get_rooms().await; + let _ = promise.send(result); } } } + async fn join_room(&mut self, connection_id: ConnectionId, room_id: RoomId) -> JoinResult { + if self.banned_from.contains(&room_id) { + return JoinResult::Banned; + } + + let room = match self.rooms.get_or_create_room(room_id.clone()).await { + Ok(room) => room, + Err(e) => { + log::error!("Failed to get or create room: {e}"); + todo!(); + } + }; + room.subscribe(self.player_id.clone(), self.handle.clone()).await; + self.my_rooms.insert(room_id.clone(), room.clone()); + let room_info = room.get_room_info().await; + let update = Updates::RoomJoined { + room_id, + new_member_id: self.player_id.clone(), + }; + self.broadcast_update(update, connection_id).await; + JoinResult::Success(room_info) + } + + async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) { + let room = self.my_rooms.remove(&room_id); + if let Some(room) = room { + room.unsubscribe(&self.player_id).await; + } + let update = Updates::RoomLeft { + room_id, + former_member_id: self.player_id.clone(), + }; + self.broadcast_update(update, connection_id).await; + } + + async fn send_message(&mut self, connection_id: ConnectionId, room_id: RoomId, body: Str) { + let room = self.rooms.get_room(&room_id).await; + if let Some(room) = room { + room.send_message(self.player_id.clone(), body.clone()).await; + } else { + tracing::info!("no room found"); + } + let update = Updates::NewMessage { + room_id, + author_id: self.player_id.clone(), + body, + }; + self.broadcast_update(update, connection_id).await; + } + + async fn change_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { + let room = self.rooms.get_room(&room_id).await; + if let Some(mut room) = room { + room.set_topic(self.player_id.clone(), new_topic.clone()).await; + } else { + tracing::info!("no room found"); + } + let update = Updates::RoomTopicChanged { room_id, new_topic }; + self.broadcast_update(update, connection_id).await; + } + + async fn get_rooms(&self) -> Vec { + let mut response = vec![]; + for (_, handle) in &self.my_rooms { + response.push(handle.get_room_info().await); + } + response + } + + /// Broadcasts an update to all connections except the one with the given id. + /// + /// This is called after handling a client command. + /// Sending the update to the connection which sent the command is handled by the connection itself. async fn broadcast_update(&self, update: Updates, except: ConnectionId) { for (a, b) in &self.connections { if ConnectionId(a) == except { diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index 193098e..04fdbb1 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -31,7 +31,7 @@ impl RoomId { } } -/// Shared datastructure for storing metadata about rooms. +/// Shared data structure for storing metadata about rooms. #[derive(Clone)] pub struct RoomRegistry(Arc>); impl RoomRegistry { @@ -160,9 +160,13 @@ impl RoomHandle { } struct Room { + /// The numeric node-local id of the room as it is stored in the database. storage_id: u32, + /// The cluster-global id of the room. room_id: RoomId, + /// Player actors on the local node which are subscribed to this room's updates. subscriptions: HashMap, + /// The total number of messages. Used to calculate the id of the new message. message_count: u32, topic: Str, storage: Storage, @@ -191,6 +195,10 @@ impl Room { Ok(()) } + /// Broadcasts an update to all players except the one who caused the update. + /// + /// This is called after handling a client command. + /// Sending the update to the player who sent the command is handled by the player actor. async fn broadcast_update(&self, update: Updates, except: &PlayerId) { tracing::debug!("Broadcasting an update to {} subs", self.subscriptions.len()); for (player_id, sub) in &self.subscriptions { diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 27b1d69..ee47f9b 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -44,7 +44,7 @@ struct RegisteredUser { /** * Username is mostly unused in modern IRC. * - * [https://stackoverflow.com/questions/31666247/what-is-the-difference-between-the-nick-username-and-real-name-in-irc-and-wha] + * */ username: Str, realname: Str, diff --git a/crates/proto-irc/src/client.rs b/crates/proto-irc/src/client.rs index 676fd40..66cf107 100644 --- a/crates/proto-irc/src/client.rs +++ b/crates/proto-irc/src/client.rs @@ -7,42 +7,42 @@ use nonempty::NonEmpty; /// Client-to-server command. #[derive(Clone, Debug, PartialEq, Eq)] pub enum ClientMessage { - /// CAP. Capability-related commands. + /// `CAP`. Capability-related commands. Capability { subcommand: CapabilitySubcommand, }, - /// PING + /// `PING ` Ping { token: Str, }, - /// PONG + /// `PONG ` Pong { token: Str, }, - /// NICK + /// `NICK ` Nick { nickname: Str, }, - /// PASS + /// `PASS ` Pass { password: Str, }, - /// USER 0 * : + /// `USER 0 * :` User { username: Str, realname: Str, }, - /// JOIN + /// `JOIN ` Join(Chan), - /// MODE + /// `MODE ` Mode { target: Recipient, }, - /// WHO + /// `WHO ` Who { target: Recipient, // aka mask }, - /// TOPIC : + /// `TOPIC :` Topic { chan: Chan, topic: Str, @@ -51,12 +51,12 @@ pub enum ClientMessage { chan: Chan, message: Str, }, - /// PRIVMSG : + /// `PRIVMSG :` PrivateMessage { recipient: Recipient, body: Str, }, - /// QUIT : + /// `QUIT :` Quit { reason: Str, }, diff --git a/crates/proto-irc/src/lib.rs b/crates/proto-irc/src/lib.rs index fd54809..54ff676 100644 --- a/crates/proto-irc/src/lib.rs +++ b/crates/proto-irc/src/lib.rs @@ -36,9 +36,9 @@ fn params(input: &str) -> IResult<&str, &str> { #[derive(Clone, Debug, PartialEq, Eq)] pub enum Chan { - /// # — network-global channel, available from any server in the network. + /// `#` — network-global channel, available from any server in the network. Global(Str), - /// & — server-local channel, available only to connections to the same server. Rarely used in practice. + /// `&` — server-local channel, available only to connections to the same server. Rarely used in practice. Local(Str), } impl Chan {