From e0ae11c02d6e13f86944ced980882638cf630c3b Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Tue, 7 Feb 2023 16:21:00 +0100 Subject: [PATCH] irc parsing and initial projection --- Cargo.lock | 120 +++++++++++----- Cargo.toml | 2 + src/main.rs | 6 + src/projections/irc.rs | 85 ++++++++++++ src/projections/mod.rs | 1 + src/protos/irc/mod.rs | 301 +++++++++++++++++++++++++++++++++++++++++ src/protos/mod.rs | 2 + src/util/mod.rs | 29 ++++ 8 files changed, 510 insertions(+), 36 deletions(-) create mode 100644 src/projections/irc.rs create mode 100644 src/protos/irc/mod.rs create mode 100644 src/protos/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 4137ee4..11501d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,9 +13,15 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" +checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800" + +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" [[package]] name = "async-trait" @@ -84,9 +90,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "cfg-if" @@ -125,9 +131,9 @@ dependencies = [ [[package]] name = "encoding_rs" -version = "0.8.31" +version = "0.8.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" dependencies = [ "cfg-if", ] @@ -328,9 +334,9 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "hyper" -version = "0.14.23" +version = "0.14.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" +checksum = "5e011372fa0b68db8350aa7a248930ecc7839bf46d8485577d69f117a75f164c" dependencies = [ "bytes", "futures-channel", @@ -411,9 +417,9 @@ checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" [[package]] name = "js-sys" -version = "0.3.60" +version = "0.3.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47" +checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730" dependencies = [ "wasm-bindgen", ] @@ -423,10 +429,12 @@ name = "lavina" version = "0.1.0" dependencies = [ "anyhow", + "assert_matches", "figment", "futures-util", "http-body-util", "hyper 1.0.0-rc.2", + "nom", "prometheus", "regex", "reqwest", @@ -489,6 +497,12 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "0.8.5" @@ -498,7 +512,17 @@ dependencies = [ "libc", "log", "wasi", - "windows-sys", + "windows-sys 0.42.0", +] + +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", ] [[package]] @@ -545,15 +569,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.6" +version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf" +checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" dependencies = [ "cfg-if", "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.45.0", ] [[package]] @@ -605,9 +629,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" +checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" dependencies = [ "unicode-ident", ] @@ -718,7 +742,7 @@ dependencies = [ "h2", "http", "http-body 0.4.5", - "hyper 0.14.23", + "hyper 0.14.24", "ipnet", "js-sys", "log", @@ -772,9 +796,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" +checksum = "7434af0dc1cbd59268aa98b4c22c131c0584d2232f6fb166efb993e2832e896a" dependencies = [ "itoa", "ryu", @@ -898,9 +922,9 @@ dependencies = [ [[package]] name = "tinyvec_macros" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" @@ -919,7 +943,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -1140,9 +1164,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.83" +version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268" +checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1150,9 +1174,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.83" +version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142" +checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" dependencies = [ "bumpalo", "log", @@ -1165,9 +1189,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.33" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d" +checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454" dependencies = [ "cfg-if", "js-sys", @@ -1177,9 +1201,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.83" +version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810" +checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1187,9 +1211,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.83" +version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" +checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" dependencies = [ "proc-macro2", "quote", @@ -1200,15 +1224,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.83" +version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" +checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" [[package]] name = "web-sys" -version = "0.3.60" +version = "0.3.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" +checksum = "e33b99f4b23ba3eec1a53ac264e35a755f00e966e0065077d6027c0f575b0b97" dependencies = [ "js-sys", "wasm-bindgen", @@ -1251,6 +1275,30 @@ dependencies = [ "windows_x86_64_msvc", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.1" diff --git a/Cargo.toml b/Cargo.toml index b7b457d..ae5757e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,9 @@ tokio-tungstenite = "0.18.0" futures-util = "0.3.25" prometheus = { version = "0.13.3", default_features = false } regex = "1.7.1" +nom = "7.1.3" [dev-dependencies] +assert_matches = "1.5.0" regex = "1.7.1" reqwest = { version = "0.11", default_features = false } diff --git a/src/main.rs b/src/main.rs index fcb099c..870f2ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod core; mod http; mod prelude; mod projections; +mod protos; mod tcp; mod util; @@ -63,6 +64,10 @@ async fn main() -> Result<()> { players.clone(), ) .await?; + let irc_config = projections::irc::ServerConfig { + listen_on: "127.0.0.1:6667".parse()?, + }; + let irc = projections::irc::launch(irc_config, players).await?; tracing::info!("Started"); @@ -70,6 +75,7 @@ async fn main() -> Result<()> { // sleep.await; tracing::info!("Begin shutdown"); + irc.terminate().await?; http_server_actor.terminate().await?; tracing::info!("Shutdown complete"); Ok(()) diff --git a/src/projections/irc.rs b/src/projections/irc.rs new file mode 100644 index 0000000..80f9f1d --- /dev/null +++ b/src/projections/irc.rs @@ -0,0 +1,85 @@ +use std::net::SocketAddr; + +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::oneshot::channel; + +use crate::core::player::{PlayerRegistry, Updates}; +use crate::prelude::*; +use crate::protos::irc::*; +use crate::util::Terminator; + +pub struct ServerConfig { + pub listen_on: SocketAddr, +} + +async fn handle_socket( + mut stream: TcpStream, + socket_addr: SocketAddr, + mut players: PlayerRegistry, +) { + let (reader, writer) = stream.split(); + let mut reader = BufReader::new(reader); + let mut writer = BufWriter::new(writer); + + let mut buffer = vec![]; + + // let (player_id, mut player_handle) = players.create_player().await; + // let mut sub = player_handle.subscribe().await; + + loop { + select! { + biased; + res = reader.read_until(b'\n', &mut buffer) => { + match res { + Ok(len) => if len == 0 { + log::info!("Terminating socket"); + break; + }, + Err(err) => log::warn!("Failed to read from socket: {err}"), + } + let parsed = parse_client_message(&buffer[..]); + match parsed { + Ok((rest, msg)) => { + log::info!("Incoming IRC message: {msg:?}"); + }, + Err(err) => { + log::warn!("Failed to parse IRC message: {err}"); + }, + } + buffer.clear(); + }, + } + } +} + +pub async fn launch(config: ServerConfig, players: PlayerRegistry) -> Result { + log::info!("Starting IRC projection"); + let (signal, mut rx) = channel(); + let listener = TcpListener::bind(config.listen_on).await?; + log::debug!("Listener started"); + + let handle = tokio::task::spawn(async move { + loop { + select! { + biased; + _ = &mut rx => break, + new_conn = listener.accept() => { + match new_conn { + Ok((stream, socket_addr)) => { + log::debug!("Incoming connection from {socket_addr}"); + let handle = tokio::task::spawn(handle_socket(stream, socket_addr, players.clone())); + }, + Err(err) => log::warn!("Failed to accept new connection: {err}"), + } + }, + } + } + + log::info!("Stopping IRC projection"); + Ok(()) + }); + + let terminator = Terminator::from_raw(signal, handle); + Ok(terminator) +} diff --git a/src/projections/mod.rs b/src/projections/mod.rs index f7a2536..b5e9363 100644 --- a/src/projections/mod.rs +++ b/src/projections/mod.rs @@ -1,2 +1,3 @@ //! Protocol projections — implementations of public APIs. +pub mod irc; pub mod trivial; diff --git a/src/protos/irc/mod.rs b/src/protos/irc/mod.rs new file mode 100644 index 0000000..be8a263 --- /dev/null +++ b/src/protos/irc/mod.rs @@ -0,0 +1,301 @@ +//! Client-to-Server IRC protocol. + +use std::io::Write; + +use nom::{ + branch::alt, + bytes::complete::{tag, take, take_while}, + IResult, +}; + +type ByteVec = Vec; + +/// Server-to-client message. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ServerMessage { + /// Optional tags section, prefixed with `@` + tags: Vec, + /// Optional server name, prefixed with `:`. + sender: Option, + body: ServerMessageBody, +} +impl ServerMessage { + pub fn write(&self, writer: &mut impl Write) -> std::io::Result<()> { + self.body.write(writer)?; + writer.write(b"\n")?; + Ok(()) + } +} + +pub fn server_message(input: &[u8]) -> IResult<&[u8], ServerMessage> { + let (input, command) = server_message_body(input)?; + let (input, _) = tag(b"\n")(input)?; + + let message = ServerMessage { + tags: vec![], + sender: None, + body: command, + }; + Ok((input, message)) +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ServerMessageBody { + Notice { + first_target: ByteVec, + rest_targets: Vec, + text: ByteVec, + }, + Ping { + token: ByteVec, + }, + Pong { + from: ByteVec, + token: ByteVec, + }, +} +impl ServerMessageBody { + pub fn write(&self, writer: &mut impl Write) -> std::io::Result<()> { + match self { + ServerMessageBody::Notice { + first_target, + rest_targets, + text, + } => { + writer.write(b"NOTICE ")?; + writer.write(&first_target)?; + writer.write(b" :")?; + writer.write(&text)?; + } + ServerMessageBody::Ping { token } => { + writer.write(b"PING ")?; + writer.write(&token)?; + } + ServerMessageBody::Pong { from, token } => { + writer.write(b"PONG ")?; + writer.write(&from)?; + writer.write(b" :")?; + writer.write(&token)?; + } + } + Ok(()) + } +} + +fn server_message_body(input: &[u8]) -> IResult<&[u8], ServerMessageBody> { + alt(( + sserver_message_body_notice, + server_message_body_ping, + server_message_body_pong, + ))(input) +} + +fn sserver_message_body_notice(input: &[u8]) -> IResult<&[u8], ServerMessageBody> { + let (input, _) = tag("NOTICE ")(input)?; + let (input, first_target) = receiver(input)?; + let (input, _) = tag(" :")(input)?; + let (input, text) = token(input)?; + + let first_target = first_target.to_owned(); + let text = text.to_owned(); + Ok(( + input, + ServerMessageBody::Notice { + first_target, + rest_targets: vec![], + text, + }, + )) +} + +fn server_message_body_ping(input: &[u8]) -> IResult<&[u8], ServerMessageBody> { + let (input, _) = tag("PING ")(input)?; + let (input, token) = token(input)?; + + Ok(( + input, + ServerMessageBody::Ping { + token: token.to_owned(), + }, + )) +} + +fn server_message_body_pong(input: &[u8]) -> IResult<&[u8], ServerMessageBody> { + let (input, _) = tag("PONG ")(input)?; + let (input, from) = receiver(input)?; + let (input, _) = tag(" :")(input)?; + let (input, token) = token(input)?; + + Ok(( + input, + ServerMessageBody::Pong { + from: from.to_owned(), + token: token.to_owned(), + }, + )) +} + +/// Single message tag value. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Tag { + key: ByteVec, + value: Option, +} + +/// Client-to-server command. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ClientMessage { + /// CAP. Capability-related commands. + Capability { + subcommand: CapabilitySubcommand, + }, + /// PING + Ping { + token: ByteVec, + }, + Pong { + from: ByteVec, + token: ByteVec, + }, +} + +fn receiver(input: &[u8]) -> IResult<&[u8], &[u8]> { + take_while(|i| i != b'\n' && i != b'\r' && i != b' ')(input) +} + +fn token(input: &[u8]) -> IResult<&[u8], &[u8]> { + take_while(|i| i != b'\n' && i != b'\r')(input) +} + +pub fn parse_client_message(input: &[u8]) -> IResult<&[u8], ClientMessage> { + alt((command_capability, command_ping))(input) +} + +fn command_capability(input: &[u8]) -> IResult<&[u8], ClientMessage> { + let (input, _) = tag("CAP ")(input)?; + let (input, subcommand) = capability_subcommand(input)?; + + Ok((input, ClientMessage::Capability { subcommand })) +} + +fn command_ping(input: &[u8]) -> IResult<&[u8], ClientMessage> { + let (input, _) = tag("PING ")(input)?; + let (input, token) = token(input)?; + + Ok(( + input, + ClientMessage::Ping { + token: token.to_owned(), + }, + )) +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum CapabilitySubcommand { + /// CAP LS {code} + List { code: [u8; 3] }, + /// CAP END + End, +} + +fn capability_subcommand(input: &[u8]) -> IResult<&[u8], CapabilitySubcommand> { + alt((capability_subcommand_ls, capability_subcommand_end))(input) +} + +fn capability_subcommand_ls(input: &[u8]) -> IResult<&[u8], CapabilitySubcommand> { + let (input, _) = tag("LS ")(input)?; + let (input, code) = take(3usize)(input)?; + + Ok(( + input, + CapabilitySubcommand::List { + code: code.try_into().unwrap(), + }, + )) +} + +fn capability_subcommand_end(input: &[u8]) -> IResult<&[u8], CapabilitySubcommand> { + let (input, _) = tag("END")(input)?; + Ok((input, CapabilitySubcommand::End)) +} + +#[cfg(test)] +mod test { + use assert_matches::*; + + use super::*; + + #[test] + fn test_server_message_notice() { + let input = b"NOTICE * :*** Looking up your hostname...\n"; + let expected = ServerMessage { + tags: vec![], + sender: None, + body: ServerMessageBody::Notice { + first_target: b"*".to_vec(), + rest_targets: vec![], + text: b"*** Looking up your hostname...".to_vec(), + }, + }; + + let result = server_message(input); + assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); + + let mut bytes = vec![]; + expected.write(&mut bytes).unwrap(); + assert_eq!(bytes, input); + } + + #[test] + fn test_server_message_pong() { + let input = b"PONG server.example :LAG004911\n"; + let expected = ServerMessage { + tags: vec![], + sender: None, + body: ServerMessageBody::Pong { + from: b"server.example".to_vec(), + token: b"LAG004911".to_vec(), + }, + }; + + let result = server_message(input); + assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); + + let mut bytes = vec![]; + expected.write(&mut bytes).unwrap(); + assert_eq!(bytes, input); + } + + #[test] + fn test_client_message_cap_ls() { + let input = b"CAP LS 302"; + let expected = ClientMessage::Capability { + subcommand: CapabilitySubcommand::List { code: *b"302" }, + }; + + let result = parse_client_message(input); + assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); + } + + #[test] + fn test_client_message_cap_end() { + let input = b"CAP END"; + let expected = ClientMessage::Capability { + subcommand: CapabilitySubcommand::End, + }; + + let result = parse_client_message(input); + assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); + } + + #[test] + fn test_client_message_ping() { + let input = b"PING 1337"; + let expected = ClientMessage::Ping { + token: b"1337".to_vec(), + }; + + let result = parse_client_message(input); + assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); + } +} diff --git a/src/protos/mod.rs b/src/protos/mod.rs new file mode 100644 index 0000000..410866f --- /dev/null +++ b/src/protos/mod.rs @@ -0,0 +1,2 @@ +//! Definitions of wire protocols to be used in implementations of projections. +pub mod irc; diff --git a/src/util/mod.rs b/src/util/mod.rs index 13971b0..2612162 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1 +1,30 @@ +use futures_util::FutureExt; +use tokio::sync::oneshot::{channel, Sender}; + +use crate::prelude::*; + pub mod table; + +pub struct Terminator { + signal: Sender<()>, + completion: JoinHandle>, +} +impl Terminator { + pub fn from_raw(signal: Sender<()>, completion: JoinHandle>) -> Terminator { + Terminator { signal, completion } + } + + pub fn new(completion: JoinHandle>) -> (Terminator, impl Future) { + let (signal, rx) = channel(); + (Terminator { signal, completion }, rx.map(|_| ())) + } + + pub async fn terminate(self) -> Result<()> { + match self.signal.send(()) { + Ok(()) => {} + Err(_) => log::error!("Termination channel is dropped"), + } + self.completion.await??; + Ok(()) + } +}