forked from lavina/lavina

remove arcs

Nikita Vilunov 2024-05-11 01:56:39 +02:00
parent e123fe85ca
commit a8a4b1c490
9 changed files with 68 additions and 106 deletions
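The whole commit follows one pattern: the per-component handles (PlayerRegistry, RoomRegistry, DialogRegistry, Broadcasting, Storage, LavinaClient) stop deriving Clone and stop wrapping their state in an Arc, and the single shared Arc moves into LavinaCore as Arc<Services>, with a Deref impl so call sites keep plain field access. A minimal compilable sketch of the resulting shape, with a stub inner type and std's RwLock standing in for the tokio lock used in the crate:

use std::ops::Deref;
use std::sync::{Arc, RwLock};

struct PlayerRegistryInner; // stub for the real per-player state

// After the commit: a registry owns its state directly, no Arc, no Clone.
pub struct PlayerRegistry(RwLock<PlayerRegistryInner>);

// All registries live together behind one shared allocation.
pub struct Services {
    pub players: PlayerRegistry,
}

#[derive(Clone)]
pub struct LavinaCore {
    services: Arc<Services>,
}

impl Deref for LavinaCore {
    type Target = Services;

    fn deref(&self) -> &Self::Target {
        &self.services
    }
}

fn main() {
    let core = LavinaCore {
        services: Arc::new(Services {
            players: PlayerRegistry(RwLock::new(PlayerRegistryInner)),
        }),
    };
    let clone = core.clone(); // clones one Arc, not every registry
    let _players: &PlayerRegistry = &clone.players; // field access via Deref
}

Cloning LavinaCore now bumps a single reference count instead of one per registry, and shutdown has exactly one owner to unwrap.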

View File

@@ -1,11 +1,11 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+
 use anyhow::{anyhow, Result};
 use reqwest::Client;
 use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
 use reqwest_tracing::{DefaultSpanBackend, TracingMiddleware};
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::net::SocketAddr;
-use std::sync::Arc;
 
 pub mod broadcast;
 pub mod room;
@@ -27,19 +27,15 @@ pub struct ClusterMetadata {
     pub rooms: HashMap<String, u32>,
 }
 
-#[derive(Clone)]
 pub struct LavinaClient {
-    addresses: Arc<Addresses>,
+    addresses: Addresses,
     client: ClientWithMiddleware,
 }
 
 impl LavinaClient {
     pub fn new(addresses: Addresses) -> Self {
         let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::<DefaultSpanBackend>::new()).build();
-        Self {
-            addresses: Arc::new(addresses),
-            client,
-        }
+        Self { addresses, client }
     }
 
     async fn send_request(&self, node_id: u32, path: &str, req: impl Serialize) -> Result<()> {

View File

@@ -1,5 +1,4 @@
 use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
 
 use chrono::{DateTime, Utc};
 use tokio::sync::Mutex;
@@ -13,16 +12,13 @@ struct BroadcastingInner {
     subscriptions: HashMap<RoomId, HashSet<PlayerId>>,
 }
 
-impl Broadcasting {}
-
-#[derive(Clone)]
-pub struct Broadcasting(Arc<Mutex<BroadcastingInner>>);
+pub struct Broadcasting(Mutex<BroadcastingInner>);
 
 impl Broadcasting {
     pub fn new() -> Self {
         let inner = BroadcastingInner {
             subscriptions: HashMap::new(),
         };
-        Self(Arc::new(Mutex::new(inner)))
+        Self(Mutex::new(inner))
     }
 
     /// Broadcasts the given update to subscribed player actors on local node.

View File

@@ -4,7 +4,6 @@
 //! There are no admins or other roles in dialogs, both participants have equal rights.
 
 use std::collections::HashMap;
-use std::sync::Arc;
 
 use chrono::{DateTime, Utc};
 use tokio::sync::RwLock as AsyncRwLock;
@@ -47,8 +46,7 @@ struct DialogRegistryInner {
     dialogs: HashMap<DialogId, AsyncRwLock<Dialog>>,
 }
 
-#[derive(Clone)]
-pub(crate) struct DialogRegistry(Arc<AsyncRwLock<DialogRegistryInner>>);
+pub(crate) struct DialogRegistry(AsyncRwLock<DialogRegistryInner>);
 
 impl DialogRegistry {
     pub async fn send_message(
@@ -129,20 +127,12 @@ impl DialogRegistry {
 impl DialogRegistry {
     pub fn new() -> DialogRegistry {
-        DialogRegistry(Arc::new(AsyncRwLock::new(DialogRegistryInner {
+        DialogRegistry(AsyncRwLock::new(DialogRegistryInner {
             dialogs: HashMap::new(),
-        })))
+        }))
     }
 
-    pub fn shutdown(self) -> Result<()> {
-        let res = match Arc::try_unwrap(self.0) {
-            Ok(e) => e,
-            Err(_) => return Err(fail("failed to acquire dialogs ownership on shutdown")),
-        };
-        let res = res.into_inner();
-        drop(res);
-        Ok(())
-    }
+    pub fn shutdown(self) {}
 }
 
 #[cfg(test)]

View File

@@ -1,4 +1,5 @@
 //! Domain definitions and implementation of common chat logic.
+use std::ops::Deref;
 use std::sync::Arc;
 
 use anyhow::Result;
@@ -24,26 +25,26 @@ mod table;
 #[derive(Clone)]
 pub struct LavinaCore {
-    services: Services,
+    services: Arc<Services>,
+}
+
+impl Deref for LavinaCore {
+    type Target = Services;
+
+    fn deref(&self) -> &Self::Target {
+        &self.services
+    }
 }
 
 impl LavinaCore {
     pub async fn connect_to_player(&self, player_id: &PlayerId) -> PlayerConnection {
-        self.services.players.connect_to_player(&self.services, player_id).await
-    }
-
-    pub fn rooms(&self) -> &RoomRegistry {
-        &self.services.rooms
+        self.services.players.connect_to_player(&self, player_id).await
     }
 
     pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
         self.services.rooms.get_room(&self.services, room_id).await
     }
 
-    pub async fn get_or_create_room(&self, room_id: RoomId) -> Result<RoomHandle> {
-        self.services.rooms.get_or_create_room(&self.services, room_id).await
-    }
-
     pub async fn create_player(&self, player_id: &PlayerId) -> Result<()> {
         self.services.storage.create_user(player_id.as_inner()).await
     }
@@ -57,8 +58,7 @@ impl LavinaCore {
     }
 }
 
-#[derive(Clone)]
-pub(crate) struct Services {
+pub struct Services {
     pub(crate) players: PlayerRegistry,
     pub(crate) rooms: RoomRegistry,
     pub(crate) dialogs: DialogRegistry,
@@ -82,23 +82,31 @@ impl LavinaCore {
         let players = PlayerRegistry::empty(metrics)?;
 
         let services = Services {
-            players: players.clone(),
-            rooms: rooms.clone(),
-            dialogs: dialogs.clone(),
-            broadcasting: broadcasting.clone(),
+            players,
+            rooms,
+            dialogs,
+            broadcasting,
             client,
             storage,
             cluster_metadata: Arc::new(cluster_config.metadata),
         };
 
-        Ok(LavinaCore { services })
+        Ok(LavinaCore {
+            services: Arc::new(services),
+        })
     }
 
-    pub async fn shutdown(mut self) -> Storage {
-        let _ = self.services.players.shutdown_all().await;
-        let _ = self.services.players.shutdown();
-        let _ = self.services.dialogs.shutdown();
-        let _ = self.services.rooms.shutdown();
-        self.services.storage
+    pub async fn shutdown(self) -> Storage {
+        let _ = self.players.shutdown_all().await;
+        let services = match Arc::try_unwrap(self.services) {
+            Ok(e) => e,
+            Err(_) => {
+                panic!("failed to acquire services ownership on shutdown");
+            }
+        };
+        let _ = services.players.shutdown();
+        let _ = services.dialogs.shutdown();
+        let _ = services.rooms.shutdown();
+        services.storage
    }
 }
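The new shutdown path above is where the single Arc pays off: ownership of Services is reclaimed once with Arc::try_unwrap instead of each registry unwrapping its own Arc. A stripped-down sketch of that pattern, with Storage and Services stubbed and the player-actor shutdown and async parts left out:

use std::sync::Arc;

struct Storage; // stub for the real Storage handle

struct Services {
    storage: Storage,
}

fn shutdown(services: Arc<Services>) -> Storage {
    // try_unwrap succeeds only when this is the last handle to Services,
    // which is what lets the registries be plain owned values now.
    match Arc::try_unwrap(services) {
        Ok(services) => services.storage,
        Err(_) => panic!("failed to acquire services ownership on shutdown"),
    }
}

fn main() {
    let services = Arc::new(Services { storage: Storage });
    let _storage = shutdown(services);
}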

View File

@@ -8,7 +8,6 @@
 //! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor,
 //! so that they don't overload the room actor.
 use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
 
 use chrono::{DateTime, Utc};
 use prometheus::{IntGauge, Registry as MetricsRegistry};
@@ -21,7 +20,7 @@ use crate::clustering::room::*;
 use crate::prelude::*;
 use crate::room::{RoomHandle, RoomId, RoomInfo};
 use crate::table::{AnonTable, Key as AnonKey};
-use crate::Services;
+use crate::LavinaCore;
 
 /// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
@@ -263,8 +262,7 @@ pub enum Updates {
 }
 
 /// Handle to a player registry — a shared data structure containing information about players.
-#[derive(Clone)]
-pub(crate) struct PlayerRegistry(Arc<RwLock<PlayerRegistryInner>>);
+pub(crate) struct PlayerRegistry(RwLock<PlayerRegistryInner>);
 
 impl PlayerRegistry {
     pub fn empty(metrics: &mut MetricsRegistry) -> Result<PlayerRegistry> {
         let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?;
@@ -273,17 +271,12 @@ impl PlayerRegistry {
             players: HashMap::new(),
             metric_active_players,
         };
-        Ok(PlayerRegistry(Arc::new(RwLock::new(inner))))
+        Ok(PlayerRegistry(RwLock::new(inner)))
     }
 
-    pub fn shutdown(self) -> Result<()> {
-        let res = match Arc::try_unwrap(self.0) {
-            Ok(e) => e,
-            Err(_) => return Err(fail("failed to acquire players ownership on shutdown")),
-        };
-        let res = res.into_inner();
+    pub fn shutdown(self) {
+        let res = self.0.into_inner();
         drop(res);
-        Ok(())
     }
 
     #[tracing::instrument(skip(self), name = "PlayerRegistry::get_player")]
@@ -306,8 +299,8 @@
         }
     }
 
-    #[tracing::instrument(skip(self, services), name = "PlayerRegistry::get_or_launch_player")]
-    pub async fn get_or_launch_player(&self, services: &Services, id: &PlayerId) -> PlayerHandle {
+    #[tracing::instrument(skip(self, core), name = "PlayerRegistry::get_or_launch_player")]
+    pub async fn get_or_launch_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerHandle {
         let inner = self.0.read().await;
         if let Some((handle, _)) = inner.players.get(id) {
             handle.clone()
@@ -317,7 +310,7 @@
             if let Some((handle, _)) = inner.players.get(id) {
                 handle.clone()
             } else {
-                let (handle, fiber) = Player::launch(id.clone(), services.clone()).await;
+                let (handle, fiber) = Player::launch(id.clone(), core.clone()).await;
                 inner.players.insert(id.clone(), (handle.clone(), fiber));
                 inner.metric_active_players.inc();
                 handle
@@ -325,13 +318,13 @@
         }
     }
 
-    #[tracing::instrument(skip(self, services), name = "PlayerRegistry::connect_to_player")]
-    pub async fn connect_to_player(&self, services: &Services, id: &PlayerId) -> PlayerConnection {
-        let player_handle = self.get_or_launch_player(services, id).await;
+    #[tracing::instrument(skip(self, core), name = "PlayerRegistry::connect_to_player")]
+    pub async fn connect_to_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerConnection {
+        let player_handle = self.get_or_launch_player(core, id).await;
         player_handle.subscribe().await
     }
 
-    pub async fn shutdown_all(&mut self) -> Result<()> {
+    pub async fn shutdown_all(&self) -> Result<()> {
         let mut inner = self.0.write().await;
         for (i, (k, j)) in inner.players.drain() {
             k.send(ActorCommand::Stop).await;
@@ -365,14 +358,14 @@ struct Player {
     banned_from: HashSet<RoomId>,
     rx: Receiver<(ActorCommand, Span)>,
     handle: PlayerHandle,
-    services: Services,
+    services: LavinaCore,
 }
 
 impl Player {
-    async fn launch(player_id: PlayerId, services: Services) -> (PlayerHandle, JoinHandle<Player>) {
+    async fn launch(player_id: PlayerId, core: LavinaCore) -> (PlayerHandle, JoinHandle<Player>) {
         let (tx, rx) = channel(32);
         let handle = PlayerHandle { tx };
         let handle_clone = handle.clone();
-        let storage_id = services.storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap();
+        let storage_id = core.services.storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap();
         let player = Player {
             player_id,
             storage_id,
@@ -384,7 +377,7 @@ impl Player {
             banned_from: HashSet::new(),
             rx,
             handle,
-            services,
+            services: core,
         };
         let fiber = tokio::task::spawn(player.main_loop());
         (handle_clone, fiber)
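Each player actor now holds a clone of LavinaCore rather than a bundle of per-registry handles, so a live actor keeps the shared Arc alive; that is why LavinaCore::shutdown has to run shutdown_all before Arc::try_unwrap can succeed. A rough illustration, with a std thread standing in for the tokio task and the services field stubbed:

use std::sync::Arc;
use std::thread;

#[derive(Clone)]
struct LavinaCore {
    services: Arc<()>, // stub for Arc<Services>
}

fn main() {
    let core = LavinaCore { services: Arc::new(()) };

    // An "actor" owning its own handle keeps the reference count above one.
    let actor = {
        let core = core.clone();
        thread::spawn(move || drop(core))
    };

    // Only after the actor exits (shutdown_all in the real code) can the
    // remaining handle be unwrapped.
    actor.join().unwrap();
    let _services = Arc::try_unwrap(core.services).expect("last handle");
}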

View File

@@ -1,7 +1,6 @@
 //! Storage and persistence logic.
 use std::str::FromStr;
-use std::sync::Arc;
 
 use serde::Deserialize;
 use sqlx::sqlite::SqliteConnectOptions;
@@ -20,9 +19,8 @@ pub struct StorageConfig {
     pub db_path: String,
 }
 
-#[derive(Clone)]
 pub struct Storage {
-    conn: Arc<Mutex<SqliteConnection>>,
+    conn: Mutex<SqliteConnection>,
 }
 
 impl Storage {
     pub async fn open(config: StorageConfig) -> Result<Storage> {
@@ -34,19 +32,12 @@ impl Storage {
         migrator.run(&mut conn).await?;
         log::info!("Migrations passed");
-        let conn = Arc::new(Mutex::new(conn));
+        let conn = Mutex::new(conn);
         Ok(Storage { conn })
     }
 
     pub async fn close(self) {
-        let res = match Arc::try_unwrap(self.conn) {
-            Ok(e) => e,
-            Err(_) => {
-                tracing::error!("failed to acquire DB ownership on shutdown");
-                return;
-            }
-        };
-        let res = res.into_inner();
+        let res = self.conn.into_inner();
         match res.close().await {
             Ok(_) => {}
             Err(e) => {

View File

@@ -35,8 +35,7 @@ impl RoomId {
 }
 
 /// Shared data structure for storing metadata about rooms.
-#[derive(Clone)]
-pub(crate) struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
+pub(crate) struct RoomRegistry(AsyncRwLock<RoomRegistryInner>);
 
 impl RoomRegistry {
     pub fn new(metrics: &mut MetricRegistry) -> Result<RoomRegistry> {
@@ -46,18 +45,11 @@ impl RoomRegistry {
             rooms: HashMap::new(),
             metric_active_rooms,
         };
-        Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner))))
+        Ok(RoomRegistry(AsyncRwLock::new(inner)))
     }
 
-    pub fn shutdown(self) -> Result<()> {
-        let res = match Arc::try_unwrap(self.0) {
-            Ok(e) => e,
-            Err(_) => return Err(fail("failed to acquire rooms ownership on shutdown")),
-        };
-        let res = res.into_inner();
-        // TODO drop all rooms
-        drop(res);
-        Ok(())
+    pub fn shutdown(self) {
+        // TODO iterate over rooms and stop them
     }
 
     #[tracing::instrument(skip(self, services), name = "RoomRegistry::get_or_create_room")]

View File

@@ -142,17 +142,13 @@ impl TestServer {
                 rooms: Default::default(),
             },
         };
-        let TestServer {
-            metrics: _,
-            core,
-            server,
-        } = self;
+        let TestServer { core, server } = self;
         server.terminate().await?;
         let storage = core.shutdown().await;
         let mut metrics = MetricsRegistry::new();
         let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
         let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
-        Ok(TestServer { metrics, core, server })
+        Ok(TestServer { core, server })
     }
 
     async fn shutdown(self) {

View File

@@ -169,7 +169,7 @@ impl TestServer {
         };
         let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
         let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
-        Ok(TestServer { metrics, core, server })
+        Ok(TestServer { core, server })
     }
 
     async fn shutdown(self) -> Result<()> {