lavina/crates/projection-xmpp/src/lib.rs

480 lines
17 KiB
Rust
Raw Normal View History

2023-11-20 16:19:12 +00:00
#![feature(coroutines, coroutine_trait, type_alias_impl_trait, impl_trait_in_assoc_type)]
2023-03-11 17:36:38 +00:00
mod proto;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader as SyncBufReader;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::anyhow;
use futures_util::future::join_all;
use prometheus::Registry as MetricsRegistry;
use quick_xml::events::{BytesDecl, Event};
use quick_xml::{NsReader, Writer};
2023-08-04 22:38:56 +00:00
use rustls_pemfile::{certs, read_one, Item as PemItem};
use serde::Deserialize;
2023-03-06 11:05:33 +00:00
use tokio::io::{AsyncBufRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::channel;
use tokio_rustls::rustls::{Certificate, PrivateKey};
use tokio_rustls::TlsAcceptor;
use lavina_core::auth::{Authenticator, Verdict};
use lavina_core::player::{ConnectionMessage, PlayerConnection, PlayerId, PlayerRegistry, StopReason};
use lavina_core::prelude::*;
use lavina_core::repo::Storage;
use lavina_core::room::RoomRegistry;
use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
use proto_xmpp::bind::{Name, Resource};
use proto_xmpp::stream::*;
use proto_xmpp::streamerror::{StreamError, StreamErrorKind};
use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml};
2023-10-13 14:54:08 +00:00
use sasl::AuthBody;
use self::proto::ClientPacket;
mod iq;
mod message;
mod presence;
mod updates;
/// Settings of the XMPP projection, loaded through `serde`.
#[derive(Deserialize, Debug, Clone)]
pub struct ServerConfig {
    /// Socket address the TCP listener is bound to.
    pub listen_on: SocketAddr,
    /// Path to the PEM file the TLS certificate is read from.
    pub cert: PathBuf,
    /// Path to the PEM file the TLS private key is read from.
    pub key: PathBuf,
    /// Hostname announced in the `from` field of server stream headers.
    pub hostname: Str,
}
/// TLS material read from disk at launch and shared between connections.
struct LoadedConfig {
    /// First certificate found in the configured certificate file.
    cert: Certificate,
    /// Private key found in the configured key file.
    key: PrivateKey,
}
struct Authenticated {
2024-04-07 12:06:23 +00:00
/// Identifier of the authenticated player.
///
/// Used when communicating with lavina-core on behalf of the player.
player_id: PlayerId,
2024-04-07 12:06:23 +00:00
/// The user's XMPP name.
///
/// Used in `to` and `from` fields of XMPP messages.
xmpp_name: Name,
2024-04-07 12:06:23 +00:00
/// The resource given to this user by the server.
xmpp_resource: Resource,
2024-04-07 12:06:23 +00:00
/// The resource used by this user when joining MUCs.
xmpp_muc_name: Resource,
}
/// Handle to a launched XMPP projection.
pub struct RunningServer {
    /// Local address reported by the bound listener.
    pub addr: SocketAddr,
    /// Terminator of the accept-loop task spawned by `launch`.
    terminator: Terminator,
}
impl RunningServer {
    /// Consumes the handle, terminates the projection through its terminator
    /// and returns whatever result the terminator reports.
    pub async fn terminate(self) -> Result<()> {
        let Self { terminator, .. } = self;
        terminator.terminate().await
    }
}
pub async fn launch(
config: ServerConfig,
core: LavinaCore,
metrics: MetricsRegistry,
storage: Storage,
) -> Result<RunningServer> {
log::info!("Starting XMPP projection");
let certs = certs(&mut SyncBufReader::new(File::open(config.cert)?))?;
let certs = certs.into_iter().map(Certificate).collect::<Vec<_>>();
2023-08-04 22:38:56 +00:00
let key = match read_one(&mut SyncBufReader::new(File::open(config.key)?))? {
Some(PemItem::ECKey(k) | PemItem::PKCS8Key(k) | PemItem::RSAKey(k)) => PrivateKey(k),
_ => return Err(fail("no keys in file")),
2023-08-04 22:38:56 +00:00
};
let loaded_config = Arc::new(LoadedConfig {
cert: certs.into_iter().next().expect("no certs in file"),
2023-08-04 22:38:56 +00:00
key,
});
let listener = TcpListener::bind(config.listen_on).await?;
let addr = listener.local_addr()?;
let terminator = Terminator::spawn(|mut termination| async move {
let (stopped_tx, mut stopped_rx) = channel(32);
let mut actors = HashMap::new();
loop {
select! {
biased;
_ = &mut termination => break,
stopped = stopped_rx.recv() => match stopped {
Some(stopped) => { let _ = actors.remove(&stopped); },
None => unreachable!(),
},
new_conn = listener.accept() => {
match new_conn {
Ok((stream, socket_addr)) => {
log::debug!("Incoming connection from {socket_addr}");
if actors.contains_key(&socket_addr) {
log::warn!("Already contains connection form {socket_addr}");
// TODO kill the older connection and restart it
continue;
}
let core = core.clone();
let storage = storage.clone();
let hostname = config.hostname.clone();
let terminator = Terminator::spawn(|termination| {
let stopped_tx = stopped_tx.clone();
let loaded_config = loaded_config.clone();
async move {
match handle_socket(loaded_config, stream, &socket_addr, core, storage, hostname, termination).await {
Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"),
}
stopped_tx.send(socket_addr).await?;
Ok(())
}
});
actors.insert(socket_addr, terminator);
},
Err(err) => log::warn!("Failed to accept new connection: {err}"),
}
},
}
}
log::info!("Stopping XMPP projection");
join_all(actors.into_iter().map(|(socket_addr, terminator)| async move {
log::debug!("Stopping XMPP connection at {socket_addr}");
match terminator.terminate().await {
Ok(_) => log::debug!("Stopped XMPP connection at {socket_addr}"),
Err(err) => {
log::warn!("XMPP connection to {socket_addr} finished with error: {err}")
}
}
}))
.await;
log::info!("Stopped XMPP projection");
Ok(())
});
log::info!("Started XMPP projection");
Ok(RunningServer { addr, terminator })
}
async fn handle_socket(
cert_config: Arc<LoadedConfig>,
mut stream: TcpStream,
socket_addr: &SocketAddr,
mut core: LavinaCore,
mut storage: Storage,
hostname: Str,
termination: Deferred<()>, // TODO use it to stop the connection gracefully
) -> Result<()> {
2023-09-22 23:12:03 +00:00
log::info!("Received an XMPP connection from {socket_addr}");
let mut reader_buf = vec![];
let (reader, writer) = stream.split();
let mut buf_reader = BufReader::new(reader);
let mut buf_writer = BufWriter::new(writer);
socket_force_tls(&mut buf_reader, &mut buf_writer, &mut reader_buf, &hostname).await?;
let mut config = tokio_rustls::rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(vec![cert_config.cert.clone()], cert_config.key.clone())?;
config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new());
log::debug!("Accepting TLS connection...");
let acceptor = TlsAcceptor::from(Arc::new(config));
let new_stream = acceptor.accept(stream).await?;
log::debug!("TLS connection established");
let (a, b) = tokio::io::split(new_stream);
2023-03-06 11:05:33 +00:00
let mut xml_reader = NsReader::from_reader(BufReader::new(a));
let mut xml_writer = Writer::new(BufWriter::new(b));
pin!(termination);
select! {
biased;
_ = &mut termination =>{
log::info!("Socket handling was terminated");
return Ok(())
},
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => {
match authenticated {
Ok(authenticated) => {
let mut connection = core.players.connect_to_player(&authenticated.player_id).await;
socket_final(
&mut xml_reader,
&mut xml_writer,
&mut reader_buf,
&authenticated,
&mut connection,
&core.rooms,
&hostname,
)
.await?;
},
Err(err) => {
log::error!("Authentication error: {:?}", err);
}
}
},
}
let a = xml_reader.into_inner().into_inner();
let b = xml_writer.into_inner().into_inner();
a.unsplit(b).shutdown().await?;
Ok(())
}
2023-03-06 11:05:33 +00:00
async fn socket_force_tls(
reader: &mut (impl AsyncBufRead + Unpin),
writer: &mut (impl AsyncWrite + Unpin),
reader_buf: &mut Vec<u8>,
hostname: &Str,
2023-03-06 11:05:33 +00:00
) -> Result<()> {
use proto_xmpp::tls::*;
let xml_reader = &mut NsReader::from_reader(reader);
let xml_writer = &mut Writer::new(writer);
// TODO validate the server hostname received in the stream start
2023-03-06 11:05:33 +00:00
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
let event = Event::Decl(BytesDecl::new("1.0", None, None));
xml_writer.write_event_async(event).await?;
let msg = ServerStreamStart {
from: hostname.to_string(),
2023-03-06 11:05:33 +00:00
lang: "en".into(),
2023-04-05 12:31:44 +00:00
id: uuid::Uuid::new_v4().to_string(),
2023-03-06 11:05:33 +00:00
version: "1.0".into(),
};
msg.write_xml(xml_writer).await?;
let msg = Features {
start_tls: true,
mechanisms: false,
bind: false,
};
msg.write_xml(xml_writer).await?;
xml_writer.get_mut().flush().await?;
let StartTLS = StartTLS::parse(xml_reader, reader_buf).await?;
ProceedTLS.write_xml(xml_writer).await?;
2023-03-06 11:05:33 +00:00
xml_writer.get_mut().flush().await?;
Ok(())
}
async fn socket_auth(
xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>,
xml_writer: &mut Writer<(impl AsyncWrite + Unpin)>,
reader_buf: &mut Vec<u8>,
storage: &mut Storage,
hostname: &Str,
) -> Result<Authenticated> {
// TODO validate the server hostname received in the stream start
2023-03-06 11:05:33 +00:00
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
2023-10-13 14:54:08 +00:00
xml_writer.write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))).await?;
2023-03-06 11:05:33 +00:00
ServerStreamStart {
from: hostname.to_string(),
2023-03-06 11:05:33 +00:00
lang: "en".into(),
2023-04-05 12:31:44 +00:00
id: uuid::Uuid::new_v4().to_string(),
2023-03-06 11:05:33 +00:00
version: "1.0".into(),
}
.write_xml(xml_writer)
.await?;
Features {
start_tls: false,
mechanisms: true,
bind: false,
}
.write_xml(xml_writer)
.await?;
xml_writer.get_mut().flush().await?;
let auth: proto_xmpp::sasl::Auth = proto_xmpp::sasl::Auth::parse(xml_reader, reader_buf).await?;
2023-04-13 22:38:26 +00:00
match AuthBody::from_str(&auth.body) {
Ok(logopass) => {
let name = &logopass.login;
let verdict = Authenticator::new(storage).authenticate(name, &logopass.password).await?;
match verdict {
Verdict::Authenticated => {
proto_xmpp::sasl::Success.write_xml(xml_writer).await?;
xml_writer.get_mut().flush().await?;
}
Verdict::UserNotFound | Verdict::InvalidPassword => {
proto_xmpp::sasl::Failure.write_xml(xml_writer).await?;
xml_writer.get_mut().flush().await?;
return Err(anyhow!("incorrect credentials"));
}
}
2024-04-07 12:06:23 +00:00
let name: Str = name.as_str().into();
Ok(Authenticated {
2024-04-07 12:06:23 +00:00
player_id: PlayerId::from(name.clone())?,
xmpp_name: Name(name.clone()),
xmpp_resource: Resource(name.clone()),
xmpp_muc_name: Resource(name.clone()),
})
}
Err(e) => return Err(e),
}
2023-03-06 11:05:33 +00:00
}
async fn socket_final(
xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>,
xml_writer: &mut Writer<(impl AsyncWrite + Unpin)>,
reader_buf: &mut Vec<u8>,
authenticated: &Authenticated,
user_handle: &mut PlayerConnection,
2023-04-11 16:28:03 +00:00
rooms: &RoomRegistry,
hostname: &Str,
2023-03-06 11:05:33 +00:00
) -> Result<()> {
// TODO validate the server hostname received in the stream start
2023-03-06 11:05:33 +00:00
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
2023-10-13 14:54:08 +00:00
xml_writer.write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))).await?;
2023-03-06 11:05:33 +00:00
ServerStreamStart {
from: hostname.to_string(),
2023-03-06 11:05:33 +00:00
lang: "en".into(),
2023-04-05 12:31:44 +00:00
id: uuid::Uuid::new_v4().to_string(),
2023-03-06 11:05:33 +00:00
version: "1.0".into(),
}
.write_xml(xml_writer)
.await?;
Features {
start_tls: false,
mechanisms: false,
bind: true,
}
.write_xml(xml_writer)
.await?;
xml_writer.get_mut().flush().await?;
let mut parser = proto::ClientPacket::parse();
let mut events = vec![];
reader_buf.clear();
let mut next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf));
'outer: loop {
let mut conn = XmppConnection {
user: authenticated,
user_handle,
rooms,
hostname: hostname.clone(),
hostname_rooms: format!("rooms.{}", hostname).into(),
};
let should_recreate_xml_future = select! {
biased;
res = &mut next_xml_event => 's: {
let (ns, event) = res?;
if let Event::Text(ref e) = event {
if e.iter().all(|x| *x == b'\n' || *x == b' ') {
break 's true;
}
}
match parser.consume(ns, &event) {
Continuation::Final(res) => {
let res = res?;
let stop = conn.handle_packet(&mut events, res).await?;
for i in &events {
xml_writer.write_event_async(i).await?;
}
events.clear();
xml_writer.get_mut().flush().await?;
if stop {
break 'outer;
}
parser = proto::ClientPacket::parse();
}
Continuation::Continue(p) => parser = p,
}
true
},
update = conn.user_handle.receiver.recv() => {
match update {
Some(ConnectionMessage::Update(update)) => {
conn.handle_update(&mut events, update).await?;
for i in &events {
xml_writer.write_event_async(i).await?;
}
events.clear();
xml_writer.get_mut().flush().await?;
}
Some(ConnectionMessage::Stop(reason)) => {
tracing::debug!("Connection is being terminated: {reason:?}");
let kind = match reason {
StopReason::ServerShutdown => StreamErrorKind::SystemShutdown,
StopReason::InternalError => StreamErrorKind::InternalServerError,
};
StreamError { kind }.serialize(&mut events);
ServerStreamEnd.serialize(&mut events);
for i in &events {
xml_writer.write_event_async(i).await?;
}
events.clear();
xml_writer.get_mut().flush().await?;
break;
}
None => {
log::error!("Player is terminated, must terminate the connection");
StreamError { kind: StreamErrorKind::SystemShutdown }.serialize(&mut events);
ServerStreamEnd.serialize(&mut events);
for i in &events {
xml_writer.write_event_async(i).await?;
}
events.clear();
xml_writer.get_mut().flush().await?;
break;
}
}
false
}
};
if should_recreate_xml_future {
drop(next_xml_event);
next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf));
}
}
Ok(())
}
/// Borrowed state that packet and update handlers work with.
struct XmppConnection<'a> {
    /// Identity of the authenticated client.
    user: &'a Authenticated,
    /// Connection to the player; its `receiver` delivers updates and stop requests.
    user_handle: &'a mut PlayerConnection,
    /// Registry of rooms.
    rooms: &'a RoomRegistry,
    /// Hostname of this server.
    hostname: Str,
    /// The server hostname prefixed with `rooms.`.
    hostname_rooms: Str,
}
impl<'a> XmppConnection<'a> {
2024-04-27 10:58:27 +00:00
#[tracing::instrument(skip(self, output, packet), name = "XmppConnection::handle_packet")]
async fn handle_packet(&mut self, output: &mut Vec<Event<'static>>, packet: ClientPacket) -> Result<bool> {
let res = match packet {
ClientPacket::Iq(iq) => {
self.handle_iq(output, iq).await;
false
}
ClientPacket::Message(m) => {
self.handle_message(output, m).await?;
false
}
ClientPacket::Presence(p) => {
self.handle_presence(output, p).await?;
false
}
ClientPacket::StreamEnd => {
ServerStreamEnd.serialize(output);
true
}
ClientPacket::Eos => true,
};
Ok(res)
}
2023-03-30 12:31:20 +00:00
}