From 8e7a01b567ecfe810023c2a30585b9ad4346d79e Mon Sep 17 00:00:00 2001 From: Mikhail Date: Thu, 23 May 2024 14:35:52 +0200 Subject: [PATCH] Dirty --- crates/lavina-core/src/player.rs | 34 +++++++++++- crates/lavina-core/src/repo/room.rs | 2 +- crates/lavina-core/src/repo/user.rs | 21 ++++++++ crates/lavina-core/src/room.rs | 31 ++++++++++- crates/projection-irc/src/lib.rs | 10 ++-- crates/projection-irc/tests/lib.rs | 4 +- crates/projection-xmpp/src/iq.rs | 2 +- crates/projection-xmpp/src/message.rs | 2 +- crates/projection-xmpp/src/presence.rs | 72 +++++++++++++++++--------- crates/proto-xmpp/src/muc/mod.rs | 16 +++++- src/http.rs | 4 +- src/http/clustering.rs | 4 +- 12 files changed, 160 insertions(+), 42 deletions(-) diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 292c220..9c217df 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -18,7 +18,7 @@ use tracing::{Instrument, Span}; use crate::clustering::room::*; use crate::prelude::*; -use crate::room::{RoomHandle, RoomId, RoomInfo}; +use crate::room::{HistoryMessage, RoomHandle, RoomId, RoomInfo}; use crate::table::{AnonTable, Key as AnonKey}; use crate::LavinaCore; @@ -111,6 +111,14 @@ impl PlayerConnection { Ok(deferred.await?) } + #[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")] + pub async fn get_room_message_history(&self, room_id: RoomId) -> Result<(Vec)> { + let (promise, deferred) = oneshot(); + let cmd = ClientCommand::GetRoomHistory { room_id, promise }; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + Ok(deferred.await?) + } + /// Handler in [Player::send_dialog_message]. #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_dialog_message")] pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> { @@ -212,6 +220,10 @@ pub enum ClientCommand { recipient: PlayerId, promise: Promise, }, + GetRoomHistory { + room_id: RoomId, + promise: Promise>, + }, } pub enum GetInfoResult { @@ -509,6 +521,11 @@ impl Player { let result = self.check_user_existence(recipient).await; let _ = promise.send(result); } + ClientCommand::GetRoomHistory { room_id, promise } => { + let result = self.get_room_history(room_id).await; + let _ = promise.send(result); + } + _ => {} } } @@ -557,6 +574,21 @@ impl Player { } } + #[tracing::instrument(skip(self), name = "Player::retrieve_room_history")] + async fn get_room_history(&mut self, room_id: RoomId) -> Vec { + let room = self.my_rooms.get(&room_id); + if let Some(room) = room { + match room { + RoomRef::Local(room) => room.get_message_history().await, + RoomRef::Remote { node_id } => { + todo!() + } + } + } else { + todo!() + } + } + #[tracing::instrument(skip(self), name = "Player::leave_room")] async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) { let room = self.my_rooms.remove(&room_id); diff --git a/crates/lavina-core/src/repo/room.rs b/crates/lavina-core/src/repo/room.rs index 3a07c6d..b63e71d 100644 --- a/crates/lavina-core/src/repo/room.rs +++ b/crates/lavina-core/src/repo/room.rs @@ -174,6 +174,6 @@ impl Storage { .fetch_all(&mut *executor) .await?; - res.into_iter().map(|(room_id,)| RoomId::from(room_id)).collect() + res.into_iter().map(|(room_id,)| RoomId::try_from(room_id)).collect() } } diff --git a/crates/lavina-core/src/repo/user.rs b/crates/lavina-core/src/repo/user.rs index a5471d0..720bf8e 100644 --- a/crates/lavina-core/src/repo/user.rs +++ b/crates/lavina-core/src/repo/user.rs @@ -96,6 +96,27 @@ impl Storage { Ok(res.map(|(id,)| id)) } + #[tracing::instrument(skip(self), name = "Storage::retrieve_user_by_id")] + pub async fn retrieve_user_by_id(&self, name: &str) -> Result> { + let mut executor = self.conn.lock().await; + let res: Option<(u32,)> = sqlx::query_as( + " + select + u.id, + u.name + from + users u + where + u.id = ?; + ", + ) + .bind(name) + .fetch_optional(&mut *executor) + .await?; + + Ok(res.map(|(id,)| id)) + } + #[tracing::instrument(skip(self), name = "Storage::create_or_retrieve_user_id_by_name")] pub async fn create_or_retrieve_user_id_by_name(&self, name: &str) -> Result { let mut executor = self.conn.lock().await; diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index bd26967..a9af94b 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -16,7 +16,7 @@ use crate::Services; pub struct RoomId(Str); impl RoomId { - pub fn from(str: impl Into) -> Result { + pub fn try_from(str: impl Into) -> Result { let bytes = str.into(); if bytes.len() > 32 { return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); @@ -158,6 +158,23 @@ impl RoomHandle { lock.broadcast_update(update, player_id).await; } + pub async fn get_message_history(&self) -> Vec { + return vec![ + HistoryMessage { + id: "kek".to_string(), + author_id: 1, + body: "Willkom' in Brem'".to_string(), + created_at: Utc::now(), + }, + HistoryMessage { + id: "kek".to_string(), + author_id: 1, + body: "Willkom' in Hamburg".to_string(), + created_at: Utc::now(), + }, + ]; + } + #[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")] pub async fn unsubscribe(&self, player_id: &PlayerId) { let mut lock = self.0.write().await; @@ -279,3 +296,15 @@ pub struct RoomInfo { pub members: Vec, pub topic: Str, } + +pub struct User { + pub id: u32, + pub name: String, +} + +pub(crate) struct HistoryMessage { + pub id: String, + pub author: User, + pub body: String, + pub created_at: DateTime, +} diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 0bf77fb..fe8dbe5 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -720,7 +720,7 @@ async fn handle_incoming_message( } ClientMessage::PrivateMessage { recipient, body } => match recipient { Recipient::Chan(Chan::Global(chan)) => { - let room_id = RoomId::from(chan)?; + let room_id = RoomId::try_from(chan)?; user_handle.send_message(room_id, body).await?; } Recipient::Nick(nick) => { @@ -732,7 +732,7 @@ async fn handle_incoming_message( ClientMessage::Topic { chan, topic } => { match chan { Chan::Global(chan) => { - let room_id = RoomId::from(chan)?; + let room_id = RoomId::try_from(chan)?; user_handle.change_topic(room_id.clone(), topic.clone()).await?; ServerMessage { tags: vec![], @@ -774,7 +774,7 @@ async fn handle_incoming_message( writer.flush().await?; } Recipient::Chan(Chan::Global(chan)) => { - let room = core.get_room(&RoomId::from(chan.clone())?).await; + let room = core.get_room(&RoomId::try_from(chan.clone())?).await; if let Some(room) = room { let room_info = room.get_room_info().await; for member in room_info.members { @@ -893,7 +893,7 @@ async fn handle_join( ) -> Result<()> { match chan { Chan::Global(chan_name) => { - let room_id = RoomId::from(chan_name.clone())?; + let room_id = RoomId::try_from(chan_name.clone())?; if let JoinResult::Success(room_info) = user_handle.join_room(room_id).await? { produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?; } else { @@ -924,7 +924,7 @@ async fn handle_part( writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { if let Chan::Global(chan_name) = chan { - let room_id = RoomId::from(chan_name.clone())?; + let room_id = RoomId::try_from(chan_name.clone())?; user_handle.leave_room(room_id).await?; ServerMessage { tags: vec![], diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs index 40a8c38..c94c25b 100644 --- a/crates/projection-irc/tests/lib.rs +++ b/crates/projection-irc/tests/lib.rs @@ -619,14 +619,14 @@ async fn server_time_capability() -> Result<()> { server.core.create_player(&PlayerId::from("some_guy")?).await?; let mut conn = server.core.connect_to_player(&PlayerId::from("some_guy").unwrap()).await; - let res = conn.join_room(RoomId::from("test").unwrap()).await?; + let res = conn.join_room(RoomId::try_from("test").unwrap()).await?; let JoinResult::Success(_) = res else { panic!("Failed to join room"); }; s.expect(":some_guy JOIN #test").await?; - let SendMessageResult::Success(res) = conn.send_message(RoomId::from("test").unwrap(), "Hello".into()).await? + let SendMessageResult::Success(res) = conn.send_message(RoomId::try_from("test").unwrap(), "Hello".into()).await? else { panic!("Failed to send message"); }; diff --git a/crates/projection-xmpp/src/iq.rs b/crates/projection-xmpp/src/iq.rs index a4cfdcc..d16678f 100644 --- a/crates/projection-xmpp/src/iq.rs +++ b/crates/projection-xmpp/src/iq.rs @@ -163,7 +163,7 @@ impl<'a> XmppConnection<'a> { server, resource: None, }) if server.0 == self.hostname_rooms => { - let room_id = RoomId::from(room_name.0.clone()).unwrap(); + let room_id = RoomId::try_from(room_name.0.clone()).unwrap(); let Some(_) = self.core.get_room(&room_id).await else { // TODO should return item-not-found // example: diff --git a/crates/projection-xmpp/src/message.rs b/crates/projection-xmpp/src/message.rs index 15a3e0d..928d4c0 100644 --- a/crates/projection-xmpp/src/message.rs +++ b/crates/projection-xmpp/src/message.rs @@ -20,7 +20,7 @@ impl<'a> XmppConnection<'a> { }) = m.to { if server.0.as_ref() == &*self.hostname_rooms && m.r#type == MessageType::Groupchat { - self.user_handle.send_message(RoomId::from(name.0.clone())?, m.body.clone().into()).await?; + self.user_handle.send_message(RoomId::try_from(name.0.clone())?, m.body.clone().into()).await?; Message::<()> { to: Some(Jid { name: Some(self.user.xmpp_name.clone()), diff --git a/crates/projection-xmpp/src/presence.rs b/crates/projection-xmpp/src/presence.rs index bd41c52..d71d6c8 100644 --- a/crates/projection-xmpp/src/presence.rs +++ b/crates/projection-xmpp/src/presence.rs @@ -6,7 +6,7 @@ use quick_xml::events::Event; use lavina_core::room::RoomId; use proto_xmpp::bind::{Jid, Name, Resource, Server}; use proto_xmpp::client::Presence; -use proto_xmpp::muc::{Delay, HistoryMessage, XUser}; +use proto_xmpp::muc::{Delay, XmppHistoryMessage, XUser}; use proto_xmpp::xml::{Ignore, ToXml}; use crate::XmppConnection; @@ -64,7 +64,7 @@ impl<'a> XmppConnection<'a> { } async fn muc_presence(&mut self, name: &Name) -> Result<(Presence)> { - let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?; + let a = self.user_handle.join_room(RoomId::try_from(name.0.clone())?).await?; // TODO handle bans let response = Presence { to: Some(Jid { @@ -82,29 +82,53 @@ impl<'a> XmppConnection<'a> { }; Ok(response) } - async fn send_history_on_join(&self) -> Result<(HistoryMessage)> { - Ok(HistoryMessage { - id: "kek".to_string(), - to: Jid { - name: Some(Name("sauer@oflor.me".into())), - server: Server("localhost".into()), - resource: Some(Resource("tester".into())), - }, - from: Jid { - name: Some(Name("pepe".into())), - server: Server("rooms.localhost".into()), - resource: Some(Resource("sauer".into())), - }, - delay: Delay::new( - Jid { - name: Some(Name("pepe".into())), - server: Server("rooms.localhost".into()), - resource: Some(Resource("tester".into())), + async fn send_history_on_join(&self) -> Result<(Vec)> { + let room_id = RoomId::try_from("kek1337")?; + let history_messages = self.user_handle.get_room_message_history(room_id).await?; + let mut response = vec![]; + + // todo: get author + + for history_message in history_messages.into_iter() { + self.user_handle. + + let user = self.core.get_user(&history_message.author_id).await?; + + response.push(XmppHistoryMessage { + id: history_message.id, + to: Jid { + name: history_message.author_id, + server: Server(), + resource: None, }, - "2021-10-10T10:10:10Z".to_string(), - ), - body: "Vasya Pupkin says hello.".to_string(), - }) + from: Jid {}, + delay: (), + body: "".to_string(), + }); + } + + // Ok(HistoryMessage { + // id: "kek".to_string(), + // to: Jid { + // name: Some(Name("sauer@oflor.me".into())), + // server: Server("localhost".into()), + // resource: Some(Resource("tester".into())), + // }, + // from: Jid { + // name: Some(Name("pepe".into())), + // server: Server("rooms.localhost".into()), + // resource: Some(Resource("sauer".into())), + // }, + // delay: Delay::new( + // Jid { + // name: Some(Name("pepe".into())), + // server: Server("rooms.localhost".into()), + // resource: Some(Resource("tester".into())), + // }, + // "2021-10-10T10:10:10Z".to_string(), + // ), + // body: , + // }) } } diff --git a/crates/proto-xmpp/src/muc/mod.rs b/crates/proto-xmpp/src/muc/mod.rs index 32ea326..238a4ae 100644 --- a/crates/proto-xmpp/src/muc/mod.rs +++ b/crates/proto-xmpp/src/muc/mod.rs @@ -175,7 +175,7 @@ impl Delay { } #[derive(Debug, PartialEq, Eq)] -pub struct HistoryMessage { +pub struct XmppHistoryMessage { pub id: String, pub to: Jid, pub from: Jid, @@ -183,7 +183,19 @@ pub struct HistoryMessage { pub body: String, } -impl ToXml for HistoryMessage { +impl From for XmppHistoryMessage { + fn from(msg: HistoryMessage) -> Self { + Self { + id: msg.id, + to: msg.to, + from: msg.from, + delay: Delay::new(msg.from, msg.stamp), + body: msg.body, + } + } +} + +impl ToXml for XmppHistoryMessage { fn serialize(&self, events: &mut Vec>) { let mut tag = BytesStart::new("message"); tag.push_attribute(Attribute { diff --git a/src/http.rs b/src/http.rs index bd41b4a..af66758 100644 --- a/src/http.rs +++ b/src/http.rs @@ -164,7 +164,7 @@ async fn endpoint_send_room_message( let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.author_id) else { @@ -187,7 +187,7 @@ async fn endpoint_set_room_topic( let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.author_id) else { diff --git a/src/http/clustering.rs b/src/http/clustering.rs index 01731b3..07e82da 100644 --- a/src/http/clustering.rs +++ b/src/http/clustering.rs @@ -29,7 +29,7 @@ async fn endpoint_cluster_join_room( return Ok(malformed_request()); }; tracing::info!("Incoming request: {:?}", &req); - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { dbg!(&req.room_id); return Ok(room_not_found()); }; @@ -55,7 +55,7 @@ async fn endpoint_cluster_add_message( dbg!(&req.created_at); return Ok(malformed_request()); }; - let Ok(room_id) = RoomId::from(req.room_id) else { + let Ok(room_id) = RoomId::try_from(req.room_id) else { dbg!(&req.room_id); return Ok(room_not_found()); };