forked from lavina/lavina
1
0
Fork 0

Compare commits

..

No commits in common. "db8d151f157c9e06eba5547d71227620ced47154" and "d7a5c70f5311313b01f52a944decc24c632f9b17" have entirely different histories.

7 changed files with 86 additions and 81 deletions

View File

@ -1,11 +1,4 @@
//! Domain definitions and implementation of common chat logic. //! Domain definitions and implementation of common chat logic.
use anyhow::Result;
use prometheus::Registry as MetricsRegistry;
use crate::player::PlayerRegistry;
use crate::repo::Storage;
use crate::room::RoomRegistry;
pub mod player; pub mod player;
pub mod prelude; pub mod prelude;
pub mod repo; pub mod repo;
@ -13,25 +6,3 @@ pub mod room;
pub mod terminator; pub mod terminator;
mod table; mod table;
#[derive(Clone)]
pub struct LavinaCore {
pub players: PlayerRegistry,
pub rooms: RoomRegistry,
}
impl LavinaCore {
pub async fn new(mut metrics: MetricsRegistry, storage: Storage) -> Result<LavinaCore> {
// TODO shutdown all services in reverse order on error
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?;
Ok(LavinaCore { players, rooms })
}
pub async fn shutdown(mut self) -> Result<()> {
self.players.shutdown_all().await?;
drop(self.players);
drop(self.rooms);
Ok(())
}
}

View File

