use std::convert::Infallible; use std::net::SocketAddr; use futures_util::FutureExt; use http_body_util::{BodyExt, Full}; use hyper::body::Bytes; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::RoomRegistry; use lavina_core::terminator::Terminator; use mgmt_api::*; type HttpResult = std::result::Result; #[derive(Deserialize, Debug)] pub struct ServerConfig { pub listen_on: SocketAddr, } pub async fn launch( config: ServerConfig, metrics: MetricsRegistry, rooms: RoomRegistry, storage: Storage, ) -> Result { log::info!("Starting the telemetry service"); let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, storage, rx.map(|_| ()))); Ok(terminator) } async fn main_loop( listener: TcpListener, metrics: MetricsRegistry, rooms: RoomRegistry, storage: Storage, termination: impl Future, ) -> Result<()> { pin!(termination); loop { select! { biased; _ = &mut termination => break, result = listener.accept() => { let (stream, _) = result?; let metrics = metrics.clone(); let rooms = rooms.clone(); let storage = storage.clone(); tokio::task::spawn(async move { let registry = metrics.clone(); let rooms = rooms.clone(); let storage = storage.clone(); let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), storage.clone(), r))); if let Err(err) = server.await { tracing::error!("Error serving connection: {:?}", err); } }); }, } } log::info!("Terminating the telemetry service"); Ok(()) } async fn route( registry: MetricsRegistry, rooms: RoomRegistry, storage: Storage, request: Request, ) -> HttpResult>> { let res = match (request.method(), request.uri().path()) { (&Method::GET, "/metrics") => endpoint_metrics(registry), (&Method::GET, "/rooms") => endpoint_rooms(rooms).await, (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(), (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(), _ => not_found(), }; Ok(res) } fn endpoint_metrics(registry: MetricsRegistry) -> Response> { let mf = registry.gather(); let mut buffer = vec![]; TextEncoder.encode(&mf, &mut buffer).expect("write to vec cannot fail"); Response::new(Full::new(Bytes::from(buffer))) } async fn endpoint_rooms(rooms: RoomRegistry) -> Response> { // TODO introduce management API types independent from core-domain types // TODO remove `Serialize` implementations from all core-domain types let room_list = rooms.get_all_rooms().await.to_body(); Response::new(room_list) } async fn endpoint_create_player( request: Request, mut storage: Storage, ) -> Result>> { let str = request.collect().await?.to_bytes(); let Ok(res) = serde_json::from_slice::(&str[..]) else { let payload = ErrorResponse { code: errors::MALFORMED_REQUEST, message: "The request payload contains incorrect JSON value", } .to_body(); let mut response = Response::new(payload); *response.status_mut() = StatusCode::BAD_REQUEST; return Ok(response); }; storage.create_user(&res.name).await?; log::info!("Player {} created", res.name); let mut response = Response::new(Full::::default()); *response.status_mut() = StatusCode::CREATED; Ok(response) } async fn endpoint_set_password( request: Request, mut storage: Storage, ) -> Result>> { let str = request.collect().await?.to_bytes(); let Ok(res) = serde_json::from_slice::(&str[..]) else { let payload = ErrorResponse { code: errors::MALFORMED_REQUEST, message: "The request payload contains incorrect JSON value", } .to_body(); let mut response = Response::new(payload); *response.status_mut() = StatusCode::BAD_REQUEST; return Ok(response); }; let Some(_) = storage.set_password(&res.player_name, &res.password).await? else { let payload = ErrorResponse { code: errors::PLAYER_NOT_FOUND, message: "No such player exists", } .to_body(); let mut response = Response::new(payload); *response.status_mut() = StatusCode::UNPROCESSABLE_ENTITY; return Ok(response); }; log::info!("Password changed for player {}", res.player_name); let mut response = Response::new(Full::::default()); *response.status_mut() = StatusCode::NO_CONTENT; Ok(response) } pub fn not_found() -> Response> { let payload = ErrorResponse { code: errors::INVALID_PATH, message: "The path does not exist", } .to_body(); let mut response = Response::new(payload); *response.status_mut() = StatusCode::NOT_FOUND; response } trait Or5xx { fn or5xx(self) -> Response>; } impl Or5xx for Result>> { fn or5xx(self) -> Response> { match self { Ok(e) => e, Err(e) => { let mut response = Response::new(Full::new(e.to_string().into())); *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; response } } } } trait ToBody { fn to_body(&self) -> Full; } impl ToBody for T where T: Serialize, { fn to_body(&self) -> Full { let mut buffer = vec![]; serde_json::to_writer(&mut buffer, self).expect("unexpected fail when writing to vec"); Full::new(Bytes::from(buffer)) } }