From 59528909c76d8b69e5d02f4a6564f34e31e6397b Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Wed, 5 Jun 2024 02:04:11 +0200 Subject: [PATCH] core: ADT for results of room history queries --- crates/lavina-core/src/player.rs | 29 ++++++++++----- crates/projection-irc/src/lib.rs | 51 ++++++++++++++------------ crates/projection-xmpp/src/presence.rs | 10 ++++- 3 files changed, 55 insertions(+), 35 deletions(-) diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index fb0fd1e..2493e8d 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -9,6 +9,7 @@ //! so that they don't overload the room actor. use std::collections::{HashMap, HashSet}; +use anyhow::anyhow; use chrono::{DateTime, Utc}; use prometheus::{IntGauge, Registry as MetricsRegistry}; use serde::Serialize; @@ -112,7 +113,7 @@ impl PlayerConnection { } #[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")] - pub async fn get_room_message_history(&self, room_id: &RoomId, limit: u32) -> Result> { + pub async fn get_room_message_history(&self, room_id: &RoomId, limit: u32) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::GetRoomHistory { room_id: room_id.clone(), @@ -226,8 +227,8 @@ pub enum ClientCommand { }, GetRoomHistory { room_id: RoomId, - promise: Promise>>, limit: u32, + promise: Promise>, }, } @@ -252,6 +253,11 @@ pub enum SendMessageResult { NoSuchRoom, } +pub enum RoomHistoryResult { + Success(Vec), + NoSuchRoom, +} + /// Player update event type which is sent to a player actor and from there to a connection handler. #[derive(Clone, Debug)] pub enum Updates { @@ -544,8 +550,8 @@ impl Player { } ClientCommand::GetRoomHistory { room_id, - promise, limit, + promise, } => { let result = self.get_room_history(room_id, limit).await; let _ = promise.send(result); @@ -598,19 +604,22 @@ impl Player { } #[tracing::instrument(skip(self), name = "Player::retrieve_room_history")] - async fn get_room_history(&mut self, room_id: RoomId, limit: u32) -> Result> { + async fn get_room_history(&mut self, room_id: RoomId, limit: u32) -> Result { let room = self.my_rooms.get(&room_id); if let Some(room) = room { match room { - RoomRef::Local(room) => room.get_message_history(&self.services, limit).await, - RoomRef::Remote { node_id } => { - todo!() + RoomRef::Local(room) => { + let res = room.get_message_history(&self.services, limit).await?; + Ok(RoomHistoryResult::Success(res)) + } + RoomRef::Remote { node_id: _ } => { + tracing::error!("TODO Room #{room_id:?} is remote, cannot retrieve history"); + Err(anyhow!("Not implemented")) } } } else { - tracing::error!("Room with ID {room_id:?} not found"); - // todo: return error - todo!() + tracing::debug!("Room #{room_id:?} not found"); + Ok(RoomHistoryResult::NoSuchRoom) } } diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 0061744..0dbfda0 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -882,32 +882,35 @@ async fn handle_incoming_message( // TODO Respond with an error when a local channel is requested Chan::Local(chan) => chan, }; - let room = core.get_room(&RoomId::try_from(channel_name.clone())?).await?; - // TODO Handle non-existent room - if let Some(room) = room { - let room_id = &RoomId::try_from(channel_name.clone())?; - let messages = user_handle.get_room_message_history(room_id, limit).await?; - for message in messages { - let mut tags = vec![]; - if user.enabled_capabilities.contains(Capabilities::ServerTime) { - let tag = Tag { - key: "time".into(), - value: Some(message.created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()), - }; - tags.push(tag); + let room_id = &RoomId::try_from(channel_name.clone())?; + let res = user_handle.get_room_message_history(room_id, limit).await?; + match res { + RoomHistoryResult::Success(messages) => { + for message in messages { + let mut tags = vec![]; + if user.enabled_capabilities.contains(Capabilities::ServerTime) { + let tag = Tag { + key: "time".into(), + value: Some( + message.created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into(), + ), + }; + tags.push(tag); + } + ServerMessage { + tags, + sender: Some(message.author_name.into()), + body: ServerMessageBody::PrivateMessage { + target: Recipient::Chan(chan.clone()), + body: message.content.into(), + }, + } + .write_async(writer) + .await?; } - ServerMessage { - tags, - sender: Some(message.author_name.into()), - body: ServerMessageBody::PrivateMessage { - target: Recipient::Chan(chan.clone()), - body: message.content.into(), - }, - } - .write_async(writer) - .await?; + writer.flush().await?; } - writer.flush().await?; + RoomHistoryResult::NoSuchRoom => {} } } else { log::warn!( diff --git a/crates/projection-xmpp/src/presence.rs b/crates/projection-xmpp/src/presence.rs index aa9f972..4e12dc8 100644 --- a/crates/projection-xmpp/src/presence.rs +++ b/crates/projection-xmpp/src/presence.rs @@ -3,6 +3,7 @@ use anyhow::Result; use quick_xml::events::Event; +use lavina_core::player::RoomHistoryResult; use lavina_core::room::RoomId; use proto_xmpp::bind::{Jid, Name, Resource, Server}; use proto_xmpp::client::{Message, MessageType, Presence, Subject}; @@ -180,8 +181,15 @@ impl<'a> XmppConnection<'a> { async fn retrieve_message_history(&self, room_name: &Name) -> Result> { let room_id = RoomId::try_from(room_name.0.clone())?; let history_messages = self.user_handle.get_room_message_history(&room_id, 50).await?; - let mut response = vec![]; + let history_messages = match history_messages { + RoomHistoryResult::Success(messages) => messages, + RoomHistoryResult::NoSuchRoom => { + tracing::warn!("No room found during history retrieval on join"); + return Ok(vec![]); + } + }; + let mut response = vec![]; for history_message in history_messages.into_iter() { response.push(XmppHistoryMessage { id: history_message.id.to_string(),