From 4621470bdebbab4ba0d8a9acc21d659710250aec Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Mon, 2 Oct 2023 23:35:23 +0200 Subject: [PATCH 1/2] fix graceful shutdown --- crates/lavina-core/src/player.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 763a82a..aa8ed10 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -155,6 +155,7 @@ enum PlayerCommand { GetRooms(Promise>), /** Events from rooms */ Update(Updates), + Stop, } pub enum Cmd { @@ -241,9 +242,8 @@ impl PlayerRegistry { pub async fn shutdown_all(&mut self) -> Result<()> { let mut inner = self.0.write().unwrap(); - let mut players = HashMap::new(); - std::mem::swap(&mut players, &mut inner.players); for (i, (k, j)) in inner.players.drain() { + k.send(PlayerCommand::Stop).await; drop(k); j.await?; log::debug!("Player stopped #{i:?}") @@ -325,6 +325,7 @@ impl Player { } } PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, + PlayerCommand::Stop => break, } } log::debug!("Shutting down player actor #{:?}", self.player_id); From 887fd95194fb9e1ea1ab72069b71ca7f1d68dfec Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Tue, 3 Oct 2023 00:17:48 +0200 Subject: [PATCH 2/2] xmpp: fix parsing of unknown elements in messages --- crates/proto-xmpp/src/client.rs | 77 +++++++++++++++++++------------- crates/proto-xmpp/src/xml/mod.rs | 19 +++++++- 2 files changed, 64 insertions(+), 32 deletions(-) diff --git a/crates/proto-xmpp/src/client.rs b/crates/proto-xmpp/src/client.rs index fd7ac93..8276d7c 100644 --- a/crates/proto-xmpp/src/client.rs +++ b/crates/proto-xmpp/src/client.rs @@ -103,14 +103,22 @@ impl Parser for MessageParser { Continuation::Final(Err(ffail!("Expected start"))) } } - Outer(state) => match event { + Outer(mut state) => match event { Event::Start(ref bytes) => { if bytes.name().0 == b"subject" { Continuation::Continue(InSubject(state).into()) } else if bytes.name().0 == b"body" { Continuation::Continue(InBody(state).into()) } else { - Continuation::Continue(InCustom(state, T::parse()).into()) + let parser = T::parse(); + match parser.consume(namespace, event) { + Continuation::Final(Ok(e)) => { + state.custom.push(e); + Continuation::Continue(Outer(state).into()) + } + Continuation::Final(Err(e)) => Continuation::Final(Err(e)), + Continuation::Continue(p) => Continuation::Continue(InCustom(state, p).into()), + } } } Event::End(_) => { @@ -129,6 +137,17 @@ impl Parser for MessageParser { Continuation::Final(Err(ffail!("Body not found"))) } } + Event::Empty(_) => { + let parser = T::parse(); + match parser.consume(namespace, event) { + Continuation::Final(Ok(e)) => { + state.custom.push(e); + Continuation::Continue(Outer(state).into()) + } + Continuation::Final(Err(e)) => Continuation::Final(Err(e)), + Continuation::Continue(p) => Continuation::Continue(InCustom(state, p).into()), + } + } _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), }, InSubject(mut state) => match event { @@ -151,7 +170,7 @@ impl Parser for MessageParser { Event::End(_) => Continuation::Continue(Outer(state).into()), _ => Continuation::Final(Err(ffail!("Unexpected XML event: {event:?}"))), }, - InCustom(mut state, mut custom) => match custom.consume(namespace, event) { + InCustom(mut state, custom) => match custom.consume(namespace, event) { Continuation::Final(Ok(e)) => { state.custom.push(e); Continuation::Continue(Outer(state).into()) @@ -594,25 +613,34 @@ mod tests { use crate::bind::{BindRequest, Name, Resource, Server}; use super::*; - use quick_xml::NsReader; #[tokio::test] async fn parse_message() { let input = r#"daabbb"#; - let mut reader = NsReader::from_reader(input.as_bytes()); - let mut buf = vec![]; - let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); - let mut parser = Message::parse().consume(ns, &event); - let result = loop { - match parser { - Continuation::Final(res) => break res, - Continuation::Continue(next) => { - let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); - parser = next.consume(ns, &event); - } + let result: Message = crate::xml::parse(input).unwrap(); + assert_eq!( + result, + Message:: { + from: None, + id: Some("aacea".to_string()), + to: Some(Jid { + name: Some(Name("nikita".into())), + server: Server("vlnv.dev".into()), + resource: None + }), + r#type: MessageType::Chat, + lang: None, + subject: Some("daa".into()), + body: "bbb".into(), + custom: vec![Ignore], } - } - .unwrap(); + ) + } + + #[tokio::test] + async fn parse_message_empty_custom() { + let input = r#"daabbb"#; + let result: Message = crate::xml::parse(input).unwrap(); assert_eq!( result, Message:: { @@ -635,20 +663,7 @@ mod tests { #[tokio::test] async fn parse_iq() { let input = r#"mobile"#; - let mut reader = NsReader::from_reader(input.as_bytes()); - let mut buf = vec![]; - let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); - let mut parser = Iq::::parse().consume(ns, &event); - let result = loop { - match parser { - Continuation::Final(res) => break res, - Continuation::Continue(next) => { - let (ns, event) = reader.read_resolved_event_into_async(&mut buf).await.unwrap(); - parser = next.consume(ns, &event); - } - } - } - .unwrap(); + let result: Iq = crate::xml::parse(input).unwrap(); assert_eq!( result, Iq { diff --git a/crates/proto-xmpp/src/xml/mod.rs b/crates/proto-xmpp/src/xml/mod.rs index 771bef4..c67fe49 100644 --- a/crates/proto-xmpp/src/xml/mod.rs +++ b/crates/proto-xmpp/src/xml/mod.rs @@ -1,10 +1,11 @@ use std::ops::Generator; use std::pin::Pin; +use quick_xml::NsReader; use quick_xml::events::Event; use quick_xml::name::ResolveResult; -use anyhow::{anyhow, Result}; +use anyhow::Result; mod ignore; pub use ignore::Ignore; @@ -63,6 +64,22 @@ pub enum Continuation { Continue(Parser), } +pub fn parse(input: &str) -> Result { + let mut reader = NsReader::from_reader(input.as_bytes()); + let mut buf = vec![]; + let (ns, event) = reader.read_resolved_event_into(&mut buf)?; + let mut parser: Continuation<_, std::result::Result> = T::parse().consume(ns, &event); + loop { + match parser { + Continuation::Final(res) => break res, + Continuation::Continue(next) => { + let (ns, event) = reader.read_resolved_event_into(&mut buf)?; + parser = next.consume(ns, &event); + } + } + } +} + macro_rules! fail_fast { ($errorable: expr) => { match $errorable {