forked from lavina/lavina
				
			Compare commits
	
		
			No commits in common. "db8d151f157c9e06eba5547d71227620ced47154" and "d7a5c70f5311313b01f52a944decc24c632f9b17" have entirely different histories.
		
	
	
		
			db8d151f15
			...
			d7a5c70f53
		
	
		| 
						 | 
					@ -1,11 +1,4 @@
 | 
				
			||||||
//! Domain definitions and implementation of common chat logic.
 | 
					//! Domain definitions and implementation of common chat logic.
 | 
				
			||||||
use anyhow::Result;
 | 
					 | 
				
			||||||
use prometheus::Registry as MetricsRegistry;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
use crate::player::PlayerRegistry;
 | 
					 | 
				
			||||||
use crate::repo::Storage;
 | 
					 | 
				
			||||||
use crate::room::RoomRegistry;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
pub mod player;
 | 
					pub mod player;
 | 
				
			||||||
pub mod prelude;
 | 
					pub mod prelude;
 | 
				
			||||||
pub mod repo;
 | 
					pub mod repo;
 | 
				
			||||||
| 
						 | 
					@ -13,25 +6,3 @@ pub mod room;
 | 
				
			||||||
pub mod terminator;
 | 
					pub mod terminator;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
mod table;
 | 
					mod table;
 | 
				
			||||||
 | 
					 | 
				
			||||||
#[derive(Clone)]
 | 
					 | 
				
			||||||
pub struct LavinaCore {
 | 
					 | 
				
			||||||
    pub players: PlayerRegistry,
 | 
					 | 
				
			||||||
    pub rooms: RoomRegistry,
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
impl LavinaCore {
 | 
					 | 
				
