//! Domain of rooms — chats with multiple participants. use std::{ collections::HashMap, hash::Hash, sync::{Arc, RwLock}, }; use prometheus::{IntGauge, Registry as MetricRegistry}; use tokio::sync::RwLock as AsyncRwLock; use crate::{ core::player::{PlayerCommand, PlayerHandle, PlayerId}, prelude::*, }; use super::player::{ConnectionId, IncomingPlayerEvent}; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RoomId(pub ByteVec); /// Shared datastructure for storing metadata about rooms. #[derive(Clone)] pub struct RoomRegistry(Arc>); impl RoomRegistry { pub fn empty(metrics: &mut MetricRegistry) -> Result { let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; let inner = RoomRegistryInner { rooms: HashMap::new(), metric_active_rooms, }; Ok(RoomRegistry(Arc::new(RwLock::new(inner)))) } pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle { let mut inner = self.0.write().unwrap(); if let Some(room_handle) = inner.rooms.get(&room_id) { room_handle.clone() } else { let room = Room { room_id: room_id.clone(), subscriptions: HashMap::new(), topic: b"New room".to_vec(), }; let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); inner.rooms.insert(room_id, room_handle.clone()); inner.metric_active_rooms.inc(); room_handle } } pub fn get_room(&self, room_id: &RoomId) -> Option { let inner = self.0.read().unwrap(); let res = inner.rooms.get(room_id); res.map(|r| r.clone()) } } struct RoomRegistryInner { rooms: HashMap, metric_active_rooms: IntGauge, } #[derive(Clone)] pub struct RoomHandle(Arc>); impl RoomHandle { pub async fn subscribe( &self, player_id: PlayerId, connection_id: ConnectionId, player_handle: PlayerHandle, ) { let mut lock = self.0.write().await; lock.add_subscriber(player_id, connection_id, player_handle) .await; } pub async fn send_message( &self, player_id: PlayerId, connection_id: ConnectionId, body: String, ) { let lock = self.0.read().await; lock.send_message(player_id, connection_id, body).await; } pub async fn get_members(&self) -> Vec { let lock = self.0.read().await; lock.subscriptions .keys() .map(|x| x.clone()) .collect::>() } pub async fn get_room_info(&self) -> RoomInfo { let lock = self.0.read().await; RoomInfo { id: lock.room_id.clone(), members: lock .subscriptions .keys() .map(|x| x.clone()) .collect::>(), topic: lock.topic.clone(), } } pub async fn set_topic(&mut self, new_topic: ByteVec) { let mut lock = self.0.write().await; lock.topic = new_topic.clone(); for (player_id, player_handle) in &lock.subscriptions { let msg = player_handle .send(PlayerCommand::Event( IncomingPlayerEvent::IncomingRoomTopicChanged { room_id: lock.room_id.clone(), new_topic: new_topic.clone(), }, )) .await; } } } struct Room { room_id: RoomId, subscriptions: HashMap, topic: ByteVec, } impl Room { async fn add_subscriber( &mut self, player_id: PlayerId, connection_id: ConnectionId, player_handle: PlayerHandle, ) { tracing::info!("Adding a subscriber to room"); self.subscriptions.insert(player_id.clone(), player_handle); for (_, sub) in &self.subscriptions { sub.send(PlayerCommand::IncomingRoomJoined { room_id: self.room_id.clone(), new_member_id: player_id.clone(), connection_id: connection_id.clone(), }) .await; } } async fn send_message(&self, player_id: PlayerId, connection_id: ConnectionId, body: String) { tracing::info!("Adding a message to room"); for (_, sub) in &self.subscriptions { log::info!("Sending a message from room to player"); sub.receive_message( self.room_id.clone(), player_id.clone(), connection_id.clone(), body.clone(), ) .await; } } } pub struct RoomInfo { pub id: RoomId, pub members: Vec, pub topic: ByteVec, }