From ea0377c78abf6ee779af60eab7c15efb3876883f Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Sun, 1 Oct 2023 21:54:14 +0200 Subject: [PATCH] fix message parsing --- crates/projection-xmpp/src/lib.rs | 6 +- crates/projection-xmpp/src/proto.rs | 4 +- crates/proto-xmpp/src/client.rs | 105 ++++++++++++++-------------- 3 files changed, 57 insertions(+), 58 deletions(-) diff --git a/crates/projection-xmpp/src/lib.rs b/crates/projection-xmpp/src/lib.rs index d55a09f..7e93537 100644 --- a/crates/projection-xmpp/src/lib.rs +++ b/crates/projection-xmpp/src/lib.rs @@ -328,7 +328,7 @@ async fn socket_final( if let Some(update) = update { match update { lavina_core::player::Updates::NewMessage { room_id, author_id, body } => { - Message { + Message::<()> { to: Some(Jid { name: Some(authenticated.xmpp_name.clone()), server: Server("localhost".into()), @@ -344,6 +344,7 @@ async fn socket_final( lang: None, subject: None, body: body.into(), + custom: vec![], } .serialize(&mut events); } @@ -393,7 +394,7 @@ async fn handle_packet( user_handle .send_message(RoomId::from(name.0.clone())?, m.body.clone().into()) .await?; - Message { + Message::<()> { to: Some(Jid { name: Some(user.xmpp_name.clone()), server: Server("localhost".into()), @@ -409,6 +410,7 @@ async fn handle_packet( lang: None, subject: None, body: m.body.clone(), + custom: vec![], } .serialize(output); false diff --git a/crates/projection-xmpp/src/proto.rs b/crates/projection-xmpp/src/proto.rs index f9bed8c..e486b65 100644 --- a/crates/projection-xmpp/src/proto.rs +++ b/crates/projection-xmpp/src/proto.rs @@ -49,7 +49,7 @@ impl FromXml for IqClientBody { #[derive(PartialEq, Eq, Debug, From)] pub enum ClientPacket { Iq(Iq), - Message(Message), + Message(Message), Presence(Presence), StreamEnd, } @@ -65,7 +65,7 @@ impl FromXml for ClientPacket { match_parser!(name, namespace, event; Iq::, Presence::, - Message, + Message::, { Err(anyhow!( "Unexpected XML event of name {:?} in namespace {:?}", diff --git a/crates/proto-xmpp/src/client.rs b/crates/proto-xmpp/src/client.rs index a209127..fd7ac93 100644 --- a/crates/proto-xmpp/src/client.rs +++ b/crates/proto-xmpp/src/client.rs @@ -3,8 +3,7 @@ use quick_xml::events::attributes::Attribute; use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event}; use quick_xml::name::{QName, ResolveResult}; -use anyhow::{Result, anyhow as ffail}; - +use anyhow::{anyhow as ffail, Result}; use crate::prelude::*; use crate::xml::*; @@ -26,33 +25,33 @@ pub struct Message { pub custom: Vec, } -impl FromXmlTag for Message { +impl FromXmlTag for Message { const NS: &'static str = XMLNS; const NAME: &'static str = "message"; } -impl FromXml for Message { - type P = MessageParser; +impl FromXml for Message { + type P = impl Parser>; fn parse() -> Self::P { - MessageParserInner::Init.into() + MessageParser(MessageParserInner::Init) } } #[derive(From)] -pub struct MessageParser(MessageParserInner); +struct MessageParser(MessageParserInner); #[derive(Default)] enum MessageParserInner { #[default] Init, - Outer(MessageParserState), - InSubject(MessageParserState), - InBody(MessageParserState), - InCustom(T::P), + Outer(MessageParserState), + InSubject(MessageParserState), + InBody(MessageParserState), + InCustom(MessageParserState, T::P), } #[derive(Default)] -struct MessageParserState { +struct MessageParserState { from: Option, id: Option, to: Option, @@ -60,21 +59,27 @@ struct MessageParserState { lang: Option, subject: Option, body: Option, + custom: Vec, } -impl Parser for MessageParser { +impl Parser for MessageParser { type Output = Result>; - fn consume<'a>( - self: Self, - namespace: ResolveResult, - event: &Event<'a>, - ) -> Continuation { + fn consume<'a>(self: Self, namespace: ResolveResult, event: &Event<'a>) -> Continuation { // TODO validate tag name and namespace at each stage use MessageParserInner::*; match self.0 { Init => { if let Event::Start(ref bytes) = event { - let mut state: MessageParserState = Default::default(); + let mut state: MessageParserState = MessageParserState { + from: None, + id: None, + to: None, + r#type: MessageType::Normal, + lang: None, + subject: None, + body: None, + custom: vec![], + }; for attr in bytes.attributes() { let attr = fail_fast!(attr); if attr.key.0 == b"from" { @@ -105,7 +110,7 @@ impl Parser for MessageParser { } else if bytes.name().0 == b"body" { Continuation::Continue(InBody(state).into()) } else { - Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))) + Continuation::Continue(InCustom(state, T::parse()).into()) } } Event::End(_) => { @@ -118,6 +123,7 @@ impl Parser for MessageParser { lang: state.lang, subject: state.subject, body, + custom: state.custom, })) } else { Continuation::Final(Err(ffail!("Body not found"))) @@ -145,6 +151,14 @@ impl Parser for MessageParser { Event::End(_) => Continuation::Continue(Outer(state).into()), _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), }, + InCustom(mut state, mut custom) => match custom.consume(namespace, event) { + Continuation::Final(Ok(e)) => { + state.custom.push(e); + Continuation::Continue(Outer(state).into()) + } + Continuation::Final(Err(e)) => Continuation::Final(Err(e)), + Continuation::Continue(c) => Continuation::Continue(InCustom(state, c).into()), + }, } } } @@ -262,11 +276,7 @@ struct IqParserState { impl Parser for IqParser { type Output = Result>; - fn consume<'a>( - self: Self, - namespace: ResolveResult, - event: &Event<'a>, - ) -> Continuation { + fn consume<'a>(self: Self, namespace: ResolveResult, event: &Event<'a>) -> Continuation { match self.0 { IqParserInner::Init => { if let Event::Start(ref bytes) = event { @@ -298,18 +308,16 @@ impl Parser for IqParser { Continuation::Final(Err(ffail!("Expected start"))) } } - IqParserInner::ParsingBody(mut state, parser) => { - match parser.consume(namespace, event) { - Continuation::Final(f) => { - let body = fail_fast!(f); - state.body = Some(body); - Continuation::Continue(IqParser(IqParserInner::Final(state))) - } - Continuation::Continue(parser) => { - Continuation::Continue(IqParser(IqParserInner::ParsingBody(state, parser))) - } + IqParserInner::ParsingBody(mut state, parser) => match parser.consume(namespace, event) { + Continuation::Final(f) => { + let body = fail_fast!(f); + state.body = Some(body); + Continuation::Continue(IqParser(IqParserInner::Final(state))) } - } + Continuation::Continue(parser) => { + Continuation::Continue(IqParser(IqParserInner::ParsingBody(state, parser))) + } + }, IqParserInner::Final(state) => { if let Event::End(ref bytes) = event { let id = fail_fast!(state.id.ok_or_else(|| ffail!("No id provided"))); @@ -590,22 +598,16 @@ mod tests { #[tokio::test] async fn parse_message() { - let input = r#"daabbb"#; + let input = r#"daabbb"#; let mut reader = NsReader::from_reader(input.as_bytes()); let mut buf = vec![]; - let (ns, event) = reader - .read_resolved_event_into_async(&mut buf) - .await - .unwrap(); + let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); let mut parser = Message::parse().consume(ns, &event); let result = loop { match parser { Continuation::Final(res) => break res, Continuation::Continue(next) => { - let (ns, event) = reader - .read_resolved_event_into_async(&mut buf) - .await - .unwrap(); + let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); parser = next.consume(ns, &event); } } @@ -613,7 +615,7 @@ mod tests { .unwrap(); assert_eq!( result, - Message { + Message:: { from: None, id: Some("aacea".to_string()), to: Some(Jid { @@ -625,6 +627,7 @@ mod tests { lang: None, subject: Some("daa".into()), body: "bbb".into(), + custom: vec![Ignore], } ) } @@ -634,19 +637,13 @@ mod tests { let input = r#"mobile"#; let mut reader = NsReader::from_reader(input.as_bytes()); let mut buf = vec![]; - let (ns, event) = reader - .read_resolved_event_into_async(&mut buf) - .await - .unwrap(); + let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); let mut parser = Iq::::parse().consume(ns, &event); let result = loop { match parser { Continuation::Final(res) => break res, Continuation::Continue(next) => { - let (ns, event) = reader - .read_resolved_event_into_async(&mut buf) - .await - .unwrap(); + let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); parser = next.consume(ns, &event); } }