xmpp, core: Send message history on MUC join (#68)

Re-send the entire message history on MUC join. Contributes to #5.

Reviewed-on: lavina/lavina#68
Co-authored-by: Mikhail <mikhail@liamets.dev>
Co-committed-by: Mikhail <mikhail@liamets.dev>
This commit is contained in:
Mikhail 2024-05-26 11:20:26 +00:00 committed by Nikita Vilunov
parent bce8b332d2
commit 381b5650bc
15 changed files with 328 additions and 42 deletions

View File

@ -0,0 +1,2 @@
alter table messages drop column created_at;
alter table messages add column created_at datetime default "1970-01-01T00:00:00Z";

View File

@ -18,7 +18,7 @@ use tracing::{Instrument, Span};
use crate::clustering::room::*; use crate::clustering::room::*;
use crate::prelude::*; use crate::prelude::*;
use crate::room::{RoomHandle, RoomId, RoomInfo}; use crate::room::{RoomHandle, RoomId, RoomInfo, StoredMessage};
use crate::table::{AnonTable, Key as AnonKey}; use crate::table::{AnonTable, Key as AnonKey};
use crate::LavinaCore; use crate::LavinaCore;
@ -111,6 +111,14 @@ impl PlayerConnection {
Ok(deferred.await?) Ok(deferred.await?)
} }
#[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")]
pub async fn get_room_message_history(&self, room_id: RoomId) -> Result<Vec<StoredMessage>> {
let (promise, deferred) = oneshot();
let cmd = ClientCommand::GetRoomHistory { room_id, promise };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await;
Ok(deferred.await?)
}
/// Handler in [Player::send_dialog_message]. /// Handler in [Player::send_dialog_message].
#[tracing::instrument(skip(self, body), name = "PlayerConnection::send_dialog_message")] #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_dialog_message")]
pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> { pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> {
@ -212,6 +220,10 @@ pub enum ClientCommand {
recipient: PlayerId, recipient: PlayerId,
promise: Promise<GetInfoResult>, promise: Promise<GetInfoResult>,
}, },
GetRoomHistory {
room_id: RoomId,
promise: Promise<Vec<StoredMessage>>,
},
} }
pub enum GetInfoResult { pub enum GetInfoResult {
@ -509,6 +521,10 @@ impl Player {
let result = self.check_user_existence(recipient).await; let result = self.check_user_existence(recipient).await;
let _ = promise.send(result); let _ = promise.send(result);
} }
ClientCommand::GetRoomHistory { room_id, promise } => {
let result = self.get_room_history(room_id).await;
let _ = promise.send(result);
}
} }
} }
@ -557,6 +573,23 @@ impl Player {
} }
} }
#[tracing::instrument(skip(self), name = "Player::retrieve_room_history")]
async fn get_room_history(&mut self, room_id: RoomId) -> Vec<StoredMessage> {
let room = self.my_rooms.get(&room_id);
if let Some(room) = room {
match room {
RoomRef::Local(room) => room.get_message_history(&self.services).await,
RoomRef::Remote { node_id } => {
todo!()
}
}
} else {
tracing::error!("Room with ID {room_id:?} not found");
// todo: return error
todo!()
}
}
#[tracing::instrument(skip(self), name = "Player::leave_room")] #[tracing::instrument(skip(self), name = "Player::leave_room")]
async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) { async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) {
let room = self.my_rooms.remove(&room_id); let room = self.my_rooms.remove(&room_id);
@ -593,7 +626,7 @@ impl Player {
body: Str, body: Str,
) -> SendMessageResult { ) -> SendMessageResult {
let Some(room) = self.my_rooms.get(&room_id) else { let Some(room) = self.my_rooms.get(&room_id) else {
tracing::info!("no room found"); tracing::info!("Room with ID {room_id:?} not found");
return SendMessageResult::NoSuchRoom; return SendMessageResult::NoSuchRoom;
}; };
let created_at = Utc::now(); let created_at = Utc::now();
@ -632,7 +665,7 @@ impl Player {
#[tracing::instrument(skip(self, new_topic), name = "Player::change_room_topic")] #[tracing::instrument(skip(self, new_topic), name = "Player::change_room_topic")]
async fn change_room_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { async fn change_room_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) {
let Some(room) = self.my_rooms.get(&room_id) else { let Some(room) = self.my_rooms.get(&room_id) else {
tracing::info!("no room found"); tracing::info!("Room with ID {room_id:?} not found");
return; return;
}; };
match room { match room {

View File

@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
use sqlx::FromRow; use sqlx::FromRow;
use crate::repo::Storage; use crate::repo::Storage;
use crate::room::RoomId; use crate::room::{RoomId, StoredMessage};
#[derive(FromRow)] #[derive(FromRow)]
pub struct StoredRoom { pub struct StoredRoom {
@ -29,6 +29,34 @@ impl Storage {
Ok(res) Ok(res)
} }
#[tracing::instrument(skip(self), name = "Storage::retrieve_room_message_history")]
pub async fn get_room_message_history(&self, room_id: u32) -> Result<Vec<StoredMessage>> {
let mut executor = self.conn.lock().await;
let res = sqlx::query_as(
"
select
messages.id as id,
content,
created_at,
users.name as author_name
from
messages
join
users
on messages.author_id = users.id
where
room_id = ?
order by
messages.id;
",
)
.bind(room_id)
.fetch_all(&mut *executor)
.await?;
Ok(res)
}
#[tracing::instrument(skip(self, topic), name = "Storage::create_new_room")] #[tracing::instrument(skip(self, topic), name = "Storage::create_new_room")]
pub async fn create_new_room(&self, name: &str, topic: &str) -> Result<u32> { pub async fn create_new_room(&self, name: &str, topic: &str) -> Result<u32> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
@ -71,7 +99,7 @@ impl Storage {
.bind(id) .bind(id)
.bind(content) .bind(content)
.bind(author_id) .bind(author_id)
.bind(created_at.to_string()) .bind(created_at)
.bind(room_id) .bind(room_id)
.execute(&mut *executor) .execute(&mut *executor)
.await?; .await?;
@ -174,6 +202,6 @@ impl Storage {
.fetch_all(&mut *executor) .fetch_all(&mut *executor)
.await?; .await?;
res.into_iter().map(|(room_id,)| RoomId::from(room_id)).collect() res.into_iter().map(|(room_id,)| RoomId::try_from(room_id)).collect()
} }
} }

View File

@ -5,18 +5,20 @@ use std::{collections::HashMap, hash::Hash, sync::Arc};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use prometheus::{IntGauge, Registry as MetricRegistry}; use prometheus::{IntGauge, Registry as MetricRegistry};
use serde::Serialize; use serde::Serialize;
use sqlx::sqlite::SqliteRow;
use sqlx::{FromRow, Row};
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::RwLock as AsyncRwLock;
use crate::player::{PlayerHandle, PlayerId, Updates}; use crate::player::{PlayerHandle, PlayerId, Updates};
use crate::prelude::*; use crate::prelude::*;
use crate::Services; use crate::{LavinaCore, Services};
/// Opaque room id /// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct RoomId(Str); pub struct RoomId(Str);
impl RoomId { impl RoomId {
pub fn from(str: impl Into<Str>) -> Result<RoomId> { pub fn try_from(str: impl Into<Str>) -> Result<RoomId> {
let bytes = str.into(); let bytes = str.into();
if bytes.len() > 32 { if bytes.len() > 32 {
return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols"));
@ -158,6 +160,10 @@ impl RoomHandle {
lock.broadcast_update(update, player_id).await; lock.broadcast_update(update, player_id).await;
} }
pub async fn get_message_history(&self, services: &LavinaCore) -> Vec<StoredMessage> {
return services.storage.get_room_message_history(self.0.read().await.storage_id).await.unwrap();
}
#[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")] #[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")]
pub async fn unsubscribe(&self, player_id: &PlayerId) { pub async fn unsubscribe(&self, player_id: &PlayerId) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
@ -279,3 +285,11 @@ pub struct RoomInfo {
pub members: Vec<PlayerId>, pub members: Vec<PlayerId>,
pub topic: Str, pub topic: Str,
} }
#[derive(Debug, FromRow)]
pub struct StoredMessage {
pub id: u32,
pub author_name: String,
pub content: String,
pub created_at: DateTime<Utc>,
}

View File

@ -720,7 +720,7 @@ async fn handle_incoming_message(
} }
ClientMessage::PrivateMessage { recipient, body } => match recipient { ClientMessage::PrivateMessage { recipient, body } => match recipient {
Recipient::Chan(Chan::Global(chan)) => { Recipient::Chan(Chan::Global(chan)) => {
let room_id = RoomId::from(chan)?; let room_id = RoomId::try_from(chan)?;
user_handle.send_message(room_id, body).await?; user_handle.send_message(room_id, body).await?;
} }
Recipient::Nick(nick) => { Recipient::Nick(nick) => {
@ -732,7 +732,7 @@ async fn handle_incoming_message(
ClientMessage::Topic { chan, topic } => { ClientMessage::Topic { chan, topic } => {
match chan { match chan {
Chan::Global(chan) => { Chan::Global(chan) => {
let room_id = RoomId::from(chan)?; let room_id = RoomId::try_from(chan)?;
user_handle.change_topic(room_id.clone(), topic.clone()).await?; user_handle.change_topic(room_id.clone(), topic.clone()).await?;
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
@ -774,7 +774,7 @@ async fn handle_incoming_message(
writer.flush().await?; writer.flush().await?;
} }
Recipient::Chan(Chan::Global(chan)) => { Recipient::Chan(Chan::Global(chan)) => {
let room = core.get_room(&RoomId::from(chan.clone())?).await; let room = core.get_room(&RoomId::try_from(chan.clone())?).await;
if let Some(room) = room { if let Some(room) = room {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
for member in room_info.members { for member in room_info.members {
@ -893,7 +893,7 @@ async fn handle_join(
) -> Result<()> { ) -> Result<()> {
match chan { match chan {
Chan::Global(chan_name) => { Chan::Global(chan_name) => {
let room_id = RoomId::from(chan_name.clone())?; let room_id = RoomId::try_from(chan_name.clone())?;
match user_handle.join_room(room_id).await? { match user_handle.join_room(room_id).await? {
JoinResult::Success(room_info) => { JoinResult::Success(room_info) => {
produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?; produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?;
@ -932,7 +932,7 @@ async fn handle_part(
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> { ) -> Result<()> {
if let Chan::Global(chan_name) = chan { if let Chan::Global(chan_name) = chan {
let room_id = RoomId::from(chan_name.clone())?; let room_id = RoomId::try_from(chan_name.clone())?;
user_handle.leave_room(room_id).await?; user_handle.leave_room(room_id).await?;
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],

View File

@ -619,14 +619,14 @@ async fn server_time_capability() -> Result<()> {
server.core.create_player(&PlayerId::from("some_guy")?).await?; server.core.create_player(&PlayerId::from("some_guy")?).await?;
let mut conn = server.core.connect_to_player(&PlayerId::from("some_guy").unwrap()).await; let mut conn = server.core.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
let res = conn.join_room(RoomId::from("test").unwrap()).await?; let res = conn.join_room(RoomId::try_from("test").unwrap()).await?;
let JoinResult::Success(_) = res else { let JoinResult::Success(_) = res else {
panic!("Failed to join room"); panic!("Failed to join room");
}; };
s.expect(":some_guy JOIN #test").await?; s.expect(":some_guy JOIN #test").await?;
let SendMessageResult::Success(res) = conn.send_message(RoomId::from("test").unwrap(), "Hello".into()).await? let SendMessageResult::Success(res) = conn.send_message(RoomId::try_from("test").unwrap(), "Hello".into()).await?
else { else {
panic!("Failed to send message"); panic!("Failed to send message");
}; };

View File

@ -163,7 +163,7 @@ impl<'a> XmppConnection<'a> {
server, server,
resource: None, resource: None,
}) if server.0 == self.hostname_rooms => { }) if server.0 == self.hostname_rooms => {
let room_id = RoomId::from(room_name.0.clone()).unwrap(); let room_id = RoomId::try_from(room_name.0.clone()).unwrap();
let Some(_) = self.core.get_room(&room_id).await else { let Some(_) = self.core.get_room(&room_id).await else {
// TODO should return item-not-found // TODO should return item-not-found
// example: // example:

View File

@ -21,7 +21,7 @@ impl<'a> XmppConnection<'a> {
{ {
if server.0.as_ref() == &*self.hostname_rooms && m.r#type == MessageType::Groupchat { if server.0.as_ref() == &*self.hostname_rooms && m.r#type == MessageType::Groupchat {
let Some(body) = &m.body else { return Ok(()) }; let Some(body) = &m.body else { return Ok(()) };
self.user_handle.send_message(RoomId::from(name.0.clone())?, body.clone()).await?; self.user_handle.send_message(RoomId::try_from(name.0.clone())?, body.clone()).await?;
Message::<()> { Message::<()> {
to: Some(Jid { to: Some(Jid {
name: Some(self.user.xmpp_name.clone()), name: Some(self.user.xmpp_name.clone()),

View File

@ -4,9 +4,9 @@ use anyhow::Result;
use quick_xml::events::Event; use quick_xml::events::Event;
use lavina_core::room::RoomId; use lavina_core::room::RoomId;
use proto_xmpp::bind::{Jid, Name, Server}; use proto_xmpp::bind::{Jid, Name, Resource, Server};
use proto_xmpp::client::{Message, MessageType, Presence, Subject}; use proto_xmpp::client::{Message, MessageType, Presence, Subject};
use proto_xmpp::muc::{Affiliation, Role, XUser, XUserItem}; use proto_xmpp::muc::{Affiliation, Delay, Role, XUser, XUserItem, XmppHistoryMessage};
use proto_xmpp::xml::{Ignore, ToXml}; use proto_xmpp::xml::{Ignore, ToXml};
use crate::XmppConnection; use crate::XmppConnection;
@ -23,11 +23,11 @@ impl<'a> XmppConnection<'a> {
// resources in MUCs are members' personas not implemented (yet?) // resources in MUCs are members' personas not implemented (yet?)
resource: Some(_), resource: Some(_),
}) if server.0 == self.hostname_rooms => { }) if server.0 == self.hostname_rooms => {
let mut response = self.muc_presence(&name).await?; let mut muc_presence = self.retrieve_muc_presence(&name).await?;
response.id = p.id; muc_presence.id = p.id;
let subject = Message::<()> { let subject = Message::<()> {
from: Some(Jid { from: Some(Jid {
name: Some(name), name: Some(name.clone()),
server: Server(self.hostname_rooms.clone()), server: Server(self.hostname_rooms.clone()),
resource: None, resource: None,
}), }),
@ -43,7 +43,13 @@ impl<'a> XmppConnection<'a> {
body: None, body: None,
custom: vec![], custom: vec![],
}; };
response.serialize(output); muc_presence.serialize(output);
let messages = self.retrieve_message_history(&name).await?;
for message in messages {
message.serialize(output)
}
// The subject is the last stanza sent during a MUC join process.
subject.serialize(output); subject.serialize(output);
} }
_ => { _ => {
@ -82,9 +88,8 @@ impl<'a> XmppConnection<'a> {
} }
} }
// todo: return Presence and serialize on the outside. async fn retrieve_muc_presence(&mut self, name: &Name) -> Result<Presence<XUser>> {
async fn muc_presence(&mut self, name: &Name) -> Result<(Presence<XUser>)> { let a = self.user_handle.join_room(RoomId::try_from(name.0.clone())?).await?;
let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?;
// TODO handle bans // TODO handle bans
let response = Presence { let response = Presence {
to: Some(Jid { to: Some(Jid {
@ -114,21 +119,62 @@ impl<'a> XmppConnection<'a> {
}; };
Ok(response) Ok(response)
} }
/// Retrieve a room's message history. The output can be serialized into a stream of XML stanzas.
///
/// Example in [XmppHistoryMessage]'s docs.
#[tracing::instrument(skip(self), name = "XmppConnection::retrieve_message_history")]
async fn retrieve_message_history(&self, room_name: &Name) -> Result<Vec<XmppHistoryMessage>> {
let room_id = RoomId::try_from(room_name.0.clone())?;
let history_messages = self.user_handle.get_room_message_history(room_id).await?;
let mut response = vec![];
for history_message in history_messages.into_iter() {
response.push(XmppHistoryMessage {
id: history_message.id.to_string(),
to: Jid {
name: Option::from(Name(self.user.xmpp_muc_name.0.clone().into())),
server: Server(self.hostname.clone()),
resource: None,
},
from: Jid {
name: Option::from(room_name.clone()),
server: Server(self.hostname_rooms.clone()),
resource: Option::from(Resource(history_message.author_name.clone().into())),
},
delay: Delay {
from: Jid {
name: Option::from(Name(history_message.author_name.clone().into())),
server: Server(self.hostname_rooms.clone()),
resource: None,
},
stamp: history_message.created_at.to_rfc3339(),
},
body: history_message.content.clone(),
});
tracing::info!(
"Retrieved message: {:?} {:?}",
history_message.author_name,
history_message.content.clone()
);
} }
// todo: set up so that the user has been previously joined. return Ok(response);
// todo: first call to muc_presence is OK, next one is OK too. }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::testkit::{expect_user_authenticated, TestServer};
use crate::Authenticated;
use anyhow::Result; use anyhow::Result;
use lavina_core::player::PlayerId; use lavina_core::player::PlayerId;
use proto_xmpp::bind::{Jid, Name, Resource, Server}; use proto_xmpp::bind::{Jid, Name, Resource, Server};
use proto_xmpp::client::Presence; use proto_xmpp::client::Presence;
use proto_xmpp::muc::{Affiliation, Role, XUser, XUserItem}; use proto_xmpp::muc::{Affiliation, Role, XUser, XUserItem};
use crate::testkit::{expect_user_authenticated, TestServer};
use crate::Authenticated;
#[tokio::test] #[tokio::test]
async fn test_muc_joining() -> Result<()> { async fn test_muc_joining() -> Result<()> {
let server = TestServer::start().await.unwrap(); let server = TestServer::start().await.unwrap();
@ -146,7 +192,7 @@ mod tests {
let mut player_conn = server.core.connect_to_player(&user.player_id).await; let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); let muc_presence = conn.retrieve_muc_presence(&user.xmpp_name).await.unwrap();
let expected = Presence { let expected = Presence {
to: Some(Jid { to: Some(Jid {
name: Some(conn.user.xmpp_name.clone()), name: Some(conn.user.xmpp_name.clone()),
@ -173,7 +219,7 @@ mod tests {
}], }],
..Default::default() ..Default::default()
}; };
assert_eq!(expected, response); assert_eq!(expected, muc_presence);
server.shutdown().await.unwrap(); server.shutdown().await.unwrap();
Ok(()) Ok(())
@ -198,7 +244,7 @@ mod tests {
let mut player_conn = server.core.connect_to_player(&user.player_id).await; let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); let response = conn.retrieve_muc_presence(&user.xmpp_name).await.unwrap();
let expected = Presence { let expected = Presence {
to: Some(Jid { to: Some(Jid {
name: Some(conn.user.xmpp_name.clone()), name: Some(conn.user.xmpp_name.clone()),
@ -233,7 +279,7 @@ mod tests {
let mut player_conn = server.core.connect_to_player(&user.player_id).await; let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); let response = conn.retrieve_muc_presence(&user.xmpp_name).await.unwrap();
assert_eq!(expected, response); assert_eq!(expected, response);
server.shutdown().await.unwrap(); server.shutdown().await.unwrap();

View File

@ -15,6 +15,9 @@ pub mod streamerror;
pub mod tls; pub mod tls;
pub mod xml; pub mod xml;
#[cfg(test)]
mod testkit;
// Implemented as a macro instead of a fn due to borrowck limitations // Implemented as a macro instead of a fn due to borrowck limitations
macro_rules! skip_text { macro_rules! skip_text {
($reader: ident, $buf: ident) => { ($reader: ident, $buf: ident) => {

View File

@ -1,14 +1,16 @@
#![allow(unused_variables)] #![allow(unused_variables)]
use quick_xml::events::{BytesEnd, BytesStart, Event}; use anyhow::{anyhow, Result};
use quick_xml::name::ResolveResult; use quick_xml::events::attributes::Attribute;
use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event};
use quick_xml::name::{QName, ResolveResult};
use crate::bind::Jid; use crate::bind::Jid;
use crate::xml::*; use crate::xml::*;
use anyhow::{anyhow, Result};
pub const XMLNS: &'static str = "http://jabber.org/protocol/muc"; pub const XMLNS: &'static str = "http://jabber.org/protocol/muc";
pub const XMLNS_USER: &'static str = "http://jabber.org/protocol/muc#user"; pub const XMLNS_USER: &'static str = "http://jabber.org/protocol/muc#user";
pub const XMLNS_DELAY: &'static str = "urn:xmpp:delay";
#[derive(PartialEq, Eq, Debug, Default)] #[derive(PartialEq, Eq, Debug, Default)]
pub struct History { pub struct History {
@ -154,6 +156,7 @@ pub struct XUser {
/// Code 201. The room from which the presence stanza was sent was just created. /// Code 201. The room from which the presence stanza was sent was just created.
pub just_created: bool, pub just_created: bool,
} }
impl ToXml for XUser { impl ToXml for XUser {
fn serialize(&self, output: &mut Vec<Event<'static>>) { fn serialize(&self, output: &mut Vec<Event<'static>>) {
let mut tag = BytesStart::new("x"); let mut tag = BytesStart::new("x");
@ -180,6 +183,7 @@ pub struct XUserItem {
pub jid: Jid, pub jid: Jid,
pub role: Role, pub role: Role,
} }
impl ToXml for XUserItem { impl ToXml for XUserItem {
fn serialize(&self, output: &mut Vec<Event<'static>>) { fn serialize(&self, output: &mut Vec<Event<'static>>) {
let mut meg = BytesStart::new("item"); let mut meg = BytesStart::new("item");
@ -198,6 +202,7 @@ pub enum Affiliation {
Outcast, Outcast,
None, None,
} }
impl Affiliation { impl Affiliation {
pub fn from_str(s: &str) -> Option<Self> { pub fn from_str(s: &str) -> Option<Self> {
match s { match s {
@ -228,6 +233,7 @@ pub enum Role {
Visitor, Visitor,
None, None,
} }
impl Role { impl Role {
pub fn from_str(s: &str) -> Option<Self> { pub fn from_str(s: &str) -> Option<Self> {
match s { match s {
@ -249,9 +255,83 @@ impl Role {
} }
} }
#[derive(Debug, PartialEq, Eq)]
pub struct Delay {
pub from: Jid,
pub stamp: String,
}
impl ToXml for Delay {
fn serialize(&self, events: &mut Vec<Event>) {
let mut tag = BytesStart::new("delay");
tag.push_attribute(Attribute {
key: QName(b"xmlns"),
value: XMLNS_DELAY.as_bytes().into(),
});
tag.push_attribute(Attribute {
key: QName(b"from"),
value: self.from.to_string().into_bytes().into(),
});
tag.push_attribute(Attribute {
key: QName(b"stamp"),
value: self.stamp.as_bytes().into(),
});
events.push(Event::Empty(tag));
}
}
/// Message-stanza of a historic message.
///
/// Example:
/// ```xml
/// <message from="duqedadi@conference.example.com/misha" xml:lang="en" to="misha@example.com/tux" type="groupchat" id="7ca7cb14-b2af-49c9-bd90-05dabb1113a5">
/// <delay xmlns="urn:xmpp:delay" stamp="2024-05-17T16:05:28.440337Z" from="duqedadi@conference.example.com"/>
/// <body></body>
/// </message>
/// ```
#[derive(Debug, PartialEq, Eq)]
pub struct XmppHistoryMessage {
pub id: String,
pub to: Jid,
pub from: Jid,
pub delay: Delay,
pub body: String,
}
impl ToXml for XmppHistoryMessage {
fn serialize(&self, events: &mut Vec<Event<'static>>) {
let mut message_tag = BytesStart::new("message");
message_tag.push_attribute(Attribute {
key: QName(b"id"),
value: self.id.as_str().as_bytes().into(),
});
message_tag.push_attribute(Attribute {
key: QName(b"to"),
value: self.to.to_string().into_bytes().into(),
});
message_tag.push_attribute(Attribute {
key: QName(b"from"),
value: self.from.to_string().into_bytes().into(),
});
message_tag.push_attribute(Attribute {
key: QName(b"type"),
value: b"groupchat".into(),
});
events.push(Event::Start(message_tag));
self.delay.serialize(events);
let body_tag = BytesStart::new("body");
events.push(Event::Start(body_tag));
events.push(Event::Text(BytesText::new(self.body.to_string().as_str()).into_owned()));
events.push(Event::End(BytesEnd::new("body")));
events.push(Event::End(BytesEnd::new("message")));
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use crate::bind::{Name, Resource, Server};
use crate::testkit::assemble_string_from_event_flow;
#[test] #[test]
fn test_history_success_empty() { fn test_history_success_empty() {
@ -334,4 +414,40 @@ mod test {
}; };
assert_eq!(res, expected); assert_eq!(res, expected);
} }
#[test]
fn test_history_message_serialization() {
// Arrange
let history_message = XmppHistoryMessage {
id: "id".to_string(),
to: Jid {
name: Some(Name("sauer@example.com".into())),
server: Server("localhost".into()),
resource: Some(Resource("tester".into())),
},
from: Jid {
name: Some(Name("pepe".into())),
server: Server("rooms.localhost".into()),
resource: Some(Resource("sauer".into())),
},
delay: Delay {
from: Jid {
name: Some(Name("pepe".into())),
server: Server("rooms.localhost".into()),
resource: Some(Resource("tester".into())),
},
stamp: "2021-10-10T10:10:10Z".to_string(),
},
body: "Hello World.".to_string(),
};
let mut events = vec![];
let expected = r#"<message id="id" to="sauer@example.com@localhost/tester" from="pepe@rooms.localhost/sauer" type="groupchat"><delay xmlns="urn:xmpp:delay" from="pepe@rooms.localhost/tester" stamp="2021-10-10T10:10:10Z"/><body>Hello World.</body></message>"#;
// Act
history_message.serialize(&mut events);
let flow = assemble_string_from_event_flow(&events);
// Assert
assert_eq!(flow, expected);
}
} }

View File

@ -0,0 +1,12 @@
use quick_xml::events::Event;
use quick_xml::Writer;
use std::io::Cursor;
pub fn assemble_string_from_event_flow(events: &Vec<Event<'_>>) -> String {
let mut writer = Writer::new(Cursor::new(Vec::new()));
for event in events {
writer.write_event(event).unwrap();
}
let result = writer.into_inner().into_inner();
String::from_utf8(result).unwrap()
}

View File

@ -67,3 +67,35 @@ Or you can build it and run manually:
cargo build --release cargo build --release
./target/release/lavina --config config.toml ./target/release/lavina --config config.toml
## Migrations
### Prerequisites
Install sqlx-cli into ~/.local/bin:
cargo install --locked sqlx-cli
### Steps
Migrations run on every application start. For manual run, use sqlx:
sqlx mig run \
--source ./crates/lavina-core/migrations/ \
--database-url sqlite://db.sqlite
To see current status:
sqlx mig info \
--source ./crates/lavina-core/migrations/ \
--database-url sqlite://db.sqlite
sqlx mig info outputs
0/installed first
1/installed msg author
2/installed created at for messages
3/installed dialogs
4/installed new challenges
5/pending message datetime

View File

@ -164,7 +164,7 @@ async fn endpoint_send_room_message(
let Ok(req) = serde_json::from_slice::<rooms::SendMessageReq>(&str[..]) else { let Ok(req) = serde_json::from_slice::<rooms::SendMessageReq>(&str[..]) else {
return Ok(malformed_request()); return Ok(malformed_request());
}; };
let Ok(room_id) = RoomId::from(req.room_id) else { let Ok(room_id) = RoomId::try_from(req.room_id) else {
return Ok(room_not_found()); return Ok(room_not_found());
}; };
let Ok(player_id) = PlayerId::from(req.author_id) else { let Ok(player_id) = PlayerId::from(req.author_id) else {
@ -187,7 +187,7 @@ async fn endpoint_set_room_topic(
let Ok(req) = serde_json::from_slice::<rooms::SetTopicReq>(&str[..]) else { let Ok(req) = serde_json::from_slice::<rooms::SetTopicReq>(&str[..]) else {
return Ok(malformed_request()); return Ok(malformed_request());
}; };
let Ok(room_id) = RoomId::from(req.room_id) else { let Ok(room_id) = RoomId::try_from(req.room_id) else {
return Ok(room_not_found()); return Ok(room_not_found());
}; };
let Ok(player_id) = PlayerId::from(req.author_id) else { let Ok(player_id) = PlayerId::from(req.author_id) else {

View File

@ -29,7 +29,7 @@ async fn endpoint_cluster_join_room(
return Ok(malformed_request()); return Ok(malformed_request());
}; };
tracing::info!("Incoming request: {:?}", &req); tracing::info!("Incoming request: {:?}", &req);
let Ok(room_id) = RoomId::from(req.room_id) else { let Ok(room_id) = RoomId::try_from(req.room_id) else {
dbg!(&req.room_id); dbg!(&req.room_id);
return Ok(room_not_found()); return Ok(room_not_found());
}; };
@ -55,7 +55,7 @@ async fn endpoint_cluster_add_message(
dbg!(&req.created_at); dbg!(&req.created_at);
return Ok(malformed_request()); return Ok(malformed_request());
}; };
let Ok(room_id) = RoomId::from(req.room_id) else { let Ok(room_id) = RoomId::try_from(req.room_id) else {
dbg!(&req.room_id); dbg!(&req.room_id);
return Ok(room_not_found()); return Ok(room_not_found());
}; };