handle part commands

This commit is contained in:
Nikita Vilunov 2023-02-15 18:54:48 +01:00
parent 1bc305962e
commit 203db3b207
4 changed files with 76 additions and 0 deletions

View File

@ -77,6 +77,12 @@ impl PlayerConnection {
Ok(deferred.await?)
}
pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> {
let (promise, deferred) = oneshot();
self.player_handle.send(PlayerCommand::Cmd(Cmd::LeaveRoom { room_id, promise }, self.connection_id.clone())).await;
Ok(deferred.await?)
}
pub async fn terminate(self) {
self.player_handle
.send(PlayerCommand::TerminateConnection(self.connection_id))
@ -166,6 +172,10 @@ pub enum Cmd {
room_id: RoomId,
promise: Promise<RoomInfo>,
},
LeaveRoom {
room_id: RoomId,
promise: Promise<()>,
},
SendMessage {
room_id: RoomId,
body: String,
@ -194,6 +204,10 @@ pub enum Updates {
room_id: RoomId,
new_member_id: PlayerId,
},
RoomLeft {
room_id: RoomId,
former_member_id: PlayerId,
},
}
/// Handle to a player registry — a shared data structure containing information about players.
@ -322,6 +336,18 @@ impl Player {
};
self.broadcast_update(update, connection_id).await;
}
Cmd::LeaveRoom { room_id, promise } => {
let room = self.my_rooms.remove(&room_id);
if let Some(room) = room {
room.unsubscribe(&self.player_id).await;
let room_info = room.get_room_info().await;
}
promise.send(());
let update = Updates::RoomLeft {
room_id, former_member_id: self.player_id.clone(),
};
self.broadcast_update(update, connection_id).await;
}
Cmd::SendMessage {
room_id,
body,

View File

@ -99,6 +99,16 @@ impl RoomHandle {
lock.add_subscriber(player_id, player_handle).await;
}
pub async fn unsubscribe(&self, player_id: &PlayerId) {
let mut lock = self.0.write().await;
lock.subscriptions.remove(player_id);
let update = Updates::RoomLeft {
room_id: lock.room_id.clone(),
former_member_id: player_id.clone(),
};
lock.broadcast_update(update, player_id).await;
}
pub async fn send_message(&self, player_id: PlayerId, body: String) {
let lock = self.0.read().await;
lock.send_message(player_id, body).await;

View File

@ -247,6 +247,14 @@ async fn handle_registered_socket<'a>(
writer.flush().await?
}
},
Updates::RoomLeft { room_id, former_member_id } => {
ServerMessage {
tags: vec![],
sender: Some(former_member_id.as_bytes().clone()),
body: ServerMessageBody::Part(Chan::Global(room_id.as_bytes().clone())),
}.write_async(writer).await?;
writer.flush().await?
},
Updates::NewMessage { author_id, room_id, body } => {
ServerMessage {
tags: vec![],
@ -302,6 +310,9 @@ async fn handle_incoming_message(
ClientMessage::Join(ref chan) => {
handle_join(&config, &user, user_handle, chan, writer).await?;
}
ClientMessage::Part { chan, message } => {
handle_part(config, user, user_handle, &chan, writer).await?;
}
ClientMessage::PrivateMessage { recipient, body } => match recipient {
Recipient::Chan(Chan::Global(chan)) => match String::from_utf8(body) {
Ok(body) => {
@ -365,6 +376,30 @@ async fn handle_join(
Ok(())
}
async fn handle_part(
config: &ServerConfig,
user: &RegisteredUser,
user_handle: &mut PlayerConnection,
chan: &Chan,
writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> {
if let Chan::Global(chan_name) = chan {
let room_id = RoomId::from_bytes(chan_name.clone())?;
user_handle.leave_room(room_id).await?;
ServerMessage {
tags: vec![],
sender: Some(user.nickname.clone()),
body: ServerMessageBody::Part(Chan::Global(chan_name.clone())),
}
.write_async(writer)
.await?;
writer.flush().await?;
} else {
log::warn!("Local chans unsupported");
}
Ok(())
}
async fn produce_on_join_cmd_messages(
config: &ServerConfig,
user: &RegisteredUser,

View File

@ -60,6 +60,7 @@ pub enum ServerMessageBody {
body: ByteVec,
},
Join(Chan),
Part(Chan),
N001Welcome {
client: ByteVec,
text: ByteVec,
@ -131,6 +132,10 @@ impl ServerMessageBody {
writer.write_all(b"JOIN ").await?;
chan.write_async(writer).await?;
}
ServerMessageBody::Part(chan) => {
writer.write_all(b"PART ").await?;
chan.write_async(writer).await?;
}
ServerMessageBody::N001Welcome { client, text } => {
writer.write_all(b"001 ").await?;
writer.write_all(&client).await?;