diff --git a/Cargo.lock b/Cargo.lock index 6459827..895cc5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -858,19 +858,15 @@ dependencies = [ "mgmt-api", "nonempty", "projection-irc", + "projection-xmpp", "prometheus", - "proto-xmpp", - "quick-xml", "regex", "reqwest", - "rustls-pemfile", "serde", "serde_json", "tokio", - "tokio-rustls", "tracing", "tracing-subscriber", - "uuid", ] [[package]] @@ -1267,6 +1263,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "projection-xmpp" +version = "0.0.1-dev" +dependencies = [ + "anyhow", + "derive_more", + "futures-util", + "lavina-core", + "prometheus", + "proto-xmpp", + "quick-xml", + "rustls-pemfile", + "serde", + "tokio", + "tokio-rustls", + "tracing", + "uuid", +] + [[package]] name = "prometheus" version = "0.13.3" diff --git a/Cargo.toml b/Cargo.toml index 1bf61d3..12df833 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,14 +47,10 @@ tracing-subscriber = "0.3.16" futures-util.workspace = true prometheus.workspace = true nonempty.workspace = true -tokio-rustls = "0.24.1" -rustls-pemfile = "1.0.2" -quick-xml.workspace = true derive_more.workspace = true -uuid = { version = "1.3.0", features = ["v4"] } lavina-core.workspace = true projection-irc = { path = "crates/projection-irc" } -proto-xmpp = { path = "crates/proto-xmpp" } +projection-xmpp = { path = "crates/projection-xmpp" } mgmt-api = { path = "crates/mgmt-api" } clap.workspace = true diff --git a/crates/projection-irc/Cargo.toml b/crates/projection-irc/Cargo.toml index c607327..e8166f0 100644 --- a/crates/projection-irc/Cargo.toml +++ b/crates/projection-irc/Cargo.toml @@ -7,9 +7,10 @@ version.workspace = true lavina-core.workspace = true tracing.workspace = true anyhow.workspace = true -nonempty.workspace = true serde.workspace = true tokio.workspace = true prometheus.workspace = true -proto-irc = { path = "../proto-irc" } futures-util.workspace = true + +nonempty.workspace = true +proto-irc = { path = "../proto-irc" } diff --git a/crates/projection-xmpp/Cargo.toml b/crates/projection-xmpp/Cargo.toml new file mode 100644 index 0000000..c69cb12 --- /dev/null +++ b/crates/projection-xmpp/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "projection-xmpp" +edition = "2021" +version.workspace = true + +[dependencies] +lavina-core.workspace = true +tracing.workspace = true +anyhow.workspace = true +serde.workspace = true +tokio.workspace = true +prometheus.workspace = true +futures-util.workspace = true + +quick-xml.workspace = true +proto-xmpp = { path = "../proto-xmpp" } +uuid = { version = "1.3.0", features = ["v4"] } +tokio-rustls = "0.24.1" +rustls-pemfile = "1.0.2" +derive_more.workspace = true diff --git a/src/projections/xmpp/mod.rs b/crates/projection-xmpp/src/lib.rs similarity index 94% rename from src/projections/xmpp/mod.rs rename to crates/projection-xmpp/src/lib.rs index 530385c..d55a09f 100644 --- a/src/projections/xmpp/mod.rs +++ b/crates/projection-xmpp/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(generators, generator_trait, type_alias_impl_trait, impl_trait_in_assoc_type)] + mod proto; use std::collections::HashMap; @@ -7,6 +9,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use anyhow::anyhow; use futures_util::future::join_all; use prometheus::Registry as MetricsRegistry; use quick_xml::events::{BytesDecl, Event}; @@ -117,21 +120,15 @@ pub async fn launch( } } log::info!("Stopping XMPP projection"); - join_all( - actors - .into_iter() - .map(|(socket_addr, terminator)| async move { - log::debug!("Stopping XMPP connection at {socket_addr}"); - match terminator.terminate().await { - Ok(_) => log::debug!("Stopped XMPP connection at {socket_addr}"), - Err(err) => { - log::warn!( - "XMPP connection to {socket_addr} finished with error: {err}" - ) - } - } - }), - ) + join_all(actors.into_iter().map(|(socket_addr, terminator)| async move { + log::debug!("Stopping XMPP connection at {socket_addr}"); + match terminator.terminate().await { + Ok(_) => log::debug!("Stopped XMPP connection at {socket_addr}"), + Err(err) => { + log::warn!("XMPP connection to {socket_addr} finished with error: {err}") + } + } + })) .await; log::info!("Stopped XMPP projection"); Ok(()) @@ -172,9 +169,7 @@ async fn handle_socket( let authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf).await?; log::debug!("User authenticated"); - let mut connection = players - .connect_to_player(authenticated.player_id.clone()) - .await; + let mut connection = players.connect_to_player(authenticated.player_id.clone()).await; socket_final( &mut xml_reader, &mut xml_writer, @@ -396,10 +391,7 @@ async fn handle_packet( { if server.0.as_ref() == "rooms.localhost" && m.r#type == MessageType::Groupchat { user_handle - .send_message( - RoomId::from(name.0.clone())?, - m.body.clone().into(), - ) + .send_message(RoomId::from(name.0.clone())?, m.body.clone().into()) .await?; Message { to: Some(Jid { @@ -448,9 +440,7 @@ async fn handle_packet( resource: Some(resource), }) = p.to { - let a = user_handle - .join_room(RoomId::from(name.0.clone())?) - .await?; + let a = user_handle.join_room(RoomId::from(name.0.clone())?).await?; Presence::<()> { to: Some(Jid { name: Some(user.xmpp_name.clone()), @@ -633,13 +623,13 @@ async fn read_xml_header( if &*encoding == b"UTF-8" { Ok(()) } else { - Err(fail(format!("Unsupported encoding: {encoding:?}").as_str())) + Err(anyhow!("Unsupported encoding: {encoding:?}")) } } else { // Err(fail("No XML encoding provided")) Ok(()) } } else { - Err(fail("Expected XML header")) + Err(anyhow!("Expected XML header")) } } diff --git a/src/projections/xmpp/proto.rs b/crates/projection-xmpp/src/proto.rs similarity index 100% rename from src/projections/xmpp/proto.rs rename to crates/projection-xmpp/src/proto.rs diff --git a/src/main.rs b/src/main.rs index 61ff315..a7eab22 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,3 @@ -#![feature( - generators, - generator_trait, - type_alias_impl_trait, - impl_trait_in_assoc_type -)] - -mod projections; mod util; use std::future::Future; @@ -26,7 +18,7 @@ use lavina_core::room::RoomRegistry; struct ServerConfig { telemetry: util::telemetry::ServerConfig, irc: projection_irc::ServerConfig, - xmpp: projections::xmpp::ServerConfig, + xmpp: projection_xmpp::ServerConfig, storage: lavina_core::repo::StorageConfig, } @@ -64,7 +56,7 @@ async fn main() -> Result<()> { let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; let irc = projection_irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; - let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; + let xmpp = projection_xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; tracing::info!("Started"); sleep.await; diff --git a/src/projections/mod.rs b/src/projections/mod.rs deleted file mode 100644 index 3d73ce5..0000000 --- a/src/projections/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -//! Protocol projections — implementations of public APIs. -pub mod xmpp;