diff --git a/Cargo.lock b/Cargo.lock index 2469e90..015cb46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -854,6 +854,7 @@ dependencies = [ "futures-util", "http-body-util", "hyper 1.0.0-rc.3", + "lavina-core", "mgmt-api", "nonempty", "prometheus", @@ -865,7 +866,6 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", - "sqlx", "tokio", "tokio-rustls", "tracing", @@ -873,6 +873,18 @@ dependencies = [ "uuid", ] +[[package]] +name = "lavina-core" +version = "0.0.1-dev" +dependencies = [ + "anyhow", + "prometheus", + "serde", + "sqlx", + "tokio", + "tracing", +] + [[package]] name = "lazy_static" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index 4c90804..2e469b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ ".", + "crates/lavina-core", "crates/proto-irc", "crates/proto-xmpp", "crates/mgmt-api", @@ -22,6 +23,8 @@ regex = "1.7.1" derive_more = "0.99.17" clap = { version = "4.4.4", features = ["derive"] } serde = { version = "1.0.152", features = ["rc", "serde_derive"] } +tracing = "0.1.37" # logging & tracing api +prometheus = { version = "0.13.3", default-features = false } [package] name = "lavina" @@ -37,17 +40,17 @@ http-body-util = "0.1.0-rc.3" serde.workspace = true serde_json = "1.0.93" tokio.workspace = true -tracing = "0.1.37" # logging & tracing api +tracing.workspace = true tracing-subscriber = "0.3.16" futures-util.workspace = true -prometheus = { version = "0.13.3", default-features = false } +prometheus.workspace = true nonempty.workspace = true tokio-rustls = "0.24.1" rustls-pemfile = "1.0.2" quick-xml.workspace = true derive_more.workspace = true uuid = { version = "1.3.0", features = ["v4"] } -sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } +lavina-core = { path = "crates/lavina-core" } proto-irc = { path = "crates/proto-irc" } proto-xmpp = { path = "crates/proto-xmpp" } mgmt-api = { path = "crates/mgmt-api" } diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml new file mode 100644 index 0000000..727835c --- /dev/null +++ b/crates/lavina-core/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "lavina-core" +edition = "2021" +version.workspace = true + +[dependencies] +anyhow.workspace = true +sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } +serde.workspace = true +tokio.workspace = true +tracing.workspace = true +prometheus.workspace = true diff --git a/migrations/0_first.sql b/crates/lavina-core/migrations/0_first.sql similarity index 100% rename from migrations/0_first.sql rename to crates/lavina-core/migrations/0_first.sql diff --git a/src/core/mod.rs b/crates/lavina-core/src/lib.rs similarity index 81% rename from src/core/mod.rs rename to crates/lavina-core/src/lib.rs index 8ee26f7..c1cef53 100644 --- a/src/core/mod.rs +++ b/crates/lavina-core/src/lib.rs @@ -2,3 +2,6 @@ pub mod player; pub mod repo; pub mod room; + +mod prelude; +mod table; diff --git a/src/core/player.rs b/crates/lavina-core/src/player.rs similarity index 91% rename from src/core/player.rs rename to crates/lavina-core/src/player.rs index 1533941..763a82a 100644 --- a/src/core/player.rs +++ b/crates/lavina-core/src/player.rs @@ -19,11 +19,9 @@ use tokio::{ task::JoinHandle, }; -use crate::{ - core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}, - prelude::*, - util::table::{AnonTable, Key as AnonKey}, -}; +use crate::prelude::*; +use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; +use crate::table::{AnonTable, Key as AnonKey}; /// Opaque player identifier. Cannot contain spaces, must be shorter than 32. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -63,9 +61,7 @@ impl PlayerConnection { } pub async fn join_room(&mut self, room_id: RoomId) -> Result { - self.player_handle - .join_room(room_id, self.connection_id.clone()) - .await + self.player_handle.join_room(room_id, self.connection_id.clone()).await } pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { @@ -100,9 +96,7 @@ impl PlayerConnection { pub async fn get_rooms(&self) -> Result> { let (promise, deferred) = oneshot(); - self.player_handle - .send(PlayerCommand::GetRooms(promise)) - .await; + self.player_handle.send(PlayerCommand::GetRooms(promise)).await; Ok(deferred.await?) } } @@ -126,27 +120,14 @@ impl PlayerHandle { } } - pub async fn send_message( - &self, - room_id: RoomId, - connection_id: ConnectionId, - body: Str, - ) -> Result<()> { + pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: Str) -> Result<()> { let (promise, deferred) = oneshot(); - let cmd = Cmd::SendMessage { - room_id, - body, - promise, - }; + let cmd = Cmd::SendMessage { room_id, body, promise }; let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; Ok(deferred.await?) } - pub async fn join_room( - &self, - room_id: RoomId, - connection_id: ConnectionId, - ) -> Result { + pub async fn join_room(&self, room_id: RoomId, connection_id: ConnectionId) -> Result { let (promise, deferred) = oneshot(); let cmd = Cmd::JoinRoom { room_id, promise }; let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; @@ -230,12 +211,8 @@ pub enum Updates { #[derive(Clone)] pub struct PlayerRegistry(Arc>); impl PlayerRegistry { - pub fn empty( - room_registry: RoomRegistry, - metrics: &mut MetricsRegistry, - ) -> Result { - let metric_active_players = - IntGauge::new("chat_players_active", "Number of alive player actors")?; + pub fn empty(room_registry: RoomRegistry, metrics: &mut MetricsRegistry) -> Result { + let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; metrics.register(Box::new(metric_active_players.clone()))?; let inner = PlayerRegistryInner { room_registry, @@ -375,8 +352,7 @@ impl Player { return; } }; - room.subscribe(self.player_id.clone(), self.handle.clone()) - .await; + room.subscribe(self.player_id.clone(), self.handle.clone()).await; self.my_rooms.insert(room_id.clone(), room.clone()); let room_info = room.get_room_info().await; let _ = promise.send(JoinResult::Success(room_info)); @@ -399,15 +375,10 @@ impl Player { }; self.broadcast_update(update, connection_id).await; } - Cmd::SendMessage { - room_id, - body, - promise, - } => { + Cmd::SendMessage { room_id, body, promise } => { let room = self.rooms.get_room(&room_id).await; if let Some(room) = room { - room.send_message(self.player_id.clone(), body.clone()) - .await; + room.send_message(self.player_id.clone(), body.clone()).await; } else { tracing::info!("no room found"); } @@ -426,8 +397,7 @@ impl Player { } => { let room = self.rooms.get_room(&room_id).await; if let Some(mut room) = room { - room.set_topic(self.player_id.clone(), new_topic.clone()) - .await; + room.set_topic(self.player_id.clone(), new_topic.clone()).await; } else { tracing::info!("no room found"); } diff --git a/crates/lavina-core/src/prelude.rs b/crates/lavina-core/src/prelude.rs new file mode 100644 index 0000000..b405477 --- /dev/null +++ b/crates/lavina-core/src/prelude.rs @@ -0,0 +1,17 @@ +pub use std::future::Future; + +pub use tokio::pin; +pub use tokio::select; +pub use tokio::sync::oneshot::{channel as oneshot, Receiver as Deferred, Sender as Promise}; +pub use tokio::task::JoinHandle; + +pub mod log { + pub use tracing::{debug, error, info, warn}; +} + +pub type Result = std::result::Result; +pub type Str = std::sync::Arc; + +pub fn fail(msg: &str) -> anyhow::Error { + anyhow::Error::msg(msg.to_owned()) +} diff --git a/src/core/repo/mod.rs b/crates/lavina-core/src/repo/mod.rs similarity index 97% rename from src/core/repo/mod.rs rename to crates/lavina-core/src/repo/mod.rs index 57371ed..d24e8b5 100644 --- a/src/core/repo/mod.rs +++ b/crates/lavina-core/src/repo/mod.rs @@ -93,10 +93,10 @@ impl Storage { Ok(()) } - pub async fn close(mut self) -> Result<()> { + pub async fn close(self) -> Result<()> { let res = match Arc::try_unwrap(self.conn) { Ok(e) => e, - Err(e) => return Err(fail("failed to acquire DB ownership on shutdown")), + Err(_) => return Err(fail("failed to acquire DB ownership on shutdown")), }; let res = res.into_inner(); res.close().await?; diff --git a/src/core/room.rs b/crates/lavina-core/src/room.rs similarity index 90% rename from src/core/room.rs rename to crates/lavina-core/src/room.rs index 47826bb..5c6dbd9 100644 --- a/src/core/room.rs +++ b/crates/lavina-core/src/room.rs @@ -1,17 +1,13 @@ //! Domain of rooms — chats with multiple participants. -use std::{ - collections::HashMap, - hash::Hash, - sync::Arc, -}; +use std::{collections::HashMap, hash::Hash, sync::Arc}; use prometheus::{IntGauge, Registry as MetricRegistry}; use serde::Serialize; use tokio::sync::RwLock as AsyncRwLock; -use crate::core::player::{PlayerHandle, PlayerId, Updates}; -use crate::core::repo::Storage; +use crate::player::{PlayerHandle, PlayerId, Updates}; use crate::prelude::*; +use crate::repo::Storage; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -20,9 +16,7 @@ impl RoomId { pub fn from(str: impl Into) -> Result { let bytes = str.into(); if bytes.len() > 32 { - return Err(anyhow::Error::msg( - "Room name cannot be longer than 32 symbols", - )); + return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); } if bytes.contains(' ') { return Err(anyhow::Error::msg("Room name cannot contain spaces")); @@ -42,8 +36,7 @@ impl RoomId { pub struct RoomRegistry(Arc>); impl RoomRegistry { pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result { - let metric_active_rooms = - IntGauge::new("chat_rooms_active", "Number of alive room actors")?; + let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; let inner = RoomRegistryInner { rooms: HashMap::new(), @@ -140,7 +133,7 @@ impl RoomHandle { pub async fn send_message(&self, player_id: PlayerId, body: Str) { let mut lock = self.0.write().await; - let res = lock.send_message(player_id, body).await; + let res = lock.send_message(player_id, body).await; if let Err(err) = res { log::warn!("Failed to send message: {err:?}"); } @@ -150,11 +143,7 @@ impl RoomHandle { let lock = self.0.read().await; RoomInfo { id: lock.room_id.clone(), - members: lock - .subscriptions - .keys() - .map(|x| x.clone()) - .collect::>(), + members: lock.subscriptions.keys().map(|x| x.clone()).collect::>(), topic: lock.topic.clone(), } } @@ -191,7 +180,9 @@ impl Room { async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> { tracing::info!("Adding a message to room"); - self.storage.insert_message(self.storage_id, self.message_count, &body).await?; + self.storage + .insert_message(self.storage_id, self.message_count, &body) + .await?; self.message_count += 1; let update = Updates::NewMessage { room_id: self.room_id.clone(), diff --git a/src/util/table.rs b/crates/lavina-core/src/table.rs similarity index 100% rename from src/util/table.rs rename to crates/lavina-core/src/table.rs diff --git a/src/main.rs b/src/main.rs index ddaab27..c23a7c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,6 @@ impl_trait_in_assoc_type )] -mod core; mod prelude; mod projections; mod util; @@ -19,9 +18,9 @@ use figment::{providers::Toml, Figment}; use prometheus::Registry as MetricsRegistry; use serde::Deserialize; -use crate::core::player::PlayerRegistry; -use crate::core::repo::Storage; -use crate::core::room::RoomRegistry; +use lavina_core::player::PlayerRegistry; +use lavina_core::repo::Storage; +use lavina_core::room::RoomRegistry; use crate::prelude::*; #[derive(Deserialize, Debug)] @@ -29,7 +28,7 @@ struct ServerConfig { telemetry: util::telemetry::ServerConfig, irc: projections::irc::ServerConfig, xmpp: projections::xmpp::ServerConfig, - storage: core::repo::StorageConfig, + storage: lavina_core::repo::StorageConfig, } #[derive(Parser)] diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index a98394b..1890890 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -12,9 +12,9 @@ use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::channel; -use crate::core::player::*; -use crate::core::repo::Storage; -use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; +use lavina_core::player::*; +use lavina_core::repo::Storage; +use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; use crate::prelude::*; use proto_irc::client::{client_message, ClientMessage}; use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; diff --git a/src/projections/xmpp/mod.rs b/src/projections/xmpp/mod.rs index db19455..5b9e035 100644 --- a/src/projections/xmpp/mod.rs +++ b/src/projections/xmpp/mod.rs @@ -19,8 +19,8 @@ use tokio::sync::mpsc::channel; use tokio_rustls::rustls::{Certificate, PrivateKey}; use tokio_rustls::TlsAcceptor; -use crate::core::player::{PlayerConnection, PlayerId, PlayerRegistry}; -use crate::core::room::{RoomId, RoomRegistry}; +use lavina_core::player::{PlayerConnection, PlayerId, PlayerRegistry}; +use lavina_core::room::{RoomId, RoomRegistry}; use crate::prelude::*; use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; use proto_xmpp::client::{Iq, Message, MessageType, Presence}; @@ -332,7 +332,7 @@ async fn socket_final( update = user_handle.receiver.recv() => { if let Some(update) = update { match update { - crate::core::player::Updates::NewMessage { room_id, author_id, body } => { + lavina_core::player::Updates::NewMessage { room_id, author_id, body } => { Message { to: Some(Jid { name: Some(authenticated.xmpp_name.clone()), diff --git a/src/util/mod.rs b/src/util/mod.rs index fb2f952..b3d7d82 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,6 +1,5 @@ use crate::prelude::*; -pub mod table; pub mod telemetry; #[cfg(test)] pub mod testkit; diff --git a/src/util/telemetry.rs b/src/util/telemetry.rs index 3bb5ec9..92006ad 100644 --- a/src/util/telemetry.rs +++ b/src/util/telemetry.rs @@ -11,8 +11,8 @@ use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; -use crate::core::repo::Storage; -use crate::core::room::RoomRegistry; +use lavina_core::repo::Storage; +use lavina_core::room::RoomRegistry; use crate::prelude::*; use crate::util::Terminator;