From ff806cc3d986607edaefc3eee46c24be726bf184 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Fri, 13 Oct 2023 20:09:08 +0000 Subject: [PATCH] xmpp: split off stanza and update handling into modules (#27) Reviewed-on: https://git.vilunov.me/lavina/lavina/pulls/27 --- crates/projection-xmpp/src/iq.rs | 162 +++++++++++++ crates/projection-xmpp/src/lib.rs | 320 ++++--------------------- crates/projection-xmpp/src/message.rs | 52 ++++ crates/projection-xmpp/src/presence.rs | 55 +++++ crates/projection-xmpp/src/updates.rs | 45 ++++ 5 files changed, 359 insertions(+), 275 deletions(-) create mode 100644 crates/projection-xmpp/src/iq.rs create mode 100644 crates/projection-xmpp/src/message.rs create mode 100644 crates/projection-xmpp/src/presence.rs create mode 100644 crates/projection-xmpp/src/updates.rs diff --git a/crates/projection-xmpp/src/iq.rs b/crates/projection-xmpp/src/iq.rs new file mode 100644 index 0000000..3ad0ae5 --- /dev/null +++ b/crates/projection-xmpp/src/iq.rs @@ -0,0 +1,162 @@ +//! Handling of all client2server iq stanzas + +use quick_xml::events::Event; + +use lavina_core::room::RoomRegistry; +use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; +use proto_xmpp::client::{Iq, IqType}; +use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery}; +use proto_xmpp::roster::RosterQuery; +use proto_xmpp::session::Session; + +use crate::proto::IqClientBody; +use crate::XmppConnection; + +use proto_xmpp::xml::ToXml; + +impl<'a> XmppConnection<'a> { + pub async fn handle_iq(&self, output: &mut Vec>, iq: Iq) { + match iq.body { + IqClientBody::Bind(b) => { + let req = Iq { + from: None, + id: iq.id, + to: None, + r#type: IqType::Result, + body: BindResponse(Jid { + name: Some(Name("darova".into())), + server: Server("localhost".into()), + resource: Some(Resource("kek".into())), + }), + }; + req.serialize(output); + } + IqClientBody::Session(_) => { + let req = Iq { + from: None, + id: iq.id, + to: None, + r#type: IqType::Result, + body: Session, + }; + req.serialize(output); + } + IqClientBody::Roster(_) => { + let req = Iq { + from: None, + id: iq.id, + to: None, + r#type: IqType::Result, + body: RosterQuery, + }; + req.serialize(output); + } + IqClientBody::DiscoInfo(info) => { + let response = disco_info(iq.to.as_deref(), &info); + let req = Iq { + from: iq.to, + id: iq.id, + to: None, + r#type: IqType::Result, + body: response, + }; + req.serialize(output); + } + IqClientBody::DiscoItem(item) => { + let response = disco_items(iq.to.as_deref(), &item, self.rooms).await; + let req = Iq { + from: iq.to, + id: iq.id, + to: None, + r#type: IqType::Result, + body: response, + }; + req.serialize(output); + } + _ => { + let req = Iq { + from: None, + id: iq.id, + to: None, + r#type: IqType::Error, + body: (), + }; + req.serialize(output); + } + } + } +} + +fn disco_info(to: Option<&str>, req: &InfoQuery) -> InfoQuery { + let identity; + let feature; + match to { + Some("localhost") => { + identity = vec![Identity { + category: "server".into(), + name: None, + r#type: "im".into(), + }]; + feature = vec![ + Feature::new("http://jabber.org/protocol/disco#info"), + Feature::new("http://jabber.org/protocol/disco#items"), + Feature::new("iq"), + Feature::new("presence"), + ] + } + Some("rooms.localhost") => { + identity = vec![Identity { + category: "conference".into(), + name: Some("Chat rooms".into()), + r#type: "text".into(), + }]; + feature = vec![ + Feature::new("http://jabber.org/protocol/disco#info"), + Feature::new("http://jabber.org/protocol/disco#items"), + Feature::new("http://jabber.org/protocol/muc"), + ] + } + _ => { + identity = vec![]; + feature = vec![]; + } + }; + InfoQuery { + node: None, + identity, + feature, + } +} + +async fn disco_items(to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery { + let item = match to { + Some("localhost") => { + vec![Item { + jid: Jid { + name: None, + server: Server("rooms.localhost".into()), + resource: None, + }, + name: None, + node: None, + }] + } + Some("rooms.localhost") => { + let room_list = rooms.get_all_rooms().await; + room_list + .into_iter() + .map(|room_info| Item { + jid: Jid { + name: Some(Name(room_info.id.into_inner())), + server: Server("rooms.localhost".into()), + resource: None, + }, + name: None, + node: None, + }) + .collect() + } + _ => vec![], + }; + ItemQuery { item } +} diff --git a/crates/projection-xmpp/src/lib.rs b/crates/projection-xmpp/src/lib.rs index fb587ea..29b3dee 100644 --- a/crates/projection-xmpp/src/lib.rs +++ b/crates/projection-xmpp/src/lib.rs @@ -24,19 +24,20 @@ use tokio_rustls::TlsAcceptor; use lavina_core::player::{PlayerConnection, PlayerId, PlayerRegistry}; use lavina_core::prelude::*; -use lavina_core::room::{RoomId, RoomRegistry}; -use lavina_core::terminator::Terminator; use lavina_core::repo::Storage; -use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; -use proto_xmpp::client::{Iq, Message, MessageType, Presence}; -use proto_xmpp::disco::*; -use proto_xmpp::roster::RosterQuery; +use lavina_core::room::RoomRegistry; +use lavina_core::terminator::Terminator; +use proto_xmpp::bind::{Name, Resource}; use proto_xmpp::sasl::AuthBody; -use proto_xmpp::session::Session; use proto_xmpp::stream::*; use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; -use self::proto::{ClientPacket, IqClientBody}; +use self::proto::ClientPacket; + +mod iq; +mod message; +mod presence; +mod updates; #[derive(Deserialize, Debug, Clone)] pub struct ServerConfig { @@ -300,8 +301,8 @@ async fn socket_auth( xmpp_resource: Resource(name.to_string().into()), xmpp_muc_name: Resource(name.to_string().into()), }) - }, - Err(e) => return Err(e) + } + Err(e) => return Err(e), } } @@ -342,6 +343,11 @@ async fn socket_final( let mut next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf)); 'outer: loop { + let mut conn = XmppConnection { + user: authenticated, + user_handle, + rooms, + }; let should_recreate_xml_future = select! { biased; res = &mut next_xml_event => 's: { @@ -354,7 +360,7 @@ async fn socket_final( match parser.consume(ns, &event) { Continuation::Final(res) => { let res = res?; - let stop = handle_packet(&mut events, res, authenticated, user_handle, rooms).await?; + let stop = conn.handle_packet(&mut events, res).await?; for i in &events { xml_writer.write_event_async(i).await?; } @@ -369,32 +375,9 @@ async fn socket_final( } true }, - update = user_handle.receiver.recv() => { + update = conn.user_handle.receiver.recv() => { if let Some(update) = update { - match update { - lavina_core::player::Updates::NewMessage { room_id, author_id, body } => { - Message::<()> { - to: Some(Jid { - name: Some(authenticated.xmpp_name.clone()), - server: Server("localhost".into()), - resource: Some(authenticated.xmpp_resource.clone()), - }), - from: Some(Jid { - name: Some(Name(room_id.into_inner().into())), - server: Server("rooms.localhost".into()), - resource: Some(Resource(author_id.into_inner().into())), - }), - id: None, - r#type: proto_xmpp::client::MessageType::Groupchat, - lang: None, - subject: None, - body: body.into(), - custom: vec![], - } - .serialize(&mut events); - } - _ => {}, - } + conn.handle_update(&mut events, update).await?; for i in &events { xml_writer.write_event_async(i).await?; } @@ -416,249 +399,36 @@ async fn socket_final( Ok(()) } -async fn handle_packet( - output: &mut Vec>, - packet: ClientPacket, - user: &Authenticated, - user_handle: &mut PlayerConnection, - rooms: &RoomRegistry, -) -> Result { - Ok(match packet { - proto::ClientPacket::Iq(iq) => { - handle_iq(output, iq, rooms).await; - false - } - proto::ClientPacket::Message(m) => { - if let Some(Jid { - name: Some(name), - server, - resource: _, - }) = m.to - { - if server.0.as_ref() == "rooms.localhost" && m.r#type == MessageType::Groupchat { - user_handle - .send_message(RoomId::from(name.0.clone())?, m.body.clone().into()) - .await?; - Message::<()> { - to: Some(Jid { - name: Some(user.xmpp_name.clone()), - server: Server("localhost".into()), - resource: Some(user.xmpp_resource.clone()), - }), - from: Some(Jid { - name: Some(name), - server: Server("rooms.localhost".into()), - resource: Some(user.xmpp_muc_name.clone()), - }), - id: m.id, - r#type: proto_xmpp::client::MessageType::Groupchat, - lang: None, - subject: None, - body: m.body.clone(), - custom: vec![], - } - .serialize(output); - false - } else { - todo!() - } - } else { - todo!() +struct XmppConnection<'a> { + user: &'a Authenticated, + user_handle: &'a mut PlayerConnection, + rooms: &'a RoomRegistry, +} + +impl<'a> XmppConnection<'a> { + async fn handle_packet(&mut self, output: &mut Vec>, packet: ClientPacket) -> Result { + let res = match packet { + proto::ClientPacket::Iq(iq) => { + self.handle_iq(output, iq).await; + false } - } - proto::ClientPacket::Presence(p) => { - let response = if p.to.is_none() { - Presence::<()> { - to: Some(Jid { - name: Some(user.xmpp_name.clone()), - server: Server("localhost".into()), - resource: Some(user.xmpp_resource.clone()), - }), - from: Some(Jid { - name: Some(user.xmpp_name.clone()), - server: Server("localhost".into()), - resource: Some(user.xmpp_resource.clone()), - }), - ..Default::default() - } - } else if let Some(Jid { - name: Some(name), - server, - resource: Some(resource), - }) = p.to - { - let a = user_handle.join_room(RoomId::from(name.0.clone())?).await?; - Presence::<()> { - to: Some(Jid { - name: Some(user.xmpp_name.clone()), - server: Server("localhost".into()), - resource: Some(user.xmpp_resource.clone()), - }), - from: Some(Jid { - name: Some(name.clone()), - server: Server("rooms.localhost".into()), - resource: Some(user.xmpp_muc_name.clone()), - }), - ..Default::default() - } - } else { - Presence::<()>::default() - }; - response.serialize(output); - false - } - proto::ClientPacket::StreamEnd => { - ServerStreamEnd.serialize(output); - true - } - }) -} - -async fn handle_iq(output: &mut Vec>, iq: Iq, rooms: &RoomRegistry) { - match iq.body { - proto::IqClientBody::Bind(b) => { - let req = Iq { - from: None, - id: iq.id, - to: None, - r#type: proto_xmpp::client::IqType::Result, - body: BindResponse(Jid { - name: Some(Name("darova".into())), - server: Server("localhost".into()), - resource: Some(Resource("kek".into())), - }), - }; - req.serialize(output); - } - proto::IqClientBody::Session(_) => { - let req = Iq { - from: None, - id: iq.id, - to: None, - r#type: proto_xmpp::client::IqType::Result, - body: Session, - }; - req.serialize(output); - } - proto::IqClientBody::Roster(_) => { - let req = Iq { - from: None, - id: iq.id, - to: None, - r#type: proto_xmpp::client::IqType::Result, - body: RosterQuery, - }; - req.serialize(output); - } - proto::IqClientBody::DiscoInfo(info) => { - let response = disco_info(iq.to.as_deref(), &info); - let req = Iq { - from: iq.to, - id: iq.id, - to: None, - r#type: proto_xmpp::client::IqType::Result, - body: response, - }; - req.serialize(output); - } - proto::IqClientBody::DiscoItem(item) => { - let response = disco_items(iq.to.as_deref(), &item, rooms).await; - let req = Iq { - from: iq.to, - id: iq.id, - to: None, - r#type: proto_xmpp::client::IqType::Result, - body: response, - }; - req.serialize(output); - } - _ => { - let req = Iq { - from: None, - id: iq.id, - to: None, - r#type: proto_xmpp::client::IqType::Error, - body: (), - }; - req.serialize(output); - } + ClientPacket::Message(m) => { + self.handle_message(output, m).await?; + false + } + proto::ClientPacket::Presence(p) => { + self.handle_presence(output, p).await?; + false + } + proto::ClientPacket::StreamEnd => { + ServerStreamEnd.serialize(output); + true + } + }; + Ok(res) } } -fn disco_info(to: Option<&str>, req: &InfoQuery) -> InfoQuery { - let identity; - let feature; - match to { - Some("localhost") => { - identity = vec![Identity { - category: "server".into(), - name: None, - r#type: "im".into(), - }]; - feature = vec![ - Feature::new("http://jabber.org/protocol/disco#info"), - Feature::new("http://jabber.org/protocol/disco#items"), - Feature::new("iq"), - Feature::new("presence"), - ] - } - Some("rooms.localhost") => { - identity = vec![Identity { - category: "conference".into(), - name: Some("Chat rooms".into()), - r#type: "text".into(), - }]; - feature = vec![ - Feature::new("http://jabber.org/protocol/disco#info"), - Feature::new("http://jabber.org/protocol/disco#items"), - Feature::new("http://jabber.org/protocol/muc"), - ] - } - _ => { - identity = vec![]; - feature = vec![]; - } - }; - InfoQuery { - node: None, - identity, - feature, - } -} - -async fn disco_items(to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery { - let item = match to { - Some("localhost") => { - vec![Item { - jid: Jid { - name: None, - server: Server("rooms.localhost".into()), - resource: None, - }, - name: None, - node: None, - }] - } - Some("rooms.localhost") => { - let room_list = rooms.get_all_rooms().await; - room_list - .into_iter() - .map(|room_info| Item { - jid: Jid { - name: Some(Name(room_info.id.into_inner())), - server: Server("rooms.localhost".into()), - resource: None, - }, - name: None, - node: None, - }) - .collect() - } - _ => vec![], - }; - ItemQuery { item } -} - async fn read_xml_header( xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>, reader_buf: &mut Vec, diff --git a/crates/projection-xmpp/src/message.rs b/crates/projection-xmpp/src/message.rs new file mode 100644 index 0000000..9369076 --- /dev/null +++ b/crates/projection-xmpp/src/message.rs @@ -0,0 +1,52 @@ +//! Handling of all client2server message stanzas + +use quick_xml::events::Event; + +use lavina_core::prelude::*; +use lavina_core::room::RoomId; +use proto_xmpp::bind::{Jid, Server}; +use proto_xmpp::client::{Message, MessageType}; +use proto_xmpp::xml::{Ignore, ToXml}; + +use crate::XmppConnection; + +impl<'a> XmppConnection<'a> { + pub async fn handle_message(&mut self, output: &mut Vec>, m: Message) -> Result<()> { + if let Some(Jid { + name: Some(name), + server, + resource: _, + }) = m.to + { + if server.0.as_ref() == "rooms.localhost" && m.r#type == MessageType::Groupchat { + self.user_handle + .send_message(RoomId::from(name.0.clone())?, m.body.clone().into()) + .await?; + Message::<()> { + to: Some(Jid { + name: Some(self.user.xmpp_name.clone()), + server: Server("localhost".into()), + resource: Some(self.user.xmpp_resource.clone()), + }), + from: Some(Jid { + name: Some(name), + server: Server("rooms.localhost".into()), + resource: Some(self.user.xmpp_muc_name.clone()), + }), + id: m.id, + r#type: MessageType::Groupchat, + lang: None, + subject: None, + body: m.body.clone(), + custom: vec![], + } + .serialize(output); + Ok(()) + } else { + todo!() + } + } else { + todo!() + } + } +} diff --git a/crates/projection-xmpp/src/presence.rs b/crates/projection-xmpp/src/presence.rs new file mode 100644 index 0000000..eabf0fd --- /dev/null +++ b/crates/projection-xmpp/src/presence.rs @@ -0,0 +1,55 @@ +//! Handling of all client2server presence stanzas + +use quick_xml::events::Event; + +use lavina_core::prelude::*; +use lavina_core::room::RoomId; +use proto_xmpp::bind::{Jid, Server}; +use proto_xmpp::client::Presence; +use proto_xmpp::xml::{Ignore, ToXml}; + +use crate::XmppConnection; + +impl<'a> XmppConnection<'a> { + pub async fn handle_presence(&mut self, output: &mut Vec>, p: Presence) -> Result<()> { + let response = if p.to.is_none() { + Presence::<()> { + to: Some(Jid { + name: Some(self.user.xmpp_name.clone()), + server: Server("localhost".into()), + resource: Some(self.user.xmpp_resource.clone()), + }), + from: Some(Jid { + name: Some(self.user.xmpp_name.clone()), + server: Server("localhost".into()), + resource: Some(self.user.xmpp_resource.clone()), + }), + ..Default::default() + } + } else if let Some(Jid { + name: Some(name), + server, + resource: Some(resource), + }) = p.to + { + let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?; + Presence::<()> { + to: Some(Jid { + name: Some(self.user.xmpp_name.clone()), + server: Server("localhost".into()), + resource: Some(self.user.xmpp_resource.clone()), + }), + from: Some(Jid { + name: Some(name.clone()), + server: Server("rooms.localhost".into()), + resource: Some(self.user.xmpp_muc_name.clone()), + }), + ..Default::default() + } + } else { + Presence::<()>::default() + }; + response.serialize(output); + Ok(()) + } +} diff --git a/crates/projection-xmpp/src/updates.rs b/crates/projection-xmpp/src/updates.rs new file mode 100644 index 0000000..c211be8 --- /dev/null +++ b/crates/projection-xmpp/src/updates.rs @@ -0,0 +1,45 @@ +//! Handling of updates and converting them into server2client stanzas + +use anyhow::Result; +use quick_xml::events::Event; + +use lavina_core::player::Updates; +use proto_xmpp::bind::{Jid, Name, Resource, Server}; +use proto_xmpp::client::{Message, MessageType}; +use proto_xmpp::xml::ToXml; + +use crate::XmppConnection; + +impl<'a> XmppConnection<'a> { + pub async fn handle_update(&mut self, output: &mut Vec>, update: Updates) -> Result<()> { + match update { + Updates::NewMessage { + room_id, + author_id, + body, + } => { + Message::<()> { + to: Some(Jid { + name: Some(self.user.xmpp_name.clone()), + server: Server("localhost".into()), + resource: Some(self.user.xmpp_resource.clone()), + }), + from: Some(Jid { + name: Some(Name(room_id.into_inner().into())), + server: Server("rooms.localhost".into()), + resource: Some(Resource(author_id.into_inner().into())), + }), + id: None, + r#type: MessageType::Groupchat, + lang: None, + subject: None, + body: body.into(), + custom: vec![], + } + .serialize(output); + } + _ => {} + } + Ok(()) + } +}