diff --git a/src/core/player.rs b/src/core/player.rs index 2b5300d..468d6a6 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -77,6 +77,12 @@ impl PlayerConnection { Ok(deferred.await?) } + pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { + let (promise, deferred) = oneshot(); + self.player_handle.send(PlayerCommand::Cmd(Cmd::LeaveRoom { room_id, promise }, self.connection_id.clone())).await; + Ok(deferred.await?) + } + pub async fn terminate(self) { self.player_handle .send(PlayerCommand::TerminateConnection(self.connection_id)) @@ -166,6 +172,10 @@ pub enum Cmd { room_id: RoomId, promise: Promise, }, + LeaveRoom { + room_id: RoomId, + promise: Promise<()>, + }, SendMessage { room_id: RoomId, body: String, @@ -194,6 +204,10 @@ pub enum Updates { room_id: RoomId, new_member_id: PlayerId, }, + RoomLeft { + room_id: RoomId, + former_member_id: PlayerId, + }, } /// Handle to a player registry — a shared data structure containing information about players. @@ -322,6 +336,18 @@ impl Player { }; self.broadcast_update(update, connection_id).await; } + Cmd::LeaveRoom { room_id, promise } => { + let room = self.my_rooms.remove(&room_id); + if let Some(room) = room { + room.unsubscribe(&self.player_id).await; + let room_info = room.get_room_info().await; + } + promise.send(()); + let update = Updates::RoomLeft { + room_id, former_member_id: self.player_id.clone(), + }; + self.broadcast_update(update, connection_id).await; + } Cmd::SendMessage { room_id, body, diff --git a/src/core/room.rs b/src/core/room.rs index 02bbb8b..d60da3c 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -99,6 +99,16 @@ impl RoomHandle { lock.add_subscriber(player_id, player_handle).await; } + pub async fn unsubscribe(&self, player_id: &PlayerId) { + let mut lock = self.0.write().await; + lock.subscriptions.remove(player_id); + let update = Updates::RoomLeft { + room_id: lock.room_id.clone(), + former_member_id: player_id.clone(), + }; + lock.broadcast_update(update, player_id).await; + } + pub async fn send_message(&self, player_id: PlayerId, body: String) { let lock = self.0.read().await; lock.send_message(player_id, body).await; diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index e343579..463314e 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -247,6 +247,14 @@ async fn handle_registered_socket<'a>( writer.flush().await? } }, + Updates::RoomLeft { room_id, former_member_id } => { + ServerMessage { + tags: vec![], + sender: Some(former_member_id.as_bytes().clone()), + body: ServerMessageBody::Part(Chan::Global(room_id.as_bytes().clone())), + }.write_async(writer).await?; + writer.flush().await? + }, Updates::NewMessage { author_id, room_id, body } => { ServerMessage { tags: vec![], @@ -302,6 +310,9 @@ async fn handle_incoming_message( ClientMessage::Join(ref chan) => { handle_join(&config, &user, user_handle, chan, writer).await?; } + ClientMessage::Part { chan, message } => { + handle_part(config, user, user_handle, &chan, writer).await?; + } ClientMessage::PrivateMessage { recipient, body } => match recipient { Recipient::Chan(Chan::Global(chan)) => match String::from_utf8(body) { Ok(body) => { @@ -365,6 +376,30 @@ async fn handle_join( Ok(()) } +async fn handle_part( + config: &ServerConfig, + user: &RegisteredUser, + user_handle: &mut PlayerConnection, + chan: &Chan, + writer: &mut (impl AsyncWrite + Unpin), +) -> Result<()> { + if let Chan::Global(chan_name) = chan { + let room_id = RoomId::from_bytes(chan_name.clone())?; + user_handle.leave_room(room_id).await?; + ServerMessage { + tags: vec![], + sender: Some(user.nickname.clone()), + body: ServerMessageBody::Part(Chan::Global(chan_name.clone())), + } + .write_async(writer) + .await?; + writer.flush().await?; + } else { + log::warn!("Local chans unsupported"); + } + Ok(()) +} + async fn produce_on_join_cmd_messages( config: &ServerConfig, user: &RegisteredUser, diff --git a/src/protos/irc/server.rs b/src/protos/irc/server.rs index 1e343c6..d81d351 100644 --- a/src/protos/irc/server.rs +++ b/src/protos/irc/server.rs @@ -60,6 +60,7 @@ pub enum ServerMessageBody { body: ByteVec, }, Join(Chan), + Part(Chan), N001Welcome { client: ByteVec, text: ByteVec, @@ -131,6 +132,10 @@ impl ServerMessageBody { writer.write_all(b"JOIN ").await?; chan.write_async(writer).await?; } + ServerMessageBody::Part(chan) => { + writer.write_all(b"PART ").await?; + chan.write_async(writer).await?; + } ServerMessageBody::N001Welcome { client, text } => { writer.write_all(b"001 ").await?; writer.write_all(&client).await?;