diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 292c220..ce79edd 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -18,7 +18,7 @@ use tracing::{Instrument, Span}; use crate::clustering::room::*; use crate::prelude::*; -use crate::room::{RoomHandle, RoomId, RoomInfo}; +use crate::room::{HistoryMessage, RoomHandle, RoomId, RoomInfo}; use crate::table::{AnonTable, Key as AnonKey}; use crate::LavinaCore; @@ -111,6 +111,14 @@ impl PlayerConnection { Ok(deferred.await?) } + #[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")] + pub async fn get_room_message_history(&self, room_id: RoomId) -> Result<(Vec)> { + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::GetRoomHistory { room_id, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) + } + /// Handler in [Player::send_dialog_message]. #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_dialog_message")] pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> { @@ -212,6 +220,10 @@ pub enum ClientCommand { recipient: PlayerId, promise: Promise, }, + GetRoomHistory { + room_id: RoomId, + promise: Promise>, + }, } pub enum GetInfoResult { @@ -509,6 +521,11 @@ impl Player { let result = self.check_user_existence(recipient).await; let _ = promise.send(result); } + ClientCommand::GetRoomHistory { room_id, promise } => { + let result = self.get_room_history(room_id).await; + let _ = promise.send(result); + } + _ => {} } } @@ -557,6 +574,23 @@ impl Player { } } + #[tracing::instrument(skip(self), name = "Player::retrieve_room_history")] + async fn get_room_history(&mut self, room_id: RoomId) -> Vec { + let room = self.my_rooms.get(&room_id); + if let Some(room) = room { + match room { + RoomRef::Local(room) => room.get_message_history().await, + RoomRef::Remote { node_id } => { + todo!() + } + } + } else { + tracing::info!("Room with ID {room_id:?} not found"); + // todo: return error + todo!() + } + } + #[tracing::instrument(skip(self), name = "Player::leave_room")] async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) { let room = self.my_rooms.remove(&room_id); @@ -593,7 +627,7 @@ impl Player { body: Str, ) -> SendMessageResult { let Some(room) = self.my_rooms.get(&room_id) else { - tracing::info!("no room found"); + tracing::info!("Room with ID {room_id:?} not found"); return SendMessageResult::NoSuchRoom; }; let created_at = Utc::now(); @@ -632,7 +666,7 @@ impl Player { #[tracing::instrument(skip(self, new_topic), name = "Player::change_room_topic")] async fn change_room_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { let Some(room) = self.my_rooms.get(&room_id) else { - tracing::info!("no room found"); + tracing::info!("Room with ID {room_id:?} not found"); return; }; match room { diff --git a/crates/lavina-core/src/repo/room.rs b/crates/lavina-core/src/repo/room.rs index 3a07c6d..b63e71d 100644 --- a/crates/lavina-core/src/repo/room.rs +++ b/crates/lavina-core/src/repo/room.rs @@ -174,6 +174,6 @@ impl Storage { .fetch_all(&mut *executor) .await?; - res.into_iter().map(|(room_id,)| RoomId::from(room_id)).collect() + res.into_iter().map(|(room_id,)| RoomId::try_from(room_id)).collect() } } diff --git a/crates/lavina-core/src/repo/user.rs b/crates/lavina-core/src/repo/user.rs index a5471d0..720bf8e 100644 --- a/crates/lavina-core/src/repo/user.rs +++ b/crates/lavina-core/src/repo/user.rs @@ -96,6 +96,27 @@ impl Storage { Ok(res.map(|(id,)| id)) } + #[tracing::instrument(skip(self), name = "Storage::retrieve_user_by_id")] + pub async fn retrieve_user_by_id(&self, name: &str) -> Result> { + let mut executor = self.conn.lock().await; + let res: Option<(u32,)> = sqlx::query_as( + " + select + u.id, + u.name + from + users u + where + u.id = ?; + ", + ) + .bind(name) + .fetch_optional(&mut *executor) + .await?; + + Ok(res.map(|(id,)| id)) + } + #[tracing::instrument(skip(self), name = "Storage::create_or_retrieve_user_id_by_name")] pub async fn create_or_retrieve_user_id_by_name(&self, name: &str) -> Result { let mut executor = self.conn.lock().await; diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index bd26967..c567752 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -16,7 +16,7 @@ use crate::Services; pub struct RoomId(Str); impl RoomId { - pub fn from(str: impl Into) -> Result { + pub fn try_from(str: impl Into) -> Result { let bytes = str.into(); if bytes.len() > 32 { return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); @@ -158,6 +158,29 @@ impl RoomHandle { lock.broadcast_update(update, player_id).await; } + pub async fn get_message_history(&self) -> Vec { + return vec![ + HistoryMessage { + id: "kek0".to_string(), + body: "Willkommen in Brem'".to_string(), + created_at: Utc::now(), + author: User { + id: 0, + name: "sauer".to_string(), + }, + }, + HistoryMessage { + id: "kek1".to_string(), + body: "Willkommen in Hamburg".to_string(), + created_at: Utc::now(), + author: User { + id: 0, + name: "sauer".to_string(), + }, + }, + ]; + } + #[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")] pub async fn unsubscribe(&self, player_id: &PlayerId) { let mut lock = self.0.write().await; @@ -279,3 +302,15 @@ pub struct RoomInfo { pub members: Vec, pub topic: Str, } + +pub struct User { + pub id: u32, + pub name: String, +} + +pub struct HistoryMessage { + pub id: String, + pub author: User, + pub body: String, + pub created_at: DateTime, +} diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 0bf77fb..fe8dbe5 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -720,7 +720,7 @@ async fn handle_incoming_message( } ClientMessage::PrivateMessage { recipient, body } => match recipient { Recipient::Chan(Chan::Global(chan)) => { - let room_id = RoomId::from(chan)?; + let room_id = RoomId::try_from(chan)?; user_handle.send_message(room_id, body).await?; } Recipient::Nick(nick) => { @@ -732,7 +732,7 @@ async fn handle_incoming_message( ClientMessage::Topic { chan, topic } => { match chan { Chan::Global(chan) => { - let room_id = RoomId::from(chan)?; + let room_id = RoomId::try_from(chan)?; user_handle.change_topic(room_id.clone(), topic.clone()).await?; ServerMessage { tags: vec![], @@ -774,7 +774,7 @@ async fn handle_incoming_message( writer.flush().await?; } Recipient::Chan(Chan::Global(chan)) => { - let room = core.get_room(&RoomId::from(chan.clone())?).await; + let room = core.get_room(&RoomId::try_from(chan.clone())?).await; if let Some(room) = room { let room_info = room.get_room_info().await; for member in room_info.members { @@ -893,7 +893,7 @@ async fn handle_join( ) -> Result<()> { match chan { Chan::Global(chan_name) => { - let room_id = RoomId::from(chan_name.clone())?; + let room_id = RoomId::try_from(chan_name.clone())?; if let JoinResult::Success(room_info) = user_handle.join_room(room_id).await? { produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?; } else { @@ -924,7 +924,7 @@ async fn handle_part( writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { if let Chan::Global(chan_name) = chan { - let room_id = RoomId::from(chan_name.clone())?; + let room_id = RoomId::try_from(chan_name.clone())?; user_handle.leave_room(room_id).await?; ServerMessage { tags: vec![], diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs index 40a8c38..c94c25b 100644 --- a/crates/projection-irc/tests/lib.rs +++ b/crates/projection-irc/tests/lib.rs @@ -619,14 +619,14 @@ async fn server_time_capability() -> Result<()> { server.core.create_player(&PlayerId::from("some_guy")?).await?; let mut conn = server.core.connect_to_player(&PlayerId::from("some_guy").unwrap()).await; - let res = conn.join_room(RoomId::from("test").unwrap()).await?; + let res = conn.join_room(RoomId::try_from("test").unwrap()).await?; let JoinResult::Success(_) = res else { panic!("Failed to join room"); }; s.expect(":some_guy JOIN #test").await?; - let SendMessageResult::Success(res) = conn.send_message(RoomId::from("test").unwrap(), "Hello".into()).await? + let SendMessageResult::Success(res) = conn.send_message(RoomId::try_from("test").unwrap(), "Hello".into()).await? else { panic!("Failed to send message"); }; diff --git a/crates/projection-xmpp/src/iq.rs b/crates/projection-xmpp/src/iq.rs index a4cfdcc..d16678f 100644 --- a/crates/projection-xmpp/src/iq.rs +++ b/crates/projection-xmpp/src/iq.rs @@ -163,7 +163,7 @@ impl<'a> XmppConnection<'a> { server, resource: None, }) if server.0 == self.hostname_rooms => { - let room_id = RoomId::from(room_name.0.clone()).unwrap(); + let room_id = RoomId::try_from(room_name.0.clone()).unwrap(); let Some(_) = self.core.get_room(&room_id).await else { // TODO should return item-not-found // example: diff --git a/crates/projection-xmpp/src/message.rs b/crates/projection-xmpp/src/message.rs index 15a3e0d..928d4c0 100644 --- a/crates/projection-xmpp/src/message.rs +++ b/crates/projection-xmpp/src/message.rs @@ -20,7 +20,7 @@ impl<'a> XmppConnection<'a> { }) = m.to { if server.0.as_ref() == &*self.hostname_rooms && m.r#type == MessageType::Groupchat { - self.user_handle.send_message(RoomId::from(name.0.clone())?, m.body.clone().into()).await?; + self.user_handle.send_message(RoomId::try_from(name.0.clone())?, m.body.clone().into()).await?; Message::<()> { to: Some(Jid { name: Some(self.user.xmpp_name.clone()), diff --git a/crates/projection-xmpp/src/presence.rs b/crates/projection-xmpp/src/presence.rs index bd41c52..24eeed6 100644 --- a/crates/projection-xmpp/src/presence.rs +++ b/crates/projection-xmpp/src/presence.rs @@ -2,11 +2,12 @@ use anyhow::Result; use quick_xml::events::Event; +use serde::Serialize; use lavina_core::room::RoomId; use proto_xmpp::bind::{Jid, Name, Resource, Server}; use proto_xmpp::client::Presence; -use proto_xmpp::muc::{Delay, HistoryMessage, XUser}; +use proto_xmpp::muc::{Delay, XUser, XmppHistoryMessage}; use proto_xmpp::xml::{Ignore, ToXml}; use crate::XmppConnection; @@ -26,8 +27,11 @@ impl<'a> XmppConnection<'a> { let muc_join_response = self.muc_presence(&name).await?; muc_join_response.serialize(output); - let history_on_join_response = self.send_history_on_join().await?; - history_on_join_response.serialize(output) + let history_on_join_response = self.send_history_on_join(&name).await?; + + for message in history_on_join_response { + message.serialize(output) + } } _ => { // TODO other presence cases @@ -64,7 +68,7 @@ impl<'a> XmppConnection<'a> { } async fn muc_presence(&mut self, name: &Name) -> Result<(Presence)> { - let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?; + let a = self.user_handle.join_room(RoomId::try_from(name.0.clone())?).await?; // TODO handle bans let response = Presence { to: Some(Jid { @@ -82,29 +86,63 @@ impl<'a> XmppConnection<'a> { }; Ok(response) } - async fn send_history_on_join(&self) -> Result<(HistoryMessage)> { - Ok(HistoryMessage { - id: "kek".to_string(), - to: Jid { - name: Some(Name("sauer@oflor.me".into())), - server: Server("localhost".into()), - resource: Some(Resource("tester".into())), - }, - from: Jid { - name: Some(Name("pepe".into())), - server: Server("rooms.localhost".into()), - resource: Some(Resource("sauer".into())), - }, - delay: Delay::new( - Jid { - name: Some(Name("pepe".into())), - server: Server("rooms.localhost".into()), - resource: Some(Resource("tester".into())), + async fn send_history_on_join(&self, room_name: &Name) -> Result<(Vec)> { + let room_id = RoomId::try_from(room_name.0.clone())?; + let history_messages = self.user_handle.get_room_message_history(room_id).await?; + let mut response = vec![]; + + for history_message in history_messages.into_iter() { + let author_name = Option::from(Name(history_message.author.name.into())); + tracing::info!("author_name: {:?}", author_name); + + let author_jid = Jid { + name: author_name.clone(), + server: Server(self.hostname_rooms.clone()), + resource: None, // Option::from(Resource(room_name.0.clone())), + }; + + response.push(XmppHistoryMessage { + id: history_message.id, + // sauer@localhost/sauer + to: Jid { + name: Option::from(Name(self.user.xmpp_muc_name.0.clone().into())), // Option::from(Name("chelik".into())), + server: Server(self.hostname.clone()), + resource: None, // Option::from(Resource("sauer".into())), // Option::from(Resource(room_name.0.clone())), }, - "2021-10-10T10:10:10Z".to_string(), - ), - body: "Vasya Pupkin says hello.".to_string(), - }) + from: Jid { + name: Option::from(room_name.clone()), // Option::from(Name("chelik".into())), + server: Server(self.hostname_rooms.clone()), + resource: Option::from(Resource("sauer".into())), // Option::from(Resource(room_name.0.clone())), + }, + delay: Delay::new(author_jid.clone(), history_message.created_at.to_rfc3339()), + body: history_message.body, + }); + } + + return Ok(response); + + // Ok(HistoryMessage { + // id: "kek".to_string(), + // to: Jid { + // name: Some(Name("sauer@oflor.me".into())), + // server: Server("localhost".into()), + // resource: Some(Resource("tester".into())), + // }, + // from: Jid { + // name: Some(Name("pepe".into())), + // server: Server("rooms.localhost".into()), + // resource: Some(Resource("sauer".into())), + // }, + // delay: Delay::new( + // Jid { + // name: Some(Name("pepe".into())), + // server: Server("rooms.localhost".into()), + // resource: Some(Resource("tester".into())), + // }, + // "2021-10-10T10:10:10Z".to_string(), + // ), + // body: , + // }) } } diff --git a/crates/proto-xmpp/src/muc/mod.rs b/crates/proto-xmpp/src/muc/mod.rs index 32ea326..46f8947 100644 --- a/crates/proto-xmpp/src/muc/mod.rs +++ b/crates/proto-xmpp/src/muc/mod.rs @@ -175,7 +175,7 @@ impl Delay { } #[derive(Debug, PartialEq, Eq)] -pub struct HistoryMessage { +pub struct XmppHistoryMessage { pub id: String, pub to: Jid, pub from: Jid, @@ -183,7 +183,7 @@ pub struct HistoryMessage { pub body: String, } -impl ToXml for HistoryMessage { +impl ToXml for XmppHistoryMessage { fn serialize(&self, events: &mut Vec>) { let mut tag = BytesStart::new("message"); tag.push_attribute(Attribute { diff --git a/src/http.rs b/src/http.rs index bd41b4a..af66758 100644 --- a/src/http.rs +++ b/src/http.rs @@ -164,7 +164,7 @@ async fn endpoint_send_room_message( let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.author_id) else { @@ -187,7 +187,7 @@ async fn endpoint_set_room_topic( let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.author_id) else { diff --git a/src/http/clustering.rs b/src/http/clustering.rs index 01731b3..07e82da 100644 --- a/src/http/clustering.rs +++ b/src/http/clustering.rs @@ -29,7 +29,7 @@ async fn endpoint_cluster_join_room( return Ok(malformed_request()); }; tracing::info!("Incoming request: {:?}", &req); - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { dbg!(&req.room_id); return Ok(room_not_found()); }; @@ -55,7 +55,7 @@ async fn endpoint_cluster_add_message( dbg!(&req.created_at); return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { dbg!(&req.room_id); return Ok(room_not_found()); };