diff --git a/.gitignore b/.gitignore index 75d301e..2ee75f0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ /target -/db.sqlite +*.sqlite .idea/ .DS_Store diff --git a/Cargo.lock b/Cargo.lock index f3bbd73..eeb7143 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -204,7 +204,7 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.28", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -709,8 +709,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1097,8 +1099,13 @@ dependencies = [ "anyhow", "argon2", "chrono", + "mgmt-api", + "opentelemetry", "prometheus", "rand_core", + "reqwest", + "reqwest-middleware", + "reqwest-tracing", "serde", "sqlx", "tokio", @@ -1165,6 +1172,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "540f1c43aed89909c0cc0cc604e3bb2f7e7a341a3728a9e6cfe760e733cd11ed" + [[package]] name = "md-5" version = "0.10.6" @@ -1764,9 +1777,9 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "reqwest" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e6cc1e89e689536eb5aeede61520e874df5a4707df811cd5da4aa5fbb2aae19" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" dependencies = [ "base64 0.22.0", "bytes", @@ -1797,6 +1810,39 @@ dependencies = [ "winreg", ] +[[package]] +name = "reqwest-middleware" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0209efb52486ad88136190094ee214759ef7507068b27992256ed6610eb71a01" +dependencies = [ + "anyhow", + "async-trait", + "http 1.1.0", + "reqwest", + "serde", + "thiserror", + "tower-service", +] + +[[package]] +name = "reqwest-tracing" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b253954a1979e02eabccd7e9c3d61d8f86576108baa160775e7f160bb4e800a3" +dependencies = [ + "anyhow", + "async-trait", + "getrandom", + "http 1.1.0", + "matchit 0.8.2", + "opentelemetry", + "reqwest", + "reqwest-middleware", + "tracing", + "tracing-opentelemetry", +] + [[package]] name = "ring" version = "0.17.8" diff --git a/Cargo.toml b/Cargo.toml index 8e026da..a339b56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ lavina-core = { path = "crates/lavina-core" } tracing-subscriber = "0.3.16" sasl = { path = "crates/sasl" } chrono = "0.4.37" +reqwest = { version = "0.12.0", default-features = false, features = ["json"] } [package] name = "lavina" @@ -69,4 +70,4 @@ chrono.workspace = true [dev-dependencies] assert_matches.workspace = true regex = "1.7.1" -reqwest = { version = "0.12.0", default-features = false } +reqwest.workspace = true diff --git a/config.0.toml b/config.0.toml new file mode 100644 index 0000000..183a6b8 --- /dev/null +++ b/config.0.toml @@ -0,0 +1,30 @@ +[telemetry] +listen_on = "127.0.0.1:8080" + +[irc] +listen_on = "127.0.0.1:6667" +server_name = "irc.localhost" + +[xmpp] +listen_on = "127.0.0.1:5222" +cert = "./certs/xmpp.pem" +key = "./certs/xmpp.key" +hostname = "localhost" + +[storage] +db_path = "db.0.sqlite" + +[cluster] +addresses = [ + "127.0.0.1:8080", + "127.0.0.1:8081", +] + +[cluster.metadata] +node_id = 0 +main_owner = 0 +rooms = { aaaaa = 1, test = 0 } + +[tracing] +endpoint = "http://localhost:4317" +service_name = "lavina-0" diff --git a/config.1.toml b/config.1.toml new file mode 100644 index 0000000..cf4a0f9 --- /dev/null +++ b/config.1.toml @@ -0,0 +1,30 @@ +[telemetry] +listen_on = "127.0.0.1:8081" + +[irc] +listen_on = "127.0.0.1:6668" +server_name = "irc.localhost" + +[xmpp] +listen_on = "127.0.0.1:5223" +cert = "./certs/xmpp.pem" +key = "./certs/xmpp.key" +hostname = "localhost" + +[storage] +db_path = "db.1.sqlite" + +[cluster] +addresses = [ + "127.0.0.1:8080", + "127.0.0.1:8081", +] + +[cluster.metadata] +node_id = 1 +main_owner = 0 +rooms = { aaaaa = 1, test = 0 } + +[tracing] +endpoint = "http://localhost:4317" +service_name = "lavina-1" diff --git a/config.toml b/config.toml index 4765aa0..b4c9926 100644 --- a/config.toml +++ b/config.toml @@ -13,3 +13,11 @@ hostname = "localhost" [storage] db_path = "db.sqlite" + +[cluster] +addresses = [] + +[cluster.metadata] +node_id = 0 +main_owner = 0 +rooms = {} diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml index ab26daf..c212d4f 100644 --- a/crates/lavina-core/Cargo.toml +++ b/crates/lavina-core/Cargo.toml @@ -13,3 +13,8 @@ prometheus.workspace = true chrono.workspace = true argon2 = { version = "0.5.3" } rand_core = { version = "0.6.4", features = ["getrandom"] } +reqwest.workspace = true +reqwest-middleware = { version = "0.3", features = ["json"] } +opentelemetry = "0.22.0" +mgmt-api = { path = "../mgmt-api" } +reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_22"] } diff --git a/crates/lavina-core/src/clustering.rs b/crates/lavina-core/src/clustering.rs new file mode 100644 index 0000000..6431a27 --- /dev/null +++ b/crates/lavina-core/src/clustering.rs @@ -0,0 +1,60 @@ +use anyhow::{anyhow, Result}; +use reqwest::Client; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_tracing::{DefaultSpanBackend, TracingMiddleware}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; + +pub mod broadcast; +pub mod room; + +type Addresses = Vec; + +#[derive(Deserialize, Debug, Clone)] +pub struct ClusterConfig { + pub metadata: ClusterMetadata, + pub addresses: Addresses, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct ClusterMetadata { + /// The node id of the current node. + pub node_id: u32, + /// Owns all rooms in the cluster except the ones specified in `rooms`. + pub main_owner: u32, + pub rooms: HashMap, +} + +#[derive(Clone)] +pub struct LavinaClient { + addresses: Arc, + client: ClientWithMiddleware, +} + +impl LavinaClient { + pub fn new(addresses: Addresses) -> Self { + let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::::new()).build(); + Self { + addresses: Arc::new(addresses), + client, + } + } + + async fn send_request(&self, node_id: u32, path: &str, req: impl Serialize) -> Result<()> { + let Some(address) = self.addresses.get(node_id as usize) else { + return Err(anyhow!("Unknown node")); + }; + match self.client.post(format!("http://{}{}", address, path)).json(&req).send().await { + Ok(res) => { + if res.status().is_server_error() || res.status().is_client_error() { + tracing::error!("Cluster request failed: {:?}", res); + return Err(anyhow!("Server error")); + } + Ok(()) + } + Err(e) => Err(e.into()), + } + } +} diff --git a/crates/lavina-core/src/clustering/broadcast.rs b/crates/lavina-core/src/clustering/broadcast.rs new file mode 100644 index 0000000..8e9301b --- /dev/null +++ b/crates/lavina-core/src/clustering/broadcast.rs @@ -0,0 +1,62 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use tokio::sync::Mutex; + +use crate::player::{PlayerId, PlayerRegistry, Updates}; +use crate::prelude::Str; +use crate::room::RoomId; + +/// Receives updates from other nodes and broadcasts them to local player actors. +struct BroadcastingInner { + subscriptions: HashMap>, +} + +impl Broadcasting {} + +#[derive(Clone)] +pub struct Broadcasting(Arc>); +impl Broadcasting { + pub fn new() -> Self { + let inner = BroadcastingInner { + subscriptions: HashMap::new(), + }; + Self(Arc::new(Mutex::new(inner))) + } + + /// Broadcasts the given update to subscribed player actors on local node. + #[tracing::instrument(skip(self, players, message, created_at), name = "Broadcasting::broadcast")] + pub async fn broadcast( + &self, + players: &PlayerRegistry, + room_id: RoomId, + author_id: PlayerId, + message: Str, + created_at: DateTime, + ) { + let inner = self.0.lock().await; + let Some(subscribers) = inner.subscriptions.get(&room_id) else { + return; + }; + let update = Updates::NewMessage { + room_id: room_id.clone(), + author_id: author_id.clone(), + body: message.clone(), + created_at: created_at.clone(), + }; + for i in subscribers { + if i == &author_id { + continue; + } + let Some(player) = players.get_player(i).await else { + continue; + }; + player.update(update.clone()).await; + } + } + + pub async fn subscribe(&self, subscriber: PlayerId, room_id: RoomId) { + self.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber); + } +} diff --git a/crates/lavina-core/src/clustering/room.rs b/crates/lavina-core/src/clustering/room.rs new file mode 100644 index 0000000..dff5fa3 --- /dev/null +++ b/crates/lavina-core/src/clustering/room.rs @@ -0,0 +1,59 @@ +use serde::{Deserialize, Serialize}; + +use crate::clustering::LavinaClient; + +pub mod paths { + pub const JOIN: &'static str = "/cluster/rooms/join"; + pub const LEAVE: &'static str = "/cluster/rooms/leave"; + pub const ADD_MESSAGE: &'static str = "/cluster/rooms/add_message"; + pub const SET_TOPIC: &'static str = "/cluster/rooms/set_topic"; +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct JoinRoomReq<'a> { + pub room_id: &'a str, + pub player_id: &'a str, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct LeaveRoomReq<'a> { + pub room_id: &'a str, + pub player_id: &'a str, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SendMessageReq<'a> { + pub room_id: &'a str, + pub player_id: &'a str, + pub message: &'a str, + pub created_at: &'a str, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SetRoomTopicReq<'a> { + pub room_id: &'a str, + pub player_id: &'a str, + pub topic: &'a str, +} + +impl LavinaClient { + #[tracing::instrument(skip(self, req), name = "LavinaClient::join_room")] + pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> anyhow::Result<()> { + self.send_request(node_id, paths::JOIN, req).await + } + + #[tracing::instrument(skip(self, req), name = "LavinaClient::leave_room")] + pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> anyhow::Result<()> { + self.send_request(node_id, paths::LEAVE, req).await + } + + #[tracing::instrument(skip(self, req), name = "LavinaClient::send_room_message")] + pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> anyhow::Result<()> { + self.send_request(node_id, paths::ADD_MESSAGE, req).await + } + + #[tracing::instrument(skip(self, req), name = "LavinaClient::set_room_topic")] + pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> anyhow::Result<()> { + self.send_request(node_id, paths::SET_TOPIC, req).await + } +} diff --git a/crates/lavina-core/src/lib.rs b/crates/lavina-core/src/lib.rs index 3ea0b33..f5df5e6 100644 --- a/crates/lavina-core/src/lib.rs +++ b/crates/lavina-core/src/lib.rs @@ -1,14 +1,19 @@ //! Domain definitions and implementation of common chat logic. +use std::sync::Arc; + use anyhow::Result; use prometheus::Registry as MetricsRegistry; use crate::auth::Authenticator; +use crate::clustering::broadcast::Broadcasting; +use crate::clustering::{ClusterConfig, LavinaClient}; use crate::dialog::DialogRegistry; use crate::player::PlayerRegistry; use crate::repo::Storage; use crate::room::RoomRegistry; pub mod auth; +pub mod clustering; pub mod dialog; pub mod player; pub mod prelude; @@ -23,21 +28,37 @@ pub struct LavinaCore { pub players: PlayerRegistry, pub rooms: RoomRegistry, pub dialogs: DialogRegistry, + pub broadcasting: Broadcasting, pub authenticator: Authenticator, } impl LavinaCore { - pub async fn new(mut metrics: MetricsRegistry, storage: Storage) -> Result { + pub async fn new( + mut metrics: MetricsRegistry, + cluster_config: ClusterConfig, + storage: Storage, + ) -> Result { // TODO shutdown all services in reverse order on error + let broadcasting = Broadcasting::new(); + let client = LavinaClient::new(cluster_config.addresses.clone()); let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let dialogs = DialogRegistry::new(storage.clone()); - let players = PlayerRegistry::empty(rooms.clone(), dialogs.clone(), storage.clone(), &mut metrics)?; + let players = PlayerRegistry::empty( + rooms.clone(), + dialogs.clone(), + storage.clone(), + &mut metrics, + Arc::new(cluster_config.metadata), + client, + broadcasting.clone(), + )?; dialogs.set_players(players.clone()).await; let authenticator = Authenticator::new(storage.clone()); Ok(LavinaCore { players, rooms, dialogs, + broadcasting, authenticator, }) } diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 21914c4..872e8bc 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -18,6 +18,9 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use tracing::{Instrument, Span}; +use crate::clustering::broadcast::Broadcasting; +use crate::clustering::room::*; +use crate::clustering::{ClusterMetadata, LavinaClient}; use crate::dialog::DialogRegistry; use crate::prelude::*; use crate::repo::Storage; @@ -60,7 +63,7 @@ pub struct PlayerConnection { player_handle: PlayerHandle, } impl PlayerConnection { - /// Handled in [Player::send_message]. + /// Handled in [Player::send_room_message]. #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_message")] pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result { let (promise, deferred) = oneshot(); @@ -78,7 +81,7 @@ impl PlayerConnection { Ok(deferred.await?) } - /// Handled in [Player::change_topic]. + /// Handled in [Player::change_room_topic]. #[tracing::instrument(skip(self, new_topic), name = "PlayerConnection::change_topic")] pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { let (promise, deferred) = oneshot(); @@ -272,6 +275,9 @@ impl PlayerRegistry { dialogs: DialogRegistry, storage: Storage, metrics: &mut MetricsRegistry, + cluster_metadata: Arc, + cluster_client: LavinaClient, + broadcasting: Broadcasting, ) -> Result { let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; metrics.register(Box::new(metric_active_players.clone()))?; @@ -279,6 +285,9 @@ impl PlayerRegistry { room_registry, dialogs, storage, + cluster_metadata, + cluster_client, + broadcasting, players: HashMap::new(), metric_active_players, }; @@ -328,8 +337,12 @@ impl PlayerRegistry { } else { let (handle, fiber) = Player::launch( id.clone(), + self.clone(), inner.room_registry.clone(), inner.dialogs.clone(), + inner.cluster_metadata.clone(), + inner.cluster_client.clone(), + inner.broadcasting.clone(), inner.storage.clone(), ) .await; @@ -364,29 +377,45 @@ struct PlayerRegistryInner { room_registry: RoomRegistry, dialogs: DialogRegistry, storage: Storage, + cluster_metadata: Arc, + cluster_client: LavinaClient, + broadcasting: Broadcasting, /// Active player actors. players: HashMap)>, metric_active_players: IntGauge, } +enum RoomRef { + Local(RoomHandle), + Remote { node_id: u32 }, +} + /// Player actor inner state representation. struct Player { player_id: PlayerId, storage_id: u32, connections: AnonTable>, - my_rooms: HashMap, + my_rooms: HashMap, banned_from: HashSet, rx: Receiver<(ActorCommand, Span)>, handle: PlayerHandle, + players: PlayerRegistry, rooms: RoomRegistry, dialogs: DialogRegistry, storage: Storage, + cluster_metadata: Arc, + cluster_client: LavinaClient, + broadcasting: Broadcasting, } impl Player { async fn launch( player_id: PlayerId, + players: PlayerRegistry, rooms: RoomRegistry, dialogs: DialogRegistry, + cluster_metadata: Arc, + cluster_client: LavinaClient, + broadcasting: Broadcasting, storage: Storage, ) -> (PlayerHandle, JoinHandle) { let (tx, rx) = channel(32); @@ -404,22 +433,41 @@ impl Player { banned_from: HashSet::new(), rx, handle, + players, rooms, dialogs, storage, + cluster_metadata, + cluster_client, + broadcasting, }; let fiber = tokio::task::spawn(player.main_loop()); (handle_clone, fiber) } + fn room_location(&self, room_id: &RoomId) -> Option { + let res = self.cluster_metadata.rooms.get(room_id.as_inner().as_ref()).copied(); + let node = res.unwrap_or(self.cluster_metadata.main_owner); + if node == self.cluster_metadata.node_id { + None + } else { + Some(node) + } + } + async fn main_loop(mut self) -> Self { let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap(); for room_id in rooms { - let room = self.rooms.get_room(&room_id).await; - if let Some(room) = room { - self.my_rooms.insert(room_id, room); + if let Some(remote_node) = self.room_location(&room_id) { + self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node }); + self.broadcasting.subscribe(self.player_id.clone(), room_id).await; } else { - tracing::error!("Room #{room_id:?} not found"); + let room = self.rooms.get_room(&room_id).await; + if let Some(room) = room { + self.my_rooms.insert(room_id, RoomRef::Local(room)); + } else { + tracing::error!("Room #{room_id:?} not found"); + } } } while let Some(cmd) = self.rx.recv().await { @@ -496,7 +544,7 @@ impl Player { let _ = promise.send(()); } ClientCommand::SendMessage { room_id, body, promise } => { - let result = self.send_message(connection_id, room_id, body).await; + let result = self.send_room_message(connection_id, room_id, body).await; let _ = promise.send(result); } ClientCommand::ChangeTopic { @@ -504,7 +552,7 @@ impl Player { new_topic, promise, } => { - self.change_topic(connection_id, room_id, new_topic).await; + self.change_room_topic(connection_id, room_id, new_topic).await; let _ = promise.send(()); } ClientCommand::GetRooms { promise } => { @@ -535,31 +583,61 @@ impl Player { return JoinResult::AlreadyJoined; } - let room = match self.rooms.get_or_create_room(room_id.clone()).await { - Ok(room) => room, - Err(e) => { - log::error!("Failed to get or create room: {e}"); - todo!(); - } - }; - room.add_member(&self.player_id, self.storage_id).await; - room.subscribe(&self.player_id, self.handle.clone()).await; - self.my_rooms.insert(room_id.clone(), room.clone()); - let room_info = room.get_room_info().await; - let update = Updates::RoomJoined { - room_id, - new_member_id: self.player_id.clone(), - }; - self.broadcast_update(update, connection_id).await; - JoinResult::Success(room_info) + if let Some(remote_node) = self.room_location(&room_id) { + let req = JoinRoomReq { + room_id: room_id.as_inner(), + player_id: self.player_id.as_inner(), + }; + self.cluster_client.join_room(remote_node, req).await.unwrap(); + let room_storage_id = self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap(); + self.storage.add_room_member(room_storage_id, self.storage_id).await.unwrap(); + self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node }); + JoinResult::Success(RoomInfo { + id: room_id, + topic: "unknown".into(), + members: vec![], + }) + } else { + let room = match self.rooms.get_or_create_room(room_id.clone()).await { + Ok(room) => room, + Err(e) => { + log::error!("Failed to get or create room: {e}"); + todo!(); + } + }; + room.add_member(&self.player_id, self.storage_id).await; + room.subscribe(&self.player_id, self.handle.clone()).await; + self.my_rooms.insert(room_id.clone(), RoomRef::Local(room.clone())); + let room_info = room.get_room_info().await; + let update = Updates::RoomJoined { + room_id, + new_member_id: self.player_id.clone(), + }; + self.broadcast_update(update, connection_id).await; + JoinResult::Success(room_info) + } } #[tracing::instrument(skip(self), name = "Player::leave_room")] async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) { let room = self.my_rooms.remove(&room_id); if let Some(room) = room { - room.unsubscribe(&self.player_id).await; - room.remove_member(&self.player_id, self.storage_id).await; + match room { + RoomRef::Local(room) => { + room.unsubscribe(&self.player_id).await; + room.remove_member(&self.player_id, self.storage_id).await; + } + RoomRef::Remote { node_id } => { + let req = LeaveRoomReq { + room_id: room_id.as_inner(), + player_id: self.player_id.as_inner(), + }; + self.cluster_client.leave_room(node_id, req).await.unwrap(); + let room_storage_id = + self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap(); + self.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap(); + } + } } let update = Updates::RoomLeft { room_id, @@ -568,14 +646,41 @@ impl Player { self.broadcast_update(update, connection_id).await; } - #[tracing::instrument(skip(self, body), name = "Player::send_message")] - async fn send_message(&mut self, connection_id: ConnectionId, room_id: RoomId, body: Str) -> SendMessageResult { + #[tracing::instrument(skip(self, body), name = "Player::send_room_message")] + async fn send_room_message( + &mut self, + connection_id: ConnectionId, + room_id: RoomId, + body: Str, + ) -> SendMessageResult { let Some(room) = self.my_rooms.get(&room_id) else { tracing::info!("no room found"); return SendMessageResult::NoSuchRoom; }; let created_at = chrono::Utc::now(); - room.send_message(&self.player_id, body.clone(), created_at.clone()).await; + match room { + RoomRef::Local(room) => { + room.send_message(&self.player_id, body.clone(), created_at.clone()).await; + } + RoomRef::Remote { node_id } => { + let req = SendMessageReq { + room_id: room_id.as_inner(), + player_id: self.player_id.as_inner(), + message: &*body, + created_at: &*created_at.to_rfc3339(), + }; + self.cluster_client.send_room_message(*node_id, req).await.unwrap(); + self.broadcasting + .broadcast( + &self.players, + room_id.clone(), + self.player_id.clone(), + body.clone(), + created_at.clone(), + ) + .await; + } + } let update = Updates::NewMessage { room_id, author_id: self.player_id.clone(), @@ -586,13 +691,25 @@ impl Player { SendMessageResult::Success(created_at) } - #[tracing::instrument(skip(self, new_topic), name = "Player::change_topic")] - async fn change_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { + #[tracing::instrument(skip(self, new_topic), name = "Player::change_room_topic")] + async fn change_room_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { let Some(room) = self.my_rooms.get(&room_id) else { tracing::info!("no room found"); return; }; - room.set_topic(&self.player_id, new_topic.clone()).await; + match room { + RoomRef::Local(room) => { + room.set_topic(&self.player_id, new_topic.clone()).await; + } + RoomRef::Remote { node_id } => { + let req = SetRoomTopicReq { + room_id: room_id.as_inner(), + player_id: self.player_id.as_inner(), + topic: &*new_topic, + }; + self.cluster_client.set_room_topic(*node_id, req).await.unwrap(); + } + } let update = Updates::RoomTopicChanged { room_id, new_topic }; self.broadcast_update(update, connection_id).await; } @@ -600,8 +717,20 @@ impl Player { #[tracing::instrument(skip(self), name = "Player::get_rooms")] async fn get_rooms(&self) -> Vec { let mut response = vec![]; - for (_, handle) in &self.my_rooms { - response.push(handle.get_room_info().await); + for (room_id, handle) in &self.my_rooms { + match handle { + RoomRef::Local(handle) => { + response.push(handle.get_room_info().await); + } + RoomRef::Remote { .. } => { + let room_info = RoomInfo { + id: room_id.clone(), + topic: "unknown".into(), + members: vec![], + }; + response.push(room_info); + } + } } response } diff --git a/crates/lavina-core/src/repo/room.rs b/crates/lavina-core/src/repo/room.rs index 8d41a79..95a4de6 100644 --- a/crates/lavina-core/src/repo/room.rs +++ b/crates/lavina-core/src/repo/room.rs @@ -69,4 +69,20 @@ impl Storage { Ok(()) } + + pub async fn create_or_retrieve_room_id_by_name(&self, name: &str) -> Result { + // TODO we don't need any info except the name on non-owning nodes, should remove stubs here + let mut executor = self.conn.lock().await; + let res: (u32,) = sqlx::query_as( + "insert into rooms(name, topic) + values (?, '') + on conflict(name) do update set name = excluded.name + returning id;", + ) + .bind(name) + .fetch_one(&mut *executor) + .await?; + + Ok(res.0) + } } diff --git a/crates/lavina-core/src/repo/user.rs b/crates/lavina-core/src/repo/user.rs index d836b8f..a27c245 100644 --- a/crates/lavina-core/src/repo/user.rs +++ b/crates/lavina-core/src/repo/user.rs @@ -14,6 +14,21 @@ impl Storage { Ok(res.map(|(id,)| id)) } + pub async fn create_or_retrieve_user_id_by_name(&self, name: &str) -> Result { + let mut executor = self.conn.lock().await; + let res: (u32,) = sqlx::query_as( + "insert into users(name) + values (?) + on conflict(name) do update set name = excluded.name + returning id;", + ) + .bind(name) + .fetch_one(&mut *executor) + .await?; + + Ok(res.0) + } + pub async fn get_rooms_of_a_user(&self, user_id: u32) -> Result> { let mut executor = self.conn.lock().await; let res: Vec<(String,)> = sqlx::query_as( diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index b7dfa20..bbffd15 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -62,7 +62,7 @@ impl RoomRegistry { } #[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")] - pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result { + pub async fn get_or_create_room(&self, room_id: RoomId) -> Result { let mut inner = self.0.write().await; if let Some(room_handle) = inner.get_or_load_room(&room_id).await? { Ok(room_handle.clone()) diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs index 583b18f..9b29646 100644 --- a/crates/projection-irc/tests/lib.rs +++ b/crates/projection-irc/tests/lib.rs @@ -9,6 +9,7 @@ use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::TcpStream; use lavina_core::auth::Authenticator; +use lavina_core::clustering::{ClusterConfig, ClusterMetadata}; use lavina_core::player::{JoinResult, PlayerId, SendMessageResult}; use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::room::RoomId; @@ -118,7 +119,15 @@ impl TestServer { db_path: ":memory:".into(), }) .await?; - let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let cluster_config = ClusterConfig { + addresses: vec![], + metadata: ClusterMetadata { + node_id: 0, + main_owner: 0, + rooms: Default::default(), + }, + }; + let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); Ok(TestServer { metrics, @@ -133,6 +142,14 @@ impl TestServer { listen_on: "127.0.0.1:0".parse().unwrap(), server_name: "testserver".into(), }; + let cluster_config = ClusterConfig { + addresses: vec![], + metadata: ClusterMetadata { + node_id: 0, + main_owner: 0, + rooms: Default::default(), + }, + }; let TestServer { metrics: _, storage, @@ -142,7 +159,7 @@ impl TestServer { server.terminate().await?; core.shutdown().await?; let metrics = MetricsRegistry::new(); - let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); Ok(TestServer { metrics, diff --git a/crates/projection-xmpp/src/testkit.rs b/crates/projection-xmpp/src/testkit.rs index 3002c30..cdc078d 100644 --- a/crates/projection-xmpp/src/testkit.rs +++ b/crates/projection-xmpp/src/testkit.rs @@ -1,8 +1,10 @@ +use prometheus::Registry as MetricsRegistry; + use crate::{Authenticated, XmppConnection}; -use lavina_core::player::{PlayerConnection, PlayerId}; +use lavina_core::clustering::{ClusterConfig, ClusterMetadata}; +use lavina_core::player::PlayerConnection; use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::LavinaCore; -use prometheus::Registry as MetricsRegistry; use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server}; pub(crate) struct TestServer { @@ -19,7 +21,15 @@ impl TestServer { db_path: ":memory:".into(), }) .await?; - let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let cluster_config = ClusterConfig { + metadata: ClusterMetadata { + node_id: 0, + main_owner: 0, + rooms: Default::default(), + }, + addresses: vec![], + }; + let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; Ok(TestServer { metrics, storage, core }) } @@ -27,7 +37,15 @@ impl TestServer { self.core.shutdown().await?; let metrics = MetricsRegistry::new(); - let core = LavinaCore::new(metrics.clone(), self.storage.clone()).await?; + let cluster_config = ClusterConfig { + metadata: ClusterMetadata { + node_id: 0, + main_owner: 0, + rooms: Default::default(), + }, + addresses: vec![], + }; + let core = LavinaCore::new(metrics.clone(), cluster_config, self.storage.clone()).await?; Ok(TestServer { metrics, diff --git a/crates/projection-xmpp/tests/lib.rs b/crates/projection-xmpp/tests/lib.rs index 29e0a69..411737c 100644 --- a/crates/projection-xmpp/tests/lib.rs +++ b/crates/projection-xmpp/tests/lib.rs @@ -19,6 +19,7 @@ use tokio_rustls::rustls::{ClientConfig, ServerName}; use tokio_rustls::TlsConnector; use lavina_core::auth::Authenticator; +use lavina_core::clustering::{ClusterConfig, ClusterMetadata}; use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::LavinaCore; use projection_xmpp::{launch, RunningServer, ServerConfig}; @@ -161,7 +162,15 @@ impl TestServer { db_path: ":memory:".into(), }) .await?; - let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let cluster_config = ClusterConfig { + addresses: vec![], + metadata: ClusterMetadata { + node_id: 0, + main_owner: 0, + rooms: Default::default(), + }, + }; + let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); Ok(TestServer { metrics, diff --git a/src/http.rs b/src/http.rs index 1d24c14..4d36181 100644 --- a/src/http.rs +++ b/src/http.rs @@ -19,9 +19,10 @@ use lavina_core::repo::Storage; use lavina_core::room::{RoomId, RoomRegistry}; use lavina_core::terminator::Terminator; use lavina_core::LavinaCore; - use mgmt_api::*; +mod clustering; + type HttpResult = std::result::Result; #[derive(Deserialize, Debug)] @@ -90,7 +91,7 @@ async fn route( (&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, core).await.or5xx(), (&Method::POST, rooms::paths::SEND_MESSAGE) => endpoint_send_room_message(request, core).await.or5xx(), (&Method::POST, rooms::paths::SET_TOPIC) => endpoint_set_room_topic(request, core).await.or5xx(), - _ => endpoint_not_found(), + _ => clustering::route(core, storage, request).await.unwrap_or_else(endpoint_not_found), }; Ok(res) } diff --git a/src/http/clustering.rs b/src/http/clustering.rs new file mode 100644 index 0000000..0314dd6 --- /dev/null +++ b/src/http/clustering.rs @@ -0,0 +1,80 @@ +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use hyper::{Method, Request, Response}; + +use super::Or5xx; +use crate::http::{empty_204_request, malformed_request, player_not_found, room_not_found}; +use lavina_core::clustering::room::{paths, JoinRoomReq, SendMessageReq}; +use lavina_core::player::PlayerId; +use lavina_core::repo::Storage; +use lavina_core::room::RoomId; +use lavina_core::LavinaCore; + +// TODO move this into core + +pub async fn route( + core: &LavinaCore, + storage: &Storage, + request: Request, +) -> Option>> { + match (request.method(), request.uri().path()) { + (&Method::POST, paths::JOIN) => Some(endpoint_cluster_join_room(request, core, storage).await.or5xx()), + (&Method::POST, paths::ADD_MESSAGE) => Some(endpoint_cluster_add_message(request, core).await.or5xx()), + _ => None, + } +} + +#[tracing::instrument(skip_all, name = "endpoint_cluster_join_room")] +async fn endpoint_cluster_join_room( + request: Request, + core: &LavinaCore, + storage: &Storage, +) -> lavina_core::prelude::Result>> { + let str = request.collect().await?.to_bytes(); + let Ok(req) = serde_json::from_slice::(&str[..]) else { + return Ok(malformed_request()); + }; + tracing::info!("Incoming request: {:?}", &req); + let Ok(room_id) = RoomId::from(req.room_id) else { + dbg!(&req.room_id); + return Ok(room_not_found()); + }; + let Ok(player_id) = PlayerId::from(req.player_id) else { + dbg!(&req.player_id); + return Ok(player_not_found()); + }; + let room_handle = core.rooms.get_or_create_room(room_id).await.unwrap(); + let storage_id = storage.create_or_retrieve_user_id_by_name(req.player_id).await?; + room_handle.add_member(&player_id, storage_id).await; + Ok(empty_204_request()) +} + +#[tracing::instrument(skip_all, name = "endpoint_cluster_add_message")] +async fn endpoint_cluster_add_message( + request: Request, + core: &LavinaCore, +) -> lavina_core::prelude::Result>> { + let str = request.collect().await?.to_bytes(); + let Ok(req) = serde_json::from_slice::(&str[..]) else { + return Ok(malformed_request()); + }; + tracing::info!("Incoming request: {:?}", &req); + let Ok(created_at) = chrono::DateTime::parse_from_rfc3339(req.created_at) else { + dbg!(&req.created_at); + return Ok(malformed_request()); + }; + let Ok(room_id) = RoomId::from(req.room_id) else { + dbg!(&req.room_id); + return Ok(room_not_found()); + }; + let Ok(player_id) = PlayerId::from(req.player_id) else { + dbg!(&req.player_id); + return Ok(player_not_found()); + }; + let Some(room_handle) = core.rooms.get_room(&room_id).await else { + dbg!(&room_id); + return Ok(room_not_found()); + }; + room_handle.send_message(&player_id, req.message.into(), created_at.to_utc()).await; + Ok(empty_204_request()) +} diff --git a/src/main.rs b/src/main.rs index 173b683..90a3159 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,8 +6,10 @@ use std::path::Path; use clap::Parser; use figment::providers::Format; use figment::{providers::Toml, Figment}; +use opentelemetry::global::set_text_map_propagator; use opentelemetry::KeyValue; use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{BatchConfig, RandomIdGenerator, Sampler}; use opentelemetry_sdk::{runtime, Resource}; use opentelemetry_semantic_conventions::resource::SERVICE_NAME; @@ -28,6 +30,7 @@ struct ServerConfig { irc: projection_irc::ServerConfig, xmpp: projection_xmpp::ServerConfig, storage: lavina_core::repo::StorageConfig, + cluster: lavina_core::clustering::ClusterConfig, tracing: Option, } @@ -63,11 +66,12 @@ async fn main() -> Result<()> { irc: irc_config, xmpp: xmpp_config, storage: storage_config, + cluster: cluster_config, tracing: _, } = config; let metrics = MetricsRegistry::new(); let storage = Storage::open(storage_config).await?; - let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; + let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?; let telemetry_terminator = http::launch(telemetry_config, metrics.clone(), core.clone(), storage.clone()).await?; let irc = projection_irc::launch(irc_config, core.clone(), metrics.clone()).await?; let xmpp = projection_xmpp::launch(xmpp_config, core.clone(), metrics.clone()).await?; @@ -139,6 +143,7 @@ fn set_up_logging(tracing_config: &Option) -> Result<()> { .with_exporter(trace_exporter) .install_batch(runtime::Tokio)?; let subscriber = subscriber.with(OpenTelemetryLayer::new(tracer)); + set_text_map_propagator(TraceContextPropagator::new()); targets.with_subscriber(subscriber).try_init()?; } else { targets.with_subscriber(subscriber).try_init()?;