#![feature(coroutines, coroutine_trait, type_alias_impl_trait, impl_trait_in_assoc_type)] mod proto; use std::collections::HashMap; use std::fs::File; use std::io::BufReader as SyncBufReader; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use anyhow::anyhow; use futures_util::future::join_all; use prometheus::Registry as MetricsRegistry; use quick_xml::events::{BytesDecl, Event}; use quick_xml::{NsReader, Writer}; use rustls_pemfile::{certs, read_one, Item as PemItem}; use serde::Deserialize; use tokio::io::{AsyncBufRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::channel; use tokio_rustls::rustls::{Certificate, PrivateKey}; use tokio_rustls::TlsAcceptor; use lavina_core::player::{PlayerConnection, PlayerId, PlayerRegistry}; use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::RoomRegistry; use lavina_core::terminator::Terminator; use proto_xmpp::bind::{Name, Resource}; use proto_xmpp::stream::*; use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; use sasl::AuthBody; use self::proto::ClientPacket; mod iq; mod message; mod presence; mod updates; #[derive(Deserialize, Debug, Clone)] pub struct ServerConfig { pub listen_on: SocketAddr, pub cert: PathBuf, pub key: PathBuf, } struct LoadedConfig { cert: Certificate, key: PrivateKey, } struct Authenticated { player_id: PlayerId, xmpp_name: Name, xmpp_resource: Resource, xmpp_muc_name: Resource, } pub struct RunningServer { pub addr: SocketAddr, terminator: Terminator, } impl RunningServer { pub async fn terminate(self) -> Result<()> { self.terminator.terminate().await } } pub async fn launch( config: ServerConfig, players: PlayerRegistry, rooms: RoomRegistry, metrics: MetricsRegistry, storage: Storage, ) -> Result { log::info!("Starting XMPP projection"); let certs = certs(&mut SyncBufReader::new(File::open(config.cert)?))?; let certs = certs.into_iter().map(Certificate).collect::>(); let key = match read_one(&mut SyncBufReader::new(File::open(config.key)?))? { Some(PemItem::ECKey(k) | PemItem::PKCS8Key(k) | PemItem::RSAKey(k)) => PrivateKey(k), _ => return Err(fail("no keys in file")), }; let loaded_config = Arc::new(LoadedConfig { cert: certs.into_iter().next().expect("no certs in file"), key, }); let listener = TcpListener::bind(config.listen_on).await?; let addr = listener.local_addr()?; let terminator = Terminator::spawn(|mut termination| async move { let (stopped_tx, mut stopped_rx) = channel(32); let mut actors = HashMap::new(); loop { select! { biased; _ = &mut termination => break, stopped = stopped_rx.recv() => match stopped { Some(stopped) => { let _ = actors.remove(&stopped); }, None => unreachable!(), }, new_conn = listener.accept() => { match new_conn { Ok((stream, socket_addr)) => { log::debug!("Incoming connection from {socket_addr}"); if actors.contains_key(&socket_addr) { log::warn!("Already contains connection form {socket_addr}"); // TODO kill the older connection and restart it continue; } let players = players.clone(); let rooms = rooms.clone(); let storage = storage.clone(); let terminator = Terminator::spawn(|termination| { let stopped_tx = stopped_tx.clone(); let loaded_config = loaded_config.clone(); async move { match handle_socket(loaded_config, stream, &socket_addr, players, rooms, storage, termination).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } stopped_tx.send(socket_addr).await?; Ok(()) } }); actors.insert(socket_addr, terminator); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), } }, } } log::info!("Stopping XMPP projection"); join_all(actors.into_iter().map(|(socket_addr, terminator)| async move { log::debug!("Stopping XMPP connection at {socket_addr}"); match terminator.terminate().await { Ok(_) => log::debug!("Stopped XMPP connection at {socket_addr}"), Err(err) => { log::warn!("XMPP connection to {socket_addr} finished with error: {err}") } } })) .await; log::info!("Stopped XMPP projection"); Ok(()) }); log::info!("Started XMPP projection"); Ok(RunningServer { addr, terminator }) } async fn handle_socket( config: Arc, mut stream: TcpStream, socket_addr: &SocketAddr, mut players: PlayerRegistry, rooms: RoomRegistry, mut storage: Storage, termination: Deferred<()>, // TODO use it to stop the connection gracefully ) -> Result<()> { log::info!("Received an XMPP connection from {socket_addr}"); let mut reader_buf = vec![]; let (reader, writer) = stream.split(); let mut buf_reader = BufReader::new(reader); let mut buf_writer = BufWriter::new(writer); socket_force_tls(&mut buf_reader, &mut buf_writer, &mut reader_buf).await?; let mut config = tokio_rustls::rustls::ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() .with_single_cert(vec![config.cert.clone()], config.key.clone())?; config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new()); log::debug!("Accepting TLS connection..."); let acceptor = TlsAcceptor::from(Arc::new(config)); let new_stream = acceptor.accept(stream).await?; log::debug!("TLS connection established"); let (a, b) = tokio::io::split(new_stream); let mut xml_reader = NsReader::from_reader(BufReader::new(a)); let mut xml_writer = Writer::new(b); let authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage).await?; log::debug!("User authenticated"); let mut connection = players.connect_to_player(authenticated.player_id.clone()).await; socket_final( &mut xml_reader, &mut xml_writer, &mut reader_buf, &authenticated, &mut connection, &rooms, ) .await?; let a = xml_reader.into_inner().into_inner(); let b = xml_writer.into_inner(); a.unsplit(b).shutdown().await?; Ok(()) } async fn socket_force_tls( reader: &mut (impl AsyncBufRead + Unpin), writer: &mut (impl AsyncWrite + Unpin), reader_buf: &mut Vec, ) -> Result<()> { use proto_xmpp::tls::*; let xml_reader = &mut NsReader::from_reader(reader); let xml_writer = &mut Writer::new(writer); read_xml_header(xml_reader, reader_buf).await?; let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?; let event = Event::Decl(BytesDecl::new("1.0", None, None)); xml_writer.write_event_async(event).await?; let msg = ServerStreamStart { from: "localhost".into(), lang: "en".into(), id: uuid::Uuid::new_v4().to_string(), version: "1.0".into(), }; msg.write_xml(xml_writer).await?; let msg = Features { start_tls: true, mechanisms: false, bind: false, }; msg.write_xml(xml_writer).await?; xml_writer.get_mut().flush().await?; let StartTLS = StartTLS::parse(xml_reader, reader_buf).await?; ProceedTLS.write_xml(xml_writer).await?; xml_writer.get_mut().flush().await?; Ok(()) } async fn socket_auth( xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>, xml_writer: &mut Writer<(impl AsyncWrite + Unpin)>, reader_buf: &mut Vec, storage: &mut Storage, ) -> Result { read_xml_header(xml_reader, reader_buf).await?; let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?; xml_writer.write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))).await?; ServerStreamStart { from: "localhost".into(), lang: "en".into(), id: uuid::Uuid::new_v4().to_string(), version: "1.0".into(), } .write_xml(xml_writer) .await?; Features { start_tls: false, mechanisms: true, bind: false, } .write_xml(xml_writer) .await?; xml_writer.get_mut().flush().await?; let auth: proto_xmpp::sasl::Auth = proto_xmpp::sasl::Auth::parse(xml_reader, reader_buf).await?; proto_xmpp::sasl::Success.write_xml(xml_writer).await?; match AuthBody::from_str(&auth.body) { Ok(logopass) => { let name = &logopass.login; let stored_user = storage.retrieve_user_by_name(name).await?; let stored_user = match stored_user { Some(u) => u, None => { log::info!("User '{}' not found", name); return Err(fail("no user found")); } }; // TODO return proper XML errors to the client if stored_user.password.is_none() { log::info!("Password not defined for user '{}'", name); return Err(fail("password is not defined")); } if stored_user.password.as_deref() != Some(&logopass.password) { log::info!("Incorrect password supplied for user '{}'", name); return Err(fail("passwords do not match")); } Ok(Authenticated { player_id: PlayerId::from(name.as_str())?, xmpp_name: Name(name.to_string().into()), xmpp_resource: Resource(name.to_string().into()), xmpp_muc_name: Resource(name.to_string().into()), }) } Err(e) => return Err(e), } } async fn socket_final( xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>, xml_writer: &mut Writer<(impl AsyncWrite + Unpin)>, reader_buf: &mut Vec, authenticated: &Authenticated, user_handle: &mut PlayerConnection, rooms: &RoomRegistry, ) -> Result<()> { read_xml_header(xml_reader, reader_buf).await?; let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?; xml_writer.write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))).await?; ServerStreamStart { from: "localhost".into(), lang: "en".into(), id: uuid::Uuid::new_v4().to_string(), version: "1.0".into(), } .write_xml(xml_writer) .await?; Features { start_tls: false, mechanisms: false, bind: true, } .write_xml(xml_writer) .await?; xml_writer.get_mut().flush().await?; let mut parser = proto::ClientPacket::parse(); let mut events = vec![]; reader_buf.clear(); let mut next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf)); 'outer: loop { let mut conn = XmppConnection { user: authenticated, user_handle, rooms, }; let should_recreate_xml_future = select! { biased; res = &mut next_xml_event => 's: { let (ns, event) = res?; if let Event::Text(ref e) = event { if e.iter().all(|x| *x == 0xA) { break 's true; } } match parser.consume(ns, &event) { Continuation::Final(res) => { let res = res?; let stop = conn.handle_packet(&mut events, res).await?; for i in &events { xml_writer.write_event_async(i).await?; } events.clear(); xml_writer.get_mut().flush().await?; if stop { break 'outer; } parser = proto::ClientPacket::parse(); } Continuation::Continue(p) => parser = p, } true }, update = conn.user_handle.receiver.recv() => { if let Some(update) = update { conn.handle_update(&mut events, update).await?; for i in &events { xml_writer.write_event_async(i).await?; } events.clear(); xml_writer.get_mut().flush().await?; } else { log::warn!("Player is terminated, must terminate the connection"); break; } false } }; if should_recreate_xml_future { drop(next_xml_event); next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf)); } } Ok(()) } struct XmppConnection<'a> { user: &'a Authenticated, user_handle: &'a mut PlayerConnection, rooms: &'a RoomRegistry, } impl<'a> XmppConnection<'a> { async fn handle_packet(&mut self, output: &mut Vec>, packet: ClientPacket) -> Result { let res = match packet { proto::ClientPacket::Iq(iq) => { self.handle_iq(output, iq).await; false } ClientPacket::Message(m) => { self.handle_message(output, m).await?; false } proto::ClientPacket::Presence(p) => { self.handle_presence(output, p).await?; false } proto::ClientPacket::StreamEnd => { ServerStreamEnd.serialize(output); true } }; Ok(res) } } async fn read_xml_header( xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>, reader_buf: &mut Vec, ) -> Result<()> { if let Event::Decl(bytes) = xml_reader.read_event_into_async(reader_buf).await? { // this is header if let Some(encoding) = bytes.encoding() { let encoding = encoding?; if &*encoding == b"UTF-8" { Ok(()) } else { Err(anyhow!("Unsupported encoding: {encoding:?}")) } } else { // Err(fail("No XML encoding provided")) Ok(()) } } else { Err(anyhow!("Expected XML header")) } }