From 99980720be638b6db585782534d348b43cd8ba3c Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Wed, 6 Sep 2023 18:23:33 +0200 Subject: [PATCH] de-arcify RoomRegistry --- src/core/player.rs | 53 ++++++++++++++++++++++------------------------ src/core/room.rs | 7 +++--- src/main.rs | 3 +-- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/src/core/player.rs b/src/core/player.rs index b3027f2..8386a9b 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -7,23 +7,17 @@ //! //! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor, //! so that they don't overload the room actor. -use std::{ - collections::{HashMap, HashSet}, - sync::{Arc, RwLock}, -}; +use std::collections::{HashMap, HashSet}; +use std::sync::RwLock; +use futures_util::FutureExt; use prometheus::{IntGauge, Registry as MetricsRegistry}; use serde::Serialize; -use tokio::{ - sync::mpsc::{channel, Receiver, Sender}, - task::JoinHandle, -}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; -use crate::{ - core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}, - prelude::*, - util::table::{AnonTable, Key as AnonKey}, -}; +use crate::util::table::{AnonTable, Key as AnonKey}; +use crate::prelude::*; +use crate::core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; /// Opaque player identifier. Cannot contain spaces, must be shorter than 32. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -236,21 +230,23 @@ impl<'a> PlayerRegistry<'a> { let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; metrics.register(Box::new(metric_active_players.clone()))?; + let scope = unsafe { Scope::create() }; let inner = PlayerRegistryInner { room_registry, players: HashMap::new(), metric_active_players, + scope, }; Ok(PlayerRegistry(RwLock::new(inner))) } pub async fn get_or_create_player(&self, id: PlayerId) -> PlayerHandle { let mut inner = self.0.write().unwrap(); - if let Some((handle, _)) = inner.players.get(&id) { + if let Some(handle) = inner.players.get(&id) { handle.clone() } else { - let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone()); - inner.players.insert(id, (handle.clone(), fiber)); + let handle = Player::launch(id.clone(), inner.room_registry, &mut inner.scope); + inner.players.insert(id, handle.clone()); inner.metric_active_players.inc(); handle } @@ -261,15 +257,15 @@ impl<'a> PlayerRegistry<'a> { player_handle.subscribe().await } - pub async fn shutdown_all(&mut self) -> Result<()> { + pub async fn shutdown_all(self) -> Result<()> { let mut inner = self.0.write().unwrap(); let mut players = HashMap::new(); std::mem::swap(&mut players, &mut inner.players); - for (i, (k, j)) in inner.players.drain() { + for (i, k) in inner.players.drain() { drop(k); - j.await?; - log::debug!("Player stopped #{i:?}") + log::debug!("Stopping player #{i:?}") } + let _ = inner.scope.collect().await; log::debug!("All players stopped"); Ok(()) } @@ -278,22 +274,23 @@ impl<'a> PlayerRegistry<'a> { /// The player registry state representation. struct PlayerRegistryInner<'a> { room_registry: &'a RoomRegistry, - players: HashMap)>, + players: HashMap, metric_active_players: IntGauge, + scope: Scope<'a>, } /// Player actor inner state representation. -struct Player { +struct Player<'a> { player_id: PlayerId, connections: AnonTable>, my_rooms: HashMap, banned_from: HashSet, rx: Receiver, handle: PlayerHandle, - rooms: RoomRegistry, + rooms: &'a RoomRegistry, } -impl Player { - fn launch(player_id: PlayerId, rooms: RoomRegistry) -> (PlayerHandle, JoinHandle) { +impl<'a> Player<'a> { + fn launch(player_id: PlayerId, rooms: &'a RoomRegistry, scope: &mut Scope<'a>) -> PlayerHandle { let (tx, rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); @@ -306,11 +303,11 @@ impl Player { handle, rooms, }; - let fiber = tokio::task::spawn(player.main_loop()); - (handle_clone, fiber) + scope.spawn(player.main_loop().map(|_| ())); + handle_clone } - async fn main_loop(mut self) -> Self { + async fn main_loop(mut self) -> Player<'a> { while let Some(cmd) = self.rx.recv().await { match cmd { PlayerCommand::AddConnection { sender, promise } => { diff --git a/src/core/room.rs b/src/core/room.rs index 47826bb..f4d7adb 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -38,8 +38,7 @@ impl RoomId { } /// Shared datastructure for storing metadata about rooms. -#[derive(Clone)] -pub struct RoomRegistry(Arc>); +pub struct RoomRegistry(AsyncRwLock); impl RoomRegistry { pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result { let metric_active_rooms = @@ -50,10 +49,10 @@ impl RoomRegistry { metric_active_rooms, storage, }; - Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner)))) + Ok(RoomRegistry(AsyncRwLock::new(inner))) } - pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result { + pub async fn get_or_create_room(&self, room_id: RoomId) -> Result { let mut inner = self.0.write().await; if let Some(room_handle) = inner.rooms.get(&room_id) { // room was already loaded into memory diff --git a/src/main.rs b/src/main.rs index 211e62f..e352ab4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,7 +54,7 @@ async fn main() -> Result<()> { let mut metrics = MetricsRegistry::new(); let storage = Storage::open(storage_config).await?; let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; - let mut players = PlayerRegistry::empty(&rooms, &metrics)?; + let players = PlayerRegistry::empty(&rooms, &metrics)?; // unsafe: outer future is never dropped, scope is joined on `scope.collect` let mut scope = unsafe { Scope::create() }; @@ -73,7 +73,6 @@ async fn main() -> Result<()> { drop(scope); players.shutdown_all().await?; - drop(players); drop(rooms); storage.close().await?; tracing::info!("Shutdown complete");