This commit is contained in:
Mikhail 2024-05-23 14:35:52 +02:00
parent 6285a31c85
commit 8e7a01b567
12 changed files with 160 additions and 42 deletions

View File

@ -18,7 +18,7 @@ use tracing::{Instrument, Span};
use crate::clustering::room::*;
use crate::prelude::*;
use crate::room::{RoomHandle, RoomId, RoomInfo};
use crate::room::{HistoryMessage, RoomHandle, RoomId, RoomInfo};
use crate::table::{AnonTable, Key as AnonKey};
use crate::LavinaCore;
@ -111,6 +111,14 @@ impl PlayerConnection {
Ok(deferred.await?)
}
#[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")]
pub async fn get_room_message_history(&self, room_id: RoomId) -> Result<(Vec<HistoryMessage>)> {
let (promise, deferred) = oneshot();
let cmd = ClientCommand::GetRoomHistory { room_id, promise };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await;
Ok(deferred.await?)
}
/// Handler in [Player::send_dialog_message].
#[tracing::instrument(skip(self, body), name = "PlayerConnection::send_dialog_message")]
pub async fn send_dialog_message(&self, recipient: PlayerId, body: Str) -> Result<()> {
@ -212,6 +220,10 @@ pub enum ClientCommand {
recipient: PlayerId,
promise: Promise<GetInfoResult>,
},
GetRoomHistory {
room_id: RoomId,
promise: Promise<Vec<HistoryMessage>>,
},
}
pub enum GetInfoResult {
@ -509,6 +521,11 @@ impl Player {
let result = self.check_user_existence(recipient).await;
let _ = promise.send(result);
}
ClientCommand::GetRoomHistory { room_id, promise } => {
let result = self.get_room_history(room_id).await;
let _ = promise.send(result);
}
_ => {}
}
}
@ -557,6 +574,21 @@ impl Player {
}
}
#[tracing::instrument(skip(self), name = "Player::retrieve_room_history")]
async fn get_room_history(&mut self, room_id: RoomId) -> Vec<HistoryMessage> {
let room = self.my_rooms.get(&room_id);
if let Some(room) = room {
match room {
RoomRef::Local(room) => room.get_message_history().await,
RoomRef::Remote { node_id } => {
todo!()
}
}
} else {
todo!()
}
}
#[tracing::instrument(skip(self), name = "Player::leave_room")]
async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) {
let room = self.my_rooms.remove(&room_id);

View File

@ -174,6 +174,6 @@ impl Storage {
.fetch_all(&mut *executor)
.await?;
res.into_iter().map(|(room_id,)| RoomId::from(room_id)).collect()
res.into_iter().map(|(room_id,)| RoomId::try_from(room_id)).collect()
}
}

View File

