From c18f152e2543dbf898951286e8f2cb901247a3a2 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Thu, 9 Feb 2023 20:01:21 +0100 Subject: [PATCH] add metrics to irc projection --- src/main.rs | 6 +++--- src/projections/irc.rs | 26 ++++++++++++++++++++++---- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1a2356a..4dcc805 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,11 +35,11 @@ async fn main() -> Result<()> { tracing::info!("Booting up"); tracing::info!("Loaded config: {config:?}"); - let registry = MetricsRegistry::new(); + let metrics = MetricsRegistry::new(); let rooms = RoomRegistry::empty(); let players = PlayerRegistry::empty(rooms.clone()); - let telemetry_terminator = util::telemetry::launch(&config.telemetry, registry.clone()).await?; - let irc = projections::irc::launch(&config.irc, players).await?; + let telemetry_terminator = util::telemetry::launch(&config.telemetry, metrics.clone()).await?; + let irc = projections::irc::launch(&config.irc, players, metrics.clone()).await?; tracing::info!("Started"); sleep.await; diff --git a/src/projections/irc.rs b/src/projections/irc.rs index ee1eb98..6603624 100644 --- a/src/projections/irc.rs +++ b/src/projections/irc.rs @@ -1,11 +1,12 @@ use std::net::SocketAddr; +use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use serde::Deserialize; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, BufReader, BufWriter}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot::channel; -use crate::core::player::{PlayerRegistry, Updates}; +use crate::core::player::PlayerRegistry; use crate::prelude::*; use crate::protos::irc::*; use crate::util::Terminator; @@ -19,6 +20,7 @@ async fn handle_socket( mut stream: TcpStream, socket_addr: SocketAddr, mut players: PlayerRegistry, + current_connections: IntGauge, ) { let (reader, writer) = stream.split(); let mut reader = BufReader::new(reader); @@ -53,11 +55,25 @@ async fn handle_socket( }, } } + current_connections.dec(); } -pub async fn launch(config: &ServerConfig, players: PlayerRegistry) -> Result { +pub async fn launch( + config: &ServerConfig, + players: PlayerRegistry, + metrics: MetricsRegistry, +) -> Result { log::info!("Starting IRC projection"); let (signal, mut rx) = channel(); + let current_connections = + IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; + let total_connections = IntCounter::new( + "irc_total_connections", + "Total number of opened connections", + )?; + metrics.register(Box::new(current_connections.clone()))?; + metrics.register(Box::new(total_connections.clone()))?; + let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); @@ -69,8 +85,10 @@ pub async fn launch(config: &ServerConfig, players: PlayerRegistry) -> Result { match new_conn { Ok((stream, socket_addr)) => { + total_connections.inc(); + current_connections.inc(); log::debug!("Incoming connection from {socket_addr}"); - let handle = tokio::task::spawn(handle_socket(stream, socket_addr, players.clone())); + let handle = tokio::task::spawn(handle_socket(stream, socket_addr, players.clone(), current_connections.clone())); }, Err(err) => log::warn!("Failed to accept new connection: {err}"), }