diff --git a/Cargo.lock b/Cargo.lock index 18f238b..df548da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1111,6 +1111,7 @@ dependencies = [ "prometheus", "prost", "serde", + "serde_json", "sqlx", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index a8b9e1d..0a5d2d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ regex = "1.7.1" derive_more = "0.99.17" clap = { version = "4.4.4", features = ["derive"] } serde = { version = "1.0.152", features = ["rc", "serde_derive"] } +serde_json = "1.0.93" tracing = "0.1.37" # logging & tracing api prometheus = { version = "0.13.3", default-features = false } base64 = "0.21.3" @@ -48,7 +49,7 @@ hyper = { version = "1.0.1", features = ["server", "http1"] } # http server http-body-util = "0.1.0" hyper-util = { version = "0.1", features = ["server", "http1", "tokio"] } serde.workspace = true -serde_json = "1.0.93" +serde_json.workspace = true tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml index b61d056..3b77ca0 100644 --- a/crates/lavina-core/Cargo.toml +++ b/crates/lavina-core/Cargo.toml @@ -7,6 +7,7 @@ version.workspace = true anyhow.workspace = true sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } serde.workspace = true +serde_json.workspace = true tokio.workspace = true tracing.workspace = true prometheus.workspace = true diff --git a/crates/lavina-core/src/clustering/mod.rs b/crates/lavina-core/src/clustering/mod.rs index fcd4200..97d68ff 100644 --- a/crates/lavina-core/src/clustering/mod.rs +++ b/crates/lavina-core/src/clustering/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use prost::Message; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::{select, pin, spawn}; @@ -74,6 +74,12 @@ struct ClusterRegistryInner { peer_tasks: AnonTable<(Sender, JoinHandle<()>)>, } +/// Stored in etcd under the node key. +#[derive(Serialize, Deserialize, Debug, Clone)] +struct PeerData { + address: SocketAddr, +} + impl ClusterRegistry { /// Creates a new cluster registry. It contains all currently active peer connections. /// @@ -85,10 +91,29 @@ impl ClusterRegistry { let listener = TcpListener::bind(config.listen_on).await?; let listener_task = start_connection_listener(listener, inner.clone()).await; let mut client = EtcdClient::connect(&[config.etcd_address], None).await?; - let campaign_response = client.election_client().campaign(b"kek", b"my name", 0).await?; - dbg!(campaign_response); - let leader = client.leader("kek").await?; + + // Step 1. Put info about itself into etcd. + tracing::info!("Registering node in etcd"); + let key = format!("/lavina/clusters/{}/nodes/{}", config.cluster_name, config.self_id); + let data = PeerData { address: config.advertised_address.clone() }; + let serialized = serde_json::to_string(&data)?; + client.kv_client().put(key.as_bytes(), serialized, None).await?; + + // Step 2. Campaign for leadership of the cluster. + tracing::info!("Starting the election"); + let election_key = format!("/lavina/clusters/{}/election", config.cluster_name); + let self_key = format!("{}", config.self_id); + dbg!(&self_key); + let election = client.election_client().campaign(election_key.as_bytes(), self_key.clone(), 0).await?; + let leader = election.leader().unwrap(); dbg!(leader); + let lease = leader.lease(); + + loop { + let election = client.election_client().campaign(election_key.as_bytes(), self_key.clone(), lease).await?; + let leader = election.leader().unwrap(); + dbg!(leader); + } Ok(ClusterRegistry {