diff --git a/src/core/player.rs b/src/core/player.rs index 58a6a50..e31e977 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -55,7 +55,7 @@ pub struct PlayerHandle { tx: Sender, } impl PlayerHandle { - pub async fn subscribe(&mut self) -> PlayerConnection { + pub async fn subscribe(&self) -> PlayerConnection { let (sender, receiver) = channel(32); let (promise, deferred) = oneshot(); self.tx @@ -70,7 +70,7 @@ impl PlayerHandle { } pub async fn send_message( - &mut self, + &self, room_id: RoomId, connection_id: ConnectionId, body: String, @@ -84,12 +84,12 @@ impl PlayerHandle { .await; } - pub async fn join_room(&mut self, room_id: RoomId) { + pub async fn join_room(&self, room_id: RoomId) { self.tx.send(PlayerCommand::JoinRoom { room_id }).await; } pub async fn receive_message( - &mut self, + &self, room_id: RoomId, author: PlayerId, connection_id: ConnectionId, diff --git a/src/core/room.rs b/src/core/room.rs index 74c7eea..c725740 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -6,7 +6,7 @@ use std::{ }; use prometheus::{IntGauge, Registry as MetricRegistry}; -use tokio::sync::mpsc::{channel, Sender}; +use tokio::sync::RwLock as AsyncRwLock; use crate::{ core::player::{PlayerHandle, PlayerId}, @@ -35,15 +35,16 @@ impl RoomRegistry { } pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle { - let room = Room { - subscriptions: HashMap::new(), - }; let mut inner = self.0.write().unwrap(); - if let Some((room_handle, _)) = inner.rooms.get(&room_id) { + if let Some(room_handle) = inner.rooms.get(&room_id) { room_handle.clone() } else { - let (room_handle, fiber) = room.launch(room_id.clone()); - inner.rooms.insert(room_id, (room_handle.clone(), fiber)); + let room = Room { + room_id: room_id.clone(), + subscriptions: HashMap::new(), + }; + let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); + inner.rooms.insert(room_id, room_handle.clone()); inner.metric_active_rooms.inc(); room_handle } @@ -52,31 +53,21 @@ impl RoomRegistry { pub fn get_room(&self, room_id: RoomId) -> Option { let inner = self.0.read().unwrap(); let res = inner.rooms.get(&room_id); - res.map(|r| r.0.clone()) + res.map(|r| r.clone()) } } struct RoomRegistryInner { - rooms: HashMap)>, + rooms: HashMap, metric_active_rooms: IntGauge, } #[derive(Clone)] -pub struct RoomHandle { - tx: Sender, -} +pub struct RoomHandle(Arc>); impl RoomHandle { - pub async fn subscribe(&mut self, player_id: PlayerId, player: PlayerHandle) { - match self - .tx - .send(RoomCommand::AddSubscriber { player_id, player }) - .await - { - Ok(_) => {} - Err(_) => { - tracing::error!("Room mailbox is closed unexpectedly"); - } - }; + pub async fn subscribe(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { + let mut lock = self.0.write().await; + lock.add_subscriber(player_id, player_handle); } pub async fn send_message( @@ -85,64 +76,32 @@ impl RoomHandle { connection_id: ConnectionId, body: String, ) { - self.tx - .send(RoomCommand::SendMessage { - player_id, - connection_id, - body, - }) - .await; + let lock = self.0.read().await; + lock.send_message(player_id, connection_id, body).await; } } -enum RoomCommand { - AddSubscriber { - player_id: PlayerId, - player: PlayerHandle, - }, - SendMessage { - player_id: PlayerId, - connection_id: ConnectionId, - body: String, - }, -} - struct Room { + room_id: RoomId, subscriptions: HashMap, } impl Room { - fn launch(mut self, room_id: RoomId) -> (RoomHandle, JoinHandle) { - let (tx, mut rx) = channel(32); - let fiber = tokio::task::spawn(async move { - tracing::info!("Starting room fiber"); - while let Some(a) = rx.recv().await { - match a { - RoomCommand::AddSubscriber { player_id, player } => { - tracing::info!("Adding a subscriber to room"); - self.subscriptions.insert(player_id, player); - } - RoomCommand::SendMessage { - player_id, - connection_id, - body, - } => { - tracing::info!("Adding a message to room"); - for (_, sub) in &mut self.subscriptions { - log::info!("Sending a message from room to player"); - sub.receive_message( - room_id.clone(), - player_id.clone(), - connection_id.clone(), - body.clone(), - ) - .await; - } - } - } - } - tracing::info!("Stopping room fiber"); - self - }); - (RoomHandle { tx }, fiber) + fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { + tracing::info!("Adding a subscriber to room"); + self.subscriptions.insert(player_id, player_handle); + } + + async fn send_message(&self, player_id: PlayerId, connection_id: ConnectionId, body: String) { + tracing::info!("Adding a message to room"); + for (_, sub) in &self.subscriptions { + log::info!("Sending a message from room to player"); + sub.receive_message( + self.room_id.clone(), + player_id.clone(), + connection_id.clone(), + body.clone(), + ) + .await; + } } }