feat(xmpp): placeholder for xmpp projection and example of xml

This commit is contained in:
Nikita Vilunov 2023-02-17 22:27:58 +01:00
parent 0adc19558d
commit 494ddc7ee1
17 changed files with 380 additions and 60 deletions

118
Cargo.lock generated
View File

@ -23,17 +23,6 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-trait"
version = "0.1.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atomic"
version = "0.5.1"
@ -94,6 +83,12 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "cc"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -436,25 +431,19 @@ dependencies = [
"hyper 1.0.0-rc.2",
"nom",
"prometheus",
"quick-xml",
"regex",
"reqwest",
"rustls-pemfile",
"serde",
"serde_json",
"tokio",
"tokio-rustls",
"tokio-tungstenite",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "lavina_proto"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"serde",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -664,6 +653,16 @@ dependencies = [
"thiserror",
]
[[package]]
name = "quick-xml"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffc053f057dd768a56f62cd7e434c42c831d296968997e9ac1f76ea7c2d14c41"
dependencies = [
"memchr",
"tokio",
]
[[package]]
name = "quote"
version = "1.0.23"
@ -763,6 +762,42 @@ dependencies = [
"winreg",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]]
name = "rustls"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.21.0",
]
[[package]]
name = "ryu"
version = "1.0.12"
@ -775,6 +810,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "serde"
version = "1.0.152"
@ -872,6 +917,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "syn"
version = "1.0.107"
@ -959,6 +1010,17 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "tokio-tungstenite"
version = "0.18.0"
@ -1119,6 +1181,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.3.1"
@ -1240,6 +1308,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@ -1,8 +1,3 @@
[workspace]
members = [
"crates/*"
]
[package]
name = "lavina"
version = "0.1.0"
@ -23,6 +18,9 @@ futures-util = "0.3.25"
prometheus = { version = "0.13.3", default-features = false }
regex = "1.7.1"
nom = "7.1.3"
tokio-rustls = "0.23.4"
rustls-pemfile = "1.0.2"
quick-xml = { version = "0.27.1", features = ["async-tokio"] }
[dev-dependencies]
assert_matches = "1.5.0"

2
certs/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*.pem
*.key

View File

@ -4,3 +4,8 @@ listen_on = "127.0.0.1:8080"
[irc]
listen_on = "127.0.0.1:6667"
server_name = "irc.localhost"
[xmpp]
listen_on = "127.0.0.1:5222"
cert = "./certs/xmpp.pem"
key = "./certs/xmpp.key"

View File

@ -1,9 +0,0 @@
[package]
name = "lavina_proto"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.68"
async-trait = "0.1.63"
serde = { version = "1.0.152", features = ["serde_derive"] }

View File

@ -1,2 +0,0 @@
mod prelude;
pub mod well_known;

View File

@ -1,3 +0,0 @@
pub type Result<T> = std::result::Result<T, anyhow::Error>;
pub use async_trait::async_trait;
pub use serde::{Deserialize, Serialize};

View File

@ -1,13 +0,0 @@
use crate::prelude::*;
#[derive(Serialize, Deserialize)]
pub struct ServerInfo {
pub name: String,
pub user_api_base_url: String,
pub fedi_api_base_url: String,
}
#[async_trait]
pub trait WellKnown {
async fn well_known(&self) -> Result<ServerInfo>;
}

20
docs/cheatsheet.md Normal file
View File

@ -0,0 +1,20 @@
# Cheatsheet
Some useful commands for development and testing.
<!-- please use spaces at line start to indicate shell cmds -->
## Certificates
Following commands require `OpenSSL` to be installed. It is provided as `openssl` package in Arch Linux.
Generate self-signed TLS certificate:
openssl req -x509 -newkey rsa:4096 -sha256 -days 365 -noenc \
-keyout certs/xmpp.key -out certs/xmpp.pem \
-subj "/CN=example.com"
Print content of a TLS certificate:
openssl x509 -in certs/xmpp.pem -text

View File

