mod http; mod prelude; use crate::prelude::*; use std::collections::HashMap; use std::future::Future; use std::net::SocketAddr; use std::time::Duration; use figment::providers::Format; use tokio::net::{TcpListener, TcpStream}; use figment::{providers::Toml, Figment}; use serde::Deserialize; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinHandle; #[derive(Deserialize, Debug)] struct ServerConfig { cluster: ClusterConfig, } #[derive(Deserialize, Debug)] struct ClusterConfig { name: String, servers: Vec, } fn load_config() -> Result { let raw_config = Figment::new().merge(Toml::file("config.toml")); let config: ServerConfig = raw_config.extract()?; Ok(config) } #[tokio::main] async fn main() -> Result<()> { set_up_logging()?; let sleep = ctrl_c()?; let config = load_config()?; dbg!(config); tracing::info!("Booting up"); let listener = TcpListener::bind("127.0.0.1:3721").await?; let listener_http = TcpListener::bind("127.0.0.1:8080").await?; let http_server_actor = http::HttpServerActor::launch(listener_http).await?; tracing::info!("Started"); run(listener, sleep).await?; // sleep.await; tracing::info!("Begin shutdown"); http_server_actor.terminate().await?; tracing::info!("Shutdown complete"); Ok(()) } fn ctrl_c() -> Result> { use tokio::signal::unix::*; let chan = signal(SignalKind::interrupt())?; async fn recv(mut chan: Signal) { let _ = chan.recv().await; } Ok(recv(chan)) } fn set_up_logging() -> Result<()> { tracing_subscriber::fmt::init(); Ok(()) } async fn run(listener: TcpListener, shutdown: impl Future) -> Result<()> { tokio::pin!(shutdown); let mut counter: u32 = 0; let mut conns: HashMap = HashMap::new(); let (sender, mut chan) = tokio::sync::mpsc::channel(32); loop { let accept = listener.accept(); tokio::select! { id = chan.recv() => { match id { Some(id) => { conns.remove(&id); } None => {} } }, result = accept => { let (connect, address) = result?; let id = counter; counter += 1; let fiber = handle_connection(connect, address, id, sender.clone()).await?; conns.insert(id, fiber); }, _ = &mut shutdown => break, } } for (_, i) in conns { dbg!(i.terminate().await?); } Ok(()) } async fn handle_connection( connect: TcpStream, address: SocketAddr, id: u32, updater: Sender, ) -> Result { tracing::info!("Incoming socket #{} from {}", id, address); let writer = BufWriter::new(connect); Ok(ClientSocketActor::launch(writer, updater, id)?) } struct ClientSocketActor { sender: Sender, fiber: JoinHandle>, } #[derive(Debug, Clone)] enum ClientSocketActorMessage { Terminate, } #[derive(Debug, Clone)] enum ClientSocketActorTermination { ServerClosed, } impl ClientSocketActor { pub async fn terminate(self) -> Result { self.sender .send(ClientSocketActorMessage::Terminate) .await?; Ok(self.fiber.await??) } pub fn launch( writer: BufWriter, updater: Sender, id: u32, ) -> Result { let (sender, chan) = tokio::sync::mpsc::channel(32); let fiber: JoinHandle> = tokio::spawn(ClientSocketActor::handle_connect(writer, chan, updater, id)); Ok(ClientSocketActor { sender, fiber }) } async fn handle_connect( mut writer: BufWriter, mut messagebox: Receiver, updater: Sender, id: u32, ) -> Result { async fn handle( messagebox: &mut Receiver, writer: &mut BufWriter, ) -> Result> { writer.write_all("privet\n".as_bytes()).await?; writer.flush().await?; tracing::info!("Wrote"); tokio::select! { _ = messagebox.recv() => return Ok(Some(ClientSocketActorTermination::ServerClosed)), _ = tokio::time::sleep(Duration::from_millis(200)) => (), } Ok(None) } loop { match handle(&mut messagebox, &mut writer).await { Ok(None) => {} Ok(Some(termination)) => return Ok(termination), Err(err) => { tracing::error!("{}", err); updater.send(id).await?; return Err(err); } } } } }