diff --git a/Cargo.lock b/Cargo.lock index 31cad40..2faea62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,6 +105,39 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + +[[package]] +name = "async-trait" +version = "0.1.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "atoi" version = "2.0.0" @@ -139,6 +172,51 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.27", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 0.2.11", + "http-body 0.4.6", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -432,6 +510,22 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "etcd-client" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5231ad671c74ee5dc02753a0a9c855fe6e90de2a07acb2582f8a702470e04d1" +dependencies = [ + "http 0.2.11", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower", + "tower-service", +] + [[package]] name = "etcetera" version = "0.8.0" @@ -630,13 +724,19 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.11", - "indexmap", + "indexmap 2.1.0", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.3" @@ -653,7 +753,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown", + "hashbrown 0.14.3", ] [[package]] @@ -814,6 +914,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.27", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-util" version = "0.1.1" @@ -844,6 +956,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.1.0" @@ -851,7 +973,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.3", ] [[package]] @@ -940,6 +1062,7 @@ name = "lavina-core" version = "0.0.2-dev" dependencies = [ "anyhow", + "etcd-client", "inner-api", "prometheus", "prost", @@ -1003,6 +1126,12 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1254,7 +1383,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 2.1.0", ] [[package]] @@ -1702,6 +1831,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "ryu" version = "1.0.15" @@ -1946,7 +2081,7 @@ dependencies = [ "futures-util", "hashlink", "hex", - "indexmap", + "indexmap 2.1.0", "log", "memchr", "once_cell", @@ -2149,6 +2284,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "system-configuration" version = "0.5.1" @@ -2247,6 +2388,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -2268,6 +2419,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -2309,13 +2471,53 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" dependencies = [ - "indexmap", + "indexmap 2.1.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.27", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.39", +] + [[package]] name = "tower" version = "0.4.13" @@ -2324,9 +2526,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/config.toml b/config.node-1.toml similarity index 83% rename from config.toml rename to config.node-1.toml index b0742ae..a513991 100644 --- a/config.toml +++ b/config.node-1.toml @@ -14,7 +14,8 @@ key = "./certs/xmpp.key" db_path = "db.sqlite" [clustering] +cluster_name = "example" self_id = 1 listen_on = "0.0.0.0:23732" advertised_address = "127.0.0.1:23732" -bootstrap_via = [] +etcd_address = "http://127.0.0.1:2379" diff --git a/config2.toml b/config.node-2.toml similarity index 81% rename from config2.toml rename to config.node-2.toml index f2ef981..9eb5771 100644 --- a/config2.toml +++ b/config.node-2.toml @@ -14,7 +14,8 @@ key = "./certs/xmpp.key" db_path = "db.sqlite" [clustering] -self_id = 1 +cluster_name = "example" +self_id = 2 listen_on = "0.0.0.0:23733" advertised_address = "127.0.0.1:23733" -bootstrap_via = ["0.0.0.0:23732"] +etcd_address = "127.0.0.1:2379" diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml index 25c37ae..b61d056 100644 --- a/crates/lavina-core/Cargo.toml +++ b/crates/lavina-core/Cargo.toml @@ -12,3 +12,4 @@ tracing.workspace = true prometheus.workspace = true inner-api = { path = "../inner-api" } prost.workspace = true +etcd-client = "0.12.3" diff --git a/crates/lavina-core/src/clustering/mod.rs b/crates/lavina-core/src/clustering/mod.rs index 1df0be2..57eb006 100644 --- a/crates/lavina-core/src/clustering/mod.rs +++ b/crates/lavina-core/src/clustering/mod.rs @@ -11,13 +11,19 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tracing::{info, trace, warn}; +use etcd_client::Client as EtcdClient; use inner_api::proto; +use crate::prelude::Str; use crate::table::{AnonTable}; use crate::terminator::Terminator; #[derive(Deserialize, Debug, Clone)] pub struct ClusteringConfig { + /// The name of the cluster used for prefixing the data in etcd and to prevent accidental communication between different clusters. + /// + /// Useful for multi-tenant usage of etcd. + pub cluster_name: Str, /// The unique identifier of the node in the cluster, needed to distinguish it form other nodes. pub self_id: u32, /// The local address where this node listens incoming for node-to-node communication. @@ -27,9 +33,8 @@ pub struct ClusteringConfig { /// This is useful when a node is behind a NAT or a proxy, where the publicly accessible IP /// address can differ from the local IP address. pub advertised_address: SocketAddr, - /// A list of network addresses with the node's initial set of peers in the cluster. - /// These nodes will be contacted first in order to discover additional nodes and to initialize the local state. - pub bootstrap_via: Box<[SocketAddr]>, + /// The address of the etcd node which will be used to discover or to bootstrap the cluster. + pub etcd_address: Str, } /// Cluster registry is a service which handles all connections to other nodes of the cluster. @@ -54,29 +59,10 @@ impl ClusterRegistry { peer_tasks: AnonTable::new(), })); let listener = TcpListener::bind(config.listen_on).await?; - let listener_task = start_connection_listener(listener, inner.clone()).await; + let client = EtcdClient::connect(&[config.etcd_address], None).await?; + dbg!(client.kv_client().get(b"kek", None).await?); - if let Some(bootstrap) = config.bootstrap_via.first() { - info!("Connecting to the bootstrap server"); - let mut stream = TcpStream::connect(bootstrap).await?; - let handshake_request = proto::HandshakeRequest { - node_id: config.self_id, - address: config.advertised_address.to_string(), - }; - let mut buf = vec![]; - - handshake_request.encode(&mut buf).unwrap(); - - let length = u16::to_be_bytes(buf.len() as u16); - stream.write_all(&length).await?; - stream.write_all(&buf).await?; - stream.flush().await?; - - let handshake_response = read_message::(&mut stream).await?; - info!("Connection to the bootstrap server succeeded"); - peer_launch(stream, inner.clone()).await; - } Ok(ClusterRegistry { inner, diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..263ecc9 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,8 @@ +version: "3" + +services: + etcd: + image: quay.io/coreos/etcd:v3.5.11 + ports: + - "2379:2379" + - "2380:2380"