forked from lavina/lavina
1
0
Fork 0

dialog message persistence

This commit is contained in:
Nikita Vilunov 2024-04-27 12:58:27 +02:00
parent 4b5ab02322
commit ea81ddadfc
3 changed files with 47 additions and 8 deletions

View File

@ -65,7 +65,10 @@ impl DialogRegistry {
let dialog = guard.dialogs.get(&id); let dialog = guard.dialogs.get(&id);
if let Some(d) = dialog { if let Some(d) = dialog {
let mut d = d.write().await; let mut d = d.write().await;
guard.storage.increment_dialog_message_count(d.storage_id).await?; guard
.storage
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
.await?;
d.message_count += 1; d.message_count += 1;
} else { } else {
drop(guard); drop(guard);
@ -73,7 +76,10 @@ impl DialogRegistry {
// double check in case concurrent access has loaded this dialog // double check in case concurrent access has loaded this dialog
if let Some(d) = guard2.dialogs.get(&id) { if let Some(d) = guard2.dialogs.get(&id) {
let mut d = d.write().await; let mut d = d.write().await;
guard2.storage.increment_dialog_message_count(d.storage_id).await?; guard2
.storage
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
.await?;
d.message_count += 1; d.message_count += 1;
} else { } else {
let (p1, p2) = id.as_inner(); let (p1, p2) = id.as_inner();
@ -86,7 +92,16 @@ impl DialogRegistry {
} }
}; };
tracing::info!("Dialog {id:?} loaded"); tracing::info!("Dialog {id:?} loaded");
guard2.storage.increment_dialog_message_count(stored_dialog.id).await?; guard2
.storage
.insert_dialog_message(
stored_dialog.id,
stored_dialog.message_count,
from.as_inner(),
&body,
created_at,
)
.await?;
let dialog = Dialog { let dialog = Dialog {
storage_id: stored_dialog.id, storage_id: stored_dialog.id,
player_storage_id_1: stored_dialog.participant_1, player_storage_id_1: stored_dialog.participant_1,

View File

@ -1,10 +1,11 @@
use anyhow::Result; use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sqlx::FromRow; use sqlx::FromRow;
use crate::repo::Storage; use crate::repo::Storage;
impl Storage { impl Storage {
#[tracing::instrument(skip(self), name = "Storage::retrieve_dialog")]
pub async fn retrieve_dialog(&self, participant_1: &str, participant_2: &str) -> Result<Option<StoredDialog>> { pub async fn retrieve_dialog(&self, participant_1: &str, participant_2: &str) -> Result<Option<StoredDialog>> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let res = sqlx::query_as( let res = sqlx::query_as(
@ -20,19 +21,41 @@ impl Storage {
Ok(res) Ok(res)
} }
pub async fn increment_dialog_message_count(&self, storage_id: u32) -> Result<()> { #[tracing::instrument(skip(self, content, created_at), name = "Storage::insert_dialog_message")]
pub async fn insert_dialog_message(
&self,
dialog_id: u32,
id: u32,
author_id: &str,
content: &str,
created_at: &DateTime<Utc>,
) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let res: Option<(u32,)> = sqlx::query_as("select id from users where name = ?;")
.bind(author_id)
.fetch_optional(&mut *executor)
.await?;
let Some((author_id,)) = res else {
return Err(anyhow!("No such user"));
};
sqlx::query( sqlx::query(
"update rooms set message_count = message_count + 1 "insert into dialog_messages(dialog_id, id, author_id, content, created_at)
where id = ?;", values (?, ?, ?, ?, ?);
update dialogs set message_count = message_count + 1 where id = ?;",
) )
.bind(storage_id) .bind(dialog_id)
.bind(id)
.bind(author_id)
.bind(content)
.bind(created_at)
.bind(dialog_id)
.execute(&mut *executor) .execute(&mut *executor)
.await?; .await?;
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, created_at), name = "Storage::initialize_dialog")]
pub async fn initialize_dialog( pub async fn initialize_dialog(
&self, &self,
participant_1: &str, participant_1: &str,

View File

@ -428,6 +428,7 @@ struct XmppConnection<'a> {
} }
impl<'a> XmppConnection<'a> { impl<'a> XmppConnection<'a> {
#[tracing::instrument(skip(self, output, packet), name = "XmppConnection::handle_packet")]
async fn handle_packet(&mut self, output: &mut Vec<Event<'static>>, packet: ClientPacket) -> Result<bool> { async fn handle_packet(&mut self, output: &mut Vec<Event<'static>>, packet: ClientPacket) -> Result<bool> {
let res = match packet { let res = match packet {
ClientPacket::Iq(iq) => { ClientPacket::Iq(iq) => {