forked from lavina/lavina
1
0
Fork 0

Compare commits

...

2 Commits

Author SHA1 Message Date
Nikita Vilunov 53703152ff wip 2024-02-06 13:14:22 +01:00
Nikita Vilunov 9ae45380ae wip 2024-02-02 20:05:18 +01:00
4 changed files with 113 additions and 5 deletions

80
Cargo.lock generated
View File

@ -93,6 +93,15 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "anyerror"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcd04a72664a65fb9adeae7ced0c98efd68a2b7a45adda8319b3bb36538404b8"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.75" version = "1.0.75"
@ -280,6 +289,16 @@ version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
[[package]]
name = "byte-unit"
version = "4.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da78b32057b8fdfc352504708feeba7216dcd65a2c9ab02978cbd288d1279b6c"
dependencies = [
"serde",
"utf8-width",
]
[[package]] [[package]]
name = "bytemuck" name = "bytemuck"
version = "1.14.0" version = "1.14.0"
@ -639,6 +658,21 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "futures"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.29" version = "0.3.29"
@ -712,6 +746,7 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro", "futures-macro",
@ -1108,9 +1143,11 @@ dependencies = [
"anyhow", "anyhow",
"etcd-client", "etcd-client",
"inner-api", "inner-api",
"openraft",
"prometheus", "prometheus",
"prost", "prost",
"serde", "serde",
"serde_json",
"sqlx", "sqlx",
"tokio", "tokio",
"tracing", "tracing",
@ -1170,6 +1207,12 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "maplit"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]] [[package]]
name = "matchit" name = "matchit"
version = "0.7.3" version = "0.7.3"
@ -1353,6 +1396,27 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "openraft"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e189b3a5107b4911365ab1d2e26e04f9f30602927ff81265d2cb2f6ee454313c"
dependencies = [
"anyerror",
"async-trait",
"byte-unit",
"clap",
"derive_more",
"futures",
"maplit",
"pin-utils",
"rand",
"thiserror",
"tokio",
"tracing",
"tracing-futures",
]
[[package]] [[package]]
name = "overload" name = "overload"
version = "0.1.1" version = "0.1.1"
@ -2665,6 +2729,16 @@ dependencies = [
"valuable", "valuable",
] ]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]] [[package]]
name = "tracing-log" name = "tracing-log"
version = "0.2.0" version = "0.2.0"
@ -2767,6 +2841,12 @@ version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
[[package]]
name = "utf8-width"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86bd8d4e895da8537e5315b8254664e6b769c4ff3db18321b297a1e7004392e3"
[[package]] [[package]]
name = "utf8parse" name = "utf8parse"
version = "0.2.1" version = "0.2.1"

View File

@ -27,6 +27,7 @@ regex = "1.7.1"
derive_more = "0.99.17" derive_more = "0.99.17"
clap = { version = "4.4.4", features = ["derive"] } clap = { version = "4.4.4", features = ["derive"] }
serde = { version = "1.0.152", features = ["rc", "serde_derive"] } serde = { version = "1.0.152", features = ["rc", "serde_derive"] }
serde_json = "1.0.93"
tracing = "0.1.37" # logging & tracing api tracing = "0.1.37" # logging & tracing api
prometheus = { version = "0.13.3", default-features = false } prometheus = { version = "0.13.3", default-features = false }
base64 = "0.21.3" base64 = "0.21.3"
@ -48,7 +49,7 @@ hyper = { version = "1.0.1", features = ["server", "http1"] } # http server
http-body-util = "0.1.0" http-body-util = "0.1.0"
hyper-util = { version = "0.1", features = ["server", "http1", "tokio"] } hyper-util = { version = "0.1", features = ["server", "http1", "tokio"] }
serde.workspace = true serde.workspace = true
serde_json = "1.0.93" serde_json.workspace = true
tokio.workspace = true tokio.workspace = true
tracing.workspace = true tracing.workspace = true
tracing-subscriber.workspace = true tracing-subscriber.workspace = true

View File

@ -7,9 +7,11 @@ version.workspace = true
anyhow.workspace = true anyhow.workspace = true
sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] }
serde.workspace = true serde.workspace = true
serde_json.workspace = true
tokio.workspace = true tokio.workspace = true
tracing.workspace = true tracing.workspace = true
prometheus.workspace = true prometheus.workspace = true
inner-api = { path = "../inner-api" } inner-api = { path = "../inner-api" }
prost.workspace = true prost.workspace = true
etcd-client = "0.12.3" etcd-client = "0.12.3"
openraft = "0.8.8"

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use prost::Message; use prost::Message;
use serde::Deserialize; use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::{select, pin, spawn}; use tokio::{select, pin, spawn};
@ -74,6 +74,12 @@ struct ClusterRegistryInner {
peer_tasks: AnonTable<(Sender<PeerMsg>, JoinHandle<()>)>, peer_tasks: AnonTable<(Sender<PeerMsg>, JoinHandle<()>)>,
} }
/// Stored in etcd under the node key.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct PeerData {
address: SocketAddr,
}
impl ClusterRegistry { impl ClusterRegistry {
/// Creates a new cluster registry. It contains all currently active peer connections. /// Creates a new cluster registry. It contains all currently active peer connections.
/// ///
@ -85,10 +91,29 @@ impl ClusterRegistry {
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
let listener_task = start_connection_listener(listener, inner.clone()).await; let listener_task = start_connection_listener(listener, inner.clone()).await;
let mut client = EtcdClient::connect(&[config.etcd_address], None).await?; let mut client = EtcdClient::connect(&[config.etcd_address], None).await?;
let campaign_response = client.election_client().campaign(b"kek", b"my name", 0).await?;
dbg!(campaign_response); // Step 1. Put info about itself into etcd.
let leader = client.leader("kek").await?; tracing::info!("Registering node in etcd");
let key = format!("/lavina/clusters/{}/nodes/{}", config.cluster_name, config.self_id);
let data = PeerData { address: config.advertised_address.clone() };
let serialized = serde_json::to_string(&data)?;
client.kv_client().put(key.as_bytes(), serialized, None).await?;
// Step 2. Campaign for leadership of the cluster.
tracing::info!("Starting the election");
let election_key = format!("/lavina/clusters/{}/election", config.cluster_name);
let self_key = format!("{}", config.self_id);
dbg!(&self_key);
let election = client.election_client().campaign(election_key.as_bytes(), self_key.clone(), 0).await?;
let leader = election.leader().unwrap();
dbg!(leader); dbg!(leader);
let lease = leader.lease();
loop {
let election = client.election_client().campaign(election_key.as_bytes(), self_key.clone(), lease).await?;
let leader = election.leader().unwrap();
dbg!(leader);
}
Ok(ClusterRegistry { Ok(ClusterRegistry {