diff --git a/src/projections/xmpp/mod.rs b/src/projections/xmpp/mod.rs index 494fa1c..eb1b2cb 100644 --- a/src/projections/xmpp/mod.rs +++ b/src/projections/xmpp/mod.rs @@ -291,33 +291,80 @@ async fn socket_final( xml_writer.get_mut().flush().await?; let mut parser = proto::ClientPacket::parse(); - loop { - reader_buf.clear(); - let (ns, event) = xml_reader - .read_resolved_event_into_async(reader_buf) - .await?; - if let Event::Text(ref e) = event { - if e.iter().all(|x| *x == 0xA) { - continue; - } - } - let mut events = vec![]; - match parser.consume(ns, &event) { - Continuation::Final(res) => { - let res = res?; - dbg!(&res); - let stop = handle_packet(&mut events, res, authenticated, user_handle).await?; - for i in &events { - xml_writer.write_event_async(i).await?; + let mut events = vec![]; + reader_buf.clear(); + let mut next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf)); + + 'outer: loop { + let should_recreate_xml_future = select! { + biased; + res = &mut next_xml_event => 's: { + let (ns, event) = res?; + if let Event::Text(ref e) = event { + if e.iter().all(|x| *x == 0xA) { + break 's true; + } } - events.clear(); - xml_writer.get_mut().flush().await?; - if stop { + match parser.consume(ns, &event) { + Continuation::Final(res) => { + let res = res?; + dbg!(&res); + let stop = handle_packet(&mut events, res, authenticated, user_handle).await?; + for i in &events { + xml_writer.write_event_async(i).await?; + } + events.clear(); + xml_writer.get_mut().flush().await?; + if stop { + break 'outer; + } + parser = proto::ClientPacket::parse(); + } + Continuation::Continue(p) => parser = p, + } + true + }, + update = user_handle.receiver.recv() => { + if let Some(update) = update { + match update { + crate::core::player::Updates::NewMessage { room_id, author_id, body } => { + Message { + to: Some(Jid { + name: Some(authenticated.xmpp_name.clone()), + server: Server("localhost".into()), + resource: Some(authenticated.xmpp_resource.clone()), + }), + from: Some(Jid { + name: Some(Name(std::str::from_utf8(room_id.as_bytes())?.to_owned())), + server: Server("rooms.localhost".into()), + resource: Some(Resource(std::str::from_utf8(author_id.as_bytes())?.to_owned())), + }), + id: None, + r#type: xmpp::client::MessageType::Groupchat, + lang: None, + subject: None, + body: body, + } + .serialize(&mut events); + } + _ => {}, + } + for i in &events { + xml_writer.write_event_async(i).await?; + } + events.clear(); + xml_writer.get_mut().flush().await?; + } else { + log::warn!("Player is terminated, must terminate the connection"); break; } - parser = proto::ClientPacket::parse(); + false } - Continuation::Continue(p) => parser = p, + + }; + if should_recreate_xml_future { + drop(next_xml_event); + next_xml_event = Box::pin(xml_reader.read_resolved_event_into_async(reader_buf)); } } Ok(())