add metrics to irc projection

This commit is contained in:
Nikita Vilunov 2023-02-09 20:01:21 +01:00
parent 7a988f39b5
commit c18f152e25
2 changed files with 25 additions and 7 deletions

View File

@ -35,11 +35,11 @@ async fn main() -> Result<()> {
tracing::info!("Booting up"); tracing::info!("Booting up");
tracing::info!("Loaded config: {config:?}"); tracing::info!("Loaded config: {config:?}");
let registry = MetricsRegistry::new(); let metrics = MetricsRegistry::new();
let rooms = RoomRegistry::empty(); let rooms = RoomRegistry::empty();
let players = PlayerRegistry::empty(rooms.clone()); let players = PlayerRegistry::empty(rooms.clone());
let telemetry_terminator = util::telemetry::launch(&config.telemetry, registry.clone()).await?; let telemetry_terminator = util::telemetry::launch(&config.telemetry, metrics.clone()).await?;
let irc = projections::irc::launch(&config.irc, players).await?; let irc = projections::irc::launch(&config.irc, players, metrics.clone()).await?;
tracing::info!("Started"); tracing::info!("Started");
sleep.await; sleep.await;

View File

@ -1,11 +1,12 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry};
use serde::Deserialize; use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::io::{AsyncBufReadExt, BufReader, BufWriter};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot::channel; use tokio::sync::oneshot::channel;
use crate::core::player::{PlayerRegistry, Updates}; use crate::core::player::PlayerRegistry;
use crate::prelude::*; use crate::prelude::*;
use crate::protos::irc::*; use crate::protos::irc::*;
use crate::util::Terminator; use crate::util::Terminator;
@ -19,6 +20,7 @@ async fn handle_socket(
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: SocketAddr, socket_addr: SocketAddr,
mut players: PlayerRegistry, mut players: PlayerRegistry,
current_connections: IntGauge,
) { ) {
let (reader, writer) = stream.split(); let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader); let mut reader = BufReader::new(reader);
@ -53,11 +55,25 @@ async fn handle_socket(
}, },
} }
} }
current_connections.dec();
} }
pub async fn launch(config: &ServerConfig, players: PlayerRegistry) -> Result<Terminator> { pub async fn launch(
config: &ServerConfig,
players: PlayerRegistry,
metrics: MetricsRegistry,
) -> Result<Terminator> {
log::info!("Starting IRC projection"); log::info!("Starting IRC projection");
let (signal, mut rx) = channel(); let (signal, mut rx) = channel();
let current_connections =
IntGauge::new("irc_current_connections", "Open and alive TCP connections")?;
let total_connections = IntCounter::new(
"irc_total_connections",
"Total number of opened connections",
)?;
metrics.register(Box::new(current_connections.clone()))?;
metrics.register(Box::new(total_connections.clone()))?;
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started"); log::debug!("Listener started");
@ -69,8 +85,10 @@ pub async fn launch(config: &ServerConfig, players: PlayerRegistry) -> Result<Te
new_conn = listener.accept() => { new_conn = listener.accept() => {
match new_conn { match new_conn {
Ok((stream, socket_addr)) => { Ok((stream, socket_addr)) => {
total_connections.inc();
current_connections.inc();
log::debug!("Incoming connection from {socket_addr}"); log::debug!("Incoming connection from {socket_addr}");
let handle = tokio::task::spawn(handle_socket(stream, socket_addr, players.clone())); let handle = tokio::task::spawn(handle_socket(stream, socket_addr, players.clone(), current_connections.clone()));
}, },
Err(err) => log::warn!("Failed to accept new connection: {err}"), Err(err) => log::warn!("Failed to accept new connection: {err}"),
} }