//! Domain of chat participants. //! //! Player is a single user account, which is used to participate in chats, //! including sending messages, receiving messaged, retrieving history and running privileged actions. //! A player corresponds to a single user account. Usually a person has only one account, //! but it is possible to have multiple accounts for one person and therefore multiple player entities. //! //! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor, //! so that they don't overload the room actor. use std::collections::{HashMap, HashSet}; use chrono::{DateTime, Utc}; use prometheus::{IntGauge, Registry as MetricsRegistry}; use serde::Serialize; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use tracing::{Instrument, Span}; use crate::clustering::room::*; use crate::prelude::*; use crate::room::{RoomHandle, RoomId, RoomInfo}; use crate::table::{AnonTable, Key as AnonKey}; use crate::LavinaCore; /// Opaque player identifier. Cannot contain spaces, must be shorter than 32. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub struct PlayerId(Str); impl PlayerId { pub fn from(str: impl Into) -> Result { let bytes = str.into(); if bytes.len() > 32 { return Err(fail("Nickname cannot be longer than 32 symbols")); } if bytes.contains(' ') { return Err(anyhow::Error::msg("Nickname cannot contain spaces")); } Ok(PlayerId(bytes)) } pub fn as_inner(&self) -> &Str { &self.0 } pub fn into_inner(self) -> Str { self.0 } } /// Node-local identifier of a connection. It is used to address a connection within a player actor. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionId(pub AnonKey); /// Representation of an authenticated client connection. /// The public API available to projections through which all client actions are executed. /// /// The connection is used to send commands to the player actor and to receive updates that might be sent to the client. pub struct PlayerConnection { pub connection_id: ConnectionId, pub receiver: Receiver, player_handle: PlayerHandle, } impl PlayerConnection { /// Handled in [Player::send_room_message]. #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_message")] pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::SendMessage { room_id, body, promise }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } /// Handled in [Player::join_room]. #[tracing::instrument(skip(self), name = "PlayerConnection::join_room")] pub async fn join_room(&mut self, room_id: RoomId) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::JoinRoom { room_id, promise }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } /// Handled in [Player::change_room_topic]. #[tracing::instrument(skip(self, new_topic), name = "PlayerConnection::change_topic")] pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::ChangeTopic { room_id, new_topic, promise, }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } /// Handled in [Player::leave_room]. #[tracing::instrument(skip(self), name = "PlayerConnection::leave_room")] pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::LeaveRoom { room_id, promise }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } pub async fn terminate(self) { self.player_handle.send(ActorCommand::TerminateConnection(self.connection_id)).await; } /// Handled in [Player::get_rooms]. #[tracing::instrument(skip(self), name = "PlayerConnection::get_rooms")] pub async fn get_rooms(&self) -> Result> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::GetRooms { promise }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } /// Handler in [Player::send_dialog_message]. #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_dialog_message")] pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::SendDialogMessage { recipient, body, promise, }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } /// Handler in [Player::check_user_existence]. #[tracing::instrument(skip(self), name = "PlayerConnection::check_user_existence")] pub async fn check_user_existence(&self, recipient: PlayerId) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::GetInfo { recipient, promise }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } } /// Handle to a player actor. #[derive(Clone)] pub struct PlayerHandle { tx: Sender<(ActorCommand, Span)>, } impl PlayerHandle { pub async fn subscribe(&self) -> PlayerConnection { let (sender, receiver) = channel(32); let (promise, deferred) = oneshot(); let cmd = ActorCommand::AddConnection { sender, promise }; self.send(cmd).await; let connection_id = deferred.await.unwrap(); PlayerConnection { connection_id, player_handle: self.clone(), receiver, } } async fn send(&self, command: ActorCommand) { let span = tracing::span!(tracing::Level::INFO, "PlayerHandle::send"); // TODO either handle the error or doc why it is safe to ignore let _ = self.tx.send((command, span)).await; } pub async fn update(&self, update: Updates) { self.send(ActorCommand::Update(update)).await; } } /// Messages sent to the player actor. enum ActorCommand { /// Establish a new connection. AddConnection { sender: Sender, promise: Promise, }, /// Terminate an existing connection. TerminateConnection(ConnectionId), /// Player-issued command. ClientCommand(ClientCommand, ConnectionId), /// Update which is sent from a room the player is member of. Update(Updates), Stop, } /// Client-issued command sent to the player actor. The actor will respond with by fulfilling the promise. pub enum ClientCommand { JoinRoom { room_id: RoomId, promise: Promise, }, LeaveRoom { room_id: RoomId, promise: Promise<()>, }, SendMessage { room_id: RoomId, body: Str, promise: Promise, }, ChangeTopic { room_id: RoomId, new_topic: Str, promise: Promise<()>, }, GetRooms { promise: Promise>, }, SendDialogMessage { recipient: PlayerId, body: Str, promise: Promise<()>, }, GetInfo { recipient: PlayerId, promise: Promise, }, } pub enum GetInfoResult { UserExists, UserDoesntExist, } pub enum JoinResult { Success(RoomInfo), AlreadyJoined, Banned, } pub enum SendMessageResult { Success(DateTime), NoSuchRoom, } /// Player update event type which is sent to a player actor and from there to a connection handler. #[derive(Clone, Debug)] pub enum Updates { RoomTopicChanged { room_id: RoomId, new_topic: Str, }, NewMessage { room_id: RoomId, author_id: PlayerId, body: Str, created_at: DateTime, }, RoomJoined { room_id: RoomId, new_member_id: PlayerId, }, RoomLeft { room_id: RoomId, former_member_id: PlayerId, }, /// The player was banned from the room and left it immediately. BannedFrom(RoomId), NewDialogMessage { sender: PlayerId, receiver: PlayerId, body: Str, created_at: DateTime, }, } /// Handle to a player registry — a shared data structure containing information about players. pub(crate) struct PlayerRegistry(RwLock); impl PlayerRegistry { pub fn empty(metrics: &mut MetricsRegistry) -> Result { let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; metrics.register(Box::new(metric_active_players.clone()))?; let inner = PlayerRegistryInner { players: HashMap::new(), metric_active_players, }; Ok(PlayerRegistry(RwLock::new(inner))) } pub fn shutdown(self) { let res = self.0.into_inner(); drop(res); } #[tracing::instrument(skip(self), name = "PlayerRegistry::get_player")] pub async fn get_player(&self, id: &PlayerId) -> Option { let inner = self.0.read().await; inner.players.get(id).map(|(handle, _)| handle.clone()) } #[tracing::instrument(skip(self), name = "PlayerRegistry::stop_player")] pub async fn stop_player(&self, id: &PlayerId) -> Result> { let mut inner = self.0.write().await; if let Some((handle, fiber)) = inner.players.remove(id) { handle.send(ActorCommand::Stop).await; drop(handle); fiber.await?; inner.metric_active_players.dec(); Ok(Some(())) } else { Ok(None) } } #[tracing::instrument(skip(self, core), name = "PlayerRegistry::get_or_launch_player")] pub async fn get_or_launch_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerHandle { let inner = self.0.read().await; if let Some((handle, _)) = inner.players.get(id) { handle.clone() } else { drop(inner); let mut inner = self.0.write().await; if let Some((handle, _)) = inner.players.get(id) { handle.clone() } else { let (handle, fiber) = Player::launch(id.clone(), core.clone()).await; inner.players.insert(id.clone(), (handle.clone(), fiber)); inner.metric_active_players.inc(); handle } } } #[tracing::instrument(skip(self, core), name = "PlayerRegistry::connect_to_player")] pub async fn connect_to_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerConnection { let player_handle = self.get_or_launch_player(core, id).await; player_handle.subscribe().await } pub async fn shutdown_all(&self) -> Result<()> { let mut inner = self.0.write().await; for (i, (k, j)) in inner.players.drain() { k.send(ActorCommand::Stop).await; drop(k); j.await?; log::debug!("Player stopped #{i:?}") } log::debug!("All players stopped"); Ok(()) } } /// The player registry state representation. struct PlayerRegistryInner { /// Active player actors. players: HashMap)>, metric_active_players: IntGauge, } enum RoomRef { Local(RoomHandle), Remote { node_id: u32 }, } /// Player actor inner state representation. struct Player { player_id: PlayerId, storage_id: u32, connections: AnonTable>, my_rooms: HashMap, banned_from: HashSet, rx: Receiver<(ActorCommand, Span)>, handle: PlayerHandle, services: LavinaCore, } impl Player { async fn launch(player_id: PlayerId, core: LavinaCore) -> (PlayerHandle, JoinHandle) { let (tx, rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); let storage_id = core.services.storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap(); let player = Player { player_id, storage_id, // connections are empty when the actor is just started connections: AnonTable::new(), // room handlers will be loaded later in the started task my_rooms: HashMap::new(), // TODO implement and load bans banned_from: HashSet::new(), rx, handle, services: core, }; let fiber = tokio::task::spawn(player.main_loop()); (handle_clone, fiber) } fn room_location(&self, room_id: &RoomId) -> Option { let res = self.services.cluster_metadata.rooms.get(room_id.as_inner().as_ref()).copied(); let node = res.unwrap_or(self.services.cluster_metadata.main_owner); if node == self.services.cluster_metadata.node_id { None } else { Some(node) } } async fn main_loop(mut self) -> Self { let rooms = self.services.storage.get_rooms_of_a_user(self.storage_id).await.unwrap(); for room_id in rooms { if let Some(remote_node) = self.room_location(&room_id) { self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node }); self.services.subscribe(self.player_id.clone(), room_id).await; } else { let room = self.services.rooms.get_room(&self.services, &room_id).await; if let Some(room) = room { self.my_rooms.insert(room_id, RoomRef::Local(room)); } else { tracing::error!("Room #{room_id:?} not found"); } } } while let Some(cmd) = self.rx.recv().await { let (cmd, span) = cmd; let should_stop = async { match cmd { ActorCommand::AddConnection { sender, promise } => { let connection_id = self.connections.insert(sender); if let Err(connection_id) = promise.send(ConnectionId(connection_id)) { log::warn!("Connection {connection_id:?} terminated before finalization"); self.terminate_connection(connection_id); } false } ActorCommand::TerminateConnection(connection_id) => { self.terminate_connection(connection_id); false } ActorCommand::Update(update) => { self.handle_update(update).await; false } ActorCommand::ClientCommand(cmd, connection_id) => { self.handle_cmd(cmd, connection_id).await; false } ActorCommand::Stop => true, } } .instrument(span) .await; if should_stop { break; } } log::debug!("Shutting down player actor #{:?}", self.player_id); self } /// Handle an incoming update by changing the internal state and broadcasting it to all connections if necessary. #[tracing::instrument(skip(self, update), name = "Player::handle_update")] async fn handle_update(&mut self, update: Updates) { log::debug!( "Player received an update, broadcasting to {} connections", self.connections.len() ); match update { Updates::BannedFrom(ref room_id) => { self.banned_from.insert(room_id.clone()); self.my_rooms.remove(room_id); } _ => {} } for (_, connection) in &self.connections { let _ = connection.send(ConnectionMessage::Update(update.clone())).await; } } fn terminate_connection(&mut self, connection_id: ConnectionId) { if let None = self.connections.pop(connection_id.0) { log::warn!("Connection {connection_id:?} already terminated"); } } /// Dispatches a client command to the appropriate handler. async fn handle_cmd(&mut self, cmd: ClientCommand, connection_id: ConnectionId) { match cmd { ClientCommand::JoinRoom { room_id, promise } => { let result = self.join_room(connection_id, room_id).await; let _ = promise.send(result); } ClientCommand::LeaveRoom { room_id, promise } => { self.leave_room(connection_id, room_id).await; let _ = promise.send(()); } ClientCommand::SendMessage { room_id, body, promise } => { let result = self.send_room_message(connection_id, room_id, body).await; let _ = promise.send(result); } ClientCommand::ChangeTopic { room_id, new_topic, promise, } => { self.change_room_topic(connection_id, room_id, new_topic).await; let _ = promise.send(()); } ClientCommand::GetRooms { promise } => { let result = self.get_rooms().await; let _ = promise.send(result); } ClientCommand::SendDialogMessage { recipient, body, promise, } => { self.send_dialog_message(connection_id, recipient, body).await; let _ = promise.send(()); } ClientCommand::GetInfo { recipient, promise } => { let result = self.check_user_existence(recipient).await; let _ = promise.send(result); } } } #[tracing::instrument(skip(self), name = "Player::join_room")] async fn join_room(&mut self, connection_id: ConnectionId, room_id: RoomId) -> JoinResult { if self.banned_from.contains(&room_id) { return JoinResult::Banned; } if self.my_rooms.contains_key(&room_id) { return JoinResult::AlreadyJoined; } if let Some(remote_node) = self.room_location(&room_id) { let req = JoinRoomReq { room_id: room_id.as_inner(), player_id: self.player_id.as_inner(), }; self.services.client.join_room(remote_node, req).await.unwrap(); let room_storage_id = self.services.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap(); self.services.storage.add_room_member(room_storage_id, self.storage_id).await.unwrap(); self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node }); JoinResult::Success(RoomInfo { id: room_id, topic: "unknown".into(), members: vec![], }) } else { let room = match self.services.rooms.get_or_create_room(&self.services, room_id.clone()).await { Ok(room) => room, Err(e) => { log::error!("Failed to get or create room: {e}"); todo!(); } }; room.add_member(&self.services, &self.player_id, self.storage_id).await; room.subscribe(&self.player_id, self.handle.clone()).await; self.my_rooms.insert(room_id.clone(), RoomRef::Local(room.clone())); let room_info = room.get_room_info().await; let update = Updates::RoomJoined { room_id, new_member_id: self.player_id.clone(), }; self.broadcast_update(update, connection_id).await; JoinResult::Success(room_info) } } #[tracing::instrument(skip(self), name = "Player::leave_room")] async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) { let room = self.my_rooms.remove(&room_id); if let Some(room) = room { match room { RoomRef::Local(room) => { room.unsubscribe(&self.player_id).await; room.remove_member(&self.services, &self.player_id, self.storage_id).await; } RoomRef::Remote { node_id } => { let req = LeaveRoomReq { room_id: room_id.as_inner(), player_id: self.player_id.as_inner(), }; self.services.client.leave_room(node_id, req).await.unwrap(); let room_storage_id = self.services.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap(); self.services.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap(); } } } let update = Updates::RoomLeft { room_id, former_member_id: self.player_id.clone(), }; self.broadcast_update(update, connection_id).await; } #[tracing::instrument(skip(self, body), name = "Player::send_room_message")] async fn send_room_message( &mut self, connection_id: ConnectionId, room_id: RoomId, body: Str, ) -> SendMessageResult { let Some(room) = self.my_rooms.get(&room_id) else { tracing::info!("no room found"); return SendMessageResult::NoSuchRoom; }; let created_at = Utc::now(); match room { RoomRef::Local(room) => { room.send_message(&self.services, &self.player_id, body.clone(), created_at.clone()).await; } RoomRef::Remote { node_id } => { let req = SendMessageReq { room_id: room_id.as_inner(), player_id: self.player_id.as_inner(), message: &*body, created_at: &*created_at.to_rfc3339(), }; self.services.client.send_room_message(*node_id, req).await.unwrap(); self.services .broadcast( room_id.clone(), self.player_id.clone(), body.clone(), created_at.clone(), ) .await; } } let update = Updates::NewMessage { room_id, author_id: self.player_id.clone(), body, created_at, }; self.broadcast_update(update, connection_id).await; SendMessageResult::Success(created_at) } #[tracing::instrument(skip(self, new_topic), name = "Player::change_room_topic")] async fn change_room_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { let Some(room) = self.my_rooms.get(&room_id) else { tracing::info!("no room found"); return; }; match room { RoomRef::Local(room) => { room.set_topic(&self.services, &self.player_id, new_topic.clone()).await; } RoomRef::Remote { node_id } => { let req = SetRoomTopicReq { room_id: room_id.as_inner(), player_id: self.player_id.as_inner(), topic: &*new_topic, }; self.services.client.set_room_topic(*node_id, req).await.unwrap(); } } let update = Updates::RoomTopicChanged { room_id, new_topic }; self.broadcast_update(update, connection_id).await; } #[tracing::instrument(skip(self), name = "Player::get_rooms")] async fn get_rooms(&self) -> Vec { let mut response = vec![]; for (room_id, handle) in &self.my_rooms { match handle { RoomRef::Local(handle) => { response.push(handle.get_room_info().await); } RoomRef::Remote { .. } => { let room_info = RoomInfo { id: room_id.clone(), topic: "unknown".into(), members: vec![], }; response.push(room_info); } } } response } #[tracing::instrument(skip(self, body), name = "Player::send_dialog_message")] async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) { let created_at = Utc::now(); self.services .send_dialog_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at) .await .unwrap(); let update = Updates::NewDialogMessage { sender: self.player_id.clone(), receiver: recipient.clone(), body, created_at, }; self.broadcast_update(update, connection_id).await; } #[tracing::instrument(skip(self), name = "Player::check_user_existence")] async fn check_user_existence(&self, recipient: PlayerId) -> GetInfoResult { if self.services.storage.check_user_existence(recipient.as_inner().as_ref()).await.unwrap() { GetInfoResult::UserExists } else { GetInfoResult::UserDoesntExist } } /// Broadcasts an update to all connections except the one with the given id. /// /// This is called after handling a client command. /// Sending the update to the connection which sent the command is handled by the connection itself. #[tracing::instrument(skip(self, update), name = "Player::broadcast_update")] async fn broadcast_update(&self, update: Updates, except: ConnectionId) { for (a, b) in &self.connections { if ConnectionId(a) == except { continue; } let _ = b.send(ConnectionMessage::Update(update.clone())).await; } } } pub enum ConnectionMessage { Update(Updates), Stop(StopReason), } #[derive(Debug)] pub enum StopReason { ServerShutdown, InternalError, }