refactor lavina core by grouping public services into a new LavinaCore struct.

this will be useful in future when additional services will be introduced and passed as dependencies
This commit is contained in:
Nikita Vilunov 2024-04-21 19:45:50 +02:00
parent 5a09b743c9
commit ddb348bee9
7 changed files with 79 additions and 84 deletions

View File

@ -1,4 +1,11 @@
//! Domain definitions and implementation of common chat logic. //! Domain definitions and implementation of common chat logic.
use anyhow::Result;
use prometheus::Registry as MetricsRegistry;
use crate::player::PlayerRegistry;
use crate::repo::Storage;
use crate::room::RoomRegistry;
pub mod player; pub mod player;
pub mod prelude; pub mod prelude;
pub mod repo; pub mod repo;
@ -6,3 +13,25 @@ pub mod room;
pub mod terminator; pub mod terminator;
mod table; mod table;
#[derive(Clone)]
pub struct LavinaCore {
pub players: PlayerRegistry,
pub rooms: RoomRegistry,
}
impl LavinaCore {
pub async fn new(mut metrics: MetricsRegistry, storage: Storage) -> Result<LavinaCore> {
// TODO shutdown all services in reverse order on error
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?;
Ok(LavinaCore { players, rooms })
}
pub async fn shutdown(mut self) -> Result<()> {
self.players.shutdown_all().await?;
drop(self.players);
drop(self.rooms);
Ok(())
}
}

View File

