//! Domain of rooms — chats with multiple participants. use std::{ collections::HashMap, hash::Hash, sync::{Arc, RwLock}, }; use prometheus::{IntGauge, Registry as MetricRegistry}; use serde::Serialize; use tokio::sync::RwLock as AsyncRwLock; use crate::{ core::player::{PlayerHandle, PlayerId, Updates}, prelude::*, }; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub struct RoomId(ByteVec); impl RoomId { pub fn from_bytes(bytes: ByteVec) -> Result { if bytes.len() > 32 { return Err(anyhow::Error::msg( "Room name cannot be longer than 32 symbols", )); } if bytes.contains(&b' ') { return Err(anyhow::Error::msg("Room name cannot contain spaces")); } Ok(RoomId(bytes)) } pub fn as_bytes(&self) -> &ByteVec { &self.0 } } /// Shared datastructure for storing metadata about rooms. #[derive(Clone)] pub struct RoomRegistry(Arc>); impl RoomRegistry { pub fn empty(metrics: &mut MetricRegistry) -> Result { let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; let inner = RoomRegistryInner { rooms: HashMap::new(), metric_active_rooms, }; Ok(RoomRegistry(Arc::new(RwLock::new(inner)))) } pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle { let mut inner = self.0.write().unwrap(); if let Some(room_handle) = inner.rooms.get(&room_id) { room_handle.clone() } else { let room = Room { room_id: room_id.clone(), subscriptions: HashMap::new(), topic: b"New room".to_vec(), }; let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); inner.rooms.insert(room_id, room_handle.clone()); inner.metric_active_rooms.inc(); room_handle } } pub fn get_room(&self, room_id: &RoomId) -> Option { let inner = self.0.read().unwrap(); let res = inner.rooms.get(room_id); res.map(|r| r.clone()) } pub async fn get_all_rooms(&self) -> Vec { let handles = { let inner = self.0.read().unwrap(); let handles = inner.rooms.values().cloned().collect::>(); handles }; let mut res = vec![]; for i in handles { res.push(i.get_room_info().await) } res } } struct RoomRegistryInner { rooms: HashMap, metric_active_rooms: IntGauge, } #[derive(Clone)] pub struct RoomHandle(Arc>); impl RoomHandle { pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) { let mut lock = self.0.write().await; lock.add_subscriber(player_id, player_handle).await; } pub async fn unsubscribe(&self, player_id: &PlayerId) { let mut lock = self.0.write().await; lock.subscriptions.remove(player_id); let update = Updates::RoomLeft { room_id: lock.room_id.clone(), former_member_id: player_id.clone(), }; lock.broadcast_update(update, player_id).await; } pub async fn send_message(&self, player_id: PlayerId, body: String) { let lock = self.0.read().await; lock.send_message(player_id, body).await; } pub async fn get_room_info(&self) -> RoomInfo { let lock = self.0.read().await; RoomInfo { id: lock.room_id.clone(), members: lock .subscriptions .keys() .map(|x| x.clone()) .collect::>(), topic: lock.topic.clone(), } } pub async fn set_topic(&mut self, changer_id: PlayerId, new_topic: ByteVec) { let mut lock = self.0.write().await; lock.topic = new_topic.clone(); let update = Updates::RoomTopicChanged { room_id: lock.room_id.clone(), new_topic: new_topic.clone(), }; lock.broadcast_update(update, &changer_id).await; } } struct Room { room_id: RoomId, subscriptions: HashMap, topic: ByteVec, } impl Room { async fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { tracing::info!("Adding a subscriber to room"); self.subscriptions.insert(player_id.clone(), player_handle); let update = Updates::RoomJoined { room_id: self.room_id.clone(), new_member_id: player_id.clone(), }; self.broadcast_update(update, &player_id).await; } async fn send_message(&self, author_id: PlayerId, body: String) { tracing::info!("Adding a message to room"); let update = Updates::NewMessage { room_id: self.room_id.clone(), author_id: author_id.clone(), body, }; self.broadcast_update(update, &author_id).await; } async fn broadcast_update(&self, update: Updates, except: &PlayerId) { for (player_id, sub) in &self.subscriptions { if player_id == except { continue; } log::info!("Sending a message from room to player"); sub.update(update.clone()).await; } } } #[derive(Serialize)] pub struct RoomInfo { pub id: RoomId, pub members: Vec, pub topic: ByteVec, }