diff --git a/crates/lavina-core/src/clustering.rs b/crates/lavina-core/src/clustering.rs index 143a72b..d264906 100644 --- a/crates/lavina-core/src/clustering.rs +++ b/crates/lavina-core/src/clustering.rs @@ -39,6 +39,13 @@ pub struct SendMessageReq<'a> { pub created_at: &'a str, } +#[derive(Serialize, Deserialize, Debug)] +pub struct SetRoomTopicReq<'a> { + pub room_id: &'a str, + pub player_id: &'a str, + pub topic: &'a str, +} + impl LavinaClient { pub fn new(addresses: Addresses) -> Self { let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::::new()).build(); @@ -66,4 +73,22 @@ impl LavinaClient { } } } + + pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> Result<()> { + tracing::info!("Setting the topic of a room on a remote node"); + let Some(address) = self.addresses.get(node_id as usize) else { + tracing::error!("Failed"); + return Err(anyhow!("Unknown node")); + }; + match self.client.post(format!("http://{}/cluster/rooms/set_topic", address)).json(&req).send().await { + Ok(_) => { + tracing::info!("Room topic set"); + Ok(()) + } + Err(e) => { + tracing::error!("Failed to set room topic: {e:?}"); + Err(e.into()) + } + } + } } diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 12a96d6..0ab8e09 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -17,7 +17,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use tracing::{Instrument, Span}; -use crate::clustering::{ClusterMetadata, LavinaClient, SendMessageReq}; +use crate::clustering::{ClusterMetadata, LavinaClient, SendMessageReq, SetRoomTopicReq}; use crate::dialog::DialogRegistry; use crate::prelude::*; use crate::repo::Storage; @@ -60,7 +60,7 @@ pub struct PlayerConnection { player_handle: PlayerHandle, } impl PlayerConnection { - /// Handled in [Player::send_message]. + /// Handled in [Player::send_room_message]. #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_message")] pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result { let (promise, deferred) = oneshot(); @@ -78,7 +78,7 @@ impl PlayerConnection { Ok(deferred.await?) } - /// Handled in [Player::change_topic]. + /// Handled in [Player::change_room_topic]. #[tracing::instrument(skip(self, new_topic), name = "PlayerConnection::change_topic")] pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { let (promise, deferred) = oneshot(); @@ -413,23 +413,31 @@ impl Player { (handle_clone, fiber) } + fn room_location(&self, room_id: &RoomId) -> Option { + let node = match &**room_id.as_inner() { + "aaaaa" => self.cluster_metadata.test_owner, + "test" => self.cluster_metadata.test2_owner, + _ => self.cluster_metadata.main_owner, + }; + if node == self.cluster_metadata.node_id { + None + } else { + Some(node) + } + } + async fn main_loop(mut self) -> Self { let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap(); for room_id in rooms { - let node = match &**room_id.as_inner() { - "aaaaa" => self.cluster_metadata.test_owner, - "test" => self.cluster_metadata.test2_owner, - _ => self.cluster_metadata.main_owner, - }; - if node == self.cluster_metadata.node_id { + if let Some(remote_node) = self.room_location(&room_id) { + self.my_rooms.insert(room_id, RoomRef::Remote { node_id: remote_node }); + } else { let room = self.rooms.get_room(&room_id).await; if let Some(room) = room { self.my_rooms.insert(room_id, RoomRef::Local(room)); } else { tracing::error!("Room #{room_id:?} not found"); } - } else { - self.my_rooms.insert(room_id, RoomRef::Remote { node_id: node }); } } while let Some(cmd) = self.rx.recv().await { @@ -506,7 +514,7 @@ impl Player { let _ = promise.send(()); } ClientCommand::SendMessage { room_id, body, promise } => { - let result = self.send_message(connection_id, room_id, body).await; + let result = self.send_room_message(connection_id, room_id, body).await; let _ = promise.send(result); } ClientCommand::ChangeTopic { @@ -514,7 +522,7 @@ impl Player { new_topic, promise, } => { - self.change_topic(connection_id, room_id, new_topic).await; + self.change_room_topic(connection_id, room_id, new_topic).await; let _ = promise.send(()); } ClientCommand::GetRooms { promise } => { @@ -541,24 +549,27 @@ impl Player { return JoinResult::AlreadyJoined; } - let room = match self.rooms.get_or_create_room(room_id.clone()).await { - Ok(room) => room, - Err(e) => { - log::error!("Failed to get or create room: {e}"); - todo!(); - } - }; - room.add_member(&self.player_id, self.storage_id).await; - room.subscribe(&self.player_id, self.handle.clone()).await; - // self.my_rooms.insert(room_id.clone(), room.clone()); - panic!(); - let room_info = room.get_room_info().await; - let update = Updates::RoomJoined { - room_id, - new_member_id: self.player_id.clone(), - }; - self.broadcast_update(update, connection_id).await; - JoinResult::Success(room_info) + if let Some(remote_node) = self.room_location(&room_id) { + todo!() + } else { + let room = match self.rooms.get_or_create_room(room_id.clone()).await { + Ok(room) => room, + Err(e) => { + log::error!("Failed to get or create room: {e}"); + todo!(); + } + }; + room.add_member(&self.player_id, self.storage_id).await; + room.subscribe(&self.player_id, self.handle.clone()).await; + self.my_rooms.insert(room_id.clone(), RoomRef::Local(room.clone())); + let room_info = room.get_room_info().await; + let update = Updates::RoomJoined { + room_id, + new_member_id: self.player_id.clone(), + }; + self.broadcast_update(update, connection_id).await; + JoinResult::Success(room_info) + } } #[tracing::instrument(skip(self), name = "Player::leave_room")] @@ -576,8 +587,13 @@ impl Player { self.broadcast_update(update, connection_id).await; } - #[tracing::instrument(skip(self, body), name = "Player::send_message")] - async fn send_message(&mut self, connection_id: ConnectionId, room_id: RoomId, body: Str) -> SendMessageResult { + #[tracing::instrument(skip(self, body), name = "Player::send_room_message")] + async fn send_room_message( + &mut self, + connection_id: ConnectionId, + room_id: RoomId, + body: Str, + ) -> SendMessageResult { let Some(room) = self.my_rooms.get(&room_id) else { tracing::info!("no room found"); return SendMessageResult::NoSuchRoom; @@ -607,14 +623,25 @@ impl Player { SendMessageResult::Success(created_at) } - #[tracing::instrument(skip(self, new_topic), name = "Player::change_topic")] - async fn change_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { + #[tracing::instrument(skip(self, new_topic), name = "Player::change_room_topic")] + async fn change_room_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { let Some(room) = self.my_rooms.get(&room_id) else { tracing::info!("no room found"); return; }; - // room.set_topic(&self.player_id, new_topic.clone()).await; - todo!(); + match room { + RoomRef::Local(room) => { + room.set_topic(&self.player_id, new_topic.clone()).await; + } + RoomRef::Remote { node_id } => { + let req = SetRoomTopicReq { + room_id: room_id.as_inner(), + player_id: self.player_id.as_inner(), + topic: &*new_topic, + }; + self.cluster_client.set_room_topic(*node_id, req).await.unwrap(); + } + } let update = Updates::RoomTopicChanged { room_id, new_topic }; self.broadcast_update(update, connection_id).await; }