From 31ffe41929d856384fb4f282db6d41f98dce73b7 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Wed, 25 Jan 2023 16:50:14 +0400 Subject: [PATCH] websocket basic implementation --- Cargo.lock | 278 +++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + src/http.rs | 12 ++- src/http/ws.rs | 108 +++++++++++++++++++ 4 files changed, 396 insertions(+), 4 deletions(-) create mode 100644 src/http/ws.rs diff --git a/Cargo.lock b/Cargo.lock index 4daf179..0854670 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,12 +34,33 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "bitflags" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "block-buffer" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e" +dependencies = [ + "generic-array", +] + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.3.0" @@ -52,6 +73,35 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cpufeatures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "figment" version = "0.10.8" @@ -72,6 +122,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures-channel" version = "0.3.25" @@ -87,6 +146,23 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +[[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" + [[package]] name = "futures-task" version = "0.3.25" @@ -100,9 +176,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-core", + "futures-macro", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +dependencies = [ + "cfg-if", + "libc", + "wasi", ] [[package]] @@ -181,6 +281,16 @@ dependencies = [ "want", ] +[[package]] +name = "idna" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "inlinable_string" version = "0.1.15" @@ -199,10 +309,12 @@ version = "0.1.0" dependencies = [ "anyhow", "figment", + "futures-util", "http-body-util", "hyper", "serde", "tokio", + "tokio-tungstenite", "tracing", "tracing-subscriber", ] @@ -343,6 +455,12 @@ dependencies = [ "syn", ] +[[package]] +name = "percent-encoding" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -355,6 +473,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.50" @@ -386,6 +510,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -421,6 +575,17 @@ dependencies = [ "syn", ] +[[package]] +name = "sha1" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -439,6 +604,15 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.10.0" @@ -466,6 +640,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.4" @@ -475,6 +669,21 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" + [[package]] name = "tokio" version = "1.24.2" @@ -506,6 +715,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "toml" version = "0.5.11" @@ -579,6 +800,31 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" + [[package]] name = "uncased" version = "0.9.7" @@ -588,12 +834,44 @@ dependencies = [ "version_check", ] +[[package]] +name = "unicode-bidi" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" + [[package]] name = "unicode-ident" version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "url" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 20115f7..4e81ff7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,3 +26,5 @@ serde = { version = "1.0.152", features = ["rc", "serde_derive"] } tokio = { version = "1.24.1", features = ["full"] } # async runtime tracing = "0.1.37" # logging & tracing api tracing-subscriber = "0.3.16" +tokio-tungstenite = "0.18.0" +futures-util = "0.3.25" diff --git a/src/http.rs b/src/http.rs index 1b6b906..0757cdc 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,14 +1,17 @@ use crate::prelude::*; +use std::convert::Infallible; + use http_body_util::Full; use hyper::server::conn::http1; use hyper::{body::Bytes, service::service_fn, Request, Response}; -use std::convert::Infallible; -use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::task::JoinHandle; +use tokio::sync::oneshot::Sender; +use tokio::task::JoinHandle; use tokio::net::TcpListener; +mod ws; + pub struct HttpServerActor { terminator: Sender<()>, fiber: JoinHandle>, @@ -49,7 +52,8 @@ impl HttpServerActor { let (stream, _) = result?; tokio::task::spawn(async move { if let Err(err) = http1::Builder::new() - .serve_connection(stream, service_fn(HttpServerActor::hello)) + .serve_connection(stream, service_fn(ws::handle_request)) + .with_upgrades() .await { tracing::error!("Error serving connection: {:?}", err); diff --git a/src/http/ws.rs b/src/http/ws.rs new file mode 100644 index 0000000..a0d71c6 --- /dev/null +++ b/src/http/ws.rs @@ -0,0 +1,108 @@ +use futures_util::TryStreamExt; +use http_body_util::Empty; +use hyper::header::{ + CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE, +}; +use hyper::http::HeaderValue; +use hyper::upgrade::Upgraded; +use hyper::{body::Bytes, Request, Response}; +use hyper::{Method, StatusCode, Version}; +use std::convert::Infallible; + +use tokio_tungstenite::tungstenite::handshake::derive_accept_key; +use tokio_tungstenite::tungstenite::protocol::Role; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::WebSocketStream; + +use futures_util::sink::SinkExt; +use futures_util::stream::StreamExt; + +async fn handle_connection( + ws_stream: WebSocketStream, +) { + tracing::info!("WebSocket connection established"); + + let (mut outgoing, incoming) = ws_stream.split(); + + let broadcast_incoming = incoming.try_for_each(|msg| { + tracing::info!( + "Received a message: {}", + msg.to_text().unwrap() + ); + + async { Ok(()) } + }); + + outgoing.send(Message::Text("adsads".into())).await.unwrap(); + + match broadcast_incoming.await { + Ok(_) => tracing::info!("Disconnected"), + Err(e) => tracing::warn!("Socket failed: {}", e), + } +} + +pub async fn handle_request( + mut req: Request, +) -> std::result::Result>, Infallible> { + dbg!(&req); + println!("Received a new, potentially ws handshake"); + println!("The request's path is: {}", req.uri().path()); + println!("The request's headers are:"); + for (ref header, _value) in req.headers() { + println!("* {}", header); + } + let upgrade = HeaderValue::from_static("Upgrade"); + let websocket = HeaderValue::from_static("websocket"); + let headers = req.headers(); + let key = headers.get(SEC_WEBSOCKET_KEY); + let derived = key.map(|k| derive_accept_key(k.as_bytes())); + if req.method() != Method::GET + || req.version() < Version::HTTP_11 + || !headers + .get(CONNECTION) + .and_then(|h| h.to_str().ok()) + .map(|h| { + h.split(|c| c == ' ' || c == ',') + .any(|p| p.eq_ignore_ascii_case(upgrade.to_str().unwrap())) + }) + .unwrap_or(false) + || !headers + .get(UPGRADE) + .and_then(|h| h.to_str().ok()) + .map(|h| h.eq_ignore_ascii_case("websocket")) + .unwrap_or(false) + || !headers + .get(SEC_WEBSOCKET_VERSION) + .map(|h| h == "13") + .unwrap_or(false) + || key.is_none() + || req.uri() != "/socket" + { + dbg!(); + let mut resp = Response::new(Empty::new()); + *resp.status_mut() = StatusCode::BAD_REQUEST; + return Ok(resp); + } + let ver = req.version(); + + tokio::task::spawn(async move { + match hyper::upgrade::on(&mut req).await { + Ok(upgraded) => { + handle_connection( + WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await, + ) + .await; + } + Err(e) => println!("upgrade error: {}", e), + } + }); + let mut res = Response::new(Empty::new()); + *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + *res.version_mut() = ver; + res.headers_mut().append(CONNECTION, upgrade); + res.headers_mut().append(UPGRADE, websocket); + res.headers_mut() + .append(SEC_WEBSOCKET_ACCEPT, derived.unwrap().parse().unwrap()); + dbg!(&res); + Ok(res) +}