forked from lavina/lavina
1
0
Fork 0

split client command handling into methods

This commit is contained in:
Nikita Vilunov 2024-03-26 00:22:19 +01:00
parent 8e158336ad
commit ee10e3837a
1 changed files with 91 additions and 67 deletions

View File

@ -47,6 +47,7 @@ impl PlayerId {
pub struct ConnectionId(pub AnonKey); pub struct ConnectionId(pub AnonKey);
/// Representation of an authenticated client connection. /// Representation of an authenticated client connection.
/// The public API available to projections through which all client actions are executed.
/// ///
/// The connection is used to send commands to the player actor and to receive updates that might be sent to the client. /// The connection is used to send commands to the player actor and to receive updates that might be sent to the client.
pub struct PlayerConnection { pub struct PlayerConnection {
@ -55,6 +56,7 @@ pub struct PlayerConnection {
player_handle: PlayerHandle, player_handle: PlayerHandle,
} }
impl PlayerConnection { impl PlayerConnection {
/// Handled in [Player::send_message].
pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<()> { pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<()> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::SendMessage { room_id, body, promise }; let cmd = ClientCommand::SendMessage { room_id, body, promise };
@ -62,6 +64,7 @@ impl PlayerConnection {
Ok(deferred.await?) Ok(deferred.await?)
} }
/// Handled in [Player::join_room].
pub async fn join_room(&mut self, room_id: RoomId) -> Result<JoinResult> { pub async fn join_room(&mut self, room_id: RoomId) -> Result<JoinResult> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::JoinRoom { room_id, promise }; let cmd = ClientCommand::JoinRoom { room_id, promise };
@ -69,6 +72,7 @@ impl PlayerConnection {
Ok(deferred.await?) Ok(deferred.await?)
} }
/// Handled in [Player::change_topic].
pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::ChangeTopic { let cmd = ClientCommand::ChangeTopic {
@ -80,6 +84,7 @@ impl PlayerConnection {
Ok(deferred.await?) Ok(deferred.await?)
} }
/// Handled in [Player::leave_room].
pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::LeaveRoom { room_id, promise }; let cmd = ClientCommand::LeaveRoom { room_id, promise };
@ -296,7 +301,17 @@ impl Player {
} }
let _ = promise.send(response); let _ = promise.send(response);
} }
ActorCommand::Update(update) => { ActorCommand::Update(update) => self.handle_update(update).await,
ActorCommand::ClientCommand(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await,
ActorCommand::Stop => break,
}
}
log::debug!("Shutting down player actor #{:?}", self.player_id);
self
}
/// Handle an incoming update by changing the internal state and broadcasting it to all connections if necessary.
async fn handle_update(&mut self, update: Updates) {
log::info!( log::info!(
"Player received an update, broadcasting to {} connections", "Player received an update, broadcasting to {} connections",
self.connections.len() self.connections.len()
@ -312,13 +327,6 @@ impl Player {
let _ = connection.send(update.clone()).await; let _ = connection.send(update.clone()).await;
} }
} }
ActorCommand::ClientCommand(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await,
ActorCommand::Stop => break,
}
}
log::debug!("Shutting down player actor #{:?}", self.player_id);
self
}
fn terminate_connection(&mut self, connection_id: ConnectionId) { fn terminate_connection(&mut self, connection_id: ConnectionId) {
if let None = self.connections.pop(connection_id.0) { if let None = self.connections.pop(connection_id.0) {
@ -326,52 +334,74 @@ impl Player {
} }
} }
/// Dispatches a client command to the appropriate handler.
async fn handle_cmd(&mut self, cmd: ClientCommand, connection_id: ConnectionId) { async fn handle_cmd(&mut self, cmd: ClientCommand, connection_id: ConnectionId) {
match cmd { match cmd {
ClientCommand::JoinRoom { room_id, promise } => { ClientCommand::JoinRoom { room_id, promise } => {
let result = self.join_room(connection_id, room_id).await;
let _ = promise.send(result);
}
ClientCommand::LeaveRoom { room_id, promise } => {
self.leave_room(connection_id, room_id).await;
let _ = promise.send(());
}
ClientCommand::SendMessage { room_id, body, promise } => {
self.send_message(connection_id, room_id, body).await;
let _ = promise.send(());
}
ClientCommand::ChangeTopic {
room_id,
new_topic,
promise,
} => {
self.change_topic(connection_id, room_id, new_topic).await;
let _ = promise.send(());
}
}
}
async fn join_room(&mut self, connection_id: ConnectionId, room_id: RoomId) -> JoinResult {
if self.banned_from.contains(&room_id) { if self.banned_from.contains(&room_id) {
let _ = promise.send(JoinResult::Banned); return JoinResult::Banned;
return;
} }
let room = match self.rooms.get_or_create_room(room_id.clone()).await { let room = match self.rooms.get_or_create_room(room_id.clone()).await {
Ok(room) => room, Ok(room) => room,
Err(e) => { Err(e) => {
log::error!("Failed to get or create room: {e}"); log::error!("Failed to get or create room: {e}");
return; todo!();
} }
}; };
room.subscribe(self.player_id.clone(), self.handle.clone()).await; room.subscribe(self.player_id.clone(), self.handle.clone()).await;
self.my_rooms.insert(room_id.clone(), room.clone()); self.my_rooms.insert(room_id.clone(), room.clone());
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
let _ = promise.send(JoinResult::Success(room_info));
let update = Updates::RoomJoined { let update = Updates::RoomJoined {
room_id, room_id,
new_member_id: self.player_id.clone(), new_member_id: self.player_id.clone(),
}; };
self.broadcast_update(update, connection_id).await; self.broadcast_update(update, connection_id).await;
JoinResult::Success(room_info)
} }
ClientCommand::LeaveRoom { room_id, promise } => {
async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) {
let room = self.my_rooms.remove(&room_id); let room = self.my_rooms.remove(&room_id);
if let Some(room) = room { if let Some(room) = room {
room.unsubscribe(&self.player_id).await; room.unsubscribe(&self.player_id).await;
let room_info = room.get_room_info().await;
} }
let _ = promise.send(());
let update = Updates::RoomLeft { let update = Updates::RoomLeft {
room_id, room_id,
former_member_id: self.player_id.clone(), former_member_id: self.player_id.clone(),
}; };
self.broadcast_update(update, connection_id).await; self.broadcast_update(update, connection_id).await;
} }
ClientCommand::SendMessage { room_id, body, promise } => {
async fn send_message(&mut self, connection_id: ConnectionId, room_id: RoomId, body: Str) {
let room = self.rooms.get_room(&room_id).await; let room = self.rooms.get_room(&room_id).await;
if let Some(room) = room { if let Some(room) = room {
room.send_message(self.player_id.clone(), body.clone()).await; room.send_message(self.player_id.clone(), body.clone()).await;
} else { } else {
tracing::info!("no room found"); tracing::info!("no room found");
} }
let _ = promise.send(());
let update = Updates::NewMessage { let update = Updates::NewMessage {
room_id, room_id,
author_id: self.player_id.clone(), author_id: self.player_id.clone(),
@ -379,23 +409,17 @@ impl Player {
}; };
self.broadcast_update(update, connection_id).await; self.broadcast_update(update, connection_id).await;
} }
ClientCommand::ChangeTopic {
room_id, async fn change_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) {
new_topic,
promise,
} => {
let room = self.rooms.get_room(&room_id).await; let room = self.rooms.get_room(&room_id).await;
if let Some(mut room) = room { if let Some(mut room) = room {
room.set_topic(self.player_id.clone(), new_topic.clone()).await; room.set_topic(self.player_id.clone(), new_topic.clone()).await;
} else { } else {
tracing::info!("no room found"); tracing::info!("no room found");
} }
let _ = promise.send(());
let update = Updates::RoomTopicChanged { room_id, new_topic }; let update = Updates::RoomTopicChanged { room_id, new_topic };
self.broadcast_update(update, connection_id).await; self.broadcast_update(update, connection_id).await;
} }
}
}
/// Broadcasts an update to all connections except the one with the given id. /// Broadcasts an update to all connections except the one with the given id.
/// ///