diff --git a/crates/lavina-core/migrations/3_dialogs.sql b/crates/lavina-core/migrations/3_dialogs.sql index 799628e..599b306 100644 --- a/crates/lavina-core/migrations/3_dialogs.sql +++ b/crates/lavina-core/migrations/3_dialogs.sql @@ -1,14 +1,17 @@ create table dialogs( + id integer primary key autoincrement not null, participant_1 integer not null, participant_2 integer not null, created_at timestamp not null, message_count integer not null default 0, - primary key (participant_1, participant_2) + unique (participant_1, participant_2) ); create table dialog_messages( dialog_id integer not null, id integer not null, -- unique per dialog, sequential in one dialog + author_id integer not null, content string not null, + created_at timestamp not null, primary key (dialog_id, id) ); diff --git a/crates/lavina-core/src/dialog.rs b/crates/lavina-core/src/dialog.rs index 35157ea..ab4a434 100644 --- a/crates/lavina-core/src/dialog.rs +++ b/crates/lavina-core/src/dialog.rs @@ -8,7 +8,8 @@ use std::sync::Arc; use tokio::sync::RwLock as AsyncRwLock; -use crate::player::PlayerId; +use crate::player::{PlayerId, PlayerRegistry, Updates}; +use crate::prelude::*; use crate::repo::Storage; /// Id of a conversation between two players. @@ -36,25 +37,84 @@ impl DialogId { struct Dialog { storage_id: u32, - id: DialogId, + player_storage_id_1: u32, + player_storage_id_2: u32, message_count: u32, } struct DialogRegistryInner { dialogs: HashMap>, + players: Option, storage: Storage, } #[derive(Clone)] pub struct DialogRegistry(Arc>); +impl DialogRegistry { + pub async fn send_message(&self, from: PlayerId, to: PlayerId, body: Str) -> Result<()> { + let guard = self.0.read().await; + let id = DialogId::new(from.clone(), to.clone()); + let dialog = guard.dialogs.get(&id); + if let Some(d) = dialog { + let mut d = d.write().await; + guard.storage.increment_dialog_message_count(d.storage_id).await?; + d.message_count += 1; + let Some(players) = &guard.players else { + tracing::error!("No player registry present"); + return Ok(()); + }; + let Some(player) = players.get_player(&to).await else { + tracing::debug!("Player {to:?} not active, not sending message"); + return Ok(()); + }; + let update = Updates::NewDialogMessage { + sender: from.clone(), + receiver: to.clone(), + body: body.clone(), + created_at: chrono::Utc::now(), // todo + }; + player.update(update).await; + return Ok(()); + } + drop(guard); + let mut guard = self.0.write().await; + let (p1, p2) = id.as_inner(); + tracing::info!("Dialog {id:?} not found locally, trying to load from storage"); + let stored_dialog = match guard.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? { + Some(t) => t, + None => { + tracing::info!("Dialog {id:?} does not exist, creating a new one in storage"); + guard.storage.initialize_dialog(p1.as_inner(), p2.as_inner()).await? + } + }; + tracing::info!("Dialog {id:?} loaded"); + guard.storage.increment_dialog_message_count(stored_dialog.storage_id).await?; + let dialog = Dialog { + storage_id: stored_dialog.storage_id, + player_storage_id_1: stored_dialog.participant_1, + player_storage_id_2: stored_dialog.participant_2, + message_count: stored_dialog.message_count + 1, + }; + guard.dialogs.insert(id.clone(), AsyncRwLock::new(dialog)); + // TODO send message to the other player and persist it + return Ok(()); + } +} + impl DialogRegistry { pub fn new(storage: Storage) -> DialogRegistry { DialogRegistry(Arc::new(AsyncRwLock::new(DialogRegistryInner { dialogs: HashMap::new(), + players: None, storage, }))) } + + pub async fn set_players(&self, players: PlayerRegistry) { + let mut guard = self.0.write().await; + guard.players = Some(players); + } } #[cfg(test)] diff --git a/crates/lavina-core/src/lib.rs b/crates/lavina-core/src/lib.rs index 1e5b861..c33f5b1 100644 --- a/crates/lavina-core/src/lib.rs +++ b/crates/lavina-core/src/lib.rs @@ -27,8 +27,9 @@ impl LavinaCore { pub async fn new(mut metrics: MetricsRegistry, storage: Storage) -> Result { // TODO shutdown all services in reverse order on error let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; - let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?; let dialogs = DialogRegistry::new(storage.clone()); + let players = PlayerRegistry::empty(rooms.clone(), dialogs.clone(), storage.clone(), &mut metrics)?; + dialogs.set_players(players.clone()).await; Ok(LavinaCore { players, rooms, diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 9925709..b2265fe 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -16,6 +16,7 @@ use serde::Serialize; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; +use crate::dialog::DialogRegistry; use crate::prelude::*; use crate::repo::Storage; use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; @@ -174,6 +175,11 @@ pub enum ClientCommand { GetRooms { promise: Promise>, }, + SendDialogMessage { + recipient: PlayerId, + body: Str, + promise: Promise<()>, + }, } pub enum JoinResult { @@ -210,6 +216,12 @@ pub enum Updates { }, /// The player was banned from the room and left it immediately. BannedFrom(RoomId), + NewDialogMessage { + sender: PlayerId, + receiver: PlayerId, + body: Str, + created_at: DateTime, + }, } /// Handle to a player registry — a shared data structure containing information about players. @@ -218,6 +230,7 @@ pub struct PlayerRegistry(Arc>); impl PlayerRegistry { pub fn empty( room_registry: RoomRegistry, + dialogs: DialogRegistry, storage: Storage, metrics: &mut MetricsRegistry, ) -> Result { @@ -225,19 +238,25 @@ impl PlayerRegistry { metrics.register(Box::new(metric_active_players.clone()))?; let inner = PlayerRegistryInner { room_registry, + dialogs, storage, players: HashMap::new(), metric_active_players, }; Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) } + + pub async fn get_player(&self, id: &PlayerId) -> Option { + let inner = self.0.read().await; + inner.players.get(id).map(|(handle, _)| handle.clone()) + } pub async fn get_or_launch_player(&mut self, id: &PlayerId) -> PlayerHandle { let mut inner = self.0.write().await; if let Some((handle, _)) = inner.players.get(id) { handle.clone() } else { - let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.storage.clone()).await; + let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.dialogs.clone(), inner.storage.clone()).await; inner.players.insert(id.clone(), (handle.clone(), fiber)); inner.metric_active_players.inc(); handle @@ -265,6 +284,7 @@ impl PlayerRegistry { /// The player registry state representation. struct PlayerRegistryInner { room_registry: RoomRegistry, + dialogs: DialogRegistry, storage: Storage, /// Active player actors. players: HashMap)>, @@ -281,10 +301,11 @@ struct Player { rx: Receiver, handle: PlayerHandle, rooms: RoomRegistry, + dialogs: DialogRegistry, storage: Storage, } impl Player { - async fn launch(player_id: PlayerId, rooms: RoomRegistry, storage: Storage) -> (PlayerHandle, JoinHandle) { + async fn launch(player_id: PlayerId, rooms: RoomRegistry, dialogs: DialogRegistry, storage: Storage) -> (PlayerHandle, JoinHandle) { let (tx, rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); @@ -301,6 +322,7 @@ impl Player { rx, handle, rooms, + dialogs, storage, }; let fiber = tokio::task::spawn(player.main_loop()); @@ -389,6 +411,14 @@ impl Player { let result = self.get_rooms().await; let _ = promise.send(result); } + ClientCommand::SendDialogMessage { + recipient, + body, + promise, + } => { + self.send_dialog_message(connection_id, recipient, body).await; + let _ = promise.send(()); + } } } @@ -467,6 +497,18 @@ impl Player { response } + async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) { + let created_at = chrono::Utc::now(); + self.dialogs.send_message(self.player_id.clone(), recipient.clone(), body.clone()).await.unwrap(); + let update = Updates::NewDialogMessage { + sender: self.player_id.clone(), + receiver: recipient.clone(), + body, + created_at, + }; + self.broadcast_update(update, connection_id).await; + } + /// Broadcasts an update to all connections except the one with the given id. /// /// This is called after handling a client command. diff --git a/crates/lavina-core/src/repo/dialog.rs b/crates/lavina-core/src/repo/dialog.rs new file mode 100644 index 0000000..674c9a3 --- /dev/null +++ b/crates/lavina-core/src/repo/dialog.rs @@ -0,0 +1,61 @@ +use anyhow::Result; +use sqlx::FromRow; + +use crate::repo::Storage; + +impl Storage { + pub async fn retrieve_dialog(&self, participant_1: &str, participant_2: &str) -> Result> { + let mut executor = self.conn.lock().await; + let res = sqlx::query_as( + "select r.participant_1, r.participant_2, r.message_count + from rooms r join users u1 on r.participant_1 = u1.id join users u2 on r.participant_2 = u2.id + where u1.name = ? and u2.name = ?;", + ) + .bind(participant_1) + .bind(participant_2) + .fetch_optional(&mut *executor) + .await?; + + Ok(res) + } + + pub async fn increment_dialog_message_count(&self, storage_id: u32) -> Result<()> { + let mut executor = self.conn.lock().await; + sqlx::query( + "update rooms set message_count = message_count + 1 + where storage_id = ?;", + ) + .bind(storage_id) + .execute(&mut *executor) + .await?; + + Ok(()) + } + + pub async fn initialize_dialog(&self, participant_1: &str, participant_2: &str) -> Result { + let mut executor = self.conn.lock().await; + let res: StoredDialog = sqlx::query_as( + "insert into dialogs(participant_1, participant_2, created_at) + values ( + select id from users where name = ?, + select id from users where name = ?, + now() + ) + returning id, participant_1, participant_2, message_count;", + ) + .bind(participant_1) + .bind(participant_2) + .fetch_one(&mut *executor) + .await?; + + Ok(res) + } +} + +#[derive(FromRow)] +pub struct StoredDialog { + pub storage_id: u32, + pub participant_1: u32, + pub participant_2: u32, + pub message_count: u32, +} diff --git a/crates/lavina-core/src/repo/mod.rs b/crates/lavina-core/src/repo/mod.rs index 645c764..be574c5 100644 --- a/crates/lavina-core/src/repo/mod.rs +++ b/crates/lavina-core/src/repo/mod.rs @@ -12,6 +12,7 @@ use tokio::sync::Mutex; use crate::prelude::*; +mod dialog; mod room; mod user;