diff --git a/crates/lavina-core/src/clustering/broadcast.rs b/crates/lavina-core/src/clustering/broadcast.rs index 49573dd..2cc50c1 100644 --- a/crates/lavina-core/src/clustering/broadcast.rs +++ b/crates/lavina-core/src/clustering/broadcast.rs @@ -3,9 +3,10 @@ use std::collections::{HashMap, HashSet}; use chrono::{DateTime, Utc}; use tokio::sync::Mutex; -use crate::player::{PlayerId, PlayerRegistry, Updates}; +use crate::player::{PlayerId, Updates}; use crate::prelude::Str; use crate::room::RoomId; +use crate::Services; /// Receives updates from other nodes and broadcasts them to local player actors. struct BroadcastingInner { @@ -20,18 +21,12 @@ impl Broadcasting { }; Self(Mutex::new(inner)) } - +} +impl Services { /// Broadcasts the given update to subscribed player actors on local node. - #[tracing::instrument(skip(self, players, message, created_at), name = "Broadcasting::broadcast")] - pub async fn broadcast( - &self, - players: &PlayerRegistry, - room_id: RoomId, - author_id: PlayerId, - message: Str, - created_at: DateTime, - ) { - let inner = self.0.lock().await; + #[tracing::instrument(skip(self, message, created_at))] + pub async fn broadcast(&self, room_id: RoomId, author_id: PlayerId, message: Str, created_at: DateTime) { + let inner = self.broadcasting.0.lock().await; let Some(subscribers) = inner.subscriptions.get(&room_id) else { return; }; @@ -45,7 +40,7 @@ impl Broadcasting { if i == &author_id { continue; } - let Some(player) = players.get_player(i).await else { + let Some(player) = self.players.get_player(i).await else { continue; }; player.update(update.clone()).await; @@ -53,6 +48,6 @@ impl Broadcasting { } pub async fn subscribe(&self, subscriber: PlayerId, room_id: RoomId) { - self.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber); + self.broadcasting.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber); } } diff --git a/crates/lavina-core/src/clustering/room.rs b/crates/lavina-core/src/clustering/room.rs index 533dac8..fc246e4 100644 --- a/crates/lavina-core/src/clustering/room.rs +++ b/crates/lavina-core/src/clustering/room.rs @@ -43,22 +43,22 @@ pub struct SetRoomTopicReq<'a> { impl LavinaClient { #[tracing::instrument(skip(self, req), name = "LavinaClient::join_room")] - pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> anyhow::Result<()> { + pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> Result<()> { self.send_request(node_id, paths::JOIN, req).await } #[tracing::instrument(skip(self, req), name = "LavinaClient::leave_room")] - pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> anyhow::Result<()> { + pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> Result<()> { self.send_request(node_id, paths::LEAVE, req).await } #[tracing::instrument(skip(self, req), name = "LavinaClient::send_room_message")] - pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> anyhow::Result<()> { + pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> Result<()> { self.send_request(node_id, paths::ADD_MESSAGE, req).await } #[tracing::instrument(skip(self, req), name = "LavinaClient::set_room_topic")] - pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> anyhow::Result<()> { + pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> Result<()> { self.send_request(node_id, paths::SET_TOPIC, req).await } } diff --git a/crates/lavina-core/src/dialog.rs b/crates/lavina-core/src/dialog.rs index 8cf4867..3813a57 100644 --- a/crates/lavina-core/src/dialog.rs +++ b/crates/lavina-core/src/dialog.rs @@ -48,49 +48,46 @@ struct DialogRegistryInner { pub(crate) struct DialogRegistry(AsyncRwLock); -impl DialogRegistry { - pub async fn send_message( +impl Services { + #[tracing::instrument(skip(self, body, created_at))] + pub async fn send_dialog_message( &self, - services: &Services, from: PlayerId, to: PlayerId, body: Str, created_at: &DateTime, ) -> Result<()> { - let guard = self.0.read().await; + let guard = self.dialogs.0.read().await; let id = DialogId::new(from.clone(), to.clone()); let dialog = guard.dialogs.get(&id); if let Some(d) = dialog { let mut d = d.write().await; - services - .storage + self.storage .insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at) .await?; d.message_count += 1; } else { drop(guard); - let mut guard2 = self.0.write().await; + let mut guard2 = self.dialogs.0.write().await; // double check in case concurrent access has loaded this dialog if let Some(d) = guard2.dialogs.get(&id) { let mut d = d.write().await; - services - .storage + self.storage .insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at) .await?; d.message_count += 1; } else { let (p1, p2) = id.as_inner(); tracing::info!("Dialog {id:?} not found locally, trying to load from storage"); - let stored_dialog = match services.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? { + let stored_dialog = match self.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? { Some(t) => t, None => { tracing::info!("Dialog {id:?} does not exist, creating a new one in storage"); - services.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await? + self.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await? } }; tracing::info!("Dialog {id:?} loaded"); - services - .storage + self.storage .insert_dialog_message( stored_dialog.id, stored_dialog.message_count, @@ -110,7 +107,7 @@ impl DialogRegistry { drop(guard2); } // TODO send message to the other player and persist it - let Some(player) = services.players.get_player(&to).await else { + let Some(player) = self.players.get_player(&to).await else { tracing::debug!("Player {to:?} not active, not sending message"); return Ok(()); }; diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index db45f7f..292c220 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -398,7 +398,7 @@ impl Player { for room_id in rooms { if let Some(remote_node) = self.room_location(&room_id) { self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node }); - self.services.broadcasting.subscribe(self.player_id.clone(), room_id).await; + self.services.subscribe(self.player_id.clone(), room_id).await; } else { let room = self.services.rooms.get_room(&self.services, &room_id).await; if let Some(room) = room { @@ -596,7 +596,7 @@ impl Player { tracing::info!("no room found"); return SendMessageResult::NoSuchRoom; }; - let created_at = chrono::Utc::now(); + let created_at = Utc::now(); match room { RoomRef::Local(room) => { room.send_message(&self.services, &self.player_id, body.clone(), created_at.clone()).await; @@ -610,9 +610,7 @@ impl Player { }; self.services.client.send_room_message(*node_id, req).await.unwrap(); self.services - .broadcasting .broadcast( - &self.services.players, room_id.clone(), self.player_id.clone(), body.clone(), @@ -677,16 +675,9 @@ impl Player { #[tracing::instrument(skip(self, body), name = "Player::send_dialog_message")] async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) { - let created_at = chrono::Utc::now(); + let created_at = Utc::now(); self.services - .dialogs - .send_message( - &self.services, - self.player_id.clone(), - recipient.clone(), - body.clone(), - &created_at, - ) + .send_dialog_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at) .await .unwrap(); let update = Updates::NewDialogMessage {