Read and serve

This commit is contained in:
Mikhail 2024-05-23 20:52:03 +02:00
parent 6f25ce0fd5
commit f9eb510062
6 changed files with 78 additions and 34 deletions

View File

@ -0,0 +1,2 @@
alter table messages drop column created_at;
alter table messages add column created_at datetime default "1970-01-01T00:00:00Z";

View File

@ -579,13 +579,13 @@ impl Player {
let room = self.my_rooms.get(&room_id);
if let Some(room) = room {
match room {
RoomRef::Local(room) => room.get_message_history().await,
RoomRef::Local(room) => room.get_message_history(&self.services).await,
RoomRef::Remote { node_id } => {
todo!()
}
}
} else {
tracing::info!("Room with ID {room_id:?} not found");
tracing::error!("Room with ID {room_id:?} not found");
// todo: return error
todo!()
}

View File

@ -89,3 +89,13 @@ pub struct StoredDialog {
pub participant_2: u32,
pub message_count: u32,
}
#[derive(FromRow)]
pub struct StoredMessageWithAuthor {
pub room_id: u32,
pub id: u32,
pub content: String,
pub author_id: u32,
pub author_name: String,
pub created_at: DateTime<Utc>,
}

View File

@ -1,9 +1,10 @@
use crate::repo::dialog::StoredMessageWithAuthor;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use sqlx::FromRow;
use crate::repo::Storage;
use crate::room::RoomId;
use crate::room::{HistoryMessage, RoomId, User};
#[derive(FromRow)]
pub struct StoredRoom {
@ -29,6 +30,35 @@ impl Storage {
Ok(res)
}
#[tracing::instrument(skip(self), name = "Storage::retrieve_room_message_history")]
pub async fn get_room_message_history(&self, room_id: u32) -> Result<Vec<HistoryMessage>> {
let mut executor = self.conn.lock().await;
let res = sqlx::query_as(
"
select
messages.id as id,
content,
created_at,
users.id as author_id,
users.name as author_name
from
messages
join
users
on messages.author_id = users.id
where
room_id = ?
order by
created_at desc;
",
)
.bind(room_id)
.fetch_all(&mut *executor)
.await?;
Ok(res)
}
#[tracing::instrument(skip(self, topic), name = "Storage::create_new_room")]
pub async fn create_new_room(&self, name: &str, topic: &str) -> Result<u32> {
let mut executor = self.conn.lock().await;
@ -71,7 +101,7 @@ impl Storage {
.bind(id)
.bind(content)
.bind(author_id)
.bind(created_at.to_string())
.bind(created_at)
.bind(room_id)
.execute(&mut *executor)
.await?;

View File

@ -5,11 +5,13 @@ use std::{collections::HashMap, hash::Hash, sync::Arc};
use chrono::{DateTime, Utc};
use prometheus::{IntGauge, Registry as MetricRegistry};
use serde::Serialize;
use sqlx::sqlite::SqliteRow;
use sqlx::{FromRow, Row};
use tokio::sync::RwLock as AsyncRwLock;
use crate::player::{PlayerHandle, PlayerId, Updates};
use crate::prelude::*;
use crate::Services;
use crate::{LavinaCore, Services};
/// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
@ -158,27 +160,8 @@ impl RoomHandle {
lock.broadcast_update(update, player_id).await;
}
pub async fn get_message_history(&self) -> Vec<HistoryMessage> {
return vec![
HistoryMessage {
id: "kek0".to_string(),
body: "Willkommen in Brem'".to_string(),
created_at: Utc::now(),
author: User {
id: 0,
name: "sauer".to_string(),
},
},
HistoryMessage {
id: "kek1".to_string(),
body: "Willkommen in Hamburg".to_string(),
created_at: Utc::now(),
author: User {
id: 0,
name: "sauer".to_string(),
},
},
];
pub async fn get_message_history(&self, services: &LavinaCore) -> Vec<HistoryMessage> {
return services.storage.get_room_message_history(self.0.read().await.storage_id).await.unwrap();
}
#[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")]
@ -303,14 +286,30 @@ pub struct RoomInfo {
pub topic: Str,
}
#[derive(Debug)]
pub struct User {
pub id: u32,
pub name: String,
}
#[derive(Debug)]
pub struct HistoryMessage {
pub id: String,
pub id: u32,
pub author: User,
pub body: String,
pub content: String,
pub created_at: DateTime<Utc>,
}
impl FromRow<'_, SqliteRow> for HistoryMessage {
fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
Ok(Self {
id: row.try_get("id")?,
author: User {
id: row.try_get("author_id")?,
name: row.try_get("author_name")?,
},
content: row.try_get("content")?,
created_at: row.try_get("created_at")?,
})
}
}

View File

@ -102,10 +102,8 @@ impl<'a> XmppConnection<'a> {
let mut response = vec![];
for history_message in history_messages.into_iter() {
let author_name = Option::from(Name(history_message.author.name.into()));
response.push(XmppHistoryMessage {
id: history_message.id,
id: history_message.id.to_string(),
to: Jid {
name: Option::from(Name(self.user.xmpp_muc_name.0.clone().into())),
server: Server(self.hostname.clone()),
@ -114,18 +112,23 @@ impl<'a> XmppConnection<'a> {
from: Jid {
name: Option::from(room_name.clone()),
server: Server(self.hostname_rooms.clone()),
resource: Option::from(Resource("sauer".into())),
resource: Option::from(Resource(history_message.author.name.clone().into())),
},
delay: Delay::new(
Jid {
name: author_name.clone(),
name: Option::from(Name(history_message.author.name.clone().into())),
server: Server(self.hostname_rooms.clone()),
resource: None,
},
history_message.created_at.to_rfc3339(),
),
body: history_message.body,
body: history_message.content.clone(),
});
tracing::info!(
"Retrieved message: {:?} {:?}",
history_message.author,
history_message.content.clone()
);
}
return Ok(response);