diff --git a/Cargo.lock b/Cargo.lock index 80cfa2b..4137ee4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "0.7.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +dependencies = [ + "memchr", +] + [[package]] name = "anyhow" version = "1.0.68" @@ -10,9 +19,9 @@ checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" [[package]] name = "async-trait" -version = "0.1.63" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1" +checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" dependencies = [ "proc-macro2", "quote", @@ -40,6 +49,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" + [[package]] name = "bitflags" version = "1.3.2" @@ -55,6 +70,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" + [[package]] name = "byteorder" version = "1.4.3" @@ -102,6 +123,15 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "figment" version = "0.10.8" @@ -133,24 +163,24 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" +checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" dependencies = [ "futures-core", ] [[package]] name = "futures-core" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" [[package]] name = "futures-macro" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" dependencies = [ "proc-macro2", "quote", @@ -159,21 +189,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" +checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" [[package]] name = "futures-task" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" +checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" [[package]] name = "futures-util" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" +checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" dependencies = [ "futures-core", "futures-macro", @@ -205,6 +235,31 @@ dependencies = [ "wasi", ] +[[package]] +name = "h2" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hermit-abi" version = "0.2.6" @@ -225,6 +280,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.0-rc.2" @@ -244,7 +310,7 @@ dependencies = [ "bytes", "futures-util", "http", - "http-body", + "http-body 1.0.0-rc.2", "pin-project-lite", ] @@ -260,6 +326,30 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "hyper" +version = "0.14.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body 0.4.5", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.0.0-rc.2" @@ -271,7 +361,7 @@ dependencies = [ "futures-core", "futures-util", "http", - "http-body", + "http-body 1.0.0-rc.2", "httparse", "httpdate", "itoa", @@ -291,18 +381,43 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "inlinable_string" version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "ipnet" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30e22bd8629359895450b59ea7a776c850561b96a3b1d31321c1949d9e6c9146" + [[package]] name = "itoa" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" +[[package]] +name = "js-sys" +version = "0.3.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lavina" version = "0.1.0" @@ -311,8 +426,10 @@ dependencies = [ "figment", "futures-util", "http-body-util", - "hyper", + "hyper 1.0.0-rc.2", "prometheus", + "regex", + "reqwest", "serde", "tokio", "tokio-tungstenite", @@ -366,6 +483,12 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "mio" version = "0.8.5" @@ -564,6 +687,63 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" + +[[package]] +name = "reqwest" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21eed90ec8570952d53b772ecf8f206aa1ec9a3d76b2521c56c42973f2d91ee9" +dependencies = [ + "base64 0.21.0", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body 0.4.5", + "hyper 0.14.23", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + +[[package]] +name = "ryu" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" + [[package]] name = "scopeguard" version = "1.1.0" @@ -590,6 +770,29 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.10.5" @@ -701,9 +904,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.24.2" +version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" +checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" dependencies = [ "autocfg", "bytes", @@ -742,6 +945,20 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-util" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "toml" version = "0.5.11" @@ -751,6 +968,12 @@ dependencies = [ "serde", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.37" @@ -821,7 +1044,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" dependencies = [ - "base64", + "base64 0.13.1", "byteorder", "bytes", "http", @@ -915,6 +1138,82 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" + +[[package]] +name = "web-sys" +version = "0.3.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" @@ -994,6 +1293,15 @@ version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "yansi" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index c7b24d8..0fdcccf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,7 @@ tracing-subscriber = "0.3.16" tokio-tungstenite = "0.18.0" futures-util = "0.3.25" prometheus = { version = "0.13.3", default_features = false } + +[dev-dependencies] +regex = "1.7.1" +reqwest = { version = "0.11", default_features = false } diff --git a/src/chat.rs b/src/chat.rs index 09570a7..926f651 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -3,7 +3,7 @@ use std::{ sync::{Arc, RwLock}, }; -use tokio::sync::mpsc::{Sender, channel, Receiver}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::prelude::*; @@ -17,7 +17,10 @@ pub struct Chats { impl Chats { pub fn new() -> Chats { let subscriptions = HashMap::new(); - let chats_inner = ChatsInner { subscriptions, next_sub: 0 }; + let chats_inner = ChatsInner { + subscriptions, + next_sub: 0, + }; let inner = Arc::new(RwLock::new(chats_inner)); Chats { inner } } @@ -26,7 +29,7 @@ impl Chats { let mut inner = self.inner.write().unwrap(); let sub_id = inner.next_sub; inner.next_sub += 1; - let (rx,tx) = channel(32); + let (rx, tx) = channel(32); inner.subscriptions.insert(sub_id, rx); (sub_id, tx) } @@ -42,7 +45,7 @@ impl Chats { } Ok(()) } - + pub fn remove_sub(&self, sub_id: UserId) { let mut inner = self.inner.write().unwrap(); inner.subscriptions.remove(&sub_id); diff --git a/src/http.rs b/src/http.rs index dc6808c..e5be24a 100644 --- a/src/http.rs +++ b/src/http.rs @@ -47,7 +47,9 @@ async fn route( ) -> std::result::Result, Infallible> { match (request.method(), request.uri().path()) { (&Method::GET, "/hello") => Ok(hello(request).await?.map(BodyExt::boxed)), - (&Method::GET, "/socket") => Ok(ws::handle_request(request, chats).await?.map(BodyExt::boxed)), + (&Method::GET, "/socket") => Ok(ws::handle_request(request, chats) + .await? + .map(BodyExt::boxed)), (&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)), _ => Ok(not_found()?.map(BodyExt::boxed)), } @@ -58,7 +60,11 @@ pub struct HttpServerActor { fiber: JoinHandle>, } impl HttpServerActor { - pub async fn launch(listener: TcpListener, metrics: Registry, chats: Chats) -> Result { + pub async fn launch( + listener: TcpListener, + metrics: Registry, + chats: Chats, + ) -> Result { let (terminator, receiver) = tokio::sync::oneshot::channel::<()>(); let fiber = tokio::task::spawn(Self::main_loop(listener, receiver, metrics, chats)); Ok(HttpServerActor { terminator, fiber }) diff --git a/src/http/ws.rs b/src/http/ws.rs index 7b64d94..41d70ab 100644 --- a/src/http/ws.rs +++ b/src/http/ws.rs @@ -1,4 +1,3 @@ -use futures_util::TryStreamExt; use http_body_util::Empty; use hyper::body::Incoming; use hyper::header::{ @@ -20,67 +19,63 @@ use futures_util::stream::StreamExt; use crate::chat::Chats; -async fn handle_connection(ws_stream: WebSocketStream, chats: Chats) { +async fn handle_connection(mut ws_stream: WebSocketStream, chats: Chats) { tracing::info!("WebSocket connection established"); - let (mut outgoing, incoming) = ws_stream.split(); - let (sub_id, mut sub) = chats.new_sub(); tracing::info!("New conn id: {sub_id}"); let _ = chats.broadcast(format!("{sub_id} joined").as_str()).await; - let broadcast_incoming = async { - tracing::info!("Started incoming stream for {sub_id}"); - let res = incoming.try_for_each(|msg| { - let txt = msg.to_text().unwrap().to_string(); - let chats = chats.clone(); - async move { - tracing::info!("Received a message: {}, sub_id={}", txt, sub_id); - match chats.broadcast(format!("{sub_id}: {txt}").as_str()).await { - Ok(_) => {}, - Err(err) => { - tracing::error!("Failed to broadcast a message from sub_id={sub_id}: {err}"); - }, - } - Ok(()) - } - }).await; - tracing::info!("Stopped incoming stream for {}", sub_id); - res - }; - - - outgoing + ws_stream .send(Message::Text("Started a connection!".into())) .await .unwrap(); - let outgoing = async { - tracing::info!("Started outgoing stream for {sub_id}"); - while let Some(msg) = sub.recv().await { - match outgoing.send(Message::Text(msg)).await { - Ok(_) => {}, - Err(err) => { - tracing::warn!("Failed to send msg, sub_id={sub_id}: {err}"); - break; - }, + tracing::info!("Started stream for {sub_id}"); + loop { + tokio::select! { + biased; + msg = ws_stream.next() => { + match msg { + Some(msg) => { + let msg = msg.unwrap(); + let txt = msg.to_text().unwrap().to_string(); + tracing::info!("Received a message: {txt}, sub_id={sub_id}"); + match chats.broadcast(format!("{sub_id}: {txt}").as_str()).await { + Ok(_) => {}, + Err(err) => { + tracing::error!("Failed to broadcast a message from sub_id={sub_id}: {err}"); + }, + } + }, + None => { + tracing::info!("Client {sub_id} closed the socket, stopping.."); + break; + }, + } + }, + msg = sub.recv() => { + match msg { + Some(msg) => { + match ws_stream.send(Message::Text(msg)).await { + Ok(_) => {}, + Err(err) => { + tracing::warn!("Failed to send msg, sub_id={sub_id}: {err}"); + break; + }, + } + }, + None => { + break; + } + } } } - tracing::info!("Stopped outgoing stream for {}", sub_id); - }; - - let (broadcast_incoming, _) = tokio::join!(broadcast_incoming, outgoing); - - let _ = chats.broadcast(format!("{sub_id} left").as_str()).await; - - - match broadcast_incoming { - Ok(_) => tracing::info!("Disconnected"), - Err(err) => tracing::warn!("Socket failed: {err}"), } - tracing::info!("Terminating WS connection {sub_id}"); + tracing::info!("Ended stream for {sub_id}"); chats.remove_sub(sub_id); + let _ = chats.broadcast(format!("{sub_id} left").as_str()).await; } pub async fn handle_request( diff --git a/src/main.rs b/src/main.rs index bfcbb56..49daf29 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,7 +52,8 @@ async fn main() -> Result<()> { let listener = TcpListener::bind("127.0.0.1:3721").await?; let listener_http = TcpListener::bind("127.0.0.1:8080").await?; - let http_server_actor = http::HttpServerActor::launch(listener_http, registry.clone(), chats.clone()).await?; + let http_server_actor = + http::HttpServerActor::launch(listener_http, registry.clone(), chats.clone()).await?; tracing::info!("Started"); diff --git a/tests/mod.rs b/tests/mod.rs new file mode 100644 index 0000000..78fb04e --- /dev/null +++ b/tests/mod.rs @@ -0,0 +1,61 @@ +use futures_util::{SinkExt, StreamExt}; +use hyper::StatusCode; +use regex::Regex; +use tokio_tungstenite::{connect_async, tungstenite::Message}; + +type Test = Result<(), Box>; + +#[tokio::test] +async fn hello_endpoint() -> Test { + let resp = reqwest::get("http://localhost:8080/hello").await?; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(resp.text().await?, "Hello World!"); + Ok(()) +} + +#[tokio::test] +async fn websocket_connect() -> Test { + let connected = Regex::new(r"^(\d+) joined$").unwrap(); + let msg = Regex::new(r"^(\d+): (.*)").unwrap(); + + let (mut socket, response) = connect_async("ws://localhost:8080/socket").await?; + + assert_eq!(response.status(), StatusCode::SWITCHING_PROTOCOLS); + + { + let resp = socket.next().await.unwrap().unwrap(); + let resp = resp.to_text()?; + assert_eq!(resp, "Started a connection!"); + } + + let id = { + let resp = socket.next().await.unwrap().unwrap(); + let resp = resp.to_text()?; + let res = connected.captures(resp).unwrap(); + res.get(1).unwrap().as_str().to_string() + }; + + socket.send(Message::Text("hi!".to_string())).await?; + socket.flush().await?; + + { + let resp = socket.next().await.unwrap().unwrap(); + let resp = resp.to_text()?; + let res = msg.captures(resp).expect(resp); + let new_id = res.get(1).unwrap().as_str(); + let new_msg = res.get(2).unwrap().as_str(); + assert_eq!(new_id, id); + assert_eq!(new_msg, "hi!"); + } + + socket.close(None).await?; + + { + let resp = socket.next().await.unwrap().unwrap(); + assert_eq!(resp, Message::Close(None)); + } + + assert!(socket.next().await.is_none()); + + Ok(()) +}