@ -19,7 +19,6 @@ use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; use lavina_core::room::{RoomId, RoomInfo, RoomRegistry};
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use proto_irc::client::CapabilitySubcommand; use proto_irc::client::CapabilitySubcommand;
use proto_irc::client::{client_message, ClientMessage}; use proto_irc::client::{client_message, ClientMessage};
use proto_irc::server::CapSubBody; use proto_irc::server::CapSubBody;
@ -57,7 +56,8 @@ async fn handle_socket(
config: ServerConfig, config: ServerConfig,
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
mut core: LavinaCore, players: PlayerRegistry,
rooms: RoomRegistry,
termination: Deferred<()>, // TODO use it to stop the connection gracefully termination: Deferred<()>, // TODO use it to stop the connection gracefully
mut storage: Storage, mut storage: Storage,
) -> Result<()> { ) -> Result<()> {
@ -77,7 +77,7 @@ async fn handle_socket(
match registered_user { match registered_user {
Ok(user) => { Ok(user) => {
log::debug!("User registered"); log::debug!("User registered");
handle_registered_socket(config, core.players, core.rooms, &mut reader, &mut writer, user).await?; handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?;
} }
Err(err) => { Err(err) => {
log::debug!("Registration failed: {err}"); log::debug!("Registration failed: {err}");
@ -970,7 +970,8 @@ impl RunningServer {
pub async fn launch( pub async fn launch(
config: ServerConfig, config: ServerConfig,
core: LavinaCore, players: PlayerRegistry,
rooms: RoomRegistry,
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
) -> Result<RunningServer> { ) -> Result<RunningServer> {
@ -1011,12 +1012,13 @@ pub async fn launch(
} }
let terminator = Terminator::spawn(|termination| { let terminator = Terminator::spawn(|termination| {
let core = core.clone(); let players = players.clone();
let rooms = rooms.clone();
let current_connections_clone = current_connections.clone(); let current_connections_clone = current_connections.clone();
let stopped_tx = stopped_tx.clone(); let stopped_tx = stopped_tx.clone();
let storage = storage.clone(); let storage = storage.clone();
async move { async move {
match handle_socket(config, stream, &socket_addr, core, termination, storage).await { match handle_socket(config, stream, &socket_addr, players, rooms, termination, storage).await {
Ok(_) => log::info!("Connection terminated"), Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"), Err(err) => log::warn!("Connection failed: {err}"),
} }

View File

@ -11,7 +11,7 @@ use tokio::net::TcpStream;
use lavina_core::player::{JoinResult, PlayerId, SendMessageResult}; use lavina_core::player::{JoinResult, PlayerId, SendMessageResult};
use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::room::RoomId; use lavina_core::room::RoomId;
use lavina_core::LavinaCore; use lavina_core::{player::PlayerRegistry, room::RoomRegistry};
use projection_irc::APP_VERSION; use projection_irc::APP_VERSION;
use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig}; use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig};
@ -102,7 +102,8 @@ impl<'a> TestScope<'a> {
struct TestServer { struct TestServer {
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
core: LavinaCore, rooms: RoomRegistry,
players: PlayerRegistry,
server: RunningServer, server: RunningServer,
} }
impl TestServer { impl TestServer {
@ -117,36 +118,43 @@ impl TestServer {
db_path: ":memory:".into(), db_path: ":memory:".into(),
}) })
.await?; .await?;
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap(); let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer {
metrics, metrics,
storage, storage,
core, rooms,
players,
server, server,
}) })
} }
async fn reboot(self) -> Result<TestServer> { async fn reboot(mut self) -> Result<TestServer> {
let config = ServerConfig { let config = ServerConfig {
listen_on: "127.0.0.1:0".parse().unwrap(), listen_on: "127.0.0.1:0".parse().unwrap(),
server_name: "testserver".into(), server_name: "testserver".into(),
}; };
let TestServer { let TestServer {
metrics: _, mut metrics,
storage, mut storage,
mut core, rooms,
mut players,
server, server,
} = self; } = self;
server.terminate().await?; server.terminate().await?;
core.shutdown().await?; players.shutdown_all().await.unwrap();
let metrics = MetricsRegistry::new(); drop(players);
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; drop(rooms);
let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap(); let mut metrics = MetricsRegistry::new();
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer {
metrics, metrics,
storage, storage,
core, rooms,
players,
server, server,
}) })
} }
@ -582,7 +590,7 @@ async fn server_time_capability() -> Result<()> {
s.send("CAP LS 302").await?; s.send("CAP LS 302").await?;
s.send("NICK tester").await?; s.send("NICK tester").await?;
s.send("USER UserName 0 * :Real Name").await?; s.send("USER UserName 0 * :Real Name").await?;
s.expect_cap_ls().await?; s.expect(":testserver CAP * LS :sasl=PLAIN server-time").await?;
s.send("CAP REQ :sasl server-time").await?; s.send("CAP REQ :sasl server-time").await?;
s.expect(":testserver CAP tester ACK :sasl server-time").await?; s.expect(":testserver CAP tester ACK :sasl server-time").await?;
s.send("AUTHENTICATE PLAIN").await?; s.send("AUTHENTICATE PLAIN").await?;
@ -600,7 +608,7 @@ async fn server_time_capability() -> Result<()> {
s.expect(":testserver 366 tester #test :End of /NAMES list").await?; s.expect(":testserver 366 tester #test :End of /NAMES list").await?;
server.storage.create_user("some_guy").await?; server.storage.create_user("some_guy").await?;
let mut conn = server.core.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await; let mut conn = server.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
let res = conn.join_room(RoomId::from("test").unwrap()).await?; let res = conn.join_room(RoomId::from("test").unwrap()).await?;
let JoinResult::Success(_) = res else { let JoinResult::Success(_) = res else {
panic!("Failed to join room"); panic!("Failed to join room");

View File

@ -26,7 +26,6 @@ use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::room::RoomRegistry; use lavina_core::room::RoomRegistry;
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use proto_xmpp::bind::{Name, Resource}; use proto_xmpp::bind::{Name, Resource};
use proto_xmpp::stream::*; use proto_xmpp::stream::*;
use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml};
@ -80,7 +79,8 @@ impl RunningServer {
pub async fn launch( pub async fn launch(
config: ServerConfig, config: ServerConfig,
core: LavinaCore, players: PlayerRegistry,
rooms: RoomRegistry,
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
) -> Result<RunningServer> { ) -> Result<RunningServer> {
@ -122,14 +122,15 @@ pub async fn launch(
// TODO kill the older connection and restart it // TODO kill the older connection and restart it
continue; continue;
} }
let core = core.clone(); let players = players.clone();
let rooms = rooms.clone();
let storage = storage.clone(); let storage = storage.clone();
let hostname = config.hostname.clone(); let hostname = config.hostname.clone();
let terminator = Terminator::spawn(|termination| { let terminator = Terminator::spawn(|termination| {
let stopped_tx = stopped_tx.clone(); let stopped_tx = stopped_tx.clone();
let loaded_config = loaded_config.clone(); let loaded_config = loaded_config.clone();
async move { async move {
match handle_socket(loaded_config, stream, &socket_addr, core, storage, hostname, termination).await { match handle_socket(loaded_config, stream, &socket_addr, players, rooms, storage, hostname, termination).await {
Ok(_) => log::info!("Connection terminated"), Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"), Err(err) => log::warn!("Connection failed: {err}"),
} }
@ -167,7 +168,8 @@ async fn handle_socket(
cert_config: Arc<LoadedConfig>, cert_config: Arc<LoadedConfig>,
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
mut core: LavinaCore, mut players: PlayerRegistry,
rooms: RoomRegistry,
mut storage: Storage, mut storage: Storage,
hostname: Str, hostname: Str,
termination: Deferred<()>, // TODO use it to stop the connection gracefully termination: Deferred<()>, // TODO use it to stop the connection gracefully
@ -205,14 +207,14 @@ async fn handle_socket(
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => { authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => {
match authenticated { match authenticated {
Ok(authenticated) => { Ok(authenticated) => {
let mut connection = core.players.connect_to_player(&authenticated.player_id).await; let mut connection = players.connect_to_player(&authenticated.player_id).await;
socket_final( socket_final(
&mut xml_reader, &mut xml_reader,
&mut xml_writer, &mut xml_writer,
&mut reader_buf, &mut reader_buf,
&authenticated, &authenticated,
&mut connection, &mut connection,
&core.rooms, &rooms,
&hostname, &hostname,
) )
.await?; .await?;

View File

@ -16,8 +16,9 @@ use tokio_rustls::rustls::client::ServerCertVerifier;
use tokio_rustls::rustls::{ClientConfig, ServerName}; use tokio_rustls::rustls::{ClientConfig, ServerName};
use tokio_rustls::TlsConnector; use tokio_rustls::TlsConnector;
use lavina_core::player::PlayerRegistry;
use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::LavinaCore; use lavina_core::room::RoomRegistry;
use projection_xmpp::{launch, RunningServer, ServerConfig}; use projection_xmpp::{launch, RunningServer, ServerConfig};
use proto_xmpp::xml::{Continuation, FromXml, Parser}; use proto_xmpp::xml::{Continuation, FromXml, Parser};
@ -123,7 +124,8 @@ impl ServerCertVerifier for IgnoreCertVerification {
struct TestServer { struct TestServer {
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
core: LavinaCore, rooms: RoomRegistry,
players: PlayerRegistry,
server: RunningServer, server: RunningServer,
} }
impl TestServer { impl TestServer {
@ -135,17 +137,19 @@ impl TestServer {
key: "tests/certs/xmpp.key".parse().unwrap(), key: "tests/certs/xmpp.key".parse().unwrap(),
hostname: "localhost".into(), hostname: "localhost".into(),
}; };
let metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(StorageConfig { let mut storage = Storage::open(StorageConfig {
db_path: ":memory:".into(), db_path: ":memory:".into(),
}) })
.await?; .await?;
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap(); let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer {
metrics, metrics,
storage, storage,
core, rooms,
players,
server, server,
}) })
} }

View File

@ -16,7 +16,6 @@ use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::room::RoomRegistry; use lavina_core::room::RoomRegistry;
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use mgmt_api::*; use mgmt_api::*;
@ -30,20 +29,20 @@ pub struct ServerConfig {
pub async fn launch( pub async fn launch(
config: ServerConfig, config: ServerConfig,
metrics: MetricsRegistry, metrics: MetricsRegistry,
core: LavinaCore, rooms: RoomRegistry,
storage: Storage, storage: Storage,
) -> Result<Terminator> { ) -> Result<Terminator> {
log::info!("Starting the http service"); log::info!("Starting the http service");
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started"); log::debug!("Listener started");
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, storage, rx.map(|_| ()))); let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, storage, rx.map(|_| ())));
Ok(terminator) Ok(terminator)
} }
async fn main_loop( async fn main_loop(
listener: TcpListener, listener: TcpListener,
metrics: MetricsRegistry, metrics: MetricsRegistry,
core: LavinaCore, rooms: RoomRegistry,
storage: Storage, storage: Storage,
termination: impl Future<Output = ()>, termination: impl Future<Output = ()>,
) -> Result<()> { ) -> Result<()> {
@ -56,13 +55,13 @@ async fn main_loop(
let (stream, _) = result?; let (stream, _) = result?;
let stream = TokioIo::new(stream); let stream = TokioIo::new(stream);
let metrics = metrics.clone(); let metrics = metrics.clone();
let core = core.clone(); let rooms = rooms.clone();
let storage = storage.clone(); let storage = storage.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
let registry = metrics.clone(); let registry = metrics.clone();
let core = core.clone(); let rooms = rooms.clone();
let storage = storage.clone(); let storage = storage.clone();
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), core.clone(), storage.clone(), r))); let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), storage.clone(), r)));
if let Err(err) = server.await { if let Err(err) = server.await {
tracing::error!("Error serving connection: {:?}", err); tracing::error!("Error serving connection: {:?}", err);
} }
@ -76,13 +75,13 @@ async fn main_loop(
async fn route( async fn route(
registry: MetricsRegistry, registry: MetricsRegistry,
core: LavinaCore, rooms: RoomRegistry,
storage: Storage, storage: Storage,
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
) -> HttpResult<Response<Full<Bytes>>> { ) -> HttpResult<Response<Full<Bytes>>> {
let res = match (request.method(), request.uri().path()) { let res = match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => endpoint_metrics(registry), (&Method::GET, "/metrics") => endpoint_metrics(registry),
(&Method::GET, "/rooms") => endpoint_rooms(core.rooms).await, (&Method::GET, "/rooms") => endpoint_rooms(rooms).await,
(&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(), (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(),
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(), (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(),
_ => not_found(), _ => not_found(),

View File

@ -9,9 +9,10 @@ use figment::{providers::Toml, Figment};
use prometheus::Registry as MetricsRegistry; use prometheus::Registry as MetricsRegistry;
use serde::Deserialize; use serde::Deserialize;
use lavina_core::player::PlayerRegistry;
use lavina_core::prelude::*; use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::LavinaCore; use lavina_core::room::RoomRegistry;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
struct ServerConfig { struct ServerConfig {
@ -48,12 +49,27 @@ async fn main() -> Result<()> {
xmpp: xmpp_config, xmpp: xmpp_config,
storage: storage_config, storage: storage_config,
} = config; } = config;
let metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone(), storage.clone()).await?; let mut players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?;
let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone(), storage.clone()).await?; let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?;
let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone(), storage.clone()).await?; let irc = projection_irc::launch(
irc_config,
players.clone(),
rooms.clone(),
metrics.clone(),
storage.clone(),
)
.await?;
let xmpp = projection_xmpp::launch(
xmpp_config,
players.clone(),
rooms.clone(),
metrics.clone(),
storage.clone(),
)
.await?;
tracing::info!("Started"); tracing::info!("Started");
sleep.await; sleep.await;
@ -62,7 +78,10 @@ async fn main() -> Result<()> {
xmpp.terminate().await?; xmpp.terminate().await?;
irc.terminate().await?; irc.terminate().await?;
telemetry_terminator.terminate().await?; telemetry_terminator.terminate().await?;
core.shutdown().await?; players.shutdown_all().await?;
drop(players);
drop(rooms);
storage.close().await?;
tracing::info!("Shutdown complete"); tracing::info!("Shutdown complete");
Ok(()) Ok(())
} }