diff --git a/src/projections/mod.rs b/src/projections/mod.rs index b5e9363..b53a1b5 100644 --- a/src/projections/mod.rs +++ b/src/projections/mod.rs @@ -1,3 +1,2 @@ //! Protocol projections — implementations of public APIs. pub mod irc; -pub mod trivial; diff --git a/src/projections/trivial.rs b/src/projections/trivial.rs deleted file mode 100644 index 234a4af..0000000 --- a/src/projections/trivial.rs +++ /dev/null @@ -1,186 +0,0 @@ -//! Projection into a primitive WebSocket-based protocol for testing. -use http_body_util::Empty; -use hyper::body::Incoming; -use hyper::header::{ - CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE, -}; -use hyper::http::HeaderValue; -use hyper::upgrade::Upgraded; -use hyper::{body::Bytes, Request, Response}; -use hyper::{StatusCode, Version}; -use regex::Regex; -use std::convert::Infallible; - -use tokio_tungstenite::tungstenite::handshake::derive_accept_key; -use tokio_tungstenite::tungstenite::protocol::Role; -use tokio_tungstenite::tungstenite::Message; -use tokio_tungstenite::WebSocketStream; - -use futures_util::sink::SinkExt; -use futures_util::stream::StreamExt; - -use crate::core::player::{PlayerRegistry, Updates}; -use crate::core::room::RoomId; - -enum WsCommand { - CreateRoom, - Send { room_id: RoomId, body: String }, - Join { room_id: RoomId }, -} - -fn parse(str: &str) -> Option { - if str == "/create\n" { - return Some(WsCommand::CreateRoom); - } - let pattern_send = Regex::new(r"^/send (\d+) (.+)\n$").unwrap(); - if let Some(captures) = pattern_send.captures(str) { - if let (Some(id), Some(msg)) = (captures.get(1), captures.get(2)) { - return Some(WsCommand::Send { - room_id: RoomId(id.as_str().parse().unwrap()), - body: msg.as_str().to_owned(), - }); - } - return None; - } - let pattern_join = Regex::new(r"^/join (\d+)\n$").unwrap(); - if let Some(captures) = pattern_join.captures(str) { - if let Some(id) = captures.get(1) { - return Some(WsCommand::Join { - room_id: RoomId(id.as_str().parse().unwrap()), - }); - } - return None; - } - None -} - -async fn handle_connection(mut ws_stream: WebSocketStream, mut players: PlayerRegistry) { - tracing::info!("WebSocket connection established"); - - let (player_id, mut player_handle) = players.create_player().await; - tracing::info!("New conn id: {player_id:?}"); - - ws_stream - .send(Message::Text("Started a connection!".into())) - .await - .unwrap(); - - let mut sub = player_handle.subscribe().await; - - tracing::info!("Started stream for {player_id:?}"); - loop { - tokio::select! { - biased; - msg = ws_stream.next() => { - match msg { - Some(Ok(msg)) => { - let txt = msg.to_text().unwrap().to_string(); - tracing::info!("Received a message: {txt}, sub_id={player_id:?}"); - let text = msg.into_text().unwrap(); - let parsed = parse(text.as_str()); - match parsed { - Some(WsCommand::CreateRoom) => { - player_handle.create_room().await - }, - Some(WsCommand::Send { room_id, body }) => { - player_handle.send_message(room_id, body).await - }, - Some(WsCommand::Join { room_id }) => { - player_handle.join_room(room_id).await - }, - None => { - ws_stream.send(Message::Text(format!("Failed to parse: {text}"))).await; - }, - } - }, - Some(Err(err)) => { - tracing::warn!("Client {player_id:?} failure: {err}"); - break; - } - None => { - tracing::info!("Client {player_id:?} closed the socket, stopping.."); - break; - }, - } - }, - msg = sub.recv() => { - match msg { - Some(msg) => { - match msg { - Updates::RoomJoined { room_id } => { - ws_stream.send(Message::Text(format!("Joined room {room_id:?}"))).await.unwrap(); - }, - Updates::NewMessage { room_id, body } => { - ws_stream.send(Message::Text(format!("{room_id:?}: {body}"))).await.unwrap(); - } - } - }, - None => { - break; - } - } - } - } - } - tracing::info!("Ended stream for {player_id:?}"); -} - -pub async fn handle_request( - mut req: Request, - players: PlayerRegistry, -) -> std::result::Result>, Infallible> { - tracing::info!("Received a new WS request"); - let upgrade = HeaderValue::from_static("Upgrade"); - let websocket = HeaderValue::from_static("websocket"); - let headers = req.headers(); - let key = headers.get(SEC_WEBSOCKET_KEY); - let derived = key.map(|k| derive_accept_key(k.as_bytes())); - if req.version() < Version::HTTP_11 - || !headers - .get(CONNECTION) - .and_then(|h| h.to_str().ok()) - .map(|h| { - h.split(|c| c == ' ' || c == ',') - .any(|p| p.eq_ignore_ascii_case(upgrade.to_str().unwrap())) - }) - .unwrap_or(false) - || !headers - .get(UPGRADE) - .and_then(|h| h.to_str().ok()) - .map(|h| h.eq_ignore_ascii_case("websocket")) - .unwrap_or(false) - || !headers - .get(SEC_WEBSOCKET_VERSION) - .map(|h| h == "13") - .unwrap_or(false) - || key.is_none() - { - tracing::info!("Malformed request"); - let mut resp = Response::new(Empty::new()); - *resp.status_mut() = StatusCode::BAD_REQUEST; - return Ok(resp); - } - let ver = req.version(); - - let players = players.clone(); - tokio::task::spawn(async move { - match hyper::upgrade::on(&mut req).await { - Ok(upgraded) => { - handle_connection( - WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await, - players, - ) - .await; - } - Err(err) => tracing::error!("upgrade error: {err}"), - } - }); - let mut res = Response::new(Empty::new()); - *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; - *res.version_mut() = ver; - res.headers_mut().append(CONNECTION, upgrade); - res.headers_mut().append(UPGRADE, websocket); - res.headers_mut() - .append(SEC_WEBSOCKET_ACCEPT, derived.unwrap().parse().unwrap()); - Ok(res) -}