forked from lavina/lavina
remove trivial projection
This commit is contained in:
parent
08fe958d60
commit
7a988f39b5
|
@ -1,3 +1,2 @@
|
||||||
//! Protocol projections — implementations of public APIs.
|
//! Protocol projections — implementations of public APIs.
|
||||||
pub mod irc;
|
pub mod irc;
|
||||||
pub mod trivial;
|
|
||||||
|
|
|
@ -1,186 +0,0 @@
|
||||||
//! Projection into a primitive WebSocket-based protocol for testing.
|
|
||||||
use http_body_util::Empty;
|
|
||||||
use hyper::body::Incoming;
|
|
||||||
use hyper::header::{
|
|
||||||
CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
|
|
||||||
};
|
|
||||||
use hyper::http::HeaderValue;
|
|
||||||
use hyper::upgrade::Upgraded;
|
|
||||||
use hyper::{body::Bytes, Request, Response};
|
|
||||||
use hyper::{StatusCode, Version};
|
|
||||||
use regex::Regex;
|
|
||||||
use std::convert::Infallible;
|
|
||||||
|
|
||||||
use tokio_tungstenite::tungstenite::handshake::derive_accept_key;
|
|
||||||
use tokio_tungstenite::tungstenite::protocol::Role;
|
|
||||||
use tokio_tungstenite::tungstenite::Message;
|
|
||||||
use tokio_tungstenite::WebSocketStream;
|
|
||||||
|
|
||||||
use futures_util::sink::SinkExt;
|
|
||||||
use futures_util::stream::StreamExt;
|
|
||||||
|
|
||||||
use crate::core::player::{PlayerRegistry, Updates};
|
|
||||||
use crate::core::room::RoomId;
|
|
||||||
|
|
||||||
enum WsCommand {
|
|
||||||
CreateRoom,
|
|
||||||
Send { room_id: RoomId, body: String },
|
|
||||||
Join { room_id: RoomId },
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse(str: &str) -> Option<WsCommand> {
|
|
||||||
if str == "/create\n" {
|
|
||||||
return Some(WsCommand::CreateRoom);
|
|
||||||
}
|
|
||||||
let pattern_send = Regex::new(r"^/send (\d+) (.+)\n$").unwrap();
|
|
||||||
if let Some(captures) = pattern_send.captures(str) {
|
|
||||||
if let (Some(id), Some(msg)) = (captures.get(1), captures.get(2)) {
|
|
||||||
return Some(WsCommand::Send {
|
|
||||||
room_id: RoomId(id.as_str().parse().unwrap()),
|
|
||||||
body: msg.as_str().to_owned(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
let pattern_join = Regex::new(r"^/join (\d+)\n$").unwrap();
|
|
||||||
if let Some(captures) = pattern_join.captures(str) {
|
|
||||||
if let Some(id) = captures.get(1) {
|
|
||||||
return Some(WsCommand::Join {
|
|
||||||
room_id: RoomId(id.as_str().parse().unwrap()),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_connection(mut ws_stream: WebSocketStream<Upgraded>, mut players: PlayerRegistry) {
|
|
||||||
tracing::info!("WebSocket connection established");
|
|
||||||
|
|
||||||
let (player_id, mut player_handle) = players.create_player().await;
|
|
||||||
tracing::info!("New conn id: {player_id:?}");
|
|
||||||
|
|
||||||
ws_stream
|
|
||||||
.send(Message::Text("Started a connection!".into()))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut sub = player_handle.subscribe().await;
|
|
||||||
|
|
||||||
tracing::info!("Started stream for {player_id:?}");
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
biased;
|
|
||||||
msg = ws_stream.next() => {
|
|
||||||
match msg {
|
|
||||||
Some(Ok(msg)) => {
|
|
||||||
let txt = msg.to_text().unwrap().to_string();
|
|
||||||
tracing::info!("Received a message: {txt}, sub_id={player_id:?}");
|
|
||||||
let text = msg.into_text().unwrap();
|
|
||||||
let parsed = parse(text.as_str());
|
|
||||||
match parsed {
|
|
||||||
Some(WsCommand::CreateRoom) => {
|
|
||||||
player_handle.create_room().await
|
|
||||||
},
|
|
||||||
Some(WsCommand::Send { room_id, body }) => {
|
|
||||||
player_handle.send_message(room_id, body).await
|
|
||||||
},
|
|
||||||
Some(WsCommand::Join { room_id }) => {
|
|
||||||
player_handle.join_room(room_id).await
|
|
||||||
},
|
|
||||||
None => {
|
|
||||||
ws_stream.send(Message::Text(format!("Failed to parse: {text}"))).await;
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Some(Err(err)) => {
|
|
||||||
tracing::warn!("Client {player_id:?} failure: {err}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
tracing::info!("Client {player_id:?} closed the socket, stopping..");
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
msg = sub.recv() => {
|
|
||||||
match msg {
|
|
||||||
Some(msg) => {
|
|
||||||
match msg {
|
|
||||||
Updates::RoomJoined { room_id } => {
|
|
||||||
ws_stream.send(Message::Text(format!("Joined room {room_id:?}"))).await.unwrap();
|
|
||||||
},
|
|
||||||
Updates::NewMessage { room_id, body } => {
|
|
||||||
ws_stream.send(Message::Text(format!("{room_id:?}: {body}"))).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
None => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tracing::info!("Ended stream for {player_id:?}");
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn handle_request(
|
|
||||||
mut req: Request<Incoming>,
|
|
||||||
players: PlayerRegistry,
|
|
||||||
) -> std::result::Result<Response<Empty<Bytes>>, Infallible> {
|
|
||||||
tracing::info!("Received a new WS request");
|
|
||||||
let upgrade = HeaderValue::from_static("Upgrade");
|
|
||||||
let websocket = HeaderValue::from_static("websocket");
|
|
||||||
let headers = req.headers();
|
|
||||||
let key = headers.get(SEC_WEBSOCKET_KEY);
|
|
||||||
let derived = key.map(|k| derive_accept_key(k.as_bytes()));
|
|
||||||
if req.version() < Version::HTTP_11
|
|
||||||
|| !headers
|
|
||||||
.get(CONNECTION)
|
|
||||||
.and_then(|h| h.to_str().ok())
|
|
||||||
.map(|h| {
|
|
||||||
h.split(|c| c == ' ' || c == ',')
|
|
||||||
.any(|p| p.eq_ignore_ascii_case(upgrade.to_str().unwrap()))
|
|
||||||
})
|
|
||||||
.unwrap_or(false)
|
|
||||||
|| !headers
|
|
||||||
.get(UPGRADE)
|
|
||||||
.and_then(|h| h.to_str().ok())
|
|
||||||
.map(|h| h.eq_ignore_ascii_case("websocket"))
|
|
||||||
.unwrap_or(false)
|
|
||||||
|| !headers
|
|
||||||
.get(SEC_WEBSOCKET_VERSION)
|
|
||||||
.map(|h| h == "13")
|
|
||||||
.unwrap_or(false)
|
|
||||||
|| key.is_none()
|
|
||||||
{
|
|
||||||
tracing::info!("Malformed request");
|
|
||||||
let mut resp = Response::new(Empty::new());
|
|
||||||
*resp.status_mut() = StatusCode::BAD_REQUEST;
|
|
||||||
return Ok(resp);
|
|
||||||
}
|
|
||||||
let ver = req.version();
|
|
||||||
|
|
||||||
let players = players.clone();
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
match hyper::upgrade::on(&mut req).await {
|
|
||||||
Ok(upgraded) => {
|
|
||||||
handle_connection(
|
|
||||||
WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await,
|
|
||||||
players,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
Err(err) => tracing::error!("upgrade error: {err}"),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
let mut res = Response::new(Empty::new());
|
|
||||||
*res.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
|
|
||||||
*res.version_mut() = ver;
|
|
||||||
res.headers_mut().append(CONNECTION, upgrade);
|
|
||||||
res.headers_mut().append(UPGRADE, websocket);
|
|
||||||
res.headers_mut()
|
|
||||||
.append(SEC_WEBSOCKET_ACCEPT, derived.unwrap().parse().unwrap());
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
Loading…
Reference in New Issue