new irc integration tests (#25)

This commit is contained in:
Nikita Vilunov 2023-10-09 11:35:41 +00:00
parent ac0b0dc962
commit f0b38545bf
4 changed files with 118 additions and 11 deletions

View File

@ -8,6 +8,11 @@ jobs:
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: setup rust - name: setup rust
uses: https://github.com/actions-rs/toolchain@v1 uses: https://github.com/actions-rs/toolchain@v1
- name: check formatting
uses: https://github.com/actions-rs/cargo@v1
with:
command: fmt
args: "--check -p mgmt-api -p lavina-core -p projection-irc"
- name: cargo check - name: cargo check
uses: https://github.com/actions-rs/cargo@v1 uses: https://github.com/actions-rs/cargo@v1
with: with:
@ -16,3 +21,4 @@ jobs:
uses: https://github.com/actions-rs/cargo@v1 uses: https://github.com/actions-rs/cargo@v1
with: with:
command: test command: test
args: "--all"

View File

@ -0,0 +1,2 @@
max_width = 120
chain_width = 120

View File

@ -104,7 +104,7 @@ async fn handle_registration<'a>(
log::info!("Terminating socket"); log::info!("Terminating socket");
break Err(anyhow::Error::msg("EOF")); break Err(anyhow::Error::msg("EOF"));
} }
match std::str::from_utf8(&buffer[..len-2]) { match std::str::from_utf8(&buffer[..len - 2]) {
Ok(res) => res, Ok(res) => res,
Err(e) => break Err(e.into()), Err(e) => break Err(e.into()),
} }
@ -292,7 +292,9 @@ async fn handle_registered_socket<'a>(
Ok(()) Ok(())
} }
async fn read_irc_message(reader: &mut BufReader<ReadHalf<'_>>, buf: &mut Vec<u8>) -> Result<usize> { // TODO this is public only for tests, perhaps move this into proto-irc
// TODO limit buffer size in size to protect against dos attacks with large payloads
pub async fn read_irc_message(reader: &mut BufReader<ReadHalf<'_>>, buf: &mut Vec<u8>) -> Result<usize> {
let mut size = 0; let mut size = 0;
'outer: loop { 'outer: loop {
let res = reader.read_until(b'\r', buf).await?; let res = reader.read_until(b'\r', buf).await?;
@ -300,8 +302,8 @@ async fn read_irc_message(reader: &mut BufReader<ReadHalf<'_>>, buf: &mut Vec<u8
let next = reader.read_u8().await?; let next = reader.read_u8().await?;
buf.push(next); buf.push(next);
size += 1; size += 1;
if next != b'\n' { if next != b'\n' {
continue 'outer; continue 'outer;
} }
return Ok(size); return Ok(size);
} }
@ -668,11 +670,8 @@ async fn produce_on_join_cmd_messages(
} }
.write_async(writer) .write_async(writer)
.await?; .await?;
let prefixed_members: Vec<PrefixedNick> = room_info let prefixed_members: Vec<PrefixedNick> =
.members room_info.members.iter().map(|member| PrefixedNick::from_str(member.clone().into_inner())).collect();
.iter()
.map(|member| PrefixedNick::from_str(member.clone().into_inner()))
.collect();
let non_empty_members: NonEmpty<PrefixedNick> = let non_empty_members: NonEmpty<PrefixedNick> =
NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]);
@ -700,13 +699,24 @@ async fn produce_on_join_cmd_messages(
Ok(()) Ok(())
} }
pub struct RunningServer {
pub addr: SocketAddr,
terminator: Terminator,
}
impl RunningServer {
pub async fn terminate(self) -> Result<()> {
self.terminator.terminate().await
}
}
pub async fn launch( pub async fn launch(
config: ServerConfig, config: ServerConfig,
players: PlayerRegistry, players: PlayerRegistry,
rooms: RoomRegistry, rooms: RoomRegistry,
metrics: MetricsRegistry, metrics: MetricsRegistry,
storage: Storage, storage: Storage,
) -> Result<Terminator> { ) -> Result<RunningServer> {
log::info!("Starting IRC projection"); log::info!("Starting IRC projection");
let (stopped_tx, mut stopped_rx) = channel(32); let (stopped_tx, mut stopped_rx) = channel(32);
let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?;
@ -715,6 +725,7 @@ pub async fn launch(
metrics.register(Box::new(total_connections.clone()))?; metrics.register(Box::new(total_connections.clone()))?;
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
let addr = listener.local_addr()?;
let terminator = Terminator::spawn(|mut rx| async move { let terminator = Terminator::spawn(|mut rx| async move {
// TODO probably should separate logic for accepting new connection and storing them // TODO probably should separate logic for accepting new connection and storing them
@ -782,5 +793,5 @@ pub async fn launch(
}); });
log::info!("Started IRC projection"); log::info!("Started IRC projection");
Ok(terminator) Ok(RunningServer { addr, terminator })
} }

View File

@ -0,0 +1,88 @@
use std::time::Duration;
use anyhow::Result;
use prometheus::Registry as MetricsRegistry;
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::{player::PlayerRegistry, room::RoomRegistry};
use projection_irc::{launch, read_irc_message, ServerConfig};
struct TestScope<'a> {
reader: BufReader<ReadHalf<'a>>,
writer: WriteHalf<'a>,
buffer: Vec<u8>,
pub timeout: Duration,
}
impl<'a> TestScope<'a> {
fn new(stream: &mut TcpStream) -> TestScope<'_> {
let (reader, writer) = stream.split();
let reader = BufReader::new(reader);
let buffer = vec![];
let timeout = Duration::from_millis(100);
TestScope {
reader,
writer,
buffer,
timeout,
}
}
async fn send(&mut self, str: &(impl AsRef<str> + ?Sized)) -> Result<()> {
self.writer.write_all(str.as_ref().as_bytes()).await?;
self.writer.write_all(b"\r\n").await?;
self.writer.flush().await?;
Ok(())
}
async fn expect(&mut self, str: &str) -> Result<()> {
let len = tokio::time::timeout(self.timeout, read_irc_message(&mut self.reader, &mut self.buffer)).await??;
assert_eq!(std::str::from_utf8(&self.buffer[..len - 2])?, str);
self.buffer.clear();
Ok(())
}
}
#[tokio::test]
async fn scenario_basic() -> Result<()> {
let config = ServerConfig {
listen_on: "127.0.0.1:0".parse().unwrap(),
server_name: "testserver".into(),
};
let mut metrics = MetricsRegistry::new();
let mut storage = Storage::open(StorageConfig {
db_path: ":memory:".into(),
})
.await?;
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
let players = PlayerRegistry::empty(rooms.clone(), &mut metrics).unwrap();
let server = launch(config, players, rooms, metrics, storage.clone()).await.unwrap();
// test scenario
storage.create_user("tester").await?;
storage.set_password("tester", "password").await?;
let mut stream = TcpStream::connect(server.addr).await?;
let mut s = TestScope::new(&mut stream);
s.send("PASS password").await?;
s.send("NICK tester").await?;
s.send("USER UserName 0 * :Real Name").await?;
s.expect(":testserver NOTICE * :Welcome to my server!").await?;
s.expect(":testserver 001 tester :Welcome to Kek Server").await?;
s.expect(":testserver 002 tester :Welcome to Kek Server").await?;
s.expect(":testserver 003 tester :Welcome to Kek Server").await?;
s.expect(":testserver 004 tester testserver kek-0.1.alpha.3 r CFILPQbcefgijklmnopqrstvz").await?;
s.expect(":testserver 005 tester CHANTYPES=# :are supported by this server").await?;
stream.shutdown().await?;
// wrap up
server.terminate().await.unwrap();
Ok(())
}