forked from lavina/lavina
1
0
Fork 0
This commit is contained in:
Nikita Vilunov 2023-12-13 15:57:49 +01:00
parent 3de244cb55
commit a9186451f6
9 changed files with 347 additions and 5 deletions

119
Cargo.lock generated
View File

@ -475,6 +475,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6"
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]] [[package]]
name = "flume" name = "flume"
version = "0.11.0" version = "0.11.0"
@ -854,12 +860,29 @@ version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb"
[[package]]
name = "inner-api"
version = "0.0.2-dev"
dependencies = [
"prost",
"prost-build",
]
[[package]] [[package]]
name = "ipnet" name = "ipnet"
version = "2.9.0" version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.12.0" version = "0.12.0"
@ -917,7 +940,9 @@ name = "lavina-core"
version = "0.0.2-dev" version = "0.0.2-dev"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"inner-api",
"prometheus", "prometheus",
"prost",
"serde", "serde",
"sqlx", "sqlx",
"tokio", "tokio",
@ -1033,6 +1058,12 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.27.1" version = "0.27.1"
@ -1216,6 +1247,16 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "petgraph"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.1.3" version = "1.1.3"
@ -1281,6 +1322,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d"
dependencies = [
"proc-macro2",
"syn 2.0.39",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.70" version = "1.0.70"
@ -1357,6 +1408,60 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "prost"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2"
dependencies = [
"bytes",
"heck",
"itertools 0.11.0",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn 2.0.39",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
dependencies = [
"anyhow",
"itertools 0.11.0",
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]]
name = "prost-types"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e"
dependencies = [
"prost",
]
[[package]] [[package]]
name = "proto-irc" name = "proto-irc"
version = "0.0.2-dev" version = "0.0.2-dev"
@ -1801,7 +1906,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c"
dependencies = [ dependencies = [
"itertools", "itertools 0.12.0",
"nom", "nom",
"unicode_categories", "unicode_categories",
] ]
@ -2498,6 +2603,18 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix",
]
[[package]] [[package]]
name = "whoami" name = "whoami"
version = "1.4.1" version = "1.4.1"

View File

@ -7,6 +7,7 @@ members = [
"crates/proto-xmpp", "crates/proto-xmpp",
"crates/mgmt-api", "crates/mgmt-api",
"crates/sasl", "crates/sasl",
"crates/inner-api",
] ]
[workspace.package] [workspace.package]
@ -31,6 +32,7 @@ base64 = "0.21.3"
lavina-core = { path = "crates/lavina-core" } lavina-core = { path = "crates/lavina-core" }
tracing-subscriber = "0.3.16" tracing-subscriber = "0.3.16"
sasl = { path = "crates/sasl" } sasl = { path = "crates/sasl" }
prost = "0.12.3"
[package] [package]
name = "lavina" name = "lavina"

View File

@ -17,4 +17,4 @@ db_path = "db.sqlite"
self_id = 1 self_id = 1
listen_on = "0.0.0.0:23732" listen_on = "0.0.0.0:23732"
advertised_address = "127.0.0.1:23732" advertised_address = "127.0.0.1:23732"
bootstrap_via = ["0.0.0.0:23733"] bootstrap_via = []

20
config2.toml Normal file
View File

@ -0,0 +1,20 @@
[telemetry]
listen_on = "127.0.0.1:18080"
[irc]
listen_on = "127.0.0.1:16667"
server_name = "irc.localhost"
[xmpp]
listen_on = "127.0.0.1:15222"
cert = "./certs/xmpp.pem"
key = "./certs/xmpp.key"
[storage]
db_path = "db.sqlite"
[clustering]
self_id = 1
listen_on = "0.0.0.0:23733"
advertised_address = "127.0.0.1:23733"
bootstrap_via = ["0.0.0.0:23732"]

View File

