forked from lavina/lavina
1
0
Fork 0

etcd client

This commit is contained in:
Nikita Vilunov 2023-12-22 11:17:29 +01:00
parent cfa1e051fb
commit fb3b2d8250
6 changed files with 236 additions and 33 deletions

218
Cargo.lock generated
View File

@ -105,6 +105,39 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]]
name = "async-trait"
version = "0.1.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]] [[package]]
name = "atoi" name = "atoi"
version = "2.0.0" version = "2.0.0"
@ -139,6 +172,51 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
dependencies = [
"async-trait",
"axum-core",
"bitflags 1.3.2",
"bytes",
"futures-util",
"http 0.2.11",
"http-body 0.4.6",
"hyper 0.14.27",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 0.2.11",
"http-body 0.4.6",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.69" version = "0.3.69"
@ -432,6 +510,22 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "etcd-client"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5231ad671c74ee5dc02753a0a9c855fe6e90de2a07acb2582f8a702470e04d1"
dependencies = [
"http 0.2.11",
"prost",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tower",
"tower-service",
]
[[package]] [[package]]
name = "etcetera" name = "etcetera"
version = "0.8.0" version = "0.8.0"
@ -630,13 +724,19 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-util", "futures-util",
"http 0.2.11", "http 0.2.11",
"indexmap", "indexmap 2.1.0",
"slab", "slab",
"tokio", "tokio",
"tokio-util", "tokio-util",
"tracing", "tracing",
] ]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.14.3" version = "0.14.3"
@ -653,7 +753,7 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
dependencies = [ dependencies = [
"hashbrown", "hashbrown 0.14.3",
] ]
[[package]] [[package]]
@ -814,6 +914,18 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper 0.14.27",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]] [[package]]
name = "hyper-util" name = "hyper-util"
version = "0.1.1" version = "0.1.1"
@ -844,6 +956,16 @@ dependencies = [
"unicode-normalization", "unicode-normalization",
] ]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
]
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.1.0" version = "2.1.0"
@ -851,7 +973,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown", "hashbrown 0.14.3",
] ]
[[package]] [[package]]
@ -940,6 +1062,7 @@ name = "lavina-core"
version = "0.0.2-dev" version = "0.0.2-dev"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"etcd-client",
"inner-api", "inner-api",
"prometheus", "prometheus",
"prost", "prost",
@ -1003,6 +1126,12 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]] [[package]]
name = "md-5" name = "md-5"
version = "0.10.6" version = "0.10.6"
@ -1254,7 +1383,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
dependencies = [ dependencies = [
"fixedbitset", "fixedbitset",
"indexmap", "indexmap 2.1.0",
] ]
[[package]] [[package]]
@ -1702,6 +1831,12 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "rustversion"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.15" version = "1.0.15"
@ -1946,7 +2081,7 @@ dependencies = [
"futures-util", "futures-util",
"hashlink", "hashlink",
"hex", "hex",
"indexmap", "indexmap 2.1.0",
"log", "log",
"memchr", "memchr",
"once_cell", "once_cell",
@ -2149,6 +2284,12 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "sync_wrapper"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]] [[package]]
name = "system-configuration" name = "system-configuration"
version = "0.5.1" version = "0.5.1"
@ -2247,6 +2388,16 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.2.0" version = "2.2.0"
@ -2268,6 +2419,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.10" version = "0.7.10"
@ -2309,13 +2471,53 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03"
dependencies = [ dependencies = [
"indexmap", "indexmap 2.1.0",
"serde", "serde",
"serde_spanned", "serde_spanned",
"toml_datetime", "toml_datetime",
"winnow", "winnow",
] ]
[[package]]
name = "tonic"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"h2",
"http 0.2.11",
"http-body 0.4.6",
"hyper 0.14.27",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn 2.0.39",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.13" version = "0.4.13"
@ -2324,9 +2526,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"indexmap 1.9.3",
"pin-project", "pin-project",
"pin-project-lite", "pin-project-lite",
"rand",
"slab",
"tokio", "tokio",
"tokio-util",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing", "tracing",

View File

