core: do not ignore errors on sending to channels

This commit is contained in:
Nikita Vilunov 2024-06-05 00:43:39 +02:00
parent 2c828b482e
commit f3cd794431
2 changed files with 26 additions and 23 deletions

View File

@ -63,7 +63,7 @@ impl PlayerConnection {
pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<SendMessageResult> { pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<SendMessageResult> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::SendMessage { room_id, body, promise }; let cmd = ClientCommand::SendMessage { room_id, body, promise };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?;
deferred.await? deferred.await?
} }
@ -72,7 +72,7 @@ impl PlayerConnection {
pub async fn join_room(&mut self, room_id: RoomId) -> Result<JoinResult> { pub async fn join_room(&mut self, room_id: RoomId) -> Result<JoinResult> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::JoinRoom { room_id, promise }; let cmd = ClientCommand::JoinRoom { room_id, promise };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?;
deferred.await? deferred.await?
} }
@ -85,7 +85,7 @@ impl PlayerConnection {
new_topic, new_topic,
promise, promise,
}; };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?;
deferred.await? deferred.await?
} }
@ -94,12 +94,12 @@ impl PlayerConnection {
pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> { pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::LeaveRoom { room_id, promise }; let cmd = ClientCommand::LeaveRoom { room_id, promise };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?;
deferred.await? deferred.await?
} }
pub async fn terminate(self) { pub async fn terminate(self) -> Result<()> {
self.player_handle.send(ActorCommand::TerminateConnection(self.connection_id)).await; self.player_handle.send(ActorCommand::TerminateConnection(self.connection_id)).await
} }
/// Handled in [Player::get_rooms]. /// Handled in [Player::get_rooms].
@ -107,7 +107,7 @@ impl PlayerConnection {
pub async fn get_rooms(&self) -> Result<Vec<RoomInfo>> { pub async fn get_rooms(&self) -> Result<Vec<RoomInfo>> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::GetRooms { promise }; let cmd = ClientCommand::GetRooms { promise };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?;
deferred.await? deferred.await?
} }
@ -119,7 +119,7 @@ impl PlayerConnection {
promise, promise,
limit, limit,
}; };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?;
deferred.await? deferred.await?
} }
@ -132,7 +132,7 @@ impl PlayerConnection {
body, body,
promise, promise,
}; };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?;
deferred.await? deferred.await?
} }
@ -141,7 +141,7 @@ impl PlayerConnection {
pub async fn check_user_existence(&self, recipient: PlayerId) -> Result<GetInfoResult> { pub async fn check_user_existence(&self, recipient: PlayerId) -> Result<GetInfoResult> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ClientCommand::GetInfo { recipient, promise }; let cmd = ClientCommand::GetInfo { recipient, promise };
self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await?;
deferred.await? deferred.await?
} }
} }
@ -152,27 +152,27 @@ pub struct PlayerHandle {
tx: Sender<(ActorCommand, Span)>, tx: Sender<(ActorCommand, Span)>,
} }
impl PlayerHandle { impl PlayerHandle {
pub async fn subscribe(&self) -> PlayerConnection { pub async fn subscribe(&self) -> Result<PlayerConnection> {
let (sender, receiver) = channel(32); let (sender, receiver) = channel(32);
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ActorCommand::AddConnection { sender, promise }; let cmd = ActorCommand::AddConnection { sender, promise };
self.send(cmd).await; self.send(cmd).await?;
let connection_id = deferred.await.unwrap(); let connection_id = deferred.await?;
PlayerConnection { Ok(PlayerConnection {
connection_id, connection_id,
player_handle: self.clone(), player_handle: self.clone(),
receiver, receiver,
} })
} }
async fn send(&self, command: ActorCommand) { async fn send(&self, command: ActorCommand) -> Result<()> {
let span = tracing::span!(tracing::Level::INFO, "PlayerHandle::send"); let span = tracing::span!(tracing::Level::INFO, "PlayerHandle::send");
// TODO either handle the error or doc why it is safe to ignore self.tx.send((command, span)).await?;
let _ = self.tx.send((command, span)).await; Ok(())
} }
pub async fn update(&self, update: Updates) { pub async fn update(&self, update: Updates) {
self.send(ActorCommand::Update(update)).await; let _ = self.send(ActorCommand::Update(update)).await;
} }
} }
@ -311,7 +311,9 @@ impl PlayerRegistry {
pub async fn stop_player(&self, id: &PlayerId) -> Result<Option<()>> { pub async fn stop_player(&self, id: &PlayerId) -> Result<Option<()>> {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
if let Some((handle, fiber)) = inner.players.remove(id) { if let Some((handle, fiber)) = inner.players.remove(id) {
handle.send(ActorCommand::Stop).await; if let Err(_) = handle.send(ActorCommand::Stop).await {
log::warn!("Failed to send Stop to the player actor #{id:?}. Ignoring, it is probably stopped already");
}
drop(handle); drop(handle);
fiber.await?; fiber.await?;
inner.metric_active_players.dec(); inner.metric_active_players.dec();
@ -347,13 +349,14 @@ impl PlayerRegistry {
let Some(player_handle) = self.get_or_launch_player(core, id).await? else { let Some(player_handle) = self.get_or_launch_player(core, id).await? else {
return Ok(PlayerConnectionResult::PlayerNotFound); return Ok(PlayerConnectionResult::PlayerNotFound);
}; };
Ok(PlayerConnectionResult::Success(player_handle.subscribe().await)) let new_conn = player_handle.subscribe().await?;
Ok(PlayerConnectionResult::Success(new_conn))
} }
pub async fn shutdown_all(&self) { pub async fn shutdown_all(&self) {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
for (id, (handle, task)) in inner.players.drain() { for (id, (handle, task)) in inner.players.drain() {
handle.send(ActorCommand::Stop).await; let _ = handle.send(ActorCommand::Stop).await;
drop(handle); drop(handle);
match task.await { match task.await {
Ok(_) => log::debug!("Player stopped #{id:?}"), Ok(_) => log::debug!("Player stopped #{id:?}"),

View File

@ -551,7 +551,7 @@ async fn handle_registered_socket<'a>(
.await?; .await?;
writer.flush().await?; writer.flush().await?;
connection.terminate().await; connection.terminate().await?;
Ok(()) Ok(())
} }