diff --git a/crates/lavina-core/src/lib.rs b/crates/lavina-core/src/lib.rs index 401e49e..ff52363 100644 --- a/crates/lavina-core/src/lib.rs +++ b/crates/lavina-core/src/lib.rs @@ -1,4 +1,11 @@ //! Domain definitions and implementation of common chat logic. +use anyhow::Result; +use prometheus::Registry as MetricsRegistry; + +use crate::player::PlayerRegistry; +use crate::repo::Storage; +use crate::room::RoomRegistry; + pub mod player; pub mod prelude; pub mod repo; @@ -6,3 +13,25 @@ pub mod room; pub mod terminator; mod table; + +#[derive(Clone)] +pub struct LavinaCore { + pub players: PlayerRegistry, + pub rooms: RoomRegistry, +} + +impl LavinaCore { + pub async fn new(mut metrics: MetricsRegistry, storage: Storage) -> Result { + // TODO shutdown all services in reverse order on error + let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; + let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?; + Ok(LavinaCore { players, rooms }) + } + + pub async fn shutdown(mut self) -> Result<()> { + self.players.shutdown_all().await?; + drop(self.players); + drop(self.rooms); + Ok(()) + } +} diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 7f1b49e..1513546 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -18,6 +18,7 @@ use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; use lavina_core::terminator::Terminator; +use lavina_core::LavinaCore; use proto_irc::client::CapabilitySubcommand; use proto_irc::client::{client_message, ClientMessage}; use proto_irc::server::CapSubBody; @@ -54,8 +55,7 @@ async fn handle_socket( config: ServerConfig, mut stream: TcpStream, socket_addr: &SocketAddr, - players: PlayerRegistry, - rooms: RoomRegistry, + mut core: LavinaCore, termination: Deferred<()>, // TODO use it to stop the connection gracefully mut storage: Storage, ) -> Result<()> { @@ -75,7 +75,7 @@ async fn handle_socket( match registered_user { Ok(user) => { log::debug!("User registered"); - handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?; + handle_registered_socket(config, core.players, core.rooms, &mut reader, &mut writer, user).await?; } Err(err) => { log::debug!("Registration failed: {err}"); @@ -942,8 +942,7 @@ impl RunningServer { pub async fn launch( config: ServerConfig, - players: PlayerRegistry, - rooms: RoomRegistry, + core: LavinaCore, metrics: MetricsRegistry, storage: Storage, ) -> Result { @@ -984,13 +983,12 @@ pub async fn launch( } let terminator = Terminator::spawn(|termination| { - let players = players.clone(); - let rooms = rooms.clone(); + let core = core.clone(); let current_connections_clone = current_connections.clone(); let stopped_tx = stopped_tx.clone(); let storage = storage.clone(); async move { - match handle_socket(config, stream, &socket_addr, players, rooms, termination, storage).await { + match handle_socket(config, stream, &socket_addr, core, termination, storage).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs index 145033b..f2f4505 100644 --- a/crates/projection-irc/tests/lib.rs +++ b/crates/projection-irc/tests/lib.rs @@ -9,7 +9,7 @@ use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::TcpStream; use lavina_core::repo::{Storage, StorageConfig}; -use lavina_core::{player::PlayerRegistry, room::RoomRegistry}; +use lavina_core::LavinaCore; use projection_irc::APP_VERSION; use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig}; struct TestScope<'a> { @@ -94,8 +94,7 @@ impl<'a> TestScope<'a> { struct TestServer { metrics: MetricsRegistry, storage: Storage, - rooms: RoomRegistry, - players: PlayerRegistry, + core: LavinaCore, server: RunningServer, } impl TestServer { @@ -110,43 +109,36 @@ impl TestServer { db_path: ":memory:".into(), }) .await?; - let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap(); - let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap(); - let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap(); + let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap(); Ok(TestServer { metrics, storage, - rooms, - players, + core, server, }) } - async fn reboot(mut self) -> Result { + async fn reboot(self) -> Result { let config = ServerConfig { listen_on: "127.0.0.1:0".parse().unwrap(), server_name: "testserver".into(), }; let TestServer { - mut metrics, - mut storage, - rooms, - mut players, + metrics: _, + storage, + mut core, server, } = self; server.terminate().await?; - players.shutdown_all().await.unwrap(); - drop(players); - drop(rooms); - let mut metrics = MetricsRegistry::new(); - let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap(); - let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap(); - let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap(); + core.shutdown().await?; + let metrics = MetricsRegistry::new(); + let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap(); Ok(TestServer { metrics, storage, - rooms, - players, + core, server, }) } diff --git a/crates/projection-xmpp/src/lib.rs b/crates/projection-xmpp/src/lib.rs index 30e0a3c..6539254 100644 --- a/crates/projection-xmpp/src/lib.rs +++ b/crates/projection-xmpp/src/lib.rs @@ -26,6 +26,7 @@ use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::RoomRegistry; use lavina_core::terminator::Terminator; +use lavina_core::LavinaCore; use proto_xmpp::bind::{Name, Resource}; use proto_xmpp::stream::*; use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; @@ -79,8 +80,7 @@ impl RunningServer { pub async fn launch( config: ServerConfig, - players: PlayerRegistry, - rooms: RoomRegistry, + core: LavinaCore, metrics: MetricsRegistry, storage: Storage, ) -> Result { @@ -122,15 +122,14 @@ pub async fn launch( // TODO kill the older connection and restart it continue; } - let players = players.clone(); - let rooms = rooms.clone(); + let core = core.clone(); let storage = storage.clone(); let hostname = config.hostname.clone(); let terminator = Terminator::spawn(|termination| { let stopped_tx = stopped_tx.clone(); let loaded_config = loaded_config.clone(); async move { - match handle_socket(loaded_config, stream, &socket_addr, players, rooms, storage, hostname, termination).await { + match handle_socket(loaded_config, stream, &socket_addr, core, storage, hostname, termination).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } @@ -168,8 +167,7 @@ async fn handle_socket( cert_config: Arc, mut stream: TcpStream, socket_addr: &SocketAddr, - mut players: PlayerRegistry, - rooms: RoomRegistry, + mut core: LavinaCore, mut storage: Storage, hostname: Str, termination: Deferred<()>, // TODO use it to stop the connection gracefully @@ -207,14 +205,14 @@ async fn handle_socket( authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => { match authenticated { Ok(authenticated) => { - let mut connection = players.connect_to_player(&authenticated.player_id).await; + let mut connection = core.players.connect_to_player(&authenticated.player_id).await; socket_final( &mut xml_reader, &mut xml_writer, &mut reader_buf, &authenticated, &mut connection, - &rooms, + &core.rooms, &hostname, ) .await?; diff --git a/crates/projection-xmpp/tests/lib.rs b/crates/projection-xmpp/tests/lib.rs index 29d0368..be687a4 100644 --- a/crates/projection-xmpp/tests/lib.rs +++ b/crates/projection-xmpp/tests/lib.rs @@ -16,9 +16,8 @@ use tokio_rustls::rustls::client::ServerCertVerifier; use tokio_rustls::rustls::{ClientConfig, ServerName}; use tokio_rustls::TlsConnector; -use lavina_core::player::PlayerRegistry; use lavina_core::repo::{Storage, StorageConfig}; -use lavina_core::room::RoomRegistry; +use lavina_core::LavinaCore; use projection_xmpp::{launch, RunningServer, ServerConfig}; use proto_xmpp::xml::{Continuation, FromXml, Parser}; @@ -124,8 +123,7 @@ impl ServerCertVerifier for IgnoreCertVerification { struct TestServer { metrics: MetricsRegistry, storage: Storage, - rooms: RoomRegistry, - players: PlayerRegistry, + core: LavinaCore, server: RunningServer, } impl TestServer { @@ -137,19 +135,17 @@ impl TestServer { key: "tests/certs/xmpp.key".parse().unwrap(), hostname: "localhost".into(), }; - let mut metrics = MetricsRegistry::new(); - let mut storage = Storage::open(StorageConfig { + let metrics = MetricsRegistry::new(); + let storage = Storage::open(StorageConfig { db_path: ":memory:".into(), }) .await?; - let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap(); - let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap(); - let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap(); + let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap(); Ok(TestServer { metrics, storage, - rooms, - players, + core, server, }) } diff --git a/src/http.rs b/src/http.rs index 89ba4ce..302bf5f 100644 --- a/src/http.rs +++ b/src/http.rs @@ -16,6 +16,7 @@ use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::RoomRegistry; use lavina_core::terminator::Terminator; +use lavina_core::LavinaCore; use mgmt_api::*; @@ -29,20 +30,20 @@ pub struct ServerConfig { pub async fn launch( config: ServerConfig, metrics: MetricsRegistry, - rooms: RoomRegistry, + core: LavinaCore, storage: Storage, ) -> Result { log::info!("Starting the http service"); let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); - let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, storage, rx.map(|_| ()))); + let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, storage, rx.map(|_| ()))); Ok(terminator) } async fn main_loop( listener: TcpListener, metrics: MetricsRegistry, - rooms: RoomRegistry, + core: LavinaCore, storage: Storage, termination: impl Future, ) -> Result<()> { @@ -55,13 +56,13 @@ async fn main_loop( let (stream, _) = result?; let stream = TokioIo::new(stream); let metrics = metrics.clone(); - let rooms = rooms.clone(); + let core = core.clone(); let storage = storage.clone(); tokio::task::spawn(async move { let registry = metrics.clone(); - let rooms = rooms.clone(); + let core = core.clone(); let storage = storage.clone(); - let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), storage.clone(), r))); + let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), core.clone(), storage.clone(), r))); if let Err(err) = server.await { tracing::error!("Error serving connection: {:?}", err); } @@ -75,13 +76,13 @@ async fn main_loop( async fn route( registry: MetricsRegistry, - rooms: RoomRegistry, + core: LavinaCore, storage: Storage, request: Request, ) -> HttpResult>> { let res = match (request.method(), request.uri().path()) { (&Method::GET, "/metrics") => endpoint_metrics(registry), - (&Method::GET, "/rooms") => endpoint_rooms(rooms).await, + (&Method::GET, "/rooms") => endpoint_rooms(core.rooms).await, (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(), (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(), _ => not_found(), diff --git a/src/main.rs b/src/main.rs index 0d03a89..98c45f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,10 +9,9 @@ use figment::{providers::Toml, Figment}; use prometheus::Registry as MetricsRegistry; use serde::Deserialize; -use lavina_core::player::PlayerRegistry; use lavina_core::prelude::*; use lavina_core::repo::Storage; -use lavina_core::room::RoomRegistry; +use lavina_core::LavinaCore; #[derive(Deserialize, Debug)] struct ServerConfig { @@ -49,27 +48,12 @@ async fn main() -> Result<()> { xmpp: xmpp_config, storage: storage_config, } = config; - let mut metrics = MetricsRegistry::new(); + let metrics = MetricsRegistry::new(); let storage = Storage::open(storage_config).await?; - let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; - let mut players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?; - let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; - let irc = projection_irc::launch( - irc_config, - players.clone(), - rooms.clone(), - metrics.clone(), - storage.clone(), - ) - .await?; - let xmpp = projection_xmpp::launch( - xmpp_config, - players.clone(), - rooms.clone(), - metrics.clone(), - storage.clone(), - ) - .await?; + let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone(), storage.clone()).await?; + let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone(), storage.clone()).await?; + let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone(), storage.clone()).await?; tracing::info!("Started"); sleep.await; @@ -78,10 +62,7 @@ async fn main() -> Result<()> { xmpp.terminate().await?; irc.terminate().await?; telemetry_terminator.terminate().await?; - players.shutdown_all().await?; - drop(players); - drop(rooms); - storage.close().await?; + core.shutdown().await?; tracing::info!("Shutdown complete"); Ok(()) }