diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..3feddeb --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +* +!/src/ +!/migrations/ +!Cargo.lock +!Cargo.toml +!rust-toolchain diff --git a/.gitea/workflows/test-pr.yml b/.gitea/workflows/test-pr.yml new file mode 100644 index 0000000..3fabe6a --- /dev/null +++ b/.gitea/workflows/test-pr.yml @@ -0,0 +1,18 @@ +name: check-and-test +on: [push, pull_request] +jobs: + check-and-test: + runs-on: ubuntu-latest + steps: + - name: git checkout repository + uses: actions/checkout@v4 + - name: setup rust + uses: https://github.com/actions-rs/toolchain@v1 + - name: cargo check + uses: https://github.com/actions-rs/cargo@v1 + with: + command: check + - name: test + uses: https://github.com/actions-rs/cargo@v1 + with: + command: test diff --git a/.gitignore b/.gitignore index e2c5d98..ce1cc97 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target /db.sqlite +.idea/ diff --git a/Cargo.lock b/Cargo.lock index 4bb1503..9f9e7f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,54 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +[[package]] +name = "anstream" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" + +[[package]] +name = "anstyle-parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +dependencies = [ + "anstyle", + "windows-sys", +] + [[package]] name = "anyhow" version = "1.0.75" @@ -173,6 +221,52 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "clap" +version = "4.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0862016ff20d69b84ef8247369fabf5c008a7417002411897d40ee1f4532b873" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "clap_lex" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" + +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "const-oid" version = "0.9.5" @@ -778,31 +872,44 @@ dependencies = [ [[package]] name = "lavina" -version = "0.1.0" +version = "0.0.1-dev" dependencies = [ "anyhow", "assert_matches", "async-scoped", + "clap", "derive_more", "figment", "futures-util", "http-body-util", "hyper 1.0.0-rc.3", - "lazy_static", - "nom", + "lavina-core", + "mgmt-api", + "nonempty", + "projection-irc", + "projection-xmpp", "prometheus", - "quick-xml", "regex", "reqwest", - "rustls-pemfile", "serde", "serde_json", - "sqlx", "tokio", - "tokio-rustls", "tracing", "tracing-subscriber", - "uuid", +] + +[[package]] +name = "lavina-core" +version = "0.0.1-dev" +dependencies = [ + "anyhow", + "async-scoped", + "futures-util", + "prometheus", + "serde", + "sqlx", + "tokio", + "tracing", ] [[package]] @@ -874,6 +981,13 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "mgmt-api" +version = "0.0.1-dev" +dependencies = [ + "serde", +] + [[package]] name = "mime" version = "0.3.17" @@ -916,6 +1030,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonempty" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeaf4ad7403de93e699c191202f017118df734d3850b01e13a3a8b2e6953d3c9" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1159,6 +1279,40 @@ dependencies = [ "yansi", ] +[[package]] +name = "projection-irc" +version = "0.0.1-dev" +dependencies = [ + "anyhow", + "futures-util", + "lavina-core", + "nonempty", + "prometheus", + "proto-irc", + "serde", + "tokio", + "tracing", +] + +[[package]] +name = "projection-xmpp" +version = "0.0.1-dev" +dependencies = [ + "anyhow", + "derive_more", + "futures-util", + "lavina-core", + "prometheus", + "proto-xmpp", + "quick-xml", + "rustls-pemfile", + "serde", + "tokio", + "tokio-rustls", + "tracing", + "uuid", +] + [[package]] name = "prometheus" version = "0.13.3" @@ -1173,6 +1327,30 @@ dependencies = [ "thiserror", ] +[[package]] +name = "proto-irc" +version = "0.0.1-dev" +dependencies = [ + "anyhow", + "assert_matches", + "futures-util", + "nom", + "nonempty", + "tokio", +] + +[[package]] +name = "proto-xmpp" +version = "0.0.1-dev" +dependencies = [ + "anyhow", + "derive_more", + "lazy_static", + "quick-xml", + "regex", + "tokio", +] + [[package]] name = "quick-xml" version = "0.30.0" @@ -1632,19 +1810,14 @@ dependencies = [ "once_cell", "paste", "percent-encoding", - "rustls", - "rustls-pemfile", "serde", "serde_json", "sha2", "smallvec", "sqlformat", "thiserror", - "tokio", - "tokio-stream", "tracing", "url", - "webpki-roots", ] [[package]] @@ -1681,7 +1854,6 @@ dependencies = [ "sqlx-sqlite", "syn 1.0.109", "tempfile", - "tokio", "url", ] @@ -1798,6 +1970,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "subtle" version = "2.5.0" @@ -1924,17 +2102,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.8" @@ -2119,6 +2286,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "uuid" version = "1.4.1" @@ -2237,15 +2410,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-roots" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b291546d5d9d1eab74f069c77749f2cb8504a12caa20f0f2de93ddbf6f411888" -dependencies = [ - "rustls-webpki", -] - [[package]] name = "whoami" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 8950494..d11b917 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,33 +1,62 @@ +[workspace] +members = [ + ".", + "crates/lavina-core", + "crates/proto-irc", + "crates/projection-irc", + "crates/proto-xmpp", + "crates/mgmt-api", +] + +[workspace.package] +version = "0.0.1-dev" + +[workspace.dependencies] +nom = "7.1.3" +assert_matches = "1.5.0" +tokio = { version = "1.24.1", features = ["full"] } # async runtime +futures-util = "0.3.25" +anyhow = "1.0.68" # error utils +nonempty = "0.8.1" +quick-xml = { version = "0.30.0", features = ["async-tokio"] } +lazy_static = "1.4.0" +regex = "1.7.1" +derive_more = "0.99.17" +clap = { version = "4.4.4", features = ["derive"] } +serde = { version = "1.0.152", features = ["rc", "serde_derive"] } +tracing = "0.1.37" # logging & tracing api +prometheus = { version = "0.13.3", default-features = false } +lavina-core = { path = "crates/lavina-core" } +async-scoped = { version = "0.7.1", features = ["use-tokio"] } + [package] name = "lavina" -version = "0.1.0" +version.workspace = true edition = "2021" publish = false [dependencies] -anyhow = "1.0.68" # error utils +anyhow.workspace = true figment = { version = "0.10.8", features = ["env", "toml"] } # configuration files hyper = { version = "1.0.0-rc.3,<1.0.0-rc.4", features = ["server", "http1"] } # http server http-body-util = "0.1.0-rc.3" -serde = { version = "1.0.152", features = ["rc", "serde_derive"] } +serde.workspace = true serde_json = "1.0.93" -tokio = { version = "1.24.1", features = ["full"] } # async runtime -tracing = "0.1.37" # logging & tracing api +tokio.workspace = true +tracing.workspace = true tracing-subscriber = "0.3.16" -futures-util = "0.3.25" -prometheus = { version = "0.13.3", default-features = false } -regex = "1.7.1" -lazy_static = "1.4.0" -nom = "7.1.3" -tokio-rustls = "0.24.1" -rustls-pemfile = "1.0.2" -quick-xml = { version = "0.30.0", features = ["async-tokio"] } -derive_more = "0.99.17" -uuid = { version = "1.3.0", features = ["v4"] } -sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "runtime-tokio-rustls", "migrate"] } -async-scoped = { version = "0.7.1", features = ["use-tokio"] } +futures-util.workspace = true +prometheus.workspace = true +nonempty.workspace = true +derive_more.workspace = true +lavina-core.workspace = true +projection-irc = { path = "crates/projection-irc" } +projection-xmpp = { path = "crates/projection-xmpp" } +mgmt-api = { path = "crates/mgmt-api" } +clap.workspace = true +async-scoped.workspace = true [dev-dependencies] -assert_matches = "1.5.0" +assert_matches.workspace = true regex = "1.7.1" reqwest = { version = "0.11", default-features = false } diff --git a/crates/README.md b/crates/README.md new file mode 100644 index 0000000..04909f2 --- /dev/null +++ b/crates/README.md @@ -0,0 +1,30 @@ +## Dependency diagram of the project + +```mermaid +graph TD; + lavina-->mgmt-api; + lavina-->projection-irc; + lavina-->projection-xmpp; + lavina-->lavina-core; + + projection-irc-->proto-irc; + projection-irc-->lavina-core; + + projection-xmpp-->proto-xmpp; + projection-xmpp-->lavina-core; + + sim-irc-->proto-irc; + sim-irc-->mgmt-api; + + sim-xmpp-->proto-xmpp; + sim-xmpp-->mgmt-api; + + workspace-->lavina; + workspace-->sim-irc; + workspace-->sim-xmpp; +``` + +A few rules: +- Only projections should be direct deps of `lavina`, there is no need to depend on `proto-*` crates. +- On the other hand, projections should not be dependencies of `sim-*` crates. +- `lavina-core` does not depend on protocol-specific crates. diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml new file mode 100644 index 0000000..eb63f5f --- /dev/null +++ b/crates/lavina-core/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "lavina-core" +edition = "2021" +version.workspace = true + +[dependencies] +anyhow.workspace = true +sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } +serde.workspace = true +tokio.workspace = true +tracing.workspace = true +prometheus.workspace = true +futures-util.workspace = true +async-scoped.workspace = true diff --git a/migrations/0_first.sql b/crates/lavina-core/migrations/0_first.sql similarity index 100% rename from migrations/0_first.sql rename to crates/lavina-core/migrations/0_first.sql diff --git a/src/core/mod.rs b/crates/lavina-core/src/lib.rs similarity index 68% rename from src/core/mod.rs rename to crates/lavina-core/src/lib.rs index 8ee26f7..401e49e 100644 --- a/src/core/mod.rs +++ b/crates/lavina-core/src/lib.rs @@ -1,4 +1,8 @@ //! Domain definitions and implementation of common chat logic. pub mod player; +pub mod prelude; pub mod repo; pub mod room; +pub mod terminator; + +mod table; diff --git a/src/core/player.rs b/crates/lavina-core/src/player.rs similarity index 91% rename from src/core/player.rs rename to crates/lavina-core/src/player.rs index 239b4ea..4c40b82 100644 --- a/src/core/player.rs +++ b/crates/lavina-core/src/player.rs @@ -15,9 +15,9 @@ use prometheus::{IntGauge, Registry as MetricsRegistry}; use serde::Serialize; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use crate::util::table::{AnonTable, Key as AnonKey}; use crate::prelude::*; -use crate::core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; +use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; +use crate::table::{AnonTable, Key as AnonKey}; /// Opaque player identifier. Cannot contain spaces, must be shorter than 32. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -57,9 +57,7 @@ impl PlayerConnection { } pub async fn join_room(&mut self, room_id: RoomId) -> Result { - self.player_handle - .join_room(room_id, self.connection_id.clone()) - .await + self.player_handle.join_room(room_id, self.connection_id.clone()).await } pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { @@ -94,9 +92,7 @@ impl PlayerConnection { pub async fn get_rooms(&self) -> Result> { let (promise, deferred) = oneshot(); - self.player_handle - .send(PlayerCommand::GetRooms(promise)) - .await; + self.player_handle.send(PlayerCommand::GetRooms(promise)).await; Ok(deferred.await?) } } @@ -120,27 +116,14 @@ impl PlayerHandle { } } - pub async fn send_message( - &self, - room_id: RoomId, - connection_id: ConnectionId, - body: Str, - ) -> Result<()> { + pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: Str) -> Result<()> { let (promise, deferred) = oneshot(); - let cmd = Cmd::SendMessage { - room_id, - body, - promise, - }; + let cmd = Cmd::SendMessage { room_id, body, promise }; let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; Ok(deferred.await?) } - pub async fn join_room( - &self, - room_id: RoomId, - connection_id: ConnectionId, - ) -> Result { + pub async fn join_room(&self, room_id: RoomId, connection_id: ConnectionId) -> Result { let (promise, deferred) = oneshot(); let cmd = Cmd::JoinRoom { room_id, promise }; let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; @@ -229,12 +212,8 @@ pub enum Updates { /// Handle to a player registry — a shared data structure containing information about players. pub struct PlayerRegistry<'a>(RwLock>); impl<'a> PlayerRegistry<'a> { - pub fn empty( - room_registry: &'a RoomRegistry<'a>, - metrics: &MetricsRegistry, - ) -> Result> { - let metric_active_players = - IntGauge::new("chat_players_active", "Number of alive player actors")?; + pub fn empty(room_registry: &'a RoomRegistry<'a>, metrics: &MetricsRegistry) -> Result> { + let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; metrics.register(Box::new(metric_active_players.clone()))?; let scope = unsafe { Scope::create() }; let inner = PlayerRegistryInner { @@ -303,7 +282,7 @@ impl<'a> Player<'a> { player_id, connections: AnonTable::new(), my_rooms: HashMap::new(), - banned_from: HashSet::from([RoomId::from("empty").unwrap()]), + banned_from: HashSet::from([RoomId::from("Empty").unwrap()]), rx, handle, rooms, @@ -349,7 +328,9 @@ impl<'a> Player<'a> { } } PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, - PlayerCommand::Stop => { break; } + PlayerCommand::Stop => { + break; + } } } log::debug!("Shutting down player actor #{:?}", self.player_id); @@ -377,8 +358,7 @@ impl<'a> Player<'a> { return; } }; - room.subscribe(self.player_id.clone(), self.handle.clone()) - .await; + room.subscribe(self.player_id.clone(), self.handle.clone()).await; self.my_rooms.insert(room_id.clone(), room.clone()); let room_info = room.get_room_info().await; let _ = promise.send(JoinResult::Success(room_info)); @@ -401,15 +381,10 @@ impl<'a> Player<'a> { }; self.broadcast_update(update, connection_id).await; } - Cmd::SendMessage { - room_id, - body, - promise, - } => { + Cmd::SendMessage { room_id, body, promise } => { let room = self.rooms.get_room(&room_id).await; if let Some(room) = room { - room.send_message(self.player_id.clone(), body.clone()) - .await; + room.send_message(self.player_id.clone(), body.clone()).await; } else { tracing::info!("no room found"); } @@ -428,8 +403,7 @@ impl<'a> Player<'a> { } => { let room = self.rooms.get_room(&room_id).await; if let Some(mut room) = room { - room.set_topic(self.player_id.clone(), new_topic.clone()) - .await; + room.set_topic(self.player_id.clone(), new_topic.clone()).await; } else { tracing::info!("no room found"); } diff --git a/src/prelude.rs b/crates/lavina-core/src/prelude.rs similarity index 82% rename from src/prelude.rs rename to crates/lavina-core/src/prelude.rs index e01332d..930be34 100644 --- a/src/prelude.rs +++ b/crates/lavina-core/src/prelude.rs @@ -16,12 +16,4 @@ pub fn fail(msg: &str) -> anyhow::Error { anyhow::Error::msg(msg.to_owned()) } -macro_rules! ffail { - ($($arg:tt)*) => { - fail(&format!($($arg)*)) - }; -} - -pub(crate) use ffail; - pub type Scope<'a> = async_scoped::Scope<'a, (), async_scoped::Tokio>; diff --git a/src/core/repo/mod.rs b/crates/lavina-core/src/repo/mod.rs similarity index 65% rename from src/core/repo/mod.rs rename to crates/lavina-core/src/repo/mod.rs index dfd4042..27b7339 100644 --- a/src/core/repo/mod.rs +++ b/crates/lavina-core/src/repo/mod.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use serde::Deserialize; use sqlx::sqlite::SqliteConnectOptions; -use sqlx::{ConnectOptions, Connection, FromRow, SqliteConnection}; +use sqlx::{ConnectOptions, Connection, FromRow, Sqlite, SqliteConnection, Transaction}; use tokio::sync::Mutex; use crate::prelude::*; @@ -96,6 +96,50 @@ impl Storage { res.close().await?; Ok(()) } + + pub async fn create_user(&self, name: &str) -> Result<()> { + let query = sqlx::query( + "insert into users(name) + values (?);", + ) + .bind(name); + let mut executor = self.conn.lock().await; + query.execute(&mut *executor).await?; + + Ok(()) + } + + pub async fn set_password<'a>(&'a self, name: &'a str, pwd: &'a str) -> Result> { + async fn inner(txn: &mut Transaction<'_, Sqlite>, name: &str, pwd: &str) -> Result> { + let id: Option<(u32,)> = sqlx::query_as("select * from users where name = ? limit 1;") + .bind(name) + .fetch_optional(&mut **txn) + .await?; + let Some((id,)) = id else { + return Ok(None); + }; + sqlx::query("insert or replace into challenges_plain_password(user_id, password) values (?, ?);") + .bind(id) + .bind(pwd) + .execute(&mut **txn) + .await?; + Ok(Some(())) + } + + let mut executor = self.conn.lock().await; + let mut tx = executor.begin().await?; + let res = inner(&mut tx, name, pwd).await; + match res { + Ok(e) => { + tx.commit().await?; + Ok(e) + } + Err(e) => { + tx.rollback().await?; + Err(e) + } + } + } } #[derive(FromRow)] diff --git a/src/core/room.rs b/crates/lavina-core/src/room.rs similarity index 90% rename from src/core/room.rs rename to crates/lavina-core/src/room.rs index 68026b2..9311432 100644 --- a/src/core/room.rs +++ b/crates/lavina-core/src/room.rs @@ -1,17 +1,13 @@ //! Domain of rooms — chats with multiple participants. -use std::{ - collections::HashMap, - hash::Hash, - sync::Arc, -}; +use std::{collections::HashMap, hash::Hash, sync::Arc}; use prometheus::{IntGauge, Registry as MetricRegistry}; use serde::Serialize; use tokio::sync::RwLock as AsyncRwLock; -use crate::core::player::{PlayerHandle, PlayerId, Updates}; -use crate::core::repo::Storage; +use crate::player::{PlayerHandle, PlayerId, Updates}; use crate::prelude::*; +use crate::repo::Storage; /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] @@ -20,9 +16,7 @@ impl RoomId { pub fn from(str: impl Into) -> Result { let bytes = str.into(); if bytes.len() > 32 { - return Err(anyhow::Error::msg( - "Room name cannot be longer than 32 symbols", - )); + return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); } if bytes.contains(' ') { return Err(anyhow::Error::msg("Room name cannot contain spaces")); @@ -41,8 +35,7 @@ impl RoomId { pub struct RoomRegistry<'a>(AsyncRwLock>); impl<'a> RoomRegistry<'a> { pub fn new(metrics: &mut MetricRegistry, storage: &'a Storage) -> Result> { - let metric_active_rooms = - IntGauge::new("chat_rooms_active", "Number of alive room actors")?; + let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; let inner = RoomRegistryInner { rooms: HashMap::new(), @@ -139,7 +132,7 @@ impl<'a> RoomHandle<'a> { pub async fn send_message(&self, player_id: PlayerId, body: Str) { let mut lock = self.0.write().await; - let res = lock.send_message(player_id, body).await; + let res = lock.send_message(player_id, body).await; if let Err(err) = res { log::warn!("Failed to send message: {err:?}"); } @@ -149,11 +142,7 @@ impl<'a> RoomHandle<'a> { let lock = self.0.read().await; RoomInfo { id: lock.room_id.clone(), - members: lock - .subscriptions - .keys() - .map(|x| x.clone()) - .collect::>(), + members: lock.subscriptions.keys().map(|x| x.clone()).collect::>(), topic: lock.topic.clone(), } } @@ -190,7 +179,9 @@ impl<'a> Room<'a> { async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> { tracing::info!("Adding a message to room"); - self.storage.insert_message(self.storage_id, self.message_count, &body).await?; + self.storage + .insert_message(self.storage_id, self.message_count, &body) + .await?; self.message_count += 1; let update = Updates::NewMessage { room_id: self.room_id.clone(), diff --git a/src/util/table.rs b/crates/lavina-core/src/table.rs similarity index 96% rename from src/util/table.rs rename to crates/lavina-core/src/table.rs index 2d93866..92d2ad7 100644 --- a/src/util/table.rs +++ b/crates/lavina-core/src/table.rs @@ -21,7 +21,7 @@ impl AnonTable { pub fn insert(&mut self, value: V) -> Key { let id = self.next; self.next += 1; - self.inner.insert(id, value); // should be always empty + self.inner.insert(id, value); // should be always Empty Key(id) } diff --git a/src/util/mod.rs b/crates/lavina-core/src/terminator.rs similarity index 89% rename from src/util/mod.rs rename to crates/lavina-core/src/terminator.rs index 0593dbd..8abbf58 100644 --- a/src/util/mod.rs +++ b/crates/lavina-core/src/terminator.rs @@ -1,12 +1,5 @@ use crate::prelude::*; -pub mod http; -pub mod table; -pub mod telemetry; -#[cfg(test)] -pub mod testkit; -pub mod xml; - pub struct Terminator { signal: Promise<()>, completion: JoinHandle>, diff --git a/crates/mgmt-api/Cargo.toml b/crates/mgmt-api/Cargo.toml new file mode 100644 index 0000000..030f3cf --- /dev/null +++ b/crates/mgmt-api/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "mgmt-api" +edition = "2021" +version.workspace = true +publish = false + +[dependencies] +serde.workspace = true diff --git a/crates/mgmt-api/src/lib.rs b/crates/mgmt-api/src/lib.rs new file mode 100644 index 0000000..cfe5b69 --- /dev/null +++ b/crates/mgmt-api/src/lib.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct ErrorResponse<'a> { + pub code: &'a str, + pub message: &'a str, +} + +#[derive(Serialize, Deserialize)] +pub struct CreatePlayerRequest<'a> { + pub name: &'a str, +} + +#[derive(Serialize, Deserialize)] +pub struct ChangePasswordRequest<'a> { + pub player_name: &'a str, + pub password: &'a str, +} + +pub mod paths { + pub const CREATE_PLAYER: &'static str = "/mgmt/create_player"; + pub const SET_PASSWORD: &'static str = "/mgmt/set_password"; +} + +pub mod errors { + pub const INVALID_PATH: &'static str = "invalid_path"; + pub const MALFORMED_REQUEST: &'static str = "malformed_request"; + pub const PLAYER_NOT_FOUND: &'static str = "player_not_found"; +} diff --git a/crates/projection-irc/Cargo.toml b/crates/projection-irc/Cargo.toml new file mode 100644 index 0000000..e8166f0 --- /dev/null +++ b/crates/projection-irc/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "projection-irc" +edition = "2021" +version.workspace = true + +[dependencies] +lavina-core.workspace = true +tracing.workspace = true +anyhow.workspace = true +serde.workspace = true +tokio.workspace = true +prometheus.workspace = true +futures-util.workspace = true + +nonempty.workspace = true +proto-irc = { path = "../proto-irc" } diff --git a/src/projections/irc/mod.rs b/crates/projection-irc/src/lib.rs similarity index 90% rename from src/projections/irc/mod.rs rename to crates/projection-irc/src/lib.rs index 2c020d0..8b22e53 100644 --- a/src/projections/irc/mod.rs +++ b/crates/projection-irc/src/lib.rs @@ -1,7 +1,11 @@ use std::collections::HashMap; use std::net::SocketAddr; +use anyhow::{anyhow, Result}; +use futures_util::future::join_all; use futures_util::FutureExt; +use nonempty::nonempty; +use nonempty::NonEmpty; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use serde::Deserialize; use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; @@ -9,16 +13,14 @@ use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::channel; -use crate::core::player::*; -use crate::core::repo::Storage; -use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; -use crate::prelude::*; -use crate::protos::irc::client::{client_message, ClientMessage}; -use crate::protos::irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; -use crate::protos::irc::{Chan, Recipient}; - -#[cfg(test)] -mod test; +use lavina_core::player::*; +use lavina_core::prelude::*; +use lavina_core::repo::Storage; +use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; +use proto_irc::client::{client_message, ClientMessage}; +use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; +use proto_irc::user::PrefixedNick; +use proto_irc::{Chan, Recipient}; #[derive(Deserialize, Debug, Clone)] pub struct ServerConfig { @@ -47,6 +49,7 @@ async fn handle_socket( termination: Deferred<()>, // TODO use it to stop the connection gracefully storage: &Storage, ) -> Result<()> { + log::info!("Received an IRC connection from {socket_addr}"); let (reader, writer) = stream.split(); let mut reader: BufReader = BufReader::new(reader); let mut writer = BufWriter::new(writer); @@ -64,15 +67,16 @@ async fn handle_socket( .await?; writer.flush().await?; - let registered_user: Result = - handle_registration(&mut reader, &mut writer, &storage).await; + let registered_user: Result = handle_registration(&mut reader, &mut writer, &storage).await; match registered_user { Ok(user) => { - handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user) - .await?; + log::debug!("User registered"); + handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?; + } + Err(_) => { + log::debug!("Registration failed"); } - Err(_) => {} } stream.shutdown().await?; @@ -150,23 +154,22 @@ async fn handle_registration<'a>( buffer.clear(); }?; - let stored_user = storage.retrieve_user_by_name(&*user.nickname).await?; let stored_user = match stored_user { Some(u) => u, None => { log::info!("User '{}' not found", user.nickname); - return Err(fail("no user found")); + return Err(anyhow!("no user found")); } }; if stored_user.password.is_none() { log::info!("Password not defined for user '{}'", user.nickname); - return Err(fail("password is not defined")); + return Err(anyhow!("password is not defined")); } if stored_user.password.as_deref() != pass.as_deref() { log::info!("Incorrect password supplied for user '{}'", user.nickname); - return Err(fail("passwords do not match")); + return Err(anyhow!("passwords do not match")); } // TODO properly implement session temination @@ -241,14 +244,7 @@ async fn handle_registered_socket<'a>( let rooms_list = connection.get_rooms().await?; for room in &rooms_list { - produce_on_join_cmd_messages( - &config, - &user, - &Chan::Global(room.id.as_inner().clone()), - room, - writer, - ) - .await?; + produce_on_join_cmd_messages(&config, &user, &Chan::Global(room.id.as_inner().clone()), room, writer).await?; } writer.flush().await?; @@ -305,10 +301,7 @@ async fn handle_update( ) -> Result<()> { log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); match update { - Updates::RoomJoined { - new_member_id, - room_id, - } => { + Updates::RoomJoined { new_member_id, room_id } => { if player_id == &new_member_id { if let Some(room) = rooms.get_room(&room_id).await { let room_info = room.get_room_info().await; @@ -429,16 +422,14 @@ async fn handle_incoming_message( Recipient::Chan(Chan::Global(chan)) => { let room_id = RoomId::from(chan)?; user_handle.send_message(room_id, body).await?; - }, + } _ => log::warn!("Unsupported target type"), }, ClientMessage::Topic { chan, topic } => { match chan { Chan::Global(chan) => { let room_id = RoomId::from(chan)?; - user_handle - .change_topic(room_id.clone(), topic.clone()) - .await?; + user_handle.change_topic(room_id.clone(), topic.clone()).await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), @@ -527,7 +518,17 @@ async fn handle_incoming_message( .await?; writer.flush().await?; } else { - // TODO send 502 (not 401) if the user is not the sender + ServerMessage { + tags: vec![], + sender: Some(config.server_name.clone()), + body: ServerMessageBody::N502UsersDontMatch { + client: user.nickname.clone(), + message: "Cant change mode for other users".into(), + }, + } + .write_async(writer) + .await?; + writer.flush().await?; } } Recipient::Chan(_) => { @@ -550,11 +551,7 @@ async fn handle_incoming_message( Ok(HandleResult::Continue) } -fn user_to_who_msg( - config: &ServerConfig, - requestor: &RegisteredUser, - target_user_nickname: &Str, -) -> ServerMessageBody { +fn user_to_who_msg(config: &ServerConfig, requestor: &RegisteredUser, target_user_nickname: &Str) -> ServerMessageBody { // Username is equal to nickname let username = format!("~{target_user_nickname}").into(); @@ -655,22 +652,21 @@ async fn produce_on_join_cmd_messages( } .write_async(writer) .await?; - let mut members: String = if let Some(head) = room_info.members.first() { - head.as_inner().clone() - } else { - user.nickname.clone() - }.as_ref().into(); - for i in &room_info.members[1..] { - members.push(' '); - members.push_str(i.as_inner()); - } + let prefixed_members: Vec = room_info + .members + .iter() + .map(|member| PrefixedNick::from_str(member.clone().into_inner())) + .collect(); + let non_empty_members: NonEmpty = + NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); + ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), body: ServerMessageBody::N353NamesReply { client: user.nickname.clone(), chan: chan.clone(), - members: members.into(), + members: non_empty_members.into(), }, } .write_async(writer) @@ -698,12 +694,8 @@ pub async fn launch<'a>( ) -> Result> { log::info!("Starting IRC projection"); let (stopped_tx, mut stopped_rx) = channel(32); - let current_connections = - IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; - let total_connections = IntCounter::new( - "irc_total_connections", - "Total number of opened connections", - )?; + let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; + let total_connections = IntCounter::new("irc_total_connections", "Total number of opened connections")?; metrics.register(Box::new(current_connections.clone()))?; metrics.register(Box::new(total_connections.clone()))?; @@ -764,7 +756,7 @@ pub async fn launch<'a>( for (socket_addr, terminator) in actors { match terminator.send(()) { Ok(_) => log::debug!("Stopping IRC connection at {socket_addr}"), - Err(_) => log::debug!("IRC connection at {socket_addr} already stopped") + Err(_) => log::debug!("IRC connection at {socket_addr} already stopped"), } } let _ = scope.collect().await; diff --git a/crates/projection-xmpp/Cargo.toml b/crates/projection-xmpp/Cargo.toml new file mode 100644 index 0000000..c69cb12 --- /dev/null +++ b/crates/projection-xmpp/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "projection-xmpp" +edition = "2021" +version.workspace = true + +[dependencies] +lavina-core.workspace = true +tracing.workspace = true +anyhow.workspace = true +serde.workspace = true +tokio.workspace = true +prometheus.workspace = true +futures-util.workspace = true + +quick-xml.workspace = true +proto-xmpp = { path = "../proto-xmpp" } +uuid = { version = "1.3.0", features = ["v4"] } +tokio-rustls = "0.24.1" +rustls-pemfile = "1.0.2" +derive_more.workspace = true diff --git a/src/projections/xmpp/mod.rs b/crates/projection-xmpp/src/lib.rs similarity index 90% rename from src/projections/xmpp/mod.rs rename to crates/projection-xmpp/src/lib.rs index a5b94d8..060819c 100644 --- a/src/projections/xmpp/mod.rs +++ b/crates/projection-xmpp/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(generators, generator_trait, type_alias_impl_trait, impl_trait_in_assoc_type)] + mod proto; use std::collections::HashMap; @@ -7,6 +9,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use anyhow::anyhow; use futures_util::FutureExt; use prometheus::Registry as MetricsRegistry; use quick_xml::events::{BytesDecl, Event}; @@ -19,17 +22,17 @@ use tokio::sync::mpsc::channel; use tokio_rustls::rustls::{Certificate, PrivateKey}; use tokio_rustls::TlsAcceptor; -use crate::core::player::{PlayerConnection, PlayerId, PlayerRegistry}; -use crate::core::room::{RoomId, RoomRegistry}; -use crate::prelude::*; -use crate::protos::xmpp; -use crate::protos::xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; -use crate::protos::xmpp::client::{Iq, Message, MessageType, Presence}; -use crate::protos::xmpp::disco::*; -use crate::protos::xmpp::roster::RosterQuery; -use crate::protos::xmpp::session::Session; -use crate::protos::xmpp::stream::*; -use crate::util::xml::{Continuation, FromXml, Parser, ToXml}; +use lavina_core::player::{PlayerConnection, PlayerId, PlayerRegistry}; +use lavina_core::prelude::*; +use lavina_core::room::{RoomId, RoomRegistry}; +use lavina_core::terminator::Terminator; +use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; +use proto_xmpp::client::{Iq, Message, MessageType, Presence}; +use proto_xmpp::disco::*; +use proto_xmpp::roster::RosterQuery; +use proto_xmpp::session::Session; +use proto_xmpp::stream::*; +use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; use self::proto::{ClientPacket, IqClientBody}; @@ -122,7 +125,7 @@ pub async fn launch<'a>( for (socket_addr, terminator) in actors { match terminator.send(()) { Ok(_) => log::debug!("Stopping XMPP connection at {socket_addr}"), - Err(_) => log::debug!("XMPP connection at {socket_addr} already stopped") + Err(_) => log::debug!("XMPP connection at {socket_addr} already stopped"), } } let _ = scope.collect().await; @@ -143,7 +146,7 @@ async fn handle_socket( rooms: &RoomRegistry<'_>, termination: Deferred<()>, // TODO use it to stop the connection gracefully ) -> Result<()> { - log::debug!("Received an XMPP connection from {socket_addr}"); + log::info!("Received an XMPP connection from {socket_addr}"); let mut reader_buf = vec![]; let (reader, writer) = stream.split(); let mut buf_reader = BufReader::new(reader); @@ -166,9 +169,8 @@ async fn handle_socket( let mut xml_writer = Writer::new(b); let authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf).await?; - let mut connection = players - .connect_to_player(authenticated.player_id.clone()) - .await; + log::debug!("User authenticated"); + let mut connection = players.connect_to_player(authenticated.player_id.clone()).await; socket_final( &mut xml_reader, &mut xml_writer, @@ -190,7 +192,7 @@ async fn socket_force_tls( writer: &mut (impl AsyncWrite + Unpin), reader_buf: &mut Vec, ) -> Result<()> { - use crate::protos::xmpp::tls::*; + use proto_xmpp::tls::*; let xml_reader = &mut NsReader::from_reader(reader); let xml_writer = &mut Writer::new(writer); read_xml_header(xml_reader, reader_buf).await?; @@ -247,8 +249,8 @@ async fn socket_auth( .await?; xml_writer.get_mut().flush().await?; - let _ = xmpp::sasl::Auth::parse(xml_reader, reader_buf).await?; - xmpp::sasl::Success.write_xml(xml_writer).await?; + let _ = proto_xmpp::sasl::Auth::parse(xml_reader, reader_buf).await?; + proto_xmpp::sasl::Success.write_xml(xml_writer).await?; let name: Str = "darova".into(); Ok(Authenticated { @@ -308,7 +310,6 @@ async fn socket_final( match parser.consume(ns, &event) { Continuation::Final(res) => { let res = res?; - dbg!(&res); let stop = handle_packet(&mut events, res, authenticated, user_handle, rooms).await?; for i in &events { xml_writer.write_event_async(i).await?; @@ -327,8 +328,8 @@ async fn socket_final( update = user_handle.receiver.recv() => { if let Some(update) = update { match update { - crate::core::player::Updates::NewMessage { room_id, author_id, body } => { - Message { + lavina_core::player::Updates::NewMessage { room_id, author_id, body } => { + Message::<()> { to: Some(Jid { name: Some(authenticated.xmpp_name.clone()), server: Server("localhost".into()), @@ -340,10 +341,11 @@ async fn socket_final( resource: Some(Resource(author_id.into_inner().into())), }), id: None, - r#type: xmpp::client::MessageType::Groupchat, + r#type: proto_xmpp::client::MessageType::Groupchat, lang: None, subject: None, body: body.into(), + custom: vec![], } .serialize(&mut events); } @@ -391,12 +393,9 @@ async fn handle_packet( { if server.0.as_ref() == "rooms.localhost" && m.r#type == MessageType::Groupchat { user_handle - .send_message( - RoomId::from(name.0.clone())?, - m.body.clone().into(), - ) + .send_message(RoomId::from(name.0.clone())?, m.body.clone().into()) .await?; - Message { + Message::<()> { to: Some(Jid { name: Some(user.xmpp_name.clone()), server: Server("localhost".into()), @@ -408,10 +407,11 @@ async fn handle_packet( resource: Some(user.xmpp_muc_name.clone()), }), id: m.id, - r#type: xmpp::client::MessageType::Groupchat, + r#type: proto_xmpp::client::MessageType::Groupchat, lang: None, subject: None, body: m.body.clone(), + custom: vec![], } .serialize(output); false @@ -443,9 +443,7 @@ async fn handle_packet( resource: Some(resource), }) = p.to { - let a = user_handle - .join_room(RoomId::from(name.0.clone())?) - .await?; + let a = user_handle.join_room(RoomId::from(name.0.clone())?).await?; Presence::<()> { to: Some(Jid { name: Some(user.xmpp_name.clone()), @@ -479,7 +477,7 @@ async fn handle_iq(output: &mut Vec>, iq: Iq, rooms from: None, id: iq.id, to: None, - r#type: xmpp::client::IqType::Result, + r#type: proto_xmpp::client::IqType::Result, body: BindResponse(Jid { name: Some(Name("darova".into())), server: Server("localhost".into()), @@ -493,7 +491,7 @@ async fn handle_iq(output: &mut Vec>, iq: Iq, rooms from: None, id: iq.id, to: None, - r#type: xmpp::client::IqType::Result, + r#type: proto_xmpp::client::IqType::Result, body: Session, }; req.serialize(output); @@ -503,7 +501,7 @@ async fn handle_iq(output: &mut Vec>, iq: Iq, rooms from: None, id: iq.id, to: None, - r#type: xmpp::client::IqType::Result, + r#type: proto_xmpp::client::IqType::Result, body: RosterQuery, }; req.serialize(output); @@ -514,7 +512,7 @@ async fn handle_iq(output: &mut Vec>, iq: Iq, rooms from: iq.to, id: iq.id, to: None, - r#type: xmpp::client::IqType::Result, + r#type: proto_xmpp::client::IqType::Result, body: response, }; req.serialize(output); @@ -525,7 +523,7 @@ async fn handle_iq(output: &mut Vec>, iq: Iq, rooms from: iq.to, id: iq.id, to: None, - r#type: xmpp::client::IqType::Result, + r#type: proto_xmpp::client::IqType::Result, body: response, }; req.serialize(output); @@ -535,7 +533,7 @@ async fn handle_iq(output: &mut Vec>, iq: Iq, rooms from: None, id: iq.id, to: None, - r#type: xmpp::client::IqType::Error, + r#type: proto_xmpp::client::IqType::Error, body: (), }; req.serialize(output); @@ -628,13 +626,13 @@ async fn read_xml_header( if &*encoding == b"UTF-8" { Ok(()) } else { - Err(fail(format!("Unsupported encoding: {encoding:?}").as_str())) + Err(anyhow!("Unsupported encoding: {encoding:?}")) } } else { // Err(fail("No XML encoding provided")) Ok(()) } } else { - Err(fail("Expected XML header")) + Err(anyhow!("Expected XML header")) } } diff --git a/src/projections/xmpp/proto.rs b/crates/projection-xmpp/src/proto.rs similarity index 78% rename from src/projections/xmpp/proto.rs rename to crates/projection-xmpp/src/proto.rs index 4667dd1..e486b65 100644 --- a/src/projections/xmpp/proto.rs +++ b/crates/projection-xmpp/src/proto.rs @@ -1,15 +1,15 @@ +use anyhow::anyhow; use derive_more::From; use quick_xml::events::Event; use quick_xml::name::{Namespace, ResolveResult}; -use crate::protos::xmpp::bind::BindRequest; -use crate::protos::xmpp::client::{Iq, Message, Presence}; -use crate::protos::xmpp::disco::{InfoQuery, ItemQuery}; -use crate::protos::xmpp::roster::RosterQuery; -use crate::protos::xmpp::session::Session; -use crate::util::xml::*; - -use crate::prelude::*; +use lavina_core::prelude::*; +use proto_xmpp::bind::BindRequest; +use proto_xmpp::client::{Iq, Message, Presence}; +use proto_xmpp::disco::{InfoQuery, ItemQuery}; +use proto_xmpp::roster::RosterQuery; +use proto_xmpp::session::Session; +use proto_xmpp::xml::*; #[derive(PartialEq, Eq, Debug, From)] pub enum IqClientBody { @@ -29,7 +29,7 @@ impl FromXml for IqClientBody { let bytes = match event { Event::Start(bytes) => bytes, Event::Empty(bytes) => bytes, - _ => return Err(ffail!("Unexpected XML event: {event:?}")), + _ => return Err(anyhow!("Unexpected XML event: {event:?}")), }; let name = bytes.name(); match_parser!(name, namespace, event; @@ -49,7 +49,7 @@ impl FromXml for IqClientBody { #[derive(PartialEq, Eq, Debug, From)] pub enum ClientPacket { Iq(Iq), - Message(Message), + Message(Message), Presence(Presence), StreamEnd, } @@ -65,9 +65,9 @@ impl FromXml for ClientPacket { match_parser!(name, namespace, event; Iq::, Presence::, - Message, + Message::, { - Err(ffail!( + Err(anyhow!( "Unexpected XML event of name {:?} in namespace {:?}", name, namespace @@ -80,11 +80,11 @@ impl FromXml for ClientPacket { if name.local_name().as_ref() == b"stream" { return Ok(ClientPacket::StreamEnd); } else { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); } } _ => { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); } } } diff --git a/crates/proto-irc/Cargo.toml b/crates/proto-irc/Cargo.toml new file mode 100644 index 0000000..4a5286e --- /dev/null +++ b/crates/proto-irc/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "proto-irc" +edition = "2021" +version.workspace = true +publish = false + +[dependencies] +nom.workspace = true +nonempty.workspace = true +tokio.workspace = true +futures-util.workspace = true +anyhow.workspace = true + +[dev-dependencies] +assert_matches.workspace = true diff --git a/src/protos/irc/client.rs b/crates/proto-irc/src/client.rs similarity index 100% rename from src/protos/irc/client.rs rename to crates/proto-irc/src/client.rs diff --git a/src/protos/irc/mod.rs b/crates/proto-irc/src/lib.rs similarity index 98% rename from src/protos/irc/mod.rs rename to crates/proto-irc/src/lib.rs index 4d65efd..cd1f64f 100644 --- a/src/protos/irc/mod.rs +++ b/crates/proto-irc/src/lib.rs @@ -1,9 +1,13 @@ //! Client-to-Server IRC protocol. pub mod client; +mod prelude; pub mod server; +#[cfg(test)] +mod testkit; +pub mod user; -use std::io::Result; use crate::prelude::Str; +use std::io::Result; use nom::{ branch::alt, @@ -100,7 +104,7 @@ mod test { use assert_matches::*; use super::*; - use crate::util::testkit::*; + use crate::testkit::*; #[test] fn test_chan_global() { diff --git a/crates/proto-irc/src/prelude.rs b/crates/proto-irc/src/prelude.rs new file mode 100644 index 0000000..04c4d62 --- /dev/null +++ b/crates/proto-irc/src/prelude.rs @@ -0,0 +1,3 @@ +use std::sync::Arc; + +pub type Str = Arc; diff --git a/src/protos/irc/server.rs b/crates/proto-irc/src/server.rs similarity index 93% rename from src/protos/irc/server.rs rename to crates/proto-irc/src/server.rs index 16b5804..1bead65 100644 --- a/src/protos/irc/server.rs +++ b/crates/proto-irc/src/server.rs @@ -1,7 +1,9 @@ +use nonempty::NonEmpty; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use super::*; +use crate::user::PrefixedNick; /// Server-to-client message. #[derive(Clone, Debug, PartialEq, Eq)] @@ -121,7 +123,7 @@ pub enum ServerMessageBody { N353NamesReply { client: Str, chan: Chan, - members: Str, // TODO make this a non-empty list with prefixes + members: NonEmpty, }, N366NamesReplyEnd { client: Str, @@ -132,6 +134,10 @@ pub enum ServerMessageBody { chan: Chan, message: Str, }, + N502UsersDontMatch { + client: Str, + message: Str, + }, } impl ServerMessageBody { @@ -280,7 +286,13 @@ impl ServerMessageBody { writer.write_all(b" = ").await?; chan.write_async(writer).await?; writer.write_all(b" :").await?; - writer.write_all(members.as_bytes()).await?; + for member in members { + writer + .write_all(member.prefix.to_string().as_bytes()) + .await?; + writer.write_all(member.nick.as_bytes()).await?; + writer.write_all(b" ").await?; + } } ServerMessageBody::N366NamesReplyEnd { client, chan } => { writer.write_all(b"366 ").await?; @@ -301,6 +313,13 @@ impl ServerMessageBody { writer.write_all(b" :").await?; writer.write_all(message.as_bytes()).await?; } + ServerMessageBody::N502UsersDontMatch { client, message } => { + writer.write_all(b"502 ").await?; + writer.write_all(client.as_bytes()).await?; + writer.write_all(b" ").await?; + writer.write_all(b" :").await?; + writer.write_all(message.as_bytes()).await?; + } } Ok(()) } @@ -370,7 +389,7 @@ mod test { use assert_matches::*; use super::*; - use crate::util::testkit::*; + use crate::testkit::*; #[test] fn test_server_message_notice() { diff --git a/src/util/testkit.rs b/crates/proto-irc/src/testkit.rs similarity index 73% rename from src/util/testkit.rs rename to crates/proto-irc/src/testkit.rs index d568bea..57949d0 100644 --- a/src/util/testkit.rs +++ b/crates/proto-irc/src/testkit.rs @@ -1,10 +1,10 @@ +use std::future::Future; use std::task::{Context, Poll}; use futures_util::task::noop_waker_ref; +use tokio::pin; -use crate::prelude::*; - -pub fn sync_future(future: impl Future) -> Result { +pub fn sync_future(future: impl Future) -> anyhow::Result { let waker = noop_waker_ref(); let mut context = Context::from_waker(waker); pin!(future); diff --git a/crates/proto-irc/src/user.rs b/crates/proto-irc/src/user.rs new file mode 100644 index 0000000..3998fc2 --- /dev/null +++ b/crates/proto-irc/src/user.rs @@ -0,0 +1,30 @@ +use super::*; +use std::fmt; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Prefix { + Empty, +} + +impl fmt::Display for Prefix { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Prefix::Empty => write!(f, ""), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PrefixedNick { + pub prefix: Prefix, + pub nick: Str, +} + +impl PrefixedNick { + pub fn from_str(nick: Str) -> PrefixedNick { + PrefixedNick { + prefix: Prefix::Empty, + nick, + } + } +} diff --git a/crates/proto-xmpp/Cargo.toml b/crates/proto-xmpp/Cargo.toml new file mode 100644 index 0000000..d8b28f9 --- /dev/null +++ b/crates/proto-xmpp/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "proto-xmpp" +edition = "2021" +version.workspace = true + +[dependencies] +quick-xml.workspace = true +lazy_static.workspace = true +regex.workspace = true +anyhow.workspace = true +tokio.workspace = true +derive_more.workspace = true diff --git a/src/protos/xmpp/bind.rs b/crates/proto-xmpp/src/bind.rs similarity index 62% rename from src/protos/xmpp/bind.rs rename to crates/proto-xmpp/src/bind.rs index 35d0dfc..8d9a4cf 100644 --- a/src/protos/xmpp/bind.rs +++ b/crates/proto-xmpp/src/bind.rs @@ -1,11 +1,11 @@ use std::fmt::Display; -use nom::AsBytes; +use anyhow::{anyhow, Result}; use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event}; use quick_xml::name::{Namespace, ResolveResult}; use crate::prelude::*; -use crate::util::xml::*; +use crate::xml::*; pub const XMLNS: &'static str = "urn:ietf:params:xml:ns:xmpp-bind"; @@ -49,7 +49,7 @@ impl Jid { } let m = RE .captures(i) - .ok_or(ffail!("Incorrectly format jid: {i}"))?; + .ok_or(anyhow!("Incorrectly format jid: {i}"))?; let name = m.get(2).map(|name| Name(name.as_str().into())); let server = m.get(3).unwrap(); @@ -78,89 +78,56 @@ impl Jid { #[derive(PartialEq, Eq, Debug)] pub struct BindRequest(pub Resource); -pub struct BindRequestParser(BindRequestParserInner); - -enum BindRequestParserInner { - Initial, - /// Consumed start and expects - InBind(Option), - /// Consumed start - InBindResourceInitial, - /// Consumer start and inner text - InBindResourceEnd(String), -} - impl FromXmlTag for BindRequest { const NS: &'static str = XMLNS; const NAME: &'static str = "bind"; } impl FromXml for BindRequest { - type P = BindRequestParser; + type P = impl Parser>; fn parse() -> Self::P { - BindRequestParser(BindRequestParserInner::Initial) - } -} - -// TODO rewrite as a generator -impl Parser for BindRequestParser { - type Output = Result; - - fn consume<'a>( - self: Self, - namespace: ResolveResult, - event: &Event<'a>, - ) -> Continuation { - // TODO validate tag names and namespaces - use BindRequestParserInner::*; - match self.0 { - Initial => { - let Event::Start(bytes) = event else { - return Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))); - }; - if bytes.name().0 != BindRequest::NAME.as_bytes() { - return Continuation::Final(Err(ffail!( - "Unexpected XML tag: {:?}", - bytes.name() - ))); - } - let ResolveResult::Bound(Namespace(ns)) = namespace else { - return Continuation::Final(Err(ffail!("No namespace provided"))); - }; - if ns != XMLNS.as_bytes() { - return Continuation::Final(Err(ffail!("Incorrect namespace"))); - } - Continuation::Continue(BindRequestParser(InBind(None))) + |(namespace, event): (ResolveResult<'static>, &'static Event<'static>)| -> Result { + let mut resource: Option = None; + let Event::Start(bytes) = event else { + return Err(anyhow!("Unexpected XML event: {event:?}")); + }; + if bytes.name().0 != BindRequest::NAME.as_bytes() { + return Err(anyhow!("Unexpected XML tag: {:?}", bytes.name())); } - InBind(resource) => match event { - Event::Start(bytes) => { - Continuation::Continue(BindRequestParser(InBindResourceInitial)) - } - Event::End(bytes) => { - let Some(resource) = resource else { - return Continuation::Final(Err(ffail!("No resource was provided"))); + let ResolveResult::Bound(Namespace(ns)) = namespace else { + return Err(anyhow!("No namespace provided")); + }; + if ns != XMLNS.as_bytes() { + return Err(anyhow!("Incorrect namespace")); + } + loop { + let (namespace, event) = yield; + match event { + Event::Start(bytes) if bytes.name().0 == b"resource" => { + let (namespace, event) = yield; + let Event::Text(text) = event else { + return Err(anyhow!("Unexpected XML event: {event:?}")); }; - Continuation::Final(Ok(BindRequest(Resource(resource.into())))) + resource = Some(std::str::from_utf8(&*text)?.into()); + let (namespace, event) = yield; + let Event::End(bytes) = event else { + return Err(anyhow!("Unexpected XML event: {event:?}")); + }; + if bytes.name().0 != b"resource" { + return Err(anyhow!("Unexpected XML tag: {:?}", bytes.name())); + } + } + Event::End(bytes) if bytes.name().0 == BindRequest::NAME.as_bytes() => { + break; + } + _ => return Err(anyhow!("Unexpected XML event: {event:?}")), } - _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), - }, - InBindResourceInitial => { - let Event::Text(text) = event else { - return Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))); - }; - let resource = match std::str::from_utf8(text.as_bytes()) { - Ok(e) => e.to_string(), - Err(err) => return Continuation::Final(Err(err.into())), - }; - Continuation::Continue(BindRequestParser(InBindResourceEnd(resource))) - } - InBindResourceEnd(resource) => { - let Event::End(bytes) = event else { - return Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))); - }; - Continuation::Continue(BindRequestParser(InBind(Some(resource)))) } + let Some(resource) = resource else { + return Err(anyhow!("No resource was provided")); + }; + Ok(BindRequest(Resource(resource))) } } } diff --git a/src/protos/xmpp/client.rs b/crates/proto-xmpp/src/client.rs similarity index 86% rename from src/protos/xmpp/client.rs rename to crates/proto-xmpp/src/client.rs index 606e03c..fd7ac93 100644 --- a/src/protos/xmpp/client.rs +++ b/crates/proto-xmpp/src/client.rs @@ -3,52 +3,55 @@ use quick_xml::events::attributes::Attribute; use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event}; use quick_xml::name::{QName, ResolveResult}; +use anyhow::{anyhow as ffail, Result}; + use crate::prelude::*; -use crate::util::xml::*; +use crate::xml::*; use super::bind::Jid; pub const XMLNS: &'static str = "jabber:client"; #[derive(PartialEq, Eq, Debug)] -pub struct Message { +pub struct Message { pub from: Option, pub id: Option, pub to: Option, // default is Normal pub r#type: MessageType, pub lang: Option, - pub subject: Option, pub body: Str, + pub custom: Vec, } -impl FromXmlTag for Message { +impl FromXmlTag for Message { const NS: &'static str = XMLNS; const NAME: &'static str = "message"; } -impl FromXml for Message { - type P = MessageParser; +impl FromXml for Message { + type P = impl Parser>; fn parse() -> Self::P { - MessageParserInner::Init.into() + MessageParser(MessageParserInner::Init) } } #[derive(From)] -pub struct MessageParser(MessageParserInner); +struct MessageParser(MessageParserInner); #[derive(Default)] -enum MessageParserInner { +enum MessageParserInner { #[default] Init, - Outer(MessageParserState), - InSubject(MessageParserState), - InBody(MessageParserState), + Outer(MessageParserState), + InSubject(MessageParserState), + InBody(MessageParserState), + InCustom(MessageParserState, T::P), } #[derive(Default)] -struct MessageParserState { +struct MessageParserState { from: Option, id: Option, to: Option, @@ -56,21 +59,27 @@ struct MessageParserState { lang: Option, subject: Option, body: Option, + custom: Vec, } -impl Parser for MessageParser { - type Output = Result; +impl Parser for MessageParser { + type Output = Result>; - fn consume<'a>( - self: Self, - namespace: ResolveResult, - event: &Event<'a>, - ) -> Continuation { + fn consume<'a>(self: Self, namespace: ResolveResult, event: &Event<'a>) -> Continuation { // TODO validate tag name and namespace at each stage use MessageParserInner::*; match self.0 { Init => { if let Event::Start(ref bytes) = event { - let mut state: MessageParserState = Default::default(); + let mut state: MessageParserState = MessageParserState { + from: None, + id: None, + to: None, + r#type: MessageType::Normal, + lang: None, + subject: None, + body: None, + custom: vec![], + }; for attr in bytes.attributes() { let attr = fail_fast!(attr); if attr.key.0 == b"from" { @@ -101,7 +110,7 @@ impl Parser for MessageParser { } else if bytes.name().0 == b"body" { Continuation::Continue(InBody(state).into()) } else { - Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))) + Continuation::Continue(InCustom(state, T::parse()).into()) } } Event::End(_) => { @@ -114,6 +123,7 @@ impl Parser for MessageParser { lang: state.lang, subject: state.subject, body, + custom: state.custom, })) } else { Continuation::Final(Err(ffail!("Body not found"))) @@ -141,11 +151,19 @@ impl Parser for MessageParser { Event::End(_) => Continuation::Continue(Outer(state).into()), _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), }, + InCustom(mut state, mut custom) => match custom.consume(namespace, event) { + Continuation::Final(Ok(e)) => { + state.custom.push(e); + Continuation::Continue(Outer(state).into()) + } + Continuation::Final(Err(e)) => Continuation::Final(Err(e)), + Continuation::Continue(c) => Continuation::Continue(InCustom(state, c).into()), + }, } } } -impl ToXml for Message { +impl ToXml for Message { fn serialize(&self, events: &mut Vec>) { let mut bytes = BytesStart::new(format!(r#"message xmlns="{}""#, XMLNS)); if let Some(from) = &self.from { @@ -258,11 +276,7 @@ struct IqParserState { impl Parser for IqParser { type Output = Result>; - fn consume<'a>( - self: Self, - namespace: ResolveResult, - event: &Event<'a>, - ) -> Continuation { + fn consume<'a>(self: Self, namespace: ResolveResult, event: &Event<'a>) -> Continuation { match self.0 { IqParserInner::Init => { if let Event::Start(ref bytes) = event { @@ -294,23 +308,21 @@ impl Parser for IqParser { Continuation::Final(Err(ffail!("Expected start"))) } } - IqParserInner::ParsingBody(mut state, parser) => { - match parser.consume(namespace, event) { - Continuation::Final(f) => { - let body = fail_fast!(f); - state.body = Some(body); - Continuation::Continue(IqParser(IqParserInner::Final(state))) - } - Continuation::Continue(parser) => { - Continuation::Continue(IqParser(IqParserInner::ParsingBody(state, parser))) - } + IqParserInner::ParsingBody(mut state, parser) => match parser.consume(namespace, event) { + Continuation::Final(f) => { + let body = fail_fast!(f); + state.body = Some(body); + Continuation::Continue(IqParser(IqParserInner::Final(state))) } - } + Continuation::Continue(parser) => { + Continuation::Continue(IqParser(IqParserInner::ParsingBody(state, parser))) + } + }, IqParserInner::Final(state) => { if let Event::End(ref bytes) = event { - let id = fail_fast!(state.id.ok_or_else(|| fail("No id provided"))); - let r#type = fail_fast!(state.r#type.ok_or_else(|| fail("No type provided"))); - let body = fail_fast!(state.body.ok_or_else(|| fail("No body provided"))); + let id = fail_fast!(state.id.ok_or_else(|| ffail!("No id provided"))); + let r#type = fail_fast!(state.r#type.ok_or_else(|| ffail!("No type provided"))); + let body = fail_fast!(state.body.ok_or_else(|| ffail!("No body provided"))); Continuation::Final(Ok(Iq { from: state.from, id, @@ -579,29 +591,23 @@ impl ToXml for Presence { #[cfg(test)] mod tests { - use crate::protos::xmpp::bind::{BindRequest, Name, Resource, Server}; + use crate::bind::{BindRequest, Name, Resource, Server}; use super::*; use quick_xml::NsReader; #[tokio::test] async fn parse_message() { - let input = r#"daabbb"#; + let input = r#"daabbb"#; let mut reader = NsReader::from_reader(input.as_bytes()); let mut buf = vec![]; - let (ns, event) = reader - .read_resolved_event_into_async(&mut buf) - .await - .unwrap(); + let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); let mut parser = Message::parse().consume(ns, &event); let result = loop { match parser { Continuation::Final(res) => break res, Continuation::Continue(next) => { - let (ns, event) = reader - .read_resolved_event_into_async(&mut buf) - .await - .unwrap(); + let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); parser = next.consume(ns, &event); } } @@ -609,7 +615,7 @@ mod tests { .unwrap(); assert_eq!( result, - Message { + Message:: { from: None, id: Some("aacea".to_string()), to: Some(Jid { @@ -621,6 +627,7 @@ mod tests { lang: None, subject: Some("daa".into()), body: "bbb".into(), + custom: vec![Ignore], } ) } @@ -630,19 +637,13 @@ mod tests { let input = r#"mobile"#; let mut reader = NsReader::from_reader(input.as_bytes()); let mut buf = vec![]; - let (ns, event) = reader - .read_resolved_event_into_async(&mut buf) - .await - .unwrap(); + let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); let mut parser = Iq::::parse().consume(ns, &event); let result = loop { match parser { Continuation::Final(res) => break res, Continuation::Continue(next) => { - let (ns, event) = reader - .read_resolved_event_into_async(&mut buf) - .await - .unwrap(); + let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); parser = next.consume(ns, &event); } } diff --git a/src/protos/xmpp/disco.rs b/crates/proto-xmpp/src/disco.rs similarity index 99% rename from src/protos/xmpp/disco.rs rename to crates/proto-xmpp/src/disco.rs index a15e610..973242d 100644 --- a/src/protos/xmpp/disco.rs +++ b/crates/proto-xmpp/src/disco.rs @@ -2,8 +2,8 @@ use quick_xml::events::attributes::Attribute; use quick_xml::events::{BytesEnd, BytesStart, Event}; use quick_xml::name::{QName, ResolveResult}; -use crate::prelude::*; -use crate::util::xml::*; +use anyhow::{Result, anyhow as ffail}; +use crate::xml::*; use super::bind::Jid; diff --git a/src/protos/xmpp/mod.rs b/crates/proto-xmpp/src/lib.rs similarity index 78% rename from src/protos/xmpp/mod.rs rename to crates/proto-xmpp/src/lib.rs index 0f99af7..ee85319 100644 --- a/src/protos/xmpp/mod.rs +++ b/crates/proto-xmpp/src/lib.rs @@ -1,3 +1,10 @@ +#![feature( + generators, + generator_trait, + type_alias_impl_trait, + impl_trait_in_assoc_type +)] + pub mod bind; pub mod client; pub mod disco; @@ -8,6 +15,8 @@ pub mod session; pub mod stanzaerror; pub mod stream; pub mod tls; +mod prelude; +pub mod xml; // Implemented as a macro instead of a fn due to borrowck limitations macro_rules! skip_text { @@ -25,4 +34,4 @@ macro_rules! skip_text { }; } -pub(super) use skip_text; +pub(crate) use skip_text; diff --git a/src/protos/xmpp/muc/mod.rs b/crates/proto-xmpp/src/muc/mod.rs similarity index 86% rename from src/protos/xmpp/muc/mod.rs rename to crates/proto-xmpp/src/muc/mod.rs index 8822fe3..250d50d 100644 --- a/src/protos/xmpp/muc/mod.rs +++ b/crates/proto-xmpp/src/muc/mod.rs @@ -1,8 +1,8 @@ use quick_xml::events::Event; use quick_xml::name::ResolveResult; -use crate::prelude::*; -use crate::util::xml::*; +use anyhow::{anyhow, Result}; +use crate::xml::*; pub const XMLNS: &'static str = "http://jabber.org/protocol/muc"; @@ -22,7 +22,7 @@ impl FromXml for History { let (bytes, end) = match event { Event::Start(bytes) => (bytes, false), Event::Empty(bytes) => (bytes, true), - _ => return Err(ffail!("Unexpected XML event: {event:?}")), + _ => return Err(anyhow!("Unexpected XML event: {event:?}")), }; for attr in bytes.attributes() { let attr = attr?; @@ -51,7 +51,7 @@ impl FromXml for History { let (namespace, event) = yield; let Event::End(bytes) = event else { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); }; Ok(history) } @@ -73,15 +73,15 @@ impl FromXml for Password { fn parse() -> Self::P { |(namespace, event): (ResolveResult<'static>, &'static Event<'static>)| -> Result { let Event::Start(bytes) = event else { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); }; let (namespace, event) = yield; let Event::Text(bytes) = event else { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); }; let s = std::str::from_utf8(bytes)?.to_string(); let Event::End(bytes) = event else { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); }; Ok(Password(s)) } @@ -109,7 +109,7 @@ impl FromXml for X { let (bytes, end) = match event { Event::Start(bytes) => (bytes, false), Event::Empty(bytes) => (bytes, true), - _ => return Err(ffail!("Unexpected XML event: {event:?}")), + _ => return Err(anyhow!("Unexpected XML event: {event:?}")), }; if end { return Ok(res); @@ -121,7 +121,7 @@ impl FromXml for X { Event::Start(bytes) => bytes, Event::Empty(bytes) => bytes, Event::End(_) => break, - _ => return Err(ffail!("Unexpected XML event: {event:?}")), + _ => return Err(anyhow!("Unexpected XML event: {event:?}")), }; if bytes.name().0 == Password::NAME.as_bytes() { let password = delegate_parsing!(Password, namespace, event)?; @@ -130,7 +130,7 @@ impl FromXml for X { let history = delegate_parsing!(History, namespace, event)?; res.history = Some(history); } else { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); } } diff --git a/crates/proto-xmpp/src/prelude.rs b/crates/proto-xmpp/src/prelude.rs new file mode 100644 index 0000000..04c4d62 --- /dev/null +++ b/crates/proto-xmpp/src/prelude.rs @@ -0,0 +1,3 @@ +use std::sync::Arc; + +pub type Str = Arc; diff --git a/src/protos/xmpp/roster.rs b/crates/proto-xmpp/src/roster.rs similarity index 96% rename from src/protos/xmpp/roster.rs rename to crates/proto-xmpp/src/roster.rs index 4990bc1..e153843 100644 --- a/src/protos/xmpp/roster.rs +++ b/crates/proto-xmpp/src/roster.rs @@ -1,7 +1,7 @@ use quick_xml::events::{BytesStart, Event}; -use crate::prelude::*; -use crate::util::xml::*; +use crate::xml::*; +use anyhow::{anyhow as ffail, Result}; pub const XMLNS: &'static str = "jabber:iq:roster"; diff --git a/src/protos/xmpp/sasl.rs b/crates/proto-xmpp/src/sasl.rs similarity index 83% rename from src/protos/xmpp/sasl.rs rename to crates/proto-xmpp/src/sasl.rs index e78f605..7068e7a 100644 --- a/src/protos/xmpp/sasl.rs +++ b/crates/proto-xmpp/src/sasl.rs @@ -7,7 +7,7 @@ use quick_xml::{ use tokio::io::{AsyncBufRead, AsyncWrite}; use super::skip_text; -use crate::prelude::*; +use anyhow::{anyhow, Result}; pub enum Mechanism { Plain, @@ -22,7 +22,7 @@ impl Mechanism { pub fn from_str(input: &[u8]) -> Result { match input { b"PLAIN" => Ok(Mechanism::Plain), - _ => Err(fail(format!("unknown auth mechanism: {input:?}").as_str())), + _ => Err(anyhow!("unknown auth mechanism: {input:?}")), } } } @@ -48,20 +48,20 @@ impl Auth { if let Some(mechanism) = mechanism { Mechanism::from_str(mechanism.borrow())? } else { - return Err(fail("expected mechanism attribute in ")); + return Err(anyhow!("expected mechanism attribute in ")); } } else { - return Err(fail("expected start of ")); + return Err(anyhow!("expected start of ")); }; let body = if let Event::Text(text) = reader.read_event_into_async(buf).await? { text.into_inner().into_owned() } else { - return Err(fail("expected text body in ")); + return Err(anyhow!("expected text body in ")); }; if let Event::End(_) = reader.read_event_into_async(buf).await? { //TODO } else { - return Err(fail("expected end of ")); + return Err(anyhow!("expected end of ")); }; Ok(Auth { mechanism, body }) diff --git a/src/protos/xmpp/session.rs b/crates/proto-xmpp/src/session.rs similarity index 86% rename from src/protos/xmpp/session.rs rename to crates/proto-xmpp/src/session.rs index 9f0104b..ba1ab43 100644 --- a/src/protos/xmpp/session.rs +++ b/crates/proto-xmpp/src/session.rs @@ -1,7 +1,7 @@ use quick_xml::events::{BytesStart, Event}; -use crate::prelude::*; -use crate::util::xml::*; +use crate::xml::*; +use anyhow::{anyhow, Result}; pub const XMLNS: &'static str = "urn:ietf:params:xml:ns:xmpp-session"; @@ -29,11 +29,11 @@ impl Parser for SessionParser { Continuation::Continue(SessionParser(SessionParserInner::InSession)) } Event::Empty(_) => Continuation::Final(Ok(Session)), - _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), + _ => Continuation::Final(Err(anyhow!("Unexpected XML event: {event:?}"))), }, SessionParserInner::InSession => match event { Event::End(_) => Continuation::Final(Ok(Session)), - _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), + _ => Continuation::Final(Err(anyhow!("Unexpected XML event: {event:?}"))), }, } } diff --git a/src/protos/xmpp/stanzaerror.rs b/crates/proto-xmpp/src/stanzaerror.rs similarity index 100% rename from src/protos/xmpp/stanzaerror.rs rename to crates/proto-xmpp/src/stanzaerror.rs diff --git a/src/protos/xmpp/stream.rs b/crates/proto-xmpp/src/stream.rs similarity index 98% rename from src/protos/xmpp/stream.rs rename to crates/proto-xmpp/src/stream.rs index 1f133a3..4d11039 100644 --- a/src/protos/xmpp/stream.rs +++ b/crates/proto-xmpp/src/stream.rs @@ -5,8 +5,9 @@ use quick_xml::{NsReader, Writer}; use tokio::io::{AsyncBufRead, AsyncWrite}; use super::skip_text; -use crate::prelude::*; -use crate::util::xml::ToXml; + +use anyhow::{anyhow, Result}; +use crate::xml::ToXml; pub static XMLNS: &'static str = "http://etherx.jabber.org/streams"; pub static PREFIX: &'static str = "stream"; @@ -63,7 +64,6 @@ impl ClientStreamStart { version: version.unwrap(), }) } else { - log::error!("WAT: {incoming:?}"); Err(panic!()) } } diff --git a/src/protos/xmpp/tls.rs b/crates/proto-xmpp/src/tls.rs similarity index 91% rename from src/protos/xmpp/tls.rs rename to crates/proto-xmpp/src/tls.rs index 6fdf4ad..a5a9e0d 100644 --- a/src/protos/xmpp/tls.rs +++ b/crates/proto-xmpp/src/tls.rs @@ -5,7 +5,8 @@ use quick_xml::{NsReader, Writer}; use tokio::io::{AsyncBufRead, AsyncWrite}; use super::skip_text; -use crate::prelude::*; + +use anyhow::{anyhow, Result}; pub static XMLNS: &'static str = "urn:ietf:params:xml:ns:xmpp-tls"; @@ -21,7 +22,7 @@ impl StartTLS { return Ok(StartTLS); } } - Err(ffail!("XML tag starttls expected, received: {incoming:?}")) + Err(anyhow!("XML tag starttls expected, received: {incoming:?}")) } } diff --git a/src/util/xml/ignore.rs b/crates/proto-xmpp/src/xml/ignore.rs similarity index 100% rename from src/util/xml/ignore.rs rename to crates/proto-xmpp/src/xml/ignore.rs diff --git a/src/util/xml.rs b/crates/proto-xmpp/src/xml/mod.rs similarity index 96% rename from src/util/xml.rs rename to crates/proto-xmpp/src/xml/mod.rs index 9ad847e..771bef4 100644 --- a/src/util/xml.rs +++ b/crates/proto-xmpp/src/xml/mod.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use quick_xml::events::Event; use quick_xml::name::ResolveResult; -use crate::prelude::Result; +use anyhow::{anyhow, Result}; mod ignore; pub use ignore::Ignore; @@ -72,6 +72,7 @@ macro_rules! fail_fast { }; } +#[macro_export] macro_rules! delegate_parsing { ($parser: ty, $namespace: expr, $event: expr) => {{ let mut parser = <$parser as FromXml>::parse().consume($namespace, $event); @@ -88,6 +89,7 @@ macro_rules! delegate_parsing { }}; } +#[macro_export] macro_rules! match_parser { ($name: expr, $ns: expr, $event: expr; $subtype: ty, $fin: block) => { if $name.0 == <$subtype as FromXmlTag>::NAME.as_bytes() && $ns == ResolveResult::Bound(Namespace(<$subtype as FromXmlTag>::NS.as_bytes())) { @@ -105,6 +107,6 @@ macro_rules! match_parser { }; } -pub(crate) use delegate_parsing; +pub use delegate_parsing; pub(crate) use fail_fast; -pub(crate) use match_parser; +pub use match_parser; diff --git a/deny.toml b/deny.toml index 19a24a9..f7ace18 100644 --- a/deny.toml +++ b/deny.toml @@ -17,14 +17,18 @@ allow = [ "MIT", "Apache-2.0", "ISC", - "MPL-2.0", "BSD-3-Clause", ] exceptions = [ { allow = ["Unicode-DFS-2016"], name = "unicode-ident" }, { allow = ["OpenSSL"], name = "ring" }, ] -deny = ["GPL-2.0", "GPL-3.0", "AGPL-3.0"] +deny = [ + "GPL-2.0", + "GPL-3.0", + "AGPL-3.0", + "MPL-2.0" # it is used by webpki-roots; we do not hardcode root certs in the binary +] copyleft = "deny" confidence-threshold = 0.93 private = { ignore = true } diff --git a/dist/alpine3.18.Dockerfile b/dist/alpine3.18.Dockerfile new file mode 100644 index 0000000..facdfb0 --- /dev/null +++ b/dist/alpine3.18.Dockerfile @@ -0,0 +1,11 @@ +FROM rust:1.72.0-alpine3.18@sha256:2f5592c561cef195c9fa4462633a674458dc375fc0ba4b80e7efe4c3c8e68403 as bld + +RUN apk add --no-cache musl-dev +COPY . . +RUN cargo build --release + +FROM alpine:3.18@sha256:7144f7bab3d4c2648d7e59409f15ec52a18006a128c733fcff20d3a4a54ba44a + +COPY --from=bld target/release/lavina /usr/bin/lavina +VOLUME ["/etc/lavina/", "/var/lib/lavina/"] +ENTRYPOINT ["lavina", "--config", "/etc/lavina/config.toml"] diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..866c756 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +max_width = 120 \ No newline at end of file diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..c6e5546 --- /dev/null +++ b/src/http.rs @@ -0,0 +1,190 @@ +use std::convert::Infallible; +use std::net::SocketAddr; + +use futures_util::FutureExt; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Method, Request, Response, StatusCode}; +use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; +use serde::{Deserialize, Serialize}; +use tokio::net::TcpListener; + +use lavina_core::prelude::*; +use lavina_core::repo::Storage; +use lavina_core::room::RoomRegistry; + +use mgmt_api::*; + +type HttpResult = std::result::Result; + +#[derive(Deserialize, Debug)] +pub struct ServerConfig { + pub listen_on: SocketAddr, +} + +pub async fn launch<'a>( + config: ServerConfig, + metrics: &'a MetricsRegistry, + rooms: &'a RoomRegistry<'_>, + storage: &'a Storage, + scope: &mut Scope<'a>, +) -> Result> { + log::info!("Starting the http service"); + let listener = TcpListener::bind(config.listen_on).await?; + log::debug!("Listener started"); + + let (signal, mut rx) = oneshot(); + + let future = async move { + let mut scope = unsafe { Scope::create() }; + loop { + select! { + biased; + _ = &mut rx => break, + result = listener.accept() => { + let (stream, _) = result?; + scope.spawn(async move { + let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(metrics, rooms, storage, r))); + if let Err(err) = server.await { + tracing::error!("Error serving connection: {:?}", err); + } + }); + }, + } + } + let _ = scope.collect().await; + drop(scope); + log::info!("Terminating the http service"); + Ok(()) + }; + scope.spawn(future.map(|_: Result<()>| ())); + + Ok(signal) +} + +async fn route( + registry: &MetricsRegistry, + rooms: &RoomRegistry<'_>, + storage: &Storage, + request: Request, +) -> HttpResult>> { + let res = match (request.method(), request.uri().path()) { + (&Method::GET, "/metrics") => endpoint_metrics(registry), + (&Method::GET, "/rooms") => endpoint_rooms(rooms).await, + (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(), + (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(), + _ => not_found(), + }; + Ok(res) +} + +fn endpoint_metrics(registry: &MetricsRegistry) -> Response> { + let mf = registry.gather(); + let mut buffer = vec![]; + TextEncoder.encode(&mf, &mut buffer).expect("write to vec cannot fail"); + Response::new(Full::new(Bytes::from(buffer))) +} + +async fn endpoint_rooms(rooms: &RoomRegistry<'_>) -> Response> { + // TODO introduce management API types independent from core-domain types + // TODO remove `Serialize` implementations from all core-domain types + let room_list = rooms.get_all_rooms().await.to_body(); + Response::new(room_list) +} + +async fn endpoint_create_player( + request: Request, + storage: &Storage, +) -> Result>> { + let str = request.collect().await?.to_bytes(); + let Ok(res) = serde_json::from_slice::(&str[..]) else { + let payload = ErrorResponse { + code: errors::MALFORMED_REQUEST, + message: "The request payload contains incorrect JSON value", + } + .to_body(); + let mut response = Response::new(payload); + *response.status_mut() = StatusCode::BAD_REQUEST; + return Ok(response); + }; + storage.create_user(&res.name).await?; + log::info!("Player {} created", res.name); + let mut response = Response::new(Full::::default()); + *response.status_mut() = StatusCode::CREATED; + Ok(response) +} + +async fn endpoint_set_password( + request: Request, + storage: &Storage, +) -> Result>> { + let str = request.collect().await?.to_bytes(); + let Ok(res) = serde_json::from_slice::(&str[..]) else { + let payload = ErrorResponse { + code: errors::MALFORMED_REQUEST, + message: "The request payload contains incorrect JSON value", + } + .to_body(); + let mut response = Response::new(payload); + *response.status_mut() = StatusCode::BAD_REQUEST; + return Ok(response); + }; + let Some(_) = storage.set_password(&res.player_name, &res.password).await? else { + let payload = ErrorResponse { + code: errors::PLAYER_NOT_FOUND, + message: "No such player exists", + } + .to_body(); + let mut response = Response::new(payload); + *response.status_mut() = StatusCode::UNPROCESSABLE_ENTITY; + return Ok(response); + }; + log::info!("Password changed for player {}", res.player_name); + let mut response = Response::new(Full::::default()); + *response.status_mut() = StatusCode::NO_CONTENT; + Ok(response) +} + +pub fn not_found() -> Response> { + let payload = ErrorResponse { + code: errors::INVALID_PATH, + message: "The path does not exist", + } + .to_body(); + + let mut response = Response::new(payload); + *response.status_mut() = StatusCode::NOT_FOUND; + response +} + +trait Or5xx { + fn or5xx(self) -> Response>; +} +impl Or5xx for Result>> { + fn or5xx(self) -> Response> { + match self { + Ok(e) => e, + Err(e) => { + let mut response = Response::new(Full::new(e.to_string().into())); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response + } + } + } +} + +trait ToBody { + fn to_body(&self) -> Full; +} +impl ToBody for T +where + T: Serialize, +{ + fn to_body(&self) -> Full { + let mut buffer = vec![]; + serde_json::to_writer(&mut buffer, self).expect("unexpected fail when writing to vec"); + Full::new(Bytes::from(buffer)) + } +} diff --git a/src/main.rs b/src/main.rs index e0a59a0..a531853 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,38 +1,36 @@ -#![feature( - generators, - generator_trait, - type_alias_impl_trait, - impl_trait_in_assoc_type -)] - -mod core; -mod prelude; -mod projections; -mod protos; -mod util; +mod http; use std::future::Future; +use std::path::Path; +use clap::Parser; use figment::providers::Format; use figment::{providers::Toml, Figment}; use prometheus::Registry as MetricsRegistry; use serde::Deserialize; -use crate::core::player::PlayerRegistry; -use crate::core::repo::Storage; -use crate::core::room::RoomRegistry; -use crate::prelude::*; +use lavina_core::player::PlayerRegistry; +use lavina_core::prelude::*; +use lavina_core::repo::Storage; +use lavina_core::room::RoomRegistry; #[derive(Deserialize, Debug)] struct ServerConfig { - telemetry: util::telemetry::ServerConfig, - irc: projections::irc::ServerConfig, - xmpp: projections::xmpp::ServerConfig, - storage: core::repo::StorageConfig, + telemetry: http::ServerConfig, + irc: projection_irc::ServerConfig, + xmpp: projection_xmpp::ServerConfig, + storage: lavina_core::repo::StorageConfig, +} + +#[derive(Parser)] +struct CliArgs { + #[arg(long)] + config: Box, } fn load_config() -> Result { - let raw_config = Figment::new().merge(Toml::file("config.toml")); + let args = CliArgs::parse(); + let raw_config = Figment::from(Toml::file(args.config)); let config: ServerConfig = raw_config.extract()?; Ok(config) } @@ -58,9 +56,9 @@ async fn main() -> Result<()> { // unsafe: outer future is never dropped, scope is joined on `scope.collect` let mut scope = unsafe { Scope::create() }; - let telemetry_terminator = util::telemetry::launch(telemetry_config, &metrics, &rooms, &mut scope).await?; - let irc = projections::irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?; - let xmpp = projections::xmpp::launch(xmpp_config, &players, &rooms, &metrics, &mut scope).await?; + let telemetry_terminator = http::launch(telemetry_config, &metrics, &rooms, &storage, &mut scope).await?; + let irc = projection_irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?; + let xmpp = projection_xmpp::launch(xmpp_config, &players, &rooms, &metrics, &mut scope).await?; tracing::info!("Started"); sleep.await; diff --git a/src/projections/irc/test.rs b/src/projections/irc/test.rs deleted file mode 100644 index c3b238f..0000000 --- a/src/projections/irc/test.rs +++ /dev/null @@ -1,166 +0,0 @@ -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::net::tcp::{ReadHalf, WriteHalf}; -use tokio::net::TcpStream; - -use crate::prelude::*; - -struct TestScope<'a> { - reader: BufReader>, - writer: WriteHalf<'a>, - buffer: Vec, -} -impl<'a> TestScope<'a> { - async fn send(&mut self, str: &(impl AsRef + ?Sized)) -> Result<()> { - self.writer.write_all(str.as_ref().as_bytes()).await?; - self.writer.flush().await?; - Ok(()) - } - - async fn expect(&mut self, str: &(impl AsRef + ?Sized)) -> Result<()> { - let len = self.reader.read_until(b'\n', &mut self.buffer).await?; - assert_eq!(std::str::from_utf8(&self.buffer[0..len])?, str.as_ref()); - self.buffer.clear(); - Ok(()) - } - - async fn assert(&mut self, f: impl FnOnce(&str) -> Result<()>) -> Result<()> { - let len = self.reader.read_until(b'\n', &mut self.buffer).await?; - let res = f(std::str::from_utf8(&self.buffer[0..len])?); - self.buffer.clear(); - res - } -} - -async fn init_client(stream: &mut TcpStream) -> Result { - let (reader, writer) = stream.split(); - let reader = BufReader::new(reader); - let buffer = vec![]; - Ok(TestScope { - reader, - writer, - buffer, - }) -} - -macro_rules! send { - ($scope: expr, $($arg:tt),*) => {{ - $scope.send(&format!($($arg,)*)).await?; - }}; -} - -macro_rules! expect { - ($scope: expr, $($arg:tt),*) => {{ - $scope.expect(&format!($($arg,)*)).await?; - }}; -} - -const SERVER_HOSTNAME: &'static str = "irc.localhost"; - -#[rustfmt::skip] -async fn registration(scope: &mut TestScope<'_>, nickname: &str) -> Result<()> { - expect!(scope, ":irc.localhost NOTICE * :Welcome to my server!\r\n"); - send!(scope, "NICK {nickname}\r\n"); - send!(scope, "USER UserName 0 * :Real Name\r\n"); - expect!(scope, ":irc.localhost 001 {nickname} :Welcome to Kek Server\r\n"); - expect!(scope, ":irc.localhost 002 {nickname} :Welcome to Kek Server\r\n"); - expect!(scope, ":irc.localhost 003 {nickname} :Welcome to Kek Server\r\n"); - expect!(scope, ":irc.localhost 004 {nickname} irc.localhost kek-0.1.alpha.3 r CFILPQbcefgijklmnopqrstvz\r\n"); - expect!(scope, ":irc.localhost 005 {nickname} CHANTYPES=# :are supported by this server\r\n"); - Ok(()) -} - -#[rustfmt::skip] -async fn join(scope: &mut TestScope<'_>, nickname: &str, chan: &str) -> Result<()> { - send!(scope, "JOIN #{chan}\r\n"); - expect!(scope, ":{nickname} JOIN #{chan}\r\n"); - expect!(scope, ":irc.localhost 332 {nickname} #{chan} :New room\r\n"); - expect!(scope, ":irc.localhost 353 {nickname} = #{chan} :{nickname}\r\n"); - expect!(scope, ":irc.localhost 366 {nickname} #{chan} :End of /NAMES list\r\n"); - Ok(()) -} - -#[tokio::test] -async fn test_registration() -> Result<()> { - let mut stream = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope = init_client(&mut stream).await?; - - registration(&mut scope, "NickName1").await?; - join(&mut scope, "NickName1", "chan1").await?; - Ok(()) -} - -#[rustfmt::skip] -#[tokio::test] -async fn test_two_connections_one_player() -> Result<()> { - let mut stream1 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope1 = init_client(&mut stream1).await?; - let mut stream2 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope2 = init_client(&mut stream2).await?; - - let nickname = "NickName2"; - let chan = "chan2"; - - registration(&mut scope1, nickname).await?; - registration(&mut scope2, nickname).await?; - join(&mut scope1, nickname, chan).await?; - - // force join on second connection when the other one joins a room - expect!(scope2, ":{nickname} JOIN #{chan}\r\n"); - expect!(scope2, ":{SERVER_HOSTNAME} 332 {nickname} #{chan} :New room\r\n"); - expect!(scope2, ":{SERVER_HOSTNAME} 353 {nickname} = #{chan} :{nickname}\r\n"); - expect!(scope2, ":{SERVER_HOSTNAME} 366 {nickname} #{chan} :End of /NAMES list\r\n"); - - // force send PRIVMSG to other connections - send!(scope1, "PRIVMSG #{chan} :Chmoki vsem v etam chati!\r\n"); - expect!(scope2, ":{nickname} PRIVMSG #{chan} :Chmoki vsem v etam chati!\r\n"); - send!(scope2, "PRIVMSG #{chan} :I tebe privetiki\r\n"); - expect!(scope1, ":{nickname} PRIVMSG #{chan} :I tebe privetiki\r\n"); - - let mut stream3 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope3 = init_client(&mut stream3).await?; - registration(&mut scope3, nickname).await?; - - // force join on registration - expect!(scope3, ":{nickname} JOIN #{chan}\r\n"); - expect!(scope3, ":{SERVER_HOSTNAME} 332 {nickname} #{chan} :New room\r\n"); - expect!(scope3, ":{SERVER_HOSTNAME} 353 {nickname} = #{chan} :{nickname}\r\n"); - expect!(scope3, ":{SERVER_HOSTNAME} 366 {nickname} #{chan} :End of /NAMES list\r\n"); - - Ok(()) -} - -#[rustfmt::skip] -#[tokio::test] -async fn test_two_players() -> Result<()> { - let mut stream1 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope1 = init_client(&mut stream1).await?; - let mut stream2 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope2 = init_client(&mut stream2).await?; - - let nickname1 = "NickName3"; - let nickname2 = "NickName4"; - - let chan = "chan3"; - - registration(&mut scope1, "NickName3").await?; - registration(&mut scope2, "NickName4").await?; - join(&mut scope1, nickname1, "chan3").await?; - send!(scope2, "JOIN #{chan}\r\n"); - expect!(scope2, ":{nickname2} JOIN #{chan}\r\n"); - expect!(scope2, ":irc.localhost 332 {nickname2} #{chan} :New room\r\n"); - scope2.assert(|line| { - if line == format!(":irc.localhost 353 {nickname2} = #{chan} :{nickname1} {nickname2}\r\n") { return Ok(()) } - if line == format!(":irc.localhost 353 {nickname2} = #{chan} :{nickname2} {nickname1}\r\n") { return Ok(()) } - panic!("incorrect chan member list received: {line}"); - }).await?; - expect!(scope2, ":irc.localhost 366 {nickname2} #{chan} :End of /NAMES list\r\n"); - - expect!(scope1, ":{nickname2} JOIN #{chan}\r\n"); - - send!(scope1, "PRIVMSG #chan3 :Chmoki vsem v etam chati!\r\n"); - expect!(scope2, ":{nickname1} PRIVMSG #chan3 :Chmoki vsem v etam chati!\r\n"); - send!(scope2, "PRIVMSG #chan3 :I tebe privetiki\r\n"); - expect!(scope1, ":{nickname2} PRIVMSG #chan3 :I tebe privetiki\r\n"); - - Ok(()) -} diff --git a/src/projections/mod.rs b/src/projections/mod.rs deleted file mode 100644 index 333c971..0000000 --- a/src/projections/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Protocol projections — implementations of public APIs. -pub mod irc; -pub mod xmpp; diff --git a/src/protos/mod.rs b/src/protos/mod.rs deleted file mode 100644 index 25f58e2..0000000 --- a/src/protos/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Definitions of wire protocols to be used in implementations of projections. -pub mod irc; -pub mod xmpp; diff --git a/src/util/http.rs b/src/util/http.rs deleted file mode 100644 index 1f79e32..0000000 --- a/src/util/http.rs +++ /dev/null @@ -1,10 +0,0 @@ -use std::convert::Infallible; - -use http_body_util::Full; -use hyper::{body::Bytes, Response, StatusCode}; - -pub fn not_found() -> std::result::Result>, Infallible> { - let mut response = Response::new(Full::new(Bytes::from("404"))); - *response.status_mut() = StatusCode::NOT_FOUND; - Ok(response) -} diff --git a/src/util/telemetry.rs b/src/util/telemetry.rs deleted file mode 100644 index 1483734..0000000 --- a/src/util/telemetry.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::convert::Infallible; -use std::net::SocketAddr; - -use futures_util::FutureExt; -use http_body_util::{BodyExt, Full}; -use hyper::body::Bytes; -use hyper::server::conn::http1; -use hyper::service::service_fn; -use hyper::{Method, Request, Response}; -use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; -use serde::Deserialize; -use tokio::net::TcpListener; - -use crate::core::room::RoomRegistry; -use crate::prelude::*; - -use crate::util::http::*; - -type BoxBody = http_body_util::combinators::BoxBody; -type HttpResult = std::result::Result; - -#[derive(Deserialize, Debug)] -pub struct ServerConfig { - pub listen_on: SocketAddr, -} - -pub async fn launch<'a>( - config: ServerConfig, - metrics: &'a MetricsRegistry, - rooms: &'a RoomRegistry<'_>, - scope: &mut Scope<'a>, -) -> Result> { - log::info!("Starting the telemetry service"); - let listener = TcpListener::bind(config.listen_on).await?; - log::debug!("Listener started"); - - let (signal, mut rx) = oneshot(); - - let future = async move { - let mut scope = unsafe { Scope::create() }; - loop { - select! { - biased; - _ = &mut rx => break, - result = listener.accept() => { - let (stream, _) = result?; - scope.spawn(async move { - let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(metrics, rooms, r))); - if let Err(err) = server.await { - tracing::error!("Error serving connection: {:?}", err); - } - }); - }, - } - } - let _ = scope.collect().await; - drop(scope); - log::info!("Terminating the telemetry service"); - Ok(()) - }; - scope.spawn(future.map(|_: Result<()>| ())); - - Ok(signal) -} - -async fn route( - registry: &MetricsRegistry, - rooms: &RoomRegistry<'_>, - request: Request, -) -> std::result::Result, Infallible> { - match (request.method(), request.uri().path()) { - (&Method::GET, "/metrics") => Ok(endpoint_metrics(registry)?.map(BodyExt::boxed)), - (&Method::GET, "/rooms") => Ok(endpoint_rooms(rooms).await?.map(BodyExt::boxed)), - _ => Ok(not_found()?.map(BodyExt::boxed)), - } -} - -fn endpoint_metrics(registry: &MetricsRegistry) -> HttpResult>> { - let mf = registry.gather(); - let mut buffer = vec![]; - TextEncoder - .encode(&mf, &mut buffer) - .expect("write to vec cannot fail"); - Ok(Response::new(Full::new(Bytes::from(buffer)))) -} - -async fn endpoint_rooms(rooms: &RoomRegistry<'_>) -> HttpResult>> { - let room_list = rooms.get_all_rooms().await; - let mut buffer = vec![]; - serde_json::to_writer(&mut buffer, &room_list).expect("unexpected fail when writing to vec"); - Ok(Response::new(Full::new(Bytes::from(buffer)))) -} diff --git a/test/.gitignore b/test/.gitignore deleted file mode 100644 index 397b4a7..0000000 --- a/test/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.log diff --git a/test/init_state.sql b/test/init_state.sql deleted file mode 100644 index 8555332..0000000 --- a/test/init_state.sql +++ /dev/null @@ -1,6 +0,0 @@ -insert into users(name) -values ('kek'), ('shrek') -returning id; - -insert into challenges_plain_password(user_id, password) -values (1, 'parolchik1'), (2, 'qwerty123'); diff --git a/test/run.sh b/test/run.sh deleted file mode 100755 index f1162e2..0000000 --- a/test/run.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/sh -cargo build -./target/debug/lavina 1>test/lavina.stdout.log 2>test/lavina.stderr.log & -cargo test -kill %1