prometheus example usage

This commit is contained in:
Nikita Vilunov 2023-01-28 00:43:20 +04:00
parent 0b0c432bb8
commit d6b2289221
5 changed files with 61 additions and 16 deletions

15
Cargo.lock generated
View File

@ -312,6 +312,7 @@ dependencies = [
"futures-util", "futures-util",
"http-body-util", "http-body-util",
"hyper", "hyper",
"prometheus",
"serde", "serde",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
@ -501,6 +502,20 @@ dependencies = [
"yansi", "yansi",
] ]
[[package]]
name = "prometheus"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"thiserror",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.23" version = "1.0.23"

View File

@ -28,3 +28,4 @@ tracing = "0.1.37" # logging & tracing api
tracing-subscriber = "0.3.16" tracing-subscriber = "0.3.16"
tokio-tungstenite = "0.18.0" tokio-tungstenite = "0.18.0"
futures-util = "0.3.25" futures-util = "0.3.25"
prometheus = { version = "0.13.3", default_features = false }

View File

@ -1,15 +1,17 @@
use crate::prelude::*; use crate::prelude::*;
use std::convert::Infallible; use std::convert::Infallible;
use std::sync::Arc;
use http_body_util::{Full, BodyExt}; use http_body_util::{BodyExt, Full};
use hyper::{StatusCode, Method};
use hyper::server::conn::http1; use hyper::server::conn::http1;
use hyper::{body::Bytes, service::service_fn, Request, Response}; use hyper::{body::Bytes, service::service_fn, Request, Response};
use hyper::{Method, StatusCode};
use prometheus::{Encoder, IntGauge, Opts, Registry, TextEncoder};
use tokio::net::TcpListener;
use tokio::sync::oneshot::Sender; use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::net::TcpListener;
mod ws; mod ws;
@ -27,10 +29,25 @@ fn not_found() -> std::result::Result<Response<Full<Bytes>>, Infallible> {
Ok(response) Ok(response)
} }
async fn route(request: Request<hyper::body::Incoming>) -> std::result::Result<Response<BoxBody>, Infallible> { fn metrics(registry: Arc<Registry>) -> std::result::Result<Response<Full<Bytes>>, Infallible> {
let mf = registry.gather();
let mut buffer = vec![];
let encoder = TextEncoder::new();
encoder
.encode(&mf, &mut buffer)
.expect("write to vec cannot fail");
Ok(Response::new(Full::new(Bytes::from(buffer))))
}
async fn route(
registry: Arc<Registry>,
request: Request<hyper::body::Incoming>,
) -> std::result::Result<Response<BoxBody>, Infallible> {
match (request.method(), request.uri().path()) { match (request.method(), request.uri().path()) {
(&Method::GET, "/hello") => Ok(hello(request).await?.map(BodyExt::boxed)), (&Method::GET, "/hello") => Ok(hello(request).await?.map(BodyExt::boxed)),
(&Method::GET, "/socket") => Ok(ws::handle_request(request).await?.map(BodyExt::boxed)), (&Method::GET, "/socket") => Ok(ws::handle_request(request).await?.map(BodyExt::boxed)),
(&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)),
_ => Ok(not_found()?.map(BodyExt::boxed)), _ => Ok(not_found()?.map(BodyExt::boxed)),
} }
} }
@ -40,9 +57,9 @@ pub struct HttpServerActor {
fiber: JoinHandle<Result<()>>, fiber: JoinHandle<Result<()>>,
} }
impl HttpServerActor { impl HttpServerActor {
pub async fn launch(listener: TcpListener) -> Result<HttpServerActor> { pub async fn launch(listener: TcpListener, metrics: Arc<Registry>) -> Result<HttpServerActor> {
let (terminator, receiver) = tokio::sync::oneshot::channel::<()>(); let (terminator, receiver) = tokio::sync::oneshot::channel::<()>();
let fiber = tokio::task::spawn(Self::main_loop(listener, receiver)); let fiber = tokio::task::spawn(Self::main_loop(listener, receiver, metrics));
Ok(HttpServerActor { terminator, fiber }) Ok(HttpServerActor { terminator, fiber })
} }
@ -55,9 +72,15 @@ impl HttpServerActor {
Ok(()) Ok(())
} }
async fn main_loop(listener: TcpListener, termination: impl Future) -> Result<()> { async fn main_loop(
listener: TcpListener,
termination: impl Future,
registry: Arc<Registry>,
) -> Result<()> {
log::info!("Starting the http server"); log::info!("Starting the http server");
pin!(termination); pin!(termination);
let reqs = IntGauge::with_opts(Opts::new("sockets", "Number of open sockets"))?;
registry.register(Box::new(reqs.clone()))?;
loop { loop {
select! { select! {
@ -67,14 +90,19 @@ impl HttpServerActor {
}, },
result = listener.accept() => { result = listener.accept() => {
let (stream, _) = result?; let (stream, _) = result?;
let c = registry.clone();
let reqs = reqs.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
reqs.inc();
let c = c.clone();
if let Err(err) = http1::Builder::new() if let Err(err) = http1::Builder::new()
.serve_connection(stream, service_fn(route)) .serve_connection(stream, service_fn(move |r| route(c.clone(), r)))
.with_upgrades() .with_upgrades()
.await .await
{ {
tracing::error!("Error serving connection: {:?}", err); tracing::error!("Error serving connection: {:?}", err);
} }
reqs.dec();
}); });
}, },
} }

View File

@ -17,18 +17,13 @@ use tokio_tungstenite::WebSocketStream;
use futures_util::sink::SinkExt; use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt; use futures_util::stream::StreamExt;
async fn handle_connection( async fn handle_connection(ws_stream: WebSocketStream<Upgraded>) {
ws_stream: WebSocketStream<Upgraded>,
) {
tracing::info!("WebSocket connection established"); tracing::info!("WebSocket connection established");
let (mut outgoing, incoming) = ws_stream.split(); let (mut outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each(|msg| { let broadcast_incoming = incoming.try_for_each(|msg| {
tracing::info!( tracing::info!("Received a message: {}", msg.to_text().unwrap());
"Received a message: {}",
msg.to_text().unwrap()
);
async { Ok(()) } async { Ok(()) }
}); });

View File

@ -3,11 +3,13 @@ mod prelude;
mod tcp; mod tcp;
use crate::prelude::*; use crate::prelude::*;
use prometheus::{IntCounter, Opts, Registry};
use tcp::ClientSocketActor; use tcp::ClientSocketActor;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use figment::providers::Format; use figment::providers::Format;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
@ -41,9 +43,13 @@ async fn main() -> Result<()> {
let config = load_config()?; let config = load_config()?;
dbg!(config); dbg!(config);
tracing::info!("Booting up"); tracing::info!("Booting up");
let registry = Arc::new(Registry::new());
let counter = IntCounter::with_opts(Opts::new("actor_count", "Number of alive actors"))?;
registry.register(Box::new(counter.clone()))?;
let listener = TcpListener::bind("127.0.0.1:3721").await?; let listener = TcpListener::bind("127.0.0.1:3721").await?;
let listener_http = TcpListener::bind("127.0.0.1:8080").await?; let listener_http = TcpListener::bind("127.0.0.1:8080").await?;
let http_server_actor = http::HttpServerActor::launch(listener_http).await?; let http_server_actor = http::HttpServerActor::launch(listener_http, registry.clone()).await?;
tracing::info!("Started"); tracing::info!("Started");