From e76ebf060a2e4d5f38906d93fd957384ab358bac Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Thu, 25 Apr 2024 18:31:38 +0200 Subject: [PATCH] instrument player send msg --- crates/lavina-core/src/player.rs | 50 ++++++++++++++++++++--------- crates/lavina-core/src/repo/room.rs | 3 ++ crates/lavina-core/src/room.rs | 1 + 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 9f85ace..660e187 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -15,6 +15,7 @@ use prometheus::{IntGauge, Registry as MetricsRegistry}; use serde::Serialize; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; +use tracing::{Instrument, Span}; use crate::dialog::DialogRegistry; use crate::prelude::*; @@ -128,14 +129,14 @@ impl PlayerConnection { /// Handle to a player actor. #[derive(Clone)] pub struct PlayerHandle { - tx: Sender, + tx: Sender<(ActorCommand, Span)>, } impl PlayerHandle { pub async fn subscribe(&self) -> PlayerConnection { let (sender, receiver) = channel(32); let (promise, deferred) = oneshot(); let cmd = ActorCommand::AddConnection { sender, promise }; - let _ = self.tx.send(cmd).await; + self.send(cmd).await; let connection_id = deferred.await.unwrap(); PlayerConnection { connection_id, @@ -145,8 +146,9 @@ impl PlayerHandle { } async fn send(&self, command: ActorCommand) { + let span = tracing::span!(tracing::Level::DEBUG, "PlayerHandle::send"); // TODO either handle the error or doc why it is safe to ignore - let _ = self.tx.send(command).await; + let _ = self.tx.send((command, span)).await; } pub async fn update(&self, update: Updates) { @@ -338,7 +340,7 @@ struct Player { connections: AnonTable>, my_rooms: HashMap, banned_from: HashSet, - rx: Receiver, + rx: Receiver<(ActorCommand, Span)>, handle: PlayerHandle, rooms: RoomRegistry, dialogs: DialogRegistry, @@ -385,20 +387,36 @@ impl Player { } } while let Some(cmd) = self.rx.recv().await { - match cmd { - ActorCommand::AddConnection { sender, promise } => { - let connection_id = self.connections.insert(sender); - if let Err(connection_id) = promise.send(ConnectionId(connection_id)) { - log::warn!("Connection {connection_id:?} terminated before finalization"); - self.terminate_connection(connection_id); + let (cmd, span) = cmd; + let should_stop = async { + match cmd { + ActorCommand::AddConnection { sender, promise } => { + let connection_id = self.connections.insert(sender); + if let Err(connection_id) = promise.send(ConnectionId(connection_id)) { + log::warn!("Connection {connection_id:?} terminated before finalization"); + self.terminate_connection(connection_id); + } + false } + ActorCommand::TerminateConnection(connection_id) => { + self.terminate_connection(connection_id); + false + } + ActorCommand::Update(update) => { + self.handle_update(update).await; + false + } + ActorCommand::ClientCommand(cmd, connection_id) => { + self.handle_cmd(cmd, connection_id).await; + false + } + ActorCommand::Stop => true, } - ActorCommand::TerminateConnection(connection_id) => { - self.terminate_connection(connection_id); - } - ActorCommand::Update(update) => self.handle_update(update).await, - ActorCommand::ClientCommand(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, - ActorCommand::Stop => break, + } + .instrument(span) + .await; + if should_stop { + break; } } log::debug!("Shutting down player actor #{:?}", self.player_id); diff --git a/crates/lavina-core/src/repo/room.rs b/crates/lavina-core/src/repo/room.rs index 96b89f2..38de47d 100644 --- a/crates/lavina-core/src/repo/room.rs +++ b/crates/lavina-core/src/repo/room.rs @@ -3,6 +3,7 @@ use anyhow::Result; use crate::repo::Storage; impl Storage { + #[tracing::instrument(skip(self), name = "Storage::add_room_member")] pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { let mut executor = self.conn.lock().await; sqlx::query( @@ -17,6 +18,7 @@ impl Storage { Ok(()) } + #[tracing::instrument(skip(self), name = "Storage::remove_room_member")] pub async fn remove_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { let mut executor = self.conn.lock().await; sqlx::query( @@ -31,6 +33,7 @@ impl Storage { Ok(()) } + #[tracing::instrument(skip(self, topic), name = "Storage::set_room_topic")] pub async fn set_room_topic(&mut self, id: u32, topic: &str) -> Result<()> { let mut executor = self.conn.lock().await; sqlx::query( diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index b8ce505..49de286 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -59,6 +59,7 @@ impl RoomRegistry { Ok(()) } + #[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")] pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result { let mut inner = self.0.write().await; if let Some(room_handle) = inner.get_or_load_room(&room_id).await? {