diff --git a/crates/lavina-core/src/clustering.rs b/crates/lavina-core/src/clustering.rs index 6431a27..348e556 100644 --- a/crates/lavina-core/src/clustering.rs +++ b/crates/lavina-core/src/clustering.rs @@ -1,11 +1,11 @@ +use std::collections::HashMap; +use std::net::SocketAddr; + use anyhow::{anyhow, Result}; use reqwest::Client; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::{DefaultSpanBackend, TracingMiddleware}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; pub mod broadcast; pub mod room; @@ -27,19 +27,15 @@ pub struct ClusterMetadata { pub rooms: HashMap, } -#[derive(Clone)] pub struct LavinaClient { - addresses: Arc, + addresses: Addresses, client: ClientWithMiddleware, } impl LavinaClient { pub fn new(addresses: Addresses) -> Self { let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::::new()).build(); - Self { - addresses: Arc::new(addresses), - client, - } + Self { addresses, client } } async fn send_request(&self, node_id: u32, path: &str, req: impl Serialize) -> Result<()> { diff --git a/crates/lavina-core/src/clustering/broadcast.rs b/crates/lavina-core/src/clustering/broadcast.rs index 8e9301b..49573dd 100644 --- a/crates/lavina-core/src/clustering/broadcast.rs +++ b/crates/lavina-core/src/clustering/broadcast.rs @@ -1,5 +1,4 @@ use std::collections::{HashMap, HashSet}; -use std::sync::Arc; use chrono::{DateTime, Utc}; use tokio::sync::Mutex; @@ -13,16 +12,13 @@ struct BroadcastingInner { subscriptions: HashMap>, } -impl Broadcasting {} - -#[derive(Clone)] -pub struct Broadcasting(Arc>); +pub struct Broadcasting(Mutex); impl Broadcasting { pub fn new() -> Self { let inner = BroadcastingInner { subscriptions: HashMap::new(), }; - Self(Arc::new(Mutex::new(inner))) + Self(Mutex::new(inner)) } /// Broadcasts the given update to subscribed player actors on local node. diff --git a/crates/lavina-core/src/dialog.rs b/crates/lavina-core/src/dialog.rs index 480b019..8cf4867 100644 --- a/crates/lavina-core/src/dialog.rs +++ b/crates/lavina-core/src/dialog.rs @@ -4,7 +4,6 @@ //! There are no admins or other roles in dialogs, both participants have equal rights. use std::collections::HashMap; -use std::sync::Arc; use chrono::{DateTime, Utc}; use tokio::sync::RwLock as AsyncRwLock; @@ -47,8 +46,7 @@ struct DialogRegistryInner { dialogs: HashMap>, } -#[derive(Clone)] -pub(crate) struct DialogRegistry(Arc>); +pub(crate) struct DialogRegistry(AsyncRwLock); impl DialogRegistry { pub async fn send_message( @@ -129,20 +127,12 @@ impl DialogRegistry { impl DialogRegistry { pub fn new() -> DialogRegistry { - DialogRegistry(Arc::new(AsyncRwLock::new(DialogRegistryInner { + DialogRegistry(AsyncRwLock::new(DialogRegistryInner { dialogs: HashMap::new(), - }))) + })) } - pub fn shutdown(self) -> Result<()> { - let res = match Arc::try_unwrap(self.0) { - Ok(e) => e, - Err(_) => return Err(fail("failed to acquire dialogs ownership on shutdown")), - }; - let res = res.into_inner(); - drop(res); - Ok(()) - } + pub fn shutdown(self) {} } #[cfg(test)] diff --git a/crates/lavina-core/src/lib.rs b/crates/lavina-core/src/lib.rs index 1f9f923..a5c3a1a 100644 --- a/crates/lavina-core/src/lib.rs +++ b/crates/lavina-core/src/lib.rs @@ -1,4 +1,5 @@ //! Domain definitions and implementation of common chat logic. +use std::ops::Deref; use std::sync::Arc; use anyhow::Result; @@ -24,26 +25,26 @@ mod table; #[derive(Clone)] pub struct LavinaCore { - services: Services, + services: Arc, +} + +impl Deref for LavinaCore { + type Target = Services; + + fn deref(&self) -> &Self::Target { + &self.services + } } impl LavinaCore { pub async fn connect_to_player(&self, player_id: &PlayerId) -> PlayerConnection { - self.services.players.connect_to_player(&self.services, player_id).await - } - - pub fn rooms(&self) -> &RoomRegistry { - &self.services.rooms + self.services.players.connect_to_player(&self, player_id).await } pub async fn get_room(&self, room_id: &RoomId) -> Option { self.services.rooms.get_room(&self.services, room_id).await } - pub async fn get_or_create_room(&self, room_id: RoomId) -> Result { - self.services.rooms.get_or_create_room(&self.services, room_id).await - } - pub async fn create_player(&self, player_id: &PlayerId) -> Result<()> { self.services.storage.create_user(player_id.as_inner()).await } @@ -57,8 +58,7 @@ impl LavinaCore { } } -#[derive(Clone)] -pub(crate) struct Services { +pub struct Services { pub(crate) players: PlayerRegistry, pub(crate) rooms: RoomRegistry, pub(crate) dialogs: DialogRegistry, @@ -82,23 +82,31 @@ impl LavinaCore { let players = PlayerRegistry::empty(metrics)?; let services = Services { - players: players.clone(), - rooms: rooms.clone(), - dialogs: dialogs.clone(), - broadcasting: broadcasting.clone(), + players, + rooms, + dialogs, + broadcasting, client, storage, cluster_metadata: Arc::new(cluster_config.metadata), }; - Ok(LavinaCore { services }) + Ok(LavinaCore { + services: Arc::new(services), + }) } - pub async fn shutdown(mut self) -> Storage { - let _ = self.services.players.shutdown_all().await; - let _ = self.services.players.shutdown(); - let _ = self.services.dialogs.shutdown(); - let _ = self.services.rooms.shutdown(); - self.services.storage + pub async fn shutdown(self) -> Storage { + let _ = self.players.shutdown_all().await; + let services = match Arc::try_unwrap(self.services) { + Ok(e) => e, + Err(_) => { + panic!("failed to acquire services ownership on shutdown"); + } + }; + let _ = services.players.shutdown(); + let _ = services.dialogs.shutdown(); + let _ = services.rooms.shutdown(); + services.storage } } diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index a4adef6..db45f7f 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -8,7 +8,6 @@ //! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor, //! so that they don't overload the room actor. use std::collections::{HashMap, HashSet}; -use std::sync::Arc; use chrono::{DateTime, Utc}; use prometheus::{IntGauge, Registry as MetricsRegistry}; @@ -21,7 +20,7 @@ use crate::clustering::room::*; use crate::prelude::*; use crate::room::{RoomHandle, RoomId, RoomInfo}; use crate::table::{AnonTable, Key as AnonKey}; -use crate::Services; +use crate::LavinaCore; /// Opaque player identifier. Cannot contain spaces, must be shorter than 32. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -263,8 +262,7 @@ pub enum Updates { } /// Handle to a player registry — a shared data structure containing information about players. -#[derive(Clone)] -pub(crate) struct PlayerRegistry(Arc>); +pub(crate) struct PlayerRegistry(RwLock); impl PlayerRegistry { pub fn empty(metrics: &mut MetricsRegistry) -> Result { let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; @@ -273,17 +271,12 @@ impl PlayerRegistry { players: HashMap::new(), metric_active_players, }; - Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) + Ok(PlayerRegistry(RwLock::new(inner))) } - pub fn shutdown(self) -> Result<()> { - let res = match Arc::try_unwrap(self.0) { - Ok(e) => e, - Err(_) => return Err(fail("failed to acquire players ownership on shutdown")), - }; - let res = res.into_inner(); + pub fn shutdown(self) { + let res = self.0.into_inner(); drop(res); - Ok(()) } #[tracing::instrument(skip(self), name = "PlayerRegistry::get_player")] @@ -306,8 +299,8 @@ impl PlayerRegistry { } } - #[tracing::instrument(skip(self, services), name = "PlayerRegistry::get_or_launch_player")] - pub async fn get_or_launch_player(&self, services: &Services, id: &PlayerId) -> PlayerHandle { + #[tracing::instrument(skip(self, core), name = "PlayerRegistry::get_or_launch_player")] + pub async fn get_or_launch_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerHandle { let inner = self.0.read().await; if let Some((handle, _)) = inner.players.get(id) { handle.clone() @@ -317,7 +310,7 @@ impl PlayerRegistry { if let Some((handle, _)) = inner.players.get(id) { handle.clone() } else { - let (handle, fiber) = Player::launch(id.clone(), services.clone()).await; + let (handle, fiber) = Player::launch(id.clone(), core.clone()).await; inner.players.insert(id.clone(), (handle.clone(), fiber)); inner.metric_active_players.inc(); handle @@ -325,13 +318,13 @@ impl PlayerRegistry { } } - #[tracing::instrument(skip(self, services), name = "PlayerRegistry::connect_to_player")] - pub async fn connect_to_player(&self, services: &Services, id: &PlayerId) -> PlayerConnection { - let player_handle = self.get_or_launch_player(services, id).await; + #[tracing::instrument(skip(self, core), name = "PlayerRegistry::connect_to_player")] + pub async fn connect_to_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerConnection { + let player_handle = self.get_or_launch_player(core, id).await; player_handle.subscribe().await } - pub async fn shutdown_all(&mut self) -> Result<()> { + pub async fn shutdown_all(&self) -> Result<()> { let mut inner = self.0.write().await; for (i, (k, j)) in inner.players.drain() { k.send(ActorCommand::Stop).await; @@ -365,14 +358,14 @@ struct Player { banned_from: HashSet, rx: Receiver<(ActorCommand, Span)>, handle: PlayerHandle, - services: Services, + services: LavinaCore, } impl Player { - async fn launch(player_id: PlayerId, services: Services) -> (PlayerHandle, JoinHandle) { + async fn launch(player_id: PlayerId, core: LavinaCore) -> (PlayerHandle, JoinHandle) { let (tx, rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); - let storage_id = services.storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap(); + let storage_id = core.services.storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap(); let player = Player { player_id, storage_id, @@ -384,7 +377,7 @@ impl Player { banned_from: HashSet::new(), rx, handle, - services, + services: core, }; let fiber = tokio::task::spawn(player.main_loop()); (handle_clone, fiber) diff --git a/crates/lavina-core/src/repo/mod.rs b/crates/lavina-core/src/repo/mod.rs index cdd7a70..e2c96c9 100644 --- a/crates/lavina-core/src/repo/mod.rs +++ b/crates/lavina-core/src/repo/mod.rs @@ -1,7 +1,6 @@ //! Storage and persistence logic. use std::str::FromStr; -use std::sync::Arc; use serde::Deserialize; use sqlx::sqlite::SqliteConnectOptions; @@ -20,9 +19,8 @@ pub struct StorageConfig { pub db_path: String, } -#[derive(Clone)] pub struct Storage { - conn: Arc>, + conn: Mutex, } impl Storage { pub async fn open(config: StorageConfig) -> Result { @@ -34,19 +32,12 @@ impl Storage { migrator.run(&mut conn).await?; log::info!("Migrations passed"); - let conn = Arc::new(Mutex::new(conn)); + let conn = Mutex::new(conn); Ok(Storage { conn }) } pub async fn close(self) { - let res = match Arc::try_unwrap(self.conn) { - Ok(e) => e, - Err(_) => { - tracing::error!("failed to acquire DB ownership on shutdown"); - return; - } - }; - let res = res.into_inner(); + let res = self.conn.into_inner(); match res.close().await { Ok(_) => {} Err(e) => { diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index 6a1d831..bd26967 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -35,8 +35,7 @@ impl RoomId { } /// Shared data structure for storing metadata about rooms. -#[derive(Clone)] -pub(crate) struct RoomRegistry(Arc>); +pub(crate) struct RoomRegistry(AsyncRwLock); impl RoomRegistry { pub fn new(metrics: &mut MetricRegistry) -> Result { @@ -46,18 +45,11 @@ impl RoomRegistry { rooms: HashMap::new(), metric_active_rooms, }; - Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner)))) + Ok(RoomRegistry(AsyncRwLock::new(inner))) } - pub fn shutdown(self) -> Result<()> { - let res = match Arc::try_unwrap(self.0) { - Ok(e) => e, - Err(_) => return Err(fail("failed to acquire rooms ownership on shutdown")), - }; - let res = res.into_inner(); - // TODO drop all rooms - drop(res); - Ok(()) + pub fn shutdown(self) { + // TODO iterate over rooms and stop them } #[tracing::instrument(skip(self, services), name = "RoomRegistry::get_or_create_room")] diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs index ab0ffdf..40a8c38 100644 --- a/crates/projection-irc/tests/lib.rs +++ b/crates/projection-irc/tests/lib.rs @@ -142,17 +142,13 @@ impl TestServer { rooms: Default::default(), }, }; - let TestServer { - metrics: _, - core, - server, - } = self; + let TestServer { core, server } = self; server.terminate().await?; let storage = core.shutdown().await; let mut metrics = MetricsRegistry::new(); let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?; let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); - Ok(TestServer { metrics, core, server }) + Ok(TestServer { core, server }) } async fn shutdown(self) { diff --git a/crates/projection-xmpp/tests/lib.rs b/crates/projection-xmpp/tests/lib.rs index 7f15c2e..5b4d212 100644 --- a/crates/projection-xmpp/tests/lib.rs +++ b/crates/projection-xmpp/tests/lib.rs @@ -169,7 +169,7 @@ impl TestServer { }; let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?; let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); - Ok(TestServer { metrics, core, server }) + Ok(TestServer { core, server }) } async fn shutdown(self) -> Result<()> {