use chrono::Utc; use std::convert::Infallible; use std::net::SocketAddr; use futures_util::FutureExt; use http_body_util::{BodyExt, Full}; use hyper::body::Bytes; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use hyper_util::rt::TokioIo; use opentelemetry::propagation::Extractor; use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; use lavina_core::auth::UpdatePasswordResult::PasswordUpdated; use lavina_core::auth::{Authenticator, UpdatePasswordResult}; use lavina_core::clustering::SendMessageReq; use lavina_core::player::{PlayerId, PlayerRegistry, SendMessageResult}; use lavina_core::prelude::*; use lavina_core::repo::Storage; use lavina_core::room::{RoomId, RoomRegistry}; use lavina_core::terminator::Terminator; use lavina_core::LavinaCore; use mgmt_api::*; type HttpResult = std::result::Result; #[derive(Deserialize, Debug)] pub struct ServerConfig { pub listen_on: SocketAddr, } pub async fn launch( config: ServerConfig, metrics: MetricsRegistry, core: LavinaCore, storage: Storage, ) -> Result { log::info!("Starting the http service"); let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, core, storage, rx.map(|_| ()))); Ok(terminator) } async fn main_loop( listener: TcpListener, metrics: MetricsRegistry, core: LavinaCore, storage: Storage, termination: impl Future, ) -> Result<()> { pin!(termination); loop { select! { biased; _ = &mut termination => break, result = listener.accept() => { let (stream, _) = result?; let stream = TokioIo::new(stream); let metrics = metrics.clone(); let core = core.clone(); let storage = storage.clone(); tokio::task::spawn(async move { let registry = metrics.clone(); let core = core.clone(); let storage = storage.clone(); let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), core.clone(), storage.clone(), r))); if let Err(err) = server.await { tracing::error!("Error serving connection: {:?}", err); } }); }, } } log::info!("Terminating the http service"); Ok(()) } #[tracing::instrument(skip_all, name = "route")] async fn route( registry: MetricsRegistry, core: LavinaCore, storage: Storage, request: Request, ) -> HttpResult>> { struct HttpReqExtractor<'a, T> { req: &'a Request, } impl<'a, T> Extractor for HttpReqExtractor<'a, T> { fn get(&self, key: &str) -> Option<&str> { self.req.headers().get(key).and_then(|v| v.to_str().ok()) } fn keys(&self) -> Vec<&str> { self.req.headers().keys().map(|k| k.as_str()).collect() } } let ctx = opentelemetry::global::get_text_map_propagator(|pp| pp.extract(&HttpReqExtractor { req: &request })); Span::current().set_parent(ctx); let res = match (request.method(), request.uri().path()) { (&Method::GET, "/metrics") => endpoint_metrics(registry), (&Method::GET, "/rooms") => endpoint_rooms(core.rooms).await, (&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(), (&Method::POST, paths::STOP_PLAYER) => endpoint_stop_player(request, core.players).await.or5xx(), (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(), (&Method::POST, rooms::paths::SEND_MESSAGE) => endpoint_send_room_message(request, core).await.or5xx(), (&Method::POST, rooms::paths::SET_TOPIC) => endpoint_set_room_topic(request, core).await.or5xx(), (&Method::POST, "/cluster/rooms/add_message") => endpoint_cluster_add_message(request, core).await.or5xx(), _ => endpoint_not_found(), }; Ok(res) } fn endpoint_metrics(registry: MetricsRegistry) -> Response> { let mf = registry.gather(); let mut buffer = vec![]; TextEncoder.encode(&mf, &mut buffer).expect("write to vec cannot fail"); Response::new(Full::new(Bytes::from(buffer))) } #[tracing::instrument(skip_all)] async fn endpoint_rooms(rooms: RoomRegistry) -> Response> { // TODO introduce management API types independent from core-domain types // TODO remove `Serialize` implementations from all core-domain types let room_list = rooms.get_all_rooms().await.to_body(); Response::new(room_list) } #[tracing::instrument(skip_all)] async fn endpoint_create_player( request: Request, mut storage: Storage, ) -> Result>> { let str = request.collect().await?.to_bytes(); let Ok(res) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; storage.create_user(&res.name).await?; log::info!("Player {} created", res.name); let mut response = Response::new(Full::::default()); *response.status_mut() = StatusCode::CREATED; Ok(response) } #[tracing::instrument(skip_all)] async fn endpoint_stop_player( request: Request, players: PlayerRegistry, ) -> Result>> { let str = request.collect().await?.to_bytes(); let Ok(res) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; let Ok(player_id) = PlayerId::from(res.name) else { return Ok(player_not_found()); }; let Some(()) = players.stop_player(&player_id).await? else { return Ok(player_not_found()); }; Ok(empty_204_request()) } #[tracing::instrument(skip_all)] async fn endpoint_set_password( request: Request, storage: Storage, ) -> Result>> { let str = request.collect().await?.to_bytes(); let Ok(res) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; let verdict = Authenticator::new(&storage).set_password(&res.player_name, &res.password).await?; match verdict { UpdatePasswordResult::PasswordUpdated => {} UpdatePasswordResult::UserNotFound => { return Ok(player_not_found()); } } Ok(empty_204_request()) } #[tracing::instrument(skip_all, name = "LavinaClient::endpoint_send_room_message")] async fn endpoint_send_room_message( request: Request, mut core: LavinaCore, ) -> Result>> { let str = request.collect().await?.to_bytes(); let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; let Ok(room_id) = RoomId::from(req.room_id) else { return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.author_id) else { return Ok(player_not_found()); }; let mut player = core.players.connect_to_player(&player_id).await; let res = player.send_message(room_id, req.message.into()).await?; match res { SendMessageResult::NoSuchRoom => Ok(room_not_found()), SendMessageResult::Success(_) => Ok(empty_204_request()), } } async fn endpoint_set_room_topic( request: Request, core: LavinaCore, ) -> Result>> { let str = request.collect().await?.to_bytes(); let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; let Ok(room_id) = RoomId::from(req.room_id) else { return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.author_id) else { return Ok(player_not_found()); }; let mut player = core.players.connect_to_player(&player_id).await; player.change_topic(room_id, req.topic.into()).await?; Ok(empty_204_request()) } #[tracing::instrument(skip_all, name = "endpoint_cluster_add_message")] async fn endpoint_cluster_add_message( request: Request, core: LavinaCore, ) -> Result>> { let str = request.collect().await?.to_bytes(); let Ok(req) = serde_json::from_slice::(&str[..]) else { return Ok(malformed_request()); }; tracing::info!("Incoming request: {:?}", &req); let Ok(created_at) = chrono::DateTime::parse_from_rfc3339(req.created_at) else { dbg!(&req.created_at); return Ok(malformed_request()); }; let Ok(room_id) = RoomId::from(req.room_id) else { dbg!(&req.room_id); return Ok(room_not_found()); }; let Ok(player_id) = PlayerId::from(req.player_id) else { dbg!(&req.player_id); return Ok(player_not_found()); }; let Some(room_handle) = core.rooms.get_room(&room_id).await else { dbg!(&room_id); return Ok(room_not_found()); }; room_handle.send_message(&player_id, req.message.into(), created_at.to_utc()).await; Ok(empty_204_request()) } fn endpoint_not_found() -> Response> { let payload = ErrorResponse { code: errors::INVALID_PATH, message: "The path does not exist", } .to_body(); let mut response = Response::new(payload); *response.status_mut() = StatusCode::NOT_FOUND; response } fn player_not_found() -> Response> { let payload = ErrorResponse { code: errors::PLAYER_NOT_FOUND, message: "No such player exists", } .to_body(); let mut response = Response::new(payload); *response.status_mut() = StatusCode::UNPROCESSABLE_ENTITY; response } fn room_not_found() -> Response> { let payload = ErrorResponse { code: rooms::errors::ROOM_NOT_FOUND, message: "No such room exists", } .to_body(); let mut response = Response::new(payload); *response.status_mut() = StatusCode::UNPROCESSABLE_ENTITY; response } fn malformed_request() -> Response> { let payload = ErrorResponse { code: errors::MALFORMED_REQUEST, message: "The request payload contains incorrect JSON value", } .to_body(); let mut response = Response::new(payload); *response.status_mut() = StatusCode::BAD_REQUEST; return response; } fn empty_204_request() -> Response> { let mut response = Response::new(Full::::default()); *response.status_mut() = StatusCode::NO_CONTENT; response } trait Or5xx { fn or5xx(self) -> Response>; } impl Or5xx for Result>> { fn or5xx(self) -> Response> { self.unwrap_or_else(|e| { let mut response = Response::new(Full::new(e.to_string().into())); *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; response }) } } trait ToBody { fn to_body(&self) -> Full; } impl ToBody for T where T: Serialize, { fn to_body(&self) -> Full { let mut buffer = vec![]; serde_json::to_writer(&mut buffer, self).expect("unexpected fail when writing to vec"); Full::new(Bytes::from(buffer)) } }