@ -96,6 +96,27 @@ impl Storage {
Ok(res.map(|(id,)| id))
}
#[tracing::instrument(skip(self), name = "Storage::retrieve_user_by_id")]
pub async fn retrieve_user_by_id(&self, name: &str) -> Result<Option<u32>> {
let mut executor = self.conn.lock().await;
let res: Option<(u32,)> = sqlx::query_as(
"
select
u.id,
u.name
from
users u
where
u.id = ?;
",
)
.bind(name)
.fetch_optional(&mut *executor)
.await?;
Ok(res.map(|(id,)| id))
}
#[tracing::instrument(skip(self), name = "Storage::create_or_retrieve_user_id_by_name")]
pub async fn create_or_retrieve_user_id_by_name(&self, name: &str) -> Result<u32> {
let mut executor = self.conn.lock().await;

View File

@ -16,7 +16,7 @@ use crate::Services;
pub struct RoomId(Str);
impl RoomId {
pub fn from(str: impl Into<Str>) -> Result<RoomId> {
pub fn try_from(str: impl Into<Str>) -> Result<RoomId> {
let bytes = str.into();
if bytes.len() > 32 {
return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols"));
@ -158,6 +158,23 @@ impl RoomHandle {
lock.broadcast_update(update, player_id).await;
}
pub async fn get_message_history(&self) -> Vec<HistoryMessage> {
return vec![
HistoryMessage {
id: "kek".to_string(),
author_id: 1,
body: "Willkom' in Brem'".to_string(),
created_at: Utc::now(),
},
HistoryMessage {
id: "kek".to_string(),
author_id: 1,
body: "Willkom' in Hamburg".to_string(),
created_at: Utc::now(),
},
];
}
#[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")]
pub async fn unsubscribe(&self, player_id: &PlayerId) {
let mut lock = self.0.write().await;
@ -279,3 +296,15 @@ pub struct RoomInfo {
pub members: Vec<PlayerId>,
pub topic: Str,
}
pub struct User {
pub id: u32,
pub name: String,
}
pub(crate) struct HistoryMessage {
pub id: String,
pub author: User,
pub body: String,
pub created_at: DateTime<Utc>,
}

View File

@ -720,7 +720,7 @@ async fn handle_incoming_message(
}
ClientMessage::PrivateMessage { recipient, body } => match recipient {
Recipient::Chan(Chan::Global(chan)) => {
let room_id = RoomId::from(chan)?;
let room_id = RoomId::try_from(chan)?;
user_handle.send_message(room_id, body).await?;
}
Recipient::Nick(nick) => {
@ -732,7 +732,7 @@ async fn handle_incoming_message(
ClientMessage::Topic { chan, topic } => {
match chan {
Chan::Global(chan) => {
let room_id = RoomId::from(chan)?;
let room_id = RoomId::try_from(chan)?;
user_handle.change_topic(room_id.clone(), topic.clone()).await?;
ServerMessage {
tags: vec![],
@ -774,7 +774,7 @@ async fn handle_incoming_message(
writer.flush().await?;
}
Recipient::Chan(Chan::Global(chan)) => {
let room = core.get_room(&RoomId::from(chan.clone())?).await;
let room = core.get_room(&RoomId::try_from(chan.clone())?).await;
if let Some(room) = room {
let room_info = room.get_room_info().await;
for member in room_info.members {
@ -893,7 +893,7 @@ async fn handle_join(
) -> Result<()> {
match chan {
Chan::Global(chan_name) => {
let room_id = RoomId::from(chan_name.clone())?;
let room_id = RoomId::try_from(chan_name.clone())?;
if let JoinResult::Success(room_info) = user_handle.join_room(room_id).await? {
produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?;
} else {
@ -924,7 +924,7 @@ async fn handle_part(
writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> {
if let Chan::Global(chan_name) = chan {
let room_id = RoomId::from(chan_name.clone())?;
let room_id = RoomId::try_from(chan_name.clone())?;
user_handle.leave_room(room_id).await?;
ServerMessage {
tags: vec![],

View File

@ -619,14 +619,14 @@ async fn server_time_capability() -> Result<()> {
server.core.create_player(&PlayerId::from("some_guy")?).await?;
let mut conn = server.core.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
let res = conn.join_room(RoomId::from("test").unwrap()).await?;
let res = conn.join_room(RoomId::try_from("test").unwrap()).await?;
let JoinResult::Success(_) = res else {
panic!("Failed to join room");
};
s.expect(":some_guy JOIN #test").await?;
let SendMessageResult::Success(res) = conn.send_message(RoomId::from("test").unwrap(), "Hello".into()).await?
let SendMessageResult::Success(res) = conn.send_message(RoomId::try_from("test").unwrap(), "Hello".into()).await?
else {
panic!("Failed to send message");
};

View File

@ -163,7 +163,7 @@ impl<'a> XmppConnection<'a> {
server,
resource: None,
}) if server.0 == self.hostname_rooms => {
let room_id = RoomId::from(room_name.0.clone()).unwrap();
let room_id = RoomId::try_from(room_name.0.clone()).unwrap();
let Some(_) = self.core.get_room(&room_id).await else {
// TODO should return item-not-found
// example:

View File

@ -20,7 +20,7 @@ impl<'a> XmppConnection<'a> {
}) = m.to
{
if server.0.as_ref() == &*self.hostname_rooms && m.r#type == MessageType::Groupchat {
self.user_handle.send_message(RoomId::from(name.0.clone())?, m.body.clone().into()).await?;
self.user_handle.send_message(RoomId::try_from(name.0.clone())?, m.body.clone().into()).await?;
Message::<()> {
to: Some(Jid {
name: Some(self.user.xmpp_name.clone()),

View File

@ -6,7 +6,7 @@ use quick_xml::events::Event;
use lavina_core::room::RoomId;
use proto_xmpp::bind::{Jid, Name, Resource, Server};
use proto_xmpp::client::Presence;
use proto_xmpp::muc::{Delay, HistoryMessage, XUser};
use proto_xmpp::muc::{Delay, XmppHistoryMessage, XUser};
use proto_xmpp::xml::{Ignore, ToXml};
use crate::XmppConnection;
@ -64,7 +64,7 @@ impl<'a> XmppConnection<'a> {
}
async fn muc_presence(&mut self, name: &Name) -> Result<(Presence<XUser>)> {
let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?;
let a = self.user_handle.join_room(RoomId::try_from(name.0.clone())?).await?;
// TODO handle bans
let response = Presence {
to: Some(Jid {
@ -82,29 +82,53 @@ impl<'a> XmppConnection<'a> {
};
Ok(response)
}
async fn send_history_on_join(&self) -> Result<(HistoryMessage)> {
Ok(HistoryMessage {
id: "kek".to_string(),
async fn send_history_on_join(&self) -> Result<(Vec<XmppHistoryMessage>)> {
let room_id = RoomId::try_from("kek1337")?;
let history_messages = self.user_handle.get_room_message_history(room_id).await?;
let mut response = vec![];
// todo: get author
for history_message in history_messages.into_iter() {
self.user_handle.
let user = self.core.get_user(&history_message.author_id).await?;
response.push(XmppHistoryMessage {
id: history_message.id,
to: Jid {
name: Some(Name("sauer@oflor.me".into())),
server: Server("localhost".into()),
resource: Some(Resource("tester".into())),
name: history_message.author_id,
server: Server(),
resource: None,
},
from: Jid {
name: Some(Name("pepe".into())),
server: Server("rooms.localhost".into()),
resource: Some(Resource("sauer".into())),
},
delay: Delay::new(
Jid {
name: Some(Name("pepe".into())),
server: Server("rooms.localhost".into()),
resource: Some(Resource("tester".into())),
},
"2021-10-10T10:10:10Z".to_string(),
),
body: "Vasya Pupkin says hello.".to_string(),
})
from: Jid {},
delay: (),
body: "".to_string(),
});
}
// Ok(HistoryMessage {
// id: "kek".to_string(),
// to: Jid {
// name: Some(Name("sauer@oflor.me".into())),
// server: Server("localhost".into()),
// resource: Some(Resource("tester".into())),
// },
// from: Jid {
// name: Some(Name("pepe".into())),
// server: Server("rooms.localhost".into()),
// resource: Some(Resource("sauer".into())),
// },
// delay: Delay::new(
// Jid {
// name: Some(Name("pepe".into())),
// server: Server("rooms.localhost".into()),
// resource: Some(Resource("tester".into())),
// },
// "2021-10-10T10:10:10Z".to_string(),
// ),
// body: ,
// })
}
}

View File

@ -175,7 +175,7 @@ impl Delay {
}
#[derive(Debug, PartialEq, Eq)]
pub struct HistoryMessage {
pub struct XmppHistoryMessage {
pub id: String,
pub to: Jid,
pub from: Jid,
@ -183,7 +183,19 @@ pub struct HistoryMessage {
pub body: String,
}
impl ToXml for HistoryMessage {
impl From<XmppHistoryMessage> for XmppHistoryMessage {
fn from(msg: HistoryMessage) -> Self {
Self {
id: msg.id,
to: msg.to,
from: msg.from,
delay: Delay::new(msg.from, msg.stamp),
body: msg.body,
}
}
}
impl ToXml for XmppHistoryMessage {
fn serialize(&self, events: &mut Vec<Event<'static>>) {
let mut tag = BytesStart::new("message");
tag.push_attribute(Attribute {

View File

@ -164,7 +164,7 @@ async fn endpoint_send_room_message(
let Ok(req) = serde_json::from_slice::<rooms::SendMessageReq>(&str[..]) else {
return Ok(malformed_request());
};
let Ok(room_id) = RoomId::from(req.room_id) else {
let Ok(room_id) = RoomId::try_from(req.room_id) else {
return Ok(room_not_found());
};
let Ok(player_id) = PlayerId::from(req.author_id) else {
@ -187,7 +187,7 @@ async fn endpoint_set_room_topic(
let Ok(req) = serde_json::from_slice::<rooms::SetTopicReq>(&str[..]) else {
return Ok(malformed_request());
};
let Ok(room_id) = RoomId::from(req.room_id) else {
let Ok(room_id) = RoomId::try_from(req.room_id) else {
return Ok(room_not_found());
};
let Ok(player_id) = PlayerId::from(req.author_id) else {

View File

@ -29,7 +29,7 @@ async fn endpoint_cluster_join_room(
return Ok(malformed_request());
};
tracing::info!("Incoming request: {:?}", &req);
let Ok(room_id) = RoomId::from(req.room_id) else {
let Ok(room_id) = RoomId::try_from(req.room_id) else {
dbg!(&req.room_id);
return Ok(room_not_found());
};
@ -55,7 +55,7 @@ async fn endpoint_cluster_add_message(
dbg!(&req.created_at);
return Ok(malformed_request());
};
let Ok(room_id) = RoomId::from(req.room_id) else {
let Ok(room_id) = RoomId::try_from(req.room_id) else {
dbg!(&req.room_id);
return Ok(room_not_found());
};