@ -0,0 +1,4 @@
fn main() {
let empty: &[&'_ str] = &[];
prost_build::compile_protos(&["proto/api.proto"], empty).unwrap();
}

View File

@ -0,0 +1,34 @@
syntax = "proto3";
package lavina;
message HandshakeRequest {
uint32 node_id = 1;
string address = 2;
}
message HandshakeResponse {
}
message Request {
uint32 id = 1;
oneof payload {
GetNodes get_nodes = 2;
}
}
message GetNodes {
}
message ResponseNodes {
}
message Response {
uint32 id = 1;
oneof payload {
ResponseNodes nodes = 2;
}
}

View File

@ -10,3 +10,5 @@ serde.workspace = true
tokio.workspace = true tokio.workspace = true
tracing.workspace = true tracing.workspace = true
prometheus.workspace = true prometheus.workspace = true
inner-api = { path = "../inner-api" }
prost.workspace = true

View File

@ -1,7 +1,20 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use prost::Message;
use serde::Deserialize; use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::spawn;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{info, trace, warn};
use inner_api::proto;
use crate::table::{AnonTable};
use crate::terminator::Terminator;
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct ClusteringConfig { pub struct ClusteringConfig {
@ -23,12 +36,159 @@ pub struct ClusteringConfig {
/// ///
/// It maintains the local view of the cluster state, including the set of nodes, /// It maintains the local view of the cluster state, including the set of nodes,
/// the status of each node and the shard allocation metadata. /// the status of each node and the shard allocation metadata.
struct ClusterRegistry {} pub struct ClusterRegistry {
inner: Arc<RwLock<ClusterRegistryInner>>,
listener_task: Terminator,
}
struct ClusterRegistryInner {
peer_tasks: AnonTable<(Sender<PeerMsg>, JoinHandle<()>)>,
}
impl ClusterRegistry { impl ClusterRegistry {
/// Creates a new cluster registry and launches all necessary daemon fibers. /// Creates a new cluster registry. It contains all currently active peer connections.
///
/// Starts a fiber which listens on the given TCP socket for incoming connections, handles incoming handshake and starts a peer task.
pub async fn launch(config: ClusteringConfig) -> Result<ClusterRegistry> { pub async fn launch(config: ClusteringConfig) -> Result<ClusterRegistry> {
todo!() let inner = Arc::new(RwLock::new(ClusterRegistryInner {
peer_tasks: AnonTable::new(),
}));
let listener = TcpListener::bind(config.listen_on).await?;
let listener_task = start_connection_listener(listener, inner.clone()).await;
if let Some(bootstrap) = config.bootstrap_via.first() {
info!("Connecting to the bootstrap server");
let mut stream = TcpStream::connect(bootstrap).await?;
let handshake_request = proto::HandshakeRequest {
node_id: config.self_id,
address: config.advertised_address.to_string(),
};
let mut buf = vec![];
handshake_request.encode(&mut buf).unwrap();
let length = u16::to_be_bytes(buf.len() as u16);
stream.write_all(&length).await?;
stream.write_all(&buf).await?;
stream.flush().await?;
let handshake_response = read_message::<proto::HandshakeResponse>(&mut stream).await?;
info!("Connection to the bootstrap server succeeded");
peer_launch(stream , inner.clone()).await;
}
Ok(ClusterRegistry {
inner,
listener_task,
})
}
pub async fn terminate(self) -> Result<()> {
if let Err(e) = self.listener_task.terminate().await {
warn!("Failed to terminate clustering listener: {e:?}");
}
let mut inner = self.inner.write().await;
for (_, (k, j)) in inner.peer_tasks.into_iter() {
if let Err(e) = k.send(PeerMsg::Close).await {
warn!("Failed to stop peer actor");
}
}
Ok(())
} }
} }
async fn start_connection_listener(listener: TcpListener, cluster: Arc<RwLock<ClusterRegistryInner>>) -> Terminator {
Terminator::spawn(|_| async move {
loop {
let (mut stream, addr) = match listener.accept().await {
Ok(stream) => stream,
Err(e) => {
warn!("Error accepting connection: {}", e);
continue;
}
};
info!("Incoming connection from {addr}");
// We handle the handshaking in order to not block other incoming connections.
let cluster = cluster.clone();
spawn(async move {
// All incoming connections expect the handshake to be initiated by the other peer.
// The peer sends a `HandshakeRequest` message and we respond to it with a `HandshakeResponse` message.
// After that a new task is launched for peer communication, which is the same fiber as the one started when we initiate the connection.
let handshake = read_message::<proto::HandshakeRequest>(&mut stream).await;
let handshake = match handshake {
Ok(handshake) => handshake,
Err(e) => {
warn!("Error reading from socket: {}", e);
return;
}
};
let handshake_response = proto::HandshakeResponse {};
let mut buf = vec![];
handshake_response.encode(&mut buf).unwrap();
stream.flush().await;
if let Err(e) = stream.write_all(&buf).await {
warn!("Failed to reply with HandshakeResponse: {}", e);
} else {
info!("Connection from {addr} accepted");
peer_launch(stream , cluster).await;
}
});
}
})
}
async fn read_message<T: Message + Default>(stream: &mut TcpStream) -> Result<T> {
// All messages received from the stream start with 2 bytes which indicate N -- the length of the message in bytes.
let mut length_buf = [0; 2];
if let Err(e) = stream.read_exact(&mut length_buf).await {
warn!("Error reading length from stream: {}", e);
return Err(e.into());
}
let length = u16::from_be_bytes(length_buf) as usize;
// After that we expect a raw message of length N.
let mut msg_buf = vec![0; length];
if let Err(e) = stream.read_exact(&mut msg_buf).await {
warn!("Error reading message from stream: {e:?}");
return Err(e.into());
}
match <T as Message>::decode(&*msg_buf) {
Ok(message) => {
Ok(message)
}
Err(e) => {
warn!("Invalid message: {e:?}");
Err(e.into())
}
}
}
async fn peer_launch(stream: TcpStream, cluster: Arc<RwLock<ClusterRegistryInner>>) {
let (tx, rx) = channel(32);
let join_handle = spawn(peer_communication_loop(stream, rx));
let mut inner = cluster.write().await;
inner.peer_tasks.insert((tx, join_handle));
}
enum PeerMsg {
Close,
}
/// The main loop which is executed from the peer task. Created from both the incoming and the outgoing socket handlers.
async fn peer_communication_loop(mut stream: TcpStream, rx: Receiver<PeerMsg>) {
loop {
match read_message::<proto::Request>(&mut stream).await {
Ok(req) => {
trace!("Message received: {req:?}");
}
Err(e) => {
warn!("Failed when receiving the next message, closing the socket: {e:?}");
break;
}
}
}
}

View File

@ -12,6 +12,7 @@ use serde::Deserialize;
use lavina_core::player::PlayerRegistry; use lavina_core::player::PlayerRegistry;
use lavina_core::prelude::*; use lavina_core::prelude::*;
use lavina_core::repo::Storage; use lavina_core::repo::Storage;
use lavina_core::clustering::ClusterRegistry;
use lavina_core::room::RoomRegistry; use lavina_core::room::RoomRegistry;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -53,6 +54,7 @@ async fn main() -> Result<()> {
} = config; } = config;
let mut metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
let cluster = ClusterRegistry::launch(clustering_config).await?;
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?;
let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?;
@ -76,6 +78,7 @@ async fn main() -> Result<()> {
players.shutdown_all().await?; players.shutdown_all().await?;
drop(players); drop(players);
drop(rooms); drop(rooms);
cluster.terminate().await?;
storage.close().await?; storage.close().await?;
tracing::info!("Shutdown complete"); tracing::info!("Shutdown complete");
Ok(()) Ok(())