mod proto; use std::collections::HashMap; use std::fs::File; use std::io::BufReader as SyncBufReader; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use futures_util::future::join_all; use prometheus::Registry as MetricsRegistry; use quick_xml::events::{BytesDecl, Event}; use quick_xml::{NsReader, Writer}; use rustls_pemfile::{certs, rsa_private_keys}; use serde::Deserialize; use tokio::io::{AsyncBufRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::channel; use tokio_rustls::rustls::{Certificate, PrivateKey}; use tokio_rustls::TlsAcceptor; use crate::core::player::{PlayerConnection, PlayerId, PlayerRegistry}; use crate::core::room::{RoomId, RoomRegistry}; use crate::prelude::*; use crate::protos::xmpp; use crate::protos::xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; use crate::protos::xmpp::client::{Iq, Message, MessageType, Presence}; use crate::protos::xmpp::disco::*; use crate::protos::xmpp::roster::RosterQuery; use crate::protos::xmpp::session::Session; use crate::protos::xmpp::stream::*; use crate::util::xml::{Continuation, FromXml, Parser, ToXml}; use crate::util::Terminator; use self::proto::{ClientPacket, IqClientBody}; #[derive(Deserialize, Debug, Clone)] pub struct ServerConfig { pub listen_on: SocketAddr, pub cert: PathBuf, pub key: PathBuf, } struct LoadedConfig { cert: Certificate, key: PrivateKey, } struct Authenticated { player_id: PlayerId, xmpp_name: Name, xmpp_resource: Resource, xmpp_muc_name: Resource, } pub async fn launch( config: ServerConfig, players: PlayerRegistry, rooms: RoomRegistry, metrics: MetricsRegistry, ) -> Result { log::info!("Starting XMPP projection"); let certs = certs(&mut SyncBufReader::new(File::open(config.cert)?))?; let certs = certs.into_iter().map(Certificate).collect::>(); let keys = rsa_private_keys(&mut SyncBufReader::new(File::open(config.key)?))?; let keys = keys.into_iter().map(PrivateKey).collect::>(); let loaded_config = Arc::new(LoadedConfig { cert: certs.into_iter().next().expect("no certs in file"), key: keys.into_iter().next().expect("no keys in file"), }); let listener = TcpListener::bind(config.listen_on).await?; let terminator = Terminator::spawn(|mut termination| async move { let (stopped_tx, mut stopped_rx) = channel(32); let mut actors = HashMap::new(); loop { select! { biased; _ = &mut termination => break, stopped = stopped_rx.recv() => match stopped { Some(stopped) => { let _ = actors.remove(&stopped); }, None => unreachable!(), }, new_conn = listener.accept() => { match new_conn { Ok((stream, socket_addr)) => { log::debug!("Incoming connection from {socket_addr}"); if actors.contains_key(&socket_addr) { log::warn!("Already contains connection form {socket_addr}"); // TODO kill the older connection and restart it continue; } let players = players.clone(); let rooms = rooms.clone(); let terminator = Terminator::spawn(|termination| { let stopped_tx = stopped_tx.clone(); let loaded_config = loaded_config.clone(); async move { match handle_socket(loaded_config, stream, &socket_addr, players, rooms, termination).await { Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"), } stopped_tx.send(socket_addr).await?; Ok(()) } }); actors.insert(socket_addr, terminator); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), } }, } } log::info!("Stopping XMPP projection"); join_all( actors .into_iter() .map(|(socket_addr, terminator)| async move { log::debug!("Stopping XMPP connection at {socket_addr}"); match terminator.terminate().await { Ok(_) => log::debug!("Stopped XMPP connection at {socket_addr}"), Err(err) => { log::warn!( "XMPP connection to {socket_addr} finished with error: {err}" ) } } }), ) .await; log::info!("Stopped XMPP projection"); Ok(()) }); log::info!("Started XMPP projection"); Ok(terminator) } async fn handle_socket( config: Arc, mut stream: TcpStream, socket_addr: &SocketAddr, mut players: PlayerRegistry, rooms: RoomRegistry, termination: Deferred<()>, // TODO use it to stop the connection gracefully ) -> Result<()> { log::debug!("Received an XMPP connection from {socket_addr}"); let mut reader_buf = vec![]; let (reader, writer) = stream.split(); let mut buf_reader = BufReader::new(reader); let mut buf_writer = BufWriter::new(writer); socket_force_tls(&mut buf_reader, &mut buf_writer, &mut reader_buf).await?; let mut config = tokio_rustls::rustls::ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() .with_single_cert(vec![config.cert.clone()], config.key.clone())?; config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new()); let acceptor = TlsAcceptor::from(Arc::new(config)); let new_stream = acceptor.accept(stream).await?; log::debug!("TLS connection established"); let (a, b) = tokio::io::split(new_stream); let mut xml_reader = NsReader::from_reader(BufReader::new(a)); let mut xml_writer = Writer::new(b); let authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf).await?; let mut connection = players .connect_to_player(authenticated.player_id.clone()) .await; socket_final( &mut xml_reader, &mut xml_writer, &mut reader_buf, &authenticated, &mut connection, &rooms, ) .await?; let a = xml_reader.into_inner().into_inner(); let b = xml_writer.into_inner(); a.unsplit(b).shutdown().await?; Ok(()) } async fn socket_force_tls( reader: &mut (impl AsyncBufRead + Unpin), writer: &mut (impl AsyncWrite + Unpin), reader_buf: &mut Vec, ) -> Result<()> { use crate::protos::xmpp::tls::*; let xml_reader = &mut NsReader::from_reader(reader); let xml_writer = &mut Writer::new(writer); read_xml_header(xml_reader, reader_buf).await?; let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?; let event = Event::Decl(BytesDecl::new("1.0", None, None)); xml_writer.write_event_async(event).await?; let msg = ServerStreamStart { from: "localhost".into(), lang: "en".into(), id: uuid::Uuid::new_v4().to_string(), version: "1.0".into(), }; msg.write_xml(xml_writer).await?; let msg = Features { start_tls: true, mechanisms: false, bind: false, }; msg.write_xml(xml_writer).await?; xml_writer.get_mut().flush().await?; let StartTLS = StartTLS::parse(xml_reader, reader_buf).await?; ProceedTLS.write_xml(xml_writer).await?; xml_writer.get_mut().flush().await?; Ok(()) } async fn socket_auth( xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>, xml_writer: &mut Writer<(impl AsyncWrite + Unpin)>, reader_buf: &mut Vec, ) -> Result { read_xml_header(xml_reader, reader_buf).await?; let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?; xml_writer .write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))) .await?; ServerStreamStart { from: "localhost".into(), lang: "en".into(), id: uuid::Uuid::new_v4().to_string(), version: "1.0".into(), } .write_xml(xml_writer) .await?; Features { start_tls: false, mechanisms: true, bind: false, } .write_xml(xml_writer) .await?; xml_writer.get_mut().flush().await?; let _ = xmpp::sasl::Auth::parse(xml_reader, reader_buf).await?; xmpp::sasl::Success.write_xml(xml_writer).await?; let name: Str = "darova".into(); Ok(Authenticated { player_id: PlayerId::from("darova")?, xmpp_name: Name(name.clone()), xmpp_resource: Resource(name.clone()), xmpp_muc_name: Resource(name), }) } async fn socket_final( xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>, xml_writer: &mut Writer<(impl AsyncWrite + Unpin)>, reader_buf: &mut Vec, authenticated: &Authenticated, user_handle: &mut PlayerConnection, rooms: &RoomRegistry, ) -> Result<()> { read_xml_header(xml_reader, reader_buf).await?; let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?; xml_writer .write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))) .await?; ServerStreamStart { from: "localhost".into(), lang: "en".into(), id: uuid::Uuid::new_v4().to_string(), version: "1.0".into(), } .write_xml(xml_writer) .await?; Features { start_tls: false, mechanisms: false, bind: true, } .write_xml(xml_writer) .await?; xml_writer.get_mut().flush().await?; let mut parser = proto::ClientPacket::parse(); let mut events = vec![]; reader_buf.clear(); let mut next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf)); 'outer: loop { let should_recreate_xml_future = select! { biased; res = &mut next_xml_event => 's: { let (ns, event) = res?; if let Event::Text(ref e) = event { if e.iter().all(|x| *x == 0xA) { break 's true; } } match parser.consume(ns, &event) { Continuation::Final(res) => { let res = res?; dbg!(&res); let stop = handle_packet(&mut events, res, authenticated, user_handle, rooms).await?; for i in &events { xml_writer.write_event_async(i).await?; } events.clear(); xml_writer.get_mut().flush().await?; if stop { break 'outer; } parser = proto::ClientPacket::parse(); } Continuation::Continue(p) => parser = p, } true }, update = user_handle.receiver.recv() => { if let Some(update) = update { match update { crate::core::player::Updates::NewMessage { room_id, author_id, body } => { Message { to: Some(Jid { name: Some(authenticated.xmpp_name.clone()), server: Server("localhost".into()), resource: Some(authenticated.xmpp_resource.clone()), }), from: Some(Jid { name: Some(Name(room_id.into_inner().into())), server: Server("rooms.localhost".into()), resource: Some(Resource(author_id.into_inner().into())), }), id: None, r#type: xmpp::client::MessageType::Groupchat, lang: None, subject: None, body: body.into(), } .serialize(&mut events); } _ => {}, } for i in &events { xml_writer.write_event_async(i).await?; } events.clear(); xml_writer.get_mut().flush().await?; } else { log::warn!("Player is terminated, must terminate the connection"); break; } false } }; if should_recreate_xml_future { drop(next_xml_event); next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf)); } } Ok(()) } async fn handle_packet( output: &mut Vec>, packet: ClientPacket, user: &Authenticated, user_handle: &mut PlayerConnection, rooms: &RoomRegistry, ) -> Result { Ok(match packet { proto::ClientPacket::Iq(iq) => { handle_iq(output, iq, rooms).await; false } proto::ClientPacket::Message(m) => { if let Some(Jid { name: Some(name), server, resource: _, }) = m.to { if server.0.as_ref() == "rooms.localhost" && m.r#type == MessageType::Groupchat { user_handle .send_message( RoomId::from(name.0.clone())?, m.body.clone().into(), ) .await?; Message { to: Some(Jid { name: Some(user.xmpp_name.clone()), server: Server("localhost".into()), resource: Some(user.xmpp_resource.clone()), }), from: Some(Jid { name: Some(name), server: Server("rooms.localhost".into()), resource: Some(user.xmpp_muc_name.clone()), }), id: m.id, r#type: xmpp::client::MessageType::Groupchat, lang: None, subject: None, body: m.body.clone(), } .serialize(output); false } else { todo!() } } else { todo!() } } proto::ClientPacket::Presence(p) => { let response = if p.to.is_none() { Presence::<()> { to: Some(Jid { name: Some(user.xmpp_name.clone()), server: Server("localhost".into()), resource: Some(user.xmpp_resource.clone()), }), from: Some(Jid { name: Some(user.xmpp_name.clone()), server: Server("localhost".into()), resource: Some(user.xmpp_resource.clone()), }), ..Default::default() } } else if let Some(Jid { name: Some(name), server, resource: Some(resource), }) = p.to { let a = user_handle .join_room(RoomId::from(name.0.clone())?) .await?; Presence::<()> { to: Some(Jid { name: Some(user.xmpp_name.clone()), server: Server("localhost".into()), resource: Some(user.xmpp_resource.clone()), }), from: Some(Jid { name: Some(name.clone()), server: Server("rooms.localhost".into()), resource: Some(user.xmpp_muc_name.clone()), }), ..Default::default() } } else { Presence::<()>::default() }; response.serialize(output); false } proto::ClientPacket::StreamEnd => { ServerStreamEnd.serialize(output); true } }) } async fn handle_iq(output: &mut Vec>, iq: Iq, rooms: &RoomRegistry) { match iq.body { proto::IqClientBody::Bind(b) => { let req = Iq { from: None, id: iq.id, to: None, r#type: xmpp::client::IqType::Result, body: BindResponse(Jid { name: Some(Name("darova".into())), server: Server("localhost".into()), resource: Some(Resource("kek".into())), }), }; req.serialize(output); } proto::IqClientBody::Session(_) => { let req = Iq { from: None, id: iq.id, to: None, r#type: xmpp::client::IqType::Result, body: Session, }; req.serialize(output); } proto::IqClientBody::Roster(_) => { let req = Iq { from: None, id: iq.id, to: None, r#type: xmpp::client::IqType::Result, body: RosterQuery, }; req.serialize(output); } proto::IqClientBody::DiscoInfo(info) => { let response = disco_info(iq.to.as_deref(), &info); let req = Iq { from: iq.to, id: iq.id, to: None, r#type: xmpp::client::IqType::Result, body: response, }; req.serialize(output); } proto::IqClientBody::DiscoItem(item) => { let response = disco_items(iq.to.as_deref(), &item, rooms).await; let req = Iq { from: iq.to, id: iq.id, to: None, r#type: xmpp::client::IqType::Result, body: response, }; req.serialize(output); } _ => { let req = Iq { from: None, id: iq.id, to: None, r#type: xmpp::client::IqType::Error, body: (), }; req.serialize(output); } } } fn disco_info(to: Option<&str>, req: &InfoQuery) -> InfoQuery { let identity; let feature; match to { Some("localhost") => { identity = vec![Identity { category: "server".into(), name: None, r#type: "im".into(), }]; feature = vec![ Feature::new("http://jabber.org/protocol/disco#info"), Feature::new("http://jabber.org/protocol/disco#items"), Feature::new("iq"), Feature::new("presence"), ] } Some("rooms.localhost") => { identity = vec![Identity { category: "conference".into(), name: Some("Chat rooms".into()), r#type: "text".into(), }]; feature = vec![ Feature::new("http://jabber.org/protocol/disco#info"), Feature::new("http://jabber.org/protocol/disco#items"), Feature::new("http://jabber.org/protocol/muc"), ] } _ => { identity = vec![]; feature = vec![]; } }; InfoQuery { node: None, identity, feature, } } async fn disco_items(to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery { let item = match to { Some("localhost") => { vec![Item { jid: Jid { name: None, server: Server("rooms.localhost".into()), resource: None, }, name: None, node: None, }] } Some("rooms.localhost") => { let room_list = rooms.get_all_rooms().await; room_list .into_iter() .map(|room_info| Item { jid: Jid { name: Some(Name(room_info.id.into_inner())), server: Server("rooms.localhost".into()), resource: None, }, name: None, node: None, }) .collect() } _ => vec![], }; ItemQuery { item } } async fn read_xml_header( xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>, reader_buf: &mut Vec, ) -> Result<()> { if let Event::Decl(bytes) = xml_reader.read_event_into_async(reader_buf).await? { // this is header if let Some(encoding) = bytes.encoding() { let encoding = encoding?; if &*encoding == b"UTF-8" { Ok(()) } else { Err(fail(format!("Unsupported encoding: {encoding:?}").as_str())) } } else { // Err(fail("No XML encoding provided")) Ok(()) } } else { Err(fail("Expected XML header")) } }