forked from lavina/lavina

Compare commits


3 Commits

Author SHA1 Message Date
Nikita Vilunov b2a06ef984 xmpp: add x-user element to muc presence response 2024-05-14 03:21:06 +02:00
Nikita Vilunov 89918d9de1 xmpp: add item-not-found error condition to room disco#info iq 2024-05-13 16:42:52 +02:00
Nikita Vilunov 26720a2a08 core: separate the model from the logic implementation (#66)
This separates the core into two layers – the model objects and the `LavinaCore` service. The service is responsible for implementing the application logic and exposing it as the core's public API to projections, while the model objects are independent of each other and responsible only for owning and managing in-memory data.

The model objects include (see the sketch after this list):
1. `Storage` – the open connection to the SQLite DB.
2. `PlayerRegistry` – creates, stores refs to, and stops player actors.
3. `RoomRegistry` – manages active rooms.
4. `DialogRegistry` – manages active dialogs.
5. `Broadcasting` – manages subscriptions of players to rooms on remote cluster nodes.
6. `LavinaClient` – manages HTTP connections to remote cluster nodes.
7. `ClusterMetadata` – read-only configuration of the cluster metadata, i.e. allocation of entities to nodes.
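
A condensed sketch of the resulting container, assembled from the `lib.rs` hunks further down. The unit structs are placeholders standing in for the real model objects in the diff:

```rust
// Placeholders for the real model objects; each owns only its own
// in-memory data and holds no reference to any of the others.
pub(crate) struct Storage;         // open SQLite connection
pub(crate) struct PlayerRegistry;  // creates, stores, and stops player actors
pub(crate) struct RoomRegistry;    // active rooms
pub(crate) struct DialogRegistry;  // active dialogs
pub(crate) struct Broadcasting;    // subscriptions to rooms on remote nodes
pub(crate) struct LavinaClient;    // HTTP connections to remote cluster nodes
pub(crate) struct ClusterMetadata; // read-only allocation of entities to nodes

/// Owns every model object; shared as a whole rather than field by field.
pub struct Services {
    pub(crate) players: PlayerRegistry,
    pub(crate) rooms: RoomRegistry,
    pub(crate) dialogs: DialogRegistry,
    pub(crate) broadcasting: Broadcasting,
    pub(crate) client: LavinaClient,
    pub(crate) storage: Storage,
    pub(crate) cluster_metadata: ClusterMetadata,
}
```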

As a result:
1. Model objects will be fully independent of each other, e.g. it's no longer necessary to provide a `Storage` to all registries, or to provide `PlayerRegistry` and `DialogRegistry` to each other.
2. Model objects will no longer be individually `Arc`-wrapped; instead the whole `Services` object will be `Arc`-wrapped and provided to projections (see the second sketch below).
3. The public API of `lavina-core` will be properly delimited by the APIs of `LavinaCore`, `PlayerConnection` and so on.
4. `LavinaCore` and `PlayerConnection` will also contain the APIs of all features, unlike previously, when they were split between `RoomRegistry` and `DialogRegistry`. This is unfortunate, but it can be improved in the future.
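
The service layer then wraps the container, as the `lib.rs` diff shows: the whole `Services` object sits behind a single `Arc`, and `Deref` lets feature methods on `LavinaCore` reach the model objects directly. A minimal sketch reusing the `Services` stub above:

```rust
use std::ops::Deref;
use std::sync::Arc;

/// The only handle projections receive; cloning it clones one `Arc`.
#[derive(Clone)]
pub struct LavinaCore {
    services: Arc<Services>,
}

impl Deref for LavinaCore {
    type Target = Services;

    fn deref(&self) -> &Self::Target {
        &self.services
    }
}
```

Feature APIs such as `connect_to_player` and `get_room` live on `LavinaCore` and delegate to the model objects, passing `&self.services` down explicitly wherever a model object needs access to its siblings.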

Reviewed-on: lavina/lavina#66
2024-05-13 14:32:45 +00:00
22 changed files with 444 additions and 498 deletions


@ -3,8 +3,7 @@ use argon2::password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, Salt
use argon2::Argon2;
use rand_core::OsRng;
use crate::prelude::log;
use crate::repo::Storage;
use crate::LavinaCore;
pub enum Verdict {
Authenticated,
@ -17,18 +16,10 @@ pub enum UpdatePasswordResult {
UserNotFound,
}
#[derive(Clone)]
pub struct Authenticator {
storage: Storage,
}
impl Authenticator {
pub fn new(storage: Storage) -> Self {
Self { storage }
}
#[tracing::instrument(skip(self, provided_password), name = "Authenticator::authenticate")]
impl LavinaCore {
#[tracing::instrument(skip(self, provided_password), name = "Services::authenticate")]
pub async fn authenticate(&self, login: &str, provided_password: &str) -> Result<Verdict> {
let Some(stored_user) = self.storage.retrieve_user_by_name(login).await? else {
let Some(stored_user) = self.services.storage.retrieve_user_by_name(login).await? else {
return Ok(Verdict::UserNotFound);
};
if let Some(argon2_hash) = stored_user.argon2_hash {
@ -48,9 +39,9 @@ impl Authenticator {
Ok(Verdict::InvalidPassword)
}
#[tracing::instrument(skip(self, provided_password), name = "Authenticator::set_password")]
#[tracing::instrument(skip(self, provided_password), name = "Services::set_password")]
pub async fn set_password(&self, login: &str, provided_password: &str) -> Result<UpdatePasswordResult> {
let Some(u) = self.storage.retrieve_user_by_name(login).await? else {
let Some(u) = self.services.storage.retrieve_user_by_name(login).await? else {
return Ok(UpdatePasswordResult::UserNotFound);
};
@ -60,8 +51,8 @@ impl Authenticator {
.hash_password(provided_password.as_bytes(), &salt)
.map_err(|e| anyhow!("Failed to hash password: {e:?}"))?;
self.storage.set_argon2_challenge(u.id, password_hash.to_string().as_str()).await?;
log::info!("Password changed for player {login}");
self.services.storage.set_argon2_challenge(u.id, password_hash.to_string().as_str()).await?;
tracing::info!("Password changed for player {login}");
Ok(UpdatePasswordResult::PasswordUpdated)
}
}


@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use anyhow::{anyhow, Result};
use reqwest::Client;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::{DefaultSpanBackend, TracingMiddleware};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
pub mod broadcast;
pub mod room;
@ -27,19 +27,15 @@ pub struct ClusterMetadata {
pub rooms: HashMap<String, u32>,
}
#[derive(Clone)]
pub struct LavinaClient {
addresses: Arc<Addresses>,
addresses: Addresses,
client: ClientWithMiddleware,
}
impl LavinaClient {
pub fn new(addresses: Addresses) -> Self {
let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::<DefaultSpanBackend>::new()).build();
Self {
addresses: Arc::new(addresses),
client,
}
Self { addresses, client }
}
async fn send_request(&self, node_id: u32, path: &str, req: impl Serialize) -> Result<()> {


@ -1,41 +1,32 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use chrono::{DateTime, Utc};
use tokio::sync::Mutex;
use crate::player::{PlayerId, PlayerRegistry, Updates};
use crate::player::{PlayerId, Updates};
use crate::prelude::Str;
use crate::room::RoomId;
use crate::Services;
/// Receives updates from other nodes and broadcasts them to local player actors.
struct BroadcastingInner {
subscriptions: HashMap<RoomId, HashSet<PlayerId>>,
}
impl Broadcasting {}
#[derive(Clone)]
pub struct Broadcasting(Arc<Mutex<BroadcastingInner>>);
pub struct Broadcasting(Mutex<BroadcastingInner>);
impl Broadcasting {
pub fn new() -> Self {
let inner = BroadcastingInner {
subscriptions: HashMap::new(),
};
Self(Arc::new(Mutex::new(inner)))
Self(Mutex::new(inner))
}
}
impl Services {
/// Broadcasts the given update to subscribed player actors on local node.
#[tracing::instrument(skip(self, players, message, created_at), name = "Broadcasting::broadcast")]
pub async fn broadcast(
&self,
players: &PlayerRegistry,
room_id: RoomId,
author_id: PlayerId,
message: Str,
created_at: DateTime<Utc>,
) {
let inner = self.0.lock().await;
#[tracing::instrument(skip(self, message, created_at))]
pub async fn broadcast(&self, room_id: RoomId, author_id: PlayerId, message: Str, created_at: DateTime<Utc>) {
let inner = self.broadcasting.0.lock().await;
let Some(subscribers) = inner.subscriptions.get(&room_id) else {
return;
};
@ -49,7 +40,7 @@ impl Broadcasting {
if i == &author_id {
continue;
}
let Some(player) = players.get_player(i).await else {
let Some(player) = self.players.get_player(i).await else {
continue;
};
player.update(update.clone()).await;
@ -57,6 +48,6 @@ impl Broadcasting {
}
pub async fn subscribe(&self, subscriber: PlayerId, room_id: RoomId) {
self.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber);
self.broadcasting.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber);
}
}


@ -1,6 +1,11 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use crate::clustering::LavinaClient;
use crate::player::PlayerId;
use crate::prelude::Str;
use crate::room::RoomId;
use crate::LavinaCore;
pub mod paths {
pub const JOIN: &'static str = "/cluster/rooms/join";
@ -38,22 +43,46 @@ pub struct SetRoomTopicReq<'a> {
impl LavinaClient {
#[tracing::instrument(skip(self, req), name = "LavinaClient::join_room")]
pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> anyhow::Result<()> {
pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> Result<()> {
self.send_request(node_id, paths::JOIN, req).await
}
#[tracing::instrument(skip(self, req), name = "LavinaClient::leave_room")]
pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> anyhow::Result<()> {
pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> Result<()> {
self.send_request(node_id, paths::LEAVE, req).await
}
#[tracing::instrument(skip(self, req), name = "LavinaClient::send_room_message")]
pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> anyhow::Result<()> {
pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> Result<()> {
self.send_request(node_id, paths::ADD_MESSAGE, req).await
}
#[tracing::instrument(skip(self, req), name = "LavinaClient::set_room_topic")]
pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> anyhow::Result<()> {
pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> Result<()> {
self.send_request(node_id, paths::SET_TOPIC, req).await
}
}
impl LavinaCore {
pub async fn cluster_join_room(&self, room_id: RoomId, player_id: &PlayerId) -> Result<()> {
let room_handle = self.services.rooms.get_or_create_room(&self.services, room_id).await?;
let storage_id =
self.services.storage.create_or_retrieve_user_id_by_name(player_id.as_inner().as_ref()).await?;
room_handle.add_member(&self.services, &player_id, storage_id).await;
Ok(())
}
pub async fn cluster_send_room_message(
&self,
room_id: RoomId,
player_id: &PlayerId,
message: Str,
created_at: chrono::DateTime<chrono::Utc>,
) -> Result<Option<()>> {
let Some(room_handle) = self.services.rooms.get_room(&self.services, &room_id).await else {
return Ok(None);
};
room_handle.send_message(&self.services, &player_id, message, created_at).await;
Ok(Some(()))
}
}


@ -4,14 +4,13 @@
//! There are no admins or other roles in dialogs, both participants have equal rights.
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use tokio::sync::RwLock as AsyncRwLock;
use crate::player::{PlayerId, PlayerRegistry, Updates};
use crate::player::{PlayerId, Updates};
use crate::prelude::*;
use crate::repo::Storage;
use crate::Services;
/// Id of a conversation between two players.
///
@ -45,55 +44,50 @@ struct Dialog {
struct DialogRegistryInner {
dialogs: HashMap<DialogId, AsyncRwLock<Dialog>>,
players: Option<PlayerRegistry>,
storage: Storage,
}
#[derive(Clone)]
pub struct DialogRegistry(Arc<AsyncRwLock<DialogRegistryInner>>);
pub(crate) struct DialogRegistry(AsyncRwLock<DialogRegistryInner>);
impl DialogRegistry {
pub async fn send_message(
impl Services {
#[tracing::instrument(skip(self, body, created_at))]
pub async fn send_dialog_message(
&self,
from: PlayerId,
to: PlayerId,
body: Str,
created_at: &DateTime<Utc>,
) -> Result<()> {
let mut guard = self.0.read().await;
let guard = self.dialogs.0.read().await;
let id = DialogId::new(from.clone(), to.clone());
let dialog = guard.dialogs.get(&id);
if let Some(d) = dialog {
let mut d = d.write().await;
guard
.storage
self.storage
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
.await?;
d.message_count += 1;
} else {
drop(guard);
let mut guard2 = self.0.write().await;
let mut guard2 = self.dialogs.0.write().await;
// double check in case concurrent access has loaded this dialog
if let Some(d) = guard2.dialogs.get(&id) {
let mut d = d.write().await;
guard2
.storage
self.storage
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
.await?;
d.message_count += 1;
} else {
let (p1, p2) = id.as_inner();
tracing::info!("Dialog {id:?} not found locally, trying to load from storage");
let stored_dialog = match guard2.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? {
let stored_dialog = match self.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? {
Some(t) => t,
None => {
tracing::info!("Dialog {id:?} does not exist, creating a new one in storage");
guard2.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await?
self.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await?
}
};
tracing::info!("Dialog {id:?} loaded");
guard2
.storage
self.storage
.insert_dialog_message(
stored_dialog.id,
stored_dialog.message_count,
@ -110,14 +104,10 @@ impl DialogRegistry {
};
guard2.dialogs.insert(id.clone(), AsyncRwLock::new(dialog));
}
guard = guard2.downgrade();
drop(guard2);
}
// TODO send message to the other player and persist it
let Some(players) = &guard.players else {
tracing::error!("No player registry present");
return Ok(());
};
let Some(player) = players.get_player(&to).await else {
let Some(player) = self.players.get_player(&to).await else {
tracing::debug!("Player {to:?} not active, not sending message");
return Ok(());
};
@ -133,33 +123,13 @@ impl DialogRegistry {
}
impl DialogRegistry {
pub fn new(storage: Storage) -> DialogRegistry {
DialogRegistry(Arc::new(AsyncRwLock::new(DialogRegistryInner {
pub fn new() -> DialogRegistry {
DialogRegistry(AsyncRwLock::new(DialogRegistryInner {
dialogs: HashMap::new(),
players: None,
storage,
})))
}))
}
pub async fn set_players(&self, players: PlayerRegistry) {
let mut guard = self.0.write().await;
guard.players = Some(players);
}
pub async fn unset_players(&self) {
let mut guard = self.0.write().await;
guard.players = None;
}
pub fn shutdown(self) -> Result<()> {
let res = match Arc::try_unwrap(self.0) {
Ok(e) => e,
Err(_) => return Err(fail("failed to acquire dialogs ownership on shutdown")),
};
let res = res.into_inner();
drop(res);
Ok(())
}
pub fn shutdown(self) {}
}
#[cfg(test)]


@ -1,16 +1,16 @@
//! Domain definitions and implementation of common chat logic.
use std::ops::Deref;
use std::sync::Arc;
use anyhow::Result;
use prometheus::Registry as MetricsRegistry;
use crate::auth::Authenticator;
use crate::clustering::broadcast::Broadcasting;
use crate::clustering::{ClusterConfig, LavinaClient};
use crate::clustering::{ClusterConfig, ClusterMetadata, LavinaClient};
use crate::dialog::DialogRegistry;
use crate::player::PlayerRegistry;
use crate::player::{PlayerConnection, PlayerId, PlayerRegistry};
use crate::repo::Storage;
use crate::room::RoomRegistry;
use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry};
pub mod auth;
pub mod clustering;
@ -25,50 +25,88 @@ mod table;
#[derive(Clone)]
pub struct LavinaCore {
pub players: PlayerRegistry,
pub rooms: RoomRegistry,
pub dialogs: DialogRegistry,
pub broadcasting: Broadcasting,
pub authenticator: Authenticator,
services: Arc<Services>,
}
impl Deref for LavinaCore {
type Target = Services;
fn deref(&self) -> &Self::Target {
&self.services
}
}
impl LavinaCore {
pub async fn connect_to_player(&self, player_id: &PlayerId) -> PlayerConnection {
self.services.players.connect_to_player(&self, player_id).await
}
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
self.services.rooms.get_room(&self.services, room_id).await
}
pub async fn create_player(&self, player_id: &PlayerId) -> Result<()> {
self.services.storage.create_user(player_id.as_inner()).await
}
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
self.services.rooms.get_all_rooms().await
}
pub async fn stop_player(&self, player_id: &PlayerId) -> Result<Option<()>> {
self.services.players.stop_player(player_id).await
}
}
pub struct Services {
pub(crate) players: PlayerRegistry,
pub(crate) rooms: RoomRegistry,
pub(crate) dialogs: DialogRegistry,
pub(crate) broadcasting: Broadcasting,
pub(crate) client: LavinaClient,
pub(crate) storage: Storage,
pub(crate) cluster_metadata: ClusterMetadata,
}
impl LavinaCore {
pub async fn new(
mut metrics: MetricsRegistry,
metrics: &mut MetricsRegistry,
cluster_config: ClusterConfig,
storage: Storage,
) -> Result<LavinaCore> {
// TODO shutdown all services in reverse order on error
let broadcasting = Broadcasting::new();
let client = LavinaClient::new(cluster_config.addresses.clone());
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
let dialogs = DialogRegistry::new(storage.clone());
let players = PlayerRegistry::empty(
rooms.clone(),
dialogs.clone(),
storage.clone(),
&mut metrics,
Arc::new(cluster_config.metadata),
client,
broadcasting.clone(),
)?;
dialogs.set_players(players.clone()).await;
let authenticator = Authenticator::new(storage.clone());
Ok(LavinaCore {
let rooms = RoomRegistry::new(metrics)?;
let dialogs = DialogRegistry::new();
let players = PlayerRegistry::empty(metrics)?;
let services = Services {
players,
rooms,
dialogs,
broadcasting,
authenticator,
client,
storage,
cluster_metadata: cluster_config.metadata,
};
Ok(LavinaCore {
services: Arc::new(services),
})
}
pub async fn shutdown(mut self) -> Result<()> {
self.players.shutdown_all().await?;
self.dialogs.unset_players().await;
self.players.shutdown()?;
self.dialogs.shutdown()?;
self.rooms.shutdown()?;
Ok(())
pub async fn shutdown(self) -> Storage {
let _ = self.players.shutdown_all().await;
let services = match Arc::try_unwrap(self.services) {
Ok(e) => e,
Err(_) => {
panic!("failed to acquire services ownership on shutdown");
}
};
let _ = services.players.shutdown();
let _ = services.dialogs.shutdown();
let _ = services.rooms.shutdown();
services.storage
}
}


@ -8,7 +8,6 @@
//! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor,
//! so that they don't overload the room actor.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use chrono::{DateTime, Utc};
use prometheus::{IntGauge, Registry as MetricsRegistry};
@ -17,14 +16,11 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tracing::{Instrument, Span};
use crate::clustering::broadcast::Broadcasting;
use crate::clustering::room::*;
use crate::clustering::{ClusterMetadata, LavinaClient};
use crate::dialog::DialogRegistry;
use crate::prelude::*;
use crate::repo::Storage;
use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry};
use crate::room::{RoomHandle, RoomId, RoomInfo};
use crate::table::{AnonTable, Key as AnonKey};
use crate::LavinaCore;
/// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
@ -266,41 +262,21 @@ pub enum Updates {
}
/// Handle to a player registry — a shared data structure containing information about players.
#[derive(Clone)]
pub struct PlayerRegistry(Arc<RwLock<PlayerRegistryInner>>);
pub(crate) struct PlayerRegistry(RwLock<PlayerRegistryInner>);
impl PlayerRegistry {
pub fn empty(
room_registry: RoomRegistry,
dialogs: DialogRegistry,
storage: Storage,
metrics: &mut MetricsRegistry,
cluster_metadata: Arc<ClusterMetadata>,
cluster_client: LavinaClient,
broadcasting: Broadcasting,
) -> Result<PlayerRegistry> {
pub fn empty(metrics: &mut MetricsRegistry) -> Result<PlayerRegistry> {
let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?;
metrics.register(Box::new(metric_active_players.clone()))?;
let inner = PlayerRegistryInner {
room_registry,
dialogs,
storage,
cluster_metadata,
cluster_client,
broadcasting,
players: HashMap::new(),
metric_active_players,
};
Ok(PlayerRegistry(Arc::new(RwLock::new(inner))))
Ok(PlayerRegistry(RwLock::new(inner)))
}
pub fn shutdown(self) -> Result<()> {
let res = match Arc::try_unwrap(self.0) {
Ok(e) => e,
Err(_) => return Err(fail("failed to acquire players ownership on shutdown")),
};
let res = res.into_inner();
pub fn shutdown(self) {
let res = self.0.into_inner();
drop(res);
Ok(())
}
#[tracing::instrument(skip(self), name = "PlayerRegistry::get_player")]
@ -323,8 +299,8 @@ impl PlayerRegistry {
}
}
#[tracing::instrument(skip(self), name = "PlayerRegistry::get_or_launch_player")]
pub async fn get_or_launch_player(&self, id: &PlayerId) -> PlayerHandle {
#[tracing::instrument(skip(self, core), name = "PlayerRegistry::get_or_launch_player")]
pub async fn get_or_launch_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerHandle {
let inner = self.0.read().await;
if let Some((handle, _)) = inner.players.get(id) {
handle.clone()
@ -334,17 +310,7 @@ impl PlayerRegistry {
if let Some((handle, _)) = inner.players.get(id) {
handle.clone()
} else {
let (handle, fiber) = Player::launch(
id.clone(),
self.clone(),
inner.room_registry.clone(),
inner.dialogs.clone(),
inner.cluster_metadata.clone(),
inner.cluster_client.clone(),
inner.broadcasting.clone(),
inner.storage.clone(),
)
.await;
let (handle, fiber) = Player::launch(id.clone(), core.clone()).await;
inner.players.insert(id.clone(), (handle.clone(), fiber));
inner.metric_active_players.inc();
handle
@ -352,13 +318,13 @@ impl PlayerRegistry {
}
}
#[tracing::instrument(skip(self), name = "PlayerRegistry::connect_to_player")]
pub async fn connect_to_player(&self, id: &PlayerId) -> PlayerConnection {
let player_handle = self.get_or_launch_player(id).await;
#[tracing::instrument(skip(self, core), name = "PlayerRegistry::connect_to_player")]
pub async fn connect_to_player(&self, core: &LavinaCore, id: &PlayerId) -> PlayerConnection {
let player_handle = self.get_or_launch_player(core, id).await;
player_handle.subscribe().await
}
pub async fn shutdown_all(&mut self) -> Result<()> {
pub async fn shutdown_all(&self) -> Result<()> {
let mut inner = self.0.write().await;
for (i, (k, j)) in inner.players.drain() {
k.send(ActorCommand::Stop).await;
@ -373,12 +339,6 @@ impl PlayerRegistry {
/// The player registry state representation.
struct PlayerRegistryInner {
room_registry: RoomRegistry,
dialogs: DialogRegistry,
storage: Storage,
cluster_metadata: Arc<ClusterMetadata>,
cluster_client: LavinaClient,
broadcasting: Broadcasting,
/// Active player actors.
players: HashMap<PlayerId, (PlayerHandle, JoinHandle<Player>)>,
metric_active_players: IntGauge,
@ -398,29 +358,14 @@ struct Player {
banned_from: HashSet<RoomId>,
rx: Receiver<(ActorCommand, Span)>,
handle: PlayerHandle,
players: PlayerRegistry,
rooms: RoomRegistry,
dialogs: DialogRegistry,
storage: Storage,
cluster_metadata: Arc<ClusterMetadata>,
cluster_client: LavinaClient,
broadcasting: Broadcasting,
services: LavinaCore,
}
impl Player {
async fn launch(
player_id: PlayerId,
players: PlayerRegistry,
rooms: RoomRegistry,
dialogs: DialogRegistry,
cluster_metadata: Arc<ClusterMetadata>,
cluster_client: LavinaClient,
broadcasting: Broadcasting,
storage: Storage,
) -> (PlayerHandle, JoinHandle<Player>) {
async fn launch(player_id: PlayerId, core: LavinaCore) -> (PlayerHandle, JoinHandle<Player>) {
let (tx, rx) = channel(32);
let handle = PlayerHandle { tx };
let handle_clone = handle.clone();
let storage_id = storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap();
let storage_id = core.services.storage.retrieve_user_id_by_name(player_id.as_inner()).await.unwrap().unwrap();
let player = Player {
player_id,
storage_id,
@ -432,22 +377,16 @@ impl Player {
banned_from: HashSet::new(),
rx,
handle,
players,
rooms,
dialogs,
storage,
cluster_metadata,
cluster_client,
broadcasting,
services: core,
};
let fiber = tokio::task::spawn(player.main_loop());
(handle_clone, fiber)
}
fn room_location(&self, room_id: &RoomId) -> Option<u32> {
let res = self.cluster_metadata.rooms.get(room_id.as_inner().as_ref()).copied();
let node = res.unwrap_or(self.cluster_metadata.main_owner);
if node == self.cluster_metadata.node_id {
let res = self.services.cluster_metadata.rooms.get(room_id.as_inner().as_ref()).copied();
let node = res.unwrap_or(self.services.cluster_metadata.main_owner);
if node == self.services.cluster_metadata.node_id {
None
} else {
Some(node)
@ -455,13 +394,13 @@ impl Player {
}
async fn main_loop(mut self) -> Self {
let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
let rooms = self.services.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
for room_id in rooms {
if let Some(remote_node) = self.room_location(&room_id) {
self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node });
self.broadcasting.subscribe(self.player_id.clone(), room_id).await;
self.services.subscribe(self.player_id.clone(), room_id).await;
} else {
let room = self.rooms.get_room(&room_id).await;
let room = self.services.rooms.get_room(&self.services, &room_id).await;
if let Some(room) = room {
self.my_rooms.insert(room_id, RoomRef::Local(room));
} else {
@ -587,9 +526,10 @@ impl Player {
room_id: room_id.as_inner(),
player_id: self.player_id.as_inner(),
};
self.cluster_client.join_room(remote_node, req).await.unwrap();
let room_storage_id = self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
self.storage.add_room_member(room_storage_id, self.storage_id).await.unwrap();
self.services.client.join_room(remote_node, req).await.unwrap();
let room_storage_id =
self.services.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
self.services.storage.add_room_member(room_storage_id, self.storage_id).await.unwrap();
self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node });
JoinResult::Success(RoomInfo {
id: room_id,
@ -597,14 +537,14 @@ impl Player {
members: vec![],
})
} else {
let room = match self.rooms.get_or_create_room(room_id.clone()).await {
let room = match self.services.rooms.get_or_create_room(&self.services, room_id.clone()).await {
Ok(room) => room,
Err(e) => {
log::error!("Failed to get or create room: {e}");
todo!();
}
};
room.add_member(&self.player_id, self.storage_id).await;
room.add_member(&self.services, &self.player_id, self.storage_id).await;
room.subscribe(&self.player_id, self.handle.clone()).await;
self.my_rooms.insert(room_id.clone(), RoomRef::Local(room.clone()));
let room_info = room.get_room_info().await;
@ -624,17 +564,17 @@ impl Player {
match room {
RoomRef::Local(room) => {
room.unsubscribe(&self.player_id).await;
room.remove_member(&self.player_id, self.storage_id).await;
room.remove_member(&self.services, &self.player_id, self.storage_id).await;
}
RoomRef::Remote { node_id } => {
let req = LeaveRoomReq {
room_id: room_id.as_inner(),
player_id: self.player_id.as_inner(),
};
self.cluster_client.leave_room(node_id, req).await.unwrap();
self.services.client.leave_room(node_id, req).await.unwrap();
let room_storage_id =
self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
self.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap();
self.services.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
self.services.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap();
}
}
}
@ -656,10 +596,10 @@ impl Player {
tracing::info!("no room found");
return SendMessageResult::NoSuchRoom;
};
let created_at = chrono::Utc::now();
let created_at = Utc::now();
match room {
RoomRef::Local(room) => {
room.send_message(&self.player_id, body.clone(), created_at.clone()).await;
room.send_message(&self.services, &self.player_id, body.clone(), created_at.clone()).await;
}
RoomRef::Remote { node_id } => {
let req = SendMessageReq {
@ -668,10 +608,9 @@ impl Player {
message: &*body,
created_at: &*created_at.to_rfc3339(),
};
self.cluster_client.send_room_message(*node_id, req).await.unwrap();
self.broadcasting
self.services.client.send_room_message(*node_id, req).await.unwrap();
self.services
.broadcast(
&self.players,
room_id.clone(),
self.player_id.clone(),
body.clone(),
@ -698,7 +637,7 @@ impl Player {
};
match room {
RoomRef::Local(room) => {
room.set_topic(&self.player_id, new_topic.clone()).await;
room.set_topic(&self.services, &self.player_id, new_topic.clone()).await;
}
RoomRef::Remote { node_id } => {
let req = SetRoomTopicReq {
@ -706,7 +645,7 @@ impl Player {
player_id: self.player_id.as_inner(),
topic: &*new_topic,
};
self.cluster_client.set_room_topic(*node_id, req).await.unwrap();
self.services.client.set_room_topic(*node_id, req).await.unwrap();
}
}
let update = Updates::RoomTopicChanged { room_id, new_topic };
@ -736,8 +675,11 @@ impl Player {
#[tracing::instrument(skip(self, body), name = "Player::send_dialog_message")]
async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) {
let created_at = chrono::Utc::now();
self.dialogs.send_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at).await.unwrap();
let created_at = Utc::now();
self.services
.send_dialog_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at)
.await
.unwrap();
let update = Updates::NewDialogMessage {
sender: self.player_id.clone(),
receiver: recipient.clone(),
@ -749,7 +691,7 @@ impl Player {
#[tracing::instrument(skip(self), name = "Player::check_user_existence")]
async fn check_user_existence(&self, recipient: PlayerId) -> GetInfoResult {
if self.storage.check_user_existence(recipient.as_inner().as_ref()).await.unwrap() {
if self.services.storage.check_user_existence(recipient.as_inner().as_ref()).await.unwrap() {
GetInfoResult::UserExists
} else {
GetInfoResult::UserDoesntExist


@ -1,7 +1,6 @@
//! Storage and persistence logic.
use std::str::FromStr;
use std::sync::Arc;
use serde::Deserialize;
use sqlx::sqlite::SqliteConnectOptions;
@ -20,9 +19,8 @@ pub struct StorageConfig {
pub db_path: String,
}
#[derive(Clone)]
pub struct Storage {
conn: Arc<Mutex<SqliteConnection>>,
conn: Mutex<SqliteConnection>,
}
impl Storage {
pub async fn open(config: StorageConfig) -> Result<Storage> {
@ -34,17 +32,17 @@ impl Storage {
migrator.run(&mut conn).await?;
log::info!("Migrations passed");
let conn = Arc::new(Mutex::new(conn));
let conn = Mutex::new(conn);
Ok(Storage { conn })
}
pub async fn close(self) -> Result<()> {
let res = match Arc::try_unwrap(self.conn) {
Ok(e) => e,
Err(_) => return Err(fail("failed to acquire DB ownership on shutdown")),
};
let res = res.into_inner();
res.close().await?;
Ok(())
pub async fn close(self) {
let res = self.conn.into_inner();
match res.close().await {
Ok(_) => {}
Err(e) => {
tracing::error!("Failed to close the DB connection: {e:?}");
}
}
}
}


@ -30,7 +30,7 @@ impl Storage {
}
#[tracing::instrument(skip(self, topic), name = "Storage::create_new_room")]
pub async fn create_new_room(&mut self, name: &str, topic: &str) -> Result<u32> {
pub async fn create_new_room(&self, name: &str, topic: &str) -> Result<u32> {
let mut executor = self.conn.lock().await;
let (id,): (u32,) = sqlx::query_as(
"insert into rooms(name, topic)
@ -47,7 +47,7 @@ impl Storage {
#[tracing::instrument(skip(self, content, created_at), name = "Storage::insert_room_message")]
pub async fn insert_room_message(
&mut self,
&self,
room_id: u32,
id: u32,
content: &str,
@ -130,7 +130,7 @@ impl Storage {
}
#[tracing::instrument(skip(self, topic), name = "Storage::set_room_topic")]
pub async fn set_room_topic(&mut self, id: u32, topic: &str) -> Result<()> {
pub async fn set_room_topic(&self, id: u32, topic: &str) -> Result<()> {
let mut executor = self.conn.lock().await;
sqlx::query(
"update rooms


@ -9,7 +9,7 @@ use tokio::sync::RwLock as AsyncRwLock;
use crate::player::{PlayerHandle, PlayerId, Updates};
use crate::prelude::*;
use crate::repo::Storage;
use crate::Services;
/// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
@ -35,41 +35,32 @@ impl RoomId {
}
/// Shared data structure for storing metadata about rooms.
#[derive(Clone)]
pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
pub(crate) struct RoomRegistry(AsyncRwLock<RoomRegistryInner>);
impl RoomRegistry {
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> {
pub fn new(metrics: &mut MetricRegistry) -> Result<RoomRegistry> {
let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
metrics.register(Box::new(metric_active_rooms.clone()))?;
let inner = RoomRegistryInner {
rooms: HashMap::new(),
metric_active_rooms,
storage,
};
Ok(RoomRegistry(Arc::new(AsyncRwLock::new(inner))))
Ok(RoomRegistry(AsyncRwLock::new(inner)))
}
pub fn shutdown(self) -> Result<()> {
let res = match Arc::try_unwrap(self.0) {
Ok(e) => e,
Err(_) => return Err(fail("failed to acquire rooms ownership on shutdown")),
};
let res = res.into_inner();
// TODO drop all rooms
drop(res);
Ok(())
pub fn shutdown(self) {
// TODO iterate over rooms and stop them
}
#[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")]
pub async fn get_or_create_room(&self, room_id: RoomId) -> Result<RoomHandle> {
#[tracing::instrument(skip(self, services), name = "RoomRegistry::get_or_create_room")]
pub async fn get_or_create_room(&self, services: &Services, room_id: RoomId) -> Result<RoomHandle> {
let mut inner = self.0.write().await;
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? {
if let Some(room_handle) = inner.get_or_load_room(services, &room_id).await? {
Ok(room_handle.clone())
} else {
log::debug!("Creating room {}...", &room_id.0);
let topic = "New room";
let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?;
let id = services.storage.create_new_room(&*room_id.0, &*topic).await?;
let room = Room {
storage_id: id,
room_id: room_id.clone(),
@ -77,7 +68,6 @@ impl RoomRegistry {
members: HashSet::new(),
topic: topic.into(),
message_count: 0,
storage: inner.storage.clone(),
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone());
@ -86,10 +76,10 @@ impl RoomRegistry {
}
}
#[tracing::instrument(skip(self), name = "RoomRegistry::get_room")]
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
#[tracing::instrument(skip(self, services), name = "RoomRegistry::get_room")]
pub async fn get_room(&self, services: &Services, room_id: &RoomId) -> Option<RoomHandle> {
let mut inner = self.0.write().await;
inner.get_or_load_room(room_id).await.unwrap()
inner.get_or_load_room(services, room_id).await.unwrap()
}
#[tracing::instrument(skip(self), name = "RoomRegistry::get_all_rooms")]
@ -110,16 +100,15 @@ impl RoomRegistry {
struct RoomRegistryInner {
rooms: HashMap<RoomId, RoomHandle>,
metric_active_rooms: IntGauge,
storage: Storage,
}
impl RoomRegistryInner {
#[tracing::instrument(skip(self), name = "RoomRegistryInner::get_or_load_room")]
async fn get_or_load_room(&mut self, room_id: &RoomId) -> Result<Option<RoomHandle>> {
#[tracing::instrument(skip(self, services), name = "RoomRegistryInner::get_or_load_room")]
async fn get_or_load_room(&mut self, services: &Services, room_id: &RoomId) -> Result<Option<RoomHandle>> {
if let Some(room_handle) = self.rooms.get(room_id) {
log::debug!("Room {} was loaded already", &room_id.0);
Ok(Some(room_handle.clone()))
} else if let Some(stored_room) = self.storage.retrieve_room_by_name(&*room_id.0).await? {
} else if let Some(stored_room) = services.storage.retrieve_room_by_name(&*room_id.0).await? {
log::debug!("Loading room {}...", &room_id.0);
let room = Room {
storage_id: stored_room.id,
@ -128,7 +117,6 @@ impl RoomRegistryInner {
members: HashSet::new(), // TODO load members from storage
topic: stored_room.topic.into(),
message_count: stored_room.message_count,
storage: self.storage.clone(),
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
self.rooms.insert(room_id.clone(), room_handle.clone());
@ -152,13 +140,13 @@ impl RoomHandle {
lock.subscriptions.insert(player_id.clone(), player_handle);
}
#[tracing::instrument(skip(self), name = "RoomHandle::add_member")]
pub async fn add_member(&self, player_id: &PlayerId, player_storage_id: u32) {
#[tracing::instrument(skip(self, services), name = "RoomHandle::add_member")]
pub async fn add_member(&self, services: &Services, player_id: &PlayerId, player_storage_id: u32) {
let mut lock = self.0.write().await;
tracing::info!("Adding a new member to a room");
let room_storage_id = lock.storage_id;
if !lock.storage.is_room_member(room_storage_id, player_storage_id).await.unwrap() {
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
if !services.storage.is_room_member(room_storage_id, player_storage_id).await.unwrap() {
services.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
} else {
tracing::warn!("User {:#?} has already been added to the room.", player_id);
}
@ -176,12 +164,12 @@ impl RoomHandle {
lock.subscriptions.remove(player_id);
}
#[tracing::instrument(skip(self), name = "RoomHandle::remove_member")]
pub async fn remove_member(&self, player_id: &PlayerId, player_storage_id: u32) {
#[tracing::instrument(skip(self, services), name = "RoomHandle::remove_member")]
pub async fn remove_member(&self, services: &Services, player_id: &PlayerId, player_storage_id: u32) {
let mut lock = self.0.write().await;
tracing::info!("Removing a member from a room");
let room_storage_id = lock.storage_id;
lock.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap();
services.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap();
lock.members.remove(player_id);
let update = Updates::RoomLeft {
room_id: lock.room_id.clone(),
@ -190,10 +178,10 @@ impl RoomHandle {
lock.broadcast_update(update, player_id).await;
}
#[tracing::instrument(skip(self, body, created_at), name = "RoomHandle::send_message")]
pub async fn send_message(&self, player_id: &PlayerId, body: Str, created_at: DateTime<Utc>) {
#[tracing::instrument(skip(self, services, body, created_at), name = "RoomHandle::send_message")]
pub async fn send_message(&self, services: &Services, player_id: &PlayerId, body: Str, created_at: DateTime<Utc>) {
let mut lock = self.0.write().await;
let res = lock.send_message(player_id, body, created_at).await;
let res = lock.send_message(services, player_id, body, created_at).await;
if let Err(err) = res {
log::warn!("Failed to send message: {err:?}");
}
@ -209,12 +197,12 @@ impl RoomHandle {
}
}
#[tracing::instrument(skip(self, new_topic), name = "RoomHandle::set_topic")]
pub async fn set_topic(&self, changer_id: &PlayerId, new_topic: Str) {
#[tracing::instrument(skip(self, services, new_topic), name = "RoomHandle::set_topic")]
pub async fn set_topic(&self, services: &Services, changer_id: &PlayerId, new_topic: Str) {
let mut lock = self.0.write().await;
let storage_id = lock.storage_id;
lock.topic = new_topic.clone();
lock.storage.set_room_topic(storage_id, &new_topic).await.unwrap();
services.storage.set_room_topic(storage_id, &new_topic).await.unwrap();
let update = Updates::RoomTopicChanged {
room_id: lock.room_id.clone(),
new_topic: new_topic.clone(),
@ -235,14 +223,20 @@ struct Room {
/// The total number of messages. Used to calculate the id of the new message.
message_count: u32,
topic: Str,
storage: Storage,
}
impl Room {
#[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")]
async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime<Utc>) -> Result<()> {
#[tracing::instrument(skip(self, services, body, created_at), name = "Room::send_message")]
async fn send_message(
&mut self,
services: &Services,
author_id: &PlayerId,
body: Str,
created_at: DateTime<Utc>,
) -> Result<()> {
tracing::info!("Adding a message to room");
self.storage
services
.storage
.insert_room_message(
self.storage_id,
self.message_count,

View File

@ -14,10 +14,10 @@ use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::channel;
use lavina_core::auth::{Authenticator, Verdict};
use lavina_core::auth::Verdict;
use lavina_core::player::*;
use lavina_core::prelude::*;
use lavina_core::room::{RoomId, RoomInfo, RoomRegistry};
use lavina_core::room::{RoomId, RoomInfo};
use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use proto_irc::client::CapabilitySubcommand;
@ -79,7 +79,7 @@ async fn handle_socket(
match registered_user {
Ok(user) => {
log::debug!("User registered");
handle_registered_socket(config, core.players, core.rooms, &mut reader, &mut writer, user).await?;
handle_registered_socket(config, &core, &mut reader, &mut writer, user).await?;
}
Err(err) => {
log::debug!("Registration failed: {err}");
@ -215,7 +215,7 @@ impl RegistrationState {
realname,
enabled_capabilities: self.enabled_capabilities,
};
self.finalize_auth(candidate_user, writer, &core.authenticator, config).await
self.finalize_auth(candidate_user, writer, core, config).await
}
},
ClientMessage::Nick { nickname } => {
@ -229,7 +229,7 @@ impl RegistrationState {
realname: realname.clone(),
enabled_capabilities: self.enabled_capabilities,
};
self.finalize_auth(candidate_user, writer, &core.authenticator, config).await
self.finalize_auth(candidate_user, writer, core, config).await
} else {
self.future_nickname = Some(nickname);
Ok(None)
@ -246,7 +246,7 @@ impl RegistrationState {
realname,
enabled_capabilities: self.enabled_capabilities,
};
self.finalize_auth(candidate_user, writer, &core.authenticator, config).await
self.finalize_auth(candidate_user, writer, core, config).await
} else {
self.future_username = Some((username, realname));
Ok(None)
@ -277,7 +277,7 @@ impl RegistrationState {
}
} else {
let body = AuthBody::from_str(body.as_bytes())?;
if let Err(e) = auth_user(&core.authenticator, &body.login, &body.password).await {
if let Err(e) = auth_user(core, &body.login, &body.password).await {
tracing::warn!("Authentication failed: {:?}", e);
let target = self.future_nickname.clone().unwrap_or_else(|| "*".into());
sasl_fail_message(config.server_name.clone(), target, "Bad credentials".into())
@ -325,7 +325,7 @@ impl RegistrationState {
&mut self,
candidate_user: RegisteredUser,
writer: &mut BufWriter<WriteHalf<'_>>,
authenticator: &Authenticator,
core: &LavinaCore,
config: &ServerConfig,
) -> Result<Option<RegisteredUser>> {
if self.enabled_capabilities.contains(Capabilities::Sasl)
@ -344,7 +344,7 @@ impl RegistrationState {
writer.flush().await?;
return Ok(None);
};
auth_user(authenticator, &*candidate_user.nickname, &*candidate_password).await?;
auth_user(core, &*candidate_user.nickname, &*candidate_password).await?;
Ok(Some(candidate_user))
}
}
@ -406,8 +406,8 @@ fn sasl_fail_message(sender: Str, nick: Str, text: Str) -> ServerMessage {
}
}
async fn auth_user(authenticator: &Authenticator, login: &str, plain_password: &str) -> Result<()> {
let verdict = authenticator.authenticate(login, plain_password).await?;
async fn auth_user(core: &LavinaCore, login: &str, plain_password: &str) -> Result<()> {
let verdict = core.authenticate(login, plain_password).await?;
// TODO properly map these onto protocol messages
match verdict {
Verdict::Authenticated => Ok(()),
@ -418,8 +418,7 @@ async fn auth_user(authenticator: &Authenticator, login: &str, plain_password: &
async fn handle_registered_socket<'a>(
config: ServerConfig,
players: PlayerRegistry,
rooms: RoomRegistry,
core: &LavinaCore,
reader: &mut BufReader<ReadHalf<'a>>,
writer: &mut BufWriter<WriteHalf<'a>>,
user: RegisteredUser,
@ -428,7 +427,7 @@ async fn handle_registered_socket<'a>(
log::info!("Handling registered user: {user:?}");
let player_id = PlayerId::from(user.nickname.clone())?;
let mut connection = players.connect_to_player(&player_id).await;
let mut connection = core.connect_to_player(&player_id).await;
let text: Str = format!("Welcome to {} Server", &config.server_name).into();
ServerMessage {
@ -502,7 +501,7 @@ async fn handle_registered_socket<'a>(
len
};
let incoming = std::str::from_utf8(&buffer[0..len-2])?;
if let HandleResult::Leave = handle_incoming_message(incoming, &config, &user, &rooms, &mut connection, writer).await? {
if let HandleResult::Leave = handle_incoming_message(incoming, &config, &user, core, &mut connection, writer).await? {
break;
}
buffer.clear();
@ -510,7 +509,7 @@ async fn handle_registered_socket<'a>(
update = connection.receiver.recv() => {
match update {
Some(ConnectionMessage::Update(update)) => {
handle_update(&config, &user, &player_id, writer, &rooms, update).await?;
handle_update(&config, &user, &player_id, writer, core, update).await?;
}
Some(ConnectionMessage::Stop(_)) => {
tracing::debug!("Connection is being terminated");
@ -561,14 +560,14 @@ async fn handle_update(
user: &RegisteredUser,
player_id: &PlayerId,
writer: &mut (impl AsyncWrite + Unpin),
rooms: &RoomRegistry,
core: &LavinaCore,
update: Updates,
) -> Result<()> {
log::debug!("Sending irc message to player {player_id:?} on update {update:?}");
match update {
Updates::RoomJoined { new_member_id, room_id } => {
if player_id == &new_member_id {
if let Some(room) = rooms.get_room(&room_id).await {
if let Some(room) = core.get_room(&room_id).await {
let room_info = room.get_room_info().await;
let chan = Chan::Global(room_id.as_inner().clone());
produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?;
@ -691,7 +690,7 @@ async fn handle_incoming_message(
buffer: &str,
config: &ServerConfig,
user: &RegisteredUser,
rooms: &RoomRegistry,
core: &LavinaCore,
user_handle: &mut PlayerConnection,
writer: &mut (impl AsyncWrite + Unpin),
) -> Result<HandleResult> {
@ -775,7 +774,7 @@ async fn handle_incoming_message(
writer.flush().await?;
}
Recipient::Chan(Chan::Global(chan)) => {
let room = rooms.get_room(&RoomId::from(chan.clone())?).await;
let room = core.get_room(&RoomId::from(chan.clone())?).await;
if let Some(room) = room {
let room_info = room.get_room_info().await;
for member in room_info.members {

View File

@ -8,7 +8,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use lavina_core::auth::Authenticator;
use lavina_core::clustering::{ClusterConfig, ClusterMetadata};
use lavina_core::player::{JoinResult, PlayerId, SendMessageResult};
use lavina_core::repo::{Storage, StorageConfig};
@ -102,8 +101,6 @@ impl<'a> TestScope<'a> {
}
struct TestServer {
metrics: MetricsRegistry,
storage: Storage,
core: LavinaCore,
server: RunningServer,
}
@ -114,7 +111,7 @@ impl TestServer {
listen_on: "127.0.0.1:0".parse().unwrap(),
server_name: "testserver".into(),
};
let metrics = MetricsRegistry::new();
let mut metrics = MetricsRegistry::new();
let storage = Storage::open(StorageConfig {
db_path: ":memory:".into(),
})
@ -127,14 +124,9 @@ impl TestServer {
rooms: Default::default(),
},
};
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?;
let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
Ok(TestServer {
metrics,
storage,
core,
server,
})
Ok(TestServer { core, server })
}
async fn reboot(self) -> Result<TestServer> {
@ -150,30 +142,19 @@ impl TestServer {
rooms: Default::default(),
},
};
let TestServer {
metrics: _,
storage,
core,
server,
} = self;
let TestServer { core, server } = self;
server.terminate().await?;
core.shutdown().await?;
let metrics = MetricsRegistry::new();
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?;
let storage = core.shutdown().await;
let mut metrics = MetricsRegistry::new();
let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
Ok(TestServer {
metrics,
storage,
core,
server,
})
Ok(TestServer { core, server })
}
async fn shutdown(self) -> Result<()> {
self.server.terminate().await?;
self.core.shutdown().await?;
self.storage.close().await?;
Ok(())
async fn shutdown(self) {
let _ = self.server.terminate().await;
let storage = self.core.shutdown().await;
let _ = storage.close().await;
}
}
@ -183,8 +164,8 @@ async fn scenario_basic() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -202,7 +183,7 @@ async fn scenario_basic() -> Result<()> {
// wrap up
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -212,8 +193,8 @@ async fn scenario_join_and_reboot() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -272,7 +253,7 @@ async fn scenario_join_and_reboot() -> Result<()> {
// wrap up
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -282,8 +263,8 @@ async fn scenario_force_join_msg() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream1 = TcpStream::connect(server.server.addr).await?;
let mut s1 = TestScope::new(&mut stream1);
@ -338,7 +319,7 @@ async fn scenario_force_join_msg() -> Result<()> {
// wrap up
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -348,10 +329,10 @@ async fn scenario_two_users() -> Result<()> {
// test scenario
server.storage.create_user("tester1").await?;
Authenticator::new(server.storage.clone()).set_password("tester1", "password").await?;
server.storage.create_user("tester2").await?;
Authenticator::new(server.storage.clone()).set_password("tester2", "password").await?;
server.core.create_player(&PlayerId::from("tester1")?).await?;
server.core.set_password("tester1", "password").await?;
server.core.create_player(&PlayerId::from("tester2")?).await?;
server.core.set_password("tester2", "password").await?;
let mut stream1 = TcpStream::connect(server.server.addr).await?;
let mut s1 = TestScope::new(&mut stream1);
@ -410,7 +391,7 @@ async fn scenario_two_users() -> Result<()> {
stream2.shutdown().await?;
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -424,8 +405,8 @@ async fn scenario_cap_full_negotiation() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -454,7 +435,7 @@ async fn scenario_cap_full_negotiation() -> Result<()> {
// wrap up
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -464,8 +445,8 @@ async fn scenario_cap_full_negotiation_nick_last() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -493,7 +474,7 @@ async fn scenario_cap_full_negotiation_nick_last() -> Result<()> {
// wrap up
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -503,8 +484,8 @@ async fn scenario_cap_short_negotiation() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -531,7 +512,7 @@ async fn scenario_cap_short_negotiation() -> Result<()> {
// wrap up
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -541,8 +522,8 @@ async fn scenario_cap_sasl_fail() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -575,7 +556,7 @@ async fn scenario_cap_sasl_fail() -> Result<()> {
// wrap up
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -585,8 +566,8 @@ async fn terminate_socket_scenario() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -598,7 +579,7 @@ async fn terminate_socket_scenario() -> Result<()> {
s.send("AUTHENTICATE PLAIN").await?;
s.expect(":testserver AUTHENTICATE +").await?;
server.shutdown().await?;
server.shutdown().await;
assert_eq!(stream.read_u8().await.unwrap_err().kind(), ErrorKind::UnexpectedEof);
Ok(())
@ -610,8 +591,8 @@ async fn server_time_capability() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -636,8 +617,8 @@ async fn server_time_capability() -> Result<()> {
s.expect(":testserver 353 tester = #test :tester").await?;
s.expect(":testserver 366 tester #test :End of /NAMES list").await?;
server.storage.create_user("some_guy").await?;
let mut conn = server.core.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
server.core.create_player(&PlayerId::from("some_guy")?).await?;
let mut conn = server.core.connect_to_player(&PlayerId::from("some_guy").unwrap()).await;
let res = conn.join_room(RoomId::from("test").unwrap()).await?;
let JoinResult::Success(_) = res else {
panic!("Failed to join room");
@ -669,7 +650,7 @@ async fn server_time_capability() -> Result<()> {
// wrap up
server.shutdown().await?;
server.shutdown().await;
Ok(())
}
@ -679,10 +660,10 @@ async fn scenario_two_players_dialog() -> Result<()> {
// test scenario
server.storage.create_user("tester1").await?;
server.storage.set_password("tester1", "password").await?;
server.storage.create_user("tester2").await?;
server.storage.set_password("tester2", "password").await?;
server.core.create_player(&PlayerId::from("tester1")?).await?;
server.core.set_password("tester1", "password").await?;
server.core.create_player(&PlayerId::from("tester2")?).await?;
server.core.set_password("tester2", "password").await?;
let mut stream1 = TcpStream::connect(server.server.addr).await?;
let mut s1 = TestScope::new(&mut stream1);
@ -733,7 +714,7 @@ async fn scenario_two_players_dialog() -> Result<()> {
stream1.shutdown().await?;
stream2.shutdown().await?;
server.shutdown().await?;
server.shutdown().await;
Ok(())
}


@ -2,9 +2,10 @@
use quick_xml::events::Event;
use lavina_core::room::{RoomId, RoomRegistry};
use lavina_core::room::RoomId;
use lavina_core::LavinaCore;
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Server};
use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType};
use proto_xmpp::client::{Iq, IqError, IqErrorCondition, IqErrorType, IqType};
use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery};
use proto_xmpp::mam::{Fin, Set};
use proto_xmpp::roster::RosterQuery;
@ -73,7 +74,7 @@ impl<'a> XmppConnection<'a> {
}
}
IqClientBody::DiscoItem(item) => {
let response = self.disco_items(iq.to.as_ref(), &item, self.rooms).await;
let response = self.disco_items(iq.to.as_ref(), &item, self.core).await;
let req = Iq {
from: iq.to,
id: iq.id,
@ -103,6 +104,7 @@ impl<'a> XmppConnection<'a> {
r#type: IqType::Error,
body: IqError {
r#type: IqErrorType::Cancel,
condition: None,
},
};
req.serialize(output);
@ -162,7 +164,7 @@ impl<'a> XmppConnection<'a> {
resource: None,
}) if server.0 == self.hostname_rooms => {
let room_id = RoomId::from(room_name.0.clone()).unwrap();
let Some(_) = self.rooms.get_room(&room_id).await else {
let Some(_) = self.core.get_room(&room_id).await else {
// TODO should return item-not-found
// example:
// <error type="cancel">
@ -171,6 +173,7 @@ impl<'a> XmppConnection<'a> {
// </error>
return Err(IqError {
r#type: IqErrorType::Cancel,
condition: Some(IqErrorCondition::ItemNotFound),
});
};
identity = vec![Identity {
@ -196,7 +199,7 @@ impl<'a> XmppConnection<'a> {
})
}
async fn disco_items(&self, to: Option<&Jid>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery {
async fn disco_items(&self, to: Option<&Jid>, req: &ItemQuery, core: &LavinaCore) -> ItemQuery {
let item = match to {
Some(Jid {
name: None,
@ -218,7 +221,7 @@ impl<'a> XmppConnection<'a> {
server,
resource: None,
}) if server.0 == self.hostname_rooms => {
let room_list = rooms.get_all_rooms().await;
let room_list = core.get_all_rooms().await;
room_list
.into_iter()
.map(|room_info| Item {

View File

@ -25,7 +25,6 @@ use tokio_rustls::TlsAcceptor;
use lavina_core::auth::Verdict;
use lavina_core::player::{ConnectionMessage, PlayerConnection, PlayerId, StopReason};
use lavina_core::prelude::*;
use lavina_core::room::RoomRegistry;
use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use proto_xmpp::bind::{Name, Resource};
@ -203,14 +202,14 @@ async fn handle_socket(
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &core, &hostname) => {
match authenticated {
Ok(authenticated) => {
let mut connection = core.players.connect_to_player(&authenticated.player_id).await;
let mut connection = core.connect_to_player(&authenticated.player_id).await;
socket_final(
&mut xml_reader,
&mut xml_writer,
&mut reader_buf,
&authenticated,
&mut connection,
&core.rooms,
&core,
&hostname,
)
.await?;
@ -296,7 +295,7 @@ async fn socket_auth(
match AuthBody::from_str(&auth.body) {
Ok(logopass) => {
let name = &logopass.login;
let verdict = core.authenticator.authenticate(name, &logopass.password).await?;
let verdict = core.authenticate(name, &logopass.password).await?;
match verdict {
Verdict::Authenticated => {
proto_xmpp::sasl::Success.write_xml(xml_writer).await?;
@ -326,7 +325,7 @@ async fn socket_final(
reader_buf: &mut Vec<u8>,
authenticated: &Authenticated,
user_handle: &mut PlayerConnection,
rooms: &RoomRegistry,
core: &LavinaCore,
hostname: &Str,
) -> Result<()> {
// TODO validate the server hostname received in the stream start
@ -359,7 +358,7 @@ async fn socket_final(
let mut conn = XmppConnection {
user: authenticated,
user_handle,
rooms,
core,
hostname: hostname.clone(),
hostname_rooms: format!("rooms.{}", hostname).into(),
};
@ -442,7 +441,7 @@ async fn socket_final(
struct XmppConnection<'a> {
user: &'a Authenticated,
user_handle: &'a mut PlayerConnection,
rooms: &'a RoomRegistry,
core: &'a LavinaCore,
hostname: Str,
hostname_rooms: Str,
}

View File

@ -1,11 +1,12 @@
//! Handling of all client2server presence stanzas
use anyhow::Result;
use quick_xml::events::Event;
use lavina_core::prelude::*;
use lavina_core::room::RoomId;
use proto_xmpp::bind::{Jid, Name, Server};
use proto_xmpp::client::Presence;
use proto_xmpp::muc::XUser;
use proto_xmpp::xml::{Ignore, ToXml};
use crate::XmppConnection;
@ -60,10 +61,10 @@ impl<'a> XmppConnection<'a> {
}
// todo: return Presence and serialize on the outside.
async fn muc_presence(&mut self, name: &Name) -> Result<(Presence<()>)> {
async fn muc_presence(&mut self, name: &Name) -> Result<Presence<XUser>> {
let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?;
// TODO handle bans
let response = Presence::<()> {
let response = Presence {
to: Some(Jid {
name: Some(self.user.xmpp_name.clone()),
server: Server(self.hostname.clone()),
@ -74,6 +75,7 @@ impl<'a> XmppConnection<'a> {
server: Server(self.hostname_rooms.clone()),
resource: Some(self.user.xmpp_muc_name.clone()),
}),
custom: vec![XUser],
..Default::default()
};
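// Illustrative wire format of this self-presence (the jids depend on the
// connection; the <x/> marker comes from the XUser element in `custom`):
//   <presence from="room@rooms.localhost/nick" to="user@localhost">
//     <x xmlns="http://jabber.org/protocol/muc#user"/>
//   </presence>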
Ok(response)
@ -87,15 +89,17 @@ impl<'a> XmppConnection<'a> {
mod tests {
use crate::testkit::{expect_user_authenticated, TestServer};
use crate::Authenticated;
use anyhow::Result;
use lavina_core::player::PlayerId;
use proto_xmpp::bind::{Jid, Name, Resource, Server};
use proto_xmpp::client::Presence;
use proto_xmpp::muc::XUser;
#[tokio::test]
async fn test_muc_joining() {
async fn test_muc_joining() -> Result<()> {
let server = TestServer::start().await.unwrap();
server.storage.create_user("tester").await.unwrap();
server.core.create_player(&PlayerId::from("tester")?).await?;
let player_id = PlayerId::from("tester").unwrap();
let user = Authenticated {
@ -105,11 +109,11 @@ mod tests {
xmpp_muc_name: Resource("tester".into()),
};
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
let expected = Presence::<()> {
let expected = Presence {
to: Some(Jid {
name: Some(conn.user.xmpp_name.clone()),
server: Server(conn.hostname.clone()),
@ -120,20 +124,22 @@ mod tests {
server: Server(conn.hostname_rooms.clone()),
resource: Some(conn.user.xmpp_muc_name.clone()),
}),
custom: vec![XUser],
..Default::default()
};
assert_eq!(expected, response);
server.shutdown().await.unwrap();
Ok(())
}
// Tests that joining a room a second time after a server restart,
// i.e. when the in-memory cache of memberships has been cleared, does not cause any issues.
#[tokio::test]
async fn test_muc_joining_twice() {
async fn test_muc_joining_twice() -> Result<()> {
let server = TestServer::start().await.unwrap();
server.storage.create_user("tester").await.unwrap();
server.core.create_player(&PlayerId::from("tester")?).await?;
let player_id = PlayerId::from("tester").unwrap();
let user = Authenticated {
@ -143,7 +149,7 @@ mod tests {
xmpp_muc_name: Resource("tester".into()),
};
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
@ -165,12 +171,13 @@ mod tests {
drop(conn);
let server = server.reboot().await.unwrap();
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
let mut player_conn = server.core.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
assert_eq!(expected, response);
server.shutdown().await.unwrap();
Ok(())
}
}

View File

@ -8,15 +8,13 @@ use lavina_core::LavinaCore;
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server};
pub(crate) struct TestServer {
pub metrics: MetricsRegistry,
pub storage: Storage,
pub core: LavinaCore,
}
impl TestServer {
pub async fn start() -> anyhow::Result<TestServer> {
let _ = tracing_subscriber::fmt::try_init();
let metrics = MetricsRegistry::new();
let mut metrics = MetricsRegistry::new();
let storage = Storage::open(StorageConfig {
db_path: ":memory:".into(),
})
@ -29,14 +27,14 @@ impl TestServer {
},
addresses: vec![],
};
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?;
Ok(TestServer { metrics, storage, core })
let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
Ok(TestServer { core })
}
pub async fn reboot(self) -> anyhow::Result<TestServer> {
self.core.shutdown().await?;
let storage = self.core.shutdown().await;
let metrics = MetricsRegistry::new();
let mut metrics = MetricsRegistry::new();
let cluster_config = ClusterConfig {
metadata: ClusterMetadata {
node_id: 0,
@ -45,18 +43,14 @@ impl TestServer {
},
addresses: vec![],
};
let core = LavinaCore::new(metrics.clone(), cluster_config, self.storage.clone()).await?;
let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
Ok(TestServer {
metrics,
storage: self.storage.clone(),
core,
})
Ok(TestServer { core })
}
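// `LavinaCore::shutdown` hands the `Storage` back to the caller, so tests can
// either rebuild a core on top of it (reboot) or close it for good (shutdown).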
pub async fn shutdown(self) -> anyhow::Result<()> {
self.core.shutdown().await?;
self.storage.close().await?;
let storage = self.core.shutdown().await;
storage.close().await;
Ok(())
}
}
@ -69,7 +63,7 @@ pub async fn expect_user_authenticated<'a>(
let conn = XmppConnection {
user: &user,
user_handle: conn,
rooms: &server.core.rooms,
core: &server.core,
hostname: "localhost".into(),
hostname_rooms: "rooms.localhost".into(),
};

View File

@ -18,8 +18,8 @@ use tokio_rustls::rustls::client::ServerCertVerifier;
use tokio_rustls::rustls::{ClientConfig, ServerName};
use tokio_rustls::TlsConnector;
use lavina_core::auth::Authenticator;
use lavina_core::clustering::{ClusterConfig, ClusterMetadata};
use lavina_core::player::PlayerId;
use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::LavinaCore;
use projection_xmpp::{launch, RunningServer, ServerConfig};
@ -141,8 +141,6 @@ impl ServerCertVerifier for IgnoreCertVerification {
}
struct TestServer {
metrics: MetricsRegistry,
storage: Storage,
core: LavinaCore,
server: RunningServer,
}
@ -156,7 +154,7 @@ impl TestServer {
key: "tests/certs/xmpp.key".parse().unwrap(),
hostname: "localhost".into(),
};
let metrics = MetricsRegistry::new();
let mut metrics = MetricsRegistry::new();
let storage = Storage::open(StorageConfig {
db_path: ":memory:".into(),
})
@ -169,20 +167,15 @@ impl TestServer {
rooms: Default::default(),
},
};
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?;
let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
Ok(TestServer {
metrics,
storage,
core,
server,
})
Ok(TestServer { core, server })
}
async fn shutdown(self) -> Result<()> {
self.server.terminate().await?;
self.core.shutdown().await?;
self.storage.close().await?;
let storage = self.core.shutdown().await;
storage.close().await;
Ok(())
}
}
@ -193,8 +186,8 @@ async fn scenario_basic() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -261,8 +254,8 @@ async fn scenario_wrong_password() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -316,8 +309,8 @@ async fn scenario_basic_without_headers() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -362,8 +355,8 @@ async fn terminate_socket() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);
@ -402,8 +395,8 @@ async fn test_message_archive_request() -> Result<()> {
// test scenario
server.storage.create_user("tester").await?;
Authenticator::new(server.storage.clone()).set_password("tester", "password").await?;
server.core.create_player(&PlayerId::from("tester")?).await?;
server.core.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.server.addr).await?;
let mut s = TestScope::new(&mut stream);

View File

@ -260,6 +260,18 @@ impl MessageType {
/// https://xmpp.org/rfcs/rfc6120.html#stanzas-error
pub struct IqError {
pub r#type: IqErrorType,
pub condition: Option<IqErrorCondition>,
}
pub enum IqErrorCondition {
ItemNotFound,
}
impl IqErrorCondition {
pub fn as_str(&self) -> &'static str {
match self {
IqErrorCondition::ItemNotFound => "item-not-found",
}
}
}
pub enum IqErrorType {
@ -289,8 +301,19 @@ impl IqErrorType {
impl ToXml for IqError {
fn serialize(&self, events: &mut Vec<Event<'static>>) {
let bytes = BytesStart::new(format!(r#"error xmlns="{}" type="{}""#, XMLNS, self.r#type.as_str()));
match self.condition {
None => {
events.push(Event::Empty(bytes));
}
Some(IqErrorCondition::ItemNotFound) => {
events.push(Event::Start(bytes));
let bytes2 = BytesStart::new(r#"item-not-found xmlns="urn:ietf:params:xml:ns:xmpp-stanzas""#);
events.push(Event::Empty(bytes2));
let bytes = BytesEnd::new("error");
events.push(Event::End(bytes));
}
}
}
}
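// Illustrative output (not in the diff), with the xmlns taken from XMLNS:
// `condition: Some(ItemNotFound)` now serializes as
//   <error type="cancel">
//     <item-not-found xmlns="urn:ietf:params:xml:ns:xmpp-stanzas"/>
//   </error>
// while `condition: None` keeps the previous self-closing <error/> form.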
#[derive(PartialEq, Eq, Debug)]
@ -646,6 +669,9 @@ impl<T: ToXml> ToXml for Presence<T> {
Event::End(BytesEnd::new("priority")),
]);
}
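// Custom extension elements (e.g. the MUC <x/> user marker) are serialized
// last, right before the closing </presence> tag.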
for c in &self.custom {
c.serialize(events);
}
events.push(Event::End(BytesEnd::new("presence")));
}
}

View File

@ -1,12 +1,13 @@
#![allow(unused_variables)]
use quick_xml::events::Event;
use quick_xml::events::{BytesStart, Event};
use quick_xml::name::ResolveResult;
use crate::xml::*;
use anyhow::{anyhow, Result};
pub const XMLNS: &'static str = "http://jabber.org/protocol/muc";
pub const XMLNS_USER: &'static str = "http://jabber.org/protocol/muc#user";
#[derive(PartialEq, Eq, Debug, Default)]
pub struct History {
@ -143,6 +144,16 @@ impl FromXml for X {
}
}
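/// Marker for the `<x xmlns="http://jabber.org/protocol/muc#user"/>` element
/// included in MUC self-presence (XEP-0045). It is serialize-only and carries
/// no <item/> payload yet.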
#[derive(Debug, PartialEq, Eq)]
pub struct XUser;
impl ToXml for XUser {
fn serialize(&self, output: &mut Vec<Event<'static>>) {
let mut tag = BytesStart::new("x");
tag.push_attribute(("xmlns", XMLNS_USER));
output.push(Event::Empty(tag));
}
}
#[cfg(test)]
mod test {
use super::*;
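// A hypothetical test sketch (not part of this change) pinning down the XUser
// wire format; it assumes quick_xml's event types implement PartialEq.
#[test]
fn serialize_xuser_marker() {
    let mut out: Vec<Event<'static>> = Vec::new();
    XUser.serialize(&mut out);
    // Expect a single self-closing <x/> tag in the muc#user namespace.
    let mut expected = BytesStart::new("x");
    expected.push_attribute(("xmlns", XMLNS_USER));
    assert_eq!(out, vec![Event::Empty(expected)]);
}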

View File

@ -13,10 +13,9 @@ use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use lavina_core::auth::UpdatePasswordResult;
use lavina_core::player::{PlayerId, PlayerRegistry, SendMessageResult};
use lavina_core::player::{PlayerId, SendMessageResult};
use lavina_core::prelude::*;
use lavina_core::repo::Storage;
use lavina_core::room::{RoomId, RoomRegistry};
use lavina_core::room::RoomId;
use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use mgmt_api::*;
@ -30,16 +29,11 @@ pub struct ServerConfig {
pub listen_on: SocketAddr,
}
pub async fn launch(
config: ServerConfig,
metrics: MetricsRegistry,
core: LavinaCore,
storage: Storage,
) -> Result<Terminator> {
pub async fn launch(config: ServerConfig, metrics: MetricsRegistry, core: LavinaCore) -> Result<Terminator> {
log::info!("Starting the http service");
let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started");
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, storage, rx.map(|_| ())));
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, rx.map(|_| ())));
Ok(terminator)
}
@ -47,7 +41,6 @@ async fn main_loop(
listener: TcpListener,
metrics: MetricsRegistry,
core: LavinaCore,
storage: Storage,
termination: impl Future<Output = ()>,
) -> Result<()> {
pin!(termination);
@ -60,9 +53,8 @@ async fn main_loop(
let stream = TokioIo::new(stream);
let metrics = metrics.clone();
let core = core.clone();
let storage = storage.clone();
tokio::task::spawn(async move {
let svc_fn = service_fn(|r| route(&metrics, &core, &storage, r));
let svc_fn = service_fn(|r| route(&metrics, &core, r));
let server = http1::Builder::new().serve_connection(stream, svc_fn);
if let Err(err) = server.await {
tracing::error!("Error serving connection: {:?}", err);
@ -79,19 +71,18 @@ async fn main_loop(
async fn route(
registry: &MetricsRegistry,
core: &LavinaCore,
storage: &Storage,
request: Request<hyper::body::Incoming>,
) -> HttpResult<Response<Full<Bytes>>> {
propagade_span_from_headers(&request);
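// Every endpoint now goes through &LavinaCore alone; the HTTP layer no
// longer holds direct Storage or registry handles.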
let res = match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => endpoint_metrics(registry),
(&Method::GET, "/rooms") => endpoint_rooms(&core.rooms).await,
(&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(),
(&Method::POST, paths::STOP_PLAYER) => endpoint_stop_player(request, &core.players).await.or5xx(),
(&Method::GET, "/rooms") => endpoint_rooms(core).await,
(&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, core).await.or5xx(),
(&Method::POST, paths::STOP_PLAYER) => endpoint_stop_player(request, core).await.or5xx(),
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, core).await.or5xx(),
(&Method::POST, rooms::paths::SEND_MESSAGE) => endpoint_send_room_message(request, core).await.or5xx(),
(&Method::POST, rooms::paths::SET_TOPIC) => endpoint_set_room_topic(request, core).await.or5xx(),
_ => clustering::route(core, storage, request).await.unwrap_or_else(endpoint_not_found),
_ => clustering::route(core, request).await.unwrap_or_else(endpoint_not_found),
};
Ok(res)
}
@ -104,23 +95,23 @@ fn endpoint_metrics(registry: &MetricsRegistry) -> Response<Full<Bytes>> {
}
#[tracing::instrument(skip_all)]
async fn endpoint_rooms(rooms: &RoomRegistry) -> Response<Full<Bytes>> {
async fn endpoint_rooms(core: &LavinaCore) -> Response<Full<Bytes>> {
// TODO introduce management API types independent of core-domain types
// TODO remove `Serialize` implementations from all core-domain types
let room_list = rooms.get_all_rooms().await.to_body();
let room_list = core.get_all_rooms().await.to_body();
Response::new(room_list)
}
#[tracing::instrument(skip_all)]
async fn endpoint_create_player(
request: Request<hyper::body::Incoming>,
storage: &Storage,
core: &LavinaCore,
) -> Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes();
let Ok(res) = serde_json::from_slice::<CreatePlayerRequest>(&str[..]) else {
return Ok(malformed_request());
};
storage.create_user(&res.name).await?;
core.create_player(&PlayerId::from(res.name)?).await?;
log::info!("Player {} created", res.name);
let mut response = Response::new(Full::<Bytes>::default());
*response.status_mut() = StatusCode::CREATED;
@ -130,7 +121,7 @@ async fn endpoint_create_player(
#[tracing::instrument(skip_all)]
async fn endpoint_stop_player(
request: Request<hyper::body::Incoming>,
players: &PlayerRegistry,
core: &LavinaCore,
) -> Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes();
let Ok(res) = serde_json::from_slice::<StopPlayerRequest>(&str[..]) else {
@ -139,7 +130,7 @@ async fn endpoint_stop_player(
let Ok(player_id) = PlayerId::from(res.name) else {
return Ok(player_not_found());
};
let Some(()) = players.stop_player(&player_id).await? else {
let Some(()) = core.stop_player(&player_id).await? else {
return Ok(player_not_found());
};
Ok(empty_204_request())
@ -154,7 +145,7 @@ async fn endpoint_set_password(
let Ok(res) = serde_json::from_slice::<ChangePasswordRequest>(&str[..]) else {
return Ok(malformed_request());
};
let verdict = core.authenticator.set_password(&res.player_name, &res.password).await?;
let verdict = core.set_password(&res.player_name, &res.password).await?;
match verdict {
UpdatePasswordResult::PasswordUpdated => {}
UpdatePasswordResult::UserNotFound => {
@ -179,7 +170,7 @@ async fn endpoint_send_room_message(
let Ok(player_id) = PlayerId::from(req.author_id) else {
return Ok(player_not_found());
};
let mut player = core.players.connect_to_player(&player_id).await;
let mut player = core.connect_to_player(&player_id).await;
let res = player.send_message(room_id, req.message.into()).await?;
match res {
SendMessageResult::NoSuchRoom => Ok(room_not_found()),
@ -202,7 +193,7 @@ async fn endpoint_set_room_topic(
let Ok(player_id) = PlayerId::from(req.author_id) else {
return Ok(player_not_found());
};
let mut player = core.players.connect_to_player(&player_id).await;
let mut player = core.connect_to_player(&player_id).await;
player.change_topic(room_id, req.topic.into()).await?;
Ok(empty_204_request())
}

View File

@ -6,19 +6,14 @@ use super::Or5xx;
use crate::http::{empty_204_request, malformed_request, player_not_found, room_not_found};
use lavina_core::clustering::room::{paths, JoinRoomReq, SendMessageReq};
use lavina_core::player::PlayerId;
use lavina_core::repo::Storage;
use lavina_core::room::RoomId;
use lavina_core::LavinaCore;
// TODO move this into core
pub async fn route(
core: &LavinaCore,
storage: &Storage,
request: Request<hyper::body::Incoming>,
) -> Option<Response<Full<Bytes>>> {
pub async fn route(core: &LavinaCore, request: Request<hyper::body::Incoming>) -> Option<Response<Full<Bytes>>> {
match (request.method(), request.uri().path()) {
(&Method::POST, paths::JOIN) => Some(endpoint_cluster_join_room(request, core, storage).await.or5xx()),
(&Method::POST, paths::JOIN) => Some(endpoint_cluster_join_room(request, core).await.or5xx()),
(&Method::POST, paths::ADD_MESSAGE) => Some(endpoint_cluster_add_message(request, core).await.or5xx()),
_ => None,
}
@ -28,7 +23,6 @@ pub async fn route(
async fn endpoint_cluster_join_room(
request: Request<hyper::body::Incoming>,
core: &LavinaCore,
storage: &Storage,
) -> lavina_core::prelude::Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes();
let Ok(req) = serde_json::from_slice::<JoinRoomReq>(&str[..]) else {
@ -43,9 +37,7 @@ async fn endpoint_cluster_join_room(
dbg!(&req.player_id);
return Ok(player_not_found());
};
let room_handle = core.rooms.get_or_create_room(room_id).await.unwrap();
let storage_id = storage.create_or_retrieve_user_id_by_name(req.player_id).await?;
room_handle.add_member(&player_id, storage_id).await;
core.cluster_join_room(room_id, &player_id).await?;
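// cluster_join_room now encapsulates what was inlined here before: resolving
// or creating the room, mapping the player name to its storage id, and adding
// the member, all behind LavinaCore's public API.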
Ok(empty_204_request())
}
@ -71,10 +63,10 @@ async fn endpoint_cluster_add_message(
dbg!(&req.player_id);
return Ok(player_not_found());
};
let Some(room_handle) = core.rooms.get_room(&room_id).await else {
dbg!(&room_id);
return Ok(room_not_found());
};
room_handle.send_message(&player_id, req.message.into(), created_at.to_utc()).await;
let res = core.cluster_send_room_message(room_id, &player_id, req.message.into(), created_at.to_utc()).await?;
if res.is_some() {
Ok(empty_204_request())
} else {
Ok(room_not_found())
}
}

View File

@ -69,10 +69,10 @@ async fn main() -> Result<()> {
cluster: cluster_config,
tracing: _,
} = config;
let metrics = MetricsRegistry::new();
let mut metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?;
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?;
let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone(), storage.clone()).await?;
let core = LavinaCore::new(&mut metrics, cluster_config, storage).await?;
let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone()).await?;
let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone()).await?;
let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone()).await?;
tracing::info!("Started");
@ -83,7 +83,8 @@ async fn main() -> Result<()> {
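// Teardown runs in reverse startup order: projections first, then telemetry,
// and finally the core, which hands the storage back to be closed.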
xmpp.terminate().await?;
irc.terminate().await?;
telemetry_terminator.terminate().await?;
core.shutdown().await?;
let storage = core.shutdown().await;
storage.close().await;
tracing::info!("Shutdown complete");
Ok(())
}