			||||||
    pub async fn new(mut metrics: MetricsRegistry, storage: Storage) -> Result<LavinaCore> {
 | 
					 | 
				
			||||||
        // TODO shutdown all services in reverse order on error
 | 
					 | 
				
			||||||
        let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
 | 
					 | 
				
			||||||
        let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?;
 | 
					 | 
				
			||||||
        Ok(LavinaCore { players, rooms })
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    pub async fn shutdown(mut self) -> Result<()> {
 | 
					 | 
				
			||||||
        self.players.shutdown_all().await?;
 | 
					 | 
				
			||||||
        drop(self.players);
 | 
					 | 
				
			||||||
        drop(self.rooms);
 | 
					 | 
				
			||||||
        Ok(())
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -19,7 +19,6 @@ use lavina_core::prelude::*;
 | 
				
			||||||
use lavina_core::repo::Storage;
 | 
					use lavina_core::repo::Storage;
 | 
				
			||||||
use lavina_core::room::{RoomId, RoomInfo, RoomRegistry};
 | 
					use lavina_core::room::{RoomId, RoomInfo, RoomRegistry};
 | 
				
			||||||
use lavina_core::terminator::Terminator;
 | 
					use lavina_core::terminator::Terminator;
 | 
				
			||||||
use lavina_core::LavinaCore;
 | 
					 | 
				
			||||||
use proto_irc::client::CapabilitySubcommand;
 | 
					use proto_irc::client::CapabilitySubcommand;
 | 
				
			||||||
use proto_irc::client::{client_message, ClientMessage};
 | 
					use proto_irc::client::{client_message, ClientMessage};
 | 
				
			||||||
use proto_irc::server::CapSubBody;
 | 
					use proto_irc::server::CapSubBody;
 | 
				
			||||||
| 
						 | 
					@ -57,7 +56,8 @@ async fn handle_socket(
 | 
				
			||||||
    config: ServerConfig,
 | 
					    config: ServerConfig,
 | 
				
			||||||
    mut stream: TcpStream,
 | 
					    mut stream: TcpStream,
 | 
				
			||||||
    socket_addr: &SocketAddr,
 | 
					    socket_addr: &SocketAddr,
 | 
				
			||||||
    mut core: LavinaCore,
 | 
					    players: PlayerRegistry,
 | 
				
			||||||
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
    termination: Deferred<()>, // TODO use it to stop the connection gracefully
 | 
					    termination: Deferred<()>, // TODO use it to stop the connection gracefully
 | 
				
			||||||
    mut storage: Storage,
 | 
					    mut storage: Storage,
 | 
				
			||||||
) -> Result<()> {
 | 
					) -> Result<()> {
 | 
				
			||||||
| 
						 | 
					@ -77,7 +77,7 @@ async fn handle_socket(
 | 
				
			||||||
            match registered_user {
 | 
					            match registered_user {
 | 
				
			||||||
                Ok(user) => {
 | 
					                Ok(user) => {
 | 
				
			||||||
                    log::debug!("User registered");
 | 
					                    log::debug!("User registered");
 | 
				
			||||||
                    handle_registered_socket(config, core.players, core.rooms, &mut reader, &mut writer, user).await?;
 | 
					                    handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?;
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                Err(err) => {
 | 
					                Err(err) => {
 | 
				
			||||||
                    log::debug!("Registration failed: {err}");
 | 
					                    log::debug!("Registration failed: {err}");
 | 
				
			||||||
| 
						 | 
					@ -970,7 +970,8 @@ impl RunningServer {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub async fn launch(
 | 
					pub async fn launch(
 | 
				
			||||||
    config: ServerConfig,
 | 
					    config: ServerConfig,
 | 
				
			||||||
    core: LavinaCore,
 | 
					    players: PlayerRegistry,
 | 
				
			||||||
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
    metrics: MetricsRegistry,
 | 
					    metrics: MetricsRegistry,
 | 
				
			||||||
    storage: Storage,
 | 
					    storage: Storage,
 | 
				
			||||||
) -> Result<RunningServer> {
 | 
					) -> Result<RunningServer> {
 | 
				
			||||||
| 
						 | 
					@ -1011,12 +1012,13 @@ pub async fn launch(
 | 
				
			||||||
                            }
 | 
					                            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            let terminator = Terminator::spawn(|termination| {
 | 
					                            let terminator = Terminator::spawn(|termination| {
 | 
				
			||||||
                                let core = core.clone();
 | 
					                                let players = players.clone();
 | 
				
			||||||
 | 
					                                let rooms = rooms.clone();
 | 
				
			||||||
                                let current_connections_clone = current_connections.clone();
 | 
					                                let current_connections_clone = current_connections.clone();
 | 
				
			||||||
                                let stopped_tx = stopped_tx.clone();
 | 
					                                let stopped_tx = stopped_tx.clone();
 | 
				
			||||||
                                let storage = storage.clone();
 | 
					                                let storage = storage.clone();
 | 
				
			||||||
                                async move {
 | 
					                                async move {
 | 
				
			||||||
                                    match handle_socket(config, stream, &socket_addr, core, termination, storage).await {
 | 
					                                    match handle_socket(config, stream, &socket_addr, players, rooms, termination, storage).await {
 | 
				
			||||||
                                        Ok(_) => log::info!("Connection terminated"),
 | 
					                                        Ok(_) => log::info!("Connection terminated"),
 | 
				
			||||||
                                        Err(err) => log::warn!("Connection failed: {err}"),
 | 
					                                        Err(err) => log::warn!("Connection failed: {err}"),
 | 
				
			||||||
                                    }
 | 
					                                    }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -11,7 +11,7 @@ use tokio::net::TcpStream;
 | 
				
			||||||
use lavina_core::player::{JoinResult, PlayerId, SendMessageResult};
 | 
					use lavina_core::player::{JoinResult, PlayerId, SendMessageResult};
 | 
				
			||||||
use lavina_core::repo::{Storage, StorageConfig};
 | 
					use lavina_core::repo::{Storage, StorageConfig};
 | 
				
			||||||
use lavina_core::room::RoomId;
 | 
					use lavina_core::room::RoomId;
 | 
				
			||||||
use lavina_core::LavinaCore;
 | 
					use lavina_core::{player::PlayerRegistry, room::RoomRegistry};
 | 
				
			||||||
use projection_irc::APP_VERSION;
 | 
					use projection_irc::APP_VERSION;
 | 
				
			||||||
use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig};
 | 
					use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -102,7 +102,8 @@ impl<'a> TestScope<'a> {
 | 
				
			||||||
struct TestServer {
 | 
					struct TestServer {
 | 
				
			||||||
    metrics: MetricsRegistry,
 | 
					    metrics: MetricsRegistry,
 | 
				
			||||||
    storage: Storage,
 | 
					    storage: Storage,
 | 
				
			||||||
    core: LavinaCore,
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
 | 
					    players: PlayerRegistry,
 | 
				
			||||||
    server: RunningServer,
 | 
					    server: RunningServer,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
impl TestServer {
 | 
					impl TestServer {
 | 
				
			||||||
| 
						 | 
					@ -117,36 +118,43 @@ impl TestServer {
 | 
				
			||||||
            db_path: ":memory:".into(),
 | 
					            db_path: ":memory:".into(),
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
        .await?;
 | 
					        .await?;
 | 
				
			||||||
        let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
 | 
					        let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
 | 
				
			||||||
        let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
 | 
					        let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
 | 
				
			||||||
 | 
					        let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
 | 
				
			||||||
        Ok(TestServer {
 | 
					        Ok(TestServer {
 | 
				
			||||||
            metrics,
 | 
					            metrics,
 | 
				
			||||||
            storage,
 | 
					            storage,
 | 
				
			||||||
            core,
 | 
					            rooms,
 | 
				
			||||||
 | 
					            players,
 | 
				
			||||||
            server,
 | 
					            server,
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async fn reboot(self) -> Result<TestServer> {
 | 
					    async fn reboot(mut self) -> Result<TestServer> {
 | 
				
			||||||
        let config = ServerConfig {
 | 
					        let config = ServerConfig {
 | 
				
			||||||
            listen_on: "127.0.0.1:0".parse().unwrap(),
 | 
					            listen_on: "127.0.0.1:0".parse().unwrap(),
 | 
				
			||||||
            server_name: "testserver".into(),
 | 
					            server_name: "testserver".into(),
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
        let TestServer {
 | 
					        let TestServer {
 | 
				
			||||||
            metrics: _,
 | 
					            mut metrics,
 | 
				
			||||||
            storage,
 | 
					            mut storage,
 | 
				
			||||||
            mut core,
 | 
					            rooms,
 | 
				
			||||||
 | 
					            mut players,
 | 
				
			||||||
            server,
 | 
					            server,
 | 
				
			||||||
        } = self;
 | 
					        } = self;
 | 
				
			||||||
        server.terminate().await?;
 | 
					        server.terminate().await?;
 | 
				
			||||||
        core.shutdown().await?;
 | 
					        players.shutdown_all().await.unwrap();
 | 
				
			||||||
        let metrics = MetricsRegistry::new();
 | 
					        drop(players);
 | 
				
			||||||
        let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
 | 
					        drop(rooms);
 | 
				
			||||||
        let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
 | 
					        let mut metrics = MetricsRegistry::new();
 | 
				
			||||||
 | 
					        let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
 | 
				
			||||||
 | 
					        let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
 | 
				
			||||||
 | 
					        let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
 | 
				
			||||||
        Ok(TestServer {
 | 
					        Ok(TestServer {
 | 
				
			||||||
            metrics,
 | 
					            metrics,
 | 
				
			||||||
            storage,
 | 
					            storage,
 | 
				
			||||||
            core,
 | 
					            rooms,
 | 
				
			||||||
 | 
					            players,
 | 
				
			||||||
            server,
 | 
					            server,
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					@ -582,7 +590,7 @@ async fn server_time_capability() -> Result<()> {
 | 
				
			||||||
    s.send("CAP LS 302").await?;
 | 
					    s.send("CAP LS 302").await?;
 | 
				
			||||||
    s.send("NICK tester").await?;
 | 
					    s.send("NICK tester").await?;
 | 
				
			||||||
    s.send("USER UserName 0 * :Real Name").await?;
 | 
					    s.send("USER UserName 0 * :Real Name").await?;
 | 
				
			||||||
    s.expect_cap_ls().await?;
 | 
					    s.expect(":testserver CAP * LS :sasl=PLAIN server-time").await?;
 | 
				
			||||||
    s.send("CAP REQ :sasl server-time").await?;
 | 
					    s.send("CAP REQ :sasl server-time").await?;
 | 
				
			||||||
    s.expect(":testserver CAP tester ACK :sasl server-time").await?;
 | 
					    s.expect(":testserver CAP tester ACK :sasl server-time").await?;
 | 
				
			||||||
    s.send("AUTHENTICATE PLAIN").await?;
 | 
					    s.send("AUTHENTICATE PLAIN").await?;
 | 
				
			||||||
| 
						 | 
					@ -600,7 +608,7 @@ async fn server_time_capability() -> Result<()> {
 | 
				
			||||||
    s.expect(":testserver 366 tester #test :End of /NAMES list").await?;
 | 
					    s.expect(":testserver 366 tester #test :End of /NAMES list").await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    server.storage.create_user("some_guy").await?;
 | 
					    server.storage.create_user("some_guy").await?;
 | 
				
			||||||
    let mut conn = server.core.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
 | 
					    let mut conn = server.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
 | 
				
			||||||
    let res = conn.join_room(RoomId::from("test").unwrap()).await?;
 | 
					    let res = conn.join_room(RoomId::from("test").unwrap()).await?;
 | 
				
			||||||
    let JoinResult::Success(_) = res else {
 | 
					    let JoinResult::Success(_) = res else {
 | 
				
			||||||
        panic!("Failed to join room");
 | 
					        panic!("Failed to join room");
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -26,7 +26,6 @@ use lavina_core::prelude::*;
 | 
				
			||||||
use lavina_core::repo::Storage;
 | 
					use lavina_core::repo::Storage;
 | 
				
			||||||
use lavina_core::room::RoomRegistry;
 | 
					use lavina_core::room::RoomRegistry;
 | 
				
			||||||
use lavina_core::terminator::Terminator;
 | 
					use lavina_core::terminator::Terminator;
 | 
				
			||||||
use lavina_core::LavinaCore;
 | 
					 | 
				
			||||||
use proto_xmpp::bind::{Name, Resource};
 | 
					use proto_xmpp::bind::{Name, Resource};
 | 
				
			||||||
use proto_xmpp::stream::*;
 | 
					use proto_xmpp::stream::*;
 | 
				
			||||||
use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml};
 | 
					use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml};
 | 
				
			||||||
| 
						 | 
					@ -80,7 +79,8 @@ impl RunningServer {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub async fn launch(
 | 
					pub async fn launch(
 | 
				
			||||||
    config: ServerConfig,
 | 
					    config: ServerConfig,
 | 
				
			||||||
    core: LavinaCore,
 | 
					    players: PlayerRegistry,
 | 
				
			||||||
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
    metrics: MetricsRegistry,
 | 
					    metrics: MetricsRegistry,
 | 
				
			||||||
    storage: Storage,
 | 
					    storage: Storage,
 | 
				
			||||||
) -> Result<RunningServer> {
 | 
					) -> Result<RunningServer> {
 | 
				
			||||||
| 
						 | 
					@ -122,14 +122,15 @@ pub async fn launch(
 | 
				
			||||||
                                // TODO kill the older connection and restart it
 | 
					                                // TODO kill the older connection and restart it
 | 
				
			||||||
                                continue;
 | 
					                                continue;
 | 
				
			||||||
                            }
 | 
					                            }
 | 
				
			||||||
                            let core = core.clone();
 | 
					                            let players = players.clone();
 | 
				
			||||||
 | 
					                            let rooms = rooms.clone();
 | 
				
			||||||
                            let storage = storage.clone();
 | 
					                            let storage = storage.clone();
 | 
				
			||||||
                            let hostname = config.hostname.clone();
 | 
					                            let hostname = config.hostname.clone();
 | 
				
			||||||
                            let terminator = Terminator::spawn(|termination| {
 | 
					                            let terminator = Terminator::spawn(|termination| {
 | 
				
			||||||
                                let stopped_tx = stopped_tx.clone();
 | 
					                                let stopped_tx = stopped_tx.clone();
 | 
				
			||||||
                                let loaded_config = loaded_config.clone();
 | 
					                                let loaded_config = loaded_config.clone();
 | 
				
			||||||
                                async move {
 | 
					                                async move {
 | 
				
			||||||
                                    match handle_socket(loaded_config, stream, &socket_addr, core, storage, hostname, termination).await {
 | 
					                                    match handle_socket(loaded_config, stream, &socket_addr, players, rooms, storage, hostname, termination).await {
 | 
				
			||||||
                                        Ok(_) => log::info!("Connection terminated"),
 | 
					                                        Ok(_) => log::info!("Connection terminated"),
 | 
				
			||||||
                                        Err(err) => log::warn!("Connection failed: {err}"),
 | 
					                                        Err(err) => log::warn!("Connection failed: {err}"),
 | 
				
			||||||
                                    }
 | 
					                                    }
 | 
				
			||||||
| 
						 | 
					@ -167,7 +168,8 @@ async fn handle_socket(
 | 
				
			||||||
    cert_config: Arc<LoadedConfig>,
 | 
					    cert_config: Arc<LoadedConfig>,
 | 
				
			||||||
    mut stream: TcpStream,
 | 
					    mut stream: TcpStream,
 | 
				
			||||||
    socket_addr: &SocketAddr,
 | 
					    socket_addr: &SocketAddr,
 | 
				
			||||||
    mut core: LavinaCore,
 | 
					    mut players: PlayerRegistry,
 | 
				
			||||||
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
    mut storage: Storage,
 | 
					    mut storage: Storage,
 | 
				
			||||||
    hostname: Str,
 | 
					    hostname: Str,
 | 
				
			||||||
    termination: Deferred<()>, // TODO use it to stop the connection gracefully
 | 
					    termination: Deferred<()>, // TODO use it to stop the connection gracefully
 | 
				
			||||||
| 
						 | 
					@ -205,14 +207,14 @@ async fn handle_socket(
 | 
				
			||||||
        authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => {
 | 
					        authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => {
 | 
				
			||||||
            match authenticated {
 | 
					            match authenticated {
 | 
				
			||||||
                Ok(authenticated) => {
 | 
					                Ok(authenticated) => {
 | 
				
			||||||
                    let mut connection = core.players.connect_to_player(&authenticated.player_id).await;
 | 
					                    let mut connection = players.connect_to_player(&authenticated.player_id).await;
 | 
				
			||||||
                    socket_final(
 | 
					                    socket_final(
 | 
				
			||||||
                        &mut xml_reader,
 | 
					                        &mut xml_reader,
 | 
				
			||||||
                        &mut xml_writer,
 | 
					                        &mut xml_writer,
 | 
				
			||||||
                        &mut reader_buf,
 | 
					                        &mut reader_buf,
 | 
				
			||||||
                        &authenticated,
 | 
					                        &authenticated,
 | 
				
			||||||
                        &mut connection,
 | 
					                        &mut connection,
 | 
				
			||||||
                        &core.rooms,
 | 
					                        &rooms,
 | 
				
			||||||
                        &hostname,
 | 
					                        &hostname,
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                    .await?;
 | 
					                    .await?;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -16,8 +16,9 @@ use tokio_rustls::rustls::client::ServerCertVerifier;
 | 
				
			||||||
use tokio_rustls::rustls::{ClientConfig, ServerName};
 | 
					use tokio_rustls::rustls::{ClientConfig, ServerName};
 | 
				
			||||||
use tokio_rustls::TlsConnector;
 | 
					use tokio_rustls::TlsConnector;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use lavina_core::player::PlayerRegistry;
 | 
				
			||||||
use lavina_core::repo::{Storage, StorageConfig};
 | 
					use lavina_core::repo::{Storage, StorageConfig};
 | 
				
			||||||
use lavina_core::LavinaCore;
 | 
					use lavina_core::room::RoomRegistry;
 | 
				
			||||||
use projection_xmpp::{launch, RunningServer, ServerConfig};
 | 
					use projection_xmpp::{launch, RunningServer, ServerConfig};
 | 
				
			||||||
use proto_xmpp::xml::{Continuation, FromXml, Parser};
 | 
					use proto_xmpp::xml::{Continuation, FromXml, Parser};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -123,7 +124,8 @@ impl ServerCertVerifier for IgnoreCertVerification {
 | 
				
			||||||
struct TestServer {
 | 
					struct TestServer {
 | 
				
			||||||
    metrics: MetricsRegistry,
 | 
					    metrics: MetricsRegistry,
 | 
				
			||||||
    storage: Storage,
 | 
					    storage: Storage,
 | 
				
			||||||
    core: LavinaCore,
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
 | 
					    players: PlayerRegistry,
 | 
				
			||||||
    server: RunningServer,
 | 
					    server: RunningServer,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
impl TestServer {
 | 
					impl TestServer {
 | 
				
			||||||
| 
						 | 
					@ -135,17 +137,19 @@ impl TestServer {
 | 
				
			||||||
            key: "tests/certs/xmpp.key".parse().unwrap(),
 | 
					            key: "tests/certs/xmpp.key".parse().unwrap(),
 | 
				
			||||||
            hostname: "localhost".into(),
 | 
					            hostname: "localhost".into(),
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
        let metrics = MetricsRegistry::new();
 | 
					        let mut metrics = MetricsRegistry::new();
 | 
				
			||||||
        let storage = Storage::open(StorageConfig {
 | 
					        let mut storage = Storage::open(StorageConfig {
 | 
				
			||||||
            db_path: ":memory:".into(),
 | 
					            db_path: ":memory:".into(),
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
        .await?;
 | 
					        .await?;
 | 
				
			||||||
        let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
 | 
					        let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
 | 
				
			||||||
        let server = launch(config, core.clone(), metrics.clone(), storage.clone()).await.unwrap();
 | 
					        let players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics).unwrap();
 | 
				
			||||||
 | 
					        let server = launch(config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await.unwrap();
 | 
				
			||||||
        Ok(TestServer {
 | 
					        Ok(TestServer {
 | 
				
			||||||
            metrics,
 | 
					            metrics,
 | 
				
			||||||
            storage,
 | 
					            storage,
 | 
				
			||||||
            core,
 | 
					            rooms,
 | 
				
			||||||
 | 
					            players,
 | 
				
			||||||
            server,
 | 
					            server,
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										17
									
								
								src/http.rs
								
								
								
								
							
							
						
						
									
										17
									
								
								src/http.rs
								
								
								
								
							| 
						 | 
					@ -16,7 +16,6 @@ use lavina_core::prelude::*;
 | 
				
			||||||
use lavina_core::repo::Storage;
 | 
					use lavina_core::repo::Storage;
 | 
				
			||||||
use lavina_core::room::RoomRegistry;
 | 
					use lavina_core::room::RoomRegistry;
 | 
				
			||||||
use lavina_core::terminator::Terminator;
 | 
					use lavina_core::terminator::Terminator;
 | 
				
			||||||
use lavina_core::LavinaCore;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
use mgmt_api::*;
 | 
					use mgmt_api::*;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -30,20 +29,20 @@ pub struct ServerConfig {
 | 
				
			||||||
pub async fn launch(
 | 
					pub async fn launch(
 | 
				
			||||||
    config: ServerConfig,
 | 
					    config: ServerConfig,
 | 
				
			||||||
    metrics: MetricsRegistry,
 | 
					    metrics: MetricsRegistry,
 | 
				
			||||||
    core: LavinaCore,
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
    storage: Storage,
 | 
					    storage: Storage,
 | 
				
			||||||
) -> Result<Terminator> {
 | 
					) -> Result<Terminator> {
 | 
				
			||||||
    log::info!("Starting the http service");
 | 
					    log::info!("Starting the http service");
 | 
				
			||||||
    let listener = TcpListener::bind(config.listen_on).await?;
 | 
					    let listener = TcpListener::bind(config.listen_on).await?;
 | 
				
			||||||
    log::debug!("Listener started");
 | 
					    log::debug!("Listener started");
 | 
				
			||||||
    let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, storage, rx.map(|_| ())));
 | 
					    let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, storage, rx.map(|_| ())));
 | 
				
			||||||
    Ok(terminator)
 | 
					    Ok(terminator)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn main_loop(
 | 
					async fn main_loop(
 | 
				
			||||||
    listener: TcpListener,
 | 
					    listener: TcpListener,
 | 
				
			||||||
    metrics: MetricsRegistry,
 | 
					    metrics: MetricsRegistry,
 | 
				
			||||||
    core: LavinaCore,
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
    storage: Storage,
 | 
					    storage: Storage,
 | 
				
			||||||
    termination: impl Future<Output = ()>,
 | 
					    termination: impl Future<Output = ()>,
 | 
				
			||||||
) -> Result<()> {
 | 
					) -> Result<()> {
 | 
				
			||||||
| 
						 | 
					@ -56,13 +55,13 @@ async fn main_loop(
 | 
				
			||||||
                let (stream, _) = result?;
 | 
					                let (stream, _) = result?;
 | 
				
			||||||
                let stream = TokioIo::new(stream);
 | 
					                let stream = TokioIo::new(stream);
 | 
				
			||||||
                let metrics = metrics.clone();
 | 
					                let metrics = metrics.clone();
 | 
				
			||||||
                let core = core.clone();
 | 
					                let rooms = rooms.clone();
 | 
				
			||||||
                let storage = storage.clone();
 | 
					                let storage = storage.clone();
 | 
				
			||||||
                tokio::task::spawn(async move {
 | 
					                tokio::task::spawn(async move {
 | 
				
			||||||
                    let registry = metrics.clone();
 | 
					                    let registry = metrics.clone();
 | 
				
			||||||
                    let core = core.clone();
 | 
					                    let rooms = rooms.clone();
 | 
				
			||||||
                    let storage = storage.clone();
 | 
					                    let storage = storage.clone();
 | 
				
			||||||
                    let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), core.clone(), storage.clone(), r)));
 | 
					                    let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), storage.clone(), r)));
 | 
				
			||||||
                    if let Err(err) = server.await {
 | 
					                    if let Err(err) = server.await {
 | 
				
			||||||
                        tracing::error!("Error serving connection: {:?}", err);
 | 
					                        tracing::error!("Error serving connection: {:?}", err);
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
| 
						 | 
					@ -76,13 +75,13 @@ async fn main_loop(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn route(
 | 
					async fn route(
 | 
				
			||||||
    registry: MetricsRegistry,
 | 
					    registry: MetricsRegistry,
 | 
				
			||||||
    core: LavinaCore,
 | 
					    rooms: RoomRegistry,
 | 
				
			||||||
    storage: Storage,
 | 
					    storage: Storage,
 | 
				
			||||||
    request: Request<hyper::body::Incoming>,
 | 
					    request: Request<hyper::body::Incoming>,
 | 
				
			||||||
) -> HttpResult<Response<Full<Bytes>>> {
 | 
					) -> HttpResult<Response<Full<Bytes>>> {
 | 
				
			||||||
    let res = match (request.method(), request.uri().path()) {
 | 
					    let res = match (request.method(), request.uri().path()) {
 | 
				
			||||||
        (&Method::GET, "/metrics") => endpoint_metrics(registry),
 | 
					        (&Method::GET, "/metrics") => endpoint_metrics(registry),
 | 
				
			||||||
        (&Method::GET, "/rooms") => endpoint_rooms(core.rooms).await,
 | 
					        (&Method::GET, "/rooms") => endpoint_rooms(rooms).await,
 | 
				
			||||||
        (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(),
 | 
					        (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(),
 | 
				
			||||||
        (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(),
 | 
					        (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(),
 | 
				
			||||||
        _ => not_found(),
 | 
					        _ => not_found(),
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										33
									
								
								src/main.rs
								
								
								
								
							
							
						
						
									
										33
									
								
								src/main.rs
								
								
								
								
							| 
						 | 
					@ -9,9 +9,10 @@ use figment::{providers::Toml, Figment};
 | 
				
			||||||
use prometheus::Registry as MetricsRegistry;
 | 
					use prometheus::Registry as MetricsRegistry;
 | 
				
			||||||
use serde::Deserialize;
 | 
					use serde::Deserialize;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use lavina_core::player::PlayerRegistry;
 | 
				
			||||||
use lavina_core::prelude::*;
 | 
					use lavina_core::prelude::*;
 | 
				
			||||||
use lavina_core::repo::Storage;
 | 
					use lavina_core::repo::Storage;
 | 
				
			||||||
use lavina_core::LavinaCore;
 | 
					use lavina_core::room::RoomRegistry;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Deserialize, Debug)]
 | 
					#[derive(Deserialize, Debug)]
 | 
				
			||||||
struct ServerConfig {
 | 
					struct ServerConfig {
 | 
				
			||||||
| 
						 | 
					@ -48,12 +49,27 @@ async fn main() -> Result<()> {
 | 
				
			||||||
        xmpp: xmpp_config,
 | 
					        xmpp: xmpp_config,
 | 
				
			||||||
        storage: storage_config,
 | 
					        storage: storage_config,
 | 
				
			||||||
    } = config;
 | 
					    } = config;
 | 
				
			||||||
    let metrics = MetricsRegistry::new();
 | 
					    let mut metrics = MetricsRegistry::new();
 | 
				
			||||||
    let storage = Storage::open(storage_config).await?;
 | 
					    let storage = Storage::open(storage_config).await?;
 | 
				
			||||||
    let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
 | 
					    let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
 | 
				
			||||||
    let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone(), storage.clone()).await?;
 | 
					    let mut players = PlayerRegistry::empty(rooms.clone(), storage.clone(), &mut metrics)?;
 | 
				
			||||||
    let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone(), storage.clone()).await?;
 | 
					    let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?;
 | 
				
			||||||
    let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone(), storage.clone()).await?;
 | 
					    let irc = projection_irc::launch(
 | 
				
			||||||
 | 
					        irc_config,
 | 
				
			||||||
 | 
					        players.clone(),
 | 
				
			||||||
 | 
					        rooms.clone(),
 | 
				
			||||||
 | 
					        metrics.clone(),
 | 
				
			||||||
 | 
					        storage.clone(),
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    .await?;
 | 
				
			||||||
 | 
					    let xmpp = projection_xmpp::launch(
 | 
				
			||||||
 | 
					        xmpp_config,
 | 
				
			||||||
 | 
					        players.clone(),
 | 
				
			||||||
 | 
					        rooms.clone(),
 | 
				
			||||||
 | 
					        metrics.clone(),
 | 
				
			||||||
 | 
					        storage.clone(),
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    .await?;
 | 
				
			||||||
    tracing::info!("Started");
 | 
					    tracing::info!("Started");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    sleep.await;
 | 
					    sleep.await;
 | 
				
			||||||
| 
						 | 
					@ -62,7 +78,10 @@ async fn main() -> Result<()> {
 | 
				
			||||||
    xmpp.terminate().await?;
 | 
					    xmpp.terminate().await?;
 | 
				
			||||||
    irc.terminate().await?;
 | 
					    irc.terminate().await?;
 | 
				
			||||||
    telemetry_terminator.terminate().await?;
 | 
					    telemetry_terminator.terminate().await?;
 | 
				
			||||||
    core.shutdown().await?;
 | 
					    players.shutdown_all().await?;
 | 
				
			||||||
 | 
					    drop(players);
 | 
				
			||||||
 | 
					    drop(rooms);
 | 
				
			||||||
 | 
					    storage.close().await?;
 | 
				
			||||||
    tracing::info!("Shutdown complete");
 | 
					    tracing::info!("Shutdown complete");
 | 
				
			||||||
    Ok(())
 | 
					    Ok(())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue