forked from lavina/lavina
1
0
Fork 0
lavina/src/core/room.rs

125 lines
3.7 KiB
Rust
Raw Normal View History

2023-02-04 01:01:49 +00:00
//! Domain of rooms — chats with multiple participants.
2023-02-03 22:43:59 +00:00
use std::{
collections::HashMap,
hash::Hash,
sync::{Arc, RwLock},
};
2023-02-12 22:23:52 +00:00
use prometheus::{IntGauge, Registry as MetricRegistry};
2023-02-03 22:43:59 +00:00
use tokio::sync::mpsc::{channel, Sender};
use crate::{
2023-02-04 01:01:49 +00:00
core::player::{PlayerHandle, PlayerId},
2023-02-03 22:43:59 +00:00
prelude::*,
};
/// Opaque room id
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RoomId(pub u64);
/// Shared datastructure for storing metadata about rooms.
#[derive(Clone)]
pub struct RoomRegistry(Arc<RwLock<RoomRegistryInner>>);
impl RoomRegistry {
2023-02-12 22:23:52 +00:00
pub fn empty(metrics: &mut MetricRegistry) -> Result<RoomRegistry> {
let metric_active_rooms =
IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
metrics.register(Box::new(metric_active_rooms.clone()))?;
2023-02-03 22:43:59 +00:00
let inner = RoomRegistryInner {
next_room_id: RoomId(0),
rooms: HashMap::new(),
2023-02-12 22:23:52 +00:00
metric_active_rooms,
2023-02-03 22:43:59 +00:00
};
2023-02-12 22:23:52 +00:00
Ok(RoomRegistry(Arc::new(RwLock::new(inner))))
2023-02-03 22:43:59 +00:00
}
pub fn create_room(&mut self) -> (RoomId, RoomHandle) {
let room = Room {
subscriptions: HashMap::new(),
};
let mut inner = self.0.write().unwrap();
let room_id = inner.next_room_id;
inner.next_room_id.0 += 1;
let (room_handle, fiber) = room.launch(room_id);
inner.rooms.insert(room_id, (room_handle.clone(), fiber));
(room_id, room_handle)
}
pub fn get_room(&self, room_id: RoomId) -> Option<RoomHandle> {
let inner = self.0.read().unwrap();
let res = inner.rooms.get(&room_id);
res.map(|r| r.0.clone())
}
}
struct RoomRegistryInner {
next_room_id: RoomId,
rooms: HashMap<RoomId, (RoomHandle, JoinHandle<Room>)>,
2023-02-12 22:23:52 +00:00
metric_active_rooms: IntGauge,
2023-02-03 22:43:59 +00:00
}
#[derive(Clone)]
pub struct RoomHandle {
tx: Sender<RoomCommand>,
}
impl RoomHandle {
pub async fn subscribe(&mut self, player_id: PlayerId, player: PlayerHandle) {
match self
.tx
.send(RoomCommand::AddSubscriber { player_id, player })
.await
{
Ok(_) => {}
Err(_) => {
tracing::error!("Room mailbox is closed unexpectedly");
}
};
}
pub async fn send_message(&mut self, player_id: PlayerId, body: String) {
self.tx
.send(RoomCommand::SendMessage { player_id, body })
.await;
}
}
enum RoomCommand {
AddSubscriber {
player_id: PlayerId,
player: PlayerHandle,
},
SendMessage {
player_id: PlayerId,
body: String,
},
}
struct Room {
subscriptions: HashMap<PlayerId, PlayerHandle>,
}
impl Room {
fn launch(mut self, room_id: RoomId) -> (RoomHandle, JoinHandle<Room>) {
let (tx, mut rx) = channel(32);
let fiber = tokio::task::spawn(async move {
tracing::info!("Starting room fiber");
while let Some(a) = rx.recv().await {
match a {
RoomCommand::AddSubscriber { player_id, player } => {
tracing::info!("Adding a subscriber to room");
self.subscriptions.insert(player_id, player);
}
RoomCommand::SendMessage { player_id, body } => {
tracing::info!("Adding a message to room");
for (_, sub) in &mut self.subscriptions {
sub.receive_message(room_id, player_id, body.clone()).await;
}
}
}
}
tracing::info!("Stopping room fiber");
self
});
(RoomHandle { tx }, fiber)
}
}