From 854a244dbcaf72e302ae033e00174a76a0b9f3b1 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Sat, 30 Sep 2023 23:34:35 +0000 Subject: [PATCH] split irc proejction into a separate crate (#18) --- .gitea/workflows/test-pr.yml | 1 - Cargo.lock | 17 +- Cargo.toml | 6 +- crates/lavina-core/src/lib.rs | 3 +- crates/lavina-core/src/terminator.rs | 28 +++ crates/projection-irc/Cargo.toml | 15 ++ .../projection-irc/src/lib.rs | 70 +++----- src/main.rs | 7 +- src/prelude.rs | 25 --- src/projections/irc/test.rs | 166 ------------------ src/projections/mod.rs | 1 - src/projections/xmpp/mod.rs | 4 +- src/projections/xmpp/proto.rs | 12 +- src/util/mod.rs | 29 --- src/util/telemetry.rs | 4 +- src/util/testkit.rs | 2 +- 16 files changed, 104 insertions(+), 286 deletions(-) create mode 100644 crates/lavina-core/src/terminator.rs create mode 100644 crates/projection-irc/Cargo.toml rename src/projections/irc/mod.rs => crates/projection-irc/src/lib.rs (94%) delete mode 100644 src/prelude.rs delete mode 100644 src/projections/irc/test.rs diff --git a/.gitea/workflows/test-pr.yml b/.gitea/workflows/test-pr.yml index 18dca5b..3fabe6a 100644 --- a/.gitea/workflows/test-pr.yml +++ b/.gitea/workflows/test-pr.yml @@ -16,4 +16,3 @@ jobs: uses: https://github.com/actions-rs/cargo@v1 with: command: test - args: --workspace -- --skip projections::irc diff --git a/Cargo.lock b/Cargo.lock index 015cb46..6459827 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -857,8 +857,8 @@ dependencies = [ "lavina-core", "mgmt-api", "nonempty", + "projection-irc", "prometheus", - "proto-irc", "proto-xmpp", "quick-xml", "regex", @@ -1252,6 +1252,21 @@ dependencies = [ "yansi", ] +[[package]] +name = "projection-irc" +version = "0.0.1-dev" +dependencies = [ + "anyhow", + "futures-util", + "lavina-core", + "nonempty", + "prometheus", + "proto-irc", + "serde", + "tokio", + "tracing", +] + [[package]] name = "prometheus" version = "0.13.3" diff --git a/Cargo.toml b/Cargo.toml index 2e469b1..1bf61d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ ".", "crates/lavina-core", "crates/proto-irc", + "crates/projection-irc", "crates/proto-xmpp", "crates/mgmt-api", ] @@ -25,6 +26,7 @@ clap = { version = "4.4.4", features = ["derive"] } serde = { version = "1.0.152", features = ["rc", "serde_derive"] } tracing = "0.1.37" # logging & tracing api prometheus = { version = "0.13.3", default-features = false } +lavina-core = { path = "crates/lavina-core" } [package] name = "lavina" @@ -50,8 +52,8 @@ rustls-pemfile = "1.0.2" quick-xml.workspace = true derive_more.workspace = true uuid = { version = "1.3.0", features = ["v4"] } -lavina-core = { path = "crates/lavina-core" } -proto-irc = { path = "crates/proto-irc" } +lavina-core.workspace = true +projection-irc = { path = "crates/projection-irc" } proto-xmpp = { path = "crates/proto-xmpp" } mgmt-api = { path = "crates/mgmt-api" } clap.workspace = true diff --git a/crates/lavina-core/src/lib.rs b/crates/lavina-core/src/lib.rs index c1cef53..401e49e 100644 --- a/crates/lavina-core/src/lib.rs +++ b/crates/lavina-core/src/lib.rs @@ -1,7 +1,8 @@ //! Domain definitions and implementation of common chat logic. pub mod player; +pub mod prelude; pub mod repo; pub mod room; +pub mod terminator; -mod prelude; mod table; diff --git a/crates/lavina-core/src/terminator.rs b/crates/lavina-core/src/terminator.rs new file mode 100644 index 0000000..8abbf58 --- /dev/null +++ b/crates/lavina-core/src/terminator.rs @@ -0,0 +1,28 @@ +use crate::prelude::*; + +pub struct Terminator { + signal: Promise<()>, + completion: JoinHandle>, +} +impl Terminator { + pub async fn terminate(self) -> Result<()> { + match self.signal.send(()) { + Ok(()) => {} + Err(_) => log::warn!("Termination channel is dropped"), + } + self.completion.await??; + Ok(()) + } + + /// Used to spawn managed tasks with support for graceful shutdown + pub fn spawn(launcher: Fun) -> Terminator + where + Fun: FnOnce(Deferred<()>) -> Fut, + Fut: Future> + Send + 'static, + { + let (signal, rx) = oneshot(); + let future = launcher(rx); + let completion = tokio::task::spawn(future); + Terminator { signal, completion } + } +} diff --git a/crates/projection-irc/Cargo.toml b/crates/projection-irc/Cargo.toml new file mode 100644 index 0000000..c607327 --- /dev/null +++ b/crates/projection-irc/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "projection-irc" +edition = "2021" +version.workspace = true + +[dependencies] +lavina-core.workspace = true +tracing.workspace = true +anyhow.workspace = true +nonempty.workspace = true +serde.workspace = true +tokio.workspace = true +prometheus.workspace = true +proto-irc = { path = "../proto-irc" } +futures-util.workspace = true diff --git a/src/projections/irc/mod.rs b/crates/projection-irc/src/lib.rs similarity index 94% rename from src/projections/irc/mod.rs rename to crates/projection-irc/src/lib.rs index 1890890..42a4126 100644 --- a/src/projections/irc/mod.rs +++ b/crates/projection-irc/src/lib.rs @@ -1,11 +1,11 @@ use std::collections::HashMap; use std::net::SocketAddr; +use anyhow::{anyhow, Result}; use futures_util::future::join_all; use nonempty::nonempty; use nonempty::NonEmpty; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; - use serde::Deserialize; use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::tcp::{ReadHalf, WriteHalf}; @@ -13,17 +13,14 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::channel; use lavina_core::player::*; +use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; -use crate::prelude::*; +use lavina_core::terminator::Terminator; use proto_irc::client::{client_message, ClientMessage}; use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; +use proto_irc::user::PrefixedNick; use proto_irc::{Chan, Recipient}; -use proto_irc::user::{Prefix, PrefixedNick}; -use crate::util::Terminator; - -#[cfg(test)] -mod test; #[derive(Deserialize, Debug, Clone)] pub struct ServerConfig { @@ -70,14 +67,12 @@ async fn handle_socket( .await?; writer.flush().await?; - let registered_user: Result = - handle_registration(&mut reader, &mut writer, &mut storage).await; + let registered_user: Result = handle_registration(&mut reader, &mut writer, &mut storage).await; match registered_user { Ok(user) => { log::debug!("User registered"); - handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user) - .await?; + handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?; } Err(_) => { log::debug!("Registration failed"); @@ -159,23 +154,22 @@ async fn handle_registration<'a>( buffer.clear(); }?; - let stored_user = storage.retrieve_user_by_name(&*user.nickname).await?; let stored_user = match stored_user { Some(u) => u, None => { log::info!("User '{}' not found", user.nickname); - return Err(fail("no user found")); + return Err(anyhow!("no user found")); } }; if stored_user.password.is_none() { log::info!("Password not defined for user '{}'", user.nickname); - return Err(fail("password is not defined")); + return Err(anyhow!("password is not defined")); } if stored_user.password.as_deref() != pass.as_deref() { log::info!("Incorrect password supplied for user '{}'", user.nickname); - return Err(fail("passwords do not match")); + return Err(anyhow!("passwords do not match")); } // TODO properly implement session temination @@ -250,14 +244,7 @@ async fn handle_registered_socket<'a>( let rooms_list = connection.get_rooms().await?; for room in &rooms_list { - produce_on_join_cmd_messages( - &config, - &user, - &Chan::Global(room.id.as_inner().clone()), - room, - writer, - ) - .await?; + produce_on_join_cmd_messages(&config, &user, &Chan::Global(room.id.as_inner().clone()), room, writer).await?; } writer.flush().await?; @@ -314,10 +301,7 @@ async fn handle_update( ) -> Result<()> { log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); match update { - Updates::RoomJoined { - new_member_id, - room_id, - } => { + Updates::RoomJoined { new_member_id, room_id } => { if player_id == &new_member_id { if let Some(room) = rooms.get_room(&room_id).await { let room_info = room.get_room_info().await; @@ -438,16 +422,14 @@ async fn handle_incoming_message( Recipient::Chan(Chan::Global(chan)) => { let room_id = RoomId::from(chan)?; user_handle.send_message(room_id, body).await?; - }, + } _ => log::warn!("Unsupported target type"), }, ClientMessage::Topic { chan, topic } => { match chan { Chan::Global(chan) => { let room_id = RoomId::from(chan)?; - user_handle - .change_topic(room_id.clone(), topic.clone()) - .await?; + user_handle.change_topic(room_id.clone(), topic.clone()).await?; ServerMessage { tags: vec![], sender: Some(config.server_name.clone()), @@ -569,11 +551,7 @@ async fn handle_incoming_message( Ok(HandleResult::Continue) } -fn user_to_who_msg( - config: &ServerConfig, - requestor: &RegisteredUser, - target_user_nickname: &Str, -) -> ServerMessageBody { +fn user_to_who_msg(config: &ServerConfig, requestor: &RegisteredUser, target_user_nickname: &Str) -> ServerMessageBody { // Username is equal to nickname let username = format!("~{target_user_nickname}").into(); @@ -674,8 +652,13 @@ async fn produce_on_join_cmd_messages( } .write_async(writer) .await?; - let prefixed_members: Vec = room_info.members.iter().map(|member| PrefixedNick::from_str(member.clone().into_inner())).collect(); - let non_empty_members: NonEmpty = NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); + let prefixed_members: Vec = room_info + .members + .iter() + .map(|member| PrefixedNick::from_str(member.clone().into_inner())) + .collect(); + let non_empty_members: NonEmpty = + NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); ServerMessage { tags: vec![], @@ -710,12 +693,8 @@ pub async fn launch( ) -> Result { log::info!("Starting IRC projection"); let (stopped_tx, mut stopped_rx) = channel(32); - let current_connections = - IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; - let total_connections = IntCounter::new( - "irc_total_connections", - "Total number of opened connections", - )?; + let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; + let total_connections = IntCounter::new("irc_total_connections", "Total number of opened connections")?; metrics.register(Box::new(current_connections.clone()))?; metrics.register(Box::new(total_connections.clone()))?; @@ -780,7 +759,8 @@ pub async fn launch( log::warn!("IRC connection to {socket_addr} finished with error: {err}") } } - })).await; + })) + .await; log::info!("Stopped IRC projection"); Ok(()) }); diff --git a/src/main.rs b/src/main.rs index c23a7c0..61ff315 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,6 @@ impl_trait_in_assoc_type )] -mod prelude; mod projections; mod util; @@ -19,14 +18,14 @@ use prometheus::Registry as MetricsRegistry; use serde::Deserialize; use lavina_core::player::PlayerRegistry; +use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::RoomRegistry; -use crate::prelude::*; #[derive(Deserialize, Debug)] struct ServerConfig { telemetry: util::telemetry::ServerConfig, - irc: projections::irc::ServerConfig, + irc: projection_irc::ServerConfig, xmpp: projections::xmpp::ServerConfig, storage: lavina_core::repo::StorageConfig, } @@ -64,7 +63,7 @@ async fn main() -> Result<()> { let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; - let irc = projections::irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; + let irc = projection_irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; tracing::info!("Started"); diff --git a/src/prelude.rs b/src/prelude.rs deleted file mode 100644 index 9488bc2..0000000 --- a/src/prelude.rs +++ /dev/null @@ -1,25 +0,0 @@ -pub use std::future::Future; - -pub use tokio::pin; -pub use tokio::select; -pub use tokio::sync::oneshot::{channel as oneshot, Receiver as Deferred, Sender as Promise}; -pub use tokio::task::JoinHandle; - -pub mod log { - pub use tracing::{debug, error, info, warn}; -} - -pub type Result = std::result::Result; -pub type Str = std::sync::Arc; - -pub fn fail(msg: &str) -> anyhow::Error { - anyhow::Error::msg(msg.to_owned()) -} - -macro_rules! ffail { - ($($arg:tt)*) => { - fail(&format!($($arg)*)) - }; -} - -pub(crate) use ffail; diff --git a/src/projections/irc/test.rs b/src/projections/irc/test.rs deleted file mode 100644 index c3b238f..0000000 --- a/src/projections/irc/test.rs +++ /dev/null @@ -1,166 +0,0 @@ -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::net::tcp::{ReadHalf, WriteHalf}; -use tokio::net::TcpStream; - -use crate::prelude::*; - -struct TestScope<'a> { - reader: BufReader>, - writer: WriteHalf<'a>, - buffer: Vec, -} -impl<'a> TestScope<'a> { - async fn send(&mut self, str: &(impl AsRef + ?Sized)) -> Result<()> { - self.writer.write_all(str.as_ref().as_bytes()).await?; - self.writer.flush().await?; - Ok(()) - } - - async fn expect(&mut self, str: &(impl AsRef + ?Sized)) -> Result<()> { - let len = self.reader.read_until(b'\n', &mut self.buffer).await?; - assert_eq!(std::str::from_utf8(&self.buffer[0..len])?, str.as_ref()); - self.buffer.clear(); - Ok(()) - } - - async fn assert(&mut self, f: impl FnOnce(&str) -> Result<()>) -> Result<()> { - let len = self.reader.read_until(b'\n', &mut self.buffer).await?; - let res = f(std::str::from_utf8(&self.buffer[0..len])?); - self.buffer.clear(); - res - } -} - -async fn init_client(stream: &mut TcpStream) -> Result { - let (reader, writer) = stream.split(); - let reader = BufReader::new(reader); - let buffer = vec![]; - Ok(TestScope { - reader, - writer, - buffer, - }) -} - -macro_rules! send { - ($scope: expr, $($arg:tt),*) => {{ - $scope.send(&format!($($arg,)*)).await?; - }}; -} - -macro_rules! expect { - ($scope: expr, $($arg:tt),*) => {{ - $scope.expect(&format!($($arg,)*)).await?; - }}; -} - -const SERVER_HOSTNAME: &'static str = "irc.localhost"; - -#[rustfmt::skip] -async fn registration(scope: &mut TestScope<'_>, nickname: &str) -> Result<()> { - expect!(scope, ":irc.localhost NOTICE * :Welcome to my server!\r\n"); - send!(scope, "NICK {nickname}\r\n"); - send!(scope, "USER UserName 0 * :Real Name\r\n"); - expect!(scope, ":irc.localhost 001 {nickname} :Welcome to Kek Server\r\n"); - expect!(scope, ":irc.localhost 002 {nickname} :Welcome to Kek Server\r\n"); - expect!(scope, ":irc.localhost 003 {nickname} :Welcome to Kek Server\r\n"); - expect!(scope, ":irc.localhost 004 {nickname} irc.localhost kek-0.1.alpha.3 r CFILPQbcefgijklmnopqrstvz\r\n"); - expect!(scope, ":irc.localhost 005 {nickname} CHANTYPES=# :are supported by this server\r\n"); - Ok(()) -} - -#[rustfmt::skip] -async fn join(scope: &mut TestScope<'_>, nickname: &str, chan: &str) -> Result<()> { - send!(scope, "JOIN #{chan}\r\n"); - expect!(scope, ":{nickname} JOIN #{chan}\r\n"); - expect!(scope, ":irc.localhost 332 {nickname} #{chan} :New room\r\n"); - expect!(scope, ":irc.localhost 353 {nickname} = #{chan} :{nickname}\r\n"); - expect!(scope, ":irc.localhost 366 {nickname} #{chan} :End of /NAMES list\r\n"); - Ok(()) -} - -#[tokio::test] -async fn test_registration() -> Result<()> { - let mut stream = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope = init_client(&mut stream).await?; - - registration(&mut scope, "NickName1").await?; - join(&mut scope, "NickName1", "chan1").await?; - Ok(()) -} - -#[rustfmt::skip] -#[tokio::test] -async fn test_two_connections_one_player() -> Result<()> { - let mut stream1 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope1 = init_client(&mut stream1).await?; - let mut stream2 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope2 = init_client(&mut stream2).await?; - - let nickname = "NickName2"; - let chan = "chan2"; - - registration(&mut scope1, nickname).await?; - registration(&mut scope2, nickname).await?; - join(&mut scope1, nickname, chan).await?; - - // force join on second connection when the other one joins a room - expect!(scope2, ":{nickname} JOIN #{chan}\r\n"); - expect!(scope2, ":{SERVER_HOSTNAME} 332 {nickname} #{chan} :New room\r\n"); - expect!(scope2, ":{SERVER_HOSTNAME} 353 {nickname} = #{chan} :{nickname}\r\n"); - expect!(scope2, ":{SERVER_HOSTNAME} 366 {nickname} #{chan} :End of /NAMES list\r\n"); - - // force send PRIVMSG to other connections - send!(scope1, "PRIVMSG #{chan} :Chmoki vsem v etam chati!\r\n"); - expect!(scope2, ":{nickname} PRIVMSG #{chan} :Chmoki vsem v etam chati!\r\n"); - send!(scope2, "PRIVMSG #{chan} :I tebe privetiki\r\n"); - expect!(scope1, ":{nickname} PRIVMSG #{chan} :I tebe privetiki\r\n"); - - let mut stream3 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope3 = init_client(&mut stream3).await?; - registration(&mut scope3, nickname).await?; - - // force join on registration - expect!(scope3, ":{nickname} JOIN #{chan}\r\n"); - expect!(scope3, ":{SERVER_HOSTNAME} 332 {nickname} #{chan} :New room\r\n"); - expect!(scope3, ":{SERVER_HOSTNAME} 353 {nickname} = #{chan} :{nickname}\r\n"); - expect!(scope3, ":{SERVER_HOSTNAME} 366 {nickname} #{chan} :End of /NAMES list\r\n"); - - Ok(()) -} - -#[rustfmt::skip] -#[tokio::test] -async fn test_two_players() -> Result<()> { - let mut stream1 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope1 = init_client(&mut stream1).await?; - let mut stream2 = TcpStream::connect("127.0.0.1:6667").await?; - let mut scope2 = init_client(&mut stream2).await?; - - let nickname1 = "NickName3"; - let nickname2 = "NickName4"; - - let chan = "chan3"; - - registration(&mut scope1, "NickName3").await?; - registration(&mut scope2, "NickName4").await?; - join(&mut scope1, nickname1, "chan3").await?; - send!(scope2, "JOIN #{chan}\r\n"); - expect!(scope2, ":{nickname2} JOIN #{chan}\r\n"); - expect!(scope2, ":irc.localhost 332 {nickname2} #{chan} :New room\r\n"); - scope2.assert(|line| { - if line == format!(":irc.localhost 353 {nickname2} = #{chan} :{nickname1} {nickname2}\r\n") { return Ok(()) } - if line == format!(":irc.localhost 353 {nickname2} = #{chan} :{nickname2} {nickname1}\r\n") { return Ok(()) } - panic!("incorrect chan member list received: {line}"); - }).await?; - expect!(scope2, ":irc.localhost 366 {nickname2} #{chan} :End of /NAMES list\r\n"); - - expect!(scope1, ":{nickname2} JOIN #{chan}\r\n"); - - send!(scope1, "PRIVMSG #chan3 :Chmoki vsem v etam chati!\r\n"); - expect!(scope2, ":{nickname1} PRIVMSG #chan3 :Chmoki vsem v etam chati!\r\n"); - send!(scope2, "PRIVMSG #chan3 :I tebe privetiki\r\n"); - expect!(scope1, ":{nickname2} PRIVMSG #chan3 :I tebe privetiki\r\n"); - - Ok(()) -} diff --git a/src/projections/mod.rs b/src/projections/mod.rs index 333c971..3d73ce5 100644 --- a/src/projections/mod.rs +++ b/src/projections/mod.rs @@ -1,3 +1,2 @@ //! Protocol projections — implementations of public APIs. -pub mod irc; pub mod xmpp; diff --git a/src/projections/xmpp/mod.rs b/src/projections/xmpp/mod.rs index 5b9e035..530385c 100644 --- a/src/projections/xmpp/mod.rs +++ b/src/projections/xmpp/mod.rs @@ -20,8 +20,9 @@ use tokio_rustls::rustls::{Certificate, PrivateKey}; use tokio_rustls::TlsAcceptor; use lavina_core::player::{PlayerConnection, PlayerId, PlayerRegistry}; +use lavina_core::prelude::*; use lavina_core::room::{RoomId, RoomRegistry}; -use crate::prelude::*; +use lavina_core::terminator::Terminator; use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; use proto_xmpp::client::{Iq, Message, MessageType, Presence}; use proto_xmpp::disco::*; @@ -29,7 +30,6 @@ use proto_xmpp::roster::RosterQuery; use proto_xmpp::session::Session; use proto_xmpp::stream::*; use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; -use crate::util::Terminator; use self::proto::{ClientPacket, IqClientBody}; diff --git a/src/projections/xmpp/proto.rs b/src/projections/xmpp/proto.rs index 5fd54d1..f9bed8c 100644 --- a/src/projections/xmpp/proto.rs +++ b/src/projections/xmpp/proto.rs @@ -1,7 +1,9 @@ +use anyhow::anyhow; use derive_more::From; use quick_xml::events::Event; use quick_xml::name::{Namespace, ResolveResult}; +use lavina_core::prelude::*; use proto_xmpp::bind::BindRequest; use proto_xmpp::client::{Iq, Message, Presence}; use proto_xmpp::disco::{InfoQuery, ItemQuery}; @@ -9,8 +11,6 @@ use proto_xmpp::roster::RosterQuery; use proto_xmpp::session::Session; use proto_xmpp::xml::*; -use crate::prelude::*; - #[derive(PartialEq, Eq, Debug, From)] pub enum IqClientBody { Bind(BindRequest), @@ -29,7 +29,7 @@ impl FromXml for IqClientBody { let bytes = match event { Event::Start(bytes) => bytes, Event::Empty(bytes) => bytes, - _ => return Err(ffail!("Unexpected XML event: {event:?}")), + _ => return Err(anyhow!("Unexpected XML event: {event:?}")), }; let name = bytes.name(); match_parser!(name, namespace, event; @@ -67,7 +67,7 @@ impl FromXml for ClientPacket { Presence::, Message, { - Err(ffail!( + Err(anyhow!( "Unexpected XML event of name {:?} in namespace {:?}", name, namespace @@ -80,11 +80,11 @@ impl FromXml for ClientPacket { if name.local_name().as_ref() == b"stream" { return Ok(ClientPacket::StreamEnd); } else { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); } } _ => { - return Err(ffail!("Unexpected XML event: {event:?}")); + return Err(anyhow!("Unexpected XML event: {event:?}")); } } } diff --git a/src/util/mod.rs b/src/util/mod.rs index b3d7d82..90e7c7c 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,32 +1,3 @@ -use crate::prelude::*; - pub mod telemetry; #[cfg(test)] pub mod testkit; - -pub struct Terminator { - signal: Promise<()>, - completion: JoinHandle>, -} -impl Terminator { - pub async fn terminate(self) -> Result<()> { - match self.signal.send(()) { - Ok(()) => {} - Err(_) => log::warn!("Termination channel is dropped"), - } - self.completion.await??; - Ok(()) - } - - /// Used to spawn managed tasks with support for graceful shutdown - pub fn spawn(launcher: Fun) -> Terminator - where - Fun: FnOnce(Deferred<()>) -> Fut, - Fut: Future> + Send + 'static, - { - let (signal, rx) = oneshot(); - let future = launcher(rx); - let completion = tokio::task::spawn(future); - Terminator { signal, completion } - } -} diff --git a/src/util/telemetry.rs b/src/util/telemetry.rs index 92006ad..f238978 100644 --- a/src/util/telemetry.rs +++ b/src/util/telemetry.rs @@ -11,10 +11,10 @@ use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; +use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::RoomRegistry; -use crate::prelude::*; -use crate::util::Terminator; +use lavina_core::terminator::Terminator; use mgmt_api::*; diff --git a/src/util/testkit.rs b/src/util/testkit.rs index d568bea..59277e0 100644 --- a/src/util/testkit.rs +++ b/src/util/testkit.rs @@ -2,7 +2,7 @@ use std::task::{Context, Poll}; use futures_util::task::noop_waker_ref; -use crate::prelude::*; +use lavina_core::prelude::*; pub fn sync_future(future: impl Future) -> Result { let waker = noop_waker_ref();