lavina/crates/lavina-core/src/room.rs

214 lines
7.3 KiB
Rust
Raw Normal View History

2023-02-04 01:01:49 +00:00
//! Domain of rooms — chats with multiple participants.
2023-09-30 23:12:11 +00:00
use std::{collections::HashMap, hash::Hash, sync::Arc};
2023-02-03 22:43:59 +00:00
2023-02-12 22:23:52 +00:00
use prometheus::{IntGauge, Registry as MetricRegistry};
2023-02-15 17:10:54 +00:00
use serde::Serialize;
use tokio::sync::RwLock as AsyncRwLock;
2023-02-03 22:43:59 +00:00
2023-09-30 23:12:11 +00:00
use crate::player::{PlayerHandle, PlayerId, Updates};
2023-08-17 13:41:28 +00:00
use crate::prelude::*;
2023-09-30 23:12:11 +00:00
use crate::repo::Storage;
2023-02-03 22:43:59 +00:00
/// Opaque room id
2023-02-15 17:10:54 +00:00
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
2023-04-13 22:38:26 +00:00
pub struct RoomId(Str);
2023-02-14 19:07:07 +00:00
impl RoomId {
2023-04-13 22:38:26 +00:00
pub fn from(str: impl Into<Str>) -> Result<RoomId> {
let bytes = str.into();
2023-02-14 19:07:07 +00:00
if bytes.len() > 32 {
2023-09-30 23:12:11 +00:00
return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols"));
2023-02-14 19:07:07 +00:00
}
2023-04-13 19:15:48 +00:00
if bytes.contains(' ') {
2023-02-14 19:07:07 +00:00
return Err(anyhow::Error::msg("Room name cannot contain spaces"));
}
Ok(RoomId(bytes))
}
2023-04-13 22:38:26 +00:00
pub fn as_inner(&self) -> &Str {
&self.0
}
2023-04-13 22:38:26 +00:00
pub fn into_inner(self) -> Str {
2023-04-11 16:28:03 +00:00
self.0
}
2023-02-14 19:07:07 +00:00
}
2023-02-03 22:43:59 +00:00
/// Shared datastructure for storing metadata about rooms.
#[derive(Clone)]
2023-08-17 13:41:28 +00:00
pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
2023-02-03 22:43:59 +00:00
impl RoomRegistry {
2023-08-17 13:41:28 +00:00
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> {
2023-09-30 23:12:11 +00:00
let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
2023-02-12 22:23:52 +00:00
metrics.register(Box::new(metric_active_rooms.clone()))?;
2023-02-03 22:43:59 +00:00
let inner = RoomRegistryInner {
rooms: HashMap::new(),
2023-02-12 22:23:52 +00:00
metric_active_rooms,
2023-08-17 13:41:28 +00:00
storage,
2023-02-03 22:43:59 +00:00
};
2023-08-17 13:41:28 +00:00
Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner))))
2023-02-03 22:43:59 +00:00
}
2023-08-17 13:41:28 +00:00
pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> {
let mut inner = self.0.write().await;
if let Some(room_handle) = inner.rooms.get(&room_id) {
2023-08-17 13:41:28 +00:00
// room was already loaded into memory
log::debug!("Room {} was loaded already", &room_id.0);
Ok(room_handle.clone())
} else if let Some(stored_room) = inner.storage.retrieve_room_by_name(&*room_id.0).await? {
// room exists, but was not loaded
log::debug!("Loading room {}...", &room_id.0);
let room = Room {
2023-08-18 14:45:48 +00:00
storage_id: stored_room.id,
2023-08-17 13:41:28 +00:00
room_id: room_id.clone(),
2023-08-18 14:45:48 +00:00
subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions
2023-08-17 13:41:28 +00:00
topic: stored_room.topic.into(),
message_count: stored_room.message_count,
2023-08-18 14:45:48 +00:00
storage: inner.storage.clone(),
2023-08-17 13:41:28 +00:00
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone());
inner.metric_active_rooms.inc();
Ok(room_handle)
2023-02-13 18:32:52 +00:00
} else {
2023-08-17 13:41:28 +00:00
// room does not exist, create it and load
log::debug!("Creating room {}...", &room_id.0);
let topic = "New room";
2023-08-18 14:45:48 +00:00
let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?;
let room = Room {
2023-08-18 14:45:48 +00:00
storage_id: id,
room_id: room_id.clone(),
subscriptions: HashMap::new(),
2023-08-17 13:41:28 +00:00
topic: topic.into(),
message_count: 0,
2023-08-18 14:45:48 +00:00
storage: inner.storage.clone(),
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone());
2023-02-13 18:32:52 +00:00
inner.metric_active_rooms.inc();
2023-08-17 13:41:28 +00:00
Ok(room_handle)
2023-02-13 18:32:52 +00:00
}
2023-02-03 22:43:59 +00:00
}
2023-08-17 13:41:28 +00:00
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
let inner = self.0.read().await;
let res = inner.rooms.get(room_id);
res.map(|r| r.clone())
2023-02-03 22:43:59 +00:00
}
2023-02-15 17:10:54 +00:00
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
let handles = {
2023-08-17 13:41:28 +00:00
let inner = self.0.read().await;
2023-02-15 17:10:54 +00:00
let handles = inner.rooms.values().cloned().collect::<Vec<_>>();
handles
};
let mut res = vec![];
for i in handles {
res.push(i.get_room_info().await)
}
res
}
2023-02-03 22:43:59 +00:00
}
struct RoomRegistryInner {
rooms: HashMap<RoomId, RoomHandle>,
2023-02-12 22:23:52 +00:00
metric_active_rooms: IntGauge,
2023-08-17 13:41:28 +00:00
storage: Storage,
2023-02-03 22:43:59 +00:00
}
/// Cloneable handle to a single room.
///
/// The room state is kept behind an `Arc<AsyncRwLock<..>>`, so every clone of
/// the handle refers to the same [`Room`].
#[derive(Clone)]
pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
2023-02-03 22:43:59 +00:00
impl RoomHandle {
pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) {
let mut lock = self.0.write().await;
lock.add_subscriber(player_id, player_handle).await;
2023-02-03 22:43:59 +00:00
}
2023-02-15 17:54:48 +00:00
pub async fn unsubscribe(&self, player_id: &PlayerId) {
let mut lock = self.0.write().await;
lock.subscriptions.remove(player_id);
let update = Updates::RoomLeft {
room_id: lock.room_id.clone(),
former_member_id: player_id.clone(),
};
lock.broadcast_update(update, player_id).await;
}
2023-04-13 22:38:26 +00:00
pub async fn send_message(&self, player_id: PlayerId, body: Str) {
2023-08-18 14:45:48 +00:00
let mut lock = self.0.write().await;
2023-09-30 23:12:11 +00:00
let res = lock.send_message(player_id, body).await;
2023-09-06 14:34:20 +00:00
if let Err(err) = res {
log::warn!("Failed to send message: {err:?}");
}
2023-02-03 22:43:59 +00:00
}
2023-02-14 00:44:03 +00:00
pub async fn get_room_info(&self) -> RoomInfo {
let lock = self.0.read().await;
RoomInfo {
id: lock.room_id.clone(),
2023-09-30 23:12:11 +00:00
members: lock.subscriptions.keys().map(|x| x.clone()).collect::<Vec<_>>(),
2023-02-14 18:46:42 +00:00
topic: lock.topic.clone(),
}
}
2023-04-13 22:38:26 +00:00
pub async fn set_topic(&mut self, changer_id: PlayerId, new_topic: Str) {
2023-02-14 18:46:42 +00:00
let mut lock = self.0.write().await;
lock.topic = new_topic.clone();
let update = Updates::RoomTopicChanged {
room_id: lock.room_id.clone(),
new_topic: new_topic.clone(),
};
lock.broadcast_update(update, &changer_id).await;
2023-02-14 00:44:03 +00:00
}
2023-02-03 22:43:59 +00:00
}
struct Room {
2023-08-18 14:45:48 +00:00
storage_id: u32,
room_id: RoomId,
2023-02-03 22:43:59 +00:00
subscriptions: HashMap<PlayerId, PlayerHandle>,
2023-08-17 13:41:28 +00:00
message_count: u32,
2023-04-13 22:38:26 +00:00
topic: Str,
2023-08-18 14:45:48 +00:00
storage: Storage,
2023-02-03 22:43:59 +00:00
}
impl Room {
async fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) {
tracing::info!("Adding a subscriber to room");
self.subscriptions.insert(player_id.clone(), player_handle);
let update = Updates::RoomJoined {
room_id: self.room_id.clone(),
new_member_id: player_id.clone(),
};
self.broadcast_update(update, &player_id).await;
}
2023-08-18 14:45:48 +00:00
async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> {
tracing::info!("Adding a message to room");
2023-09-30 23:12:11 +00:00
self.storage
2023-10-03 00:00:54 +00:00
.insert_message(self.storage_id, self.message_count, &body, &*author_id.as_inner())
2023-09-30 23:12:11 +00:00
.await?;
2023-08-18 14:45:48 +00:00
self.message_count += 1;
let update = Updates::NewMessage {
room_id: self.room_id.clone(),
author_id: author_id.clone(),
body,
};
self.broadcast_update(update, &author_id).await;
2023-08-18 14:45:48 +00:00
Ok(())
}
async fn broadcast_update(&self, update: Updates, except: &PlayerId) {
2023-09-06 14:34:20 +00:00
tracing::debug!("Broadcasting an update to {} subs", self.subscriptions.len());
for (player_id, sub) in &self.subscriptions {
if player_id == except {
continue;
}
log::info!("Sending a message from room to player");
sub.update(update.clone()).await;
}
2023-02-03 22:43:59 +00:00
}
}
2023-02-14 00:44:03 +00:00
2023-02-15 17:10:54 +00:00
#[derive(Serialize)]
2023-02-14 00:44:03 +00:00
pub struct RoomInfo {
pub id: RoomId,
pub members: Vec<PlayerId>,
2023-04-13 22:38:26 +00:00
pub topic: Str,
2023-02-14 00:44:03 +00:00
}