diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index bd869d7..fb0fd1e 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -63,7 +63,7 @@ impl PlayerConnection { pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::SendMessage { room_id, body, promise }; - self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?; deferred.await? } @@ -72,7 +72,7 @@ impl PlayerConnection { pub async fn join_room(&mut self, room_id: RoomId) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::JoinRoom { room_id, promise }; - self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?; deferred.await? } @@ -85,7 +85,7 @@ impl PlayerConnection { new_topic, promise, }; - self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?; deferred.await? } @@ -94,12 +94,12 @@ impl PlayerConnection { pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::LeaveRoom { room_id, promise }; - self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?; deferred.await? } - pub async fn terminate(self) { - self.player_handle.send(ActorCommand::TerminateConnection(self.connection_id)).await; + pub async fn terminate(self) -> Result<()> { + self.player_handle.send(ActorCommand::TerminateConnection(self.connection_id)).await } /// Handled in [Player::get_rooms]. @@ -107,7 +107,7 @@ impl PlayerConnection { pub async fn get_rooms(&self) -> Result> { let (promise, deferred) = oneshot(); let cmd = ClientCommand::GetRooms { promise }; - self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?; deferred.await? } @@ -119,7 +119,7 @@ impl PlayerConnection { promise, limit, }; - self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?; deferred.await? } @@ -132,7 +132,7 @@ impl PlayerConnection { body, promise, }; - self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?; deferred.await? } @@ -141,7 +141,7 @@ impl PlayerConnection { pub async fn check_user_existence(&self, recipient: PlayerId) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::GetInfo { recipient, promise }; - self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; + self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?; deferred.await? } } @@ -152,27 +152,27 @@ pub struct PlayerHandle { tx: Sender<(ActorCommand, Span)>, } impl PlayerHandle { - pub async fn subscribe(&self) -> PlayerConnection { + pub async fn subscribe(&self) -> Result { let (sender, receiver) = channel(32); let (promise, deferred) = oneshot(); let cmd = ActorCommand::AddConnection { sender, promise }; - self.send(cmd).await; - let connection_id = deferred.await.unwrap(); - PlayerConnection { + self.send(cmd).await?; + let connection_id = deferred.await?; + Ok(PlayerConnection { connection_id, player_handle: self.clone(), receiver, - } + }) } - async fn send(&self, command: ActorCommand) { + async fn send(&self, command: ActorCommand) -> Result<()> { let span = tracing::span!(tracing::Level::INFO, "PlayerHandle::send"); - // TODO either handle the error or doc why it is safe to ignore - let _ = self.tx.send((command, span)).await; + self.tx.send((command, span)).await?; + Ok(()) } pub async fn update(&self, update: Updates) { - self.send(ActorCommand::Update(update)).await; + let _ = self.send(ActorCommand::Update(update)).await; } } @@ -311,7 +311,9 @@ impl PlayerRegistry { pub async fn stop_player(&self, id: &PlayerId) -> Result> { let mut inner = self.0.write().await; if let Some((handle, fiber)) = inner.players.remove(id) { - handle.send(ActorCommand::Stop).await; + if let Err(_) = handle.send(ActorCommand::Stop).await { + log::warn!("Failed to send Stop to the player actor #{id:?}. Ignoring, it is probably stopped already"); + } drop(handle); fiber.await?; inner.metric_active_players.dec(); @@ -347,13 +349,14 @@ impl PlayerRegistry { let Some(player_handle) = self.get_or_launch_player(core, id).await? else { return Ok(PlayerConnectionResult::PlayerNotFound); }; - Ok(PlayerConnectionResult::Success(player_handle.subscribe().await)) + let new_conn = player_handle.subscribe().await?; + Ok(PlayerConnectionResult::Success(new_conn)) } pub async fn shutdown_all(&self) { let mut inner = self.0.write().await; for (id, (handle, task)) in inner.players.drain() { - handle.send(ActorCommand::Stop).await; + let _ = handle.send(ActorCommand::Stop).await; drop(handle); match task.await { Ok(_) => log::debug!("Player stopped #{id:?}"), diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index c31696d..0061744 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -551,7 +551,7 @@ async fn handle_registered_socket<'a>( .await?; writer.flush().await?; - connection.terminate().await; + connection.terminate().await?; Ok(()) }