diff --git a/migrations/0_first.sql b/migrations/0_first.sql index 6855fed..765b91d 100644 --- a/migrations/0_first.sql +++ b/migrations/0_first.sql @@ -11,5 +11,21 @@ create table challenges_plain_password( create table rooms( id integer primary key autoincrement not null, - name string unique not null + name string unique not null, + topic string not null, + message_count integer not null default 0 +); + +create table messages( + room_id integer not null, + id integer not null, -- unique per room, sequential in one room + content string not null, + primary key (room_id, id) +); + +create table memberships( + user_id integer not null, + room_id integer not null, + status integer not null, -- 0 for not-joined, 1 for joined, 2 for banned + primary key (user_id, room_id) ); diff --git a/src/core/player.rs b/src/core/player.rs index 768d018..688bba5 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -354,7 +354,13 @@ impl Player { return; } - let room = self.rooms.get_or_create_room(room_id.clone()); + let room = match self.rooms.get_or_create_room(room_id.clone()).await { + Ok(room) => room, + Err(e) => { + log::error!("Failed to get or create room: {e}"); + return; + } + }; room.subscribe(self.player_id.clone(), self.handle.clone()) .await; self.my_rooms.insert(room_id.clone(), room.clone()); @@ -384,7 +390,7 @@ impl Player { body, promise, } => { - let room = self.rooms.get_room(&room_id); + let room = self.rooms.get_room(&room_id).await; if let Some(room) = room { room.send_message(self.player_id.clone(), body.clone()) .await; @@ -404,7 +410,7 @@ impl Player { new_topic, promise, } => { - let room = self.rooms.get_room(&room_id); + let room = self.rooms.get_room(&room_id).await; if let Some(mut room) = room { room.set_topic(self.player_id.clone(), new_topic.clone()) .await; diff --git a/src/core/repo/mod.rs b/src/core/repo/mod.rs index 85c5e72..48eb652 100644 --- a/src/core/repo/mod.rs +++ b/src/core/repo/mod.rs @@ -47,10 +47,39 @@ impl Storage { Ok(res) } + pub async fn retrieve_room_by_name(&mut self, name: &str) -> Result> { + let mut executor = self.conn.lock().await; + let res = sqlx::query_as( + "select id, name, topic, message_count + from rooms + where name = ?;", + ) + .bind(name) + .fetch_optional(&mut *executor) + .await?; + + Ok(res) + } + + pub async fn create_new_room(&mut self, name: &str, topic: &str) -> Result<()> { + let mut executor = self.conn.lock().await; + let _: (u32,) = sqlx::query_as( + "insert into rooms(name, topic) + values (?, ?) + returning id;", + ) + .bind(name) + .bind(topic) + .fetch_one(&mut *executor) + .await?; + + Ok(()) + } + pub async fn close(mut self) -> Result<()> { let res = match Arc::try_unwrap(self.conn) { Ok(e) => e, - Err(_) => return Err(fail("failed to acquire DB ownership on shutdown")), + Err(e) => return Err(fail("failed to acquire DB ownership on shutdown")), }; let res = res.into_inner(); res.close().await?; @@ -64,3 +93,11 @@ pub struct StoredUser { pub name: String, pub password: Option, } + +#[derive(FromRow)] +pub struct StoredRoom { + pub id: u32, + pub name: String, + pub topic: String, + pub message_count: u32, +} diff --git a/src/core/room.rs b/src/core/room.rs index 0a4e19c..b658be2 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -2,17 +2,16 @@ use std::{ collections::HashMap, hash::Hash, - sync::{Arc, RwLock}, + sync::Arc, }; use prometheus::{IntGauge, Registry as MetricRegistry}; use serde::Serialize; use tokio::sync::RwLock as AsyncRwLock; -use crate::{ - core::player::{PlayerHandle, PlayerId, Updates}, - prelude::*, -}; +use crate::core::player::{PlayerHandle, PlayerId, Updates}; +use crate::core::repo::Storage; +use crate::prelude::*; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -40,45 +39,66 @@ impl RoomId { /// Shared datastructure for storing metadata about rooms. #[derive(Clone)] -pub struct RoomRegistry(Arc>); +pub struct RoomRegistry(Arc>); impl RoomRegistry { - pub fn empty(metrics: &mut MetricRegistry) -> Result { + pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result { let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; let inner = RoomRegistryInner { rooms: HashMap::new(), metric_active_rooms, + storage, }; - Ok(RoomRegistry(Arc::new(RwLock::new(inner)))) + Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner)))) } - pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle { - let mut inner = self.0.write().unwrap(); + pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result { + let mut inner = self.0.write().await; if let Some(room_handle) = inner.rooms.get(&room_id) { - room_handle.clone() - } else { + // room was already loaded into memory + log::debug!("Room {} was loaded already", &room_id.0); + Ok(room_handle.clone()) + } else if let Some(stored_room) = inner.storage.retrieve_room_by_name(&*room_id.0).await? { + // room exists, but was not loaded + log::debug!("Loading room {}...", &room_id.0); let room = Room { room_id: room_id.clone(), subscriptions: HashMap::new(), - topic: "New room".into(), + topic: stored_room.topic.into(), + message_count: stored_room.message_count, }; let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); inner.rooms.insert(room_id, room_handle.clone()); inner.metric_active_rooms.inc(); - room_handle + Ok(room_handle) + } else { + // room does not exist, create it and load + log::debug!("Creating room {}...", &room_id.0); + let topic = "New room"; + let _ = inner.storage.create_new_room(&*room_id.0, &*topic).await?; + let room = Room { + room_id: room_id.clone(), + subscriptions: HashMap::new(), + topic: topic.into(), + message_count: 0, + }; + let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); + inner.rooms.insert(room_id, room_handle.clone()); + inner.metric_active_rooms.inc(); + Ok(room_handle) } } - pub fn get_room(&self, room_id: &RoomId) -> Option { - let inner = self.0.read().unwrap(); + pub async fn get_room(&self, room_id: &RoomId) -> Option { + let inner = self.0.read().await; let res = inner.rooms.get(room_id); res.map(|r| r.clone()) } pub async fn get_all_rooms(&self) -> Vec { let handles = { - let inner = self.0.read().unwrap(); + let inner = self.0.read().await; let handles = inner.rooms.values().cloned().collect::>(); handles }; @@ -93,6 +113,7 @@ impl RoomRegistry { struct RoomRegistryInner { rooms: HashMap, metric_active_rooms: IntGauge, + storage: Storage, } #[derive(Clone)] @@ -145,6 +166,7 @@ impl RoomHandle { struct Room { room_id: RoomId, subscriptions: HashMap, + message_count: u32, topic: Str, } impl Room { diff --git a/src/main.rs b/src/main.rs index 93a170c..2353c50 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,7 +53,7 @@ async fn main() -> Result<()> { } = config; let mut metrics = MetricsRegistry::new(); let storage = Storage::open(storage_config).await?; - let rooms = RoomRegistry::empty(&mut metrics)?; + let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?; diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index e80a562..6c36716 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -311,7 +311,7 @@ async fn handle_update( room_id, } => { if player_id == &new_member_id { - if let Some(room) = rooms.get_room(&room_id) { + if let Some(room) = rooms.get_room(&room_id).await { let room_info = room.get_room_info().await; let chan = Chan::Global(room_id.as_inner().clone()); produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?; @@ -482,7 +482,7 @@ async fn handle_incoming_message( writer.flush().await?; } Recipient::Chan(Chan::Global(chan)) => { - let room = rooms.get_room(&RoomId::from(chan.clone())?); + let room = rooms.get_room(&RoomId::from(chan.clone())?).await; if let Some(room) = room { let room_info = room.get_room_info().await; for member in room_info.members { diff --git a/test/init_state.sql b/test/init_state.sql new file mode 100644 index 0000000..8555332 --- /dev/null +++ b/test/init_state.sql @@ -0,0 +1,6 @@ +insert into users(name) +values ('kek'), ('shrek') +returning id; + +insert into challenges_plain_password(user_id, password) +values (1, 'parolchik1'), (2, 'qwerty123');