forked from lavina/lavina
1
0
Fork 0
lavina/src/http.rs

200 lines
6.5 KiB
Rust
Raw Normal View History

use std::convert::Infallible;
use std::net::SocketAddr;
use futures_util::FutureExt;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
2023-09-24 20:59:34 +00:00
use hyper::{Method, Request, Response, StatusCode};
2023-12-10 21:22:26 +00:00
use hyper_util::rt::TokioIo;
use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder};
2023-09-24 20:59:34 +00:00
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use lavina_core::prelude::*;
2023-09-30 23:12:11 +00:00
use lavina_core::repo::Storage;
use lavina_core::room::RoomRegistry;
use lavina_core::terminator::Terminator;
2023-09-24 20:59:34 +00:00
use mgmt_api::*;
2023-02-15 17:10:54 +00:00
/// Result type of the top-level request handler. The error type is
/// [`Infallible`], so the handler always yields a response.
type HttpResult<T> = std::result::Result<T, Infallible>;
/// Configuration of the HTTP service.
#[derive(Debug, Deserialize)]
pub struct ServerConfig {
    /// Socket address the HTTP listener is bound to.
    pub listen_on: SocketAddr,
}
2023-02-15 17:10:54 +00:00
pub async fn launch(
config: ServerConfig,
metrics: MetricsRegistry,
rooms: RoomRegistry,
2023-09-24 20:59:34 +00:00
storage: Storage,
2023-02-15 17:10:54 +00:00
) -> Result<Terminator> {
2023-09-30 23:50:04 +00:00
log::info!("Starting the http service");
let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started");
2023-09-24 20:59:34 +00:00
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, storage, rx.map(|_| ())));
Ok(terminator)
}
async fn main_loop(
listener: TcpListener,
metrics: MetricsRegistry,
2023-02-15 17:10:54 +00:00
rooms: RoomRegistry,
2023-09-24 20:59:34 +00:00
storage: Storage,
termination: impl Future<Output = ()>,
) -> Result<()> {
pin!(termination);
loop {
select! {
biased;
_ = &mut termination => break,
result = listener.accept() => {
let (stream, _) = result?;
2023-12-10 21:22:26 +00:00
let stream = TokioIo::new(stream);
let metrics = metrics.clone();
2023-02-15 17:10:54 +00:00
let rooms = rooms.clone();
2023-09-24 20:59:34 +00:00
let storage = storage.clone();
tokio::task::spawn(async move {
let registry = metrics.clone();
2023-02-15 17:10:54 +00:00
let rooms = rooms.clone();
2023-09-24 20:59:34 +00:00
let storage = storage.clone();
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), storage.clone(), r)));
if let Err(err) = server.await {
tracing::error!("Error serving connection: {:?}", err);
}
});
},
}
}
2023-09-30 23:50:04 +00:00
log::info!("Terminating the http service");
Ok(())
}
async fn route(
registry: MetricsRegistry,
2023-02-15 17:10:54 +00:00
rooms: RoomRegistry,
2023-09-24 20:59:34 +00:00
storage: Storage,
request: Request<hyper::body::Incoming>,
2023-09-24 20:59:34 +00:00
) -> HttpResult<Response<Full<Bytes>>> {
let res = match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => endpoint_metrics(registry),
(&Method::GET, "/rooms") => endpoint_rooms(rooms).await,
(&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(),
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(),
_ => not_found(),
};
Ok(res)
}
2023-09-24 20:59:34 +00:00
fn endpoint_metrics(registry: MetricsRegistry) -> Response<Full<Bytes>> {
let mf = registry.gather();
let mut buffer = vec![];
2023-09-24 20:59:34 +00:00
TextEncoder.encode(&mf, &mut buffer).expect("write to vec cannot fail");
Response::new(Full::new(Bytes::from(buffer)))
}
2023-02-15 17:10:54 +00:00
2023-09-24 20:59:34 +00:00
/// Responds with the serialized list of all rooms known to `rooms`.
async fn endpoint_rooms(rooms: RoomRegistry) -> Response<Full<Bytes>> {
    // TODO introduce management API types independent from core-domain types
    // TODO remove `Serialize` implementations from all core-domain types
    let all_rooms = rooms.get_all_rooms().await;
    Response::new(all_rooms.to_body())
}
async fn endpoint_create_player(
request: Request<hyper::body::Incoming>,
mut storage: Storage,
) -> Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes();
let Ok(res) = serde_json::from_slice::<CreatePlayerRequest>(&str[..]) else {
let payload = ErrorResponse {
code: errors::MALFORMED_REQUEST,
message: "The request payload contains incorrect JSON value",
}
.to_body();
let mut response = Response::new(payload);
*response.status_mut() = StatusCode::BAD_REQUEST;
return Ok(response);
};
storage.create_user(&res.name).await?;
log::info!("Player {} created", res.name);
let mut response = Response::new(Full::<Bytes>::default());
*response.status_mut() = StatusCode::CREATED;
Ok(response)
}
async fn endpoint_set_password(
request: Request<hyper::body::Incoming>,
mut storage: Storage,
) -> Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes();
let Ok(res) = serde_json::from_slice::<ChangePasswordRequest>(&str[..]) else {
let payload = ErrorResponse {
code: errors::MALFORMED_REQUEST,
message: "The request payload contains incorrect JSON value",
}
.to_body();
let mut response = Response::new(payload);
*response.status_mut() = StatusCode::BAD_REQUEST;
return Ok(response);
};
let Some(_) = storage.set_password(&res.player_name, &res.password).await? else {
let payload = ErrorResponse {
code: errors::PLAYER_NOT_FOUND,
message: "No such player exists",
}
.to_body();
let mut response = Response::new(payload);
*response.status_mut() = StatusCode::UNPROCESSABLE_ENTITY;
return Ok(response);
};
log::info!("Password changed for player {}", res.player_name);
let mut response = Response::new(Full::<Bytes>::default());
*response.status_mut() = StatusCode::NO_CONTENT;
Ok(response)
}
pub fn not_found() -> Response<Full<Bytes>> {
let payload = ErrorResponse {
code: errors::INVALID_PATH,
message: "The path does not exist",
}
.to_body();
let mut response = Response::new(payload);
*response.status_mut() = StatusCode::NOT_FOUND;
response
}
trait Or5xx {
fn or5xx(self) -> Response<Full<Bytes>>;
}
impl Or5xx for Result<Response<Full<Bytes>>> {
fn or5xx(self) -> Response<Full<Bytes>> {
match self {
Ok(e) => e,
Err(e) => {
let mut response = Response::new(Full::new(e.to_string().into()));
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
response
}
}
}
}
/// Conversion of a value into a complete response body.
trait ToBody {
    /// Produces the body bytes for `self`.
    fn to_body(&self) -> Full<Bytes>;
}

/// Every serializable type becomes a body holding its JSON representation.
impl<T: Serialize> ToBody for T {
    fn to_body(&self) -> Full<Bytes> {
        // Panics with the message below if serialization itself fails.
        let json = serde_json::to_vec(self).expect("unexpected fail when writing to vec");
        Full::new(Bytes::from(json))
    }
}