From ea81ddadfc72ecbc01ed25c24881c64037933e81 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Sat, 27 Apr 2024 12:58:27 +0200 Subject: [PATCH] dialog message persistence --- crates/lavina-core/src/dialog.rs | 21 ++++++++++++++--- crates/lavina-core/src/repo/dialog.rs | 33 +++++++++++++++++++++++---- crates/projection-xmpp/src/lib.rs | 1 + 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/crates/lavina-core/src/dialog.rs b/crates/lavina-core/src/dialog.rs index f87294c..f06d5e8 100644 --- a/crates/lavina-core/src/dialog.rs +++ b/crates/lavina-core/src/dialog.rs @@ -65,7 +65,10 @@ impl DialogRegistry { let dialog = guard.dialogs.get(&id); if let Some(d) = dialog { let mut d = d.write().await; - guard.storage.increment_dialog_message_count(d.storage_id).await?; + guard + .storage + .insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at) + .await?; d.message_count += 1; } else { drop(guard); @@ -73,7 +76,10 @@ impl DialogRegistry { // double check in case concurrent access has loaded this dialog if let Some(d) = guard2.dialogs.get(&id) { let mut d = d.write().await; - guard2.storage.increment_dialog_message_count(d.storage_id).await?; + guard2 + .storage + .insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at) + .await?; d.message_count += 1; } else { let (p1, p2) = id.as_inner(); @@ -86,7 +92,16 @@ impl DialogRegistry { } }; tracing::info!("Dialog {id:?} loaded"); - guard2.storage.increment_dialog_message_count(stored_dialog.id).await?; + guard2 + .storage + .insert_dialog_message( + stored_dialog.id, + stored_dialog.message_count, + from.as_inner(), + &body, + created_at, + ) + .await?; let dialog = Dialog { storage_id: stored_dialog.id, player_storage_id_1: stored_dialog.participant_1, diff --git a/crates/lavina-core/src/repo/dialog.rs b/crates/lavina-core/src/repo/dialog.rs index cbe3161..e228303 100644 --- a/crates/lavina-core/src/repo/dialog.rs +++ b/crates/lavina-core/src/repo/dialog.rs @@ -1,10 +1,11 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use chrono::{DateTime, Utc}; use sqlx::FromRow; use crate::repo::Storage; impl Storage { + #[tracing::instrument(skip(self), name = "Storage::retrieve_dialog")] pub async fn retrieve_dialog(&self, participant_1: &str, participant_2: &str) -> Result> { let mut executor = self.conn.lock().await; let res = sqlx::query_as( @@ -20,19 +21,41 @@ impl Storage { Ok(res) } - pub async fn increment_dialog_message_count(&self, storage_id: u32) -> Result<()> { + #[tracing::instrument(skip(self, content, created_at), name = "Storage::insert_dialog_message")] + pub async fn insert_dialog_message( + &self, + dialog_id: u32, + id: u32, + author_id: &str, + content: &str, + created_at: &DateTime, + ) -> Result<()> { let mut executor = self.conn.lock().await; + let res: Option<(u32,)> = sqlx::query_as("select id from users where name = ?;") + .bind(author_id) + .fetch_optional(&mut *executor) + .await?; + let Some((author_id,)) = res else { + return Err(anyhow!("No such user")); + }; sqlx::query( - "update rooms set message_count = message_count + 1 - where id = ?;", + "insert into dialog_messages(dialog_id, id, author_id, content, created_at) + values (?, ?, ?, ?, ?); + update dialogs set message_count = message_count + 1 where id = ?;", ) - .bind(storage_id) + .bind(dialog_id) + .bind(id) + .bind(author_id) + .bind(content) + .bind(created_at) + .bind(dialog_id) .execute(&mut *executor) .await?; Ok(()) } + #[tracing::instrument(skip(self, created_at), name = "Storage::initialize_dialog")] pub async fn initialize_dialog( &self, participant_1: &str, diff --git a/crates/projection-xmpp/src/lib.rs b/crates/projection-xmpp/src/lib.rs index 01f0171..d79bd50 100644 --- a/crates/projection-xmpp/src/lib.rs +++ b/crates/projection-xmpp/src/lib.rs @@ -428,6 +428,7 @@ struct XmppConnection<'a> { } impl<'a> XmppConnection<'a> { + #[tracing::instrument(skip(self, output, packet), name = "XmppConnection::handle_packet")] async fn handle_packet(&mut self, output: &mut Vec>, packet: ClientPacket) -> Result { let res = match packet { ClientPacket::Iq(iq) => {