@ -18,6 +18,7 @@ use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; use lavina_core::room::{RoomId, RoomInfo, RoomRegistry};
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use proto_irc::client::CapabilitySubcommand; use proto_irc::client::CapabilitySubcommand;
use proto_irc::client::{client_message, ClientMessage}; use proto_irc::client::{client_message, ClientMessage};
use proto_irc::server::CapSubBody; use proto_irc::server::CapSubBody;
@ -54,8 +55,7 @@ async fn handle_socket(
config: ServerConfig, config: ServerConfig,
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
players: PlayerRegistry, mut core: LavinaCore,
rooms: RoomRegistry,
termination: Deferred<()>, // TODO use it to stop the connection gracefully termination: Deferred<()>, // TODO use it to stop the connection gracefully
mut storage: Storage, mut storage: Storage,
) -> Result<()> { ) -> Result<()> {
@ -75,7 +75,7 @@ async fn handle_socket(
match registered_user { match registered_user {
Ok(user) => { Ok(user) => {
log::debug!("User registered"); log::debug!("User registered");
handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?; handle_registered_socket(config, core.players, core.rooms, &mut reader, &mut writer, user).await?;
} }
Err(err) => { Err(err) => {
log::debug!("Registration failed: {err}"); log::debug!("Registration failed: {err}");
@ -942,8 +942,7 @@ impl RunningServer {
pub async fn launch( pub async fn launch(
config: ServerConfig, config: ServerConfig,
players: PlayerRegistry, core: LavinaCore,
rooms: RoomRegistry,
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
) -> Result<RunningServer> { ) -> Result<RunningServer> {
@ -984,13 +983,12 @@ pub async fn launch(
} }
let terminator = Terminator::spawn(|termination| { let terminator = Terminator::spawn(|termination| {
let players = players.clone(); let core = core.clone();
let rooms = rooms.clone();
let current_connections_clone = current_connections.clone(); let current_connections_clone = current_connections.clone();
let stopped_tx = stopped_tx.clone(); let stopped_tx = stopped_tx.clone();
let storage = storage.clone(); let storage = storage.clone();
async move { async move {
match handle_socket(config, stream, &socket_addr, players, rooms, termination, storage).await { match handle_socket(config, stream, &socket_addr, core, termination, storage).await {
Ok(_) => log::info!("Connection terminated"), Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"), Err(err) => log::warn!("Connection failed: {err}"),
} }

View File

@ -9,7 +9,7 @@ use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::{player::PlayerRegistry, room::RoomRegistry}; use lavina_core::LavinaCore;
use projection_irc::APP_VERSION; use projection_irc::APP_VERSION;
use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig}; use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig};
struct TestScope<'a> { struct TestScope<'a> {
@ -94,8 +94,7 @@ impl<'a> TestScope<'a> {
struct TestServer { struct TestServer {
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
rooms: RoomRegistry, core: LavinaCore,
players: PlayerRegistry,
server: RunningServer, server: RunningServer,
} }
impl TestServer { impl TestServer {
@ -110,43 +109,36 @@ impl TestServer {
db_path: ":memory:".into(), db_path: ":memory:".into(),
}) })
.await?; .await?;
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap(); let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap(); let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer {
metrics, metrics,
storage, storage,
rooms, core,
players,
server, server,
}) })
} }
async fn reboot(mut self) -> Result<TestServer> { async fn reboot(self) -> Result<TestServer> {
let config = ServerConfig { let config = ServerConfig {
listen_on: "127.0.0.1:0".parse().unwrap(), listen_on: "127.0.0.1:0".parse().unwrap(),
server_name: "testserver".into(), server_name: "testserver".into(),
}; };
let TestServer { let TestServer {
mut metrics, metrics: _,
mut storage, storage,
rooms, mut core,
mut players,
server, server,
} = self; } = self;
server.terminate().await?; server.terminate().await?;
players.shutdown_all().await.unwrap(); core.shutdown().await?;
drop(players); let metrics = MetricsRegistry::new();
drop(rooms); let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
let mut metrics = MetricsRegistry::new(); let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer {
metrics, metrics,
storage, storage,
rooms, core,
players,
server, server,
}) })
} }

View File

@ -26,6 +26,7 @@ use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::room::RoomRegistry; use lavina_core::room::RoomRegistry;
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use proto_xmpp::bind::{Name, Resource}; use proto_xmpp::bind::{Name, Resource};
use proto_xmpp::stream::*; use proto_xmpp::stream::*;
use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml};
@ -79,8 +80,7 @@ impl RunningServer {
pub async fn launch( pub async fn launch(
config: ServerConfig, config: ServerConfig,
players: PlayerRegistry, core: LavinaCore,
rooms: RoomRegistry,
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
) -> Result<RunningServer> { ) -> Result<RunningServer> {
@ -122,15 +122,14 @@ pub async fn launch(
// TODO kill the older connection and restart it // TODO kill the older connection and restart it
continue; continue;
} }
let players = players.clone(); let core = core.clone();
let rooms = rooms.clone();
let storage = storage.clone(); let storage = storage.clone();
let hostname = config.hostname.clone(); let hostname = config.hostname.clone();
let terminator = Terminator::spawn(|termination| { let terminator = Terminator::spawn(|termination| {
let stopped_tx = stopped_tx.clone(); let stopped_tx = stopped_tx.clone();
let loaded_config = loaded_config.clone(); let loaded_config = loaded_config.clone();
async move { async move {
match handle_socket(loaded_config, stream, &socket_addr, players, rooms, storage, hostname, termination).await { match handle_socket(loaded_config, stream, &socket_addr, core, storage, hostname, termination).await {
Ok(_) => log::info!("Connection terminated"), Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"), Err(err) => log::warn!("Connection failed: {err}"),
} }
@ -168,8 +167,7 @@ async fn handle_socket(
cert_config: Arc<LoadedConfig>, cert_config: Arc<LoadedConfig>,
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
mut players: PlayerRegistry, mut core: LavinaCore,
rooms: RoomRegistry,
mut storage: Storage, mut storage: Storage,
hostname: Str, hostname: Str,
termination: Deferred<()>, // TODO use it to stop the connection gracefully termination: Deferred<()>, // TODO use it to stop the connection gracefully
@ -207,14 +205,14 @@ async fn handle_socket(
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => { authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => {
match authenticated { match authenticated {
Ok(authenticated) => { Ok(authenticated) => {
let mut connection = players.connect_to_player(&authenticated.player_id).await; let mut connection = core.players.connect_to_player(&authenticated.player_id).await;
socket_final( socket_final(
&mut xml_reader, &mut xml_reader,
&mut xml_writer, &mut xml_writer,
&mut reader_buf, &mut reader_buf,
&authenticated, &authenticated,
&mut connection, &mut connection,
&rooms, &core.rooms,
&hostname, &hostname,
) )
.await?; .await?;

View File

@ -16,9 +16,8 @@ use tokio_rustls::rustls::client::ServerCertVerifier;
use tokio_rustls::rustls::{ClientConfig, ServerName}; use tokio_rustls::rustls::{ClientConfig, ServerName};
use tokio_rustls::TlsConnector; use tokio_rustls::TlsConnector;
use lavina_core::player::PlayerRegistry;
use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::room::RoomRegistry; use lavina_core::LavinaCore;
use projection_xmpp::{launch, RunningServer, ServerConfig}; use projection_xmpp::{launch, RunningServer, ServerConfig};
use proto_xmpp::xml::{Continuation, FromXml, Parser}; use proto_xmpp::xml::{Continuation, FromXml, Parser};
@ -124,8 +123,7 @@ impl ServerCertVerifier for IgnoreCertVerification {
struct TestServer { struct TestServer {
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
rooms: RoomRegistry, core: LavinaCore,
players: PlayerRegistry,
server: RunningServer, server: RunningServer,
} }
impl TestServer { impl TestServer {
@ -137,19 +135,17 @@ impl TestServer {
key: "tests/certs/xmpp.key".parse().unwrap(), key: "tests/certs/xmpp.key".parse().unwrap(),
hostname: "localhost".into(), hostname: "localhost".into(),
}; };
let mut metrics = MetricsRegistry::new(); let metrics = MetricsRegistry::new();
let mut storage = Storage::open(StorageConfig { let storage = Storage::open(StorageConfig {
db_path: ":memory:".into(), db_path: ":memory:".into(),
}) })
.await?; .await?;
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap(); let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap(); let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer {
metrics, metrics,
storage, storage,
rooms, core,
players,
server, server,
}) })
} }

View File

@ -16,6 +16,7 @@ use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::room::RoomRegistry; use lavina_core::room::RoomRegistry;
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use mgmt_api::*; use mgmt_api::*;
@ -29,20 +30,20 @@ pub struct ServerConfig {
pub async fn launch( pub async fn launch(
config: ServerConfig, config: ServerConfig,
metrics: MetricsRegistry, metrics: MetricsRegistry,
rooms: RoomRegistry, core: LavinaCore,
storage: Storage, storage: Storage,
) -> Result<Terminator> { ) -> Result<Terminator> {
log::info!("Starting the http service"); log::info!("Starting the http service");
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started"); log::debug!("Listener started");
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, storage, rx.map(|_| ()))); let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, storage, rx.map(|_| ())));
Ok(terminator) Ok(terminator)
} }
async fn main_loop( async fn main_loop(
listener: TcpListener, listener: TcpListener,
metrics: MetricsRegistry, metrics: MetricsRegistry,
rooms: RoomRegistry, core: LavinaCore,
storage: Storage, storage: Storage,
termination: impl Future<Output = ()>, termination: impl Future<Output = ()>,
) -> Result<()> { ) -> Result<()> {
@ -55,13 +56,13 @@ async fn main_loop(
let (stream, _) = result?; let (stream, _) = result?;
let stream = TokioIo::new(stream); let stream = TokioIo::new(stream);
let metrics = metrics.clone(); let metrics = metrics.clone();
let rooms = rooms.clone(); let core = core.clone();
let storage = storage.clone(); let storage = storage.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
let registry = metrics.clone(); let registry = metrics.clone();
let rooms = rooms.clone(); let core = core.clone();
let storage = storage.clone(); let storage = storage.clone();
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), storage.clone(), r))); let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), core.clone(), storage.clone(), r)));
if let Err(err) = server.await { if let Err(err) = server.await {
tracing::error!("Error serving connection: {:?}", err); tracing::error!("Error serving connection: {:?}", err);
} }
@ -75,13 +76,13 @@ async fn main_loop(
async fn route( async fn route(
registry: MetricsRegistry, registry: MetricsRegistry,
rooms: RoomRegistry, core: LavinaCore,
storage: Storage, storage: Storage,
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
) -> HttpResult<Response<Full<Bytes>>> { ) -> HttpResult<Response<Full<Bytes>>> {
let res = match (request.method(), request.uri().path()) { let res = match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => endpoint_metrics(registry), (&Method::GET, "/metrics") => endpoint_metrics(registry),
(&Method::GET, "/rooms") => endpoint_rooms(rooms).await, (&Method::GET, "/rooms") => endpoint_rooms(core.rooms).await,
(&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(), (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(),
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(), (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(),
_ => not_found(), _ => not_found(),

View File

@ -9,10 +9,9 @@ use figment::{providers::Toml, Figment};
use prometheus::Registry as MetricsRegistry; use prometheus::Registry as MetricsRegistry;
use serde::Deserialize; use serde::Deserialize;
use lavina_core::player::PlayerRegistry;
use lavina_core::prelude::*; use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::room::RoomRegistry; use lavina_core::LavinaCore;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
struct ServerConfig { struct ServerConfig {
@ -49,27 +48,12 @@ async fn main() -> Result<()> {
xmpp: xmpp_config, xmpp: xmpp_config,
storage: storage_config, storage: storage_config,
} = config; } = config;
let mut metrics = MetricsRegistry::new(); let metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
let mut players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?; let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone(), storage.clone()).await?;
let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone(), storage.clone()).await?;
let irc = projection_irc::launch( let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone(), storage.clone()).await?;
irc_config,
players.clone(),
rooms.clone(),
metrics.clone(),
storage.clone(),
)
.await?;
let xmpp = projection_xmpp::launch(
xmpp_config,
players.clone(),
rooms.clone(),
metrics.clone(),
storage.clone(),
)
.await?;
tracing::info!("Started"); tracing::info!("Started");
sleep.await; sleep.await;
@ -78,10 +62,7 @@ async fn main() -> Result<()> {
xmpp.terminate().await?; xmpp.terminate().await?;
irc.terminate().await?; irc.terminate().await?;
telemetry_terminator.terminate().await?; telemetry_terminator.terminate().await?;
players.shutdown_all().await?; core.shutdown().await?;
drop(players);
drop(rooms);
storage.close().await?;
tracing::info!("Shutdown complete"); tracing::info!("Shutdown complete");
Ok(()) Ok(())
} }