diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 6493087..831539b 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -47,6 +47,7 @@ impl PlayerId { pub struct ConnectionId(pub AnonKey); /// Representation of an authenticated client connection. +/// The public API available to projections through which all client actions are executed. /// /// The connection is used to send commands to the player actor and to receive updates that might be sent to the client. pub struct PlayerConnection { @@ -55,6 +56,7 @@ pub struct PlayerConnection { player_handle: PlayerHandle, } impl PlayerConnection { + /// Handled in [Player::send_message]. pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::SendMessage { room_id, body, promise }; @@ -62,6 +64,7 @@ impl PlayerConnection { Ok(deferred.await?) } + /// Handled in [Player::join_room]. pub async fn join_room(&mut self, room_id: RoomId) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::JoinRoom { room_id, promise }; @@ -69,6 +72,7 @@ impl PlayerConnection { Ok(deferred.await?) } + /// Handled in [Player::change_topic]. pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::ChangeTopic { @@ -80,6 +84,7 @@ impl PlayerConnection { Ok(deferred.await?) } + /// Handled in [Player::leave_room]. pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::LeaveRoom { room_id, promise }; @@ -296,22 +301,7 @@ impl Player { } let _ = promise.send(response); } - ActorCommand::Update(update) => { - log::info!( - "Player received an update, broadcasting to {} connections", - self.connections.len() - ); - match update { - Updates::BannedFrom(ref room_id) => { - self.banned_from.insert(room_id.clone()); - self.my_rooms.remove(room_id); - } - _ => {} - } - for (_, connection) in &self.connections { - let _ = connection.send(update.clone()).await; - } - } + ActorCommand::Update(update) => self.handle_update(update).await, ActorCommand::ClientCommand(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, ActorCommand::Stop => break, } @@ -320,83 +310,117 @@ impl Player { self } + /// Handle an incoming update by changing the internal state and broadcasting it to all connections if necessary. + async fn handle_update(&mut self, update: Updates) { + log::info!( + "Player received an update, broadcasting to {} connections", + self.connections.len() + ); + match update { + Updates::BannedFrom(ref room_id) => { + self.banned_from.insert(room_id.clone()); + self.my_rooms.remove(room_id); + } + _ => {} + } + for (_, connection) in &self.connections { + let _ = connection.send(update.clone()).await; + } + } + fn terminate_connection(&mut self, connection_id: ConnectionId) { if let None = self.connections.pop(connection_id.0) { log::warn!("Connection {connection_id:?} already terminated"); } } + /// Dispatches a client command to the appropriate handler. async fn handle_cmd(&mut self, cmd: ClientCommand, connection_id: ConnectionId) { match cmd { ClientCommand::JoinRoom { room_id, promise } => { - if self.banned_from.contains(&room_id) { - let _ = promise.send(JoinResult::Banned); - return; - } - - let room = match self.rooms.get_or_create_room(room_id.clone()).await { - Ok(room) => room, - Err(e) => { - log::error!("Failed to get or create room: {e}"); - return; - } - }; - room.subscribe(self.player_id.clone(), self.handle.clone()).await; - self.my_rooms.insert(room_id.clone(), room.clone()); - let room_info = room.get_room_info().await; - let _ = promise.send(JoinResult::Success(room_info)); - let update = Updates::RoomJoined { - room_id, - new_member_id: self.player_id.clone(), - }; - self.broadcast_update(update, connection_id).await; + let result = self.join_room(connection_id, room_id).await; + let _ = promise.send(result); } ClientCommand::LeaveRoom { room_id, promise } => { - let room = self.my_rooms.remove(&room_id); - if let Some(room) = room { - room.unsubscribe(&self.player_id).await; - let room_info = room.get_room_info().await; - } + self.leave_room(connection_id, room_id).await; let _ = promise.send(()); - let update = Updates::RoomLeft { - room_id, - former_member_id: self.player_id.clone(), - }; - self.broadcast_update(update, connection_id).await; } ClientCommand::SendMessage { room_id, body, promise } => { - let room = self.rooms.get_room(&room_id).await; - if let Some(room) = room { - room.send_message(self.player_id.clone(), body.clone()).await; - } else { - tracing::info!("no room found"); - } + self.send_message(connection_id, room_id, body).await; let _ = promise.send(()); - let update = Updates::NewMessage { - room_id, - author_id: self.player_id.clone(), - body, - }; - self.broadcast_update(update, connection_id).await; } ClientCommand::ChangeTopic { room_id, new_topic, promise, } => { - let room = self.rooms.get_room(&room_id).await; - if let Some(mut room) = room { - room.set_topic(self.player_id.clone(), new_topic.clone()).await; - } else { - tracing::info!("no room found"); - } + self.change_topic(connection_id, room_id, new_topic).await; let _ = promise.send(()); - let update = Updates::RoomTopicChanged { room_id, new_topic }; - self.broadcast_update(update, connection_id).await; } } } + async fn join_room(&mut self, connection_id: ConnectionId, room_id: RoomId) -> JoinResult { + if self.banned_from.contains(&room_id) { + return JoinResult::Banned; + } + + let room = match self.rooms.get_or_create_room(room_id.clone()).await { + Ok(room) => room, + Err(e) => { + log::error!("Failed to get or create room: {e}"); + todo!(); + } + }; + room.subscribe(self.player_id.clone(), self.handle.clone()).await; + self.my_rooms.insert(room_id.clone(), room.clone()); + let room_info = room.get_room_info().await; + let update = Updates::RoomJoined { + room_id, + new_member_id: self.player_id.clone(), + }; + self.broadcast_update(update, connection_id).await; + JoinResult::Success(room_info) + } + + async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) { + let room = self.my_rooms.remove(&room_id); + if let Some(room) = room { + room.unsubscribe(&self.player_id).await; + } + let update = Updates::RoomLeft { + room_id, + former_member_id: self.player_id.clone(), + }; + self.broadcast_update(update, connection_id).await; + } + + async fn send_message(&mut self, connection_id: ConnectionId, room_id: RoomId, body: Str) { + let room = self.rooms.get_room(&room_id).await; + if let Some(room) = room { + room.send_message(self.player_id.clone(), body.clone()).await; + } else { + tracing::info!("no room found"); + } + let update = Updates::NewMessage { + room_id, + author_id: self.player_id.clone(), + body, + }; + self.broadcast_update(update, connection_id).await; + } + + async fn change_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { + let room = self.rooms.get_room(&room_id).await; + if let Some(mut room) = room { + room.set_topic(self.player_id.clone(), new_topic.clone()).await; + } else { + tracing::info!("no room found"); + } + let update = Updates::RoomTopicChanged { room_id, new_topic }; + self.broadcast_update(update, connection_id).await; + } + /// Broadcasts an update to all connections except the one with the given id. /// /// This is called after handling a client command.