@ -19,6 +19,7 @@ use crate::prelude::*;
struct ServerConfig {
telemetry: util::telemetry::ServerConfig,
irc: projections::irc::ServerConfig,
xmpp: projections::xmpp::ServerConfig,
}
fn load_config() -> Result<ServerConfig> {
@ -38,18 +39,21 @@ async fn main() -> Result<()> {
let ServerConfig {
telemetry: telemetry_config,
irc: irc_config,
xmpp: xmpp_config,
} = config;
let mut metrics = MetricsRegistry::new();
let rooms = RoomRegistry::empty(&mut metrics)?;
let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?;
let telemetry_terminator =
util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?;
let irc = projections::irc::launch(irc_config, players, rooms.clone(), metrics.clone()).await?;
let irc = projections::irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone()).await?;
let xmpp = projections::xmpp::launch(xmpp_config, players, rooms.clone(), metrics.clone()).await?;
tracing::info!("Started");
sleep.await;
tracing::info!("Begin shutdown");
xmpp.terminate().await?;
irc.terminate().await?;
telemetry_terminator.terminate().await?;
tracing::info!("Shutdown complete");

View File

@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use futures_util::future::join_all;
use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry};
use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
@ -43,7 +44,7 @@ async fn handle_socket(
socket_addr: &SocketAddr,
players: PlayerRegistry,
rooms: RoomRegistry,
terminator: Deferred<()>, // TODO use it to stop the connection gracefully
termination: Deferred<()>, // TODO use it to stop the connection gracefully
) -> Result<()> {
let (reader, writer) = stream.split();
let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
@ -702,13 +703,13 @@ pub async fn launch(
continue;
}
let terminator = Terminator::spawn(|deferred| {
let terminator = Terminator::spawn(|termination| {
let players = players.clone();
let rooms = rooms.clone();
let current_connections_clone = current_connections.clone();
let stopped_tx = stopped_tx.clone();
async move {
match handle_socket(config, stream, &socket_addr, players, rooms, deferred).await {
match handle_socket(config, stream, &socket_addr, players, rooms, termination).await {
Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"),
}
@ -726,7 +727,7 @@ pub async fn launch(
}
log::info!("Stopping IRC projection");
for (socket_addr, terminator) in actors {
join_all(actors.into_iter().map(|(socket_addr, terminator)| async move {
log::debug!("Stopping IRC connection at {socket_addr}");
match terminator.terminate().await {
Ok(_) => log::debug!("Stopped IRC connection at {socket_addr}"),
@ -734,7 +735,8 @@ pub async fn launch(
log::warn!("IRC connection to {socket_addr} finished with error: {err}")
}
}
}
})).await;
log::info!("Stopped IRC projection");
Ok(())
});

View File

@ -1,2 +1,3 @@
//! Protocol projections — implementations of public APIs.
pub mod irc;
pub mod xmpp;

170
src/projections/xmpp/mod.rs Normal file
View File

@ -0,0 +1,170 @@
use std::collections::HashMap;
use std::fs::File;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::io::BufReader as SyncBufReader;
use std::sync::Arc;
use futures_util::future::join_all;
use prometheus::Registry as MetricsRegistry;
use rustls_pemfile::{certs, rsa_private_keys};
use serde::Deserialize;
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::channel;
use tokio_rustls::TlsAcceptor;
use tokio_rustls::rustls::{Certificate, PrivateKey};
use crate::core::player::PlayerRegistry;
use crate::core::room::RoomRegistry;
use crate::prelude::*;
use crate::util::Terminator;
#[derive(Deserialize, Debug, Clone)]
pub struct ServerConfig {
pub listen_on: SocketAddr,
pub cert: PathBuf,
pub key: PathBuf,
}
struct LoadedConfig {
cert: Certificate,
key: PrivateKey,
}
pub async fn launch(
config: ServerConfig,
players: PlayerRegistry,
rooms: RoomRegistry,
metrics: MetricsRegistry,
) -> Result<Terminator> {
log::info!("Starting XMPP projection");
let certs = certs(&mut SyncBufReader::new(File::open(config.cert)?))?;
let certs = certs.into_iter().map(Certificate).collect::<Vec<_>>();
let keys = rsa_private_keys(&mut SyncBufReader::new(File::open(config.key)?))?;
let keys = keys.into_iter().map(PrivateKey).collect::<Vec<_>>();
let loaded_config = Arc::new(LoadedConfig {
cert: certs.into_iter().next().expect("no certs in file"),
key: keys.into_iter().next().expect("no keys in file"),
});
let listener = TcpListener::bind(config.listen_on).await?;
let terminator = Terminator::spawn(|mut termination| async move {
let (stopped_tx, mut stopped_rx) = channel(32);
let mut actors = HashMap::new();
loop {
select! {
biased;
_ = &mut termination => break,
stopped = stopped_rx.recv() => match stopped {
Some(stopped) => { let _ = actors.remove(&stopped); },
None => unreachable!(),
},
new_conn = listener.accept() => {
match new_conn {
Ok((stream, socket_addr)) => {
log::debug!("Incoming connection from {socket_addr}");
if actors.contains_key(&socket_addr) {
log::warn!("Already contains connection form {socket_addr}");
// TODO kill the older connection and restart it
continue;
}
let players = players.clone();
let rooms = rooms.clone();
let terminator = Terminator::spawn(|termination| {
let stopped_tx = stopped_tx.clone();
let loaded_config = loaded_config.clone();
async move {
match handle_socket(loaded_config, stream, &socket_addr, players, rooms, termination).await {
Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"),
}
stopped_tx.send(socket_addr).await?;
Ok(())
}
});
actors.insert(socket_addr, terminator);
},
Err(err) => log::warn!("Failed to accept new connection: {err}"),
}
},
}
}
log::info!("Stopping XMPP projection");
join_all(actors.into_iter().map(|(socket_addr, terminator)| async move {
log::debug!("Stopping XMPP connection at {socket_addr}");
match terminator.terminate().await {
Ok(_) => log::debug!("Stopped XMPP connection at {socket_addr}"),
Err(err) => {
log::warn!("XMPP connection to {socket_addr} finished with error: {err}")
}
}
})).await;
log::info!("Stopped XMPP projection");
Ok(())
});
log::info!("Started XMPP projection");
Ok(terminator)
}
async fn handle_socket(
config: Arc<LoadedConfig>,
mut stream: TcpStream,
socket_addr: &SocketAddr,
players: PlayerRegistry,
rooms: RoomRegistry,
termination: Deferred<()>, // TODO use it to stop the connection gracefully
) -> Result<()> {
log::debug!("Received an XMPP connection from {socket_addr}");
// writer.write_all(b"Hi!\n").await?;
let mut buf = [0; 1024];
stream.write_all(br###"<?xml version='1.0'?>
<stream:stream id='11698431101746707873' version='1.0' xml:lang='en' xmlns:stream='http://etherx.jabber.org/streams' from='localhost' xmlns='jabber:client'>
<stream:features>
<starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls">
<required/>
</starttls>
</stream:features>
"###).await?;
{
let i = stream.read(&mut buf).await?;
match std::str::from_utf8(&buf[0..i]) {
Ok(e) => println!("{} END", e),
Err(_) => println!("{:?} END", &buf[0..i]),
}
stream.write_all(br###"<proceed xmlns="urn:ietf:params:xml:ns:xmpp-tls"/>"###).await?;
}
let config = tokio_rustls::rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(vec![config.cert.clone()], config.key.clone())?;
let i = stream.read(&mut buf).await?;
match std::str::from_utf8(&buf[0..i]) {
Ok(e) => println!("{} END", e),
Err(_) => println!("{:?} END", &buf[0..i]),
}
let acceptor = TlsAcceptor::from(Arc::new(config));
let mut new_stream = acceptor.accept(stream).await?;
log::debug!("TLS connection established");
loop {
let i = new_stream.read(&mut buf).await?;
if i == 0 { break; }
match std::str::from_utf8(&buf[0..i]) {
Ok(e) => println!("{} END", e),
Err(_) => println!("{:?} END", &buf[0..i]),
}
}
new_stream.shutdown().await?;
Ok(())
}

View File

@ -1,2 +1,3 @@
//! Definitions of wire protocols to be used in implementations of projections.
pub mod irc;
pub mod xmpp;

1
src/protos/xmpp/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod stream;

65
src/protos/xmpp/stream.rs Normal file
View File

@ -0,0 +1,65 @@
use quick_xml::name::{ResolveResult, Namespace, LocalName, QName};
use quick_xml::{Result, NsReader};
use quick_xml::events::Event;
use tokio::io::AsyncBufRead;
pub static XMLNS: &'static str = "http://etherx.jabber.org/streams";
#[derive(Debug, PartialEq, Eq)]
pub struct ClientStreamStart {
pub to: String,
pub lang: String,
pub version: String,
}
impl ClientStreamStart {
pub async fn parse(reader: &mut NsReader<impl AsyncBufRead + Unpin>, buf: &mut Vec<u8>) -> Result<ClientStreamStart> {
if let Event::Start(e) = reader.read_event_into_async(buf).await? {
let (ns, local) = reader.resolve_element(e.name());
if ns != ResolveResult::Bound(Namespace(XMLNS.as_bytes())) {
return Err(panic!());
}
if local.into_inner() != b"stream" {
return Err(panic!());
}
let mut to = None;
let mut lang = None;
let mut version = None;
for attr in e.attributes() {
let attr = attr?;
let (ns, name) = reader.resolve_attribute(attr.key);
match (ns, name.into_inner()) {
(ResolveResult::Unbound, b"to") => {
let value = attr.unescape_value()?;
to = Some(value.to_string());
},
(ResolveResult::Bound(Namespace(b"http://www.w3.org/XML/1998/namespace")), b"lang") => {
let value = attr.unescape_value()?;
lang = Some(value.to_string());
},
(ResolveResult::Unbound, b"version") => {
let value = attr.unescape_value()?;
version = Some(value.to_string());
},
_ => {},
}
}
Ok(ClientStreamStart { to: to.unwrap(), lang: lang.unwrap(), version: version.unwrap() })
} else {
Err(panic!())
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn stream_start_correct() {
let input = r###"<stream:stream xmlns:stream="http://etherx.jabber.org/streams" to="vlnv.dev" version="1.0" xmlns="jabber:client" xml:lang="en" xmlns:xml="http://www.w3.org/XML/1998/namespace">"###;
let mut reader = NsReader::from_reader(input.as_bytes());
let mut buf = vec![];
let res = ClientStreamStart::parse(&mut reader, &mut buf).await.unwrap();
assert_eq!(res, ClientStreamStart { to: "vlnv.dev".to_owned(), lang: "en".to_owned(), version: "1.0".to_owned()})
}
}

View File

@ -14,7 +14,7 @@ impl Terminator {
pub async fn terminate(self) -> Result<()> {
match self.signal.send(()) {
Ok(()) => {}
Err(_) => log::error!("Termination channel is dropped"),
Err(_) => log::warn!("Termination channel is dropped"),
}
self.completion.await??;
Ok(())