From d6b22892213becb50ae0d67ee1099de245ad7a27 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Sat, 28 Jan 2023 00:43:20 +0400 Subject: [PATCH] prometheus example usage --- Cargo.lock | 15 +++++++++++++++ Cargo.toml | 1 + src/http.rs | 44 ++++++++++++++++++++++++++++++++++++-------- src/http/ws.rs | 9 ++------- src/main.rs | 8 +++++++- 5 files changed, 61 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0854670..80cfa2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -312,6 +312,7 @@ dependencies = [ "futures-util", "http-body-util", "hyper", + "prometheus", "serde", "tokio", "tokio-tungstenite", @@ -501,6 +502,20 @@ dependencies = [ "yansi", ] +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "thiserror", +] + [[package]] name = "quote" version = "1.0.23" diff --git a/Cargo.toml b/Cargo.toml index 4e81ff7..e10153a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,3 +28,4 @@ tracing = "0.1.37" # logging & tracing api tracing-subscriber = "0.3.16" tokio-tungstenite = "0.18.0" futures-util = "0.3.25" +prometheus = { version = "0.13.3", default_features = false } diff --git a/src/http.rs b/src/http.rs index c8c9efc..45e2a52 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,15 +1,17 @@ use crate::prelude::*; use std::convert::Infallible; +use std::sync::Arc; -use http_body_util::{Full, BodyExt}; -use hyper::{StatusCode, Method}; +use http_body_util::{BodyExt, Full}; use hyper::server::conn::http1; use hyper::{body::Bytes, service::service_fn, Request, Response}; +use hyper::{Method, StatusCode}; +use prometheus::{Encoder, IntGauge, Opts, Registry, TextEncoder}; +use tokio::net::TcpListener; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -use tokio::net::TcpListener; mod ws; @@ -27,10 +29,25 @@ fn not_found() -> std::result::Result>, Infallible> { Ok(response) } -async fn route(request: Request) -> std::result::Result, Infallible> { +fn metrics(registry: Arc) -> std::result::Result>, Infallible> { + let mf = registry.gather(); + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + encoder + .encode(&mf, &mut buffer) + .expect("write to vec cannot fail"); + + Ok(Response::new(Full::new(Bytes::from(buffer)))) +} + +async fn route( + registry: Arc, + request: Request, +) -> std::result::Result, Infallible> { match (request.method(), request.uri().path()) { (&Method::GET, "/hello") => Ok(hello(request).await?.map(BodyExt::boxed)), (&Method::GET, "/socket") => Ok(ws::handle_request(request).await?.map(BodyExt::boxed)), + (&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)), _ => Ok(not_found()?.map(BodyExt::boxed)), } } @@ -40,9 +57,9 @@ pub struct HttpServerActor { fiber: JoinHandle>, } impl HttpServerActor { - pub async fn launch(listener: TcpListener) -> Result { + pub async fn launch(listener: TcpListener, metrics: Arc) -> Result { let (terminator, receiver) = tokio::sync::oneshot::channel::<()>(); - let fiber = tokio::task::spawn(Self::main_loop(listener, receiver)); + let fiber = tokio::task::spawn(Self::main_loop(listener, receiver, metrics)); Ok(HttpServerActor { terminator, fiber }) } @@ -55,9 +72,15 @@ impl HttpServerActor { Ok(()) } - async fn main_loop(listener: TcpListener, termination: impl Future) -> Result<()> { + async fn main_loop( + listener: TcpListener, + termination: impl Future, + registry: Arc, + ) -> Result<()> { log::info!("Starting the http server"); pin!(termination); + let reqs = IntGauge::with_opts(Opts::new("sockets", "Number of open sockets"))?; + registry.register(Box::new(reqs.clone()))?; loop { select! { @@ -67,14 +90,19 @@ impl HttpServerActor { }, result = listener.accept() => { let (stream, _) = result?; + let c = registry.clone(); + let reqs = reqs.clone(); tokio::task::spawn(async move { + reqs.inc(); + let c = c.clone(); if let Err(err) = http1::Builder::new() - .serve_connection(stream, service_fn(route)) + .serve_connection(stream, service_fn(move |r| route(c.clone(), r))) .with_upgrades() .await { tracing::error!("Error serving connection: {:?}", err); } + reqs.dec(); }); }, } diff --git a/src/http/ws.rs b/src/http/ws.rs index dada0c7..ef3fb64 100644 --- a/src/http/ws.rs +++ b/src/http/ws.rs @@ -17,18 +17,13 @@ use tokio_tungstenite::WebSocketStream; use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; -async fn handle_connection( - ws_stream: WebSocketStream, -) { +async fn handle_connection(ws_stream: WebSocketStream) { tracing::info!("WebSocket connection established"); let (mut outgoing, incoming) = ws_stream.split(); let broadcast_incoming = incoming.try_for_each(|msg| { - tracing::info!( - "Received a message: {}", - msg.to_text().unwrap() - ); + tracing::info!("Received a message: {}", msg.to_text().unwrap()); async { Ok(()) } }); diff --git a/src/main.rs b/src/main.rs index 7d72c5b..bbe83e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,11 +3,13 @@ mod prelude; mod tcp; use crate::prelude::*; +use prometheus::{IntCounter, Opts, Registry}; use tcp::ClientSocketActor; use std::collections::HashMap; use std::future::Future; use std::net::SocketAddr; +use std::sync::Arc; use figment::providers::Format; use tokio::net::{TcpListener, TcpStream}; @@ -41,9 +43,13 @@ async fn main() -> Result<()> { let config = load_config()?; dbg!(config); tracing::info!("Booting up"); + let registry = Arc::new(Registry::new()); + let counter = IntCounter::with_opts(Opts::new("actor_count", "Number of alive actors"))?; + registry.register(Box::new(counter.clone()))?; + let listener = TcpListener::bind("127.0.0.1:3721").await?; let listener_http = TcpListener::bind("127.0.0.1:8080").await?; - let http_server_actor = http::HttpServerActor::launch(listener_http).await?; + let http_server_actor = http::HttpServerActor::launch(listener_http, registry.clone()).await?; tracing::info!("Started");