Send messages on join

This commit is contained in:
Mikhail 2024-05-28 20:31:02 +02:00
parent b3d27e96c2
commit b0789d5457
8 changed files with 106 additions and 61 deletions

View File

@ -112,9 +112,13 @@ impl PlayerConnection {
} }
#[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")] #[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")]
pub async fn get_room_message_history(&self, room_id: RoomId) -> Result<Vec<StoredMessage>> { pub async fn get_room_message_history(&self, room_id: &RoomId, limit: u32) -> Result<Vec<StoredMessage>> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::GetRoomHistory { room_id, promise }; let cmd = ClientCommand::GetRoomHistory {
room_id: room_id.clone(),
promise,
limit,
};
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await;
Ok(deferred.await?) Ok(deferred.await?)
} }
@ -223,6 +227,7 @@ pub enum ClientCommand {
GetRoomHistory { GetRoomHistory {
room_id: RoomId, room_id: RoomId,
promise: Promise<Vec<StoredMessage>>, promise: Promise<Vec<StoredMessage>>,
limit: u32,
}, },
} }
@ -522,8 +527,12 @@ impl Player {
let result = self.check_user_existence(recipient).await; let result = self.check_user_existence(recipient).await;
let _ = promise.send(result); let _ = promise.send(result);
} }
ClientCommand::GetRoomHistory { room_id, promise } => { ClientCommand::GetRoomHistory {
let result = self.get_room_history(room_id).await; room_id,
promise,
limit,
} => {
let result = self.get_room_history(room_id, limit).await;
let _ = promise.send(result); let _ = promise.send(result);
} }
} }
@ -575,11 +584,11 @@ impl Player {
} }
#[tracing::instrument(skip(self), name = "Player::retrieve_room_history")] #[tracing::instrument(skip(self), name = "Player::retrieve_room_history")]
async fn get_room_history(&mut self, room_id: RoomId) -> Vec<StoredMessage> { async fn get_room_history(&mut self, room_id: RoomId, limit: u32) -> Vec<StoredMessage> {
let room = self.my_rooms.get(&room_id); let room = self.my_rooms.get(&room_id);
if let Some(room) = room { if let Some(room) = room {
match room { match room {
RoomRef::Local(room) => room.get_message_history(&self.services).await, RoomRef::Local(room) => room.get_message_history(&self.services, limit).await,
RoomRef::Remote { node_id } => { RoomRef::Remote { node_id } => {
todo!() todo!()
} }

View File

@ -48,7 +48,6 @@ impl Storage {
room_id = ? room_id = ?
order by order by
messages.id messages.id
--
limit ?; limit ?;
", ",
// todo: implement limit // todo: implement limit

View File

@ -160,8 +160,8 @@ impl RoomHandle {
lock.broadcast_update(update, player_id).await; lock.broadcast_update(update, player_id).await;
} }
pub async fn get_message_history(&self, services: &LavinaCore) -> Vec<StoredMessage> { pub async fn get_message_history(&self, services: &Services, limit: u32) -> Vec<StoredMessage> {
return services.storage.get_room_message_history(self.0.read().await.storage_id).await.unwrap(); return services.storage.get_room_message_history(self.0.read().await.storage_id, limit).await.unwrap();
} }
#[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")] #[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")]

View File

@ -25,7 +25,7 @@ use proto_irc::client::{client_message, ClientMessage};
use proto_irc::server::CapSubBody; use proto_irc::server::CapSubBody;
use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody};
use proto_irc::user::PrefixedNick; use proto_irc::user::PrefixedNick;
use proto_irc::{Chan, Recipient, Tag}; use proto_irc::{ChannelName, Recipient, Tag};
use sasl::AuthBody; use sasl::AuthBody;
mod cap; mod cap;
use handler::Handler; use handler::Handler;
@ -491,7 +491,14 @@ async fn handle_registered_socket<'a>(
let rooms_list = connection.get_rooms().await?; let rooms_list = connection.get_rooms().await?;
for room in &rooms_list { for room in &rooms_list {
produce_on_join_cmd_messages(&config, &user, &Chan::Global(room.id.as_inner().clone()), room, writer).await?; produce_on_join_cmd_messages(
&config,
&user,
&ChannelName::Global(room.id.as_inner().clone()),
room,
writer,
)
.await?;
} }
writer.flush().await?; writer.flush().await?;
@ -576,7 +583,7 @@ async fn handle_update(
if player_id == &new_member_id { if player_id == &new_member_id {
if let Some(room) = core.get_room(&room_id).await { if let Some(room) = core.get_room(&room_id).await {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
let chan = Chan::Global(room_id.as_inner().clone()); let chan = ChannelName::Global(room_id.as_inner().clone());
produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?; produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?;
writer.flush().await?; writer.flush().await?;
} else { } else {
@ -586,7 +593,7 @@ async fn handle_update(
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(new_member_id.as_inner().clone()), sender: Some(new_member_id.as_inner().clone()),
body: ServerMessageBody::Join(Chan::Global(room_id.as_inner().clone())), body: ServerMessageBody::Join(ChannelName::Global(room_id.as_inner().clone())),
} }
.write_async(writer) .write_async(writer)
.await?; .await?;
@ -600,7 +607,7 @@ async fn handle_update(
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(former_member_id.as_inner().clone()), sender: Some(former_member_id.as_inner().clone()),
body: ServerMessageBody::Part(Chan::Global(room_id.as_inner().clone())), body: ServerMessageBody::Part(ChannelName::Global(room_id.as_inner().clone())),
} }
.write_async(writer) .write_async(writer)
.await?; .await?;
@ -624,7 +631,7 @@ async fn handle_update(
tags, tags,
sender: Some(author_id.as_inner().clone()), sender: Some(author_id.as_inner().clone()),
body: ServerMessageBody::PrivateMessage { body: ServerMessageBody::PrivateMessage {
target: Recipient::Chan(Chan::Global(room_id.as_inner().clone())), target: Recipient::Chan(ChannelName::Global(room_id.as_inner().clone())),
body: body.clone(), body: body.clone(),
}, },
} }
@ -638,7 +645,7 @@ async fn handle_update(
sender: Some(config.server_name.clone()), sender: Some(config.server_name.clone()),
body: ServerMessageBody::N332Topic { body: ServerMessageBody::N332Topic {
client: user.nickname.clone(), client: user.nickname.clone(),
chat: Chan::Global(room_id.as_inner().clone()), chat: ChannelName::Global(room_id.as_inner().clone()),
topic: new_topic, topic: new_topic,
}, },
} }
@ -651,7 +658,7 @@ async fn handle_update(
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(player_id.as_inner().clone()), sender: Some(player_id.as_inner().clone()),
body: ServerMessageBody::Part(Chan::Global(room_id.as_inner().clone())), body: ServerMessageBody::Part(ChannelName::Global(room_id.as_inner().clone())),
} }
.write_async(writer) .write_async(writer)
.await?; .await?;
@ -726,7 +733,7 @@ async fn handle_incoming_message(
handle_part(config, user, user_handle, &chan, writer).await?; handle_part(config, user, user_handle, &chan, writer).await?;
} }
ClientMessage::PrivateMessage { recipient, body } => match recipient { ClientMessage::PrivateMessage { recipient, body } => match recipient {
Recipient::Chan(Chan::Global(chan)) => { Recipient::Chan(ChannelName::Global(chan)) => {
let room_id = RoomId::try_from(chan)?; let room_id = RoomId::try_from(chan)?;
user_handle.send_message(room_id, body).await?; user_handle.send_message(room_id, body).await?;
} }
@ -738,7 +745,7 @@ async fn handle_incoming_message(
}, },
ClientMessage::Topic { chan, topic } => { ClientMessage::Topic { chan, topic } => {
match chan { match chan {
Chan::Global(chan) => { ChannelName::Global(chan) => {
let room_id = RoomId::try_from(chan)?; let room_id = RoomId::try_from(chan)?;
user_handle.change_topic(room_id.clone(), topic.clone()).await?; user_handle.change_topic(room_id.clone(), topic.clone()).await?;
ServerMessage { ServerMessage {
@ -746,7 +753,7 @@ async fn handle_incoming_message(
sender: Some(config.server_name.clone()), sender: Some(config.server_name.clone()),
body: ServerMessageBody::N332Topic { body: ServerMessageBody::N332Topic {
client: user.nickname.clone(), client: user.nickname.clone(),
chat: Chan::Global(room_id.as_inner().clone()), chat: ChannelName::Global(room_id.as_inner().clone()),
topic, topic,
}, },
} }
@ -754,7 +761,7 @@ async fn handle_incoming_message(
.await?; .await?;
writer.flush().await?; writer.flush().await?;
} }
Chan::Local(_) => {} ChannelName::Local(_) => {}
}; };
} }
ClientMessage::Who { target } => match &target { ClientMessage::Who { target } => match &target {
@ -780,7 +787,7 @@ async fn handle_incoming_message(
.await?; .await?;
writer.flush().await?; writer.flush().await?;
} }
Recipient::Chan(Chan::Global(chan)) => { Recipient::Chan(ChannelName::Global(chan)) => {
let room = core.get_room(&RoomId::try_from(chan.clone())?).await; let room = core.get_room(&RoomId::try_from(chan.clone())?).await;
if let Some(room) = room { if let Some(room) = room {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
@ -807,7 +814,7 @@ async fn handle_incoming_message(
.await?; .await?;
writer.flush().await?; writer.flush().await?;
} }
Recipient::Chan(Chan::Local(_)) => { Recipient::Chan(ChannelName::Local(_)) => {
log::warn!("Local chans not supported"); log::warn!("Local chans not supported");
} }
}, },
@ -860,8 +867,38 @@ async fn handle_incoming_message(
log::info!("Received QUIT"); log::info!("Received QUIT");
return Ok(HandleResult::Leave); return Ok(HandleResult::Leave);
} }
// todo: implement chat history logic here. ClientMessage::ChatHistory { chan, limit } => {
// ClientMessage:ChatHistory { ... } => {} let channel_name = match chan.clone() {
ChannelName::Global(chan) => chan,
ChannelName::Local(chan) => chan,
};
let room = core.get_room(&RoomId::try_from(channel_name.clone())?).await;
if let Some(room) = room {
let room_info = room.get_room_info().await;
let messages = user_handle.get_room_message_history(&room_info.id, limit).await?;
for message in messages {
let mut tags = vec![];
if user.enabled_capabilities.contains(Capabilities::ServerTime) {
let tag = Tag {
key: "time".into(),
value: Some(message.created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()),
};
tags.push(tag);
}
ServerMessage {
tags,
sender: Some(message.author_name.into()),
body: ServerMessageBody::PrivateMessage {
target: Recipient::Chan(chan.clone()),
body: message.content.into(),
},
}
.write_async(writer)
.await?;
}
writer.flush().await?;
}
}
cmd => { cmd => {
log::warn!("Not implemented handler for client command: {cmd:?}"); log::warn!("Not implemented handler for client command: {cmd:?}");
} }
@ -897,11 +934,11 @@ async fn handle_join(
config: &ServerConfig, config: &ServerConfig,
user: &RegisteredUser, user: &RegisteredUser,
user_handle: &mut PlayerConnection, user_handle: &mut PlayerConnection,
chan: &Chan, chan: &ChannelName,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> { ) -> Result<()> {
match chan { match chan {
Chan::Global(chan_name) => { ChannelName::Global(chan_name) => {
let room_id = RoomId::try_from(chan_name.clone())?; let room_id = RoomId::try_from(chan_name.clone())?;
match user_handle.join_room(room_id).await? { match user_handle.join_room(room_id).await? {
JoinResult::Success(room_info) => { JoinResult::Success(room_info) => {
@ -926,7 +963,7 @@ async fn handle_join(
} }
writer.flush().await?; writer.flush().await?;
} }
Chan::Local(_) => { ChannelName::Local(_) => {
// TODO handle join attempts to local chans with an error, we don't support these // TODO handle join attempts to local chans with an error, we don't support these
} }
}; };
@ -937,16 +974,16 @@ async fn handle_part(
config: &ServerConfig, config: &ServerConfig,
user: &RegisteredUser, user: &RegisteredUser,
user_handle: &mut PlayerConnection, user_handle: &mut PlayerConnection,
chan: &Chan, chan: &ChannelName,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> { ) -> Result<()> {
if let Chan::Global(chan_name) = chan { if let ChannelName::Global(chan_name) = chan {
let room_id = RoomId::try_from(chan_name.clone())?; let room_id = RoomId::try_from(chan_name.clone())?;
user_handle.leave_room(room_id).await?; user_handle.leave_room(room_id).await?;
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(user.nickname.clone()), sender: Some(user.nickname.clone()),
body: ServerMessageBody::Part(Chan::Global(chan_name.clone())), body: ServerMessageBody::Part(ChannelName::Global(chan_name.clone())),
} }
.write_async(writer) .write_async(writer)
.await?; .await?;
@ -960,7 +997,7 @@ async fn handle_part(
async fn produce_on_join_cmd_messages( async fn produce_on_join_cmd_messages(
config: &ServerConfig, config: &ServerConfig,
user: &RegisteredUser, user: &RegisteredUser,
chan: &Chan, chan: &ChannelName,
room_info: &RoomInfo, room_info: &RoomInfo,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> { ) -> Result<()> {

View File

@ -179,7 +179,7 @@ impl<'a> XmppConnection<'a> {
#[tracing::instrument(skip(self), name = "XmppConnection::retrieve_message_history")] #[tracing::instrument(skip(self), name = "XmppConnection::retrieve_message_history")]
async fn retrieve_message_history(&self, room_name: &Name) -> Result<Vec<XmppHistoryMessage>> { async fn retrieve_message_history(&self, room_name: &Name) -> Result<Vec<XmppHistoryMessage>> {
let room_id = RoomId::try_from(room_name.0.clone())?; let room_id = RoomId::try_from(room_name.0.clone())?;
let history_messages = self.user_handle.get_room_message_history(room_id).await?; let history_messages = self.user_handle.get_room_message_history(&room_id, 50).await?;
let mut response = vec![]; let mut response = vec![];
for history_message in history_messages.into_iter() { for history_message in history_messages.into_iter() {

View File

@ -33,7 +33,7 @@ pub enum ClientMessage {
realname: Str, realname: Str,
}, },
/// `JOIN <chan>` /// `JOIN <chan>`
Join(Chan), Join(ChannelName),
/// `MODE <target>` /// `MODE <target>`
Mode { Mode {
target: Recipient, target: Recipient,
@ -48,11 +48,11 @@ pub enum ClientMessage {
}, },
/// `TOPIC <chan> :<topic>` /// `TOPIC <chan> :<topic>`
Topic { Topic {
chan: Chan, chan: ChannelName,
topic: Str, topic: Str,
}, },
Part { Part {
chan: Chan, chan: ChannelName,
message: Option<Str>, message: Option<Str>,
}, },
/// `PRIVMSG <target> :<msg>` /// `PRIVMSG <target> :<msg>`
@ -65,8 +65,8 @@ pub enum ClientMessage {
reason: Str, reason: Str,
}, },
Authenticate(Str), Authenticate(Str),
Chathistory { ChatHistory {
chan: Chan, chan: ChannelName,
limit: u32, limit: u32,
}, },
} }
@ -294,7 +294,7 @@ fn client_message_chathistory(input: &str) -> IResult<&str, ClientMessage> {
let (input, _) = tag(" * ")(input)?; let (input, _) = tag(" * ")(input)?;
let (input, limit) = limit(input)?; let (input, limit) = limit(input)?;
Ok((input, ClientMessage::Chathistory { chan, limit })) Ok((input, ClientMessage::ChatHistory { chan, limit }))
} }
fn limit(input: &str) -> IResult<&str, u32> { fn limit(input: &str) -> IResult<&str, u32> {
@ -504,7 +504,7 @@ mod test {
fn test_client_message_part() { fn test_client_message_part() {
let input = "PART #chan :Pokasiki !!!"; let input = "PART #chan :Pokasiki !!!";
let expected = ClientMessage::Part { let expected = ClientMessage::Part {
chan: Chan::Global("chan".into()), chan: ChannelName::Global("chan".into()),
message: Some("Pokasiki !!!".into()), message: Some("Pokasiki !!!".into()),
}; };
@ -516,7 +516,7 @@ mod test {
fn test_client_message_part_empty() { fn test_client_message_part_empty() {
let input = "PART #chan"; let input = "PART #chan";
let expected = ClientMessage::Part { let expected = ClientMessage::Part {
chan: Chan::Global("chan".into()), chan: ChannelName::Global("chan".into()),
message: None, message: None,
}; };
@ -547,8 +547,8 @@ mod test {
#[test] #[test]
fn test_client_chat_history_latest() { fn test_client_chat_history_latest() {
let input = "CHATHISTORY LATEST #chan * 10"; let input = "CHATHISTORY LATEST #chan * 10";
let expected = ClientMessage::Chathistory { let expected = ClientMessage::ChatHistory {
chan: Chan::Global("chan".into()), chan: ChannelName::Global("chan".into()),
limit: 10, limit: 10,
}; };

View File

@ -48,20 +48,20 @@ fn params(input: &str) -> IResult<&str, &str> {
} }
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub enum Chan { pub enum ChannelName {
/// `#<name>` — network-global channel, available from any server in the network. /// `#<name>` — network-global channel, available from any server in the network.
Global(Str), Global(Str),
/// `&<name>` — server-local channel, available only to connections to the same server. Rarely used in practice. /// `&<name>` — server-local channel, available only to connections to the same server. Rarely used in practice.
Local(Str), Local(Str),
} }
impl Chan { impl ChannelName {
pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> { pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> {
match self { match self {
Chan::Global(name) => { ChannelName::Global(name) => {
writer.write_all(b"#").await?; writer.write_all(b"#").await?;
writer.write_all(name.as_bytes()).await?; writer.write_all(name.as_bytes()).await?;
} }
Chan::Local(name) => { ChannelName::Local(name) => {
writer.write_all(b"&").await?; writer.write_all(b"&").await?;
writer.write_all(name.as_bytes()).await?; writer.write_all(name.as_bytes()).await?;
} }
@ -70,17 +70,17 @@ impl Chan {
} }
} }
fn chan(input: &str) -> IResult<&str, Chan> { fn chan(input: &str) -> IResult<&str, ChannelName> {
fn chan_global(input: &str) -> IResult<&str, Chan> { fn chan_global(input: &str) -> IResult<&str, ChannelName> {
let (input, _) = tag("#")(input)?; let (input, _) = tag("#")(input)?;
let (input, name) = receiver(input)?; let (input, name) = receiver(input)?;
Ok((input, Chan::Global(name.into()))) Ok((input, ChannelName::Global(name.into())))
} }
fn chan_local(input: &str) -> IResult<&str, Chan> { fn chan_local(input: &str) -> IResult<&str, ChannelName> {
let (input, _) = tag("&")(input)?; let (input, _) = tag("&")(input)?;
let (input, name) = receiver(input)?; let (input, name) = receiver(input)?;
Ok((input, Chan::Local(name.into()))) Ok((input, ChannelName::Local(name.into())))
} }
alt((chan_global, chan_local))(input) alt((chan_global, chan_local))(input)
@ -89,7 +89,7 @@ fn chan(input: &str) -> IResult<&str, Chan> {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub enum Recipient { pub enum Recipient {
Nick(Str), Nick(Str),
Chan(Chan), Chan(ChannelName),
} }
impl Recipient { impl Recipient {
pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> { pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> {
@ -125,7 +125,7 @@ mod test {
#[test] #[test]
fn test_chan_global() { fn test_chan_global() {
let input = "#testchan"; let input = "#testchan";
let expected = Chan::Global("testchan".into()); let expected = ChannelName::Global("testchan".into());
let result = chan(input); let result = chan(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));
@ -139,7 +139,7 @@ mod test {
#[test] #[test]
fn test_chan_local() { fn test_chan_local() {
let input = "&localchan"; let input = "&localchan";
let expected = Chan::Local("localchan".into()); let expected = ChannelName::Local("localchan".into());
let result = chan(input); let result = chan(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));

View File

@ -69,8 +69,8 @@ pub enum ServerMessageBody {
target: Recipient, target: Recipient,
body: Str, body: Str,
}, },
Join(Chan), Join(ChannelName),
Part(Chan), Part(ChannelName),
Error { Error {
reason: Str, reason: Str,
}, },
@ -121,7 +121,7 @@ pub enum ServerMessageBody {
}, },
N332Topic { N332Topic {
client: Str, client: Str,
chat: Chan, chat: ChannelName,
topic: Str, topic: Str,
}, },
/// A reply to a client's [Who](crate::protos::irc::client::ClientMessage::Who) request. /// A reply to a client's [Who](crate::protos::irc::client::ClientMessage::Who) request.
@ -141,12 +141,12 @@ pub enum ServerMessageBody {
}, },
N353NamesReply { N353NamesReply {
client: Str, client: Str,
chan: Chan, chan: ChannelName,
members: NonEmpty<PrefixedNick>, members: NonEmpty<PrefixedNick>,
}, },
N366NamesReplyEnd { N366NamesReplyEnd {
client: Str, client: Str,
chan: Chan, chan: ChannelName,
}, },
N431ErrNoNicknameGiven { N431ErrNoNicknameGiven {
client: Str, client: Str,
@ -154,7 +154,7 @@ pub enum ServerMessageBody {
}, },
N474BannedFromChan { N474BannedFromChan {
client: Str, client: Str,
chan: Chan, chan: ChannelName,
message: Str, message: Str,
}, },
N502UsersDontMatch { N502UsersDontMatch {