forked from lavina/lavina
1
0
Fork 0

feat(xmpp): glue parsing of incoming messages together

This commit is contained in:
Nikita Vilunov 2023-03-12 00:30:48 +01:00
parent 443f6a2c90
commit 4730526fee
5 changed files with 186 additions and 23 deletions

View File

@ -24,6 +24,7 @@ use crate::core::room::RoomRegistry;
use crate::prelude::*; use crate::prelude::*;
use crate::protos::xmpp; use crate::protos::xmpp;
use crate::protos::xmpp::stream::*; use crate::protos::xmpp::stream::*;
use crate::util::xml::{Continuation, FromXml, Parser};
use crate::util::Terminator; use crate::util::Terminator;
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
@ -261,6 +262,27 @@ async fn socket_final(
.write_xml(xml_writer) .write_xml(xml_writer)
.await?; .await?;
xml_writer.get_mut().flush().await?; xml_writer.get_mut().flush().await?;
let mut parser = proto::ClientPacket::parse();
loop {
reader_buf.clear();
let (ns, event) = xml_reader
.read_resolved_event_into_async(reader_buf)
.await?;
if let Event::Text(ref e) = event {
if **e == [0xAu8] {
continue;
}
}
match parser.consume(ns, &event) {
Continuation::Final(res) => {
let res = res?;
dbg!(res);
parser = proto::ClientPacket::parse();
}
Continuation::Continue(p) => parser = p,
}
}
Ok(()) Ok(())
} }

View File

@ -3,6 +3,7 @@ use quick_xml::events::Event;
use quick_xml::name::{Namespace, ResolveResult}; use quick_xml::name::{Namespace, ResolveResult};
use crate::protos::xmpp::bind::BindRequest; use crate::protos::xmpp::bind::BindRequest;
use crate::protos::xmpp::client::{Iq, Message};
use crate::util::xml::{Continuation, FromXml, Parser}; use crate::util::xml::{Continuation, FromXml, Parser};
use crate::prelude::*; use crate::prelude::*;
@ -41,7 +42,7 @@ impl Parser for IqClientBodyParser {
match self.0 { match self.0 {
Initial => { Initial => {
let Event::Start(bytes) = event else { let Event::Start(bytes) = event else {
return Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))); return Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}. Expected start of {}", BindRequest::NAME)));
}; };
if bytes.name().0 == BindRequest::NAME.as_bytes() if bytes.name().0 == BindRequest::NAME.as_bytes()
&& namespace == ResolveResult::Bound(Namespace(BindRequest::NS.as_bytes())) && namespace == ResolveResult::Bound(Namespace(BindRequest::NS.as_bytes()))
@ -69,3 +70,74 @@ impl Parser for IqClientBodyParser {
} }
} }
} }
#[derive(PartialEq, Eq, Debug, From)]
pub enum ClientPacket {
Iq(Iq<IqClientBody>),
Message(Message),
}
#[derive(From)]
pub struct ClientPacketParser(ClientPacketParserInner);
impl FromXml for ClientPacket {
type P = ClientPacketParser;
fn parse() -> Self::P {
ClientPacketParserInner::Initial.into()
}
}
#[derive(From)]
enum ClientPacketParserInner {
Initial,
Iq(<Iq<IqClientBody> as FromXml>::P),
Message(<Message as FromXml>::P),
}
impl Parser for ClientPacketParser {
type Output = Result<ClientPacket>;
fn consume<'a>(
self: Self,
namespace: ResolveResult,
event: &Event<'a>,
) -> Continuation<Self, Self::Output> {
use ClientPacketParserInner::{Initial, Iq as IqV, Message as MessageV};
match self.0 {
Initial => {
let Event::Start(bytes) = event else {
return Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}")));
};
if bytes.name().0 == Iq::<IqClientBody>::NAME.as_bytes()
&& namespace
== ResolveResult::Bound(Namespace(Iq::<IqClientBody>::NS.as_bytes()))
{
ClientPacketParser(IqV(Iq::<IqClientBody>::parse())).consume(namespace, event)
} else if bytes.name().0 == Message::NAME.as_bytes()
&& namespace == ResolveResult::Bound(Namespace(Message::NS.as_bytes()))
{
ClientPacketParser(MessageV(Message::parse())).consume(namespace, event)
} else {
Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}")))
}
}
IqV(p) => match p.consume(namespace, event) {
Continuation::Final(Ok(r)) => Continuation::Final(Ok(r.into())),
Continuation::Final(Err(e)) => Continuation::Final(Err(e)),
Continuation::Continue(s) => {
let inner: ClientPacketParserInner = s.into();
Continuation::Continue(inner.into())
}
},
MessageV(p) => match p.consume(namespace, event) {
Continuation::Final(Ok(r)) => Continuation::Final(Ok(r.into())),
Continuation::Final(Err(e)) => Continuation::Final(Err(e)),
Continuation::Continue(s) => {
let inner: ClientPacketParserInner = s.into();
Continuation::Continue(inner.into())
}
},
}
}
}

