core: separate the model from the logic implementation (#66)

This separates the core into two layers: the model objects and the `LavinaCore` service. The service is responsible for implementing the application logic and exposing it as the core's public API to projections, while the model objects are independent of each other and responsible only for owning and managing their in-memory data.

The model objects include the following (they are collected into the `Services` aggregate sketched right after this list):
1. `Storage` – the open connection to the SQLite DB.
2. `PlayerRegistry` – creates, stores refs to, and stops player actors.
3. `RoomRegistry` – manages active rooms.
4. `DialogRegistry` – manages active dialogs.
5. `Broadcasting` – manages subscriptions of players to rooms on remote cluster nodes.
6. `LavinaClient` – manages HTTP connections to remote cluster nodes.
7. `ClusterMetadata` – read-only configuration of the cluster metadata, i.e. allocation of entities to nodes.
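
Taken together, the model objects become plain fields of a single `Services` aggregate. A condensed sketch, reduced from the core crate's diff further down (constructors and methods omitted):

```rust
use crate::clustering::broadcast::Broadcasting;
use crate::clustering::{ClusterMetadata, LavinaClient};
use crate::dialog::DialogRegistry;
use crate::player::PlayerRegistry;
use crate::repo::Storage;
use crate::room::RoomRegistry;

/// All model objects live side by side as plain fields; none of them holds
/// a reference to another, and none is wrapped in its own `Arc` any more.
pub struct Services {
    pub(crate) players: PlayerRegistry,
    pub(crate) rooms: RoomRegistry,
    pub(crate) dialogs: DialogRegistry,
    pub(crate) broadcasting: Broadcasting,
    pub(crate) client: LavinaClient,
    pub(crate) storage: Storage,
    pub(crate) cluster_metadata: ClusterMetadata,
}
```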

As a result:
1. Model objects are now fully independent of each other; for example, it is no longer necessary to hand a `Storage` to every registry, or to wire `PlayerRegistry` and `DialogRegistry` into each other.
2. Model objects are no longer individually `Arc`-wrapped; instead, the whole `Services` object is placed behind a single `Arc` (inside `LavinaCore`) and provided to projections.
3. The public API of `lavina-core` is now properly delimited by the APIs of `LavinaCore`, `PlayerConnection`, and so on (a condensed sketch of the new surface follows this list).
4. `LavinaCore` and `PlayerConnection` now also carry the APIs of all features, unlike before, when those were split between `RoomRegistry` and `DialogRegistry`. This is unfortunate, but it can be improved in the future.
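
The service layer itself is a thin, cloneable handle over that aggregate. The sketch below is condensed from the core crate's diff further down; only two of the public methods are shown, the rest are elided:

```rust
use std::ops::Deref;
use std::sync::Arc;

use crate::player::{PlayerConnection, PlayerId};
use crate::room::{RoomHandle, RoomId};

/// Cheap-to-clone handle over the shared, `Arc`-owned service state.
#[derive(Clone)]
pub struct LavinaCore {
    services: Arc<Services>,
}

impl Deref for LavinaCore {
    type Target = Services;

    fn deref(&self) -> &Self::Target {
        &self.services
    }
}

impl LavinaCore {
    /// Feature APIs are re-exposed on the core; the registries stay crate-private.
    pub async fn connect_to_player(&self, player_id: &PlayerId) -> PlayerConnection {
        self.services.players.connect_to_player(&self, player_id).await
    }

    pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
        self.services.rooms.get_room(&self.services, room_id).await
    }

    // ...plus create_player, get_all_rooms, stop_player, and the
    // authentication and clustering entry points added in this commit.
}
```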

Reviewed-on: lavina/lavina#66
Nikita Vilunov 2024-05-13 14:32:45 +00:00
parent d1a72e7978
commit 26720a2a08
20 changed files with 395 additions and 492 deletions
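
For a concrete feel of the new boundary from a projection's side, here is a hedged sketch. The helper function itself is hypothetical (it does not exist in the codebase); the `authenticate` and `connect_to_player` calls are the ones used by the IRC projection in the diff below, with error handling simplified:

```rust
use anyhow::{anyhow, Result};

use lavina_core::auth::Verdict;
use lavina_core::player::{PlayerConnection, PlayerId};
use lavina_core::LavinaCore;

/// Hypothetical projection-side helper: everything goes through `LavinaCore`.
async fn login_and_connect(
    core: &LavinaCore,
    player_id: &PlayerId,
    login: &str,
    password: &str,
) -> Result<PlayerConnection> {
    // Authentication now lives on the core (formerly on the `Authenticator` object).
    match core.authenticate(login, password).await? {
        Verdict::Authenticated => {}
        _ => return Err(anyhow!("bad credentials")),
    }
    // Player connections are likewise handed out by the core, not by `PlayerRegistry`.
    Ok(core.connect_to_player(player_id).await)
}
```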


@ -3,8 +3,7 @@ use argon2::password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, Salt
use argon2::Argon2; use argon2::Argon2;
use rand_core::OsRng; use rand_core::OsRng;
use crate::prelude::log; use crate::LavinaCore;
use crate::repo::Storage;
pub enum Verdict { pub enum Verdict {
Authenticated, Authenticated,
@ -17,18 +16,10 @@ pub enum UpdatePasswordResult {
UserNotFound, UserNotFound,
} }
#[derive(Clone)] impl LavinaCore {
pub struct Authenticator { #[tracing::instrument(skip(self, provided_password), name = "Services::authenticate")]
storage: Storage,
}
impl Authenticator {
pub fn new(storage: Storage) -> Self {
Self { storage }
}
#[tracing::instrument(skip(self, provided_password), name = "Authenticator::authenticate")]
pub async fn authenticate(&self, login: &str, provided_password: &str) -> Result<Verdict> { pub async fn authenticate(&self, login: &str, provided_password: &str) -> Result<Verdict> {
let Some(stored_user) = self.storage.retrieve_user_by_name(login).await? else { let Some(stored_user) = self.services.storage.retrieve_user_by_name(login).await? else {
return Ok(Verdict::UserNotFound); return Ok(Verdict::UserNotFound);
}; };
if let Some(argon2_hash) = stored_user.argon2_hash { if let Some(argon2_hash) = stored_user.argon2_hash {
@ -48,9 +39,9 @@ impl Authenticator {
Ok(Verdict::InvalidPassword) Ok(Verdict::InvalidPassword)
} }
#[tracing::instrument(skip(self, provided_password), name = "Authenticator::set_password")] #[tracing::instrument(skip(self, provided_password), name = "Services::set_password")]
pub async fn set_password(&self, login: &str, provided_password: &str) -> Result<UpdatePasswordResult> { pub async fn set_password(&self, login: &str, provided_password: &str) -> Result<UpdatePasswordResult> {
let Some(u) = self.storage.retrieve_user_by_name(login).await? else { let Some(u) = self.services.storage.retrieve_user_by_name(login).await? else {
return Ok(UpdatePasswordResult::UserNotFound); return Ok(UpdatePasswordResult::UserNotFound);
}; };
@ -60,8 +51,8 @@ impl Authenticator {
.hash_password(provided_password.as_bytes(), &salt) .hash_password(provided_password.as_bytes(), &salt)
.map_err(|e| anyhow!("Failed to hash password: {e:?}"))?; .map_err(|e| anyhow!("Failed to hash password: {e:?}"))?;
self.storage.set_argon2_challenge(u.id, password_hash.to_string().as_str()).await?; self.services.storage.set_argon2_challenge(u.id, password_hash.to_string().as_str()).await?;
log::info!("Password changed for player {login}"); tracing::info!("Password changed for player {login}");
Ok(UpdatePasswordResult::PasswordUpdated) Ok(UpdatePasswordResult::PasswordUpdated)
} }
} }


@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use reqwest::Client; use reqwest::Client;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::{DefaultSpanBackend, TracingMiddleware}; use reqwest_tracing::{DefaultSpanBackend, TracingMiddleware};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
pub mod broadcast; pub mod broadcast;
pub mod room; pub mod room;
@ -27,19 +27,15 @@ pub struct ClusterMetadata {
pub rooms: HashMap<String, u32>, pub rooms: HashMap<String, u32>,
} }
#[derive(Clone)]
pub struct LavinaClient { pub struct LavinaClient {
addresses: Arc<Addresses>, addresses: Addresses,
client: ClientWithMiddleware, client: ClientWithMiddleware,
} }
impl LavinaClient { impl LavinaClient {
pub fn new(addresses: Addresses) -> Self { pub fn new(addresses: Addresses) -> Self {
let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::<DefaultSpanBackend>::new()).build(); let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::<DefaultSpanBackend>::new()).build();
Self { Self { addresses, client }
addresses: Arc::new(addresses),
client,
}
} }
async fn send_request(&self, node_id: u32, path: &str, req: impl Serialize) -> Result<()> { async fn send_request(&self, node_id: u32, path: &str, req: impl Serialize) -> Result<()> {


@ -1,41 +1,32 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use crate::player::{PlayerId, PlayerRegistry, Updates}; use crate::player::{PlayerId, Updates};
use crate::prelude::Str; use crate::prelude::Str;
use crate::room::RoomId; use crate::room::RoomId;
use crate::Services;
/// Receives updates from other nodes and broadcasts them to local player actors. /// Receives updates from other nodes and broadcasts them to local player actors.
struct BroadcastingInner { struct BroadcastingInner {
subscriptions: HashMap<RoomId, HashSet<PlayerId>>, subscriptions: HashMap<RoomId, HashSet<PlayerId>>,
} }
impl Broadcasting {} pub struct Broadcasting(Mutex<BroadcastingInner>);
#[derive(Clone)]
pub struct Broadcasting(Arc<Mutex<BroadcastingInner>>);
impl Broadcasting { impl Broadcasting {
pub fn new() -> Self { pub fn new() -> Self {
let inner = BroadcastingInner { let inner = BroadcastingInner {
subscriptions: HashMap::new(), subscriptions: HashMap::new(),
}; };
Self(Arc::new(Mutex::new(inner))) Self(Mutex::new(inner))
} }
}
impl Services {
/// Broadcasts the given update to subscribed player actors on local node. /// Broadcasts the given update to subscribed player actors on local node.
#[tracing::instrument(skip(self, players, message, created_at), name = "Broadcasting::broadcast")] #[tracing::instrument(skip(self, message, created_at))]
pub async fn broadcast( pub async fn broadcast(&self, room_id: RoomId, author_id: PlayerId, message: Str, created_at: DateTime<Utc>) {
&self, let inner = self.broadcasting.0.lock().await;
players: &PlayerRegistry,
room_id: RoomId,
author_id: PlayerId,
message: Str,
created_at: DateTime<Utc>,
) {
let inner = self.0.lock().await;
let Some(subscribers) = inner.subscriptions.get(&room_id) else { let Some(subscribers) = inner.subscriptions.get(&room_id) else {
return; return;
}; };
@ -49,7 +40,7 @@ impl Broadcasting {
if i == &author_id { if i == &author_id {
continue; continue;
} }
let Some(player) = players.get_player(i).await else { let Some(player) = self.players.get_player(i).await else {
continue; continue;
}; };
player.update(update.clone()).await; player.update(update.clone()).await;
@ -57,6 +48,6 @@ impl Broadcasting {
} }
pub async fn subscribe(&self, subscriber: PlayerId, room_id: RoomId) { pub async fn subscribe(&self, subscriber: PlayerId, room_id: RoomId) {
self.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber); self.broadcasting.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber);
} }
} }


@ -1,6 +1,11 @@
use anyhow::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::clustering::LavinaClient; use crate::clustering::LavinaClient;
use crate::player::PlayerId;
use crate::prelude::Str;
use crate::room::RoomId;
use crate::LavinaCore;
pub mod paths { pub mod paths {
pub const JOIN: &'static str = "/cluster/rooms/join"; pub const JOIN: &'static str = "/cluster/rooms/join";
@ -38,22 +43,46 @@ pub struct SetRoomTopicReq<'a> {
impl LavinaClient { impl LavinaClient {
#[tracing::instrument(skip(self, req), name = "LavinaClient::join_room")] #[tracing::instrument(skip(self, req), name = "LavinaClient::join_room")]
pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> anyhow::Result<()> { pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> Result<()> {
self.send_request(node_id, paths::JOIN, req).await self.send_request(node_id, paths::JOIN, req).await
} }
#[tracing::instrument(skip(self, req), name = "LavinaClient::leave_room")] #[tracing::instrument(skip(self, req), name = "LavinaClient::leave_room")]
pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> anyhow::Result<()> { pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> Result<()> {
self.send_request(node_id, paths::LEAVE, req).await self.send_request(node_id, paths::LEAVE, req).await
} }
#[tracing::instrument(skip(self, req), name = "LavinaClient::send_room_message")] #[tracing::instrument(skip(self, req), name = "LavinaClient::send_room_message")]
pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> anyhow::Result<()> { pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> Result<()> {
self.send_request(node_id, paths::ADD_MESSAGE, req).await self.send_request(node_id, paths::ADD_MESSAGE, req).await
} }
#[tracing::instrument(skip(self, req), name = "LavinaClient::set_room_topic")] #[tracing::instrument(skip(self, req), name = "LavinaClient::set_room_topic")]
pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> anyhow::Result<()> { pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> Result<()> {
self.send_request(node_id, paths::SET_TOPIC, req).await self.send_request(node_id, paths::SET_TOPIC, req).await
} }
} }
impl LavinaCore {
pub async fn cluster_join_room(&self, room_id: RoomId, player_id: &PlayerId) -> Result<()> {
let room_handle = self.services.rooms.get_or_create_room(&self.services, room_id).await?;
let storage_id =
self.services.storage.create_or_retrieve_user_id_by_name(player_id.as_inner().as_ref()).await?;
room_handle.add_member(&self.services, &player_id, storage_id).await;
Ok(())
}
pub async fn cluster_send_room_message(
&self,
room_id: RoomId,
player_id: &PlayerId,
message: Str,
created_at: chrono::DateTime<chrono::Utc>,
) -> Result<Option<()>> {
let Some(room_handle) = self.services.rooms.get_room(&self.services, &room_id).await else {
return Ok(None);
};
room_handle.send_message(&self.services, &player_id, message, created_at).await;
Ok(Some(()))
}
}


@ -4,14 +4,13 @@
//! There are no admins or other roles in dialogs, both participants have equal rights. //! There are no admins or other roles in dialogs, both participants have equal rights.
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::RwLock as AsyncRwLock;
use crate::player::{PlayerId, PlayerRegistry, Updates}; use crate::player::{PlayerId, Updates};
use crate::prelude::*; use crate::prelude::*;
use crate::repo::Storage; use crate::Services;
/// Id of a conversation between two players. /// Id of a conversation between two players.
/// ///
@ -45,55 +44,50 @@ struct Dialog {
struct DialogRegistryInner { struct DialogRegistryInner {
dialogs: HashMap<DialogId, AsyncRwLock<Dialog>>, dialogs: HashMap<DialogId, AsyncRwLock<Dialog>>,
players: Option<PlayerRegistry>,
storage: Storage,
} }
#[derive(Clone)] pub(crate) struct DialogRegistry(AsyncRwLock<DialogRegistryInner>);
pub struct DialogRegistry(Arc<AsyncRwLock<DialogRegistryInner>>);
impl DialogRegistry { impl Services {
pub async fn send_message( #[tracing::instrument(skip(self, body, created_at))]
pub async fn send_dialog_message(
&self, &self,
from: PlayerId, from: PlayerId,
to: PlayerId, to: PlayerId,
body: Str, body: Str,
created_at: &DateTime<Utc>, created_at: &DateTime<Utc>,
) -> Result<()> { ) -> Result<()> {
let mut guard = self.0.read().await; let guard = self.dialogs.0.read().await;
let id = DialogId::new(from.clone(), to.clone()); let id = DialogId::new(from.clone(), to.clone());
let dialog = guard.dialogs.get(&id); let dialog = guard.dialogs.get(&id);
if let Some(d) = dialog { if let Some(d) = dialog {
let mut d = d.write().await; let mut d = d.write().await;
guard self.storage
.storage
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at) .insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
.await?; .await?;
d.message_count += 1; d.message_count += 1;
} else { } else {
drop(guard); drop(guard);
let mut guard2 = self.0.write().await; let mut guard2 = self.dialogs.0.write().await;
// double check in case concurrent access has loaded this dialog // double check in case concurrent access has loaded this dialog
if let Some(d) = guard2.dialogs.get(&id) { if let Some(d) = guard2.dialogs.get(&id) {
let mut d = d.write().await; let mut d = d.write().await;
guard2 self.storage
.storage
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at) .insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
.await?; .await?;
d.message_count += 1; d.message_count += 1;
} else { } else {
let (p1, p2) = id.as_inner(); let (p1, p2) = id.as_inner();
tracing::info!("Dialog {id:?} not found locally, trying to load from storage"); tracing::info!("Dialog {id:?} not found locally, trying to load from storage");
let stored_dialog = match guard2.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? { let stored_dialog = match self.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? {
Some(t) => t, Some(t) => t,
None => { None => {
tracing::info!("Dialog {id:?} does not exist, creating a new one in storage"); tracing::info!("Dialog {id:?} does not exist, creating a new one in storage");
guard2.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await? self.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await?
} }
}; };
tracing::info!("Dialog {id:?} loaded"); tracing::info!("Dialog {id:?} loaded");
guard2 self.storage
.storage
.insert_dialog_message( .insert_dialog_message(
stored_dialog.id, stored_dialog.id,
stored_dialog.message_count, stored_dialog.message_count,
@ -110,14 +104,10 @@ impl DialogRegistry {
}; };
guard2.dialogs.insert(id.clone(), AsyncRwLock::new(dialog)); guard2.dialogs.insert(id.clone(), AsyncRwLock::new(dialog));
} }
guard = guard2.downgrade(); drop(guard2);
} }
// TODO send message to the other player and persist it // TODO send message to the other player and persist it
let Some(players) = &guard.players else { let Some(player) = self.players.get_player(&to).await else {
tracing::error!("No player registry present");
return Ok(());
};
let Some(player) = players.get_player(&to).await else {
tracing::debug!("Player {to:?} not active, not sending message"); tracing::debug!("Player {to:?} not active, not sending message");
return Ok(()); return Ok(());
}; };
@ -133,33 +123,13 @@ impl DialogRegistry {
} }
impl DialogRegistry { impl DialogRegistry {
pub fn new(storage: Storage) -> DialogRegistry { pub fn new() -> DialogRegistry {
DialogRegistry(Arc::new(AsyncRwLock::new(DialogRegistryInner { DialogRegistry(AsyncRwLock::new(DialogRegistryInner {
dialogs: HashMap::new(), dialogs: HashMap::new(),
players: None, }))
storage,
})))
} }
pub async fn set_players(&self, players: PlayerRegistry) { pub fn shutdown(self) {}
let mut guard = self.0.write().await;
guard.players = Some(players);
}
pub async fn unset_players(&self) {
let mut guard = self.0.write().await;
guard.players = None;
}
pub fn shutdown(self) -> Result<()> {
let res = match Arc::try_unwrap(self.0) {
Ok(e) => e,
Err(_) => return Err(fail("failed to acquire dialogs ownership on shutdown")),
};
let res = res.into_inner();
drop(res);
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]


@ -1,16 +1,16 @@
//! Domain definitions and implementation of common chat logic. //! Domain definitions and implementation of common chat logic.
use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use prometheus::Registry as MetricsRegistry; use prometheus::Registry as MetricsRegistry;
use crate::auth::Authenticator;
use crate::clustering::broadcast::Broadcasting; use crate::clustering::broadcast::Broadcasting;
use crate::clustering::{ClusterConfig, LavinaClient}; use crate::clustering::{ClusterConfig, ClusterMetadata, LavinaClient};
use crate::dialog::DialogRegistry; use crate::dialog::DialogRegistry;
use crate::player::PlayerRegistry; use crate::player::{PlayerConnection, PlayerId, PlayerRegistry};
use crate::repo::Storage; use crate::repo::Storage;
use crate::room::RoomRegistry; use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry};
pub mod auth; pub mod auth;
pub mod clustering; pub mod clustering;
@ -25,50 +25,88 @@ mod table;
#[derive(Clone)] #[derive(Clone)]
pub struct LavinaCore { pub struct LavinaCore {
pub players: PlayerRegistry, services: Arc<Services>,
pub rooms: RoomRegistry, }
pub dialogs: DialogRegistry,
pub broadcasting: Broadcasting, impl Deref for LavinaCore {
pub authenticator: Authenticator, type Target = Services;
fn deref(&self) -> &Self::Target {
&self.services
}
}
impl LavinaCore {
pub async fn connect_to_player(&self, player_id: &PlayerId) -> PlayerConnection {
self.services.players.connect_to_player(&self, player_id).await
}
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
self.services.rooms.get_room(&self.services, room_id).await
}
pub async fn create_player(&self, player_id: &PlayerId) -> Result<()> {
self.services.storage.create_user(player_id.as_inner()).await
}
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
self.services.rooms.get_all_rooms().await
}
pub async fn stop_player(&self, player_id: &PlayerId) -> Result<Option<()>> {
self.services.players.stop_player(player_id).await
}
}
pub struct Services {
pub(crate) players: PlayerRegistry,
pub(crate) rooms: RoomRegistry,
pub(crate) dialogs: DialogRegistry,
pub(crate) broadcasting: Broadcasting,
pub(crate) client: LavinaClient,
pub(crate) storage: Storage,
pub(crate) cluster_metadata: ClusterMetadata,
} }
impl LavinaCore { impl LavinaCore {
pub async fn new( pub async fn new(
mut metrics: MetricsRegistry, metrics: &mut MetricsRegistry,
cluster_config: ClusterConfig, cluster_config: ClusterConfig,
storage: Storage, storage: Storage,
) -> Result<LavinaCore> { ) -> Result<LavinaCore> {
// TODO shutdown all services in reverse order on error // TODO shutdown all services in reverse order on error
let broadcasting = Broadcasting::new(); let broadcasting = Broadcasting::new();
let client = LavinaClient::new(cluster_config.addresses.clone()); let client = LavinaClient::new(cluster_config.addresses.clone());
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let rooms = RoomRegistry::new(metrics)?;
let dialogs = DialogRegistry::new(storage.clone()); let dialogs = DialogRegistry::new();
let players = PlayerRegistry::empty( let players = PlayerRegistry::empty(metrics)?;
rooms.clone(),
dialogs.clone(), let services = Services {
storage.clone(),
&mut metrics,
Arc::new(cluster_config.metadata),
client,
broadcasting.clone(),
)?;
dialogs.set_players(players.clone()).await;
let authenticator = Authenticator::new(storage.clone());
Ok(LavinaCore {
players, players,
rooms, rooms,
dialogs, dialogs,
broadcasting, broadcasting,
authenticator, client,
storage,
cluster_metadata: cluster_config.metadata,
};
Ok(LavinaCore {
services: Arc::new(services),
}) })
} }
pub async fn shutdown(mut self) -> Result<()> { pub async fn shutdown(self) -> Storage {
self.players.shutdown_all().await?; let _ = self.players.shutdown_all().await;
self.dialogs.unset_players().await; let services = match Arc::try_unwrap(self.services) {
self.players.shutdown()?; Ok(e) => e,
self.dialogs.shutdown()?; Err(_) => {
self.rooms.shutdown()?; panic!("failed to acquire services ownership on shutdown");
Ok(()) }
};
let _ = services.players.shutdown();
let _ = services.dialogs.shutdown();
let _ = services.rooms.shutdown();
services.storage
} }
} }


@ -8,7 +8,6 @@
//! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor, //! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor,
//! so that they don't overload the room actor. //! so that they don't overload the room actor.
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use prometheus::{IntGauge, Registry as MetricsRegistry}; use prometheus::{IntGauge, Registry as MetricsRegistry};
@ -17,14 +16,11 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
use crate::clustering::broadcast::Broadcasting;
use crate::clustering::room::*; use crate::clustering::room::*;
use crate::clustering::{ClusterMetadata, LavinaClient};
use crate::dialog::DialogRegistry;
use crate::prelude::*; use crate::prelude::*;
use crate::repo::Storage; use crate::room::{RoomHandle, RoomId, RoomInfo};
use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry};
use crate::table::{AnonTable, Key as AnonKey}; use crate::table::{AnonTable, Key as AnonKey};
use crate::LavinaCore;
/// Opaque player identifier. Cannot contain spaces, must be shorter than 32. /// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
@ -266,41 +262,21 @@ pub enum Updates {
} }
/// Handle to a player registry — a shared data structure containing information about players. /// Handle to a player registry — a shared data structure containing information about players.
#[derive(Clone)] pub(crate) struct PlayerRegistry(RwLock<PlayerRegistryInner>);
pub struct PlayerRegistry(Arc<RwLock<PlayerRegistryInner>>);
impl PlayerRegistry { impl PlayerRegistry {
pub fn empty( pub fn empty(metrics: &mut MetricsRegistry) -> Result<PlayerRegistry> {
room_registry: RoomRegistry,
dialogs: DialogRegistry,
storage: Storage,
metrics: &mut MetricsRegistry,
cluster_metadata: Arc<ClusterMetadata>,
cluster_client: LavinaClient,
broadcasting: Broadcasting,
) -> Result<PlayerRegistry> {
let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?;
metrics.register(Box::new(metric_active_players.clone()))?; metrics.register(Box::new(metric_active_players.clone()))?;
let inner = PlayerRegistryInner { let inner = PlayerRegistryInner {
room_registry,
dialogs,
storage,
cluster_metadata,
cluster_client,
broadcasting,
players: HashMap::new(), players: HashMap::new(),
metric_active_players, metric_active_players,
}; };
Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) Ok(PlayerRegistry(RwLock::new(inner)))
} }
pub fn shutdown(self) -> Result<()> { pub fn shutdown(self) {
let res = match Arc::try_unwrap(self.0) { let res = self.0.into_inner();
Ok(e) => e,
Err(_) => return Err(fail("failed to acquire players ownership on shutdown")),
};
let res = res.into_inner();
drop(res); drop(res);
Ok(())
} }
#[tracing::instrument(skip(self), name = "PlayerRegistry::get_player")] #[tracing::instrument(skip(self), name = "PlayerRegistry::get_player")]
@ -323,8 +299,8 @@ impl PlayerRegistry {
} }
} }
#[tracing::instrument(skip(self), name = "PlayerRegistry::get_or_launch_player")] #[tracing::instrument(skip(self, core), name = "PlayerRegistry::get_or_launch_player")]
pub async fn get_or_launch_player(&self, id: &PlayerId) -> PlayerHandle { pub async fn get_or_launch_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerHandle {
let inner = self.0.read().await; let inner = self.0.read().await;
if let Some((handle, _)) = inner.players.get(id) { if let Some((handle, _)) = inner.players.get(id) {
handle.clone() handle.clone()
@ -334,17 +310,7 @@ impl PlayerRegistry {
if let Some((handle, _)) = inner.players.get(id) { if let Some((handle, _)) = inner.players.get(id) {
handle.clone() handle.clone()
} else { } else {
let (handle, fiber) = Player::launch( let (handle, fiber) = Player::launch(id.clone(), core.clone()).await;
id.clone(),
self.clone(),
inner.room_registry.clone(),
inner.dialogs.clone(),
inner.cluster_metadata.clone(),
inner.cluster_client.clone(),
inner.broadcasting.clone(),
inner.storage.clone(),
)
.await;
inner.players.insert(id.clone(), (handle.clone(), fiber)); inner.players.insert(id.clone(), (handle.clone(), fiber));
inner.metric_active_players.inc(); inner.metric_active_players.inc();
handle handle
@ -352,13 +318,13 @@ impl PlayerRegistry {
} }
} }
#[tracing::instrument(skip(self), name = "PlayerRegistry::connect_to_player")] #[tracing::instrument(skip(self, core), name = "PlayerRegistry::connect_to_player")]
pub async fn connect_to_player(&self, id: &PlayerId) -> PlayerConnection { pub async fn connect_to_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerConnection {
let player_handle = self.get_or_launch_player(id).await; let player_handle = self.get_or_launch_player(core, id).await;
player_handle.subscribe().await player_handle.subscribe().await
} }
pub async fn shutdown_all(&mut self) -> Result<()> { pub async fn shutdown_all(&self) -> Result<()> {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
for (i, (k, j)) in inner.players.drain() { for (i, (k, j)) in inner.players.drain() {
k.send(ActorCommand::Stop).await; k.send(ActorCommand::Stop).await;
@ -373,12 +339,6 @@ impl PlayerRegistry {
/// The player registry state representation. /// The player registry state representation.
struct PlayerRegistryInner { struct PlayerRegistryInner {
room_registry: RoomRegistry,
dialogs: DialogRegistry,
storage: Storage,
cluster_metadata: Arc<ClusterMetadata>,
cluster_client: LavinaClient,
broadcasting: Broadcasting,
/// Active player actors. /// Active player actors.
players: HashMap<PlayerId, (PlayerHandle, JoinHandle<Player>)>, players: HashMap<PlayerId, (PlayerHandle, JoinHandle<Player>)>,
metric_active_players: IntGauge, metric_active_players: IntGauge,
@ -398,29 +358,14 @@ struct Player {
banned_from: HashSet<RoomId>, banned_from: HashSet<RoomId>,
rx: Receiver<(ActorCommand, Span)>, rx: Receiver<(ActorCommand, Span)>,
handle: PlayerHandle, handle: PlayerHandle,
players: PlayerRegistry, services: LavinaCore,
rooms: RoomRegistry,
dialogs: DialogRegistry,
storage: Storage,
cluster_metadata: Arc<ClusterMetadata>,
cluster_client: LavinaClient,
broadcasting: Broadcasting,
} }
impl Player { impl Player {
async fn launch( async fn launch(player_id: PlayerId, core: LavinaCore) -> (PlayerHandle, JoinHandle<Player>) {
player_id: PlayerId,
players: PlayerRegistry,
rooms: RoomRegistry,
dialogs: DialogRegistry,
cluster_metadata: Arc<ClusterMetadata>,
cluster_client: LavinaClient,
broadcasting: Broadcasting,
storage: Storage,
) -> (PlayerHandle, JoinHandle<Player>) {
let (tx, rx) = channel(32); let (tx, rx) = channel(32);
let handle = PlayerHandle { tx }; let handle = PlayerHandle { tx };
let handle_clone = handle.clone(); let handle_clone = handle.clone();
let storage_id = storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap(); let storage_id = core.services.storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap();
let player = Player { let player = Player {
player_id, player_id,
storage_id, storage_id,
@ -432,22 +377,16 @@ impl Player {
banned_from: HashSet::new(), banned_from: HashSet::new(),
rx, rx,
handle, handle,
players, services: core,
rooms,
dialogs,
storage,
cluster_metadata,
cluster_client,
broadcasting,
}; };
let fiber = tokio::task::spawn(player.main_loop()); let fiber = tokio::task::spawn(player.main_loop());
(handle_clone, fiber) (handle_clone, fiber)
} }
fn room_location(&self, room_id: &RoomId) -> Option<u32> { fn room_location(&self, room_id: &RoomId) -> Option<u32> {
let res = self.cluster_metadata.rooms.get(room_id.as_inner().as_ref()).copied(); let res = self.services.cluster_metadata.rooms.get(room_id.as_inner().as_ref()).copied();
let node = res.unwrap_or(self.cluster_metadata.main_owner); let node = res.unwrap_or(self.services.cluster_metadata.main_owner);
if node == self.cluster_metadata.node_id { if node == self.services.cluster_metadata.node_id {
None None
} else { } else {
Some(node) Some(node)
@ -455,13 +394,13 @@ impl Player {
} }
async fn main_loop(mut self) -> Self { async fn main_loop(mut self) -> Self {
let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap(); let rooms = self.services.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
for room_id in rooms { for room_id in rooms {
if let Some(remote_node) = self.room_location(&room_id) { if let Some(remote_node) = self.room_location(&room_id) {
self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node }); self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node });
self.broadcasting.subscribe(self.player_id.clone(), room_id).await; self.services.subscribe(self.player_id.clone(), room_id).await;
} else { } else {
let room = self.rooms.get_room(&room_id).await; let room = self.services.rooms.get_room(&self.services, &room_id).await;
if let Some(room) = room { if let Some(room) = room {
self.my_rooms.insert(room_id, RoomRef::Local(room)); self.my_rooms.insert(room_id, RoomRef::Local(room));
} else { } else {
@ -587,9 +526,10 @@ impl Player {
room_id: room_id.as_inner(), room_id: room_id.as_inner(),
player_id: self.player_id.as_inner(), player_id: self.player_id.as_inner(),
}; };
self.cluster_client.join_room(remote_node, req).await.unwrap(); self.services.client.join_room(remote_node, req).await.unwrap();
let room_storage_id = self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap(); let room_storage_id =
self.storage.add_room_member(room_storage_id, self.storage_id).await.unwrap(); self.services.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
self.services.storage.add_room_member(room_storage_id, self.storage_id).await.unwrap();
self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node }); self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node });
JoinResult::Success(RoomInfo { JoinResult::Success(RoomInfo {
id: room_id, id: room_id,
@ -597,14 +537,14 @@ impl Player {
members: vec![], members: vec![],
}) })
} else { } else {
let room = match self.rooms.get_or_create_room(room_id.clone()).await { let room = match self.services.rooms.get_or_create_room(&self.services, room_id.clone()).await {
Ok(room) => room, Ok(room) => room,
Err(e) => { Err(e) => {
log::error!("Failed to get or create room: {e}"); log::error!("Failed to get or create room: {e}");
todo!(); todo!();
} }
}; };
room.add_member(&self.player_id, self.storage_id).await; room.add_member(&self.services, &self.player_id, self.storage_id).await;
room.subscribe(&self.player_id, self.handle.clone()).await; room.subscribe(&self.player_id, self.handle.clone()).await;
self.my_rooms.insert(room_id.clone(), RoomRef::Local(room.clone())); self.my_rooms.insert(room_id.clone(), RoomRef::Local(room.clone()));
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
@ -624,17 +564,17 @@ impl Player {
match room { match room {
RoomRef::Local(room) => { RoomRef::Local(room) => {
room.unsubscribe(&self.player_id).await; room.unsubscribe(&self.player_id).await;
room.remove_member(&self.player_id, self.storage_id).await; room.remove_member(&self.services, &self.player_id, self.storage_id).await;
} }
RoomRef::Remote { node_id } => { RoomRef::Remote { node_id } => {
let req = LeaveRoomReq { let req = LeaveRoomReq {
room_id: room_id.as_inner(), room_id: room_id.as_inner(),
player_id: self.player_id.as_inner(), player_id: self.player_id.as_inner(),
}; };
self.cluster_client.leave_room(node_id, req).await.unwrap(); self.services.client.leave_room(node_id, req).await.unwrap();
let room_storage_id = let room_storage_id =
self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap(); self.services.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
self.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap(); self.services.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap();
} }
} }
} }
@ -656,10 +596,10 @@ impl Player {
tracing::info!("no room found"); tracing::info!("no room found");
return SendMessageResult::NoSuchRoom; return SendMessageResult::NoSuchRoom;
}; };
let created_at = chrono::Utc::now(); let created_at = Utc::now();
match room { match room {
RoomRef::Local(room) => { RoomRef::Local(room) => {
room.send_message(&self.player_id, body.clone(), created_at.clone()).await; room.send_message(&self.services, &self.player_id, body.clone(), created_at.clone()).await;
} }
RoomRef::Remote { node_id } => { RoomRef::Remote { node_id } => {
let req = SendMessageReq { let req = SendMessageReq {
@ -668,10 +608,9 @@ impl Player {
message: &*body, message: &*body,
created_at: &*created_at.to_rfc3339(), created_at: &*created_at.to_rfc3339(),
}; };
self.cluster_client.send_room_message(*node_id, req).await.unwrap(); self.services.client.send_room_message(*node_id, req).await.unwrap();
self.broadcasting self.services
.broadcast( .broadcast(
&self.players,
room_id.clone(), room_id.clone(),
self.player_id.clone(), self.player_id.clone(),
body.clone(), body.clone(),
@ -698,7 +637,7 @@ impl Player {
}; };
match room { match room {
RoomRef::Local(room) => { RoomRef::Local(room) => {
room.set_topic(&self.player_id, new_topic.clone()).await; room.set_topic(&self.services, &self.player_id, new_topic.clone()).await;
} }
RoomRef::Remote { node_id } => { RoomRef::Remote { node_id } => {
let req = SetRoomTopicReq { let req = SetRoomTopicReq {
@ -706,7 +645,7 @@ impl Player {
player_id: self.player_id.as_inner(), player_id: self.player_id.as_inner(),
topic: &*new_topic, topic: &*new_topic,
}; };
self.cluster_client.set_room_topic(*node_id, req).await.unwrap(); self.services.client.set_room_topic(*node_id, req).await.unwrap();
} }
} }
let update = Updates::RoomTopicChanged { room_id, new_topic }; let update = Updates::RoomTopicChanged { room_id, new_topic };
@ -736,8 +675,11 @@ impl Player {
#[tracing::instrument(skip(self, body), name = "Player::send_dialog_message")] #[tracing::instrument(skip(self, body), name = "Player::send_dialog_message")]
async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) { async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) {
let created_at = chrono::Utc::now(); let created_at = Utc::now();
self.dialogs.send_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at).await.unwrap(); self.services
.send_dialog_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at)
.await
.unwrap();
let update = Updates::NewDialogMessage { let update = Updates::NewDialogMessage {
sender: self.player_id.clone(), sender: self.player_id.clone(),
receiver: recipient.clone(), receiver: recipient.clone(),
@ -749,7 +691,7 @@ impl Player {
#[tracing::instrument(skip(self), name = "Player::check_user_existence")] #[tracing::instrument(skip(self), name = "Player::check_user_existence")]
async fn check_user_existence(&self, recipient: PlayerId) -> GetInfoResult { async fn check_user_existence(&self, recipient: PlayerId) -> GetInfoResult {
if self.storage.check_user_existence(recipient.as_inner().as_ref()).await.unwrap() { if self.services.storage.check_user_existence(recipient.as_inner().as_ref()).await.unwrap() {
GetInfoResult::UserExists GetInfoResult::UserExists
} else { } else {
GetInfoResult::UserDoesntExist GetInfoResult::UserDoesntExist


@ -1,7 +1,6 @@
//! Storage and persistence logic. //! Storage and persistence logic.
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use serde::Deserialize; use serde::Deserialize;
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
@ -20,9 +19,8 @@ pub struct StorageConfig {
pub db_path: String, pub db_path: String,
} }
#[derive(Clone)]
pub struct Storage { pub struct Storage {
conn: Arc<Mutex<SqliteConnection>>, conn: Mutex<SqliteConnection>,
} }
impl Storage { impl Storage {
pub async fn open(config: StorageConfig) -> Result<Storage> { pub async fn open(config: StorageConfig) -> Result<Storage> {
@ -34,17 +32,17 @@ impl Storage {
migrator.run(&mut conn).await?; migrator.run(&mut conn).await?;
log::info!("Migrations passed"); log::info!("Migrations passed");
let conn = Arc::new(Mutex::new(conn)); let conn = Mutex::new(conn);
Ok(Storage { conn }) Ok(Storage { conn })
} }
pub async fn close(self) -> Result<()> { pub async fn close(self) {
let res = match Arc::try_unwrap(self.conn) { let res = self.conn.into_inner();
Ok(e) => e, match res.close().await {
Err(_) => return Err(fail("failed to acquire DB ownership on shutdown")), Ok(_) => {}
}; Err(e) => {
let res = res.into_inner(); tracing::error!("Failed to close the DB connection: {e:?}");
res.close().await?; }
Ok(()) }
} }
} }


@ -30,7 +30,7 @@ impl Storage {
} }
#[tracing::instrument(skip(self, topic), name = "Storage::create_new_room")] #[tracing::instrument(skip(self, topic), name = "Storage::create_new_room")]
pub async fn create_new_room(&mut self, name: &str, topic: &str) -> Result<u32> { pub async fn create_new_room(&self, name: &str, topic: &str) -> Result<u32> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let (id,): (u32,) = sqlx::query_as( let (id,): (u32,) = sqlx::query_as(
"insert into rooms(name, topic) "insert into rooms(name, topic)
@ -47,7 +47,7 @@ impl Storage {
#[tracing::instrument(skip(self, content, created_at), name = "Storage::insert_room_message")] #[tracing::instrument(skip(self, content, created_at), name = "Storage::insert_room_message")]
pub async fn insert_room_message( pub async fn insert_room_message(
&mut self, &self,
room_id: u32, room_id: u32,
id: u32, id: u32,
content: &str, content: &str,
@ -130,7 +130,7 @@ impl Storage {
} }
#[tracing::instrument(skip(self, topic), name = "Storage::set_room_topic")] #[tracing::instrument(skip(self, topic), name = "Storage::set_room_topic")]
pub async fn set_room_topic(&mut self, id: u32, topic: &str) -> Result<()> { pub async fn set_room_topic(&self, id: u32, topic: &str) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
sqlx::query( sqlx::query(
"update rooms "update rooms


@ -9,7 +9,7 @@ use tokio::sync::RwLock as AsyncRwLock;
use crate::player::{PlayerHandle, PlayerId, Updates}; use crate::player::{PlayerHandle, PlayerId, Updates};
use crate::prelude::*; use crate::prelude::*;
use crate::repo::Storage; use crate::Services;
/// Opaque room id /// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
@ -35,41 +35,32 @@ impl RoomId {
} }
/// Shared data structure for storing metadata about rooms. /// Shared data structure for storing metadata about rooms.
#[derive(Clone)] pub(crate) struct RoomRegistry(AsyncRwLock<RoomRegistryInner>);
pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
impl RoomRegistry { impl RoomRegistry {
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> { pub fn new(metrics: &mut MetricRegistry) -> Result<RoomRegistry> {
let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
metrics.register(Box::new(metric_active_rooms.clone()))?; metrics.register(Box::new(metric_active_rooms.clone()))?;
let inner = RoomRegistryInner { let inner = RoomRegistryInner {
rooms: HashMap::new(), rooms: HashMap::new(),
metric_active_rooms, metric_active_rooms,
storage,
}; };
Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner)))) Ok(RoomRegistry(AsyncRwLock::new(inner)))
} }
pub fn shutdown(self) -> Result<()> { pub fn shutdown(self) {
let res = match Arc::try_unwrap(self.0) { // TODO iterate over rooms and stop them
Ok(e) => e,
Err(_) => return Err(fail("failed to acquire rooms ownership on shutdown")),
};
let res = res.into_inner();
// TODO drop all rooms
drop(res);
Ok(())
} }
#[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")] #[tracing::instrument(skip(self, services), name = "RoomRegistry::get_or_create_room")]
pub async fn get_or_create_room(&self, room_id: RoomId) -> Result<RoomHandle> { pub async fn get_or_create_room(&self, services: &Services, room_id: RoomId) -> Result<RoomHandle> {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? { if let Some(room_handle) = inner.get_or_load_room(services, &room_id).await? {
Ok(room_handle.clone()) Ok(room_handle.clone())
} else { } else {
log::debug!("Creating room {}...", &room_id.0); log::debug!("Creating room {}...", &room_id.0);
let topic = "New room"; let topic = "New room";
let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?; let id = services.storage.create_new_room(&*room_id.0, &*topic).await?;
let room = Room { let room = Room {
storage_id: id, storage_id: id,
room_id: room_id.clone(), room_id: room_id.clone(),
@ -77,7 +68,6 @@ impl RoomRegistry {
members: HashSet::new(), members: HashSet::new(),
topic: topic.into(), topic: topic.into(),
message_count: 0, message_count: 0,
storage: inner.storage.clone(),
}; };
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone()); inner.rooms.insert(room_id, room_handle.clone());
@ -86,10 +76,10 @@ impl RoomRegistry {
} }
} }
#[tracing::instrument(skip(self), name = "RoomRegistry::get_room")] #[tracing::instrument(skip(self, services), name = "RoomRegistry::get_room")]
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> { pub async fn get_room(&self, services: &Services, room_id: &RoomId) -> Option<RoomHandle> {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
inner.get_or_load_room(room_id).await.unwrap() inner.get_or_load_room(services, room_id).await.unwrap()
} }
#[tracing::instrument(skip(self), name = "RoomRegistry::get_all_rooms")] #[tracing::instrument(skip(self), name = "RoomRegistry::get_all_rooms")]
@ -110,16 +100,15 @@ impl RoomRegistry {
struct RoomRegistryInner { struct RoomRegistryInner {
rooms: HashMap<RoomId, RoomHandle>, rooms: HashMap<RoomId, RoomHandle>,
metric_active_rooms: IntGauge, metric_active_rooms: IntGauge,
storage: Storage,
} }
impl RoomRegistryInner { impl RoomRegistryInner {
#[tracing::instrument(skip(self), name = "RoomRegistryInner::get_or_load_room")] #[tracing::instrument(skip(self, services), name = "RoomRegistryInner::get_or_load_room")]
async fn get_or_load_room(&mut self, room_id: &RoomId) -> Result<Option<RoomHandle>> { async fn get_or_load_room(&mut self, services: &Services, room_id: &RoomId) -> Result<Option<RoomHandle>> {
if let Some(room_handle) = self.rooms.get(room_id) { if let Some(room_handle) = self.rooms.get(room_id) {
log::debug!("Room {} was loaded already", &room_id.0); log::debug!("Room {} was loaded already", &room_id.0);
Ok(Some(room_handle.clone())) Ok(Some(room_handle.clone()))
} else if let Some(stored_room) = self.storage.retrieve_room_by_name(&*room_id.0).await? { } else if let Some(stored_room) = services.storage.retrieve_room_by_name(&*room_id.0).await? {
log::debug!("Loading room {}...", &room_id.0); log::debug!("Loading room {}...", &room_id.0);
let room = Room { let room = Room {
storage_id: stored_room.id, storage_id: stored_room.id,
@ -128,7 +117,6 @@ impl RoomRegistryInner {
members: HashSet::new(), // TODO load members from storage members: HashSet::new(), // TODO load members from storage
topic: stored_room.topic.into(), topic: stored_room.topic.into(),
message_count: stored_room.message_count, message_count: stored_room.message_count,
storage: self.storage.clone(),
}; };
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
self.rooms.insert(room_id.clone(), room_handle.clone()); self.rooms.insert(room_id.clone(), room_handle.clone());
@ -152,13 +140,13 @@ impl RoomHandle {
lock.subscriptions.insert(player_id.clone(), player_handle); lock.subscriptions.insert(player_id.clone(), player_handle);
} }
#[tracing::instrument(skip(self), name = "RoomHandle::add_member")] #[tracing::instrument(skip(self, services), name = "RoomHandle::add_member")]
pub async fn add_member(&self, player_id: &PlayerId, player_storage_id: u32) { pub async fn add_member(&self, services: &Services, player_id: &PlayerId, player_storage_id: u32) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
tracing::info!("Adding a new member to a room"); tracing::info!("Adding a new member to a room");
let room_storage_id = lock.storage_id; let room_storage_id = lock.storage_id;
if !lock.storage.is_room_member(room_storage_id, player_storage_id).await.unwrap() { if !services.storage.is_room_member(room_storage_id, player_storage_id).await.unwrap() {
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap(); services.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
} else { } else {
tracing::warn!("User {:#?} has already been added to the room.", player_id); tracing::warn!("User {:#?} has already been added to the room.", player_id);
} }
@ -176,12 +164,12 @@ impl RoomHandle {
lock.subscriptions.remove(player_id); lock.subscriptions.remove(player_id);
} }
#[tracing::instrument(skip(self), name = "RoomHandle::remove_member")] #[tracing::instrument(skip(self, services), name = "RoomHandle::remove_member")]
pub async fn remove_member(&self, player_id: &PlayerId, player_storage_id: u32) { pub async fn remove_member(&self, services: &Services, player_id: &PlayerId, player_storage_id: u32) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
tracing::info!("Removing a member from a room"); tracing::info!("Removing a member from a room");
let room_storage_id = lock.storage_id; let room_storage_id = lock.storage_id;
lock.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap(); services.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap();
lock.members.remove(player_id); lock.members.remove(player_id);
let update = Updates::RoomLeft { let update = Updates::RoomLeft {
room_id: lock.room_id.clone(), room_id: lock.room_id.clone(),
@ -190,10 +178,10 @@ impl RoomHandle {
lock.broadcast_update(update, player_id).await; lock.broadcast_update(update, player_id).await;
} }
#[tracing::instrument(skip(self, body, created_at), name = "RoomHandle::send_message")] #[tracing::instrument(skip(self, services, body, created_at), name = "RoomHandle::send_message")]
pub async fn send_message(&self, player_id: &PlayerId, body: Str, created_at: DateTime<Utc>) { pub async fn send_message(&self, services: &Services, player_id: &PlayerId, body: Str, created_at: DateTime<Utc>) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
let res = lock.send_message(player_id, body, created_at).await; let res = lock.send_message(services, player_id, body, created_at).await;
if let Err(err) = res { if let Err(err) = res {
log::warn!("Failed to send message: {err:?}"); log::warn!("Failed to send message: {err:?}");
} }
@ -209,12 +197,12 @@ impl RoomHandle {
} }
} }
#[tracing::instrument(skip(self, new_topic), name = "RoomHandle::set_topic")] #[tracing::instrument(skip(self, services, new_topic), name = "RoomHandle::set_topic")]
pub async fn set_topic(&self, changer_id: &PlayerId, new_topic: Str) { pub async fn set_topic(&self, services: &Services, changer_id: &PlayerId, new_topic: Str) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
let storage_id = lock.storage_id; let storage_id = lock.storage_id;
lock.topic = new_topic.clone(); lock.topic = new_topic.clone();
lock.storage.set_room_topic(storage_id, &new_topic).await.unwrap(); services.storage.set_room_topic(storage_id, &new_topic).await.unwrap();
let update = Updates::RoomTopicChanged { let update = Updates::RoomTopicChanged {
room_id: lock.room_id.clone(), room_id: lock.room_id.clone(),
new_topic: new_topic.clone(), new_topic: new_topic.clone(),
@ -235,14 +223,20 @@ struct Room {
/// The total number of messages. Used to calculate the id of the new message. /// The total number of messages. Used to calculate the id of the new message.
message_count: u32, message_count: u32,
topic: Str, topic: Str,
storage: Storage,
} }
impl Room { impl Room {
#[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")] #[tracing::instrument(skip(self, services, body, created_at), name = "Room::send_message")]
async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime<Utc>) -> Result<()> { async fn send_message(
&mut self,
services: &Services,
author_id: &PlayerId,
body: Str,
created_at: DateTime<Utc>,
) -> Result<()> {
tracing::info!("Adding a message to room"); tracing::info!("Adding a message to room");
self.storage services
.storage
.insert_room_message( .insert_room_message(
self.storage_id, self.storage_id,
self.message_count, self.message_count,


@ -14,10 +14,10 @@ use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
use lavina_core::auth::{Authenticator, Verdict}; use lavina_core::auth::Verdict;
use lavina_core::player::*; use lavina_core::player::*;
use lavina_core::prelude::*; use lavina_core::prelude::*;
use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; use lavina_core::room::{RoomId, RoomInfo};
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore; use lavina_core::LavinaCore;
use proto_irc::client::CapabilitySubcommand; use proto_irc::client::CapabilitySubcommand;
@ -79,7 +79,7 @@ async fn handle_socket(
match registered_user { match registered_user {
Ok(user) => { Ok(user) => {
log::debug!("User registered"); log::debug!("User registered");
handle_registered_socket(config, core.players, core.rooms, &mut reader, &mut writer, user).await?; handle_registered_socket(config, &core, &mut reader, &mut writer, user).await?;
} }
Err(err) => { Err(err) => {
log::debug!("Registration failed: {err}"); log::debug!("Registration failed: {err}");
@ -215,7 +215,7 @@ impl RegistrationState {
realname, realname,
enabled_capabilities: self.enabled_capabilities, enabled_capabilities: self.enabled_capabilities,
}; };
self.finalize_auth(candidate_user, writer, &core.authenticator, config).await self.finalize_auth(candidate_user, writer, core, config).await
} }
}, },
ClientMessage::Nick { nickname } => { ClientMessage::Nick { nickname } => {
@ -229,7 +229,7 @@ impl RegistrationState {
realname: realname.clone(), realname: realname.clone(),
enabled_capabilities: self.enabled_capabilities, enabled_capabilities: self.enabled_capabilities,
}; };
self.finalize_auth(candidate_user, writer, &core.authenticator, config).await self.finalize_auth(candidate_user, writer, core, config).await
} else { } else {
self.future_nickname = Some(nickname); self.future_nickname = Some(nickname);
Ok(None) Ok(None)
@ -246,7 +246,7 @@ impl RegistrationState {
realname, realname,
enabled_capabilities: self.enabled_capabilities, enabled_capabilities: self.enabled_capabilities,
}; };
self.finalize_auth(candidate_user, writer, &core.authenticator, config).await self.finalize_auth(candidate_user, writer, core, config).await
} else { } else {
self.future_username = Some((username, realname)); self.future_username = Some((username, realname));
Ok(None) Ok(None)
@ -277,7 +277,7 @@ impl RegistrationState {
} }
} else { } else {
let body = AuthBody::from_str(body.as_bytes())?; let body = AuthBody::from_str(body.as_bytes())?;
if let Err(e) = auth_user(&core.authenticator, &body.login, &body.password).await { if let Err(e) = auth_user(core, &body.login, &body.password).await {
tracing::warn!("Authentication failed: {:?}", e); tracing::warn!("Authentication failed: {:?}", e);
let target = self.future_nickname.clone().unwrap_or_else(|| "*".into()); let target = self.future_nickname.clone().unwrap_or_else(|| "*".into());
sasl_fail_message(config.server_name.clone(), target, "Bad credentials".into()) sasl_fail_message(config.server_name.clone(), target, "Bad credentials".into())
@ -325,7 +325,7 @@ impl RegistrationState {
&mut self, &mut self,
candidate_user: RegisteredUser, candidate_user: RegisteredUser,
writer: &mut BufWriter<WriteHalf<'_>>, writer: &mut BufWriter<WriteHalf<'_>>,
authenticator: &Authenticator, core: &LavinaCore,
config: &ServerConfig, config: &ServerConfig,
) -> Result<Option<RegisteredUser>> { ) -> Result<Option<RegisteredUser>> {
if self.enabled_capabilities.contains(Capabilities::Sasl) if self.enabled_capabilities.contains(Capabilities::Sasl)
@ -344,7 +344,7 @@ impl RegistrationState {
writer.flush().await?; writer.flush().await?;
return Ok(None); return Ok(None);
}; };
auth_user(authenticator, &*candidate_user.nickname, &*candidate_password).await?; auth_user(core, &*candidate_user.nickname, &*candidate_password).await?;
Ok(Some(candidate_user)) Ok(Some(candidate_user))
} }
} }
@ -406,8 +406,8 @@ fn sasl_fail_message(sender: Str, nick: Str, text: Str) -> ServerMessage {
} }
} }
async fn auth_user(authenticator: &Authenticator, login: &str, plain_password: &str) -> Result<()> { async fn auth_user(core: &LavinaCore, login: &str, plain_password: &str) -> Result<()> {
let verdict = authenticator.authenticate(login, plain_password).await?; let verdict = core.authenticate(login, plain_password).await?;
// TODO properly map these onto protocol messages // TODO properly map these onto protocol messages
match verdict { match verdict {
Verdict::Authenticated => Ok(()), Verdict::Authenticated => Ok(()),
@ -418,8 +418,7 @@ async fn auth_user(authenticator: &Authenticator, login: &str, plain_password: &
async fn handle_registered_socket<'a>( async fn handle_registered_socket<'a>(
config: ServerConfig, config: ServerConfig,
players: PlayerRegistry, core: &LavinaCore,
rooms: RoomRegistry,
reader: &mut BufReader<ReadHalf<'a>>, reader: &mut BufReader<ReadHalf<'a>>,
writer: &mut BufWriter<WriteHalf<'a>>, writer: &mut BufWriter<WriteHalf<'a>>,
user: RegisteredUser, user: RegisteredUser,
@ -428,7 +427,7 @@ async fn handle_registered_socket<'a>(
log::info!("Handling registered user: {user:?}"); log::info!("Handling registered user: {user:?}");
let player_id = PlayerId::from(user.nickname.clone())?; let player_id = PlayerId::from(user.nickname.clone())?;
let mut connection = players.connect_to_player(&player_id).await; let mut connection = core.connect_to_player(&player_id).await;
let text: Str = format!("Welcome to {} Server", &config.server_name).into(); let text: Str = format!("Welcome to {} Server", &config.server_name).into();
ServerMessage { ServerMessage {
@ -502,7 +501,7 @@ async fn handle_registered_socket<'a>(
len len
}; };
let incoming = std::str::from_utf8(&buffer[0..len-2])?; let incoming = std::str::from_utf8(&buffer[0..len-2])?;
if let HandleResult::Leave = handle_incoming_message(incoming, &config, &user, &rooms, &mut connection, writer).await? { if let HandleResult::Leave = handle_incoming_message(incoming, &config, &user, core, &mut connection, writer).await? {
break; break;
} }
buffer.clear(); buffer.clear();
@ -510,7 +509,7 @@ async fn handle_registered_socket<'a>(
update = connection.receiver.recv() => { update = connection.receiver.recv() => {
match update { match update {
Some(ConnectionMessage::Update(update)) => { Some(ConnectionMessage::Update(update)) => {
handle_update(&config, &user, &player_id, writer, &rooms, update).await?; handle_update(&config, &user, &player_id, writer, core, update).await?;
} }
Some(ConnectionMessage::Stop(_)) => { Some(ConnectionMessage::Stop(_)) => {
tracing::debug!("Connection is being terminated"); tracing::debug!("Connection is being terminated");
@ -561,14 +560,14 @@ async fn handle_update(
user: &RegisteredUser, user: &RegisteredUser,
player_id: &PlayerId, player_id: &PlayerId,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
rooms: &RoomRegistry, core: &LavinaCore,
update: Updates, update: Updates,
) -> Result<()> { ) -> Result<()> {
log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); log::debug!("Sending irc message to player {player_id:?} on update {update:?}");
match update { match update {
Updates::RoomJoined { new_member_id, room_id } => { Updates::RoomJoined { new_member_id, room_id } => {
if player_id == &new_member_id { if player_id == &new_member_id {
if let Some(room) = rooms.get_room(&room_id).await { if let Some(room) = core.get_room(&room_id).await {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
let chan = Chan::Global(room_id.as_inner().clone()); let chan = Chan::Global(room_id.as_inner().clone());
produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?; produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?;
@ -691,7 +690,7 @@ async fn handle_incoming_message(
buffer: &str, buffer: &str,
config: &ServerConfig, config: &ServerConfig,
user: &RegisteredUser, user: &RegisteredUser,
rooms: &RoomRegistry, core: &LavinaCore,
user_handle: &mut PlayerConnection, user_handle: &mut PlayerConnection,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<HandleResult> { ) -> Result<HandleResult> {
@ -775,7 +774,7 @@ async fn handle_incoming_message(
writer.flush().await?; writer.flush().await?;
} }
Recipient::Chan(Chan::Global(chan)) => { Recipient::Chan(Chan::Global(chan)) => {
let room = rooms.get_room(&RoomId::from(chan.clone())?).await; let room = core.get_room(&RoomId::from(chan.clone())?).await;
if let Some(room) = room { if let Some(room) = room {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
for member in room_info.members { for member in room_info.members {

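The net effect for the IRC projection is that every core interaction above goes through a single `&LavinaCore` handle. A minimal sketch of that call pattern, using only the methods visible in this diff (`authenticate`, `connect_to_player`, `get_room`); the literal login, password and room name are placeholders, and the early return is illustrative rather than the projection's real reply logic:

```rust
use lavina_core::auth::Verdict;
use lavina_core::player::PlayerId;
use lavina_core::prelude::*;
use lavina_core::room::RoomId;
use lavina_core::LavinaCore;

// Sketch only: the literals stand in for values the real handlers read from the wire.
async fn irc_session_sketch(core: &LavinaCore) -> Result<()> {
    // Authentication is a LavinaCore method now, not a separate Authenticator.
    let verdict = core.authenticate("tester", "password").await?;
    if !matches!(verdict, Verdict::Authenticated) {
        // The real handler maps the other verdicts onto protocol replies.
        return Ok(());
    }
    // Player and room access hang off the same handle.
    let player_id = PlayerId::from("tester")?;
    let _connection = core.connect_to_player(&player_id).await;
    if let Some(room) = core.get_room(&RoomId::from("test")?).await {
        let _room_info = room.get_room_info().await;
    }
    Ok(())
}
```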

@ -8,7 +8,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use lavina_core::auth::Authenticator;
use lavina_core::clustering::{ClusterConfig, ClusterMetadata}; use lavina_core::clustering::{ClusterConfig, ClusterMetadata};
use lavina_core::player::{JoinResult, PlayerId, SendMessageResult}; use lavina_core::player::{JoinResult, PlayerId, SendMessageResult};
use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::repo::{Storage, StorageConfig};
@ -102,8 +101,6 @@ impl<'a> TestScope<'a> {
} }
struct TestServer { struct TestServer {
metrics: MetricsRegistry,
storage: Storage,
core: LavinaCore, core: LavinaCore,
server: RunningServer, server: RunningServer,
} }
@ -114,7 +111,7 @@ impl TestServer {
listen_on: "127.0.0.1:0".parse().unwrap(), listen_on: "127.0.0.1:0".parse().unwrap(),
server_name: "testserver".into(), server_name: "testserver".into(),
}; };
let metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(StorageConfig { let storage = Storage::open(StorageConfig {
db_path: ":memory:".into(), db_path: ":memory:".into(),
}) })
@ -127,14 +124,9 @@ impl TestServer {
rooms: Default::default(), rooms: Default::default(),
}, },
}; };
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer { core, server })
metrics,
storage,
core,
server,
})
} }
async fn reboot(self) -> Result<TestServer> { async fn reboot(self) -> Result<TestServer> {
@ -150,30 +142,19 @@ impl TestServer {
rooms: Default::default(), rooms: Default::default(),
}, },
}; };
let TestServer { let TestServer { core, server } = self;
metrics: _,
storage,
core,
server,
} = self;
server.terminate().await?; server.terminate().await?;
core.shutdown().await?; let storage = core.shutdown().await;
let metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer { core, server })
metrics,
storage,
core,
server,
})
} }
async fn shutdown(self) -> Result<()> { async fn shutdown(self) {
self.server.terminate().await?; let _ = self.server.terminate().await;
self.core.shutdown().await?; let storage = self.core.shutdown().await;
self.storage.close().await?; let _ = storage.close().await;
Ok(())
} }
} }
@ -183,8 +164,8 @@ async fn scenario_basic() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -202,7 +183,7 @@ async fn scenario_basic() -> Result<()> {
// wrap up // wrap up
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -212,8 +193,8 @@ async fn scenario_join_and_reboot() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -272,7 +253,7 @@ async fn scenario_join_and_reboot() -> Result<()> {
// wrap up // wrap up
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -282,8 +263,8 @@ async fn scenario_force_join_msg() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream1 = TcpStream::connect(server.server.addr).await?; let mut stream1 = TcpStream::connect(server.server.addr).await?;
let mut s1 = TestScope::new(&mut stream1); let mut s1 = TestScope::new(&mut stream1);
@ -338,7 +319,7 @@ async fn scenario_force_join_msg() -> Result<()> {
// wrap up // wrap up
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -348,10 +329,10 @@ async fn scenario_two_users() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester1").await?; server.core.create_player(&PlayerId::from("tester1")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester1", "password").await?; server.core.set_password("tester1", "password").await?;
server.storage.create_user("tester2").await?; server.core.create_player(&PlayerId::from("tester2")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester2", "password").await?; server.core.set_password("tester2", "password").await?;
let mut stream1 = TcpStream::connect(server.server.addr).await?; let mut stream1 = TcpStream::connect(server.server.addr).await?;
let mut s1 = TestScope::new(&mut stream1); let mut s1 = TestScope::new(&mut stream1);
@ -410,7 +391,7 @@ async fn scenario_two_users() -> Result<()> {
stream2.shutdown().await?; stream2.shutdown().await?;
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -424,8 +405,8 @@ async fn scenario_cap_full_negotiation() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -454,7 +435,7 @@ async fn scenario_cap_full_negotiation() -> Result<()> {
// wrap up // wrap up
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -464,8 +445,8 @@ async fn scenario_cap_full_negotiation_nick_last() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -493,7 +474,7 @@ async fn scenario_cap_full_negotiation_nick_last() -> Result<()> {
// wrap up // wrap up
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -503,8 +484,8 @@ async fn scenario_cap_short_negotiation() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -531,7 +512,7 @@ async fn scenario_cap_short_negotiation() -> Result<()> {
// wrap up // wrap up
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -541,8 +522,8 @@ async fn scenario_cap_sasl_fail() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -575,7 +556,7 @@ async fn scenario_cap_sasl_fail() -> Result<()> {
// wrap up // wrap up
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -585,8 +566,8 @@ async fn terminate_socket_scenario() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -598,7 +579,7 @@ async fn terminate_socket_scenario() -> Result<()> {
s.send("AUTHENTICATE PLAIN").await?; s.send("AUTHENTICATE PLAIN").await?;
s.expect(":testserver AUTHENTICATE +").await?; s.expect(":testserver AUTHENTICATE +").await?;
server.shutdown().await?; server.shutdown().await;
assert_eq!(stream.read_u8().await.unwrap_err().kind(), ErrorKind::UnexpectedEof); assert_eq!(stream.read_u8().await.unwrap_err().kind(), ErrorKind::UnexpectedEof);
Ok(()) Ok(())
@ -610,8 +591,8 @@ async fn server_time_capability() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -636,8 +617,8 @@ async fn server_time_capability() -> Result<()> {
s.expect(":testserver 353 tester = #test :tester").await?; s.expect(":testserver 353 tester = #test :tester").await?;
s.expect(":testserver 366 tester #test :End of /NAMES list").await?; s.expect(":testserver 366 tester #test :End of /NAMES list").await?;
server.storage.create_user("some_guy").await?; server.core.create_player(&PlayerId::from("some_guy")?).await?;
let mut conn = server.core.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await; let mut conn = server.core.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
let res = conn.join_room(RoomId::from("test").unwrap()).await?; let res = conn.join_room(RoomId::from("test").unwrap()).await?;
let JoinResult::Success(_) = res else { let JoinResult::Success(_) = res else {
panic!("Failed to join room"); panic!("Failed to join room");
@ -669,7 +650,7 @@ async fn server_time_capability() -> Result<()> {
// wrap up // wrap up
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }
@ -679,10 +660,10 @@ async fn scenario_two_players_dialog() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester1").await?; server.core.create_player(&PlayerId::from("tester1")?).await?;
server.storage.set_password("tester1", "password").await?; server.core.set_password("tester1", "password").await?;
server.storage.create_user("tester2").await?; server.core.create_player(&PlayerId::from("tester2")?).await?;
server.storage.set_password("tester2", "password").await?; server.core.set_password("tester2", "password").await?;
let mut stream1 = TcpStream::connect(server.server.addr).await?; let mut stream1 = TcpStream::connect(server.server.addr).await?;
let mut s1 = TestScope::new(&mut stream1); let mut s1 = TestScope::new(&mut stream1);
@ -733,7 +714,7 @@ async fn scenario_two_players_dialog() -> Result<()> {
stream1.shutdown().await?; stream1.shutdown().await?;
stream2.shutdown().await?; stream2.shutdown().await?;
server.shutdown().await?; server.shutdown().await;
Ok(()) Ok(())
} }

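Test fixtures no longer reach into `Storage` or construct an `Authenticator`; account setup goes through the same facade. A hypothetical helper equivalent to the repeated two-line setup in the scenarios above (parameter handling follows the calls shown in the diff):

```rust
use lavina_core::player::PlayerId;
use lavina_core::prelude::*;
use lavina_core::LavinaCore;

// Equivalent of the repeated create_player + set_password setup above.
async fn provision(core: &LavinaCore, login: &str, password: &str) -> Result<()> {
    core.create_player(&PlayerId::from(login.to_owned())?).await?;
    core.set_password(login, password).await?;
    Ok(())
}
```

With this in place, each scenario's setup reduces to `provision(&server.core, "tester", "password").await?`.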

@ -2,7 +2,8 @@
use quick_xml::events::Event; use quick_xml::events::Event;
use lavina_core::room::{RoomId, RoomRegistry}; use lavina_core::room::RoomId;
use lavina_core::LavinaCore;
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Server}; use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Server};
use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType}; use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType};
use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery}; use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery};
@ -73,7 +74,7 @@ impl<'a> XmppConnection<'a> {
} }
} }
IqClientBody::DiscoItem(item) => { IqClientBody::DiscoItem(item) => {
let response = self.disco_items(iq.to.as_ref(), &item, self.rooms).await; let response = self.disco_items(iq.to.as_ref(), &item, self.core).await;
let req = Iq { let req = Iq {
from: iq.to, from: iq.to,
id: iq.id, id: iq.id,
@ -162,7 +163,7 @@ impl<'a> XmppConnection<'a> {
resource: None, resource: None,
}) if server.0 == self.hostname_rooms => { }) if server.0 == self.hostname_rooms => {
let room_id = RoomId::from(room_name.0.clone()).unwrap(); let room_id = RoomId::from(room_name.0.clone()).unwrap();
let Some(_) = self.rooms.get_room(&room_id).await else { let Some(_) = self.core.get_room(&room_id).await else {
// TODO should return item-not-found // TODO should return item-not-found
// example: // example:
// <error type="cancel"> // <error type="cancel">
@ -196,7 +197,7 @@ impl<'a> XmppConnection<'a> {
}) })
} }
async fn disco_items(&self, to: Option<&Jid>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery { async fn disco_items(&self, to: Option<&Jid>, req: &ItemQuery, core: &LavinaCore) -> ItemQuery {
let item = match to { let item = match to {
Some(Jid { Some(Jid {
name: None, name: None,
@ -218,7 +219,7 @@ impl<'a> XmppConnection<'a> {
server, server,
resource: None, resource: None,
}) if server.0 == self.hostname_rooms => { }) if server.0 == self.hostname_rooms => {
let room_list = rooms.get_all_rooms().await; let room_list = core.get_all_rooms().await;
room_list room_list
.into_iter() .into_iter()
.map(|room_info| Item { .map(|room_info| Item {


@ -25,7 +25,6 @@ use tokio_rustls::TlsAcceptor;
use lavina_core::auth::Verdict; use lavina_core::auth::Verdict;
use lavina_core::player::{ConnectionMessage, PlayerConnection, PlayerId, StopReason}; use lavina_core::player::{ConnectionMessage, PlayerConnection, PlayerId, StopReason};
use lavina_core::prelude::*; use lavina_core::prelude::*;
use lavina_core::room::RoomRegistry;
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore; use lavina_core::LavinaCore;
use proto_xmpp::bind::{Name, Resource}; use proto_xmpp::bind::{Name, Resource};
@ -203,14 +202,14 @@ async fn handle_socket(
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &core, &hostname) => { authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &core, &hostname) => {
match authenticated { match authenticated {
Ok(authenticated) => { Ok(authenticated) => {
let mut connection = core.players.connect_to_player(&authenticated.player_id).await; let mut connection = core.connect_to_player(&authenticated.player_id).await;
socket_final( socket_final(
&mut xml_reader, &mut xml_reader,
&mut xml_writer, &mut xml_writer,
&mut reader_buf, &mut reader_buf,
&authenticated, &authenticated,
&mut connection, &mut connection,
&core.rooms, &core,
&hostname, &hostname,
) )
.await?; .await?;
@ -296,7 +295,7 @@ async fn socket_auth(
match AuthBody::from_str(&auth.body) { match AuthBody::from_str(&auth.body) {
Ok(logopass) => { Ok(logopass) => {
let name = &logopass.login; let name = &logopass.login;
let verdict = core.authenticator.authenticate(name, &logopass.password).await?; let verdict = core.authenticate(name, &logopass.password).await?;
match verdict { match verdict {
Verdict::Authenticated => { Verdict::Authenticated => {
proto_xmpp::sasl::Success.write_xml(xml_writer).await?; proto_xmpp::sasl::Success.write_xml(xml_writer).await?;
@ -326,7 +325,7 @@ async fn socket_final(
reader_buf: &mut Vec<u8>, reader_buf: &mut Vec<u8>,
authenticated: &Authenticated, authenticated: &Authenticated,
user_handle: &mut PlayerConnection, user_handle: &mut PlayerConnection,
rooms: &RoomRegistry, core: &LavinaCore,
hostname: &Str, hostname: &Str,
) -> Result<()> { ) -> Result<()> {
// TODO validate the server hostname received in the stream start // TODO validate the server hostname received in the stream start
@ -359,7 +358,7 @@ async fn socket_final(
let mut conn = XmppConnection { let mut conn = XmppConnection {
user: authenticated, user: authenticated,
user_handle, user_handle,
rooms, core,
hostname: hostname.clone(), hostname: hostname.clone(),
hostname_rooms: format!("rooms.{}", hostname).into(), hostname_rooms: format!("rooms.{}", hostname).into(),
}; };
@ -442,7 +441,7 @@ async fn socket_final(
struct XmppConnection<'a> { struct XmppConnection<'a> {
user: &'a Authenticated, user: &'a Authenticated,
user_handle: &'a mut PlayerConnection, user_handle: &'a mut PlayerConnection,
rooms: &'a RoomRegistry, core: &'a LavinaCore,
hostname: Str, hostname: Str,
hostname_rooms: Str, hostname_rooms: Str,
} }


@ -1,8 +1,8 @@
//! Handling of all client2server presence stanzas //! Handling of all client2server presence stanzas
use anyhow::Result;
use quick_xml::events::Event; use quick_xml::events::Event;
use lavina_core::prelude::*;
use lavina_core::room::RoomId; use lavina_core::room::RoomId;
use proto_xmpp::bind::{Jid, Name, Server}; use proto_xmpp::bind::{Jid, Name, Server};
use proto_xmpp::client::Presence; use proto_xmpp::client::Presence;
@ -87,15 +87,16 @@ impl<'a> XmppConnection<'a> {
mod tests { mod tests {
use crate::testkit::{expect_user_authenticated, TestServer}; use crate::testkit::{expect_user_authenticated, TestServer};
use crate::Authenticated; use crate::Authenticated;
use anyhow::Result;
use lavina_core::player::PlayerId; use lavina_core::player::PlayerId;
use proto_xmpp::bind::{Jid, Name, Resource, Server}; use proto_xmpp::bind::{Jid, Name, Resource, Server};
use proto_xmpp::client::Presence; use proto_xmpp::client::Presence;
#[tokio::test] #[tokio::test]
async fn test_muc_joining() { async fn test_muc_joining() -> Result<()> {
let server = TestServer::start().await.unwrap(); let server = TestServer::start().await.unwrap();
server.storage.create_user("tester").await.unwrap(); server.core.create_player(&PlayerId::from("tester")?).await?;
let player_id = PlayerId::from("tester").unwrap(); let player_id = PlayerId::from("tester").unwrap();
let user = Authenticated { let user = Authenticated {
@ -105,7 +106,7 @@ mod tests {
xmpp_muc_name: Resource("tester".into()), xmpp_muc_name: Resource("tester".into()),
}; };
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await; let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
@ -125,15 +126,16 @@ mod tests {
assert_eq!(expected, response); assert_eq!(expected, response);
server.shutdown().await.unwrap(); server.shutdown().await.unwrap();
Ok(())
} }
// Test that joining a room second time after a server restart, // Test that joining a room second time after a server restart,
// i.e. in-memory cache of memberships is cleaned, does not cause any issues. // i.e. in-memory cache of memberships is cleaned, does not cause any issues.
#[tokio::test] #[tokio::test]
async fn test_muc_joining_twice() { async fn test_muc_joining_twice() -> Result<()> {
let server = TestServer::start().await.unwrap(); let server = TestServer::start().await.unwrap();
server.storage.create_user("tester").await.unwrap(); server.core.create_player(&PlayerId::from("tester")?).await?;
let player_id = PlayerId::from("tester").unwrap(); let player_id = PlayerId::from("tester").unwrap();
let user = Authenticated { let user = Authenticated {
@ -143,7 +145,7 @@ mod tests {
xmpp_muc_name: Resource("tester".into()), xmpp_muc_name: Resource("tester".into()),
}; };
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await; let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
@ -165,12 +167,13 @@ mod tests {
drop(conn); drop(conn);
let server = server.reboot().await.unwrap(); let server = server.reboot().await.unwrap();
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await; let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
assert_eq!(expected, response); assert_eq!(expected, response);
server.shutdown().await.unwrap(); server.shutdown().await.unwrap();
Ok(())
} }
} }


@ -8,15 +8,13 @@ use lavina_core::LavinaCore;
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server}; use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server};
pub(crate) struct TestServer { pub(crate) struct TestServer {
pub metrics: MetricsRegistry,
pub storage: Storage,
pub core: LavinaCore, pub core: LavinaCore,
} }
impl TestServer { impl TestServer {
pub async fn start() -> anyhow::Result<TestServer> { pub async fn start() -> anyhow::Result<TestServer> {
let _ = tracing_subscriber::fmt::try_init(); let _ = tracing_subscriber::fmt::try_init();
let metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(StorageConfig { let storage = Storage::open(StorageConfig {
db_path: ":memory:".into(), db_path: ":memory:".into(),
}) })
@ -29,14 +27,14 @@ impl TestServer {
}, },
addresses: vec![], addresses: vec![],
}; };
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
Ok(TestServer { metrics, storage, core }) Ok(TestServer { core })
} }
pub async fn reboot(self) -> anyhow::Result<TestServer> { pub async fn reboot(self) -> anyhow::Result<TestServer> {
self.core.shutdown().await?; let storage = self.core.shutdown().await;
let metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let cluster_config = ClusterConfig { let cluster_config = ClusterConfig {
metadata: ClusterMetadata { metadata: ClusterMetadata {
node_id: 0, node_id: 0,
@ -45,18 +43,14 @@ impl TestServer {
}, },
addresses: vec![], addresses: vec![],
}; };
let core = LavinaCore::new(metrics.clone(), cluster_config, self.storage.clone()).await?; let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
Ok(TestServer { Ok(TestServer { core })
metrics,
storage: self.storage.clone(),
core,
})
} }
pub async fn shutdown(self) -> anyhow::Result<()> { pub async fn shutdown(self) -> anyhow::Result<()> {
self.core.shutdown().await?; let storage = self.core.shutdown().await;
self.storage.close().await?; storage.close().await;
Ok(()) Ok(())
} }
} }
@ -69,7 +63,7 @@ pub async fn expect_user_authenticated<'a>(
let conn = XmppConnection { let conn = XmppConnection {
user: &user, user: &user,
user_handle: conn, user_handle: conn,
rooms: &server.core.rooms, core: &server.core,
hostname: "localhost".into(), hostname: "localhost".into(),
hostname_rooms: "rooms.localhost".into(), hostname_rooms: "rooms.localhost".into(),
}; };

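The teardown order changes as well: `shutdown` now consumes the `LavinaCore` and returns the `Storage`, so the owner decides whether to close it or feed it into the next `LavinaCore::new` call on reboot. A minimal sketch of the close path, assuming a `core` built as in `start()` above:

```rust
use lavina_core::LavinaCore;

// Sketch of the new teardown: the Storage handed back by shutdown()
// is closed by whoever owned the core.
async fn teardown(core: LavinaCore) {
    let storage = core.shutdown().await;
    let _ = storage.close().await;
}
```

On reboot, the returned `Storage` is instead passed straight into `LavinaCore::new(&mut metrics, cluster_config, storage)`.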

@ -18,8 +18,8 @@ use tokio_rustls::rustls::client::ServerCertVerifier;
use tokio_rustls::rustls::{ClientConfig, ServerName}; use tokio_rustls::rustls::{ClientConfig, ServerName};
use tokio_rustls::TlsConnector; use tokio_rustls::TlsConnector;
use lavina_core::auth::Authenticator;
use lavina_core::clustering::{ClusterConfig, ClusterMetadata}; use lavina_core::clustering::{ClusterConfig, ClusterMetadata};
use lavina_core::player::PlayerId;
use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::LavinaCore; use lavina_core::LavinaCore;
use projection_xmpp::{launch, RunningServer, ServerConfig}; use projection_xmpp::{launch, RunningServer, ServerConfig};
@ -141,8 +141,6 @@ impl ServerCertVerifier for IgnoreCertVerification {
} }
struct TestServer { struct TestServer {
metrics: MetricsRegistry,
storage: Storage,
core: LavinaCore, core: LavinaCore,
server: RunningServer, server: RunningServer,
} }
@ -156,7 +154,7 @@ impl TestServer {
key: "tests/certs/xmpp.key".parse().unwrap(), key: "tests/certs/xmpp.key".parse().unwrap(),
hostname: "localhost".into(), hostname: "localhost".into(),
}; };
let metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(StorageConfig { let storage = Storage::open(StorageConfig {
db_path: ":memory:".into(), db_path: ":memory:".into(),
}) })
@ -169,20 +167,15 @@ impl TestServer {
rooms: Default::default(), rooms: Default::default(),
}, },
}; };
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer { core, server })
metrics,
storage,
core,
server,
})
} }
async fn shutdown(self) -> Result<()> { async fn shutdown(self) -> Result<()> {
self.server.terminate().await?; self.server.terminate().await?;
self.core.shutdown().await?; let storage = self.core.shutdown().await;
self.storage.close().await?; storage.close().await;
Ok(()) Ok(())
} }
} }
@ -193,8 +186,8 @@ async fn scenario_basic() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -261,8 +254,8 @@ async fn scenario_wrong_password() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -316,8 +309,8 @@ async fn scenario_basic_without_headers() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -362,8 +355,8 @@ async fn terminate_socket() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);
@ -402,8 +395,8 @@ async fn test_message_archive_request() -> Result<()> {
// test scenario // test scenario
server.storage.create_user("tester").await?; server.core.create_player(&PlayerId::from("tester")?).await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?; server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?; let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream); let mut s = TestScope::new(&mut stream);


@ -13,10 +13,9 @@ use serde::{Deserialize, Serialize};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use lavina_core::auth::UpdatePasswordResult; use lavina_core::auth::UpdatePasswordResult;
use lavina_core::player::{PlayerId, PlayerRegistry, SendMessageResult}; use lavina_core::player::{PlayerId, SendMessageResult};
use lavina_core::prelude::*; use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::room::RoomId;
use lavina_core::room::{RoomId, RoomRegistry};
use lavina_core::terminator::Terminator; use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore; use lavina_core::LavinaCore;
use mgmt_api::*; use mgmt_api::*;
@ -30,16 +29,11 @@ pub struct ServerConfig {
pub listen_on: SocketAddr, pub listen_on: SocketAddr,
} }
pub async fn launch( pub async fn launch(config: ServerConfig, metrics: MetricsRegistry, core: LavinaCore) -> Result<Terminator> {
config: ServerConfig,
metrics: MetricsRegistry,
core: LavinaCore,
storage: Storage,
) -> Result<Terminator> {
log::info!("Starting the http service"); log::info!("Starting the http service");
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started"); log::debug!("Listener started");
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, storage, rx.map(|_| ()))); let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, rx.map(|_| ())));
Ok(terminator) Ok(terminator)
} }
@ -47,7 +41,6 @@ async fn main_loop(
listener: TcpListener, listener: TcpListener,
metrics: MetricsRegistry, metrics: MetricsRegistry,
core: LavinaCore, core: LavinaCore,
storage: Storage,
termination: impl Future<Output = ()>, termination: impl Future<Output = ()>,
) -> Result<()> { ) -> Result<()> {
pin!(termination); pin!(termination);
@ -60,9 +53,8 @@ async fn main_loop(
let stream = TokioIo::new(stream); let stream = TokioIo::new(stream);
let metrics = metrics.clone(); let metrics = metrics.clone();
let core = core.clone(); let core = core.clone();
let storage = storage.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
let svc_fn = service_fn(|r| route(&metrics, &core, &storage, r)); let svc_fn = service_fn(|r| route(&metrics, &core, r));
let server = http1::Builder::new().serve_connection(stream, svc_fn); let server = http1::Builder::new().serve_connection(stream, svc_fn);
if let Err(err) = server.await { if let Err(err) = server.await {
tracing::error!("Error serving connection: {:?}", err); tracing::error!("Error serving connection: {:?}", err);
@ -79,19 +71,18 @@ async fn main_loop(
async fn route( async fn route(
registry: &MetricsRegistry, registry: &MetricsRegistry,
core: &LavinaCore, core: &LavinaCore,
storage: &Storage,
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
) -> HttpResult<Response<Full<Bytes>>> { ) -> HttpResult<Response<Full<Bytes>>> {
propagade_span_from_headers(&request); propagade_span_from_headers(&request);
let res = match (request.method(), request.uri().path()) { let res = match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => endpoint_metrics(registry), (&Method::GET, "/metrics") => endpoint_metrics(registry),
(&Method::GET, "/rooms") => endpoint_rooms(&core.rooms).await, (&Method::GET, "/rooms") => endpoint_rooms(core).await,
(&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(), (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, core).await.or5xx(),
(&Method::POST, paths::STOP_PLAYER) => endpoint_stop_player(request, &core.players).await.or5xx(), (&Method::POST, paths::STOP_PLAYER) => endpoint_stop_player(request, core).await.or5xx(),
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, core).await.or5xx(), (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, core).await.or5xx(),
(&Method::POST, rooms::paths::SEND_MESSAGE) => endpoint_send_room_message(request, core).await.or5xx(), (&Method::POST, rooms::paths::SEND_MESSAGE) => endpoint_send_room_message(request, core).await.or5xx(),
(&Method::POST, rooms::paths::SET_TOPIC) => endpoint_set_room_topic(request, core).await.or5xx(), (&Method::POST, rooms::paths::SET_TOPIC) => endpoint_set_room_topic(request, core).await.or5xx(),
_ => clustering::route(core, storage, request).await.unwrap_or_else(endpoint_not_found), _ => clustering::route(core, request).await.unwrap_or_else(endpoint_not_found),
}; };
Ok(res) Ok(res)
} }
@ -104,23 +95,23 @@ fn endpoint_metrics(registry: &MetricsRegistry) -> Response<Full<Bytes>> {
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn endpoint_rooms(rooms: &RoomRegistry) -> Response<Full<Bytes>> { async fn endpoint_rooms(core: &LavinaCore) -> Response<Full<Bytes>> {
// TODO introduce management API types independent from core-domain types // TODO introduce management API types independent from core-domain types
// TODO remove `Serialize` implementations from all core-domain types // TODO remove `Serialize` implementations from all core-domain types
let room_list = rooms.get_all_rooms().await.to_body(); let room_list = core.get_all_rooms().await.to_body();
Response::new(room_list) Response::new(room_list)
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn endpoint_create_player( async fn endpoint_create_player(
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
storage: &Storage, core: &LavinaCore,
) -> Result<Response<Full<Bytes>>> { ) -> Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes(); let str = request.collect().await?.to_bytes();
let Ok(res) = serde_json::from_slice::<CreatePlayerRequest>(&str[..]) else { let Ok(res) = serde_json::from_slice::<CreatePlayerRequest>(&str[..]) else {
return Ok(malformed_request()); return Ok(malformed_request());
}; };
storage.create_user(&res.name).await?; core.create_player(&PlayerId::from(res.name)?).await?;
log::info!("Player {} created", res.name); log::info!("Player {} created", res.name);
let mut response = Response::new(Full::<Bytes>::default()); let mut response = Response::new(Full::<Bytes>::default());
*response.status_mut() = StatusCode::CREATED; *response.status_mut() = StatusCode::CREATED;
@ -130,7 +121,7 @@ async fn endpoint_create_player(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn endpoint_stop_player( async fn endpoint_stop_player(
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
players: &PlayerRegistry, core: &LavinaCore,
) -> Result<Response<Full<Bytes>>> { ) -> Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes(); let str = request.collect().await?.to_bytes();
let Ok(res) = serde_json::from_slice::<StopPlayerRequest>(&str[..]) else { let Ok(res) = serde_json::from_slice::<StopPlayerRequest>(&str[..]) else {
@ -139,7 +130,7 @@ async fn endpoint_stop_player(
let Ok(player_id) = PlayerId::from(res.name) else { let Ok(player_id) = PlayerId::from(res.name) else {
return Ok(player_not_found()); return Ok(player_not_found());
}; };
let Some(()) = players.stop_player(&player_id).await? else { let Some(()) = core.stop_player(&player_id).await? else {
return Ok(player_not_found()); return Ok(player_not_found());
}; };
Ok(empty_204_request()) Ok(empty_204_request())
@ -154,7 +145,7 @@ async fn endpoint_set_password(
let Ok(res) = serde_json::from_slice::<ChangePasswordRequest>(&str[..]) else { let Ok(res) = serde_json::from_slice::<ChangePasswordRequest>(&str[..]) else {
return Ok(malformed_request()); return Ok(malformed_request());
}; };
let verdict = core.authenticator.set_password(&res.player_name, &res.password).await?; let verdict = core.set_password(&res.player_name, &res.password).await?;
match verdict { match verdict {
UpdatePasswordResult::PasswordUpdated => {} UpdatePasswordResult::PasswordUpdated => {}
UpdatePasswordResult::UserNotFound => { UpdatePasswordResult::UserNotFound => {
@ -179,7 +170,7 @@ async fn endpoint_send_room_message(
let Ok(player_id) = PlayerId::from(req.author_id) else { let Ok(player_id) = PlayerId::from(req.author_id) else {
return Ok(player_not_found()); return Ok(player_not_found());
}; };
let mut player = core.players.connect_to_player(&player_id).await; let mut player = core.connect_to_player(&player_id).await;
let res = player.send_message(room_id, req.message.into()).await?; let res = player.send_message(room_id, req.message.into()).await?;
match res { match res {
SendMessageResult::NoSuchRoom => Ok(room_not_found()), SendMessageResult::NoSuchRoom => Ok(room_not_found()),
@ -202,7 +193,7 @@ async fn endpoint_set_room_topic(
let Ok(player_id) = PlayerId::from(req.author_id) else { let Ok(player_id) = PlayerId::from(req.author_id) else {
return Ok(player_not_found()); return Ok(player_not_found());
}; };
let mut player = core.players.connect_to_player(&player_id).await; let mut player = core.connect_to_player(&player_id).await;
player.change_topic(room_id, req.topic.into()).await?; player.change_topic(room_id, req.topic.into()).await?;
Ok(empty_204_request()) Ok(empty_204_request())
} }

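The management endpoints now receive only `&LavinaCore`, and the `Option`-shaped results drive the HTTP status. A sketch of the stop-player decision logic, with plain booleans standing in for the crate's `player_not_found()` and `empty_204_request()` responses:

```rust
use lavina_core::player::PlayerId;
use lavina_core::prelude::*;
use lavina_core::LavinaCore;

// Sketch: a None from stop_player means the player does not exist,
// which the endpoint above turns into a "player not found" reply.
async fn stop_player_sketch(core: &LavinaCore, name: String) -> Result<bool> {
    let Ok(player_id) = PlayerId::from(name) else {
        return Ok(false); // malformed name is also reported as "player not found"
    };
    let Some(()) = core.stop_player(&player_id).await? else {
        return Ok(false);
    };
    Ok(true) // the real endpoint responds with 204 No Content here
}
```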

@ -6,19 +6,14 @@ use super::Or5xx;
use crate::http::{empty_204_request, malformed_request, player_not_found, room_not_found}; use crate::http::{empty_204_request, malformed_request, player_not_found, room_not_found};
use lavina_core::clustering::room::{paths, JoinRoomReq, SendMessageReq}; use lavina_core::clustering::room::{paths, JoinRoomReq, SendMessageReq};
use lavina_core::player::PlayerId; use lavina_core::player::PlayerId;
use lavina_core::repo::Storage;
use lavina_core::room::RoomId; use lavina_core::room::RoomId;
use lavina_core::LavinaCore; use lavina_core::LavinaCore;
// TODO move this into core // TODO move this into core
pub async fn route( pub async fn route(core: &LavinaCore, request: Request<hyper::body::Incoming>) -> Option<Response<Full<Bytes>>> {
core: &LavinaCore,
storage: &Storage,
request: Request<hyper::body::Incoming>,
) -> Option<Response<Full<Bytes>>> {
match (request.method(), request.uri().path()) { match (request.method(), request.uri().path()) {
(&Method::POST, paths::JOIN) => Some(endpoint_cluster_join_room(request, core, storage).await.or5xx()), (&Method::POST, paths::JOIN) => Some(endpoint_cluster_join_room(request, core).await.or5xx()),
(&Method::POST, paths::ADD_MESSAGE) => Some(endpoint_cluster_add_message(request, core).await.or5xx()), (&Method::POST, paths::ADD_MESSAGE) => Some(endpoint_cluster_add_message(request, core).await.or5xx()),
_ => None, _ => None,
} }
@ -28,7 +23,6 @@ pub async fn route(
async fn endpoint_cluster_join_room( async fn endpoint_cluster_join_room(
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
core: &LavinaCore, core: &LavinaCore,
storage: &Storage,
) -> lavina_core::prelude::Result<Response<Full<Bytes>>> { ) -> lavina_core::prelude::Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes(); let str = request.collect().await?.to_bytes();
let Ok(req) = serde_json::from_slice::<JoinRoomReq>(&str[..]) else { let Ok(req) = serde_json::from_slice::<JoinRoomReq>(&str[..]) else {
@ -43,9 +37,7 @@ async fn endpoint_cluster_join_room(
dbg!(&req.player_id); dbg!(&req.player_id);
return Ok(player_not_found()); return Ok(player_not_found());
}; };
let room_handle = core.rooms.get_or_create_room(room_id).await.unwrap(); core.cluster_join_room(room_id, &player_id).await?;
let storage_id = storage.create_or_retrieve_user_id_by_name(req.player_id).await?;
room_handle.add_member(&player_id, storage_id).await;
Ok(empty_204_request()) Ok(empty_204_request())
} }
@ -71,10 +63,10 @@ async fn endpoint_cluster_add_message(
dbg!(&req.player_id); dbg!(&req.player_id);
return Ok(player_not_found()); return Ok(player_not_found());
}; };
let Some(room_handle) = core.rooms.get_room(&room_id).await else { let res = core.cluster_send_room_message(room_id, &player_id, req.message.into(), created_at.to_utc()).await?;
dbg!(&room_id); if let Some(_) = res {
return Ok(room_not_found()); Ok(empty_204_request())
}; } else {
room_handle.send_message(&player_id, req.message.into(), created_at.to_utc()).await; Ok(room_not_found())
Ok(empty_204_request()) }
} }

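Likewise, the clustering handlers delegate to two dedicated core methods. A sketch of the relay path under the signatures shown above; the room name, player name and message body are placeholders for the request fields, and `chrono::Utc::now()` stands in for `created_at.to_utc()`:

```rust
use chrono::Utc;
use lavina_core::player::PlayerId;
use lavina_core::prelude::*;
use lavina_core::room::RoomId;
use lavina_core::LavinaCore;

// Sketch: join a remote player into a room and relay one message through the
// new LavinaCore clustering API. A None result means the room was not found.
async fn relay_sketch(core: &LavinaCore) -> Result<bool> {
    let player_id = PlayerId::from("some_guy")?;
    core.cluster_join_room(RoomId::from("test")?, &player_id).await?;
    let res = core
        // the real handler converts req.message the same way, via .into()
        .cluster_send_room_message(RoomId::from("test")?, &player_id, "hello".into(), Utc::now())
        .await?;
    Ok(res.is_some()) // None is turned into the room_not_found() reply above
}
```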

@ -69,10 +69,10 @@ async fn main() -> Result<()> {
cluster: cluster_config, cluster: cluster_config,
tracing: _, tracing: _,
} = config; } = config;
let metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone(), storage.clone()).await?; let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone()).await?;
let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone()).await?; let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone()).await?;
let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone()).await?; let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone()).await?;
tracing::info!("Started"); tracing::info!("Started");
@ -83,7 +83,8 @@ async fn main() -> Result<()> {
xmpp.terminate().await?; xmpp.terminate().await?;
irc.terminate().await?; irc.terminate().await?;
telemetry_terminator.terminate().await?; telemetry_terminator.terminate().await?;
core.shutdown().await?; let storage = core.shutdown().await;
storage.close().await;
tracing::info!("Shutdown complete"); tracing::info!("Shutdown complete");
Ok(()) Ok(())
} }