forked from lavina/lavina
1
0
Fork 0
lavina/src/main.rs

122 lines
3.0 KiB
Rust
Raw Normal View History

2023-01-19 17:18:41 +00:00
mod http;
mod prelude;
2023-01-19 17:58:56 +00:00
mod tcp;
2023-01-19 17:18:41 +00:00
use crate::prelude::*;
2023-01-19 17:58:56 +00:00
use tcp::ClientSocketActor;
2023-01-19 14:25:52 +00:00
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;
use figment::providers::Format;
use tokio::net::{TcpListener, TcpStream};
use figment::{providers::Toml, Figment};
use serde::Deserialize;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
#[derive(Deserialize, Debug)]
struct ServerConfig {
cluster: ClusterConfig,
}
#[derive(Deserialize, Debug)]
struct ClusterConfig {
name: String,
servers: Vec<SocketAddr>,
}
fn load_config() -> Result<ServerConfig> {
let raw_config = Figment::new().merge(Toml::file("config.toml"));
let config: ServerConfig = raw_config.extract()?;
Ok(config)
}
#[tokio::main]
async fn main() -> Result<()> {
set_up_logging()?;
let sleep = ctrl_c()?;
let config = load_config()?;
dbg!(config);
tracing::info!("Booting up");
let listener = TcpListener::bind("127.0.0.1:3721").await?;
2023-01-19 17:18:41 +00:00
let listener_http = TcpListener::bind("127.0.0.1:8080").await?;
let http_server_actor = http::HttpServerActor::launch(listener_http).await?;
2023-01-19 14:25:52 +00:00
tracing::info!("Started");
run(listener, sleep).await?;
// sleep.await;
tracing::info!("Begin shutdown");
2023-01-19 17:18:41 +00:00
http_server_actor.terminate().await?;
2023-01-19 14:25:52 +00:00
tracing::info!("Shutdown complete");
Ok(())
}
fn ctrl_c() -> Result<impl Future<Output = ()>> {
use tokio::signal::unix::*;
let chan = signal(SignalKind::interrupt())?;
async fn recv(mut chan: Signal) {
let _ = chan.recv().await;
}
Ok(recv(chan))
}
fn set_up_logging() -> Result<()> {
tracing_subscriber::fmt::init();
Ok(())
}
async fn run(listener: TcpListener, shutdown: impl Future<Output = ()>) -> Result<()> {
tokio::pin!(shutdown);
let mut counter: u32 = 0;
let mut conns: HashMap<u32, _> = HashMap::new();
let (sender, mut chan) = tokio::sync::mpsc::channel(32);
loop {
let accept = listener.accept();
tokio::select! {
id = chan.recv() => {
match id {
Some(id) => {
conns.remove(&id);
}
None => {}
}
},
result = accept => {
let (connect, address) = result?;
let id = counter;
counter += 1;
let fiber = handle_connection(connect, address, id, sender.clone()).await?;
conns.insert(id, fiber);
},
_ = &mut shutdown => break,
}
}
for (_, i) in conns {
dbg!(i.terminate().await?);
}
Ok(())
}
async fn handle_connection(
connect: TcpStream,
address: SocketAddr,
id: u32,
updater: Sender<u32>,
) -> Result<ClientSocketActor> {
tracing::info!("Incoming socket #{} from {}", id, address);
let writer = BufWriter::new(connect);
Ok(ClientSocketActor::launch(writer, updater, id)?)
}