//! Domain of rooms — chats with multiple participants. use std::collections::HashSet; use std::{collections::HashMap, hash::Hash, sync::Arc}; use chrono::{DateTime, Utc}; use prometheus::{IntGauge, Registry as MetricRegistry}; use serde::Serialize; use tokio::sync::RwLock as AsyncRwLock; use crate::player::{PlayerHandle, PlayerId, Updates}; use crate::prelude::*; use crate::repo::Storage; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub struct RoomId(Str); impl RoomId { pub fn from(str: impl Into) -> Result { let bytes = str.into(); if bytes.len() > 32 { return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); } if bytes.contains(' ') { return Err(anyhow::Error::msg("Room name cannot contain spaces")); } Ok(RoomId(bytes)) } pub fn as_inner(&self) -> &Str { &self.0 } pub fn into_inner(self) -> Str { self.0 } } /// Shared data structure for storing metadata about rooms. #[derive(Clone)] pub struct RoomRegistry(Arc>); impl RoomRegistry { pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result { let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; let inner = RoomRegistryInner { rooms: HashMap::new(), metric_active_rooms, storage, }; Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner)))) } pub fn shutdown(self) -> Result<()> { let res = match Arc::try_unwrap(self.0) { Ok(e) => e, Err(_) => return Err(fail("failed to acquire rooms ownership on shutdown")), }; let res = res.into_inner(); // TODO drop all rooms drop(res); Ok(()) } #[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")] pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result { let mut inner = self.0.write().await; if let Some(room_handle) = inner.get_or_load_room(&room_id).await? { Ok(room_handle.clone()) } else { log::debug!("Creating room {}...", &room_id.0); let topic = "New room"; let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?; let room = Room { storage_id: id, room_id: room_id.clone(), subscriptions: HashMap::new(), members: HashSet::new(), topic: topic.into(), message_count: 0, storage: inner.storage.clone(), }; let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); inner.rooms.insert(room_id, room_handle.clone()); inner.metric_active_rooms.inc(); Ok(room_handle) } } #[tracing::instrument(skip(self), name = "RoomRegistry::get_room")] pub async fn get_room(&self, room_id: &RoomId) -> Option { let mut inner = self.0.write().await; inner.get_or_load_room(room_id).await.unwrap() } #[tracing::instrument(skip(self), name = "RoomRegistry::get_all_rooms")] pub async fn get_all_rooms(&self) -> Vec { let handles = { let inner = self.0.read().await; let handles = inner.rooms.values().cloned().collect::>(); handles }; let mut res = vec![]; for i in handles { res.push(i.get_room_info().await) } res } } struct RoomRegistryInner { rooms: HashMap, metric_active_rooms: IntGauge, storage: Storage, } impl RoomRegistryInner { #[tracing::instrument(skip(self), name = "RoomRegistryInner::get_or_load_room")] async fn get_or_load_room(&mut self, room_id: &RoomId) -> Result> { if let Some(room_handle) = self.rooms.get(room_id) { log::debug!("Room {} was loaded already", &room_id.0); Ok(Some(room_handle.clone())) } else if let Some(stored_room) = self.storage.retrieve_room_by_name(&*room_id.0).await? { log::debug!("Loading room {}...", &room_id.0); let room = Room { storage_id: stored_room.id, room_id: room_id.clone(), subscriptions: HashMap::new(), members: HashSet::new(), // TODO load members from storage topic: stored_room.topic.into(), message_count: stored_room.message_count, storage: self.storage.clone(), }; let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); self.rooms.insert(room_id.clone(), room_handle.clone()); self.metric_active_rooms.inc(); Ok(Some(room_handle)) } else { tracing::debug!("Room {} does not exist", &room_id.0); Ok(None) } } } #[derive(Clone)] pub struct RoomHandle(Arc>); impl RoomHandle { #[tracing::instrument(skip(self, player_handle), name = "RoomHandle::subscribe")] pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) { let mut lock = self.0.write().await; tracing::info!("Adding a subscriber to a room"); lock.subscriptions.insert(player_id.clone(), player_handle); } #[tracing::instrument(skip(self), name = "RoomHandle::add_member")] pub async fn add_member(&self, player_id: &PlayerId, player_storage_id: u32) { let mut lock = self.0.write().await; tracing::info!("Adding a new member to a room"); let room_storage_id = lock.storage_id; if !lock.storage.is_room_member(room_storage_id, player_storage_id).await.unwrap() { lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap(); } else { tracing::warn!("User {:#?} has already been added to the room.", player_id); } lock.members.insert(player_id.clone()); let update = Updates::RoomJoined { room_id: lock.room_id.clone(), new_member_id: player_id.clone(), }; lock.broadcast_update(update, player_id).await; } #[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")] pub async fn unsubscribe(&self, player_id: &PlayerId) { let mut lock = self.0.write().await; lock.subscriptions.remove(player_id); } #[tracing::instrument(skip(self), name = "RoomHandle::remove_member")] pub async fn remove_member(&self, player_id: &PlayerId, player_storage_id: u32) { let mut lock = self.0.write().await; tracing::info!("Removing a member from a room"); let room_storage_id = lock.storage_id; lock.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap(); lock.members.remove(player_id); let update = Updates::RoomLeft { room_id: lock.room_id.clone(), former_member_id: player_id.clone(), }; lock.broadcast_update(update, player_id).await; } #[tracing::instrument(skip(self, body, created_at), name = "RoomHandle::send_message")] pub async fn send_message(&self, player_id: &PlayerId, body: Str, created_at: DateTime) { let mut lock = self.0.write().await; let res = lock.send_message(player_id, body, created_at).await; if let Err(err) = res { log::warn!("Failed to send message: {err:?}"); } } #[tracing::instrument(skip(self), name = "RoomHandle::get_room_info")] pub async fn get_room_info(&self) -> RoomInfo { let lock = self.0.read().await; RoomInfo { id: lock.room_id.clone(), members: lock.subscriptions.keys().map(|x| x.clone()).collect::>(), topic: lock.topic.clone(), } } #[tracing::instrument(skip(self, new_topic), name = "RoomHandle::set_topic")] pub async fn set_topic(&self, changer_id: &PlayerId, new_topic: Str) { let mut lock = self.0.write().await; let storage_id = lock.storage_id; lock.topic = new_topic.clone(); lock.storage.set_room_topic(storage_id, &new_topic).await.unwrap(); let update = Updates::RoomTopicChanged { room_id: lock.room_id.clone(), new_topic: new_topic.clone(), }; lock.broadcast_update(update, changer_id).await; } } struct Room { /// The numeric node-local id of the room as it is stored in the database. storage_id: u32, /// The cluster-global id of the room. room_id: RoomId, /// Player actors on the local node which are subscribed to this room's updates. subscriptions: HashMap, /// Members of the room. members: HashSet, /// The total number of messages. Used to calculate the id of the new message. message_count: u32, topic: Str, storage: Storage, } impl Room { #[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")] async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime) -> Result<()> { tracing::info!("Adding a message to room"); self.storage .insert_message( self.storage_id, self.message_count, &body, &*author_id.as_inner(), &created_at, ) .await?; self.message_count += 1; let update = Updates::NewMessage { room_id: self.room_id.clone(), author_id: author_id.clone(), body, created_at, }; self.broadcast_update(update, author_id).await; Ok(()) } /// Broadcasts an update to all players except the one who caused the update. /// /// This is called after handling a client command. /// Sending the update to the player who sent the command is handled by the player actor. #[tracing::instrument(skip(self, update), name = "Room::broadcast_update")] async fn broadcast_update(&self, update: Updates, except: &PlayerId) { tracing::debug!("Broadcasting an update to {} subs", self.subscriptions.len()); for (player_id, sub) in &self.subscriptions { if player_id == except { continue; } log::info!("Sending a message from room to player"); sub.update(update.clone()).await; } } } #[derive(Serialize)] pub struct RoomInfo { pub id: RoomId, pub members: Vec, pub topic: Str, }