diff --git a/Cargo.lock b/Cargo.lock index c0ed647..31cad40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -475,6 +475,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flume" version = "0.11.0" @@ -854,12 +860,29 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "inner-api" +version = "0.0.2-dev" +dependencies = [ + "prost", + "prost-build", +] + [[package]] name = "ipnet" version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.0" @@ -917,7 +940,9 @@ name = "lavina-core" version = "0.0.2-dev" dependencies = [ "anyhow", + "inner-api", "prometheus", + "prost", "serde", "sqlx", "tokio", @@ -1033,6 +1058,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "nix" version = "0.27.1" @@ -1216,6 +1247,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -1281,6 +1322,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" +dependencies = [ + "proc-macro2", + "syn 2.0.39", +] + [[package]] name = "proc-macro2" version = "1.0.70" @@ -1357,6 +1408,60 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" +dependencies = [ + "bytes", + "heck", + "itertools 0.11.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.39", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn 2.0.39", +] + +[[package]] +name = "prost-types" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" +dependencies = [ + "prost", +] + [[package]] name = "proto-irc" version = "0.0.2-dev" @@ -1801,7 +1906,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" dependencies = [ - "itertools", + "itertools 0.12.0", "nom", "unicode_categories", ] @@ -2498,6 +2603,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "whoami" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 207b7cb..df1cd71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/proto-xmpp", "crates/mgmt-api", "crates/sasl", + "crates/inner-api", ] [workspace.package] @@ -31,6 +32,7 @@ base64 = "0.21.3" lavina-core = { path = "crates/lavina-core" } tracing-subscriber = "0.3.16" sasl = { path = "crates/sasl" } +prost = "0.12.3" [package] name = "lavina" diff --git a/config.toml b/config.toml index 84fe4ea..b0742ae 100644 --- a/config.toml +++ b/config.toml @@ -17,4 +17,4 @@ db_path = "db.sqlite" self_id = 1 listen_on = "0.0.0.0:23732" advertised_address = "127.0.0.1:23732" -bootstrap_via = ["0.0.0.0:23733"] +bootstrap_via = [] diff --git a/config2.toml b/config2.toml new file mode 100644 index 0000000..f2ef981 --- /dev/null +++ b/config2.toml @@ -0,0 +1,20 @@ +[telemetry] +listen_on = "127.0.0.1:18080" + +[irc] +listen_on = "127.0.0.1:16667" +server_name = "irc.localhost" + +[xmpp] +listen_on = "127.0.0.1:15222" +cert = "./certs/xmpp.pem" +key = "./certs/xmpp.key" + +[storage] +db_path = "db.sqlite" + +[clustering] +self_id = 1 +listen_on = "0.0.0.0:23733" +advertised_address = "127.0.0.1:23733" +bootstrap_via = ["0.0.0.0:23732"] diff --git a/crates/inner-api/build.rs b/crates/inner-api/build.rs new file mode 100644 index 0000000..a2f274b --- /dev/null +++ b/crates/inner-api/build.rs @@ -0,0 +1,4 @@ +fn main() { + let empty: &[&'_ str] = &[]; + prost_build::compile_protos(&["proto/api.proto"], empty).unwrap(); +} diff --git a/crates/inner-api/proto/api.proto b/crates/inner-api/proto/api.proto new file mode 100644 index 0000000..2cb8610 --- /dev/null +++ b/crates/inner-api/proto/api.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package lavina; + +message HandshakeRequest { + uint32 node_id = 1; + string address = 2; +} + +message HandshakeResponse { + +} + +message Request { + uint32 id = 1; + oneof payload { + GetNodes get_nodes = 2; + } +} + +message GetNodes { + +} + +message ResponseNodes { + +} + +message Response { + uint32 id = 1; + oneof payload { + ResponseNodes nodes = 2; + } +} diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml index 727835c..25c37ae 100644 --- a/crates/lavina-core/Cargo.toml +++ b/crates/lavina-core/Cargo.toml @@ -10,3 +10,5 @@ serde.workspace = true tokio.workspace = true tracing.workspace = true prometheus.workspace = true +inner-api = { path = "../inner-api" } +prost.workspace = true diff --git a/crates/lavina-core/src/clustering/mod.rs b/crates/lavina-core/src/clustering/mod.rs index 98a0187..936fb96 100644 --- a/crates/lavina-core/src/clustering/mod.rs +++ b/crates/lavina-core/src/clustering/mod.rs @@ -1,7 +1,20 @@ use std::net::SocketAddr; +use std::sync::Arc; use anyhow::Result; +use prost::Message; use serde::Deserialize; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::spawn; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tracing::{info, trace, warn}; + +use inner_api::proto; +use crate::table::{AnonTable}; +use crate::terminator::Terminator; #[derive(Deserialize, Debug, Clone)] pub struct ClusteringConfig { @@ -23,12 +36,159 @@ pub struct ClusteringConfig { /// /// It maintains the local view of the cluster state, including the set of nodes, /// the status of each node and the shard allocation metadata. -struct ClusterRegistry {} +pub struct ClusterRegistry { + inner: Arc>, + listener_task: Terminator, +} + +struct ClusterRegistryInner { + peer_tasks: AnonTable<(Sender, JoinHandle<()>)>, +} impl ClusterRegistry { - /// Creates a new cluster registry and launches all necessary daemon fibers. + /// Creates a new cluster registry. It contains all currently active peer connections. + /// + /// Starts a fiber which listens on the given TCP socket for incoming connections, handles incoming handshake and starts a peer task. pub async fn launch(config: ClusteringConfig) -> Result { - todo!() + let inner = Arc::new(RwLock::new(ClusterRegistryInner { + peer_tasks: AnonTable::new(), + })); + let listener = TcpListener::bind(config.listen_on).await?; + + let listener_task = start_connection_listener(listener, inner.clone()).await; + + if let Some(bootstrap) = config.bootstrap_via.first() { + info!("Connecting to the bootstrap server"); + let mut stream = TcpStream::connect(bootstrap).await?; + let handshake_request = proto::HandshakeRequest { + node_id: config.self_id, + address: config.advertised_address.to_string(), + }; + let mut buf = vec![]; + + handshake_request.encode(&mut buf).unwrap(); + + let length = u16::to_be_bytes(buf.len() as u16); + stream.write_all(&length).await?; + stream.write_all(&buf).await?; + stream.flush().await?; + + let handshake_response = read_message::(&mut stream).await?; + info!("Connection to the bootstrap server succeeded"); + peer_launch(stream , inner.clone()).await; + } + + Ok(ClusterRegistry { + inner, + listener_task, + }) + } + + pub async fn terminate(self) -> Result<()> { + if let Err(e) = self.listener_task.terminate().await { + warn!("Failed to terminate clustering listener: {e:?}"); + } + let mut inner = self.inner.write().await; + for (_, (k, j)) in inner.peer_tasks.into_iter() { + if let Err(e) = k.send(PeerMsg::Close).await { + warn!("Failed to stop peer actor"); + } + + } + Ok(()) } } +async fn start_connection_listener(listener: TcpListener, cluster: Arc>) -> Terminator { + Terminator::spawn(|_| async move { + loop { + let (mut stream, addr) = match listener.accept().await { + Ok(stream) => stream, + Err(e) => { + warn!("Error accepting connection: {}", e); + continue; + } + }; + info!("Incoming connection from {addr}"); + // We handle the handshaking in order to not block other incoming connections. + let cluster = cluster.clone(); + spawn(async move { + // All incoming connections expect the handshake to be initiated by the other peer. + // The peer sends a `HandshakeRequest` message and we respond to it with a `HandshakeResponse` message. + // After that a new task is launched for peer communication, which is the same fiber as the one started when we initiate the connection. + let handshake = read_message::(&mut stream).await; + let handshake = match handshake { + Ok(handshake) => handshake, + Err(e) => { + warn!("Error reading from socket: {}", e); + return; + } + }; + let handshake_response = proto::HandshakeResponse {}; + let mut buf = vec![]; + handshake_response.encode(&mut buf).unwrap(); + stream.flush().await; + if let Err(e) = stream.write_all(&buf).await { + warn!("Failed to reply with HandshakeResponse: {}", e); + } else { + info!("Connection from {addr} accepted"); + peer_launch(stream , cluster).await; + } + }); + } + }) +} + +async fn read_message(stream: &mut TcpStream) -> Result { + // All messages received from the stream start with 2 bytes which indicate N -- the length of the message in bytes. + let mut length_buf = [0; 2]; + if let Err(e) = stream.read_exact(&mut length_buf).await { + warn!("Error reading length from stream: {}", e); + return Err(e.into()); + } + + let length = u16::from_be_bytes(length_buf) as usize; + + // After that we expect a raw message of length N. + let mut msg_buf = vec![0; length]; + if let Err(e) = stream.read_exact(&mut msg_buf).await { + warn!("Error reading message from stream: {e:?}"); + return Err(e.into()); + } + + match ::decode(&*msg_buf) { + Ok(message) => { + Ok(message) + } + Err(e) => { + warn!("Invalid message: {e:?}"); + Err(e.into()) + } + } +} + +async fn peer_launch(stream: TcpStream, cluster: Arc>) { + let (tx, rx) = channel(32); + let join_handle = spawn(peer_communication_loop(stream, rx)); + let mut inner = cluster.write().await; + inner.peer_tasks.insert((tx, join_handle)); +} + +enum PeerMsg { + Close, +} + +/// The main loop which is executed from the peer task. Created from both the incoming and the outgoing socket handlers. +async fn peer_communication_loop(mut stream: TcpStream, rx: Receiver) { + loop { + match read_message::(&mut stream).await { + Ok(req) => { + trace!("Message received: {req:?}"); + } + Err(e) => { + warn!("Failed when receiving the next message, closing the socket: {e:?}"); + break; + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 0bcc612..09d64ff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ use serde::Deserialize; use lavina_core::player::PlayerRegistry; use lavina_core::prelude::*; use lavina_core::repo::Storage; +use lavina_core::clustering::ClusterRegistry; use lavina_core::room::RoomRegistry; #[derive(Deserialize, Debug)] @@ -53,6 +54,7 @@ async fn main() -> Result<()> { } = config; let mut metrics = MetricsRegistry::new(); let storage = Storage::open(storage_config).await?; + let cluster = ClusterRegistry::launch(clustering_config).await?; let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; @@ -76,6 +78,7 @@ async fn main() -> Result<()> { players.shutdown_all().await?; drop(players); drop(rooms); + cluster.terminate().await?; storage.close().await?; tracing::info!("Shutdown complete"); Ok(())