forked from lavina/lavina
1
0
Fork 0
This commit is contained in:
Nikita Vilunov 2024-02-02 20:05:18 +01:00
parent 1969901436
commit 9ae45380ae
4 changed files with 33 additions and 5 deletions

1
Cargo.lock generated
View File

@ -1111,6 +1111,7 @@ dependencies = [
"prometheus",
"prost",
"serde",
"serde_json",
"sqlx",
"tokio",
"tracing",

View File

@ -27,6 +27,7 @@ regex = "1.7.1"
derive_more = "0.99.17"
clap = { version = "4.4.4", features = ["derive"] }
serde = { version = "1.0.152", features = ["rc", "serde_derive"] }
serde_json = "1.0.93"
tracing = "0.1.37" # logging & tracing api
prometheus = { version = "0.13.3", default-features = false }
base64 = "0.21.3"
@ -48,7 +49,7 @@ hyper = { version = "1.0.1", features = ["server", "http1"] } # http server
http-body-util = "0.1.0"
hyper-util = { version = "0.1", features = ["server", "http1", "tokio"] }
serde.workspace = true
serde_json = "1.0.93"
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true

View File

@ -7,6 +7,7 @@ version.workspace = true
anyhow.workspace = true
sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] }
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
prometheus.workspace = true

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::Result;
use prost::Message;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::{select, pin, spawn};
@ -74,6 +74,12 @@ struct ClusterRegistryInner {
peer_tasks: AnonTable<(Sender<PeerMsg>, JoinHandle<()>)>,
}
/// Stored in etcd under the node key.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct PeerData {
address: SocketAddr,
}
impl ClusterRegistry {
/// Creates a new cluster registry. It contains all currently active peer connections.
///
@ -85,10 +91,29 @@ impl ClusterRegistry {
let listener = TcpListener::bind(config.listen_on).await?;
let listener_task = start_connection_listener(listener, inner.clone()).await;
let mut client = EtcdClient::connect(&[config.etcd_address], None).await?;
let campaign_response = client.election_client().campaign(b"kek", b"my name", 0).await?;
dbg!(campaign_response);
let leader = client.leader("kek").await?;
// Step 1. Put info about itself into etcd.
tracing::info!("Registering node in etcd");
let key = format!("/lavina/clusters/{}/nodes/{}", config.cluster_name, config.self_id);
let data = PeerData { address: config.advertised_address.clone() };
let serialized = serde_json::to_string(&data)?;
client.kv_client().put(key.as_bytes(), serialized, None).await?;
// Step 2. Campaign for leadership of the cluster.
tracing::info!("Starting the election");
let election_key = format!("/lavina/clusters/{}/election", config.cluster_name);
let self_key = format!("{}", config.self_id);
dbg!(&self_key);
let election = client.election_client().campaign(election_key.as_bytes(), self_key.clone(), 0).await?;
let leader = election.leader().unwrap();
dbg!(leader);
let lease = leader.lease();
loop {
let election = client.election_client().campaign(election_key.as_bytes(), self_key.clone(), lease).await?;
let leader = election.leader().unwrap();
dbg!(leader);
}
Ok(ClusterRegistry {