forked from lavina/lavina
280 lines
10 KiB
Rust
280 lines
10 KiB
Rust
//! Domain of rooms — chats with multiple participants.
|
|
use std::collections::HashSet;
|
|
use std::{collections::HashMap, hash::Hash, sync::Arc};
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use prometheus::{IntGauge, Registry as MetricRegistry};
|
|
use serde::Serialize;
|
|
use tokio::sync::RwLock as AsyncRwLock;
|
|
|
|
use crate::player::{PlayerHandle, PlayerId, Updates};
|
|
use crate::prelude::*;
|
|
use crate::repo::Storage;
|
|
|
|
/// Opaque room id
|
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
|
|
pub struct RoomId(Str);
|
|
impl RoomId {
|
|
pub fn from(str: impl Into<Str>) -> Result<RoomId> {
|
|
let bytes = str.into();
|
|
if bytes.len() > 32 {
|
|
return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols"));
|
|
}
|
|
if bytes.contains(' ') {
|
|
return Err(anyhow::Error::msg("Room name cannot contain spaces"));
|
|
}
|
|
Ok(RoomId(bytes))
|
|
}
|
|
pub fn as_inner(&self) -> &Str {
|
|
&self.0
|
|
}
|
|
pub fn into_inner(self) -> Str {
|
|
self.0
|
|
}
|
|
}
|
|
|
|
/// Shared data structure for storing metadata about rooms.
|
|
#[derive(Clone)]
|
|
pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
|
|
impl RoomRegistry {
|
|
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> {
|
|
let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
|
|
metrics.register(Box::new(metric_active_rooms.clone()))?;
|
|
let inner = RoomRegistryInner {
|
|
rooms: HashMap::new(),
|
|
metric_active_rooms,
|
|
storage,
|
|
};
|
|
Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner))))
|
|
}
|
|
|
|
pub fn shutdown(self) -> Result<()> {
|
|
let res = match Arc::try_unwrap(self.0) {
|
|
Ok(e) => e,
|
|
Err(_) => return Err(fail("failed to acquire rooms ownership on shutdown")),
|
|
};
|
|
let res = res.into_inner();
|
|
// TODO drop all rooms
|
|
drop(res);
|
|
Ok(())
|
|
}
|
|
|
|
#[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")]
|
|
pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> {
|
|
let mut inner = self.0.write().await;
|
|
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? {
|
|
Ok(room_handle.clone())
|
|
} else {
|
|
log::debug!("Creating room {}...", &room_id.0);
|
|
let topic = "New room";
|
|
let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?;
|
|
let room = Room {
|
|
storage_id: id,
|
|
room_id: room_id.clone(),
|
|
subscriptions: HashMap::new(),
|
|
members: HashSet::new(),
|
|
topic: topic.into(),
|
|
message_count: 0,
|
|
storage: inner.storage.clone(),
|
|
};
|
|
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
|
|
inner.rooms.insert(room_id, room_handle.clone());
|
|
inner.metric_active_rooms.inc();
|
|
Ok(room_handle)
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(skip(self), name = "RoomRegistry::get_room")]
|
|
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
|
|
let mut inner = self.0.write().await;
|
|
inner.get_or_load_room(room_id).await.unwrap()
|
|
}
|
|
|
|
#[tracing::instrument(skip(self), name = "RoomRegistry::get_all_rooms")]
|
|
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
|
|
let handles = {
|
|
let inner = self.0.read().await;
|
|
let handles = inner.rooms.values().cloned().collect::<Vec<_>>();
|
|
handles
|
|
};
|
|
let mut res = vec![];
|
|
for i in handles {
|
|
res.push(i.get_room_info().await)
|
|
}
|
|
res
|
|
}
|
|
}
|
|
|
|
struct RoomRegistryInner {
|
|
rooms: HashMap<RoomId, RoomHandle>,
|
|
metric_active_rooms: IntGauge,
|
|
storage: Storage,
|
|
}
|
|
|
|
impl RoomRegistryInner {
|
|
#[tracing::instrument(skip(self), name = "RoomRegistryInner::get_or_load_room")]
|
|
async fn get_or_load_room(&mut self, room_id: &RoomId) -> Result<Option<RoomHandle>> {
|
|
if let Some(room_handle) = self.rooms.get(room_id) {
|
|
log::debug!("Room {} was loaded already", &room_id.0);
|
|
Ok(Some(room_handle.clone()))
|
|
} else if let Some(stored_room) = self.storage.retrieve_room_by_name(&*room_id.0).await? {
|
|
log::debug!("Loading room {}...", &room_id.0);
|
|
let room = Room {
|
|
storage_id: stored_room.id,
|
|
room_id: room_id.clone(),
|
|
subscriptions: HashMap::new(),
|
|
members: HashSet::new(), // TODO load members from storage
|
|
topic: stored_room.topic.into(),
|
|
message_count: stored_room.message_count,
|
|
storage: self.storage.clone(),
|
|
};
|
|
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
|
|
self.rooms.insert(room_id.clone(), room_handle.clone());
|
|
self.metric_active_rooms.inc();
|
|
Ok(Some(room_handle))
|
|
} else {
|
|
tracing::debug!("Room {} does not exist", &room_id.0);
|
|
Ok(None)
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
|
|
impl RoomHandle {
|
|
#[tracing::instrument(skip(self, player_handle), name = "RoomHandle::subscribe")]
|
|
pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) {
|
|
let mut lock = self.0.write().await;
|
|
tracing::info!("Adding a subscriber to a room");
|
|
lock.subscriptions.insert(player_id.clone(), player_handle);
|
|
}
|
|
|
|
#[tracing::instrument(skip(self), name = "RoomHandle::add_member")]
|
|
pub async fn add_member(&self, player_id: &PlayerId, player_storage_id: u32) {
|
|
let mut lock = self.0.write().await;
|
|
tracing::info!("Adding a new member to a room");
|
|
let room_storage_id = lock.storage_id;
|
|
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
|
|
lock.members.insert(player_id.clone());
|
|
let update = Updates::RoomJoined {
|
|
room_id: lock.room_id.clone(),
|
|
new_member_id: player_id.clone(),
|
|
};
|
|
lock.broadcast_update(update, player_id).await;
|
|
}
|
|
|
|
#[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")]
|
|
pub async fn unsubscribe(&self, player_id: &PlayerId) {
|
|
let mut lock = self.0.write().await;
|
|
lock.subscriptions.remove(player_id);
|
|
}
|
|
|
|
#[tracing::instrument(skip(self), name = "RoomHandle::remove_member")]
|
|
pub async fn remove_member(&self, player_id: &PlayerId, player_storage_id: u32) {
|
|
let mut lock = self.0.write().await;
|
|
tracing::info!("Removing a member from a room");
|
|
let room_storage_id = lock.storage_id;
|
|
lock.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap();
|
|
lock.members.remove(player_id);
|
|
let update = Updates::RoomLeft {
|
|
room_id: lock.room_id.clone(),
|
|
former_member_id: player_id.clone(),
|
|
};
|
|
lock.broadcast_update(update, player_id).await;
|
|
}
|
|
|
|
#[tracing::instrument(skip(self, body, created_at), name = "RoomHandle::send_message")]
|
|
pub async fn send_message(&self, player_id: &PlayerId, body: Str, created_at: DateTime<Utc>) {
|
|
let mut lock = self.0.write().await;
|
|
let res = lock.send_message(player_id, body, created_at).await;
|
|
if let Err(err) = res {
|
|
log::warn!("Failed to send message: {err:?}");
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(skip(self), name = "RoomHandle::get_room_info")]
|
|
pub async fn get_room_info(&self) -> RoomInfo {
|
|
let lock = self.0.read().await;
|
|
RoomInfo {
|
|
id: lock.room_id.clone(),
|
|
members: lock.subscriptions.keys().map(|x| x.clone()).collect::<Vec<_>>(),
|
|
topic: lock.topic.clone(),
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(skip(self, new_topic), name = "RoomHandle::set_topic")]
|
|
pub async fn set_topic(&self, changer_id: &PlayerId, new_topic: Str) {
|
|
let mut lock = self.0.write().await;
|
|
let storage_id = lock.storage_id;
|
|
lock.topic = new_topic.clone();
|
|
lock.storage.set_room_topic(storage_id, &new_topic).await.unwrap();
|
|
let update = Updates::RoomTopicChanged {
|
|
room_id: lock.room_id.clone(),
|
|
new_topic: new_topic.clone(),
|
|
};
|
|
lock.broadcast_update(update, changer_id).await;
|
|
}
|
|
}
|
|
|
|
struct Room {
|
|
/// The numeric node-local id of the room as it is stored in the database.
|
|
storage_id: u32,
|
|
/// The cluster-global id of the room.
|
|
room_id: RoomId,
|
|
/// Player actors on the local node which are subscribed to this room's updates.
|
|
subscriptions: HashMap<PlayerId, PlayerHandle>,
|
|
/// Members of the room.
|
|
members: HashSet<PlayerId>,
|
|
/// The total number of messages. Used to calculate the id of the new message.
|
|
message_count: u32,
|
|
topic: Str,
|
|
storage: Storage,
|
|
}
|
|
impl Room {
|
|
#[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")]
|
|
async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime<Utc>) -> Result<()> {
|
|
tracing::info!("Adding a message to room");
|
|
self.storage
|
|
.insert_message(
|
|
self.storage_id,
|
|
self.message_count,
|
|
&body,
|
|
&*author_id.as_inner(),
|
|
&created_at,
|
|
)
|
|
.await?;
|
|
self.message_count += 1;
|
|
let update = Updates::NewMessage {
|
|
room_id: self.room_id.clone(),
|
|
author_id: author_id.clone(),
|
|
body,
|
|
created_at,
|
|
};
|
|
self.broadcast_update(update, author_id).await;
|
|
Ok(())
|
|
}
|
|
|
|
/// Broadcasts an update to all players except the one who caused the update.
|
|
///
|
|
/// This is called after handling a client command.
|
|
/// Sending the update to the player who sent the command is handled by the player actor.
|
|
#[tracing::instrument(skip(self, update), name = "Room::broadcast_update")]
|
|
async fn broadcast_update(&self, update: Updates, except: &PlayerId) {
|
|
tracing::debug!("Broadcasting an update to {} subs", self.subscriptions.len());
|
|
for (player_id, sub) in &self.subscriptions {
|
|
if player_id == except {
|
|
continue;
|
|
}
|
|
log::info!("Sending a message from room to player");
|
|
sub.update(update.clone()).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub struct RoomInfo {
|
|
pub id: RoomId,
|
|
pub members: Vec<PlayerId>,
|
|
pub topic: Str,
|
|
}
|