diff --git a/Cargo.lock b/Cargo.lock index a586d14..a909a9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1775,6 +1775,7 @@ dependencies = [ "atoi", "byteorder", "bytes", + "chrono", "crc", "crossbeam-queue", "either", @@ -1833,6 +1834,7 @@ dependencies = [ "sha2", "sqlx-core", "sqlx-mysql", + "sqlx-postgres", "sqlx-sqlite", "syn 1.0.109", "tempfile", @@ -1850,6 +1852,7 @@ dependencies = [ "bitflags 2.5.0", "byteorder", "bytes", + "chrono", "crc", "digest", "dotenvy", @@ -1891,6 +1894,7 @@ dependencies = [ "base64 0.21.7", "bitflags 2.5.0", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -1926,6 +1930,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa" dependencies = [ "atoi", + "chrono", "flume", "futures-channel", "futures-core", diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml index 92bf798..c49f83d 100644 --- a/crates/lavina-core/Cargo.toml +++ b/crates/lavina-core/Cargo.toml @@ -5,7 +5,7 @@ version.workspace = true [dependencies] anyhow.workspace = true -sqlx = { version = "0.7.4", features = ["sqlite", "migrate"] } +sqlx = { version = "0.7.4", features = ["sqlite", "migrate", "chrono"] } serde.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/lavina-core/src/dialog.rs b/crates/lavina-core/src/dialog.rs index ab4a434..eae3915 100644 --- a/crates/lavina-core/src/dialog.rs +++ b/crates/lavina-core/src/dialog.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; use std::sync::Arc; +use chrono::{DateTime, Utc}; use tokio::sync::RwLock as AsyncRwLock; use crate::player::{PlayerId, PlayerRegistry, Updates}; @@ -52,7 +53,13 @@ struct DialogRegistryInner { pub struct DialogRegistry(Arc>); impl DialogRegistry { - pub async fn send_message(&self, from: PlayerId, to: PlayerId, body: Str) -> Result<()> { + pub async fn send_message( + &self, + from: PlayerId, + to: PlayerId, + body: Str, + created_at: &DateTime, + ) -> Result<()> { let guard = self.0.read().await; let id = DialogId::new(from.clone(), to.clone()); let dialog = guard.dialogs.get(&id); @@ -85,13 +92,13 @@ impl DialogRegistry { Some(t) => t, None => { tracing::info!("Dialog {id:?} does not exist, creating a new one in storage"); - guard.storage.initialize_dialog(p1.as_inner(), p2.as_inner()).await? + guard.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await? } }; tracing::info!("Dialog {id:?} loaded"); - guard.storage.increment_dialog_message_count(stored_dialog.storage_id).await?; + guard.storage.increment_dialog_message_count(stored_dialog.id).await?; let dialog = Dialog { - storage_id: stored_dialog.storage_id, + storage_id: stored_dialog.id, player_storage_id_1: stored_dialog.participant_1, player_storage_id_2: stored_dialog.participant_2, message_count: stored_dialog.message_count + 1, diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index b2265fe..5696f08 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -105,6 +105,17 @@ impl PlayerConnection { self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } + + pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> { + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::SendDialogMessage { + recipient, + body, + promise, + }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) + } } /// Handle to a player actor. @@ -245,7 +256,7 @@ impl PlayerRegistry { }; Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) } - + pub async fn get_player(&self, id: &PlayerId) -> Option { let inner = self.0.read().await; inner.players.get(id).map(|(handle, _)| handle.clone()) @@ -256,7 +267,13 @@ impl PlayerRegistry { if let Some((handle, _)) = inner.players.get(id) { handle.clone() } else { - let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.dialogs.clone(), inner.storage.clone()).await; + let (handle, fiber) = Player::launch( + id.clone(), + inner.room_registry.clone(), + inner.dialogs.clone(), + inner.storage.clone(), + ) + .await; inner.players.insert(id.clone(), (handle.clone(), fiber)); inner.metric_active_players.inc(); handle @@ -305,7 +322,12 @@ struct Player { storage: Storage, } impl Player { - async fn launch(player_id: PlayerId, rooms: RoomRegistry, dialogs: DialogRegistry, storage: Storage) -> (PlayerHandle, JoinHandle) { + async fn launch( + player_id: PlayerId, + rooms: RoomRegistry, + dialogs: DialogRegistry, + storage: Storage, + ) -> (PlayerHandle, JoinHandle) { let (tx, rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); @@ -499,7 +521,7 @@ impl Player { async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) { let created_at = chrono::Utc::now(); - self.dialogs.send_message(self.player_id.clone(), recipient.clone(), body.clone()).await.unwrap(); + self.dialogs.send_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at).await.unwrap(); let update = Updates::NewDialogMessage { sender: self.player_id.clone(), receiver: recipient.clone(), diff --git a/crates/lavina-core/src/repo/dialog.rs b/crates/lavina-core/src/repo/dialog.rs index 674c9a3..cbe3161 100644 --- a/crates/lavina-core/src/repo/dialog.rs +++ b/crates/lavina-core/src/repo/dialog.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use chrono::{DateTime, Utc}; use sqlx::FromRow; use crate::repo::Storage; @@ -7,8 +8,8 @@ impl Storage { pub async fn retrieve_dialog(&self, participant_1: &str, participant_2: &str) -> Result> { let mut executor = self.conn.lock().await; let res = sqlx::query_as( - "select r.participant_1, r.participant_2, r.message_count - from rooms r join users u1 on r.participant_1 = u1.id join users u2 on r.participant_2 = u2.id + "select r.id, r.participant_1, r.participant_2, r.message_count + from dialogs r join users u1 on r.participant_1 = u1.id join users u2 on r.participant_2 = u2.id where u1.name = ? and u2.name = ?;", ) .bind(participant_1) @@ -23,7 +24,7 @@ impl Storage { let mut executor = self.conn.lock().await; sqlx::query( "update rooms set message_count = message_count + 1 - where storage_id = ?;", + where id = ?;", ) .bind(storage_id) .execute(&mut *executor) @@ -32,19 +33,25 @@ impl Storage { Ok(()) } - pub async fn initialize_dialog(&self, participant_1: &str, participant_2: &str) -> Result { + pub async fn initialize_dialog( + &self, + participant_1: &str, + participant_2: &str, + created_at: &DateTime, + ) -> Result { let mut executor = self.conn.lock().await; let res: StoredDialog = sqlx::query_as( "insert into dialogs(participant_1, participant_2, created_at) values ( - select id from users where name = ?, - select id from users where name = ?, - now() + (select id from users where name = ?), + (select id from users where name = ?), + ? ) returning id, participant_1, participant_2, message_count;", ) .bind(participant_1) .bind(participant_2) + .bind(&created_at) .fetch_one(&mut *executor) .await?; @@ -54,7 +61,7 @@ impl Storage { #[derive(FromRow)] pub struct StoredDialog { - pub storage_id: u32, + pub id: u32, pub participant_1: u32, pub participant_2: u32, pub message_count: u32, diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index ce450c1..482c095 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -653,6 +653,32 @@ async fn handle_update( .await?; writer.flush().await? } + Updates::NewDialogMessage { + sender, + receiver, + body, + created_at, + } => { + let mut tags = vec![]; + if user.enabled_capabilities.contains(Capabilities::ServerTime) { + let tag = Tag { + key: "time".into(), + value: Some(created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()), + }; + tags.push(tag); + } + ServerMessage { + tags, + sender: Some(sender.as_inner().clone()), + body: ServerMessageBody::PrivateMessage { + target: Recipient::Nick(receiver.as_inner().clone()), + body: body.clone(), + }, + } + .write_async(writer) + .await?; + writer.flush().await? + } } Ok(()) } @@ -699,6 +725,10 @@ async fn handle_incoming_message( let room_id = RoomId::from(chan)?; user_handle.send_message(room_id, body).await?; } + Recipient::Nick(nick) => { + let receiver = PlayerId::from(nick)?; + user_handle.send_dialog_message(receiver, body).await?; + } _ => log::warn!("Unsupported target type"), }, ClientMessage::Topic { chan, topic } => { diff --git a/crates/projection-xmpp/src/message.rs b/crates/projection-xmpp/src/message.rs index a737b2b..15a3e0d 100644 --- a/crates/projection-xmpp/src/message.rs +++ b/crates/projection-xmpp/src/message.rs @@ -1,5 +1,6 @@ //! Handling of all client2server message stanzas +use lavina_core::player::PlayerId; use quick_xml::events::Event; use lavina_core::prelude::*; @@ -40,6 +41,9 @@ impl<'a> XmppConnection<'a> { } .serialize(output); Ok(()) + } else if server.0.as_ref() == &*self.hostname && m.r#type == MessageType::Chat { + self.user_handle.send_dialog_message(PlayerId::from(name.0.clone())?, m.body.clone()).await?; + Ok(()) } else { todo!() } diff --git a/crates/projection-xmpp/src/updates.rs b/crates/projection-xmpp/src/updates.rs index fcc62b6..d659467 100644 --- a/crates/projection-xmpp/src/updates.rs +++ b/crates/projection-xmpp/src/updates.rs @@ -39,6 +39,34 @@ impl<'a> XmppConnection<'a> { } .serialize(output); } + Updates::NewDialogMessage { + sender, + receiver, + body, + created_at: _, + } => { + if receiver == self.user.player_id { + Message::<()> { + to: Some(Jid { + name: Some(self.user.xmpp_name.clone()), + server: Server(self.hostname.clone()), + resource: Some(self.user.xmpp_resource.clone()), + }), + from: Some(Jid { + name: Some(Name(sender.as_inner().clone())), + server: Server(self.hostname.clone()), + resource: Some(Resource(sender.into_inner())), + }), + id: None, + r#type: MessageType::Chat, + lang: None, + subject: None, + body: body.into(), + custom: vec![], + } + .serialize(output); + } + } _ => {} } Ok(())