@ -14,7 +14,8 @@ key = "./certs/xmpp.key"
db_path = "db.sqlite" db_path = "db.sqlite"
[clustering] [clustering]
cluster_name = "example"
self_id = 1 self_id = 1
listen_on = "0.0.0.0:23732" listen_on = "0.0.0.0:23732"
advertised_address = "127.0.0.1:23732" advertised_address = "127.0.0.1:23732"
bootstrap_via = [] etcd_address = "http://127.0.0.1:2379"

View File

@ -14,7 +14,8 @@ key = "./certs/xmpp.key"
db_path = "db.sqlite" db_path = "db.sqlite"
[clustering] [clustering]
self_id = 1 cluster_name = "example"
self_id = 2
listen_on = "0.0.0.0:23733" listen_on = "0.0.0.0:23733"
advertised_address = "127.0.0.1:23733" advertised_address = "127.0.0.1:23733"
bootstrap_via = ["0.0.0.0:23732"] etcd_address = "127.0.0.1:2379"

View File

@ -12,3 +12,4 @@ tracing.workspace = true
prometheus.workspace = true prometheus.workspace = true
inner-api = { path = "../inner-api" } inner-api = { path = "../inner-api" }
prost.workspace = true prost.workspace = true
etcd-client = "0.12.3"

View File

@ -11,13 +11,19 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{info, trace, warn}; use tracing::{info, trace, warn};
use etcd_client::Client as EtcdClient;
use inner_api::proto; use inner_api::proto;
use crate::prelude::Str;
use crate::table::{AnonTable}; use crate::table::{AnonTable};
use crate::terminator::Terminator; use crate::terminator::Terminator;
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct ClusteringConfig { pub struct ClusteringConfig {
/// The name of the cluster used for prefixing the data in etcd and to prevent accidental communication between different clusters.
///
/// Useful for multi-tenant usage of etcd.
pub cluster_name: Str,
/// The unique identifier of the node in the cluster, needed to distinguish it form other nodes. /// The unique identifier of the node in the cluster, needed to distinguish it form other nodes.
pub self_id: u32, pub self_id: u32,
/// The local address where this node listens incoming for node-to-node communication. /// The local address where this node listens incoming for node-to-node communication.
@ -27,9 +33,8 @@ pub struct ClusteringConfig {
/// This is useful when a node is behind a NAT or a proxy, where the publicly accessible IP /// This is useful when a node is behind a NAT or a proxy, where the publicly accessible IP
/// address can differ from the local IP address. /// address can differ from the local IP address.
pub advertised_address: SocketAddr, pub advertised_address: SocketAddr,
/// A list of network addresses with the node's initial set of peers in the cluster. /// The address of the etcd node which will be used to discover or to bootstrap the cluster.
/// These nodes will be contacted first in order to discover additional nodes and to initialize the local state. pub etcd_address: Str,
pub bootstrap_via: Box<[SocketAddr]>,
} }
/// Cluster registry is a service which handles all connections to other nodes of the cluster. /// Cluster registry is a service which handles all connections to other nodes of the cluster.
@ -54,29 +59,10 @@ impl ClusterRegistry {
peer_tasks: AnonTable::new(), peer_tasks: AnonTable::new(),
})); }));
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
let listener_task = start_connection_listener(listener, inner.clone()).await; let listener_task = start_connection_listener(listener, inner.clone()).await;
let client = EtcdClient::connect(&[config.etcd_address], None).await?;
dbg!(client.kv_client().get(b"kek", None).await?);
if let Some(bootstrap) = config.bootstrap_via.first() {
info!("Connecting to the bootstrap server");
let mut stream = TcpStream::connect(bootstrap).await?;
let handshake_request = proto::HandshakeRequest {
node_id: config.self_id,
address: config.advertised_address.to_string(),
};
let mut buf = vec![];
handshake_request.encode(&mut buf).unwrap();
let length = u16::to_be_bytes(buf.len() as u16);
stream.write_all(&length).await?;
stream.write_all(&buf).await?;
stream.flush().await?;
let handshake_response = read_message::<proto::HandshakeResponse>(&mut stream).await?;
info!("Connection to the bootstrap server succeeded");
peer_launch(stream, inner.clone()).await;
}
Ok(ClusterRegistry { Ok(ClusterRegistry {
inner, inner,

8
docker-compose.yml Normal file
View File

@ -0,0 +1,8 @@
version: "3"
services:
etcd:
image: quay.io/coreos/etcd:v3.5.11
ports:
- "2379:2379"
- "2380:2380"