core: ADT for results of room history queries

This commit is contained in:
Nikita Vilunov 2024-06-05 02:04:11 +02:00
parent f3cd794431
commit 59528909c7
3 changed files with 55 additions and 35 deletions

View File

@ -9,6 +9,7 @@
//! so that they don't overload the room actor. //! so that they don't overload the room actor.
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use anyhow::anyhow;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use prometheus::{IntGauge, Registry as MetricsRegistry}; use prometheus::{IntGauge, Registry as MetricsRegistry};
use serde::Serialize; use serde::Serialize;
@ -112,7 +113,7 @@ impl PlayerConnection {
} }
#[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")] #[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")]
pub async fn get_room_message_history(&self, room_id: &RoomId, limit: u32) -> Result<Vec<StoredMessage>> { pub async fn get_room_message_history(&self, room_id: &RoomId, limit: u32) -> Result<RoomHistoryResult> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::GetRoomHistory { let cmd = ClientCommand::GetRoomHistory {
room_id: room_id.clone(), room_id: room_id.clone(),
@ -226,8 +227,8 @@ pub enum ClientCommand {
}, },
GetRoomHistory { GetRoomHistory {
room_id: RoomId, room_id: RoomId,
promise: Promise<Result<Vec<StoredMessage>>>,
limit: u32, limit: u32,
promise: Promise<Result<RoomHistoryResult>>,
}, },
} }
@ -252,6 +253,11 @@ pub enum SendMessageResult {
NoSuchRoom, NoSuchRoom,
} }
pub enum RoomHistoryResult {
Success(Vec<StoredMessage>),
NoSuchRoom,
}
/// Player update event type which is sent to a player actor and from there to a connection handler. /// Player update event type which is sent to a player actor and from there to a connection handler.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Updates { pub enum Updates {
@ -544,8 +550,8 @@ impl Player {
} }
ClientCommand::GetRoomHistory { ClientCommand::GetRoomHistory {
room_id, room_id,
promise,
limit, limit,
promise,
} => { } => {
let result = self.get_room_history(room_id, limit).await; let result = self.get_room_history(room_id, limit).await;
let _ = promise.send(result); let _ = promise.send(result);
@ -598,19 +604,22 @@ impl Player {
} }
#[tracing::instrument(skip(self), name = "Player::retrieve_room_history")] #[tracing::instrument(skip(self), name = "Player::retrieve_room_history")]
async fn get_room_history(&mut self, room_id: RoomId, limit: u32) -> Result<Vec<StoredMessage>> { async fn get_room_history(&mut self, room_id: RoomId, limit: u32) -> Result<RoomHistoryResult> {
let room = self.my_rooms.get(&room_id); let room = self.my_rooms.get(&room_id);
if let Some(room) = room { if let Some(room) = room {
match room { match room {
RoomRef::Local(room) => room.get_message_history(&self.services, limit).await, RoomRef::Local(room) => {
RoomRef::Remote { node_id } => { let res = room.get_message_history(&self.services, limit).await?;
todo!() Ok(RoomHistoryResult::Success(res))
}
RoomRef::Remote { node_id: _ } => {
tracing::error!("TODO Room #{room_id:?} is remote, cannot retrieve history");
Err(anyhow!("Not implemented"))
} }
} }
} else { } else {
tracing::error!("Room with ID {room_id:?} not found"); tracing::debug!("Room #{room_id:?} not found");
// todo: return error Ok(RoomHistoryResult::NoSuchRoom)
todo!()
} }
} }

View File

@ -882,17 +882,18 @@ async fn handle_incoming_message(
// TODO Respond with an error when a local channel is requested // TODO Respond with an error when a local channel is requested
Chan::Local(chan) => chan, Chan::Local(chan) => chan,
}; };
let room = core.get_room(&RoomId::try_from(channel_name.clone())?).await?;
// TODO Handle non-existent room
if let Some(room) = room {
let room_id = &RoomId::try_from(channel_name.clone())?; let room_id = &RoomId::try_from(channel_name.clone())?;
let messages = user_handle.get_room_message_history(room_id, limit).await?; let res = user_handle.get_room_message_history(room_id, limit).await?;
match res {
RoomHistoryResult::Success(messages) => {
for message in messages { for message in messages {
let mut tags = vec![]; let mut tags = vec![];
if user.enabled_capabilities.contains(Capabilities::ServerTime) { if user.enabled_capabilities.contains(Capabilities::ServerTime) {
let tag = Tag { let tag = Tag {
key: "time".into(), key: "time".into(),
value: Some(message.created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()), value: Some(
message.created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into(),
),
}; };
tags.push(tag); tags.push(tag);
} }
@ -909,6 +910,8 @@ async fn handle_incoming_message(
} }
writer.flush().await?; writer.flush().await?;
} }
RoomHistoryResult::NoSuchRoom => {}
}
} else { } else {
log::warn!( log::warn!(
"Requested chat history for user {user:?} even though the capability was not negotiated" "Requested chat history for user {user:?} even though the capability was not negotiated"

View File

@ -3,6 +3,7 @@
use anyhow::Result; use anyhow::Result;
use quick_xml::events::Event; use quick_xml::events::Event;
use lavina_core::player::RoomHistoryResult;
use lavina_core::room::RoomId; use lavina_core::room::RoomId;
use proto_xmpp::bind::{Jid, Name, Resource, Server}; use proto_xmpp::bind::{Jid, Name, Resource, Server};
use proto_xmpp::client::{Message, MessageType, Presence, Subject}; use proto_xmpp::client::{Message, MessageType, Presence, Subject};
@ -180,8 +181,15 @@ impl<'a> XmppConnection<'a> {
async fn retrieve_message_history(&self, room_name: &Name) -> Result<Vec<XmppHistoryMessage>> { async fn retrieve_message_history(&self, room_name: &Name) -> Result<Vec<XmppHistoryMessage>> {
let room_id = RoomId::try_from(room_name.0.clone())?; let room_id = RoomId::try_from(room_name.0.clone())?;
let history_messages = self.user_handle.get_room_message_history(&room_id, 50).await?; let history_messages = self.user_handle.get_room_message_history(&room_id, 50).await?;
let mut response = vec![]; let history_messages = match history_messages {
RoomHistoryResult::Success(messages) => messages,
RoomHistoryResult::NoSuchRoom => {
tracing::warn!("No room found during history retrieval on join");
return Ok(vec![]);
}
};
let mut response = vec![];
for history_message in history_messages.into_iter() { for history_message in history_messages.into_iter() {
response.push(XmppHistoryMessage { response.push(XmppHistoryMessage {
id: history_message.id.to_string(), id: history_message.id.to_string(),