make room a data structure behind a rwlock instead of an actor

This commit is contained in:
Nikita Vilunov 2023-02-14 01:12:27 +01:00
parent 315b7e638b
commit ec819d37ea
2 changed files with 38 additions and 79 deletions

View File

@ -55,7 +55,7 @@ pub struct PlayerHandle {
tx: Sender<PlayerCommand>, tx: Sender<PlayerCommand>,
} }
impl PlayerHandle { impl PlayerHandle {
pub async fn subscribe(&mut self) -> PlayerConnection { pub async fn subscribe(&self) -> PlayerConnection {
let (sender, receiver) = channel(32); let (sender, receiver) = channel(32);
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
self.tx self.tx
@ -70,7 +70,7 @@ impl PlayerHandle {
} }
pub async fn send_message( pub async fn send_message(
&mut self, &self,
room_id: RoomId, room_id: RoomId,
connection_id: ConnectionId, connection_id: ConnectionId,
body: String, body: String,
@ -84,12 +84,12 @@ impl PlayerHandle {
.await; .await;
} }
pub async fn join_room(&mut self, room_id: RoomId) { pub async fn join_room(&self, room_id: RoomId) {
self.tx.send(PlayerCommand::JoinRoom { room_id }).await; self.tx.send(PlayerCommand::JoinRoom { room_id }).await;
} }
pub async fn receive_message( pub async fn receive_message(
&mut self, &self,
room_id: RoomId, room_id: RoomId,
author: PlayerId, author: PlayerId,
connection_id: ConnectionId, connection_id: ConnectionId,

View File

@ -6,7 +6,7 @@ use std::{
}; };
use prometheus::{IntGauge, Registry as MetricRegistry}; use prometheus::{IntGauge, Registry as MetricRegistry};
use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::RwLock as AsyncRwLock;
use crate::{ use crate::{
core::player::{PlayerHandle, PlayerId}, core::player::{PlayerHandle, PlayerId},
@ -35,15 +35,16 @@ impl RoomRegistry {
} }
pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle { pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle {
let room = Room {
subscriptions: HashMap::new(),
};
let mut inner = self.0.write().unwrap(); let mut inner = self.0.write().unwrap();
if let Some((room_handle, _)) = inner.rooms.get(&room_id) { if let Some(room_handle) = inner.rooms.get(&room_id) {
room_handle.clone() room_handle.clone()
} else { } else {
let (room_handle, fiber) = room.launch(room_id.clone()); let room = Room {
inner.rooms.insert(room_id, (room_handle.clone(), fiber)); room_id: room_id.clone(),
subscriptions: HashMap::new(),
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone());
inner.metric_active_rooms.inc(); inner.metric_active_rooms.inc();
room_handle room_handle
} }
@ -52,31 +53,21 @@ impl RoomRegistry {
pub fn get_room(&self, room_id: RoomId) -> Option<RoomHandle> { pub fn get_room(&self, room_id: RoomId) -> Option<RoomHandle> {
let inner = self.0.read().unwrap(); let inner = self.0.read().unwrap();
let res = inner.rooms.get(&room_id); let res = inner.rooms.get(&room_id);
res.map(|r| r.0.clone()) res.map(|r| r.clone())
} }
} }
struct RoomRegistryInner { struct RoomRegistryInner {
rooms: HashMap<RoomId, (RoomHandle, JoinHandle<Room>)>, rooms: HashMap<RoomId, RoomHandle>,
metric_active_rooms: IntGauge, metric_active_rooms: IntGauge,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct RoomHandle { pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
tx: Sender<RoomCommand>,
}
impl RoomHandle { impl RoomHandle {
pub async fn subscribe(&mut self, player_id: PlayerId, player: PlayerHandle) { pub async fn subscribe(&mut self, player_id: PlayerId, player_handle: PlayerHandle) {
match self let mut lock = self.0.write().await;
.tx lock.add_subscriber(player_id, player_handle);
.send(RoomCommand::AddSubscriber { player_id, player })
.await
{
Ok(_) => {}
Err(_) => {
tracing::error!("Room mailbox is closed unexpectedly");
}
};
} }
pub async fn send_message( pub async fn send_message(
@ -85,52 +76,27 @@ impl RoomHandle {
connection_id: ConnectionId, connection_id: ConnectionId,
body: String, body: String,
) { ) {
self.tx let lock = self.0.read().await;
.send(RoomCommand::SendMessage { lock.send_message(player_id, connection_id, body).await;
player_id,
connection_id,
body,
})
.await;
} }
} }
enum RoomCommand {
AddSubscriber {
player_id: PlayerId,
player: PlayerHandle,
},
SendMessage {
player_id: PlayerId,
connection_id: ConnectionId,
body: String,
},
}
struct Room { struct Room {
room_id: RoomId,
subscriptions: HashMap<PlayerId, PlayerHandle>, subscriptions: HashMap<PlayerId, PlayerHandle>,
} }
impl Room { impl Room {
fn launch(mut self, room_id: RoomId) -> (RoomHandle, JoinHandle<Room>) { fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) {
let (tx, mut rx) = channel(32);
let fiber = tokio::task::spawn(async move {
tracing::info!("Starting room fiber");
while let Some(a) = rx.recv().await {
match a {
RoomCommand::AddSubscriber { player_id, player } => {
tracing::info!("Adding a subscriber to room"); tracing::info!("Adding a subscriber to room");
self.subscriptions.insert(player_id, player); self.subscriptions.insert(player_id, player_handle);
} }
RoomCommand::SendMessage {
player_id, async fn send_message(&self, player_id: PlayerId, connection_id: ConnectionId, body: String) {
connection_id,
body,
} => {
tracing::info!("Adding a message to room"); tracing::info!("Adding a message to room");
for (_, sub) in &mut self.subscriptions { for (_, sub) in &self.subscriptions {
log::info!("Sending a message from room to player"); log::info!("Sending a message from room to player");
sub.receive_message( sub.receive_message(
room_id.clone(), self.room_id.clone(),
player_id.clone(), player_id.clone(),
connection_id.clone(), connection_id.clone(),
body.clone(), body.clone(),
@ -139,10 +105,3 @@ impl Room {
} }
} }
} }
}
tracing::info!("Stopping room fiber");
self
});
(RoomHandle { tx }, fiber)
}
}