clean stuff, move metrics endpoint onto a separate port

This commit is contained in:
Nikita Vilunov 2023-02-09 19:19:03 +01:00
parent e0ae11c02d
commit 08fe958d60
10 changed files with 118 additions and 307 deletions

View File

@ -1,5 +1,5 @@
[cluster]
name = "localhost"
servers = ["127.0.0.1:3333"]
[telemetry]
listen_on = "127.0.0.1:8080"
[irc]
listen_on = "127.0.0.1:6667"

View File

@ -1,121 +0,0 @@
use crate::core::player::PlayerRegistry;
use crate::core::room::*;
use crate::prelude::*;
use crate::projections::trivial::handle_request;
use std::convert::Infallible;
use http_body_util::{BodyExt, Full};
use hyper::server::conn::http1;
use hyper::{body::Bytes, service::service_fn, Request, Response};
use hyper::{Method, StatusCode};
use prometheus::{Encoder, IntGauge, Opts, Registry as MetricsRegistry, TextEncoder};
use tokio::net::TcpListener;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>;
async fn hello(
_: Request<hyper::body::Incoming>,
) -> std::result::Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello World!"))))
}
fn not_found() -> std::result::Result<Response<Full<Bytes>>, Infallible> {
let mut response = Response::new(Full::new(Bytes::from("404")));
*response.status_mut() = StatusCode::NOT_FOUND;
Ok(response)
}
fn metrics(registry: MetricsRegistry) -> std::result::Result<Response<Full<Bytes>>, Infallible> {
let mf = registry.gather();
let mut buffer = vec![];
let encoder = TextEncoder::new();
encoder
.encode(&mf, &mut buffer)
.expect("write to vec cannot fail");
Ok(Response::new(Full::new(Bytes::from(buffer))))
}
async fn route(
registry: MetricsRegistry,
chats: PlayerRegistry,
request: Request<hyper::body::Incoming>,
) -> std::result::Result<Response<BoxBody>, Infallible> {
match (request.method(), request.uri().path()) {
(&Method::GET, "/hello") => Ok(hello(request).await?.map(BodyExt::boxed)),
(&Method::GET, "/socket") => Ok(handle_request(request, chats).await?.map(BodyExt::boxed)),
(&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)),
_ => Ok(not_found()?.map(BodyExt::boxed)),
}
}
pub struct HttpServerActor {
terminator: Sender<()>,
fiber: JoinHandle<Result<()>>,
}
impl HttpServerActor {
pub async fn launch(
listener: TcpListener,
metrics: MetricsRegistry,
rooms: RoomRegistry,
players: PlayerRegistry,
) -> Result<HttpServerActor> {
let (terminator, receiver) = tokio::sync::oneshot::channel::<()>();
let fiber =
tokio::task::spawn(Self::main_loop(listener, receiver, metrics, rooms, players));
Ok(HttpServerActor { terminator, fiber })
}
pub async fn terminate(self) -> Result<()> {
match self.terminator.send(()) {
Ok(_) => {}
Err(_) => failure("wat")?,
}
self.fiber.await??;
Ok(())
}
async fn main_loop(
listener: TcpListener,
termination: impl Future,
registry: MetricsRegistry,
rooms: RoomRegistry,
players: PlayerRegistry,
) -> Result<()> {
log::info!("Starting the http server");
pin!(termination);
let reqs = IntGauge::with_opts(Opts::new("sockets", "Number of open sockets"))?;
registry.register(Box::new(reqs.clone()))?;
loop {
select! {
_ = &mut termination => {
log::info!("Terminating the http server");
return Ok(())
},
result = listener.accept() => {
let (stream, _) = result?;
let registry = registry.clone();
let players = players.clone();
let reqs = reqs.clone();
tokio::task::spawn(async move {
reqs.inc();
let registry = registry.clone();
if let Err(err) = http1::Builder::new()
.serve_connection(stream, service_fn(move |r| route(registry.clone(), players.clone(), r)))
.with_upgrades()
.await
{
tracing::error!("Error serving connection: {:?}", err);
}
reqs.dec();
});
},
}
}
}
}

View File

