diff --git a/Cargo.lock b/Cargo.lock index a586d14..a909a9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1775,6 +1775,7 @@ dependencies = [ "atoi", "byteorder", "bytes", + "chrono", "crc", "crossbeam-queue", "either", @@ -1833,6 +1834,7 @@ dependencies = [ "sha2", "sqlx-core", "sqlx-mysql", + "sqlx-postgres", "sqlx-sqlite", "syn 1.0.109", "tempfile", @@ -1850,6 +1852,7 @@ dependencies = [ "bitflags 2.5.0", "byteorder", "bytes", + "chrono", "crc", "digest", "dotenvy", @@ -1891,6 +1894,7 @@ dependencies = [ "base64 0.21.7", "bitflags 2.5.0", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -1926,6 +1930,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa" dependencies = [ "atoi", + "chrono", "flume", "futures-channel", "futures-core", diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml index 92bf798..c49f83d 100644 --- a/crates/lavina-core/Cargo.toml +++ b/crates/lavina-core/Cargo.toml @@ -5,7 +5,7 @@ version.workspace = true [dependencies] anyhow.workspace = true -sqlx = { version = "0.7.4", features = ["sqlite", "migrate"] } +sqlx = { version = "0.7.4", features = ["sqlite", "migrate", "chrono"] } serde.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/lavina-core/migrations/3_dialogs.sql b/crates/lavina-core/migrations/3_dialogs.sql new file mode 100644 index 0000000..599b306 --- /dev/null +++ b/crates/lavina-core/migrations/3_dialogs.sql @@ -0,0 +1,17 @@ +create table dialogs( + id integer primary key autoincrement not null, + participant_1 integer not null, + participant_2 integer not null, + created_at timestamp not null, + message_count integer not null default 0, + unique (participant_1, participant_2) +); + +create table dialog_messages( + dialog_id integer not null, + id integer not null, -- unique per dialog, sequential in one dialog + author_id integer not null, + content string not null, + created_at timestamp not null, + primary key (dialog_id, id) +); diff --git a/crates/lavina-core/src/dialog.rs b/crates/lavina-core/src/dialog.rs new file mode 100644 index 0000000..66fe8b5 --- /dev/null +++ b/crates/lavina-core/src/dialog.rs @@ -0,0 +1,150 @@ +//! Domain of dialogs – conversations between two participants. +//! +//! Dialogs are different from rooms in that they are always between two participants. +//! There are no admins or other roles in dialogs, both participants have equal rights. + +use std::collections::HashMap; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use tokio::sync::RwLock as AsyncRwLock; + +use crate::player::{PlayerId, PlayerRegistry, Updates}; +use crate::prelude::*; +use crate::repo::Storage; + +/// Id of a conversation between two players. +/// +/// Dialogs are identified by the pair of participants' ids. The order of ids does not matter. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct DialogId(PlayerId, PlayerId); +impl DialogId { + pub fn new(a: PlayerId, b: PlayerId) -> DialogId { + if a.as_inner() < b.as_inner() { + DialogId(a, b) + } else { + DialogId(b, a) + } + } + + pub fn as_inner(&self) -> (&PlayerId, &PlayerId) { + (&self.0, &self.1) + } + + pub fn into_inner(self) -> (PlayerId, PlayerId) { + (self.0, self.1) + } +} + +struct Dialog { + storage_id: u32, + player_storage_id_1: u32, + player_storage_id_2: u32, + message_count: u32, +} + +struct DialogRegistryInner { + dialogs: HashMap>, + players: Option, + storage: Storage, +} + +#[derive(Clone)] +pub struct DialogRegistry(Arc>); + +impl DialogRegistry { + pub async fn send_message( + &self, + from: PlayerId, + to: PlayerId, + body: Str, + created_at: &DateTime, + ) -> Result<()> { + let mut guard = self.0.read().await; + let id = DialogId::new(from.clone(), to.clone()); + let dialog = guard.dialogs.get(&id); + if let Some(d) = dialog { + let mut d = d.write().await; + guard.storage.increment_dialog_message_count(d.storage_id).await?; + d.message_count += 1; + } else { + drop(guard); + let mut guard2 = self.0.write().await; + // double check in case concurrent access has loaded this dialog + if let Some(d) = guard2.dialogs.get(&id) { + let mut d = d.write().await; + guard2.storage.increment_dialog_message_count(d.storage_id).await?; + d.message_count += 1; + } else { + let (p1, p2) = id.as_inner(); + tracing::info!("Dialog {id:?} not found locally, trying to load from storage"); + let stored_dialog = match guard2.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? { + Some(t) => t, + None => { + tracing::info!("Dialog {id:?} does not exist, creating a new one in storage"); + guard2.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await? + } + }; + tracing::info!("Dialog {id:?} loaded"); + guard2.storage.increment_dialog_message_count(stored_dialog.id).await?; + let dialog = Dialog { + storage_id: stored_dialog.id, + player_storage_id_1: stored_dialog.participant_1, + player_storage_id_2: stored_dialog.participant_2, + message_count: stored_dialog.message_count + 1, + }; + guard2.dialogs.insert(id.clone(), AsyncRwLock::new(dialog)); + } + guard = guard2.downgrade(); + } + // TODO send message to the other player and persist it + let Some(players) = &guard.players else { + tracing::error!("No player registry present"); + return Ok(()); + }; + let Some(player) = players.get_player(&to).await else { + tracing::debug!("Player {to:?} not active, not sending message"); + return Ok(()); + }; + let update = Updates::NewDialogMessage { + sender: from.clone(), + receiver: to.clone(), + body: body.clone(), + created_at: created_at.clone(), + }; + player.update(update).await; + return Ok(()); + } +} + +impl DialogRegistry { + pub fn new(storage: Storage) -> DialogRegistry { + DialogRegistry(Arc::new(AsyncRwLock::new(DialogRegistryInner { + dialogs: HashMap::new(), + players: None, + storage, + }))) + } + + pub async fn set_players(&self, players: PlayerRegistry) { + let mut guard = self.0.write().await; + guard.players = Some(players); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dialog_id_new() { + let a = PlayerId::from("a").unwrap(); + let b = PlayerId::from("b").unwrap(); + let id1 = DialogId::new(a.clone(), b.clone()); + let id2 = DialogId::new(a.clone(), b.clone()); + // Dialog ids are invariant with respect to the order of participants + assert_eq!(id1, id2); + assert_eq!(id1.as_inner(), (&a, &b)); + assert_eq!(id2.as_inner(), (&a, &b)); + } +} diff --git a/crates/lavina-core/src/lib.rs b/crates/lavina-core/src/lib.rs index e611a01..1128c61 100644 --- a/crates/lavina-core/src/lib.rs +++ b/crates/lavina-core/src/lib.rs @@ -2,11 +2,13 @@ use anyhow::Result; use prometheus::Registry as MetricsRegistry; +use crate::dialog::DialogRegistry; use crate::player::PlayerRegistry; use crate::repo::Storage; use crate::room::RoomRegistry; pub mod auth; +pub mod dialog; pub mod player; pub mod prelude; pub mod repo; @@ -19,14 +21,21 @@ mod table; pub struct LavinaCore { pub players: PlayerRegistry, pub rooms: RoomRegistry, + pub dialogs: DialogRegistry, } impl LavinaCore { pub async fn new(mut metrics: MetricsRegistry, storage: Storage) -> Result { // TODO shutdown all services in reverse order on error let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; - let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?; - Ok(LavinaCore { players, rooms }) + let dialogs = DialogRegistry::new(storage.clone()); + let players = PlayerRegistry::empty(rooms.clone(), dialogs.clone(), storage.clone(), &mut metrics)?; + dialogs.set_players(players.clone()).await; + Ok(LavinaCore { + players, + rooms, + dialogs, + }) } pub async fn shutdown(mut self) -> Result<()> { diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 9925709..3a58812 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -16,6 +16,7 @@ use serde::Serialize; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; +use crate::dialog::DialogRegistry; use crate::prelude::*; use crate::repo::Storage; use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; @@ -104,6 +105,18 @@ impl PlayerConnection { self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } + + /// Handler in [Player::send_dialog_message]. + pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> { + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::SendDialogMessage { + recipient, + body, + promise, + }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) + } } /// Handle to a player actor. @@ -174,6 +187,11 @@ pub enum ClientCommand { GetRooms { promise: Promise>, }, + SendDialogMessage { + recipient: PlayerId, + body: Str, + promise: Promise<()>, + }, } pub enum JoinResult { @@ -210,6 +228,12 @@ pub enum Updates { }, /// The player was banned from the room and left it immediately. BannedFrom(RoomId), + NewDialogMessage { + sender: PlayerId, + receiver: PlayerId, + body: Str, + created_at: DateTime, + }, } /// Handle to a player registry — a shared data structure containing information about players. @@ -218,6 +242,7 @@ pub struct PlayerRegistry(Arc>); impl PlayerRegistry { pub fn empty( room_registry: RoomRegistry, + dialogs: DialogRegistry, storage: Storage, metrics: &mut MetricsRegistry, ) -> Result { @@ -225,6 +250,7 @@ impl PlayerRegistry { metrics.register(Box::new(metric_active_players.clone()))?; let inner = PlayerRegistryInner { room_registry, + dialogs, storage, players: HashMap::new(), metric_active_players, @@ -232,12 +258,23 @@ impl PlayerRegistry { Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) } + pub async fn get_player(&self, id: &PlayerId) -> Option { + let inner = self.0.read().await; + inner.players.get(id).map(|(handle, _)| handle.clone()) + } + pub async fn get_or_launch_player(&mut self, id: &PlayerId) -> PlayerHandle { let mut inner = self.0.write().await; if let Some((handle, _)) = inner.players.get(id) { handle.clone() } else { - let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.storage.clone()).await; + let (handle, fiber) = Player::launch( + id.clone(), + inner.room_registry.clone(), + inner.dialogs.clone(), + inner.storage.clone(), + ) + .await; inner.players.insert(id.clone(), (handle.clone(), fiber)); inner.metric_active_players.inc(); handle @@ -265,6 +302,7 @@ impl PlayerRegistry { /// The player registry state representation. struct PlayerRegistryInner { room_registry: RoomRegistry, + dialogs: DialogRegistry, storage: Storage, /// Active player actors. players: HashMap)>, @@ -281,10 +319,16 @@ struct Player { rx: Receiver, handle: PlayerHandle, rooms: RoomRegistry, + dialogs: DialogRegistry, storage: Storage, } impl Player { - async fn launch(player_id: PlayerId, rooms: RoomRegistry, storage: Storage) -> (PlayerHandle, JoinHandle) { + async fn launch( + player_id: PlayerId, + rooms: RoomRegistry, + dialogs: DialogRegistry, + storage: Storage, + ) -> (PlayerHandle, JoinHandle) { let (tx, rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); @@ -301,6 +345,7 @@ impl Player { rx, handle, rooms, + dialogs, storage, }; let fiber = tokio::task::spawn(player.main_loop()); @@ -340,7 +385,7 @@ impl Player { /// Handle an incoming update by changing the internal state and broadcasting it to all connections if necessary. async fn handle_update(&mut self, update: Updates) { - log::info!( + log::debug!( "Player received an update, broadcasting to {} connections", self.connections.len() ); @@ -389,6 +434,14 @@ impl Player { let result = self.get_rooms().await; let _ = promise.send(result); } + ClientCommand::SendDialogMessage { + recipient, + body, + promise, + } => { + self.send_dialog_message(connection_id, recipient, body).await; + let _ = promise.send(()); + } } } @@ -467,6 +520,18 @@ impl Player { response } + async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) { + let created_at = chrono::Utc::now(); + self.dialogs.send_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at).await.unwrap(); + let update = Updates::NewDialogMessage { + sender: self.player_id.clone(), + receiver: recipient.clone(), + body, + created_at, + }; + self.broadcast_update(update, connection_id).await; + } + /// Broadcasts an update to all connections except the one with the given id. /// /// This is called after handling a client command. diff --git a/crates/lavina-core/src/repo/dialog.rs b/crates/lavina-core/src/repo/dialog.rs new file mode 100644 index 0000000..cbe3161 --- /dev/null +++ b/crates/lavina-core/src/repo/dialog.rs @@ -0,0 +1,68 @@ +use anyhow::Result; +use chrono::{DateTime, Utc}; +use sqlx::FromRow; + +use crate::repo::Storage; + +impl Storage { + pub async fn retrieve_dialog(&self, participant_1: &str, participant_2: &str) -> Result> { + let mut executor = self.conn.lock().await; + let res = sqlx::query_as( + "select r.id, r.participant_1, r.participant_2, r.message_count + from dialogs r join users u1 on r.participant_1 = u1.id join users u2 on r.participant_2 = u2.id + where u1.name = ? and u2.name = ?;", + ) + .bind(participant_1) + .bind(participant_2) + .fetch_optional(&mut *executor) + .await?; + + Ok(res) + } + + pub async fn increment_dialog_message_count(&self, storage_id: u32) -> Result<()> { + let mut executor = self.conn.lock().await; + sqlx::query( + "update rooms set message_count = message_count + 1 + where id = ?;", + ) + .bind(storage_id) + .execute(&mut *executor) + .await?; + + Ok(()) + } + + pub async fn initialize_dialog( + &self, + participant_1: &str, + participant_2: &str, + created_at: &DateTime, + ) -> Result { + let mut executor = self.conn.lock().await; + let res: StoredDialog = sqlx::query_as( + "insert into dialogs(participant_1, participant_2, created_at) + values ( + (select id from users where name = ?), + (select id from users where name = ?), + ? + ) + returning id, participant_1, participant_2, message_count;", + ) + .bind(participant_1) + .bind(participant_2) + .bind(&created_at) + .fetch_one(&mut *executor) + .await?; + + Ok(res) + } +} + +#[derive(FromRow)] +pub struct StoredDialog { + pub id: u32, + pub participant_1: u32, + pub participant_2: u32, + pub message_count: u32, +} diff --git a/crates/lavina-core/src/repo/mod.rs b/crates/lavina-core/src/repo/mod.rs index 714b8cd..dfa93c6 100644 --- a/crates/lavina-core/src/repo/mod.rs +++ b/crates/lavina-core/src/repo/mod.rs @@ -12,6 +12,7 @@ use tokio::sync::Mutex; use crate::prelude::*; +mod dialog; mod room; mod user; diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 278d456..342682a 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -643,6 +643,32 @@ async fn handle_update( .await?; writer.flush().await? } + Updates::NewDialogMessage { + sender, + receiver, + body, + created_at, + } => { + let mut tags = vec![]; + if user.enabled_capabilities.contains(Capabilities::ServerTime) { + let tag = Tag { + key: "time".into(), + value: Some(created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()), + }; + tags.push(tag); + } + ServerMessage { + tags, + sender: Some(sender.as_inner().clone()), + body: ServerMessageBody::PrivateMessage { + target: Recipient::Nick(receiver.as_inner().clone()), + body: body.clone(), + }, + } + .write_async(writer) + .await?; + writer.flush().await? + } } Ok(()) } @@ -689,6 +715,10 @@ async fn handle_incoming_message( let room_id = RoomId::from(chan)?; user_handle.send_message(room_id, body).await?; } + Recipient::Nick(nick) => { + let receiver = PlayerId::from(nick)?; + user_handle.send_dialog_message(receiver, body).await?; + } _ => log::warn!("Unsupported target type"), }, ClientMessage::Topic { chan, topic } => { diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs index 5a4eb7c..6a90c46 100644 --- a/crates/projection-irc/tests/lib.rs +++ b/crates/projection-irc/tests/lib.rs @@ -635,3 +635,63 @@ async fn server_time_capability() -> Result<()> { server.server.terminate().await?; Ok(()) } + +#[tokio::test] +async fn scenario_two_players_dialog() -> Result<()> { + let mut server = TestServer::start().await?; + + // test scenario + + server.storage.create_user("tester1").await?; + server.storage.set_password("tester1", "password").await?; + server.storage.create_user("tester2").await?; + server.storage.set_password("tester2", "password").await?; + + let mut stream1 = TcpStream::connect(server.server.addr).await?; + let mut s1 = TestScope::new(&mut stream1); + + let mut stream2 = TcpStream::connect(server.server.addr).await?; + let mut s2 = TestScope::new(&mut stream2); + + s1.send("CAP LS 302").await?; + s1.send("NICK tester1").await?; + s1.send("USER UserName 0 * :Real Name").await?; + s1.expect_cap_ls().await?; + s1.send("CAP REQ :sasl").await?; + s1.expect(":testserver CAP tester1 ACK :sasl").await?; + s1.send("AUTHENTICATE PLAIN").await?; + s1.expect(":testserver AUTHENTICATE +").await?; + s1.send("AUTHENTICATE dGVzdGVyMQB0ZXN0ZXIxAHBhc3N3b3Jk").await?; // base64-encoded 'tester1\x00tester1\x00password' + s1.expect(":testserver 900 tester1 tester1 tester1 :You are now logged in as tester1").await?; + s1.expect(":testserver 903 tester1 :SASL authentication successful").await?; + s1.send("CAP END").await?; + s1.expect_server_introduction("tester1").await?; + s1.expect_nothing().await?; + + s2.send("CAP LS 302").await?; + s2.send("NICK tester2").await?; + s2.send("USER UserName 0 * :Real Name").await?; + s2.expect_cap_ls().await?; + s2.send("CAP REQ :sasl").await?; + s2.expect(":testserver CAP tester2 ACK :sasl").await?; + s2.send("AUTHENTICATE PLAIN").await?; + s2.expect(":testserver AUTHENTICATE +").await?; + s2.send("AUTHENTICATE dGVzdGVyMgB0ZXN0ZXIyAHBhc3N3b3Jk").await?; // base64-encoded 'tester2\x00tester2\x00password' + s2.expect(":testserver 900 tester2 tester2 tester2 :You are now logged in as tester2").await?; + s2.expect(":testserver 903 tester2 :SASL authentication successful").await?; + s2.send("CAP END").await?; + s2.expect_server_introduction("tester2").await?; + s2.expect_nothing().await?; + + s1.send("PRIVMSG tester2 :Henlo! How are you?").await?; + s1.expect_nothing().await?; + s2.expect(":tester1 PRIVMSG tester2 :Henlo! How are you?").await?; + s2.expect_nothing().await?; + + s2.send("PRIVMSG tester1 good").await?; + s2.expect_nothing().await?; + s1.expect(":tester2 PRIVMSG tester1 :good").await?; + s1.expect_nothing().await?; + + Ok(()) +} diff --git a/crates/projection-xmpp/src/message.rs b/crates/projection-xmpp/src/message.rs index a737b2b..15a3e0d 100644 --- a/crates/projection-xmpp/src/message.rs +++ b/crates/projection-xmpp/src/message.rs @@ -1,5 +1,6 @@ //! Handling of all client2server message stanzas +use lavina_core::player::PlayerId; use quick_xml::events::Event; use lavina_core::prelude::*; @@ -40,6 +41,9 @@ impl<'a> XmppConnection<'a> { } .serialize(output); Ok(()) + } else if server.0.as_ref() == &*self.hostname && m.r#type == MessageType::Chat { + self.user_handle.send_dialog_message(PlayerId::from(name.0.clone())?, m.body.clone()).await?; + Ok(()) } else { todo!() } diff --git a/crates/projection-xmpp/src/updates.rs b/crates/projection-xmpp/src/updates.rs index fcc62b6..d659467 100644 --- a/crates/projection-xmpp/src/updates.rs +++ b/crates/projection-xmpp/src/updates.rs @@ -39,6 +39,34 @@ impl<'a> XmppConnection<'a> { } .serialize(output); } + Updates::NewDialogMessage { + sender, + receiver, + body, + created_at: _, + } => { + if receiver == self.user.player_id { + Message::<()> { + to: Some(Jid { + name: Some(self.user.xmpp_name.clone()), + server: Server(self.hostname.clone()), + resource: Some(self.user.xmpp_resource.clone()), + }), + from: Some(Jid { + name: Some(Name(sender.as_inner().clone())), + server: Server(self.hostname.clone()), + resource: Some(Resource(sender.into_inner())), + }), + id: None, + r#type: MessageType::Chat, + lang: None, + subject: None, + body: body.into(), + custom: vec![], + } + .serialize(output); + } + } _ => {} } Ok(())