forked from lavina/lavina
1
0
Fork 0

persistence for rooms

This commit is contained in:
Nikita Vilunov 2023-08-17 15:41:28 +02:00
parent ef5c0dbbf6
commit 1b5ac1491a
7 changed files with 112 additions and 25 deletions

View File

@ -11,5 +11,21 @@ create table challenges_plain_password(
create table rooms( create table rooms(
id integer primary key autoincrement not null, id integer primary key autoincrement not null,
name string unique not null name string unique not null,
topic string not null,
message_count integer not null default 0
);
create table messages(
room_id integer not null,
id integer not null, -- unique per room, sequential in one room
content string not null,
primary key (room_id, id)
);
create table memberships(
user_id integer not null,
room_id integer not null,
status integer not null, -- 0 for not-joined, 1 for joined, 2 for banned
primary key (user_id, room_id)
); );

View File

@ -354,7 +354,13 @@ impl Player {
return; return;
} }
let room = self.rooms.get_or_create_room(room_id.clone()); let room = match self.rooms.get_or_create_room(room_id.clone()).await {
Ok(room) => room,
Err(e) => {
log::error!("Failed to get or create room: {e}");
return;
}
};
room.subscribe(self.player_id.clone(), self.handle.clone()) room.subscribe(self.player_id.clone(), self.handle.clone())
.await; .await;
self.my_rooms.insert(room_id.clone(), room.clone()); self.my_rooms.insert(room_id.clone(), room.clone());
@ -384,7 +390,7 @@ impl Player {
body, body,
promise, promise,
} => { } => {
let room = self.rooms.get_room(&room_id); let room = self.rooms.get_room(&room_id).await;
if let Some(room) = room { if let Some(room) = room {
room.send_message(self.player_id.clone(), body.clone()) room.send_message(self.player_id.clone(), body.clone())
.await; .await;
@ -404,7 +410,7 @@ impl Player {
new_topic, new_topic,
promise, promise,
} => { } => {
let room = self.rooms.get_room(&room_id); let room = self.rooms.get_room(&room_id).await;
if let Some(mut room) = room { if let Some(mut room) = room {
room.set_topic(self.player_id.clone(), new_topic.clone()) room.set_topic(self.player_id.clone(), new_topic.clone())
.await; .await;

View File

@ -47,10 +47,39 @@ impl Storage {
Ok(res) Ok(res)
} }
pub async fn retrieve_room_by_name(&mut self, name: &str) -> Result<Option<StoredRoom>> {
let mut executor = self.conn.lock().await;
let res = sqlx::query_as(
"select id, name, topic, message_count
from rooms
where name = ?;",
)
.bind(name)
.fetch_optional(&mut *executor)
.await?;
Ok(res)
}
pub async fn create_new_room(&mut self, name: &str, topic: &str) -> Result<()> {
let mut executor = self.conn.lock().await;
let _: (u32,) = sqlx::query_as(
"insert into rooms(name, topic)
values (?, ?)
returning id;",
)
.bind(name)
.bind(topic)
.fetch_one(&mut *executor)
.await?;
Ok(())
}
pub async fn close(mut self) -> Result<()> { pub async fn close(mut self) -> Result<()> {
let res = match Arc::try_unwrap(self.conn) { let res = match Arc::try_unwrap(self.conn) {
Ok(e) => e, Ok(e) => e,
Err(_) => return Err(fail("failed to acquire DB ownership on shutdown")), Err(e) => return Err(fail("failed to acquire DB ownership on shutdown")),
}; };
let res = res.into_inner(); let res = res.into_inner();
res.close().await?; res.close().await?;
@ -64,3 +93,11 @@ pub struct StoredUser {
pub name: String, pub name: String,
pub password: Option<String>, pub password: Option<String>,
} }
#[derive(FromRow)]
pub struct StoredRoom {
pub id: u32,
pub name: String,
pub topic: String,
pub message_count: u32,
}

View File

@ -2,17 +2,16 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
hash::Hash, hash::Hash,
sync::{Arc, RwLock}, sync::Arc,
}; };
use prometheus::{IntGauge, Registry as MetricRegistry}; use prometheus::{IntGauge, Registry as MetricRegistry};
use serde::Serialize; use serde::Serialize;
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::RwLock as AsyncRwLock;
use crate::{ use crate::core::player::{PlayerHandle, PlayerId, Updates};
core::player::{PlayerHandle, PlayerId, Updates}, use crate::core::repo::Storage;
prelude::*, use crate::prelude::*;
};
/// Opaque room id /// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
@ -40,45 +39,66 @@ impl RoomId {
/// Shared datastructure for storing metadata about rooms. /// Shared datastructure for storing metadata about rooms.
#[derive(Clone)] #[derive(Clone)]
pub struct RoomRegistry(Arc<RwLock<RoomRegistryInner>>); pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
impl RoomRegistry { impl RoomRegistry {
pub fn empty(metrics: &mut MetricRegistry) -> Result<RoomRegistry> { pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> {
let metric_active_rooms = let metric_active_rooms =
IntGauge::new("chat_rooms_active", "Number of alive room actors")?; IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
metrics.register(Box::new(metric_active_rooms.clone()))?; metrics.register(Box::new(metric_active_rooms.clone()))?;
let inner = RoomRegistryInner { let inner = RoomRegistryInner {
rooms: HashMap::new(), rooms: HashMap::new(),
metric_active_rooms, metric_active_rooms,
storage,
}; };
Ok(RoomRegistry(Arc::new(RwLock::new(inner)))) Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner))))
} }
pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle { pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> {
let mut inner = self.0.write().unwrap(); let mut inner = self.0.write().await;
if let Some(room_handle) = inner.rooms.get(&room_id) { if let Some(room_handle) = inner.rooms.get(&room_id) {
room_handle.clone() // room was already loaded into memory
} else { log::debug!("Room {} was loaded already", &room_id.0);
Ok(room_handle.clone())
} else if let Some(stored_room) = inner.storage.retrieve_room_by_name(&*room_id.0).await? {
// room exists, but was not loaded
log::debug!("Loading room {}...", &room_id.0);
let room = Room { let room = Room {
room_id: room_id.clone(), room_id: room_id.clone(),
subscriptions: HashMap::new(), subscriptions: HashMap::new(),
topic: "New room".into(), topic: stored_room.topic.into(),
message_count: stored_room.message_count,
}; };
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone()); inner.rooms.insert(room_id, room_handle.clone());
inner.metric_active_rooms.inc(); inner.metric_active_rooms.inc();
room_handle Ok(room_handle)
} else {
// room does not exist, create it and load
log::debug!("Creating room {}...", &room_id.0);
let topic = "New room";
let _ = inner.storage.create_new_room(&*room_id.0, &*topic).await?;
let room = Room {
room_id: room_id.clone(),
subscriptions: HashMap::new(),
topic: topic.into(),
message_count: 0,
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone());
inner.metric_active_rooms.inc();
Ok(room_handle)
} }
} }
pub fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> { pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
let inner = self.0.read().unwrap(); let inner = self.0.read().await;
let res = inner.rooms.get(room_id); let res = inner.rooms.get(room_id);
res.map(|r| r.clone()) res.map(|r| r.clone())
} }
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> { pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
let handles = { let handles = {
let inner = self.0.read().unwrap(); let inner = self.0.read().await;
let handles = inner.rooms.values().cloned().collect::<Vec<_>>(); let handles = inner.rooms.values().cloned().collect::<Vec<_>>();
handles handles
}; };
@ -93,6 +113,7 @@ impl RoomRegistry {
struct RoomRegistryInner { struct RoomRegistryInner {
rooms: HashMap<RoomId, RoomHandle>, rooms: HashMap<RoomId, RoomHandle>,
metric_active_rooms: IntGauge, metric_active_rooms: IntGauge,
storage: Storage,
} }
#[derive(Clone)] #[derive(Clone)]
@ -145,6 +166,7 @@ impl RoomHandle {
struct Room { struct Room {
room_id: RoomId, room_id: RoomId,
subscriptions: HashMap<PlayerId, PlayerHandle>, subscriptions: HashMap<PlayerId, PlayerHandle>,
message_count: u32,
topic: Str, topic: Str,
} }
impl Room { impl Room {

View File

@ -53,7 +53,7 @@ async fn main() -> Result<()> {
} = config; } = config;
let mut metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
let rooms = RoomRegistry::empty(&mut metrics)?; let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?;
let telemetry_terminator = let telemetry_terminator =
util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?; util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?;

View File

@ -311,7 +311,7 @@ async fn handle_update(
room_id, room_id,
} => { } => {
if player_id == &new_member_id { if player_id == &new_member_id {
if let Some(room) = rooms.get_room(&room_id) { if let Some(room) = rooms.get_room(&room_id).await {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
let chan = Chan::Global(room_id.as_inner().clone()); let chan = Chan::Global(room_id.as_inner().clone());
produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?; produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?;
@ -482,7 +482,7 @@ async fn handle_incoming_message(
writer.flush().await?; writer.flush().await?;
} }
Recipient::Chan(Chan::Global(chan)) => { Recipient::Chan(Chan::Global(chan)) => {
let room = rooms.get_room(&RoomId::from(chan.clone())?); let room = rooms.get_room(&RoomId::from(chan.clone())?).await;
if let Some(room) = room { if let Some(room) = room {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
for member in room_info.members { for member in room_info.members {

6
test/init_state.sql Normal file
View File

@ -0,0 +1,6 @@
insert into users(name)
values ('kek'), ('shrek')
returning id;
insert into challenges_plain_password(user_id, password)
values (1, 'parolchik1'), (2, 'qwerty123');