diff --git a/Cargo.lock b/Cargo.lock index c358b02..8f58a48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,17 +23,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-trait" -version = "0.1.64" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atomic" version = "0.5.1" @@ -94,6 +83,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "cc" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" + [[package]] name = "cfg-if" version = "1.0.0" @@ -436,25 +431,19 @@ dependencies = [ "hyper 1.0.0-rc.2", "nom", "prometheus", + "quick-xml", "regex", "reqwest", + "rustls-pemfile", "serde", "serde_json", "tokio", + "tokio-rustls", "tokio-tungstenite", "tracing", "tracing-subscriber", ] -[[package]] -name = "lavina_proto" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-trait", - "serde", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -664,6 +653,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "quick-xml" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc053f057dd768a56f62cd7e434c42c831d296968997e9ac1f76ea7c2d14c41" +dependencies = [ + "memchr", + "tokio", +] + [[package]] name = "quote" version = "1.0.23" @@ -763,6 +762,42 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" +dependencies = [ + "base64 0.21.0", +] + [[package]] name = "ryu" version = "1.0.12" @@ -775,6 +810,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "serde" version = "1.0.152" @@ -872,6 +917,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "syn" version = "1.0.107" @@ -959,6 +1010,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "tokio-tungstenite" version = "0.18.0" @@ -1119,6 +1181,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.3.1" @@ -1240,6 +1308,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 7f7253a..9efe7dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,3 @@ -[workspace] -members = [ - "crates/*" -] - [package] name = "lavina" version = "0.1.0" @@ -23,6 +18,9 @@ futures-util = "0.3.25" prometheus = { version = "0.13.3", default-features = false } regex = "1.7.1" nom = "7.1.3" +tokio-rustls = "0.23.4" +rustls-pemfile = "1.0.2" +quick-xml = { version = "0.27.1", features = ["async-tokio"] } [dev-dependencies] assert_matches = "1.5.0" diff --git a/certs/.gitignore b/certs/.gitignore new file mode 100644 index 0000000..f71bcc5 --- /dev/null +++ b/certs/.gitignore @@ -0,0 +1,2 @@ +*.pem +*.key diff --git a/config.toml b/config.toml index d567a23..112dd00 100644 --- a/config.toml +++ b/config.toml @@ -3,4 +3,9 @@ listen_on = "127.0.0.1:8080" [irc] listen_on = "127.0.0.1:6667" -server_name = "irc.localhost" \ No newline at end of file +server_name = "irc.localhost" + +[xmpp] +listen_on = "127.0.0.1:5222" +cert = "./certs/xmpp.pem" +key = "./certs/xmpp.key" diff --git a/crates/proto/Cargo.toml b/crates/proto/Cargo.toml deleted file mode 100644 index 528c796..0000000 --- a/crates/proto/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "lavina_proto" -version = "0.1.0" -edition = "2021" - -[dependencies] -anyhow = "1.0.68" -async-trait = "0.1.63" -serde = { version = "1.0.152", features = ["serde_derive"] } diff --git a/crates/proto/src/lib.rs b/crates/proto/src/lib.rs deleted file mode 100644 index b47b3c7..0000000 --- a/crates/proto/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod prelude; -pub mod well_known; diff --git a/crates/proto/src/prelude.rs b/crates/proto/src/prelude.rs deleted file mode 100644 index af48d0d..0000000 --- a/crates/proto/src/prelude.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub type Result = std::result::Result; -pub use async_trait::async_trait; -pub use serde::{Deserialize, Serialize}; diff --git a/crates/proto/src/well_known.rs b/crates/proto/src/well_known.rs deleted file mode 100644 index f3f1a84..0000000 --- a/crates/proto/src/well_known.rs +++ /dev/null @@ -1,13 +0,0 @@ -use crate::prelude::*; - -#[derive(Serialize, Deserialize)] -pub struct ServerInfo { - pub name: String, - pub user_api_base_url: String, - pub fedi_api_base_url: String, -} - -#[async_trait] -pub trait WellKnown { - async fn well_known(&self) -> Result; -} diff --git a/docs/cheatsheet.md b/docs/cheatsheet.md new file mode 100644 index 0000000..b7b3855 --- /dev/null +++ b/docs/cheatsheet.md @@ -0,0 +1,20 @@ +# Cheatsheet + +Some useful commands for development and testing. + + + +## Certificates + +Following commands require `OpenSSL` to be installed. It is provided as `openssl` package in Arch Linux. + +Generate self-signed TLS certificate: + + openssl req -x509 -newkey rsa:4096 -sha256 -days 365 -noenc \ + -keyout certs/xmpp.key -out certs/xmpp.pem \ + -subj "/CN=example.com" + +Print content of a TLS certificate: + + openssl x509 -in certs/xmpp.pem -text + diff --git a/src/main.rs b/src/main.rs index acb458a..03e8e8d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,7 @@ use crate::prelude::*; struct ServerConfig { telemetry: util::telemetry::ServerConfig, irc: projections::irc::ServerConfig, + xmpp: projections::xmpp::ServerConfig, } fn load_config() -> Result { @@ -38,18 +39,21 @@ async fn main() -> Result<()> { let ServerConfig { telemetry: telemetry_config, irc: irc_config, + xmpp: xmpp_config, } = config; let mut metrics = MetricsRegistry::new(); let rooms = RoomRegistry::empty(&mut metrics)?; let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?; - let irc = projections::irc::launch(irc_config, players, rooms.clone(), metrics.clone()).await?; + let irc = projections::irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone()).await?; + let xmpp = projections::xmpp::launch(xmpp_config, players, rooms.clone(), metrics.clone()).await?; tracing::info!("Started"); sleep.await; tracing::info!("Begin shutdown"); + xmpp.terminate().await?; irc.terminate().await?; telemetry_terminator.terminate().await?; tracing::info!("Shutdown complete"); diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index 64a6063..c392269 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::net::SocketAddr; +use futures_util::future::join_all; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use serde::Deserialize; use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; @@ -43,7 +44,7 @@ async fn handle_socket( socket_addr: &SocketAddr, players: PlayerRegistry, rooms: RoomRegistry, - terminator: Deferred<()>, // TODO use it to stop the connection gracefully + termination: Deferred<()>, // TODO use it to stop the connection gracefully ) -> Result<()> { let (reader, writer) = stream.split(); let mut reader: BufReader = BufReader::new(reader); @@ -702,13 +703,13 @@ pub async fn launch( continue; } - let terminator = Terminator::spawn(|deferred| { + let terminator = Terminator::spawn(|termination| { let players = players.clone(); let rooms = rooms.clone(); let current_connections_clone = current_connections.clone(); let stopped_tx = stopped_tx.clone(); async move { - match handle_socket(config, stream, &socket_addr, players, rooms, deferred).await { + match handle_socket(config, stream, &socket_addr, players, rooms, termination).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } @@ -726,7 +727,7 @@ pub async fn launch( } log::info!("Stopping IRC projection"); - for (socket_addr, terminator) in actors { + join_all(actors.into_iter().map(|(socket_addr, terminator)| async move { log::debug!("Stopping IRC connection at {socket_addr}"); match terminator.terminate().await { Ok(_) => log::debug!("Stopped IRC connection at {socket_addr}"), @@ -734,7 +735,8 @@ pub async fn launch( log::warn!("IRC connection to {socket_addr} finished with error: {err}") } } - } + })).await; + log::info!("Stopped IRC projection"); Ok(()) }); diff --git a/src/projections/mod.rs b/src/projections/mod.rs index b53a1b5..333c971 100644 --- a/src/projections/mod.rs +++ b/src/projections/mod.rs @@ -1,2 +1,3 @@ //! Protocol projections — implementations of public APIs. pub mod irc; +pub mod xmpp; diff --git a/src/projections/xmpp/mod.rs b/src/projections/xmpp/mod.rs new file mode 100644 index 0000000..e4ee8dc --- /dev/null +++ b/src/projections/xmpp/mod.rs @@ -0,0 +1,170 @@ +use std::collections::HashMap; +use std::fs::File; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::io::BufReader as SyncBufReader; +use std::sync::Arc; + +use futures_util::future::join_all; +use prometheus::Registry as MetricsRegistry; +use rustls_pemfile::{certs, rsa_private_keys}; +use serde::Deserialize; +use tokio::io::{AsyncWriteExt, AsyncReadExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::mpsc::channel; +use tokio_rustls::TlsAcceptor; +use tokio_rustls::rustls::{Certificate, PrivateKey}; + +use crate::core::player::PlayerRegistry; +use crate::core::room::RoomRegistry; +use crate::prelude::*; +use crate::util::Terminator; + +#[derive(Deserialize, Debug, Clone)] +pub struct ServerConfig { + pub listen_on: SocketAddr, + pub cert: PathBuf, + pub key: PathBuf, +} + +struct LoadedConfig { + cert: Certificate, + key: PrivateKey, +} + +pub async fn launch( + config: ServerConfig, + players: PlayerRegistry, + rooms: RoomRegistry, + metrics: MetricsRegistry, +) -> Result { + log::info!("Starting XMPP projection"); + + let certs = certs(&mut SyncBufReader::new(File::open(config.cert)?))?; + let certs = certs.into_iter().map(Certificate).collect::>(); + + let keys = rsa_private_keys(&mut SyncBufReader::new(File::open(config.key)?))?; + let keys = keys.into_iter().map(PrivateKey).collect::>(); + + let loaded_config = Arc::new(LoadedConfig { + cert: certs.into_iter().next().expect("no certs in file"), + key: keys.into_iter().next().expect("no keys in file"), + }); + + let listener = TcpListener::bind(config.listen_on).await?; + let terminator = Terminator::spawn(|mut termination| async move { + let (stopped_tx, mut stopped_rx) = channel(32); + let mut actors = HashMap::new(); + loop { + select! { + biased; + _ = &mut termination => break, + stopped = stopped_rx.recv() => match stopped { + Some(stopped) => { let _ = actors.remove(&stopped); }, + None => unreachable!(), + }, + new_conn = listener.accept() => { + match new_conn { + Ok((stream, socket_addr)) => { + log::debug!("Incoming connection from {socket_addr}"); + if actors.contains_key(&socket_addr) { + log::warn!("Already contains connection form {socket_addr}"); + // TODO kill the older connection and restart it + continue; + } + let players = players.clone(); + let rooms = rooms.clone(); + let terminator = Terminator::spawn(|termination| { + let stopped_tx = stopped_tx.clone(); + let loaded_config = loaded_config.clone(); + async move { + match handle_socket(loaded_config, stream, &socket_addr, players, rooms, termination).await { + Ok(_) => log::info!("Connection terminated"), + Err(err) => log::warn!("Connection failed: {err}"), + } + stopped_tx.send(socket_addr).await?; + Ok(()) + } + }); + + actors.insert(socket_addr, terminator); + }, + Err(err) => log::warn!("Failed to accept new connection: {err}"), + } + }, + } + } + log::info!("Stopping XMPP projection"); + join_all(actors.into_iter().map(|(socket_addr, terminator)| async move { + log::debug!("Stopping XMPP connection at {socket_addr}"); + match terminator.terminate().await { + Ok(_) => log::debug!("Stopped XMPP connection at {socket_addr}"), + Err(err) => { + log::warn!("XMPP connection to {socket_addr} finished with error: {err}") + } + } + })).await; + log::info!("Stopped XMPP projection"); + Ok(()) + }); + log::info!("Started XMPP projection"); + Ok(terminator) +} + +async fn handle_socket( + config: Arc, + mut stream: TcpStream, + socket_addr: &SocketAddr, + players: PlayerRegistry, + rooms: RoomRegistry, + termination: Deferred<()>, // TODO use it to stop the connection gracefully +) -> Result<()> { + log::debug!("Received an XMPP connection from {socket_addr}"); + // writer.write_all(b"Hi!\n").await?; + let mut buf = [0; 1024]; + stream.write_all(br###" + + + + + + + "###).await?; + { + let i = stream.read(&mut buf).await?; + match std::str::from_utf8(&buf[0..i]) { + Ok(e) => println!("{} END", e), + Err(_) => println!("{:?} END", &buf[0..i]), + } + stream.write_all(br###""###).await?; + } + + let config = tokio_rustls::rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(vec![config.cert.clone()], config.key.clone())?; + + let i = stream.read(&mut buf).await?; + + match std::str::from_utf8(&buf[0..i]) { + Ok(e) => println!("{} END", e), + Err(_) => println!("{:?} END", &buf[0..i]), + } + + let acceptor = TlsAcceptor::from(Arc::new(config)); + let mut new_stream = acceptor.accept(stream).await?; + log::debug!("TLS connection established"); + + + + loop { + let i = new_stream.read(&mut buf).await?; + if i == 0 { break; } + match std::str::from_utf8(&buf[0..i]) { + Ok(e) => println!("{} END", e), + Err(_) => println!("{:?} END", &buf[0..i]), + } + } + new_stream.shutdown().await?; + Ok(()) +} diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 410866f..25f58e2 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -1,2 +1,3 @@ //! Definitions of wire protocols to be used in implementations of projections. pub mod irc; +pub mod xmpp; diff --git a/src/protos/xmpp/mod.rs b/src/protos/xmpp/mod.rs new file mode 100644 index 0000000..baf29e0 --- /dev/null +++ b/src/protos/xmpp/mod.rs @@ -0,0 +1 @@ +pub mod stream; diff --git a/src/protos/xmpp/stream.rs b/src/protos/xmpp/stream.rs new file mode 100644 index 0000000..98a12bf --- /dev/null +++ b/src/protos/xmpp/stream.rs @@ -0,0 +1,65 @@ +use quick_xml::name::{ResolveResult, Namespace, LocalName, QName}; +use quick_xml::{Result, NsReader}; +use quick_xml::events::Event; +use tokio::io::AsyncBufRead; + +pub static XMLNS: &'static str = "http://etherx.jabber.org/streams"; + +#[derive(Debug, PartialEq, Eq)] +pub struct ClientStreamStart { + pub to: String, + pub lang: String, + pub version: String, +} +impl ClientStreamStart { + pub async fn parse(reader: &mut NsReader, buf: &mut Vec) -> Result { + if let Event::Start(e) = reader.read_event_into_async(buf).await? { + let (ns, local) = reader.resolve_element(e.name()); + if ns != ResolveResult::Bound(Namespace(XMLNS.as_bytes())) { + return Err(panic!()); + } + if local.into_inner() != b"stream" { + return Err(panic!()); + } + let mut to = None; + let mut lang = None; + let mut version = None; + for attr in e.attributes() { + let attr = attr?; + let (ns, name) = reader.resolve_attribute(attr.key); + match (ns, name.into_inner()) { + (ResolveResult::Unbound, b"to") => { + let value = attr.unescape_value()?; + to = Some(value.to_string()); + }, + (ResolveResult::Bound(Namespace(b"http://www.w3.org/XML/1998/namespace")), b"lang") => { + let value = attr.unescape_value()?; + lang = Some(value.to_string()); + }, + (ResolveResult::Unbound, b"version") => { + let value = attr.unescape_value()?; + version = Some(value.to_string()); + }, + _ => {}, + } + } + Ok(ClientStreamStart { to: to.unwrap(), lang: lang.unwrap(), version: version.unwrap() }) + } else { + Err(panic!()) + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn stream_start_correct() { + let input = r###""###; + let mut reader = NsReader::from_reader(input.as_bytes()); + let mut buf = vec![]; + let res = ClientStreamStart::parse(&mut reader, &mut buf).await.unwrap(); + assert_eq!(res, ClientStreamStart { to: "vlnv.dev".to_owned(), lang: "en".to_owned(), version: "1.0".to_owned()}) + } +} \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs index cdf334d..fc6553f 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -14,7 +14,7 @@ impl Terminator { pub async fn terminate(self) -> Result<()> { match self.signal.send(()) { Ok(()) => {} - Err(_) => log::error!("Termination channel is dropped"), + Err(_) => log::warn!("Termination channel is dropped"), } self.completion.await??; Ok(())