mod chat; mod http; mod prelude; mod tcp; use crate::chat::Chats; use crate::prelude::*; use prometheus::{IntCounter, Opts, Registry}; use tcp::ClientSocketActor; use std::collections::HashMap; use std::future::Future; use std::net::SocketAddr; use figment::providers::Format; use tokio::net::{TcpListener, TcpStream}; use figment::{providers::Toml, Figment}; use serde::Deserialize; use tokio::io::BufWriter; use tokio::sync::mpsc::Sender; #[derive(Deserialize, Debug)] struct ServerConfig { cluster: ClusterConfig, } #[derive(Deserialize, Debug)] struct ClusterConfig { name: String, servers: Vec, } fn load_config() -> Result { let raw_config = Figment::new().merge(Toml::file("config.toml")); let config: ServerConfig = raw_config.extract()?; Ok(config) } #[tokio::main] async fn main() -> Result<()> { set_up_logging()?; let sleep = ctrl_c()?; let config = load_config()?; dbg!(config); tracing::info!("Booting up"); let registry = Registry::new(); let counter = IntCounter::with_opts(Opts::new("actor_count", "Number of alive actors"))?; registry.register(Box::new(counter.clone()))?; let chats = Chats::new(); let listener = TcpListener::bind("127.0.0.1:3721").await?; let listener_http = TcpListener::bind("127.0.0.1:8080").await?; let http_server_actor = http::HttpServerActor::launch(listener_http, registry.clone(), chats.clone()).await?; tracing::info!("Started"); run(listener, sleep).await?; // sleep.await; tracing::info!("Begin shutdown"); http_server_actor.terminate().await?; tracing::info!("Shutdown complete"); Ok(()) } fn ctrl_c() -> Result> { use tokio::signal::unix::*; let chan = signal(SignalKind::interrupt())?; async fn recv(mut chan: Signal) { let _ = chan.recv().await; } Ok(recv(chan)) } fn set_up_logging() -> Result<()> { tracing_subscriber::fmt::init(); Ok(()) } async fn run(listener: TcpListener, shutdown: impl Future) -> Result<()> { tokio::pin!(shutdown); let mut counter: u32 = 0; let mut conns: HashMap = HashMap::new(); let (sender, mut chan) = tokio::sync::mpsc::channel(32); loop { let accept = listener.accept(); tokio::select! { id = chan.recv() => { match id { Some(id) => { conns.remove(&id); } None => {} } }, result = accept => { let (connect, address) = result?; let id = counter; counter += 1; let fiber = handle_connection(connect, address, id, sender.clone()).await?; conns.insert(id, fiber); }, _ = &mut shutdown => break, } } for (_, i) in conns { dbg!(i.terminate().await?); } Ok(()) } async fn handle_connection( connect: TcpStream, address: SocketAddr, id: u32, updater: Sender, ) -> Result { tracing::info!("Incoming socket #{} from {}", id, address); let writer = BufWriter::new(connect); Ok(ClientSocketActor::launch(writer, updater, id)?) }