forked from lavina/lavina
				
			new irc integration tests
This commit is contained in:
		
							parent
							
								
									ac0b0dc962
								
							
						
					
					
						commit
						f09efa7147
					
				|  | @ -8,6 +8,11 @@ jobs: | |||
|       uses: actions/checkout@v4 | ||||
|     - name: setup rust | ||||
|       uses: https://github.com/actions-rs/toolchain@v1 | ||||
|     - name: check formatting | ||||
|       uses: https://github.com/actions-rs/cargo@v1 | ||||
|       with: | ||||
|         command: fmt | ||||
|         args: ["--check", "-p", "mgmt-api", "-p", "lavina"] | ||||
|     - name: cargo check | ||||
|       uses: https://github.com/actions-rs/cargo@v1 | ||||
|       with: | ||||
|  |  | |||
|  | @ -0,0 +1,2 @@ | |||
| max_width = 120 | ||||
| chain_width = 120 | ||||
|  | @ -104,7 +104,7 @@ async fn handle_registration<'a>( | |||
|                     log::info!("Terminating socket"); | ||||
|                     break Err(anyhow::Error::msg("EOF")); | ||||
|                 } | ||||
|                 match std::str::from_utf8(&buffer[..len-2]) { | ||||
|                 match std::str::from_utf8(&buffer[..len - 2]) { | ||||
|                     Ok(res) => res, | ||||
|                     Err(e) => break Err(e.into()), | ||||
|                 } | ||||
|  | @ -292,7 +292,9 @@ async fn handle_registered_socket<'a>( | |||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| async fn read_irc_message(reader: &mut BufReader<ReadHalf<'_>>, buf: &mut Vec<u8>) -> Result<usize> { | ||||
| // TODO this is public only for tests, perhaps move this into proto-irc
 | ||||
| // TODO limit buffer size in size to protect against dos attacks with large payloads
 | ||||
| pub async fn read_irc_message(reader: &mut BufReader<ReadHalf<'_>>, buf: &mut Vec<u8>) -> Result<usize> { | ||||
|     let mut size = 0; | ||||
|     'outer: loop { | ||||
|         let res = reader.read_until(b'\r', buf).await?; | ||||
|  | @ -300,8 +302,8 @@ async fn read_irc_message(reader: &mut BufReader<ReadHalf<'_>>, buf: &mut Vec<u8 | |||
|         let next = reader.read_u8().await?; | ||||
|         buf.push(next); | ||||
|         size += 1; | ||||
|         if next != b'\n' { 
 | ||||
|             continue 'outer; 
 | ||||
|         if next != b'\n' { | ||||
|             continue 'outer; | ||||
|         } | ||||
|         return Ok(size); | ||||
|     } | ||||
|  | @ -668,11 +670,8 @@ async fn produce_on_join_cmd_messages( | |||
|     } | ||||
|     .write_async(writer) | ||||
|     .await?; | ||||
|     let prefixed_members: Vec<PrefixedNick> = room_info | ||||
|         .members | ||||
|         .iter() | ||||
|         .map(|member| PrefixedNick::from_str(member.clone().into_inner())) | ||||
|         .collect(); | ||||
|     let prefixed_members: Vec<PrefixedNick> = | ||||
|         room_info.members.iter().map(|member| PrefixedNick::from_str(member.clone().into_inner())).collect(); | ||||
|     let non_empty_members: NonEmpty<PrefixedNick> = | ||||
|         NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); | ||||
| 
 | ||||
|  | @ -700,13 +699,24 @@ async fn produce_on_join_cmd_messages( | |||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| pub struct RunningServer { | ||||
|     pub addr: SocketAddr, | ||||
|     terminator: Terminator, | ||||
| } | ||||
| 
 | ||||
| impl RunningServer { | ||||
|     pub async fn terminate(self) -> Result<()> { | ||||
|         self.terminator.terminate().await | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub async fn launch( | ||||
|     config: ServerConfig, | ||||
|     players: PlayerRegistry, | ||||
|     rooms: RoomRegistry, | ||||
|     metrics: MetricsRegistry, | ||||
|     storage: Storage, | ||||
| ) -> Result<Terminator> { | ||||
| ) -> Result<RunningServer> { | ||||
|     log::info!("Starting IRC projection"); | ||||
|     let (stopped_tx, mut stopped_rx) = channel(32); | ||||
|     let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; | ||||
|  | @ -715,6 +725,7 @@ pub async fn launch( | |||
|     metrics.register(Box::new(total_connections.clone()))?; | ||||
| 
 | ||||
|     let listener = TcpListener::bind(config.listen_on).await?; | ||||
|     let addr = listener.local_addr()?; | ||||
| 
 | ||||
|     let terminator = Terminator::spawn(|mut rx| async move { | ||||
|         // TODO probably should separate logic for accepting new connection and storing them
 | ||||
|  | @ -782,5 +793,5 @@ pub async fn launch( | |||
|     }); | ||||
| 
 | ||||
|     log::info!("Started IRC projection"); | ||||
|     Ok(terminator) | ||||
|     Ok(RunningServer { addr, terminator }) | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,92 @@ | |||
| use std::time::Duration; | ||||
| 
 | ||||
| use anyhow::Result; | ||||
| use lavina_core::repo::{Storage, StorageConfig}; | ||||
| use lavina_core::{player::PlayerRegistry, room::RoomRegistry}; | ||||
| use projection_irc::{launch, read_irc_message, ServerConfig}; | ||||
| use prometheus::Registry as MetricsRegistry; | ||||
| use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; | ||||
| use tokio::net::tcp::{ReadHalf, WriteHalf}; | ||||
| use tokio::net::TcpStream; | ||||
| 
 | ||||
| struct TestScope<'a> { | ||||
|     reader: BufReader<ReadHalf<'a>>, | ||||
|     writer: WriteHalf<'a>, | ||||
|     buffer: Vec<u8>, | ||||
|     pub timeout: Duration, | ||||
| } | ||||
| 
 | ||||
| impl<'a> TestScope<'a> { | ||||
|     fn new(stream: &mut TcpStream) -> TestScope<'_> { | ||||
|         let (reader, writer) = stream.split(); | ||||
|         let reader = BufReader::new(reader); | ||||
|         TestScope { | ||||
|             reader, | ||||
|             writer, | ||||
|             buffer: vec![], | ||||
|             timeout: Duration::from_millis(100), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     async fn send(&mut self, str: &(impl AsRef<str> + ?Sized)) -> Result<()> { | ||||
|         self.writer.write_all(str.as_ref().as_bytes()).await?; | ||||
|         self.writer.write_all(b"\r\n").await?; | ||||
|         self.writer.flush().await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     async fn expect(&mut self, str: &str) -> Result<()> { | ||||
|         let len = tokio::time::timeout(self.timeout, read_irc_message(&mut self.reader, &mut self.buffer)).await??; | ||||
|         assert_eq!(std::str::from_utf8(&self.buffer[..len - 2])?, str); | ||||
|         self.buffer.clear(); | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     async fn assert(&mut self, f: impl FnOnce(&str) -> Result<()>) -> Result<()> { | ||||
|         let len = tokio::time::timeout(self.timeout, read_irc_message(&mut self.reader, &mut self.buffer)).await??; | ||||
|         let res = f(std::str::from_utf8(&self.buffer[..len - 2])?); | ||||
|         self.buffer.clear(); | ||||
|         res | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn scenario_basic() -> Result<()> { | ||||
|     let config = ServerConfig { | ||||
|         listen_on: "127.0.0.1:0".parse().unwrap(), | ||||
|         server_name: "testserver".into(), | ||||
|     }; | ||||
|     let mut metrics = MetricsRegistry::new(); | ||||
|     let mut storage = Storage::open(StorageConfig { | ||||
|         db_path: ":memory:".into(), | ||||
|     }) | ||||
|     .await?; | ||||
|     let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap(); | ||||
|     let players = PlayerRegistry::empty(rooms.clone(), &mut metrics).unwrap(); | ||||
|     let server = launch(config, players, rooms, metrics, storage.clone()).await.unwrap(); | ||||
| 
 | ||||
|     // test scenario
 | ||||
| 
 | ||||
|     storage.create_user("tester").await?; | ||||
|     storage.set_password("tester", "password").await?; | ||||
| 
 | ||||
|     let mut stream = TcpStream::connect(server.addr).await?; | ||||
|     let mut s = TestScope::new(&mut stream); | ||||
| 
 | ||||
|     s.expect(":testserver NOTICE * :Welcome to my server!").await?; | ||||
|     s.send("PASS password").await?; | ||||
|     s.send("NICK tester").await?; | ||||
|     s.send("USER UserName 0 * :Real Name").await?; | ||||
|     s.expect(":testserver 001 tester :Welcome to Kek Server").await?; | ||||
|     s.expect(":testserver 002 tester :Welcome to Kek Server").await?; | ||||
|     s.expect(":testserver 003 tester :Welcome to Kek Server").await?; | ||||
|     s.expect(":testserver 004 tester testserver kek-0.1.alpha.3 r CFILPQbcefgijklmnopqrstvz").await?; | ||||
|     s.expect(":testserver 005 tester CHANTYPES=# :are supported by this server").await?; | ||||
| 
 | ||||
|     stream.shutdown().await?; | ||||
| 
 | ||||
|     // wrap up
 | ||||
| 
 | ||||
|     server.terminate().await.unwrap(); | ||||
|     Ok(()) | ||||
| } | ||||
		Loading…
	
		Reference in New Issue