forked from lavina/lavina
1
0
Fork 0

irc parsing and initial projection

This commit is contained in:
Nikita Vilunov 2023-02-07 16:21:00 +01:00
parent f9a6d8bdfc
commit e0ae11c02d
8 changed files with 510 additions and 36 deletions

120
Cargo.lock generated
View File

@ -13,9 +13,15 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.68"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800"
[[package]]
name = "assert_matches"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-trait"
@ -84,9 +90,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.3.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "cfg-if"
@ -125,9 +131,9 @@ dependencies = [
[[package]]
name = "encoding_rs"
version = "0.8.31"
version = "0.8.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394"
dependencies = [
"cfg-if",
]
@ -328,9 +334,9 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.23"
version = "0.14.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c"
checksum = "5e011372fa0b68db8350aa7a248930ecc7839bf46d8485577d69f117a75f164c"
dependencies = [
"bytes",
"futures-channel",
@ -411,9 +417,9 @@ checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
[[package]]
name = "js-sys"
version = "0.3.60"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47"
checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730"
dependencies = [
"wasm-bindgen",
]
@ -423,10 +429,12 @@ name = "lavina"
version = "0.1.0"
dependencies = [
"anyhow",
"assert_matches",
"figment",
"futures-util",
"http-body-util",
"hyper 1.0.0-rc.2",
"nom",
"prometheus",
"regex",
"reqwest",
@ -489,6 +497,12 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "mio"
version = "0.8.5"
@ -498,7 +512,17 @@ dependencies = [
"libc",
"log",
"wasi",
"windows-sys",
"windows-sys 0.42.0",
]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
@ -545,15 +569,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.6"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@ -605,9 +629,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.50"
version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6"
dependencies = [
"unicode-ident",
]
@ -718,7 +742,7 @@ dependencies = [
"h2",
"http",
"http-body 0.4.5",
"hyper 0.14.23",
"hyper 0.14.24",
"ipnet",
"js-sys",
"log",
@ -772,9 +796,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.91"
version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
checksum = "7434af0dc1cbd59268aa98b4c22c131c0584d2232f6fb166efb993e2832e896a"
dependencies = [
"itoa",
"ryu",
@ -898,9 +922,9 @@ dependencies = [
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
@ -919,7 +943,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
"windows-sys 0.42.0",
]
[[package]]
@ -1140,9 +1164,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268"
checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@ -1150,9 +1174,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142"
checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9"
dependencies = [
"bumpalo",
"log",
@ -1165,9 +1189,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.33"
version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d"
checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454"
dependencies = [
"cfg-if",
"js-sys",
@ -1177,9 +1201,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810"
checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@ -1187,9 +1211,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c"
checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6"
dependencies = [
"proc-macro2",
"quote",
@ -1200,15 +1224,15 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f"
checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d"
[[package]]
name = "web-sys"
version = "0.3.60"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f"
checksum = "e33b99f4b23ba3eec1a53ac264e35a755f00e966e0065077d6027c0f575b0b97"
dependencies = [
"js-sys",
"wasm-bindgen",
@ -1251,6 +1275,30 @@ dependencies = [
"windows_x86_64_msvc",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.1"

View File

@ -21,7 +21,9 @@ tokio-tungstenite = "0.18.0"
futures-util = "0.3.25"
prometheus = { version = "0.13.3", default_features = false }
regex = "1.7.1"
nom = "7.1.3"
[dev-dependencies]
assert_matches = "1.5.0"
regex = "1.7.1"
reqwest = { version = "0.11", default_features = false }

View File

@ -2,6 +2,7 @@ mod core;
mod http;
mod prelude;
mod projections;
mod protos;
mod tcp;
mod util;
@ -63,6 +64,10 @@ async fn main() -> Result<()> {
players.clone(),
)
.await?;
let irc_config = projections::irc::ServerConfig {
listen_on: "127.0.0.1:6667".parse()?,
};
let irc = projections::irc::launch(irc_config, players).await?;
tracing::info!("Started");
@ -70,6 +75,7 @@ async fn main() -> Result<()> {
// sleep.await;
tracing::info!("Begin shutdown");
irc.terminate().await?;
http_server_actor.terminate().await?;
tracing::info!("Shutdown complete");
Ok(())

85
src/projections/irc.rs Normal file
View File

@ -0,0 +1,85 @@
use std::net::SocketAddr;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot::channel;
use crate::core::player::{PlayerRegistry, Updates};
use crate::prelude::*;
use crate::protos::irc::*;
use crate::util::Terminator;
pub struct ServerConfig {
pub listen_on: SocketAddr,
}
async fn handle_socket(
mut stream: TcpStream,
socket_addr: SocketAddr,
mut players: PlayerRegistry,
) {
let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
let mut buffer = vec![];
// let (player_id, mut player_handle) = players.create_player().await;
// let mut sub = player_handle.subscribe().await;
loop {
select! {
biased;
res = reader.read_until(b'\n', &mut buffer) => {
match res {
Ok(len) => if len == 0 {
log::info!("Terminating socket");
break;
},
Err(err) => log::warn!("Failed to read from socket: {err}"),
}
let parsed = parse_client_message(&buffer[..]);
match parsed {
Ok((rest, msg)) => {
log::info!("Incoming IRC message: {msg:?}");
},
Err(err) => {
log::warn!("Failed to parse IRC message: {err}");
},
}
buffer.clear();
},
}
}
}
pub async fn launch(config: ServerConfig, players: PlayerRegistry) -> Result<Terminator> {
log::info!("Starting IRC projection");
let (signal, mut rx) = channel();
let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started");
let handle = tokio::task::spawn(async move {
loop {
select! {
biased;
_ = &mut rx => break,
new_conn = listener.accept() => {
match new_conn {
Ok((stream, socket_addr)) => {
log::debug!("Incoming connection from {socket_addr}");
let handle = tokio::task::spawn(handle_socket(stream, socket_addr, players.clone()));
},
Err(err) => log::warn!("Failed to accept new connection: {err}"),
}
},
}
}
log::info!("Stopping IRC projection");
Ok(())
});
let terminator = Terminator::from_raw(signal, handle);
Ok(terminator)
}

View File

@ -1,2 +1,3 @@
//! Protocol projections — implementations of public APIs.
pub mod irc;
pub mod trivial;

301
src/protos/irc/mod.rs Normal file
View File

@ -0,0 +1,301 @@
//! Client-to-Server IRC protocol.
use std::io::Write;
use nom::{
branch::alt,
bytes::complete::{tag, take, take_while},
IResult,
};
type ByteVec = Vec<u8>;
/// Server-to-client message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServerMessage {
/// Optional tags section, prefixed with `@`
tags: Vec<Tag>,
/// Optional server name, prefixed with `:`.
sender: Option<String>,
body: ServerMessageBody,
}
impl ServerMessage {
pub fn write(&self, writer: &mut impl Write) -> std::io::Result<()> {
self.body.write(writer)?;
writer.write(b"\n")?;
Ok(())
}
}
pub fn server_message(input: &[u8]) -> IResult<&[u8], ServerMessage> {
let (input, command) = server_message_body(input)?;
let (input, _) = tag(b"\n")(input)?;
let message = ServerMessage {
tags: vec![],
sender: None,
body: command,
};
Ok((input, message))
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ServerMessageBody {
Notice {
first_target: ByteVec,
rest_targets: Vec<ByteVec>,
text: ByteVec,
},
Ping {
token: ByteVec,
},
Pong {
from: ByteVec,
token: ByteVec,
},
}
impl ServerMessageBody {
pub fn write(&self, writer: &mut impl Write) -> std::io::Result<()> {
match self {
ServerMessageBody::Notice {
first_target,
rest_targets,
text,
} => {
writer.write(b"NOTICE ")?;
writer.write(&first_target)?;
writer.write(b" :")?;
writer.write(&text)?;
}
ServerMessageBody::Ping { token } => {
writer.write(b"PING ")?;
writer.write(&token)?;
}
ServerMessageBody::Pong { from, token } => {
writer.write(b"PONG ")?;
writer.write(&from)?;
writer.write(b" :")?;
writer.write(&token)?;
}
}
Ok(())
}
}
fn server_message_body(input: &[u8]) -> IResult<&[u8], ServerMessageBody> {
alt((
sserver_message_body_notice,
server_message_body_ping,
server_message_body_pong,
))(input)
}
fn sserver_message_body_notice(input: &[u8]) -> IResult<&[u8], ServerMessageBody> {
let (input, _) = tag("NOTICE ")(input)?;
let (input, first_target) = receiver(input)?;
let (input, _) = tag(" :")(input)?;
let (input, text) = token(input)?;
let first_target = first_target.to_owned();
let text = text.to_owned();
Ok((
input,
ServerMessageBody::Notice {
first_target,
rest_targets: vec![],
text,
},
))
}
fn server_message_body_ping(input: &[u8]) -> IResult<&[u8], ServerMessageBody> {
let (input, _) = tag("PING ")(input)?;
let (input, token) = token(input)?;
Ok((
input,
ServerMessageBody::Ping {
token: token.to_owned(),
},
))
}
fn server_message_body_pong(input: &[u8]) -> IResult<&[u8], ServerMessageBody> {
let (input, _) = tag("PONG ")(input)?;
let (input, from) = receiver(input)?;
let (input, _) = tag(" :")(input)?;
let (input, token) = token(input)?;
Ok((
input,
ServerMessageBody::Pong {
from: from.to_owned(),
token: token.to_owned(),
},
))
}
/// Single message tag value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Tag {
key: ByteVec,
value: Option<u8>,
}
/// Client-to-server command.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientMessage {
/// CAP. Capability-related commands.
Capability {
subcommand: CapabilitySubcommand,
},
/// PING
Ping {
token: ByteVec,
},
Pong {
from: ByteVec,
token: ByteVec,
},
}
fn receiver(input: &[u8]) -> IResult<&[u8], &[u8]> {
take_while(|i| i != b'\n' && i != b'\r' && i != b' ')(input)
}
fn token(input: &[u8]) -> IResult<&[u8], &[u8]> {
take_while(|i| i != b'\n' && i != b'\r')(input)
}
pub fn parse_client_message(input: &[u8]) -> IResult<&[u8], ClientMessage> {
alt((command_capability, command_ping))(input)
}
fn command_capability(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("CAP ")(input)?;
let (input, subcommand) = capability_subcommand(input)?;
Ok((input, ClientMessage::Capability { subcommand }))
}
fn command_ping(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("PING ")(input)?;
let (input, token) = token(input)?;
Ok((
input,
ClientMessage::Ping {
token: token.to_owned(),
},
))
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CapabilitySubcommand {
/// CAP LS {code}
List { code: [u8; 3] },
/// CAP END
End,
}
fn capability_subcommand(input: &[u8]) -> IResult<&[u8], CapabilitySubcommand> {
alt((capability_subcommand_ls, capability_subcommand_end))(input)
}
fn capability_subcommand_ls(input: &[u8]) -> IResult<&[u8], CapabilitySubcommand> {
let (input, _) = tag("LS ")(input)?;
let (input, code) = take(3usize)(input)?;
Ok((
input,
CapabilitySubcommand::List {
code: code.try_into().unwrap(),
},
))
}
fn capability_subcommand_end(input: &[u8]) -> IResult<&[u8], CapabilitySubcommand> {
let (input, _) = tag("END")(input)?;
Ok((input, CapabilitySubcommand::End))
}
#[cfg(test)]
mod test {
use assert_matches::*;
use super::*;
#[test]
fn test_server_message_notice() {
let input = b"NOTICE * :*** Looking up your hostname...\n";
let expected = ServerMessage {
tags: vec![],
sender: None,
body: ServerMessageBody::Notice {
first_target: b"*".to_vec(),
rest_targets: vec![],
text: b"*** Looking up your hostname...".to_vec(),
},
};
let result = server_message(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));
let mut bytes = vec![];
expected.write(&mut bytes).unwrap();
assert_eq!(bytes, input);
}
#[test]
fn test_server_message_pong() {
let input = b"PONG server.example :LAG004911\n";
let expected = ServerMessage {
tags: vec![],
sender: None,
body: ServerMessageBody::Pong {
from: b"server.example".to_vec(),
token: b"LAG004911".to_vec(),
},
};
let result = server_message(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));
let mut bytes = vec![];
expected.write(&mut bytes).unwrap();
assert_eq!(bytes, input);
}
#[test]
fn test_client_message_cap_ls() {
let input = b"CAP LS 302";
let expected = ClientMessage::Capability {
subcommand: CapabilitySubcommand::List { code: *b"302" },
};
let result = parse_client_message(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));
}
#[test]
fn test_client_message_cap_end() {
let input = b"CAP END";
let expected = ClientMessage::Capability {
subcommand: CapabilitySubcommand::End,
};
let result = parse_client_message(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));
}
#[test]
fn test_client_message_ping() {
let input = b"PING 1337";
let expected = ClientMessage::Ping {
token: b"1337".to_vec(),
};
let result = parse_client_message(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));
}
}

2
src/protos/mod.rs Normal file
View File

@ -0,0 +1,2 @@
//! Definitions of wire protocols to be used in implementations of projections.
pub mod irc;

View File

@ -1 +1,30 @@
use futures_util::FutureExt;
use tokio::sync::oneshot::{channel, Sender};
use crate::prelude::*;
pub mod table;
pub struct Terminator {
signal: Sender<()>,
completion: JoinHandle<Result<()>>,
}
impl Terminator {
pub fn from_raw(signal: Sender<()>, completion: JoinHandle<Result<()>>) -> Terminator {
Terminator { signal, completion }
}
pub fn new(completion: JoinHandle<Result<()>>) -> (Terminator, impl Future<Output = ()>) {
let (signal, rx) = channel();
(Terminator { signal, completion }, rx.map(|_| ()))
}
pub async fn terminate(self) -> Result<()> {
match self.signal.send(()) {
Ok(()) => {}
Err(_) => log::error!("Termination channel is dropped"),
}
self.completion.await??;
Ok(())
}
}