@ -1,38 +1,24 @@
mod core;
mod http;
mod prelude;
mod projections;
mod protos;
mod tcp;
mod util;
use std::future::Future;
use figment::providers::Format;
use figment::{providers::Toml, Figment};
use prometheus::Registry as MetricsRegistry;
use serde::Deserialize;
use crate::core::player::PlayerRegistry;
use crate::core::room::RoomRegistry;
use crate::prelude::*;
use prometheus::{IntCounter, Opts, Registry};
use tcp::ClientSocketActor;
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
use figment::providers::Format;
use tokio::net::{TcpListener, TcpStream};
use figment::{providers::Toml, Figment};
use serde::Deserialize;
use tokio::io::BufWriter;
use tokio::sync::mpsc::Sender;
#[derive(Deserialize, Debug)]
struct ServerConfig {
cluster: ClusterConfig,
}
#[derive(Deserialize, Debug)]
struct ClusterConfig {
name: String,
servers: Vec<SocketAddr>,
telemetry: util::telemetry::ServerConfig,
irc: projections::irc::ServerConfig,
}
fn load_config() -> Result<ServerConfig> {
@ -46,37 +32,21 @@ async fn main() -> Result<()> {
set_up_logging()?;
let sleep = ctrl_c()?;
let config = load_config()?;
dbg!(config);
tracing::info!("Booting up");
let registry = Registry::new();
let counter = IntCounter::with_opts(Opts::new("actor_count", "Number of alive actors"))?;
registry.register(Box::new(counter.clone()))?;
tracing::info!("Booting up");
tracing::info!("Loaded config: {config:?}");
let registry = MetricsRegistry::new();
let rooms = RoomRegistry::empty();
let players = PlayerRegistry::empty(rooms.clone());
let listener = TcpListener::bind("127.0.0.1:3721").await?;
let listener_http = TcpListener::bind("127.0.0.1:8080").await?;
let http_server_actor = http::HttpServerActor::launch(
listener_http,
registry.clone(),
rooms.clone(),
players.clone(),
)
.await?;
let irc_config = projections::irc::ServerConfig {
listen_on: "127.0.0.1:6667".parse()?,
};
let irc = projections::irc::launch(irc_config, players).await?;
let telemetry_terminator = util::telemetry::launch(&config.telemetry, registry.clone()).await?;
let irc = projections::irc::launch(&config.irc, players).await?;
tracing::info!("Started");
run(listener, sleep).await?;
sleep.await;
// sleep.await;
tracing::info!("Begin shutdown");
irc.terminate().await?;
http_server_actor.terminate().await?;
telemetry_terminator.terminate().await?;
tracing::info!("Shutdown complete");
Ok(())
}
@ -94,51 +64,3 @@ fn set_up_logging() -> Result<()> {
tracing_subscriber::fmt::init();
Ok(())
}
async fn run(listener: TcpListener, shutdown: impl Future<Output = ()>) -> Result<()> {
tokio::pin!(shutdown);
let mut counter: u32 = 0;
let mut conns: HashMap<u32, _> = HashMap::new();
let (sender, mut chan) = tokio::sync::mpsc::channel(32);
loop {
let accept = listener.accept();
tokio::select! {
id = chan.recv() => {
match id {
Some(id) => {
conns.remove(&id);
}
None => {}
}
},
result = accept => {
let (connect, address) = result?;
let id = counter;
counter += 1;
let fiber = handle_connection(connect, address, id, sender.clone()).await?;
conns.insert(id, fiber);
},
_ = &mut shutdown => break,
}
}
for (_, i) in conns {
dbg!(i.terminate().await?);
}
Ok(())
}
async fn handle_connection(
connect: TcpStream,
address: SocketAddr,
id: u32,
updater: Sender<u32>,
) -> Result<ClientSocketActor> {
tracing::info!("Incoming socket #{} from {}", id, address);
let writer = BufWriter::new(connect);
Ok(ClientSocketActor::launch(writer, updater, id)?)
}

View File

@ -1,3 +1,5 @@
pub use std::future::Future;
pub use tokio::pin;
pub use tokio::select;
pub use tokio::task::JoinHandle;
@ -7,9 +9,3 @@ pub mod log {
}
pub type Result<T> = std::result::Result<T, anyhow::Error>;
pub fn failure(explain: &str) -> Result<()> {
panic!()
}
pub use std::future::Future;

View File

@ -1,5 +1,6 @@
use std::net::SocketAddr;
use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot::channel;
@ -9,6 +10,7 @@ use crate::prelude::*;
use crate::protos::irc::*;
use crate::util::Terminator;
#[derive(Deserialize, Debug)]
pub struct ServerConfig {
pub listen_on: SocketAddr,
}
@ -53,7 +55,7 @@ async fn handle_socket(
}
}
pub async fn launch(config: ServerConfig, players: PlayerRegistry) -> Result<Terminator> {
pub async fn launch(config: &ServerConfig, players: PlayerRegistry) -> Result<Terminator> {
log::info!("Starting IRC projection");
let (signal, mut rx) = channel();
let listener = TcpListener::bind(config.listen_on).await?;

View File

@ -1,78 +0,0 @@
use std::time::Duration;
use crate::prelude::*;
use tokio::{
io::{AsyncWriteExt, BufWriter},
net::TcpStream,
sync::mpsc::{Receiver, Sender},
};
pub struct ClientSocketActor {
sender: Sender<ClientSocketActorMessage>,
fiber: JoinHandle<Result<ClientSocketActorTermination>>,
}
#[derive(Debug, Clone)]
enum ClientSocketActorMessage {
Terminate,
}
#[derive(Debug, Clone)]
enum ClientSocketActorTermination {
ServerClosed,
}
impl ClientSocketActor {
pub async fn terminate(self) -> Result<()> {
self.sender
.send(ClientSocketActorMessage::Terminate)
.await?;
self.fiber.await??;
Ok(())
}
pub fn launch(
writer: BufWriter<TcpStream>,
updater: Sender<u32>,
id: u32,
) -> Result<ClientSocketActor> {
let (sender, chan) = tokio::sync::mpsc::channel(32);
let fiber: JoinHandle<Result<ClientSocketActorTermination>> =
tokio::spawn(ClientSocketActor::handle_connect(writer, chan, updater, id));
Ok(ClientSocketActor { sender, fiber })
}
async fn handle_connect(
mut writer: BufWriter<TcpStream>,
mut messagebox: Receiver<ClientSocketActorMessage>,
updater: Sender<u32>,
id: u32,
) -> Result<ClientSocketActorTermination> {
async fn handle(
messagebox: &mut Receiver<ClientSocketActorMessage>,
writer: &mut BufWriter<TcpStream>,
) -> Result<Option<ClientSocketActorTermination>> {
writer.write_all("privet\n".as_bytes()).await?;
writer.flush().await?;
tracing::info!("Wrote");
tokio::select! {
_ = messagebox.recv() => return Ok(Some(ClientSocketActorTermination::ServerClosed)),
_ = tokio::time::sleep(Duration::from_millis(200)) => (),
}
Ok(None)
}
loop {
match handle(&mut messagebox, &mut writer).await {
Ok(None) => {}
Ok(Some(termination)) => return Ok(termination),
Err(err) => {
tracing::error!("{}", err);
updater.send(id).await?;
return Err(err);
}
}
}
}
}

View File

@ -1,3 +0,0 @@
mod client;
pub use client::ClientSocketActor;

10
src/util/http.rs Normal file
View File

@ -0,0 +1,10 @@
use std::convert::Infallible;
use http_body_util::Full;
use hyper::{body::Bytes, Response, StatusCode};
pub fn not_found() -> std::result::Result<Response<Full<Bytes>>, Infallible> {
let mut response = Response::new(Full::new(Bytes::from("404")));
*response.status_mut() = StatusCode::NOT_FOUND;
Ok(response)
}

View File

@ -3,7 +3,9 @@ use tokio::sync::oneshot::{channel, Sender};
use crate::prelude::*;
pub mod http;
pub mod table;
pub mod telemetry;
pub struct Terminator {
signal: Sender<()>,

81
src/util/telemetry.rs Normal file
View File

@ -0,0 +1,81 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use futures_util::FutureExt;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response};
use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder};
use serde::Deserialize;
use tokio::net::TcpListener;
use tokio::sync::oneshot::channel;
use crate::prelude::*;
use crate::util::http::*;
use crate::util::Terminator;
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>;
#[derive(Deserialize, Debug)]
pub struct ServerConfig {
pub listen_on: SocketAddr,
}
pub async fn launch(config: &ServerConfig, metrics: MetricsRegistry) -> Result<Terminator> {
log::info!("Starting the telemetry service");
let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started");
let (signal, rx) = channel();
let handle = tokio::task::spawn(main_loop(listener, metrics, rx.map(|_| ())));
let terminator = Terminator::from_raw(signal, handle);
Ok(terminator)
}
async fn main_loop(
listener: TcpListener,
metrics: MetricsRegistry,
termination: impl Future<Output = ()>,
) -> Result<()> {
pin!(termination);
loop {
select! {
biased;
_ = &mut termination => break,
result = listener.accept() => {
let (stream, _) = result?;
let metrics = metrics.clone();
tokio::task::spawn(async move {
let registry = metrics.clone();
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), r)));
if let Err(err) = server.await {
tracing::error!("Error serving connection: {:?}", err);
}
});
},
}
}
log::info!("Terminating the telemetry service");
Ok(())
}
async fn route(
registry: MetricsRegistry,
request: Request<hyper::body::Incoming>,
) -> std::result::Result<Response<BoxBody>, Infallible> {
match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)),
_ => Ok(not_found()?.map(BodyExt::boxed)),
}
}
fn metrics(registry: MetricsRegistry) -> std::result::Result<Response<Full<Bytes>>, Infallible> {
let mf = registry.gather();
let mut buffer = vec![];
TextEncoder
.encode(&mf, &mut buffer)
.expect("write to vec cannot fail");
Ok(Response::new(Full::new(Bytes::from(buffer))))
}