diff --git a/config.toml b/config.toml index 81dd385..924c702 100644 --- a/config.toml +++ b/config.toml @@ -1,5 +1,5 @@ -[cluster] -name = "localhost" -servers = ["127.0.0.1:3333"] - +[telemetry] +listen_on = "127.0.0.1:8080" +[irc] +listen_on = "127.0.0.1:6667" diff --git a/src/http.rs b/src/http.rs deleted file mode 100644 index d38b14a..0000000 --- a/src/http.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::core::player::PlayerRegistry; -use crate::core::room::*; -use crate::prelude::*; -use crate::projections::trivial::handle_request; - -use std::convert::Infallible; - -use http_body_util::{BodyExt, Full}; -use hyper::server::conn::http1; -use hyper::{body::Bytes, service::service_fn, Request, Response}; -use hyper::{Method, StatusCode}; - -use prometheus::{Encoder, IntGauge, Opts, Registry as MetricsRegistry, TextEncoder}; -use tokio::net::TcpListener; -use tokio::sync::oneshot::Sender; -use tokio::task::JoinHandle; - -type BoxBody = http_body_util::combinators::BoxBody; - -async fn hello( - _: Request, -) -> std::result::Result>, Infallible> { - Ok(Response::new(Full::new(Bytes::from("Hello World!")))) -} - -fn not_found() -> std::result::Result>, Infallible> { - let mut response = Response::new(Full::new(Bytes::from("404"))); - *response.status_mut() = StatusCode::NOT_FOUND; - Ok(response) -} - -fn metrics(registry: MetricsRegistry) -> std::result::Result>, Infallible> { - let mf = registry.gather(); - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - encoder - .encode(&mf, &mut buffer) - .expect("write to vec cannot fail"); - - Ok(Response::new(Full::new(Bytes::from(buffer)))) -} - -async fn route( - registry: MetricsRegistry, - chats: PlayerRegistry, - request: Request, -) -> std::result::Result, Infallible> { - match (request.method(), request.uri().path()) { - (&Method::GET, "/hello") => Ok(hello(request).await?.map(BodyExt::boxed)), - (&Method::GET, "/socket") => Ok(handle_request(request, chats).await?.map(BodyExt::boxed)), - (&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)), - _ => Ok(not_found()?.map(BodyExt::boxed)), - } -} - -pub struct HttpServerActor { - terminator: Sender<()>, - fiber: JoinHandle>, -} -impl HttpServerActor { - pub async fn launch( - listener: TcpListener, - metrics: MetricsRegistry, - rooms: RoomRegistry, - players: PlayerRegistry, - ) -> Result { - let (terminator, receiver) = tokio::sync::oneshot::channel::<()>(); - let fiber = - tokio::task::spawn(Self::main_loop(listener, receiver, metrics, rooms, players)); - Ok(HttpServerActor { terminator, fiber }) - } - - pub async fn terminate(self) -> Result<()> { - match self.terminator.send(()) { - Ok(_) => {} - Err(_) => failure("wat")?, - } - self.fiber.await??; - Ok(()) - } - - async fn main_loop( - listener: TcpListener, - termination: impl Future, - registry: MetricsRegistry, - rooms: RoomRegistry, - players: PlayerRegistry, - ) -> Result<()> { - log::info!("Starting the http server"); - pin!(termination); - let reqs = IntGauge::with_opts(Opts::new("sockets", "Number of open sockets"))?; - registry.register(Box::new(reqs.clone()))?; - - loop { - select! { - _ = &mut termination => { - log::info!("Terminating the http server"); - return Ok(()) - }, - result = listener.accept() => { - let (stream, _) = result?; - let registry = registry.clone(); - let players = players.clone(); - let reqs = reqs.clone(); - tokio::task::spawn(async move { - reqs.inc(); - let registry = registry.clone(); - if let Err(err) = http1::Builder::new() - .serve_connection(stream, service_fn(move |r| route(registry.clone(), players.clone(), r))) - .with_upgrades() - .await - { - tracing::error!("Error serving connection: {:?}", err); - } - reqs.dec(); - }); - }, - } - } - } -} diff --git a/src/main.rs b/src/main.rs index 870f2ae..1a2356a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,38 +1,24 @@ mod core; -mod http; mod prelude; mod projections; mod protos; -mod tcp; mod util; +use std::future::Future; + +use figment::providers::Format; +use figment::{providers::Toml, Figment}; +use prometheus::Registry as MetricsRegistry; +use serde::Deserialize; + use crate::core::player::PlayerRegistry; use crate::core::room::RoomRegistry; use crate::prelude::*; -use prometheus::{IntCounter, Opts, Registry}; -use tcp::ClientSocketActor; - -use std::collections::HashMap; -use std::future::Future; -use std::net::SocketAddr; - -use figment::providers::Format; -use tokio::net::{TcpListener, TcpStream}; - -use figment::{providers::Toml, Figment}; -use serde::Deserialize; -use tokio::io::BufWriter; -use tokio::sync::mpsc::Sender; #[derive(Deserialize, Debug)] struct ServerConfig { - cluster: ClusterConfig, -} - -#[derive(Deserialize, Debug)] -struct ClusterConfig { - name: String, - servers: Vec, + telemetry: util::telemetry::ServerConfig, + irc: projections::irc::ServerConfig, } fn load_config() -> Result { @@ -46,37 +32,21 @@ async fn main() -> Result<()> { set_up_logging()?; let sleep = ctrl_c()?; let config = load_config()?; - dbg!(config); - tracing::info!("Booting up"); - let registry = Registry::new(); - let counter = IntCounter::with_opts(Opts::new("actor_count", "Number of alive actors"))?; - registry.register(Box::new(counter.clone()))?; + tracing::info!("Booting up"); + tracing::info!("Loaded config: {config:?}"); + let registry = MetricsRegistry::new(); let rooms = RoomRegistry::empty(); let players = PlayerRegistry::empty(rooms.clone()); - - let listener = TcpListener::bind("127.0.0.1:3721").await?; - let listener_http = TcpListener::bind("127.0.0.1:8080").await?; - let http_server_actor = http::HttpServerActor::launch( - listener_http, - registry.clone(), - rooms.clone(), - players.clone(), - ) - .await?; - let irc_config = projections::irc::ServerConfig { - listen_on: "127.0.0.1:6667".parse()?, - }; - let irc = projections::irc::launch(irc_config, players).await?; - + let telemetry_terminator = util::telemetry::launch(&config.telemetry, registry.clone()).await?; + let irc = projections::irc::launch(&config.irc, players).await?; tracing::info!("Started"); - run(listener, sleep).await?; + sleep.await; - // sleep.await; tracing::info!("Begin shutdown"); irc.terminate().await?; - http_server_actor.terminate().await?; + telemetry_terminator.terminate().await?; tracing::info!("Shutdown complete"); Ok(()) } @@ -94,51 +64,3 @@ fn set_up_logging() -> Result<()> { tracing_subscriber::fmt::init(); Ok(()) } - -async fn run(listener: TcpListener, shutdown: impl Future) -> Result<()> { - tokio::pin!(shutdown); - - let mut counter: u32 = 0; - let mut conns: HashMap = HashMap::new(); - - let (sender, mut chan) = tokio::sync::mpsc::channel(32); - - loop { - let accept = listener.accept(); - tokio::select! { - id = chan.recv() => { - match id { - Some(id) => { - conns.remove(&id); - } - None => {} - } - }, - result = accept => { - let (connect, address) = result?; - let id = counter; - counter += 1; - let fiber = handle_connection(connect, address, id, sender.clone()).await?; - conns.insert(id, fiber); - }, - _ = &mut shutdown => break, - } - } - - for (_, i) in conns { - dbg!(i.terminate().await?); - } - - Ok(()) -} - -async fn handle_connection( - connect: TcpStream, - address: SocketAddr, - id: u32, - updater: Sender, -) -> Result { - tracing::info!("Incoming socket #{} from {}", id, address); - let writer = BufWriter::new(connect); - Ok(ClientSocketActor::launch(writer, updater, id)?) -} diff --git a/src/prelude.rs b/src/prelude.rs index 518f64b..fe8b582 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,3 +1,5 @@ +pub use std::future::Future; + pub use tokio::pin; pub use tokio::select; pub use tokio::task::JoinHandle; @@ -7,9 +9,3 @@ pub mod log { } pub type Result = std::result::Result; - -pub fn failure(explain: &str) -> Result<()> { - panic!() -} - -pub use std::future::Future; diff --git a/src/projections/irc.rs b/src/projections/irc.rs index 80f9f1d..ee1eb98 100644 --- a/src/projections/irc.rs +++ b/src/projections/irc.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; +use serde::Deserialize; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot::channel; @@ -9,6 +10,7 @@ use crate::prelude::*; use crate::protos::irc::*; use crate::util::Terminator; +#[derive(Deserialize, Debug)] pub struct ServerConfig { pub listen_on: SocketAddr, } @@ -53,7 +55,7 @@ async fn handle_socket( } } -pub async fn launch(config: ServerConfig, players: PlayerRegistry) -> Result { +pub async fn launch(config: &ServerConfig, players: PlayerRegistry) -> Result { log::info!("Starting IRC projection"); let (signal, mut rx) = channel(); let listener = TcpListener::bind(config.listen_on).await?; diff --git a/src/tcp/client.rs b/src/tcp/client.rs deleted file mode 100644 index 94157d8..0000000 --- a/src/tcp/client.rs +++ /dev/null @@ -1,78 +0,0 @@ -use std::time::Duration; - -use crate::prelude::*; - -use tokio::{ - io::{AsyncWriteExt, BufWriter}, - net::TcpStream, - sync::mpsc::{Receiver, Sender}, -}; - -pub struct ClientSocketActor { - sender: Sender, - fiber: JoinHandle>, -} - -#[derive(Debug, Clone)] -enum ClientSocketActorMessage { - Terminate, -} - -#[derive(Debug, Clone)] -enum ClientSocketActorTermination { - ServerClosed, -} - -impl ClientSocketActor { - pub async fn terminate(self) -> Result<()> { - self.sender - .send(ClientSocketActorMessage::Terminate) - .await?; - self.fiber.await??; - Ok(()) - } - - pub fn launch( - writer: BufWriter, - updater: Sender, - id: u32, - ) -> Result { - let (sender, chan) = tokio::sync::mpsc::channel(32); - let fiber: JoinHandle> = - tokio::spawn(ClientSocketActor::handle_connect(writer, chan, updater, id)); - Ok(ClientSocketActor { sender, fiber }) - } - - async fn handle_connect( - mut writer: BufWriter, - mut messagebox: Receiver, - updater: Sender, - id: u32, - ) -> Result { - async fn handle( - messagebox: &mut Receiver, - writer: &mut BufWriter, - ) -> Result> { - writer.write_all("privet\n".as_bytes()).await?; - writer.flush().await?; - tracing::info!("Wrote"); - tokio::select! { - _ = messagebox.recv() => return Ok(Some(ClientSocketActorTermination::ServerClosed)), - _ = tokio::time::sleep(Duration::from_millis(200)) => (), - } - Ok(None) - } - - loop { - match handle(&mut messagebox, &mut writer).await { - Ok(None) => {} - Ok(Some(termination)) => return Ok(termination), - Err(err) => { - tracing::error!("{}", err); - updater.send(id).await?; - return Err(err); - } - } - } - } -} diff --git a/src/tcp/mod.rs b/src/tcp/mod.rs deleted file mode 100644 index b6c5f0f..0000000 --- a/src/tcp/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod client; - -pub use client::ClientSocketActor; diff --git a/src/util/http.rs b/src/util/http.rs new file mode 100644 index 0000000..1f79e32 --- /dev/null +++ b/src/util/http.rs @@ -0,0 +1,10 @@ +use std::convert::Infallible; + +use http_body_util::Full; +use hyper::{body::Bytes, Response, StatusCode}; + +pub fn not_found() -> std::result::Result>, Infallible> { + let mut response = Response::new(Full::new(Bytes::from("404"))); + *response.status_mut() = StatusCode::NOT_FOUND; + Ok(response) +} diff --git a/src/util/mod.rs b/src/util/mod.rs index 2612162..e9d3e02 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -3,7 +3,9 @@ use tokio::sync::oneshot::{channel, Sender}; use crate::prelude::*; +pub mod http; pub mod table; +pub mod telemetry; pub struct Terminator { signal: Sender<()>, diff --git a/src/util/telemetry.rs b/src/util/telemetry.rs new file mode 100644 index 0000000..fc00080 --- /dev/null +++ b/src/util/telemetry.rs @@ -0,0 +1,81 @@ +use std::convert::Infallible; +use std::net::SocketAddr; + +use futures_util::FutureExt; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Method, Request, Response}; +use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; +use serde::Deserialize; +use tokio::net::TcpListener; +use tokio::sync::oneshot::channel; + +use crate::prelude::*; + +use crate::util::http::*; +use crate::util::Terminator; + +type BoxBody = http_body_util::combinators::BoxBody; + +#[derive(Deserialize, Debug)] +pub struct ServerConfig { + pub listen_on: SocketAddr, +} + +pub async fn launch(config: &ServerConfig, metrics: MetricsRegistry) -> Result { + log::info!("Starting the telemetry service"); + let listener = TcpListener::bind(config.listen_on).await?; + log::debug!("Listener started"); + let (signal, rx) = channel(); + let handle = tokio::task::spawn(main_loop(listener, metrics, rx.map(|_| ()))); + let terminator = Terminator::from_raw(signal, handle); + Ok(terminator) +} + +async fn main_loop( + listener: TcpListener, + metrics: MetricsRegistry, + termination: impl Future, +) -> Result<()> { + pin!(termination); + loop { + select! { + biased; + _ = &mut termination => break, + result = listener.accept() => { + let (stream, _) = result?; + let metrics = metrics.clone(); + tokio::task::spawn(async move { + let registry = metrics.clone(); + let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), r))); + if let Err(err) = server.await { + tracing::error!("Error serving connection: {:?}", err); + } + }); + }, + } + } + log::info!("Terminating the telemetry service"); + Ok(()) +} + +async fn route( + registry: MetricsRegistry, + request: Request, +) -> std::result::Result, Infallible> { + match (request.method(), request.uri().path()) { + (&Method::GET, "/metrics") => Ok(metrics(registry)?.map(BodyExt::boxed)), + _ => Ok(not_found()?.map(BodyExt::boxed)), + } +} + +fn metrics(registry: MetricsRegistry) -> std::result::Result>, Infallible> { + let mf = registry.gather(); + let mut buffer = vec![]; + TextEncoder + .encode(&mf, &mut buffer) + .expect("write to vec cannot fail"); + Ok(Response::new(Full::new(Bytes::from(buffer)))) +}