From 381b5650bc15814b66d2879d148c9f60d218e0bf Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sun, 26 May 2024 11:20:26 +0000 Subject: [PATCH] xmpp, core: Send message history on MUC join (#68) Re-send the entire message history on MUC join. Contributes to #5. Reviewed-on: https://git.vilunov.me/lavina/lavina/pulls/68 Co-authored-by: Mikhail Co-committed-by: Mikhail --- .../migrations/5_message_datetime.sql | 2 + crates/lavina-core/src/player.rs | 39 +++++- crates/lavina-core/src/repo/room.rs | 34 ++++- crates/lavina-core/src/room.rs | 18 ++- crates/projection-irc/src/lib.rs | 10 +- crates/projection-irc/tests/lib.rs | 4 +- crates/projection-xmpp/src/iq.rs | 2 +- crates/projection-xmpp/src/message.rs | 2 +- crates/projection-xmpp/src/presence.rs | 82 +++++++++--- crates/proto-xmpp/src/lib.rs | 3 + crates/proto-xmpp/src/muc/mod.rs | 122 +++++++++++++++++- crates/proto-xmpp/src/testkit.rs | 12 ++ docs/running.md | 32 +++++ src/http.rs | 4 +- src/http/clustering.rs | 4 +- 15 files changed, 328 insertions(+), 42 deletions(-) create mode 100644 crates/lavina-core/migrations/5_message_datetime.sql create mode 100644 crates/proto-xmpp/src/testkit.rs diff --git a/crates/lavina-core/migrations/5_message_datetime.sql b/crates/lavina-core/migrations/5_message_datetime.sql new file mode 100644 index 0000000..0b4032b --- /dev/null +++ b/crates/lavina-core/migrations/5_message_datetime.sql @@ -0,0 +1,2 @@ +alter table messages drop column created_at; +alter table messages add column created_at datetime default "1970-01-01T00:00:00Z"; diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 292c220..633593e 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -18,7 +18,7 @@ use tracing::{Instrument, Span}; use crate::clustering::room::*; use crate::prelude::*; -use crate::room::{RoomHandle, RoomId, RoomInfo}; +use crate::room::{RoomHandle, RoomId, RoomInfo, StoredMessage}; use crate::table::{AnonTable, Key as AnonKey}; use crate::LavinaCore; @@ -111,6 +111,14 @@ impl PlayerConnection { Ok(deferred.await?) } + #[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")] + pub async fn get_room_message_history(&self, room_id: RoomId) -> Result> { + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::GetRoomHistory { room_id, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) + } + /// Handler in [Player::send_dialog_message]. #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_dialog_message")] pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> { @@ -212,6 +220,10 @@ pub enum ClientCommand { recipient: PlayerId, promise: Promise, }, + GetRoomHistory { + room_id: RoomId, + promise: Promise>, + }, } pub enum GetInfoResult { @@ -509,6 +521,10 @@ impl Player { let result = self.check_user_existence(recipient).await; let _ = promise.send(result); } + ClientCommand::GetRoomHistory { room_id, promise } => { + let result = self.get_room_history(room_id).await; + let _ = promise.send(result); + } } } @@ -557,6 +573,23 @@ impl Player { } } + #[tracing::instrument(skip(self), name = "Player::retrieve_room_history")] + async fn get_room_history(&mut self, room_id: RoomId) -> Vec { + let room = self.my_rooms.get(&room_id); + if let Some(room) = room { + match room { + RoomRef::Local(room) => room.get_message_history(&self.services).await, + RoomRef::Remote { node_id } => { + todo!() + } + } + } else { + tracing::error!("Room with ID {room_id:?} not found"); + // todo: return error + todo!() + } + } + #[tracing::instrument(skip(self), name = "Player::leave_room")] async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) { let room = self.my_rooms.remove(&room_id); @@ -593,7 +626,7 @@ impl Player { body: Str, ) -> SendMessageResult { let Some(room) = self.my_rooms.get(&room_id) else { - tracing::info!("no room found"); + tracing::info!("Room with ID {room_id:?} not found"); return SendMessageResult::NoSuchRoom; }; let created_at = Utc::now(); @@ -632,7 +665,7 @@ impl Player { #[tracing::instrument(skip(self, new_topic), name = "Player::change_room_topic")] async fn change_room_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { let Some(room) = self.my_rooms.get(&room_id) else { - tracing::info!("no room found"); + tracing::info!("Room with ID {room_id:?} not found"); return; }; match room { diff --git a/crates/lavina-core/src/repo/room.rs b/crates/lavina-core/src/repo/room.rs index 3a07c6d..bca612b 100644 --- a/crates/lavina-core/src/repo/room.rs +++ b/crates/lavina-core/src/repo/room.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use sqlx::FromRow; use crate::repo::Storage; -use crate::room::RoomId; +use crate::room::{RoomId, StoredMessage}; #[derive(FromRow)] pub struct StoredRoom { @@ -29,6 +29,34 @@ impl Storage { Ok(res) } + #[tracing::instrument(skip(self), name = "Storage::retrieve_room_message_history")] + pub async fn get_room_message_history(&self, room_id: u32) -> Result> { + let mut executor = self.conn.lock().await; + let res = sqlx::query_as( + " + select + messages.id as id, + content, + created_at, + users.name as author_name + from + messages + join + users + on messages.author_id = users.id + where + room_id = ? + order by + messages.id; + ", + ) + .bind(room_id) + .fetch_all(&mut *executor) + .await?; + + Ok(res) + } + #[tracing::instrument(skip(self, topic), name = "Storage::create_new_room")] pub async fn create_new_room(&self, name: &str, topic: &str) -> Result { let mut executor = self.conn.lock().await; @@ -71,7 +99,7 @@ impl Storage { .bind(id) .bind(content) .bind(author_id) - .bind(created_at.to_string()) + .bind(created_at) .bind(room_id) .execute(&mut *executor) .await?; @@ -174,6 +202,6 @@ impl Storage { .fetch_all(&mut *executor) .await?; - res.into_iter().map(|(room_id,)| RoomId::from(room_id)).collect() + res.into_iter().map(|(room_id,)| RoomId::try_from(room_id)).collect() } } diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index bd26967..1b1ce30 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -5,18 +5,20 @@ use std::{collections::HashMap, hash::Hash, sync::Arc}; use chrono::{DateTime, Utc}; use prometheus::{IntGauge, Registry as MetricRegistry}; use serde::Serialize; +use sqlx::sqlite::SqliteRow; +use sqlx::{FromRow, Row}; use tokio::sync::RwLock as AsyncRwLock; use crate::player::{PlayerHandle, PlayerId, Updates}; use crate::prelude::*; -use crate::Services; +use crate::{LavinaCore, Services}; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub struct RoomId(Str); impl RoomId { - pub fn from(str: impl Into) -> Result { + pub fn try_from(str: impl Into) -> Result { let bytes = str.into(); if bytes.len() > 32 { return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); @@ -158,6 +160,10 @@ impl RoomHandle { lock.broadcast_update(update, player_id).await; } + pub async fn get_message_history(&self, services: &LavinaCore) -> Vec { + return services.storage.get_room_message_history(self.0.read().await.storage_id).await.unwrap(); + } + #[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")] pub async fn unsubscribe(&self, player_id: &PlayerId) { let mut lock = self.0.write().await; @@ -279,3 +285,11 @@ pub struct RoomInfo { pub members: Vec, pub topic: Str, } + +#[derive(Debug, FromRow)] +pub struct StoredMessage { + pub id: u32, + pub author_name: String, + pub content: String, + pub created_at: DateTime, +} diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 6b4374c..4bceaf5 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -720,7 +720,7 @@ async fn handle_incoming_message( } ClientMessage::PrivateMessage { recipient, body } => match recipient { Recipient::Chan(Chan::Global(chan)) => { - let room_id = RoomId::from(chan)?; + let room_id = RoomId::try_from(chan)?; user_handle.send_message(room_id, body).await?; } Recipient::Nick(nick) => { @@ -732,7 +732,7 @@ async fn handle_incoming_message( ClientMessage::Topic { chan, topic } => { match chan { Chan::Global(chan) => { - let room_id = RoomId::from(chan)?; + let room_id = RoomId::try_from(chan)?; user_handle.change_topic(room_id.clone(), topic.clone()).await?; ServerMessage { tags: vec![], @@ -774,7 +774,7 @@ async fn handle_incoming_message( writer.flush().await?; } Recipient::Chan(Chan::Global(chan)) => { - let room = core.get_room(&RoomId::from(chan.clone())?).await; + let room = core.get_room(&RoomId::try_from(chan.clone())?).await; if let Some(room) = room { let room_info = room.get_room_info().await; for member in room_info.members { @@ -893,7 +893,7 @@ async fn handle_join( ) -> Result<()> { match chan { Chan::Global(chan_name) => { - let room_id = RoomId::from(chan_name.clone())?; + let room_id = RoomId::try_from(chan_name.clone())?; match user_handle.join_room(room_id).await? { JoinResult::Success(room_info) => { produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?; @@ -932,7 +932,7 @@ async fn handle_part( writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { if let Chan::Global(chan_name) = chan { - let room_id = RoomId::from(chan_name.clone())?; + let room_id = RoomId::try_from(chan_name.clone())?; user_handle.leave_room(room_id).await?; ServerMessage { tags: vec![], diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs index 40a8c38..c94c25b 100644 --- a/crates/projection-irc/tests/lib.rs +++ b/crates/projection-irc/tests/lib.rs @@ -619,14 +619,14 @@ async fn server_time_capability() -> Result<()> { server.core.create_player(&PlayerId::from("some_guy")?).await?; let mut conn = server.core.connect_to_player(&PlayerId::from("some_guy").unwrap()).await; - let res = conn.join_room(RoomId::from("test").unwrap()).await?; + let res = conn.join_room(RoomId::try_from("test").unwrap()).await?; let JoinResult::Success(_) = res else { panic!("Failed to join room"); }; s.expect(":some_guy JOIN #test").await?; - let SendMessageResult::Success(res) = conn.send_message(RoomId::from("test").unwrap(), "Hello".into()).await? + let SendMessageResult::Success(res) = conn.send_message(RoomId::try_from("test").unwrap(), "Hello".into()).await? else { panic!("Failed to send message"); }; diff --git a/crates/projection-xmpp/src/iq.rs b/crates/projection-xmpp/src/iq.rs index a4cfdcc..d16678f 100644 --- a/crates/projection-xmpp/src/iq.rs +++ b/crates/projection-xmpp/src/iq.rs @@ -163,7 +163,7 @@ impl<'a> XmppConnection<'a> { server, resource: None, }) if server.0 == self.hostname_rooms => { - let room_id = RoomId::from(room_name.0.clone()).unwrap(); + let room_id = RoomId::try_from(room_name.0.clone()).unwrap(); let Some(_) = self.core.get_room(&room_id).await else { // TODO should return item-not-found // example: diff --git a/crates/projection-xmpp/src/message.rs b/crates/projection-xmpp/src/message.rs index f490ace..1d6fc1d 100644 --- a/crates/projection-xmpp/src/message.rs +++ b/crates/projection-xmpp/src/message.rs @@ -21,7 +21,7 @@ impl<'a> XmppConnection<'a> { { if server.0.as_ref() == &*self.hostname_rooms && m.r#type == MessageType::Groupchat { let Some(body) = &m.body else { return Ok(()) }; - self.user_handle.send_message(RoomId::from(name.0.clone())?, body.clone()).await?; + self.user_handle.send_message(RoomId::try_from(name.0.clone())?, body.clone()).await?; Message::<()> { to: Some(Jid { name: Some(self.user.xmpp_name.clone()), diff --git a/crates/projection-xmpp/src/presence.rs b/crates/projection-xmpp/src/presence.rs index 8527e2e..2a0986d 100644 --- a/crates/projection-xmpp/src/presence.rs +++ b/crates/projection-xmpp/src/presence.rs @@ -4,9 +4,9 @@ use anyhow::Result; use quick_xml::events::Event; use lavina_core::room::RoomId; -use proto_xmpp::bind::{Jid, Name, Server}; +use proto_xmpp::bind::{Jid, Name, Resource, Server}; use proto_xmpp::client::{Message, MessageType, Presence, Subject}; -use proto_xmpp::muc::{Affiliation, Role, XUser, XUserItem}; +use proto_xmpp::muc::{Affiliation, Delay, Role, XUser, XUserItem, XmppHistoryMessage}; use proto_xmpp::xml::{Ignore, ToXml}; use crate::XmppConnection; @@ -23,11 +23,11 @@ impl<'a> XmppConnection<'a> { // resources in MUCs are members' personas – not implemented (yet?) resource: Some(_), }) if server.0 == self.hostname_rooms => { - let mut response = self.muc_presence(&name).await?; - response.id = p.id; + let mut muc_presence = self.retrieve_muc_presence(&name).await?; + muc_presence.id = p.id; let subject = Message::<()> { from: Some(Jid { - name: Some(name), + name: Some(name.clone()), server: Server(self.hostname_rooms.clone()), resource: None, }), @@ -43,7 +43,13 @@ impl<'a> XmppConnection<'a> { body: None, custom: vec![], }; - response.serialize(output); + muc_presence.serialize(output); + + let messages = self.retrieve_message_history(&name).await?; + for message in messages { + message.serialize(output) + } + // The subject is the last stanza sent during a MUC join process. subject.serialize(output); } _ => { @@ -82,9 +88,8 @@ impl<'a> XmppConnection<'a> { } } - // todo: return Presence and serialize on the outside. - async fn muc_presence(&mut self, name: &Name) -> Result<(Presence)> { - let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?; + async fn retrieve_muc_presence(&mut self, name: &Name) -> Result> { + let a = self.user_handle.join_room(RoomId::try_from(name.0.clone())?).await?; // TODO handle bans let response = Presence { to: Some(Jid { @@ -114,21 +119,62 @@ impl<'a> XmppConnection<'a> { }; Ok(response) } -} -// todo: set up so that the user has been previously joined. -// todo: first call to muc_presence is OK, next one is OK too. + /// Retrieve a room's message history. The output can be serialized into a stream of XML stanzas. + /// + /// Example in [XmppHistoryMessage]'s docs. + #[tracing::instrument(skip(self), name = "XmppConnection::retrieve_message_history")] + async fn retrieve_message_history(&self, room_name: &Name) -> Result> { + let room_id = RoomId::try_from(room_name.0.clone())?; + let history_messages = self.user_handle.get_room_message_history(room_id).await?; + let mut response = vec![]; + + for history_message in history_messages.into_iter() { + response.push(XmppHistoryMessage { + id: history_message.id.to_string(), + to: Jid { + name: Option::from(Name(self.user.xmpp_muc_name.0.clone().into())), + server: Server(self.hostname.clone()), + resource: None, + }, + from: Jid { + name: Option::from(room_name.clone()), + server: Server(self.hostname_rooms.clone()), + resource: Option::from(Resource(history_message.author_name.clone().into())), + }, + delay: Delay { + from: Jid { + name: Option::from(Name(history_message.author_name.clone().into())), + server: Server(self.hostname_rooms.clone()), + resource: None, + }, + stamp: history_message.created_at.to_rfc3339(), + }, + body: history_message.content.clone(), + }); + tracing::info!( + "Retrieved message: {:?} {:?}", + history_message.author_name, + history_message.content.clone() + ); + } + + return Ok(response); + } +} #[cfg(test)] mod tests { - use crate::testkit::{expect_user_authenticated, TestServer}; - use crate::Authenticated; use anyhow::Result; + use lavina_core::player::PlayerId; use proto_xmpp::bind::{Jid, Name, Resource, Server}; use proto_xmpp::client::Presence; use proto_xmpp::muc::{Affiliation, Role, XUser, XUserItem}; + use crate::testkit::{expect_user_authenticated, TestServer}; + use crate::Authenticated; + #[tokio::test] async fn test_muc_joining() -> Result<()> { let server = TestServer::start().await.unwrap(); @@ -146,7 +192,7 @@ mod tests { let mut player_conn = server.core.connect_to_player(&user.player_id).await; let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); - let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); + let muc_presence = conn.retrieve_muc_presence(&user.xmpp_name).await.unwrap(); let expected = Presence { to: Some(Jid { name: Some(conn.user.xmpp_name.clone()), @@ -173,7 +219,7 @@ mod tests { }], ..Default::default() }; - assert_eq!(expected, response); + assert_eq!(expected, muc_presence); server.shutdown().await.unwrap(); Ok(()) @@ -198,7 +244,7 @@ mod tests { let mut player_conn = server.core.connect_to_player(&user.player_id).await; let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); - let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); + let response = conn.retrieve_muc_presence(&user.xmpp_name).await.unwrap(); let expected = Presence { to: Some(Jid { name: Some(conn.user.xmpp_name.clone()), @@ -233,7 +279,7 @@ mod tests { let mut player_conn = server.core.connect_to_player(&user.player_id).await; let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); - let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); + let response = conn.retrieve_muc_presence(&user.xmpp_name).await.unwrap(); assert_eq!(expected, response); server.shutdown().await.unwrap(); diff --git a/crates/proto-xmpp/src/lib.rs b/crates/proto-xmpp/src/lib.rs index 71e8a94..a4d5b45 100644 --- a/crates/proto-xmpp/src/lib.rs +++ b/crates/proto-xmpp/src/lib.rs @@ -15,6 +15,9 @@ pub mod streamerror; pub mod tls; pub mod xml; +#[cfg(test)] +mod testkit; + // Implemented as a macro instead of a fn due to borrowck limitations macro_rules! skip_text { ($reader: ident, $buf: ident) => { diff --git a/crates/proto-xmpp/src/muc/mod.rs b/crates/proto-xmpp/src/muc/mod.rs index 2fcfe02..7e59162 100644 --- a/crates/proto-xmpp/src/muc/mod.rs +++ b/crates/proto-xmpp/src/muc/mod.rs @@ -1,14 +1,16 @@ #![allow(unused_variables)] -use quick_xml::events::{BytesEnd, BytesStart, Event}; -use quick_xml::name::ResolveResult; +use anyhow::{anyhow, Result}; +use quick_xml::events::attributes::Attribute; +use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event}; +use quick_xml::name::{QName, ResolveResult}; use crate::bind::Jid; use crate::xml::*; -use anyhow::{anyhow, Result}; pub const XMLNS: &'static str = "http://jabber.org/protocol/muc"; pub const XMLNS_USER: &'static str = "http://jabber.org/protocol/muc#user"; +pub const XMLNS_DELAY: &'static str = "urn:xmpp:delay"; #[derive(PartialEq, Eq, Debug, Default)] pub struct History { @@ -154,6 +156,7 @@ pub struct XUser { /// Code 201. The room from which the presence stanza was sent was just created. pub just_created: bool, } + impl ToXml for XUser { fn serialize(&self, output: &mut Vec>) { let mut tag = BytesStart::new("x"); @@ -180,6 +183,7 @@ pub struct XUserItem { pub jid: Jid, pub role: Role, } + impl ToXml for XUserItem { fn serialize(&self, output: &mut Vec>) { let mut meg = BytesStart::new("item"); @@ -198,6 +202,7 @@ pub enum Affiliation { Outcast, None, } + impl Affiliation { pub fn from_str(s: &str) -> Option { match s { @@ -228,6 +233,7 @@ pub enum Role { Visitor, None, } + impl Role { pub fn from_str(s: &str) -> Option { match s { @@ -249,9 +255,83 @@ impl Role { } } +#[derive(Debug, PartialEq, Eq)] +pub struct Delay { + pub from: Jid, + pub stamp: String, +} + +impl ToXml for Delay { + fn serialize(&self, events: &mut Vec) { + let mut tag = BytesStart::new("delay"); + tag.push_attribute(Attribute { + key: QName(b"xmlns"), + value: XMLNS_DELAY.as_bytes().into(), + }); + tag.push_attribute(Attribute { + key: QName(b"from"), + value: self.from.to_string().into_bytes().into(), + }); + tag.push_attribute(Attribute { + key: QName(b"stamp"), + value: self.stamp.as_bytes().into(), + }); + events.push(Event::Empty(tag)); + } +} + +/// Message-stanza of a historic message. +/// +/// Example: +/// ```xml +/// +/// +/// +/// +/// ``` +#[derive(Debug, PartialEq, Eq)] +pub struct XmppHistoryMessage { + pub id: String, + pub to: Jid, + pub from: Jid, + pub delay: Delay, + pub body: String, +} + +impl ToXml for XmppHistoryMessage { + fn serialize(&self, events: &mut Vec>) { + let mut message_tag = BytesStart::new("message"); + message_tag.push_attribute(Attribute { + key: QName(b"id"), + value: self.id.as_str().as_bytes().into(), + }); + message_tag.push_attribute(Attribute { + key: QName(b"to"), + value: self.to.to_string().into_bytes().into(), + }); + message_tag.push_attribute(Attribute { + key: QName(b"from"), + value: self.from.to_string().into_bytes().into(), + }); + message_tag.push_attribute(Attribute { + key: QName(b"type"), + value: b"groupchat".into(), + }); + events.push(Event::Start(message_tag)); + self.delay.serialize(events); + let body_tag = BytesStart::new("body"); + events.push(Event::Start(body_tag)); + events.push(Event::Text(BytesText::new(self.body.to_string().as_str()).into_owned())); + events.push(Event::End(BytesEnd::new("body"))); + events.push(Event::End(BytesEnd::new("message"))); + } +} + #[cfg(test)] mod test { use super::*; + use crate::bind::{Name, Resource, Server}; + use crate::testkit::assemble_string_from_event_flow; #[test] fn test_history_success_empty() { @@ -334,4 +414,40 @@ mod test { }; assert_eq!(res, expected); } + + #[test] + fn test_history_message_serialization() { + // Arrange + let history_message = XmppHistoryMessage { + id: "id".to_string(), + to: Jid { + name: Some(Name("sauer@example.com".into())), + server: Server("localhost".into()), + resource: Some(Resource("tester".into())), + }, + from: Jid { + name: Some(Name("pepe".into())), + server: Server("rooms.localhost".into()), + resource: Some(Resource("sauer".into())), + }, + delay: Delay { + from: Jid { + name: Some(Name("pepe".into())), + server: Server("rooms.localhost".into()), + resource: Some(Resource("tester".into())), + }, + stamp: "2021-10-10T10:10:10Z".to_string(), + }, + body: "Hello World.".to_string(), + }; + let mut events = vec![]; + let expected = r#"Hello World."#; + + // Act + history_message.serialize(&mut events); + let flow = assemble_string_from_event_flow(&events); + + // Assert + assert_eq!(flow, expected); + } } diff --git a/crates/proto-xmpp/src/testkit.rs b/crates/proto-xmpp/src/testkit.rs new file mode 100644 index 0000000..a40381f --- /dev/null +++ b/crates/proto-xmpp/src/testkit.rs @@ -0,0 +1,12 @@ +use quick_xml::events::Event; +use quick_xml::Writer; +use std::io::Cursor; + +pub fn assemble_string_from_event_flow(events: &Vec>) -> String { + let mut writer = Writer::new(Cursor::new(Vec::new())); + for event in events { + writer.write_event(event).unwrap(); + } + let result = writer.into_inner().into_inner(); + String::from_utf8(result).unwrap() +} diff --git a/docs/running.md b/docs/running.md index 74b4c60..53ffa84 100644 --- a/docs/running.md +++ b/docs/running.md @@ -67,3 +67,35 @@ Or you can build it and run manually: cargo build --release ./target/release/lavina --config config.toml + + +## Migrations + +### Prerequisites + +Install sqlx-cli into ~/.local/bin: + + cargo install --locked sqlx-cli + +### Steps + +Migrations run on every application start. For manual run, use sqlx: + + sqlx mig run \ + --source ./crates/lavina-core/migrations/ \ + --database-url sqlite://db.sqlite + +To see current status: + + sqlx mig info \ + --source ./crates/lavina-core/migrations/ \ + --database-url sqlite://db.sqlite + +sqlx mig info outputs + + 0/installed first + 1/installed msg author + 2/installed created at for messages + 3/installed dialogs + 4/installed new challenges + 5/pending message datetime diff --git a/src/http.rs b/src/http.rs index bd41b4a..af66758 100644 --- a/src/http.rs +++ b/src/http.rs @@ -164,7 +164,7 @@ async fn endpoint_send_room_message( let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.author_id) else { @@ -187,7 +187,7 @@ async fn endpoint_set_room_topic( let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.author_id) else { diff --git a/src/http/clustering.rs b/src/http/clustering.rs index 01731b3..07e82da 100644 --- a/src/http/clustering.rs +++ b/src/http/clustering.rs @@ -29,7 +29,7 @@ async fn endpoint_cluster_join_room( return Ok(malformed_request()); }; tracing::info!("Incoming request: {:?}", &req); - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { dbg!(&req.room_id); return Ok(room_not_found()); }; @@ -55,7 +55,7 @@ async fn endpoint_cluster_add_message( dbg!(&req.created_at); return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { dbg!(&req.room_id); return Ok(room_not_found()); };