//! Domain of rooms — chats with multiple participants. use std::{ collections::HashMap, hash::Hash, sync::{Arc, RwLock}, }; use prometheus::{IntGauge, Registry as MetricRegistry}; use tokio::sync::mpsc::{channel, Sender}; use crate::{ core::player::{PlayerHandle, PlayerId}, prelude::*, }; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RoomId(pub ByteVec); /// Shared datastructure for storing metadata about rooms. #[derive(Clone)] pub struct RoomRegistry(Arc>); impl RoomRegistry { pub fn empty(metrics: &mut MetricRegistry) -> Result { let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; let inner = RoomRegistryInner { rooms: HashMap::new(), metric_active_rooms, }; Ok(RoomRegistry(Arc::new(RwLock::new(inner)))) } pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle { let room = Room { subscriptions: HashMap::new(), }; let mut inner = self.0.write().unwrap(); let (room_handle, fiber) = room.launch(room_id.clone()); inner.rooms.insert(room_id, (room_handle.clone(), fiber)); inner.metric_active_rooms.inc(); room_handle } pub fn get_room(&self, room_id: RoomId) -> Option { let inner = self.0.read().unwrap(); let res = inner.rooms.get(&room_id); res.map(|r| r.0.clone()) } } struct RoomRegistryInner { rooms: HashMap)>, metric_active_rooms: IntGauge, } #[derive(Clone)] pub struct RoomHandle { tx: Sender, } impl RoomHandle { pub async fn subscribe(&mut self, player_id: PlayerId, player: PlayerHandle) { match self .tx .send(RoomCommand::AddSubscriber { player_id, player }) .await { Ok(_) => {} Err(_) => { tracing::error!("Room mailbox is closed unexpectedly"); } }; } pub async fn send_message(&mut self, player_id: PlayerId, body: String) { self.tx .send(RoomCommand::SendMessage { player_id, body }) .await; } } enum RoomCommand { AddSubscriber { player_id: PlayerId, player: PlayerHandle, }, SendMessage { player_id: PlayerId, body: String, }, } struct Room { subscriptions: HashMap, } impl Room { fn launch(mut self, room_id: RoomId) -> (RoomHandle, JoinHandle) { let (tx, mut rx) = channel(32); let fiber = tokio::task::spawn(async move { tracing::info!("Starting room fiber"); while let Some(a) = rx.recv().await { match a { RoomCommand::AddSubscriber { player_id, player } => { tracing::info!("Adding a subscriber to room"); self.subscriptions.insert(player_id, player); } RoomCommand::SendMessage { player_id, body } => { tracing::info!("Adding a message to room"); for (_, sub) in &mut self.subscriptions { sub.receive_message(room_id.clone(), player_id.clone(), body.clone()) .await; } } } } tracing::info!("Stopping room fiber"); self }); (RoomHandle { tx }, fiber) } }