forked from lavina/lavina
1
0
Fork 0

endpoint with a list of rooms

This commit is contained in:
Nikita Vilunov 2023-02-15 18:10:54 +01:00
parent a03a3a11a3
commit 1bc305962e
6 changed files with 48 additions and 9 deletions

1
Cargo.lock generated
View File

@ -439,6 +439,7 @@ dependencies = [
"regex", "regex",
"reqwest", "reqwest",
"serde", "serde",
"serde_json",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
"tracing", "tracing",

View File

@ -14,6 +14,7 @@ figment = { version = "0.10.8", features = ["env", "toml"] } # configuration fil
hyper = { version = "1.0.0-rc.2", features = ["server", "http1"] } # http server hyper = { version = "1.0.0-rc.2", features = ["server", "http1"] } # http server
http-body-util = "0.1.0-rc.2" http-body-util = "0.1.0-rc.2"
serde = { version = "1.0.152", features = ["rc", "serde_derive"] } serde = { version = "1.0.152", features = ["rc", "serde_derive"] }
serde_json = "1.0.93"
tokio = { version = "1.24.1", features = ["full"] } # async runtime tokio = { version = "1.24.1", features = ["full"] } # async runtime
tracing = "0.1.37" # logging & tracing api tracing = "0.1.37" # logging & tracing api
tracing-subscriber = "0.3.16" tracing-subscriber = "0.3.16"

View File

@ -13,6 +13,7 @@ use std::{
}; };
use prometheus::{IntGauge, Registry as MetricsRegistry}; use prometheus::{IntGauge, Registry as MetricsRegistry};
use serde::Serialize;
use tokio::{ use tokio::{
sync::mpsc::{channel, Receiver, Sender}, sync::mpsc::{channel, Receiver, Sender},
task::JoinHandle, task::JoinHandle,
@ -25,7 +26,7 @@ use crate::{
}; };
/// Opaque player identifier. Cannot contain spaces, must be shorter than 32. /// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct PlayerId(ByteVec); pub struct PlayerId(ByteVec);
impl PlayerId { impl PlayerId {
pub fn from_bytes(bytes: ByteVec) -> Result<PlayerId> { pub fn from_bytes(bytes: ByteVec) -> Result<PlayerId> {
@ -77,7 +78,9 @@ impl PlayerConnection {
} }
pub async fn terminate(self) { pub async fn terminate(self) {
self.player_handle.send(PlayerCommand::TerminateConnection(self.connection_id)).await; self.player_handle
.send(PlayerCommand::TerminateConnection(self.connection_id))
.await;
} }
pub async fn send(&self, command: PlayerCommand) { pub async fn send(&self, command: PlayerCommand) {

View File

@ -6,6 +6,7 @@ use std::{
}; };
use prometheus::{IntGauge, Registry as MetricRegistry}; use prometheus::{IntGauge, Registry as MetricRegistry};
use serde::Serialize;
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::RwLock as AsyncRwLock;
use crate::{ use crate::{
@ -14,7 +15,7 @@ use crate::{
}; };
/// Opaque room id /// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct RoomId(ByteVec); pub struct RoomId(ByteVec);
impl RoomId { impl RoomId {
pub fn from_bytes(bytes: ByteVec) -> Result<RoomId> { pub fn from_bytes(bytes: ByteVec) -> Result<RoomId> {
@ -70,6 +71,19 @@ impl RoomRegistry {
let res = inner.rooms.get(room_id); let res = inner.rooms.get(room_id);
res.map(|r| r.clone()) res.map(|r| r.clone())
} }
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
let handles = {
let inner = self.0.read().unwrap();
let handles = inner.rooms.values().cloned().collect::<Vec<_>>();
handles
};
let mut res = vec![];
for i in handles {
res.push(i.get_room_info().await)
}
res
}
} }
struct RoomRegistryInner { struct RoomRegistryInner {
@ -151,6 +165,7 @@ impl Room {
} }
} }
#[derive(Serialize)]
pub struct RoomInfo { pub struct RoomInfo {
pub id: RoomId, pub id: RoomId,
pub members: Vec<PlayerId>, pub members: Vec<PlayerId>,

View File

@ -42,7 +42,8 @@ async fn main() -> Result<()> {
let mut metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let rooms = RoomRegistry::empty(&mut metrics)?; let rooms = RoomRegistry::empty(&mut metrics)?;
let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?;
let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone()).await?; let telemetry_terminator =
util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?;
let irc = projections::irc::launch(irc_config, players, rooms.clone(), metrics.clone()).await?; let irc = projections::irc::launch(irc_config, players, rooms.clone(), metrics.clone()).await?;
tracing::info!("Started"); tracing::info!("Started");

View File

@ -12,24 +12,30 @@ use serde::Deserialize;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::oneshot::channel; use tokio::sync::oneshot::channel;
use crate::core::room::RoomRegistry;
use crate::prelude::*; use crate::prelude::*;
use crate::util::http::*; use crate::util::http::*;
use crate::util::Terminator; use crate::util::Terminator;
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>; type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>;
type HttpResult<T> = std::result::Result<T, Infallible>;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
pub struct ServerConfig { pub struct ServerConfig {
pub listen_on: SocketAddr, pub listen_on: SocketAddr,
} }
pub async fn launch(config: ServerConfig, metrics: MetricsRegistry) -> Result<Terminator> { pub async fn launch(
config: ServerConfig,
metrics: MetricsRegistry,
rooms: RoomRegistry,
) -> Result<Terminator> {
log::info!("Starting the telemetry service"); log::info!("Starting the telemetry service");
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started"); log::debug!("Listener started");
let (signal, rx) = channel(); let (signal, rx) = channel();
let handle = tokio::task::spawn(main_loop(listener, metrics, rx.map(|_| ()))); let handle = tokio::task::spawn(main_loop(listener, metrics, rooms, rx.map(|_| ())));
let terminator = Terminator::from_raw(signal, handle); let terminator = Terminator::from_raw(signal, handle);
Ok(terminator) Ok(terminator)
} }
@ -37,6 +43,7 @@ pub async fn launch(config: ServerConfig, metrics: MetricsRegistry) -> Result<Te
async fn main_loop( async fn main_loop(
listener: TcpListener, listener: TcpListener,
metrics: MetricsRegistry, metrics: MetricsRegistry,
rooms: RoomRegistry,
termination: impl Future<Output = ()>, termination: impl Future<Output = ()>,
) -> Result<()> { ) -> Result<()> {
pin!(termination); pin!(termination);
@ -47,9 +54,11 @@ async fn main_loop(
result = listener.accept() => { result = listener.accept() => {
let (stream, _) = result?; let (stream, _) = result?;
let metrics = metrics.clone(); let metrics = metrics.clone();
let rooms = rooms.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
let registry = metrics.clone(); let registry = metrics.clone();
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), r))); let rooms = rooms.clone();
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), r)));
if let Err(err) = server.await { if let Err(err) = server.await {
tracing::error!("Error serving connection: {:?}", err); tracing::error!("Error serving connection: {:?}", err);
} }
@ -63,15 +72,17 @@ async fn main_loop(
async fn route( async fn route(
registry: MetricsRegistry, registry: MetricsRegistry,
rooms: RoomRegistry,
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
) -> std::result::Result<Response<BoxBody>, Infallible> { ) -> std::result::Result<Response<BoxBody>, Infallible> {
match (request.method(), request.uri().path()) { match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)), (&Method::GET, "/metrics") => Ok(endpoint_metrics(registry)?.map(BodyExt::boxed)),
(&Method::GET, "/rooms") => Ok(endpoint_rooms(rooms).await?.map(BodyExt::boxed)),
_ => Ok(not_found()?.map(BodyExt::boxed)), _ => Ok(not_found()?.map(BodyExt::boxed)),
} }
} }
fn metrics(registry: MetricsRegistry) -> std::result::Result<Response<Full<Bytes>>, Infallible> { fn endpoint_metrics(registry: MetricsRegistry) -> HttpResult<Response<Full<Bytes>>> {
let mf = registry.gather(); let mf = registry.gather();
let mut buffer = vec![]; let mut buffer = vec![];
TextEncoder TextEncoder
@ -79,3 +90,10 @@ fn metrics(registry: MetricsRegistry) -> std::result::Result<Response<Full<Bytes
.expect("write to vec cannot fail"); .expect("write to vec cannot fail");
Ok(Response::new(Full::new(Bytes::from(buffer)))) Ok(Response::new(Full::new(Bytes::from(buffer))))
} }
async fn endpoint_rooms(rooms: RoomRegistry) -> HttpResult<Response<Full<Bytes>>> {
let room_list = rooms.get_all_rooms().await;
let mut buffer = vec![];
serde_json::to_writer(&mut buffer, &room_list).expect("unexpected fail when writing to vec");
Ok(Response::new(Full::new(Bytes::from(buffer))))
}