From 1bc305962ec52ac9f2e3422005e16892614c7f4e Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Wed, 15 Feb 2023 18:10:54 +0100 Subject: [PATCH] endpoint with a list of rooms --- Cargo.lock | 1 + Cargo.toml | 1 + src/core/player.rs | 7 +++++-- src/core/room.rs | 17 ++++++++++++++++- src/main.rs | 3 ++- src/util/telemetry.rs | 28 +++++++++++++++++++++++----- 6 files changed, 48 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea41c3c..c358b02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -439,6 +439,7 @@ dependencies = [ "regex", "reqwest", "serde", + "serde_json", "tokio", "tokio-tungstenite", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 56ff743..7f7253a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ figment = { version = "0.10.8", features = ["env", "toml"] } # configuration fil hyper = { version = "1.0.0-rc.2", features = ["server", "http1"] } # http server http-body-util = "0.1.0-rc.2" serde = { version = "1.0.152", features = ["rc", "serde_derive"] } +serde_json = "1.0.93" tokio = { version = "1.24.1", features = ["full"] } # async runtime tracing = "0.1.37" # logging & tracing api tracing-subscriber = "0.3.16" diff --git a/src/core/player.rs b/src/core/player.rs index fefd159..2b5300d 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -13,6 +13,7 @@ use std::{ }; use prometheus::{IntGauge, Registry as MetricsRegistry}; +use serde::Serialize; use tokio::{ sync::mpsc::{channel, Receiver, Sender}, task::JoinHandle, @@ -25,7 +26,7 @@ use crate::{ }; /// Opaque player identifier. Cannot contain spaces, must be shorter than 32. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub struct PlayerId(ByteVec); impl PlayerId { pub fn from_bytes(bytes: ByteVec) -> Result { @@ -77,7 +78,9 @@ impl PlayerConnection { } pub async fn terminate(self) { - self.player_handle.send(PlayerCommand::TerminateConnection(self.connection_id)).await; + self.player_handle + .send(PlayerCommand::TerminateConnection(self.connection_id)) + .await; } pub async fn send(&self, command: PlayerCommand) { diff --git a/src/core/room.rs b/src/core/room.rs index 2bc1d6a..02bbb8b 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -6,6 +6,7 @@ use std::{ }; use prometheus::{IntGauge, Registry as MetricRegistry}; +use serde::Serialize; use tokio::sync::RwLock as AsyncRwLock; use crate::{ @@ -14,7 +15,7 @@ use crate::{ }; /// Opaque room id -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub struct RoomId(ByteVec); impl RoomId { pub fn from_bytes(bytes: ByteVec) -> Result { @@ -70,6 +71,19 @@ impl RoomRegistry { let res = inner.rooms.get(room_id); res.map(|r| r.clone()) } + + pub async fn get_all_rooms(&self) -> Vec { + let handles = { + let inner = self.0.read().unwrap(); + let handles = inner.rooms.values().cloned().collect::>(); + handles + }; + let mut res = vec![]; + for i in handles { + res.push(i.get_room_info().await) + } + res + } } struct RoomRegistryInner { @@ -151,6 +165,7 @@ impl Room { } } +#[derive(Serialize)] pub struct RoomInfo { pub id: RoomId, pub members: Vec, diff --git a/src/main.rs b/src/main.rs index 81749d8..acb458a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -42,7 +42,8 @@ async fn main() -> Result<()> { let mut metrics = MetricsRegistry::new(); let rooms = RoomRegistry::empty(&mut metrics)?; let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; - let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone()).await?; + let telemetry_terminator = + util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?; let irc = projections::irc::launch(irc_config, players, rooms.clone(), metrics.clone()).await?; tracing::info!("Started"); diff --git a/src/util/telemetry.rs b/src/util/telemetry.rs index aef5645..0eb3e42 100644 --- a/src/util/telemetry.rs +++ b/src/util/telemetry.rs @@ -12,24 +12,30 @@ use serde::Deserialize; use tokio::net::TcpListener; use tokio::sync::oneshot::channel; +use crate::core::room::RoomRegistry; use crate::prelude::*; use crate::util::http::*; use crate::util::Terminator; type BoxBody = http_body_util::combinators::BoxBody; +type HttpResult = std::result::Result; #[derive(Deserialize, Debug)] pub struct ServerConfig { pub listen_on: SocketAddr, } -pub async fn launch(config: ServerConfig, metrics: MetricsRegistry) -> Result { +pub async fn launch( + config: ServerConfig, + metrics: MetricsRegistry, + rooms: RoomRegistry, +) -> Result { log::info!("Starting the telemetry service"); let listener = TcpListener::bind(config.listen_on).await?; log::debug!("Listener started"); let (signal, rx) = channel(); - let handle = tokio::task::spawn(main_loop(listener, metrics, rx.map(|_| ()))); + let handle = tokio::task::spawn(main_loop(listener, metrics, rooms, rx.map(|_| ()))); let terminator = Terminator::from_raw(signal, handle); Ok(terminator) } @@ -37,6 +43,7 @@ pub async fn launch(config: ServerConfig, metrics: MetricsRegistry) -> Result, ) -> Result<()> { pin!(termination); @@ -47,9 +54,11 @@ async fn main_loop( result = listener.accept() => { let (stream, _) = result?; let metrics = metrics.clone(); + let rooms = rooms.clone(); tokio::task::spawn(async move { let registry = metrics.clone(); - let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), r))); + let rooms = rooms.clone(); + let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), r))); if let Err(err) = server.await { tracing::error!("Error serving connection: {:?}", err); } @@ -63,15 +72,17 @@ async fn main_loop( async fn route( registry: MetricsRegistry, + rooms: RoomRegistry, request: Request, ) -> std::result::Result, Infallible> { match (request.method(), request.uri().path()) { - (&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)), + (&Method::GET, "/metrics") => Ok(endpoint_metrics(registry)?.map(BodyExt::boxed)), + (&Method::GET, "/rooms") => Ok(endpoint_rooms(rooms).await?.map(BodyExt::boxed)), _ => Ok(not_found()?.map(BodyExt::boxed)), } } -fn metrics(registry: MetricsRegistry) -> std::result::Result>, Infallible> { +fn endpoint_metrics(registry: MetricsRegistry) -> HttpResult>> { let mf = registry.gather(); let mut buffer = vec![]; TextEncoder @@ -79,3 +90,10 @@ fn metrics(registry: MetricsRegistry) -> std::result::Result HttpResult>> { + let room_list = rooms.get_all_rooms().await; + let mut buffer = vec![]; + serde_json::to_writer(&mut buffer, &room_list).expect("unexpected fail when writing to vec"); + Ok(Response::new(Full::new(Bytes::from(buffer)))) +}