use std::convert::Infallible; use std::net::SocketAddr; use futures_util::FutureExt; use http_body_util::{BodyExt, Full}; use hyper::body::Bytes; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response}; use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; use serde::Deserialize; use tokio::net::TcpListener; use tokio::sync::oneshot::channel; use crate::prelude::*; use crate::util::http::*; use crate::util::Terminator; type BoxBody = http_body_util::combinators::BoxBody; #[derive(Deserialize, Debug)] pub struct ServerConfig { pub listen_on: SocketAddr, } pub async fn launch(config: ServerConfig, metrics: MetricsRegistry) -> Result { log::info!("Starting the telemetry service"); let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); let (signal, rx) = channel(); let handle = tokio::task::spawn(main_loop(listener, metrics, rx.map(|_| ()))); let terminator = Terminator::from_raw(signal, handle); Ok(terminator) } async fn main_loop( listener: TcpListener, metrics: MetricsRegistry, termination: impl Future, ) -> Result<()> { pin!(termination); loop { select! { biased; _ = &mut termination => break, result = listener.accept() => { let (stream, _) = result?; let metrics = metrics.clone(); tokio::task::spawn(async move { let registry = metrics.clone(); let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), r))); if let Err(err) = server.await { tracing::error!("Error serving connection: {:?}", err); } }); }, } } log::info!("Terminating the telemetry service"); Ok(()) } async fn route( registry: MetricsRegistry, request: Request, ) -> std::result::Result, Infallible> { match (request.method(), request.uri().path()) { (&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)), _ => Ok(not_found()?.map(BodyExt::boxed)), } } fn metrics(registry: MetricsRegistry) -> std::result::Result>, Infallible> { let mf = registry.gather(); let mut buffer = vec![]; TextEncoder .encode(&mf, &mut buffer) .expect("write to vec cannot fail"); Ok(Response::new(Full::new(Bytes::from(buffer)))) }