View File

@ -14,7 +14,7 @@ pub struct Name(String);
pub struct Server(String); pub struct Server(String);
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
pub struct Resource(String); pub struct Resource(pub(super) String);
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
pub struct Jid { pub struct Jid {
@ -33,7 +33,7 @@ pub struct Jid {
/// ``` /// ```
/// ///
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
pub struct BindRequest(Resource); pub struct BindRequest(pub Resource);
pub struct BindRequestParser(BindRequestParserInner); pub struct BindRequestParser(BindRequestParserInner);

View File

@ -1,10 +1,11 @@
use derive_more::From;
use quick_xml::events::Event; use quick_xml::events::Event;
use quick_xml::name::ResolveResult; use quick_xml::name::ResolveResult;
use crate::prelude::*; use crate::prelude::*;
use crate::util::xml::*; use crate::util::xml::*;
pub static XMLNS: &'static str = "jabber:client"; pub const XMLNS: &'static str = "jabber:client";
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
pub struct Message { pub struct Message {
@ -18,14 +19,25 @@ pub struct Message {
pub subject: Option<String>, pub subject: Option<String>,
pub body: String, pub body: String,
} }
impl Message { impl Message {
pub fn parse() -> impl Parser<Output = Result<Self>> { pub const NS: &'static str = XMLNS;
MessageParser::Init pub const NAME: &'static str = "message";
}
impl FromXml for Message {
type P = MessageParser;
fn parse() -> Self::P {
MessageParserInner::Init.into()
} }
} }
#[derive(From)]
pub struct MessageParser(MessageParserInner);
#[derive(Default)] #[derive(Default)]
enum MessageParser { enum MessageParserInner {
#[default] #[default]
Init, Init,
Outer(MessageParserState), Outer(MessageParserState),
@ -51,8 +63,9 @@ impl Parser for MessageParser {
event: &Event<'a>, event: &Event<'a>,
) -> Continuation<Self, Self::Output> { ) -> Continuation<Self, Self::Output> {
// TODO validate tag name and namespace at each stage // TODO validate tag name and namespace at each stage
match self { use MessageParserInner::*;
MessageParser::Init => { match self.0 {
Init => {
if let Event::Start(ref bytes) = event { if let Event::Start(ref bytes) = event {
let mut state: MessageParserState = Default::default(); let mut state: MessageParserState = Default::default();
for attr in bytes.attributes() { for attr in bytes.attributes() {
@ -71,17 +84,17 @@ impl Parser for MessageParser {
state.r#type = value; state.r#type = value;
} }
} }
Continuation::Continue(MessageParser::Outer(state)) Continuation::Continue(Outer(state).into())
} else { } else {
Continuation::Final(Err(ffail!("Expected start"))) Continuation::Final(Err(ffail!("Expected start")))
} }
} }
MessageParser::Outer(state) => match event { Outer(state) => match event {
Event::Start(ref bytes) => { Event::Start(ref bytes) => {
if bytes.name().0 == b"subject" { if bytes.name().0 == b"subject" {
Continuation::Continue(MessageParser::InSubject(state)) Continuation::Continue(InSubject(state).into())
} else if bytes.name().0 == b"body" { } else if bytes.name().0 == b"body" {
Continuation::Continue(MessageParser::InBody(state)) Continuation::Continue(InBody(state).into())
} else { } else {
Continuation::Final(Err(ffail!("Unexpected XML tag"))) Continuation::Final(Err(ffail!("Unexpected XML tag")))
} }
@ -103,24 +116,24 @@ impl Parser for MessageParser {
} }
_ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))),
}, },
MessageParser::InSubject(mut state) => match event { InSubject(mut state) => match event {
Event::Text(ref bytes) => { Event::Text(ref bytes) => {
let subject = fail_fast!(std::str::from_utf8(&*bytes)); let subject = fail_fast!(std::str::from_utf8(&*bytes));
state.subject = Some(subject.to_string()); state.subject = Some(subject.to_string());
Continuation::Continue(MessageParser::InSubject(state)) Continuation::Continue(InSubject(state).into())
} }
Event::End(_) => Continuation::Continue(MessageParser::Outer(state)), Event::End(_) => Continuation::Continue(Outer(state).into()),
_ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))),
}, },
MessageParser::InBody(mut state) => match event { InBody(mut state) => match event {
Event::Text(ref bytes) => match std::str::from_utf8(&*bytes) { Event::Text(ref bytes) => match std::str::from_utf8(&*bytes) {
Ok(subject) => { Ok(subject) => {
state.body = Some(subject.to_string()); state.body = Some(subject.to_string());
Continuation::Continue(MessageParser::InBody(state)) Continuation::Continue(InBody(state).into())
} }
Err(err) => Continuation::Final(Err(err.into())), Err(err) => Continuation::Final(Err(err.into())),
}, },
Event::End(_) => Continuation::Continue(MessageParser::Outer(state)), Event::End(_) => Continuation::Continue(Outer(state).into()),
_ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))),
}, },
} }
@ -157,6 +170,7 @@ impl MessageType {
} }
} }
#[derive(PartialEq, Eq, Debug)]
pub struct Iq<T> { pub struct Iq<T> {
pub from: Option<String>, pub from: Option<String>,
pub id: String, pub id: String,
@ -165,6 +179,11 @@ pub struct Iq<T> {
pub body: T, pub body: T,
} }
impl<T> Iq<T> {
pub const NS: &'static str = XMLNS;
pub const NAME: &'static str = "iq";
}
impl<T: FromXml> FromXml for Iq<T> { impl<T: FromXml> FromXml for Iq<T> {
type P = IqParser<T>; type P = IqParser<T>;
@ -199,7 +218,13 @@ impl<T: FromXml> Parser for IqParser<T> {
match self.0 { match self.0 {
IqParserInner::Init => { IqParserInner::Init => {
if let Event::Start(ref bytes) = event { if let Event::Start(ref bytes) = event {
let mut state: IqParserState<T> = IqParserState { from: None, id: None, to: None, r#type: None, body: None }; let mut state: IqParserState<T> = IqParserState {
from: None,
id: None,
to: None,
r#type: None,
body: None,
};
for attr in bytes.attributes() { for attr in bytes.attributes() {
let attr = fail_fast!(attr); let attr = fail_fast!(attr);
if attr.key.0 == b"from" { if attr.key.0 == b"from" {
@ -253,6 +278,7 @@ impl<T: FromXml> Parser for IqParser<T> {
} }
} }
#[derive(PartialEq, Eq, Debug)]
pub enum IqType { pub enum IqType {
Error, Error,
Get, Get,
@ -276,6 +302,8 @@ impl IqType {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::protos::xmpp::bind::{BindRequest, Resource};
use super::*; use super::*;
use quick_xml::NsReader; use quick_xml::NsReader;
@ -284,13 +312,19 @@ mod tests {
let input = r#"<message id="aacea" type="chat" to="nikita@vlnv.dev"><subject>daa</subject><body>bbb</body></message>"#; let input = r#"<message id="aacea" type="chat" to="nikita@vlnv.dev"><subject>daa</subject><body>bbb</body></message>"#;
let mut reader = NsReader::from_reader(input.as_bytes()); let mut reader = NsReader::from_reader(input.as_bytes());
let mut buf = vec![]; let mut buf = vec![];
let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); let (ns, event) = reader
.read_resolved_event_into_async(&mut buf)
.await
.unwrap();
let mut parser = Message::parse().consume(ns, &event); let mut parser = Message::parse().consume(ns, &event);
let result = loop { let result = loop {
match parser { match parser {
Continuation::Final(res) => break res, Continuation::Final(res) => break res,
Continuation::Continue(next) => { Continuation::Continue(next) => {
let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); let (ns, event) = reader
.read_resolved_event_into_async(&mut buf)
.await
.unwrap();
parser = next.consume(ns, &event); parser = next.consume(ns, &event);
} }
} }
@ -309,4 +343,39 @@ mod tests {
} }
) )
} }
#[tokio::test]
async fn parse_iq() {
let input = r#"<iq id="bind_1" type="set"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><resource>mobile</resource></bind></iq>"#;
let mut reader = NsReader::from_reader(input.as_bytes());
let mut buf = vec![];
let (ns, event) = reader
.read_resolved_event_into_async(&mut buf)
.await
.unwrap();
let mut parser = Iq::<BindRequest>::parse().consume(ns, &event);
let result = loop {
match parser {
Continuation::Final(res) => break res,
Continuation::Continue(next) => {
let (ns, event) = reader
.read_resolved_event_into_async(&mut buf)
.await
.unwrap();
parser = next.consume(ns, &event);
}
}
}
.unwrap();
assert_eq!(
result,
Iq {
from: None,
id: "bind_1".to_string(),
to: None,
r#type: IqType::Set,
body: BindRequest(Resource("mobile".to_string()))
}
)
}
} }

View File

@ -7,7 +7,7 @@ use tokio::io::{AsyncBufRead, AsyncWrite};
use super::client::Message; use super::client::Message;
use super::skip_text; use super::skip_text;
use crate::prelude::*; use crate::prelude::*;
use crate::util::xml::{Continuation, Parser}; use crate::util::xml::{Continuation, FromXml, Parser};
pub static XMLNS: &'static str = "http://etherx.jabber.org/streams"; pub static XMLNS: &'static str = "http://etherx.jabber.org/streams";
pub static PREFIX: &'static str = "stream"; pub static PREFIX: &'static str = "stream";