forked from lavina/lavina
1
0
Fork 0

instrument player send msg

This commit is contained in:
Nikita Vilunov 2024-04-25 18:31:38 +02:00
parent af52c9371c
commit e76ebf060a
3 changed files with 38 additions and 16 deletions

View File

@ -15,6 +15,7 @@ use prometheus::{IntGauge, Registry as MetricsRegistry};
use serde::Serialize; use serde::Serialize;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::{Instrument, Span};
use crate::dialog::DialogRegistry; use crate::dialog::DialogRegistry;
use crate::prelude::*; use crate::prelude::*;
@ -128,14 +129,14 @@ impl PlayerConnection {
/// Handle to a player actor. /// Handle to a player actor.
#[derive(Clone)] #[derive(Clone)]
pub struct PlayerHandle { pub struct PlayerHandle {
tx: Sender<ActorCommand>, tx: Sender<(ActorCommand, Span)>,
} }
impl PlayerHandle { impl PlayerHandle {
pub async fn subscribe(&self) -> PlayerConnection { pub async fn subscribe(&self) -> PlayerConnection {
let (sender, receiver) = channel(32); let (sender, receiver) = channel(32);
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
let cmd = ActorCommand::AddConnection { sender, promise }; let cmd = ActorCommand::AddConnection { sender, promise };
let _ = self.tx.send(cmd).await; self.send(cmd).await;
let connection_id = deferred.await.unwrap(); let connection_id = deferred.await.unwrap();
PlayerConnection { PlayerConnection {
connection_id, connection_id,
@ -145,8 +146,9 @@ impl PlayerHandle {
} }
async fn send(&self, command: ActorCommand) { async fn send(&self, command: ActorCommand) {
let span = tracing::span!(tracing::Level::DEBUG, "PlayerHandle::send");
// TODO either handle the error or doc why it is safe to ignore // TODO either handle the error or doc why it is safe to ignore
let _ = self.tx.send(command).await; let _ = self.tx.send((command, span)).await;
} }
pub async fn update(&self, update: Updates) { pub async fn update(&self, update: Updates) {
@ -338,7 +340,7 @@ struct Player {
connections: AnonTable<Sender<Updates>>, connections: AnonTable<Sender<Updates>>,
my_rooms: HashMap<RoomId, RoomHandle>, my_rooms: HashMap<RoomId, RoomHandle>,
banned_from: HashSet<RoomId>, banned_from: HashSet<RoomId>,
rx: Receiver<ActorCommand>, rx: Receiver<(ActorCommand, Span)>,
handle: PlayerHandle, handle: PlayerHandle,
rooms: RoomRegistry, rooms: RoomRegistry,
dialogs: DialogRegistry, dialogs: DialogRegistry,
@ -385,6 +387,8 @@ impl Player {
} }
} }
while let Some(cmd) = self.rx.recv().await { while let Some(cmd) = self.rx.recv().await {
let (cmd, span) = cmd;
let should_stop = async {
match cmd { match cmd {
ActorCommand::AddConnection { sender, promise } => { ActorCommand::AddConnection { sender, promise } => {
let connection_id = self.connections.insert(sender); let connection_id = self.connections.insert(sender);
@ -392,13 +396,27 @@ impl Player {
log::warn!("Connection {connection_id:?} terminated before finalization"); log::warn!("Connection {connection_id:?} terminated before finalization");
self.terminate_connection(connection_id); self.terminate_connection(connection_id);
} }
false
} }
ActorCommand::TerminateConnection(connection_id) => { ActorCommand::TerminateConnection(connection_id) => {
self.terminate_connection(connection_id); self.terminate_connection(connection_id);
false
} }
ActorCommand::Update(update) => self.handle_update(update).await, ActorCommand::Update(update) => {
ActorCommand::ClientCommand(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, self.handle_update(update).await;
ActorCommand::Stop => break, false
}
ActorCommand::ClientCommand(cmd, connection_id) => {
self.handle_cmd(cmd, connection_id).await;
false
}
ActorCommand::Stop => true,
}
}
.instrument(span)
.await;
if should_stop {
break;
} }
} }
log::debug!("Shutting down player actor #{:?}", self.player_id); log::debug!("Shutting down player actor #{:?}", self.player_id);

View File

@ -3,6 +3,7 @@ use anyhow::Result;
use crate::repo::Storage; use crate::repo::Storage;
impl Storage { impl Storage {
#[tracing::instrument(skip(self), name = "Storage::add_room_member")]
pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
sqlx::query( sqlx::query(
@ -17,6 +18,7 @@ impl Storage {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self), name = "Storage::remove_room_member")]
pub async fn remove_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { pub async fn remove_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
sqlx::query( sqlx::query(
@ -31,6 +33,7 @@ impl Storage {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, topic), name = "Storage::set_room_topic")]
pub async fn set_room_topic(&mut self, id: u32, topic: &str) -> Result<()> { pub async fn set_room_topic(&mut self, id: u32, topic: &str) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
sqlx::query( sqlx::query(

View File

@ -59,6 +59,7 @@ impl RoomRegistry {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")]
pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> { pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? { if let Some(room_handle) = inner.get_or_load_room(&room_id).await? {