//! Domain of chat participants.
//!
//! A player is a single user account, which is used to participate in chats,
//! including sending messages, receiving messages, retrieving history and running privileged actions.
//! Usually a person has only one account, but it is possible to have multiple accounts
//! for one person and therefore multiple player entities.
//!
//! A player actor is a serial handler of commands from a single player. It is preferable to run
//! all per-player validations in the player actor, so that they don't overload the room actor.
use std::{
    collections::{HashMap, HashSet},
    sync::{Arc, RwLock},
};

use prometheus::{IntGauge, Registry as MetricsRegistry};
use serde::Serialize;
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
    task::JoinHandle,
};

use crate::{
    core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry},
    prelude::*,
    util::table::{AnonTable, Key as AnonKey},
};

/// Opaque player identifier. Cannot contain spaces, must not be longer than 32 symbols.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub struct PlayerId(Box); impl PlayerId { pub fn from_bytes(bytes: Box) -> Result { if bytes.len() > 32 { return Err(fail("Nickname cannot be longer than 32 symbols")); } if bytes.contains(' ') { return Err(anyhow::Error::msg("Nickname cannot contain spaces")); } Ok(PlayerId(bytes)) } pub fn as_bytes(&self) -> &Box { &self.0 } pub fn into_bytes(self) -> Box { self.0 } } #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionId(pub AnonKey); pub struct PlayerConnection { pub connection_id: ConnectionId, pub receiver: Receiver, player_handle: PlayerHandle, } impl PlayerConnection { pub async fn send_message(&mut self, room_id: RoomId, body: Box) -> Result<()> { self.player_handle .send_message(room_id, self.connection_id.clone(), body) .await } pub async fn join_room(&mut self, room_id: RoomId) -> Result { self.player_handle .join_room(room_id, self.connection_id.clone()) .await } pub async fn change_topic(&mut self, room_id: RoomId, new_topic: ByteVec) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = Cmd::ChangeTopic { room_id, new_topic, promise, }; self.player_handle .send(PlayerCommand::Cmd(cmd, self.connection_id.clone())) .await; Ok(deferred.await?) } pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { let (promise, deferred) = oneshot(); self.player_handle .send(PlayerCommand::Cmd( Cmd::LeaveRoom { room_id, promise }, self.connection_id.clone(), )) .await; Ok(deferred.await?) } pub async fn terminate(self) { self.player_handle .send(PlayerCommand::TerminateConnection(self.connection_id)) .await; } pub async fn get_rooms(&self) -> Result> { let (promise, deferred) = oneshot(); self.player_handle .send(PlayerCommand::GetRooms(promise)) .await; Ok(deferred.await?) } } /// Handle to a player actor. 
#[derive(Clone)] pub struct PlayerHandle { tx: Sender, } impl PlayerHandle { pub async fn subscribe(&self) -> PlayerConnection { let (sender, receiver) = channel(32); let (promise, deferred) = oneshot(); let cmd = PlayerCommand::AddConnection { sender, promise }; let _ = self.tx.send(cmd).await; let connection_id = deferred.await.unwrap(); PlayerConnection { connection_id, player_handle: self.clone(), receiver, } } pub async fn send_message( &self, room_id: RoomId, connection_id: ConnectionId, body: Box, ) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = Cmd::SendMessage { room_id, body, promise, }; let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; Ok(deferred.await?) } pub async fn join_room( &self, room_id: RoomId, connection_id: ConnectionId, ) -> Result { let (promise, deferred) = oneshot(); let cmd = Cmd::JoinRoom { room_id, promise }; let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; Ok(deferred.await?) } async fn send(&self, command: PlayerCommand) { let _ = self.tx.send(command).await; } pub async fn update(&self, update: Updates) { self.send(PlayerCommand::Update(update)).await; } } enum PlayerCommand { /** Commands from connections */ AddConnection { sender: Sender, promise: Promise, }, TerminateConnection(ConnectionId), Cmd(Cmd, ConnectionId), /// Query - responds with a list of rooms the player is a member of. GetRooms(Promise>), /** Events from rooms */ Update(Updates), } pub enum Cmd { JoinRoom { room_id: RoomId, promise: Promise, }, LeaveRoom { room_id: RoomId, promise: Promise<()>, }, SendMessage { room_id: RoomId, body: Box, promise: Promise<()>, }, ChangeTopic { room_id: RoomId, new_topic: ByteVec, promise: Promise<()>, }, } pub enum JoinResult { Success(RoomInfo), Banned, } /// Player update event type which is sent to a player actor and from there to a connection handler. 
#[derive(Clone)] pub enum Updates { RoomTopicChanged { room_id: RoomId, new_topic: ByteVec, }, NewMessage { room_id: RoomId, author_id: PlayerId, body: Box, }, RoomJoined { room_id: RoomId, new_member_id: PlayerId, }, RoomLeft { room_id: RoomId, former_member_id: PlayerId, }, /// The player was banned from the room and left it immediately. BannedFrom(RoomId), } /// Handle to a player registry — a shared data structure containing information about players. #[derive(Clone)] pub struct PlayerRegistry(Arc>); impl PlayerRegistry { pub fn empty( room_registry: RoomRegistry, metrics: &mut MetricsRegistry, ) -> Result { let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; metrics.register(Box::new(metric_active_players.clone()))?; let inner = PlayerRegistryInner { room_registry, players: HashMap::new(), metric_active_players, }; Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) } pub async fn get_or_create_player(&mut self, id: PlayerId) -> PlayerHandle { let mut inner = self.0.write().unwrap(); if let Some((handle, _)) = inner.players.get(&id) { handle.clone() } else { let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone()); inner.players.insert(id, (handle.clone(), fiber)); inner.metric_active_players.inc(); handle } } pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection { let player_handle = self.get_or_create_player(id).await; player_handle.subscribe().await } } /// The player registry state representation. struct PlayerRegistryInner { room_registry: RoomRegistry, players: HashMap)>, metric_active_players: IntGauge, } /// Player actor inner state representation. 
struct Player { player_id: PlayerId, connections: AnonTable>, my_rooms: HashMap, banned_from: HashSet, rx: Receiver, handle: PlayerHandle, rooms: RoomRegistry, } impl Player { fn launch(player_id: PlayerId, rooms: RoomRegistry) -> (PlayerHandle, JoinHandle) { let (tx, rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); let player = Player { player_id, connections: AnonTable::new(), my_rooms: HashMap::new(), banned_from: HashSet::from([RoomId::from_bytes("empty".into()).unwrap()]), rx, handle, rooms, }; let fiber = tokio::task::spawn(player.main_loop()); (handle_clone, fiber) } async fn main_loop(mut self) -> Self { while let Some(cmd) = self.rx.recv().await { match cmd { PlayerCommand::AddConnection { sender, promise } => { let connection_id = self.connections.insert(sender); if let Err(connection_id) = promise.send(ConnectionId(connection_id)) { log::warn!("Connection {connection_id:?} terminated before finalization"); self.terminate_connection(connection_id); } } PlayerCommand::TerminateConnection(connection_id) => { self.terminate_connection(connection_id); } PlayerCommand::GetRooms(promise) => { let mut response = vec![]; for (_, handle) in &self.my_rooms { response.push(handle.get_room_info().await); } let _ = promise.send(response); } PlayerCommand::Update(update) => { log::info!( "Player received an update, broadcasting to {} connections", self.connections.len() ); match update { Updates::BannedFrom(ref room_id) => { self.banned_from.insert(room_id.clone()); self.my_rooms.remove(room_id); } _ => {} } for (_, connection) in &self.connections { let _ = connection.send(update.clone()).await; } } PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, } } self } fn terminate_connection(&mut self, connection_id: ConnectionId) { if let None = self.connections.pop(connection_id.0) { log::warn!("Connection {connection_id:?} already terminated"); } } async fn handle_cmd(&mut self, cmd: Cmd, 
connection_id: ConnectionId) { match cmd { Cmd::JoinRoom { room_id, promise } => { if self.banned_from.contains(&room_id) { let _ = promise.send(JoinResult::Banned); return; } let room = self.rooms.get_or_create_room(room_id.clone()); room.subscribe(self.player_id.clone(), self.handle.clone()) .await; self.my_rooms.insert(room_id.clone(), room.clone()); let room_info = room.get_room_info().await; let _ = promise.send(JoinResult::Success(room_info)); let update = Updates::RoomJoined { room_id, new_member_id: self.player_id.clone(), }; self.broadcast_update(update, connection_id).await; } Cmd::LeaveRoom { room_id, promise } => { let room = self.my_rooms.remove(&room_id); if let Some(room) = room { room.unsubscribe(&self.player_id).await; let room_info = room.get_room_info().await; } let _ = promise.send(()); let update = Updates::RoomLeft { room_id, former_member_id: self.player_id.clone(), }; self.broadcast_update(update, connection_id).await; } Cmd::SendMessage { room_id, body, promise, } => { let room = self.rooms.get_room(&room_id); if let Some(room) = room { room.send_message(self.player_id.clone(), body.clone()) .await; } else { tracing::info!("no room found"); } let _ = promise.send(()); let update = Updates::NewMessage { room_id, author_id: self.player_id.clone(), body, }; self.broadcast_update(update, connection_id).await; } Cmd::ChangeTopic { room_id, new_topic, promise, } => { let room = self.rooms.get_room(&room_id); if let Some(mut room) = room { room.set_topic(self.player_id.clone(), new_topic.clone()) .await; } else { tracing::info!("no room found"); } let _ = promise.send(()); let update = Updates::RoomTopicChanged { room_id, new_topic }; self.broadcast_update(update, connection_id).await; } } } async fn broadcast_update(&self, update: Updates, except: ConnectionId) { for (a, b) in &self.connections { if ConnectionId(a) == except { continue; } let _ = b.send(update.clone()).await; } } }