forked from lavina/lavina
1
0
Fork 0

de-arcify RoomRegistry

This commit is contained in:
Nikita Vilunov 2023-09-06 18:23:33 +02:00
parent 5818c0f516
commit 99980720be
3 changed files with 29 additions and 34 deletions

View File

@ -7,23 +7,17 @@
//! //!
//! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor, //! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor,
//! so that they don't overload the room actor. //! so that they don't overload the room actor.
use std::{ use std::collections::{HashMap, HashSet};
collections::{HashMap, HashSet}, use std::sync::RwLock;
sync::{Arc, RwLock},
};
use futures_util::FutureExt;
use prometheus::{IntGauge, Registry as MetricsRegistry}; use prometheus::{IntGauge, Registry as MetricsRegistry};
use serde::Serialize; use serde::Serialize;
use tokio::{ use tokio::sync::mpsc::{channel, Receiver, Sender};
sync::mpsc::{channel, Receiver, Sender},
task::JoinHandle,
};
use crate::{ use crate::util::table::{AnonTable, Key as AnonKey};
core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}, use crate::prelude::*;
prelude::*, use crate::core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry};
util::table::{AnonTable, Key as AnonKey},
};
/// Opaque player identifier. Cannot contain spaces, must be shorter than 32. /// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
@ -236,21 +230,23 @@ impl<'a> PlayerRegistry<'a> {
let metric_active_players = let metric_active_players =
IntGauge::new("chat_players_active", "Number of alive player actors")?; IntGauge::new("chat_players_active", "Number of alive player actors")?;
metrics.register(Box::new(metric_active_players.clone()))?; metrics.register(Box::new(metric_active_players.clone()))?;
let scope = unsafe { Scope::create() };
let inner = PlayerRegistryInner { let inner = PlayerRegistryInner {
room_registry, room_registry,
players: HashMap::new(), players: HashMap::new(),
metric_active_players, metric_active_players,
scope,
}; };
Ok(PlayerRegistry(RwLock::new(inner))) Ok(PlayerRegistry(RwLock::new(inner)))
} }
pub async fn get_or_create_player(&self, id: PlayerId) -> PlayerHandle { pub async fn get_or_create_player(&self, id: PlayerId) -> PlayerHandle {
let mut inner = self.0.write().unwrap(); let mut inner = self.0.write().unwrap();
if let Some((handle, _)) = inner.players.get(&id) { if let Some(handle) = inner.players.get(&id) {
handle.clone() handle.clone()
} else { } else {
let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone()); let handle = Player::launch(id.clone(), inner.room_registry, &mut inner.scope);
inner.players.insert(id, (handle.clone(), fiber)); inner.players.insert(id, handle.clone());
inner.metric_active_players.inc(); inner.metric_active_players.inc();
handle handle
} }
@ -261,15 +257,15 @@ impl<'a> PlayerRegistry<'a> {
player_handle.subscribe().await player_handle.subscribe().await
} }
pub async fn shutdown_all(&mut self) -> Result<()> { pub async fn shutdown_all(self) -> Result<()> {
let mut inner = self.0.write().unwrap(); let mut inner = self.0.write().unwrap();
let mut players = HashMap::new(); let mut players = HashMap::new();
std::mem::swap(&mut players, &mut inner.players); std::mem::swap(&mut players, &mut inner.players);
for (i, (k, j)) in inner.players.drain() { for (i, k) in inner.players.drain() {
drop(k); drop(k);
j.await?; log::debug!("Stopping player #{i:?}")
log::debug!("Player stopped #{i:?}")
} }
let _ = inner.scope.collect().await;
log::debug!("All players stopped"); log::debug!("All players stopped");
Ok(()) Ok(())
} }
@ -278,22 +274,23 @@ impl<'a> PlayerRegistry<'a> {
/// The player registry state representation. /// The player registry state representation.
struct PlayerRegistryInner<'a> { struct PlayerRegistryInner<'a> {
room_registry: &'a RoomRegistry, room_registry: &'a RoomRegistry,
players: HashMap<PlayerId, (PlayerHandle, JoinHandle<Player>)>, players: HashMap<PlayerId, PlayerHandle>,
metric_active_players: IntGauge, metric_active_players: IntGauge,
scope: Scope<'a>,
} }
/// Player actor inner state representation. /// Player actor inner state representation.
struct Player { struct Player<'a> {
player_id: PlayerId, player_id: PlayerId,
connections: AnonTable<Sender<Updates>>, connections: AnonTable<Sender<Updates>>,
my_rooms: HashMap<RoomId, RoomHandle>, my_rooms: HashMap<RoomId, RoomHandle>,
banned_from: HashSet<RoomId>, banned_from: HashSet<RoomId>,
rx: Receiver<PlayerCommand>, rx: Receiver<PlayerCommand>,
handle: PlayerHandle, handle: PlayerHandle,
rooms: RoomRegistry, rooms: &'a RoomRegistry,
} }
impl Player { impl<'a> Player<'a> {
fn launch(player_id: PlayerId, rooms: RoomRegistry) -> (PlayerHandle, JoinHandle<Player>) { fn launch(player_id: PlayerId, rooms: &'a RoomRegistry, scope: &mut Scope<'a>) -> PlayerHandle {
let (tx, rx) = channel(32); let (tx, rx) = channel(32);
let handle = PlayerHandle { tx }; let handle = PlayerHandle { tx };
let handle_clone = handle.clone(); let handle_clone = handle.clone();
@ -306,11 +303,11 @@ impl Player {
handle, handle,
rooms, rooms,
}; };
let fiber = tokio::task::spawn(player.main_loop()); scope.spawn(player.main_loop().map(|_| ()));
(handle_clone, fiber) handle_clone
} }
async fn main_loop(mut self) -> Self { async fn main_loop(mut self) -> Player<'a> {
while let Some(cmd) = self.rx.recv().await { while let Some(cmd) = self.rx.recv().await {
match cmd { match cmd {
PlayerCommand::AddConnection { sender, promise } => { PlayerCommand::AddConnection { sender, promise } => {

View File

@ -38,8 +38,7 @@ impl RoomId {
} }
/// Shared datastructure for storing metadata about rooms. /// Shared datastructure for storing metadata about rooms.
#[derive(Clone)] pub struct RoomRegistry(AsyncRwLock<RoomRegistryInner>);
pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
impl RoomRegistry { impl RoomRegistry {
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> { pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> {
let metric_active_rooms = let metric_active_rooms =
@ -50,10 +49,10 @@ impl RoomRegistry {
metric_active_rooms, metric_active_rooms,
storage, storage,
}; };
Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner)))) Ok(RoomRegistry(AsyncRwLock::new(inner)))
} }
pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> { pub async fn get_or_create_room(&self, room_id: RoomId) -> Result<RoomHandle> {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
if let Some(room_handle) = inner.rooms.get(&room_id) { if let Some(room_handle) = inner.rooms.get(&room_id) {
// room was already loaded into memory // room was already loaded into memory

View File

@ -54,7 +54,7 @@ async fn main() -> Result<()> {
let mut metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
let mut players = PlayerRegistry::empty(&rooms, &metrics)?; let players = PlayerRegistry::empty(&rooms, &metrics)?;
// unsafe: outer future is never dropped, scope is joined on `scope.collect` // unsafe: outer future is never dropped, scope is joined on `scope.collect`
let mut scope = unsafe { Scope::create() }; let mut scope = unsafe { Scope::create() };
@ -73,7 +73,6 @@ async fn main() -> Result<()> {
drop(scope); drop(scope);
players.shutdown_all().await?; players.shutdown_all().await?;
drop(players);
drop(rooms); drop(rooms);
storage.close().await?; storage.close().await?;
tracing::info!("Shutdown complete"); tracing::info!("Shutdown complete");