diff --git a/.gitea/workflows/test-pr.yml b/.gitea/workflows/test-pr.yml index 3fabe6a..3cce6b1 100644 --- a/.gitea/workflows/test-pr.yml +++ b/.gitea/workflows/test-pr.yml @@ -8,6 +8,11 @@ jobs: uses: actions/checkout@v4 - name: setup rust uses: https://github.com/actions-rs/toolchain@v1 + - name: check formatting + uses: https://github.com/actions-rs/cargo@v1 + with: + command: fmt + args: ["--check", "-p", "mgmt-api", "-p", "lavina"] - name: cargo check uses: https://github.com/actions-rs/cargo@v1 with: diff --git a/crates/projection-irc/rustfmt.toml b/crates/projection-irc/rustfmt.toml new file mode 100644 index 0000000..5a3cea5 --- /dev/null +++ b/crates/projection-irc/rustfmt.toml @@ -0,0 +1,2 @@ +max_width = 120 +chain_width = 120 diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 71b0b23..0851c54 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -104,7 +104,7 @@ async fn handle_registration<'a>( log::info!("Terminating socket"); break Err(anyhow::Error::msg("EOF")); } - match std::str::from_utf8(&buffer[..len-2]) { + match std::str::from_utf8(&buffer[..len - 2]) { Ok(res) => res, Err(e) => break Err(e.into()), } @@ -292,7 +292,9 @@ async fn handle_registered_socket<'a>( Ok(()) } -async fn read_irc_message(reader: &mut BufReader>, buf: &mut Vec) -> Result { +// TODO this is public only for tests, perhaps move this into proto-irc +// TODO limit buffer size in size to protect against dos attacks with large payloads +pub async fn read_irc_message(reader: &mut BufReader>, buf: &mut Vec) -> Result { let mut size = 0; 'outer: loop { let res = reader.read_until(b'\r', buf).await?; @@ -300,8 +302,8 @@ async fn read_irc_message(reader: &mut BufReader>, buf: &mut Vec = room_info - .members - .iter() - .map(|member| PrefixedNick::from_str(member.clone().into_inner())) - .collect(); + let prefixed_members: Vec = + room_info.members.iter().map(|member| PrefixedNick::from_str(member.clone().into_inner())).collect(); let non_empty_members: NonEmpty = NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); @@ -700,13 +699,24 @@ async fn produce_on_join_cmd_messages( Ok(()) } +pub struct RunningServer { + pub addr: SocketAddr, + terminator: Terminator, +} + +impl RunningServer { + pub async fn terminate(self) -> Result<()> { + self.terminator.terminate().await + } +} + pub async fn launch( config: ServerConfig, players: PlayerRegistry, rooms: RoomRegistry, metrics: MetricsRegistry, storage: Storage, -) -> Result { +) -> Result { log::info!("Starting IRC projection"); let (stopped_tx, mut stopped_rx) = channel(32); let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; @@ -715,6 +725,7 @@ pub async fn launch( metrics.register(Box::new(total_connections.clone()))?; let listener = TcpListener::bind(config.listen_on).await?; + let addr = listener.local_addr()?; let terminator = Terminator::spawn(|mut rx| async move { // TODO probably should separate logic for accepting new connection and storing them @@ -782,5 +793,5 @@ pub async fn launch( }); log::info!("Started IRC projection"); - Ok(terminator) + Ok(RunningServer { addr, terminator }) } diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs new file mode 100644 index 0000000..9dc0f65 --- /dev/null +++ b/crates/projection-irc/tests/lib.rs @@ -0,0 +1,92 @@ +use std::time::Duration; + +use anyhow::Result; +use lavina_core::repo::{Storage, StorageConfig}; +use lavina_core::{player::PlayerRegistry, room::RoomRegistry}; +use projection_irc::{launch, read_irc_message, ServerConfig}; +use prometheus::Registry as MetricsRegistry; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::net::tcp::{ReadHalf, WriteHalf}; +use tokio::net::TcpStream; + +struct TestScope<'a> { + reader: BufReader>, + writer: WriteHalf<'a>, + buffer: Vec, + pub timeout: Duration, +} + +impl<'a> TestScope<'a> { + fn new(stream: &mut TcpStream) -> TestScope<'_> { + let (reader, writer) = stream.split(); + let reader = BufReader::new(reader); + TestScope { + reader, + writer, + buffer: vec![], + timeout: Duration::from_millis(100), + } + } + + async fn send(&mut self, str: &(impl AsRef + ?Sized)) -> Result<()> { + self.writer.write_all(str.as_ref().as_bytes()).await?; + self.writer.write_all(b"\r\n").await?; + self.writer.flush().await?; + Ok(()) + } + + async fn expect(&mut self, str: &str) -> Result<()> { + let len = tokio::time::timeout(self.timeout, read_irc_message(&mut self.reader, &mut self.buffer)).await??; + assert_eq!(std::str::from_utf8(&self.buffer[..len - 2])?, str); + self.buffer.clear(); + Ok(()) + } + + async fn assert(&mut self, f: impl FnOnce(&str) -> Result<()>) -> Result<()> { + let len = tokio::time::timeout(self.timeout, read_irc_message(&mut self.reader, &mut self.buffer)).await??; + let res = f(std::str::from_utf8(&self.buffer[..len - 2])?); + self.buffer.clear(); + res + } +} + +#[tokio::test] +async fn scenario_basic() -> Result<()> { + let config = ServerConfig { + listen_on: "127.0.0.1:0".parse().unwrap(), + server_name: "testserver".into(), + }; + let mut metrics = MetricsRegistry::new(); + let mut storage = Storage::open(StorageConfig { + db_path: ":memory:".into(), + }) + .await?; + let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap(); + let players = PlayerRegistry::empty(rooms.clone(), &mut metrics).unwrap(); + let server = launch(config, players, rooms, metrics, storage.clone()).await.unwrap(); + + // test scenario + + storage.create_user("tester").await?; + storage.set_password("tester", "password").await?; + + let mut stream = TcpStream::connect(server.addr).await?; + let mut s = TestScope::new(&mut stream); + + s.expect(":testserver NOTICE * :Welcome to my server!").await?; + s.send("PASS password").await?; + s.send("NICK tester").await?; + s.send("USER UserName 0 * :Real Name").await?; + s.expect(":testserver 001 tester :Welcome to Kek Server").await?; + s.expect(":testserver 002 tester :Welcome to Kek Server").await?; + s.expect(":testserver 003 tester :Welcome to Kek Server").await?; + s.expect(":testserver 004 tester testserver kek-0.1.alpha.3 r CFILPQbcefgijklmnopqrstvz").await?; + s.expect(":testserver 005 tester CHANTYPES=# :are supported by this server").await?; + + stream.shutdown().await?; + + // wrap up + + server.terminate().await.unwrap(); + Ok(()) +}