forked from lavina/lavina
Compare commits
3 Commits
d7a5c70f53
...
db8d151f15
Author | SHA1 | Date |
---|---|---|
Nikita Vilunov | db8d151f15 | |
Nikita Vilunov | b787f1a545 | |
Nikita Vilunov | ddb348bee9 |
|
@ -1,4 +1,11 @@
|
|||
//! Domain definitions and implementation of common chat logic.
|
||||
use anyhow::Result;
|
||||
use prometheus::Registry as MetricsRegistry;
|
||||
|
||||
use crate::player::PlayerRegistry;
|
||||
use crate::repo::Storage;
|
||||
use crate::room::RoomRegistry;
|
||||
|
||||
pub mod player;
|
||||
pub mod prelude;
|
||||
pub mod repo;
|
||||
|
@ -6,3 +13,25 @@ pub mod room;
|
|||
pub mod terminator;
|
||||
|
||||
mod table;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LavinaCore {
|
||||
pub players: PlayerRegistry,
|
||||
pub rooms: RoomRegistry,
|
||||
}
|
||||
|
||||
impl LavinaCore {
|
||||
pub async fn new(mut metrics: MetricsRegistry, storage: Storage) -> Result<LavinaCore> {
|
||||
// TODO shutdown all services in reverse order on error
|
||||
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
|
||||
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?;
|
||||
Ok(LavinaCore { players, rooms })
|
||||
}
|
||||
|
||||
pub async fn shutdown(mut self) -> Result<()> {
|
||||
self.players.shutdown_all().await?;
|
||||
drop(self.players);
|
||||
drop(self.rooms);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ use lavina_core::prelude::*;
|
|||
use lavina_core::repo::Storage;
|
||||
use lavina_core::room::{RoomId, RoomInfo, RoomRegistry};
|
||||
use lavina_core::terminator::Terminator;
|
||||
use lavina_core::LavinaCore;
|
||||
use proto_irc::client::CapabilitySubcommand;
|
||||
use proto_irc::client::{client_message, ClientMessage};
|
||||
use proto_irc::server::CapSubBody;
|
||||
|
@ -56,8 +57,7 @@ async fn handle_socket(
|
|||
config: ServerConfig,
|
||||
mut stream: TcpStream,
|
||||
socket_addr: &SocketAddr,
|
||||
players: PlayerRegistry,
|
||||
rooms: RoomRegistry,
|
||||
mut core: LavinaCore,
|
||||
termination: Deferred<()>, // TODO use it to stop the connection gracefully
|
||||
mut storage: Storage,
|
||||
) -> Result<()> {
|
||||
|
@ -77,7 +77,7 @@ async fn handle_socket(
|
|||
match registered_user {
|
||||
Ok(user) => {
|
||||
log::debug!("User registered");
|
||||
handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?;
|
||||
handle_registered_socket(config, core.players, core.rooms, &mut reader, &mut writer, user).await?;
|
||||
}
|
||||
Err(err) => {
|
||||
log::debug!("Registration failed: {err}");
|
||||
|
@ -970,8 +970,7 @@ impl RunningServer {
|
|||
|
||||
pub async fn launch(
|
||||
config: ServerConfig,
|
||||
players: PlayerRegistry,
|
||||
rooms: RoomRegistry,
|
||||
core: LavinaCore,
|
||||
metrics: MetricsRegistry,
|
||||
storage: Storage,
|
||||
) -> Result<RunningServer> {
|
||||
|
@ -1012,13 +1011,12 @@ pub async fn launch(
|
|||
}
|
||||
|
||||
let terminator = Terminator::spawn(|termination| {
|
||||
let players = players.clone();
|
||||
let rooms = rooms.clone();
|
||||
let core = core.clone();
|
||||
let current_connections_clone = current_connections.clone();
|
||||
let stopped_tx = stopped_tx.clone();
|
||||
let storage = storage.clone();
|
||||
async move {
|
||||
match handle_socket(config, stream, &socket_addr, players, rooms, termination, storage).await {
|
||||
match handle_socket(config, stream, &socket_addr, core, termination, storage).await {
|
||||
Ok(_) => log::info!("Connection terminated"),
|
||||
Err(err) => log::warn!("Connection failed: {err}"),
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ use tokio::net::TcpStream;
|
|||
use lavina_core::player::{JoinResult, PlayerId, SendMessageResult};
|
||||
use lavina_core::repo::{Storage, StorageConfig};
|
||||
use lavina_core::room::RoomId;
|
||||
use lavina_core::{player::PlayerRegistry, room::RoomRegistry};
|
||||
use lavina_core::LavinaCore;
|
||||
use projection_irc::APP_VERSION;
|
||||
use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig};
|
||||
|
||||
|
@ -102,8 +102,7 @@ impl<'a> TestScope<'a> {
|
|||
struct TestServer {
|
||||
metrics: MetricsRegistry,
|
||||
storage: Storage,
|
||||
rooms: RoomRegistry,
|
||||
players: PlayerRegistry,
|
||||
core: LavinaCore,
|
||||
server: RunningServer,
|
||||
}
|
||||
impl TestServer {
|
||||
|
@ -118,43 +117,36 @@ impl TestServer {
|
|||
db_path: ":memory:".into(),
|
||||
})
|
||||
.await?;
|
||||
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
|
||||
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
|
||||
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
|
||||
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
|
||||
let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
|
||||
Ok(TestServer {
|
||||
metrics,
|
||||
storage,
|
||||
rooms,
|
||||
players,
|
||||
core,
|
||||
server,
|
||||
})
|
||||
}
|
||||
|
||||
async fn reboot(mut self) -> Result<TestServer> {
|
||||
async fn reboot(self) -> Result<TestServer> {
|
||||
let config = ServerConfig {
|
||||
listen_on: "127.0.0.1:0".parse().unwrap(),
|
||||
server_name: "testserver".into(),
|
||||
};
|
||||
let TestServer {
|
||||
mut metrics,
|
||||
mut storage,
|
||||
rooms,
|
||||
mut players,
|
||||
metrics: _,
|
||||
storage,
|
||||
mut core,
|
||||
server,
|
||||
} = self;
|
||||
server.terminate().await?;
|
||||
players.shutdown_all().await.unwrap();
|
||||
drop(players);
|
||||
drop(rooms);
|
||||
let mut metrics = MetricsRegistry::new();
|
||||
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
|
||||
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
|
||||
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
|
||||
core.shutdown().await?;
|
||||
let metrics = MetricsRegistry::new();
|
||||
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
|
||||
let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
|
||||
Ok(TestServer {
|
||||
metrics,
|
||||
storage,
|
||||
rooms,
|
||||
players,
|
||||
core,
|
||||
server,
|
||||
})
|
||||
}
|
||||
|
@ -590,7 +582,7 @@ async fn server_time_capability() -> Result<()> {
|
|||
s.send("CAP LS 302").await?;
|
||||
s.send("NICK tester").await?;
|
||||
s.send("USER UserName 0 * :Real Name").await?;
|
||||
s.expect(":testserver CAP * LS :sasl=PLAIN server-time").await?;
|
||||
s.expect_cap_ls().await?;
|
||||
s.send("CAP REQ :sasl server-time").await?;
|
||||
s.expect(":testserver CAP tester ACK :sasl server-time").await?;
|
||||
s.send("AUTHENTICATE PLAIN").await?;
|
||||
|
@ -608,7 +600,7 @@ async fn server_time_capability() -> Result<()> {
|
|||
s.expect(":testserver 366 tester #test :End of /NAMES list").await?;
|
||||
|
||||
server.storage.create_user("some_guy").await?;
|
||||
let mut conn = server.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
|
||||
let mut conn = server.core.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
|
||||
let res = conn.join_room(RoomId::from("test").unwrap()).await?;
|
||||
let JoinResult::Success(_) = res else {
|
||||
panic!("Failed to join room");
|
||||
|
|
|
@ -26,6 +26,7 @@ use lavina_core::prelude::*;
|
|||
use lavina_core::repo::Storage;
|
||||
use lavina_core::room::RoomRegistry;
|
||||
use lavina_core::terminator::Terminator;
|
||||
use lavina_core::LavinaCore;
|
||||
use proto_xmpp::bind::{Name, Resource};
|
||||
use proto_xmpp::stream::*;
|
||||
use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml};
|
||||
|
@ -79,8 +80,7 @@ impl RunningServer {
|
|||
|
||||
pub async fn launch(
|
||||
config: ServerConfig,
|
||||
players: PlayerRegistry,
|
||||
rooms: RoomRegistry,
|
||||
core: LavinaCore,
|
||||
metrics: MetricsRegistry,
|
||||
storage: Storage,
|
||||
) -> Result<RunningServer> {
|
||||
|
@ -122,15 +122,14 @@ pub async fn launch(
|
|||
// TODO kill the older connection and restart it
|
||||
continue;
|
||||
}
|
||||
let players = players.clone();
|
||||
let rooms = rooms.clone();
|
||||
let core = core.clone();
|
||||
let storage = storage.clone();
|
||||
let hostname = config.hostname.clone();
|
||||
let terminator = Terminator::spawn(|termination| {
|
||||
let stopped_tx = stopped_tx.clone();
|
||||
let loaded_config = loaded_config.clone();
|
||||
async move {
|
||||
match handle_socket(loaded_config, stream, &socket_addr, players, rooms, storage, hostname, termination).await {
|
||||
match handle_socket(loaded_config, stream, &socket_addr, core, storage, hostname, termination).await {
|
||||
Ok(_) => log::info!("Connection terminated"),
|
||||
Err(err) => log::warn!("Connection failed: {err}"),
|
||||
}
|
||||
|
@ -168,8 +167,7 @@ async fn handle_socket(
|
|||
cert_config: Arc<LoadedConfig>,
|
||||
mut stream: TcpStream,
|
||||
socket_addr: &SocketAddr,
|
||||
mut players: PlayerRegistry,
|
||||
rooms: RoomRegistry,
|
||||
mut core: LavinaCore,
|
||||
mut storage: Storage,
|
||||
hostname: Str,
|
||||
termination: Deferred<()>, // TODO use it to stop the connection gracefully
|
||||
|
@ -207,14 +205,14 @@ async fn handle_socket(
|
|||
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => {
|
||||
match authenticated {
|
||||
Ok(authenticated) => {
|
||||
let mut connection = players.connect_to_player(&authenticated.player_id).await;
|
||||
let mut connection = core.players.connect_to_player(&authenticated.player_id).await;
|
||||
socket_final(
|
||||
&mut xml_reader,
|
||||
&mut xml_writer,
|
||||
&mut reader_buf,
|
||||
&authenticated,
|
||||
&mut connection,
|
||||
&rooms,
|
||||
&core.rooms,
|
||||
&hostname,
|
||||
)
|
||||
.await?;
|
||||
|
|
|
@ -16,9 +16,8 @@ use tokio_rustls::rustls::client::ServerCertVerifier;
|
|||
use tokio_rustls::rustls::{ClientConfig, ServerName};
|
||||
use tokio_rustls::TlsConnector;
|
||||
|
||||
use lavina_core::player::PlayerRegistry;
|
||||
use lavina_core::repo::{Storage, StorageConfig};
|
||||
use lavina_core::room::RoomRegistry;
|
||||
use lavina_core::LavinaCore;
|
||||
use projection_xmpp::{launch, RunningServer, ServerConfig};
|
||||
use proto_xmpp::xml::{Continuation, FromXml, Parser};
|
||||
|
||||
|
@ -124,8 +123,7 @@ impl ServerCertVerifier for IgnoreCertVerification {
|
|||
struct TestServer {
|
||||
metrics: MetricsRegistry,
|
||||
storage: Storage,
|
||||
rooms: RoomRegistry,
|
||||
players: PlayerRegistry,
|
||||
core: LavinaCore,
|
||||
server: RunningServer,
|
||||
}
|
||||
impl TestServer {
|
||||
|
@ -137,19 +135,17 @@ impl TestServer {
|
|||
key: "tests/certs/xmpp.key".parse().unwrap(),
|
||||
hostname: "localhost".into(),
|
||||
};
|
||||
let mut metrics = MetricsRegistry::new();
|
||||
let mut storage = Storage::open(StorageConfig {
|
||||
let metrics = MetricsRegistry::new();
|
||||
let storage = Storage::open(StorageConfig {
|
||||
db_path: ":memory:".into(),
|
||||
})
|
||||
.await?;
|
||||
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
|
||||
let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
|
||||
let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
|
||||
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
|
||||
let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
|
||||
Ok(TestServer {
|
||||
metrics,
|
||||
storage,
|
||||
rooms,
|
||||
players,
|
||||
core,
|
||||
server,
|
||||
})
|
||||
}
|
||||
|
|
17
src/http.rs
17
src/http.rs
|
@ -16,6 +16,7 @@ use lavina_core::prelude::*;
|
|||
use lavina_core::repo::Storage;
|
||||
use lavina_core::room::RoomRegistry;
|
||||
use lavina_core::terminator::Terminator;
|
||||
use lavina_core::LavinaCore;
|
||||
|
||||
use mgmt_api::*;
|
||||
|
||||
|
@ -29,20 +30,20 @@ pub struct ServerConfig {
|
|||
pub async fn launch(
|
||||
config: ServerConfig,
|
||||
metrics: MetricsRegistry,
|
||||
rooms: RoomRegistry,
|
||||
core: LavinaCore,
|
||||
storage: Storage,
|
||||
) -> Result<Terminator> {
|
||||
log::info!("Starting the http service");
|
||||
let listener = TcpListener::bind(config.listen_on).await?;
|
||||
log::debug!("Listener started");
|
||||
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, storage, rx.map(|_| ())));
|
||||
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, storage, rx.map(|_| ())));
|
||||
Ok(terminator)
|
||||
}
|
||||
|
||||
async fn main_loop(
|
||||
listener: TcpListener,
|
||||
metrics: MetricsRegistry,
|
||||
rooms: RoomRegistry,
|
||||
core: LavinaCore,
|
||||
storage: Storage,
|
||||
termination: impl Future<Output = ()>,
|
||||
) -> Result<()> {
|
||||
|
@ -55,13 +56,13 @@ async fn main_loop(
|
|||
let (stream, _) = result?;
|
||||
let stream = TokioIo::new(stream);
|
||||
let metrics = metrics.clone();
|
||||
let rooms = rooms.clone();
|
||||
let core = core.clone();
|
||||
let storage = storage.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let registry = metrics.clone();
|
||||
let rooms = rooms.clone();
|
||||
let core = core.clone();
|
||||
let storage = storage.clone();
|
||||
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), storage.clone(), r)));
|
||||
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), core.clone(), storage.clone(), r)));
|
||||
if let Err(err) = server.await {
|
||||
tracing::error!("Error serving connection: {:?}", err);
|
||||
}
|
||||
|
@ -75,13 +76,13 @@ async fn main_loop(
|
|||
|
||||
async fn route(
|
||||
registry: MetricsRegistry,
|
||||
rooms: RoomRegistry,
|
||||
core: LavinaCore,
|
||||
storage: Storage,
|
||||
request: Request<hyper::body::Incoming>,
|
||||
) -> HttpResult<Response<Full<Bytes>>> {
|
||||
let res = match (request.method(), request.uri().path()) {
|
||||
(&Method::GET, "/metrics") => endpoint_metrics(registry),
|
||||
(&Method::GET, "/rooms") => endpoint_rooms(rooms).await,
|
||||
(&Method::GET, "/rooms") => endpoint_rooms(core.rooms).await,
|
||||
(&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(),
|
||||
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(),
|
||||
_ => not_found(),
|
||||
|
|
33
src/main.rs
33
src/main.rs
|
@ -9,10 +9,9 @@ use figment::{providers::Toml, Figment};
|
|||
use prometheus::Registry as MetricsRegistry;
|
||||
use serde::Deserialize;
|
||||
|
||||
use lavina_core::player::PlayerRegistry;
|
||||
use lavina_core::prelude::*;
|
||||
use lavina_core::repo::Storage;
|
||||
use lavina_core::room::RoomRegistry;
|
||||
use lavina_core::LavinaCore;
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct ServerConfig {
|
||||
|
@ -49,27 +48,12 @@ async fn main() -> Result<()> {
|
|||
xmpp: xmpp_config,
|
||||
storage: storage_config,
|
||||
} = config;
|
||||
let mut metrics = MetricsRegistry::new();
|
||||
let metrics = MetricsRegistry::new();
|
||||
let storage = Storage::open(storage_config).await?;
|
||||
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
|
||||
let mut players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?;
|
||||
let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?;
|
||||
let irc = projection_irc::launch(
|
||||
irc_config,
|
||||
players.clone(),
|
||||
rooms.clone(),
|
||||
metrics.clone(),
|
||||
storage.clone(),
|
||||
)
|
||||
.await?;
|
||||
let xmpp = projection_xmpp::launch(
|
||||
xmpp_config,
|
||||
players.clone(),
|
||||
rooms.clone(),
|
||||
metrics.clone(),
|
||||
storage.clone(),
|
||||
)
|
||||
.await?;
|
||||
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
|
||||
let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone(), storage.clone()).await?;
|
||||
let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone(), storage.clone()).await?;
|
||||
let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone(), storage.clone()).await?;
|
||||
tracing::info!("Started");
|
||||
|
||||
sleep.await;
|
||||
|
@ -78,10 +62,7 @@ async fn main() -> Result<()> {
|
|||
xmpp.terminate().await?;
|
||||
irc.terminate().await?;
|
||||
telemetry_terminator.terminate().await?;
|
||||
players.shutdown_all().await?;
|
||||
drop(players);
|
||||
drop(rooms);
|
||||
storage.close().await?;
|
||||
core.shutdown().await?;
|
||||
tracing::info!("Shutdown complete");
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue