From 3950ee1d7a732d96e9be311cf08675bdf4fce1e8 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Tue, 14 Feb 2023 23:38:40 +0100 Subject: [PATCH] refactor player actor a bit --- src/core/player.rs | 216 +++++++++++++++++++++++---------------------- 1 file changed, 112 insertions(+), 104 deletions(-) diff --git a/src/core/player.rs b/src/core/player.rs index 8d8f10b..3abdead 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -208,14 +208,11 @@ impl PlayerRegistry { } pub async fn get_or_create_player(&mut self, id: PlayerId) -> PlayerHandle { - let player = Player { - connections: AnonTable::new(), - }; let mut inner = self.0.write().unwrap(); if let Some((handle, _)) = inner.players.get(&id) { handle.clone() } else { - let (handle, fiber) = player.launch(id.clone(), inner.room_registry.clone()); + let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone()); inner.players.insert(id, (handle.clone(), fiber)); inner.metric_active_players.inc(); handle @@ -237,112 +234,123 @@ struct PlayerRegistryInner { /// Player actor inner state representation. struct Player { + player_id: PlayerId, connections: AnonTable>, + my_rooms: HashMap, + rx: Receiver, + handle: PlayerHandle, + rooms: RoomRegistry, } impl Player { - fn launch( - mut self, - player_id: PlayerId, - mut rooms: RoomRegistry, - ) -> (PlayerHandle, JoinHandle) { + fn launch(player_id: PlayerId, rooms: RoomRegistry) -> (PlayerHandle, JoinHandle) { let (tx, mut rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); - let mut my_rooms: HashMap = HashMap::new(); - let fiber = tokio::task::spawn(async move { - while let Some(cmd) = rx.recv().await { - match cmd { - PlayerCommand::AddConnection { sender, promise } => { - let connection_id = self.connections.insert(sender); - promise.send(ConnectionId(connection_id)); - } - PlayerCommand::GetRooms(promise) => { - let mut response = vec![]; - for (_, handle) in &my_rooms { - response.push(handle.get_room_info().await); - } - promise.send(response); - } - PlayerCommand::Update(update) => { - log::info!( - "Player received an update, broadcasting to {} connections", - self.connections.len() - ); - for (_, connection) in &self.connections { - connection.send(update.clone()).await; - } - } - PlayerCommand::Cmd(cmd, connection_id) => match cmd { - Cmd::JoinRoom { room_id, promise } => { - let mut room = rooms.get_or_create_room(room_id.clone()); - room.subscribe(player_id.clone(), handle.clone()).await; - my_rooms.insert(room_id.clone(), room.clone()); - let members = room.get_members().await; - promise.send(RoomInfo { - id: room_id.clone(), - members, - topic: b"some topic lol".to_vec(), - }); - let update = Updates::RoomJoined { - room_id, - new_member_id: player_id.clone(), - }; - for (a, b) in &self.connections { - if ConnectionId(a) == connection_id { - continue; - } - b.send(update.clone()).await; - } - } - Cmd::SendMessage { - room_id, - body, - promise, - } => { - let room = rooms.get_room(&room_id); - if let Some(room) = room { - room.send_message(player_id.clone(), body.clone()).await; - } else { - tracing::info!("no room found"); - } - promise.send(()); - let update = Updates::NewMessage { - room_id, - author_id: player_id.clone(), - body, - }; - for (a, b) in &self.connections { - if ConnectionId(a) == connection_id { - continue; - } - b.send(update.clone()).await; - } - } - Cmd::ChangeTopic { - room_id, - new_topic, - promise, - } => { - let room = rooms.get_room(&room_id); - if let Some(mut room) = room { - room.set_topic(player_id.clone(), new_topic.clone()).await; - } else { - tracing::info!("no room found"); - } - promise.send(()); - let update = Updates::RoomTopicChanged { room_id, new_topic }; - for (a, b) in &self.connections { - if ConnectionId(a) == connection_id { - continue; - } - b.send(update.clone()).await; - } - } - }, - } - } - self - }); + let player = Player { + player_id, + connections: AnonTable::new(), + my_rooms: HashMap::new(), + rx, + handle, + rooms, + }; + let fiber = tokio::task::spawn(player.main_loop()); (handle_clone, fiber) } + + async fn main_loop(mut self) -> Self { + while let Some(cmd) = self.rx.recv().await { + match cmd { + PlayerCommand::AddConnection { sender, promise } => { + let connection_id = self.connections.insert(sender); + promise.send(ConnectionId(connection_id)); + } + PlayerCommand::GetRooms(promise) => { + let mut response = vec![]; + for (_, handle) in &self.my_rooms { + response.push(handle.get_room_info().await); + } + promise.send(response); + } + PlayerCommand::Update(update) => { + log::info!( + "Player received an update, broadcasting to {} connections", + self.connections.len() + ); + for (_, connection) in &self.connections { + connection.send(update.clone()).await; + } + } + PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, + } + } + self + } + + async fn handle_cmd(&mut self, cmd: Cmd, connection_id: ConnectionId) { + match cmd { + Cmd::JoinRoom { room_id, promise } => { + let mut room = self.rooms.get_or_create_room(room_id.clone()); + room.subscribe(self.player_id.clone(), self.handle.clone()) + .await; + self.my_rooms.insert(room_id.clone(), room.clone()); + let members = room.get_members().await; + promise.send(RoomInfo { + id: room_id.clone(), + members, + topic: b"some topic lol".to_vec(), + }); + let update = Updates::RoomJoined { + room_id, + new_member_id: self.player_id.clone(), + }; + self.broadcast_update(update, connection_id); + } + Cmd::SendMessage { + room_id, + body, + promise, + } => { + let room = self.rooms.get_room(&room_id); + if let Some(room) = room { + room.send_message(self.player_id.clone(), body.clone()) + .await; + } else { + tracing::info!("no room found"); + } + promise.send(()); + let update = Updates::NewMessage { + room_id, + author_id: self.player_id.clone(), + body, + }; + self.broadcast_update(update, connection_id); + } + Cmd::ChangeTopic { + room_id, + new_topic, + promise, + } => { + let room = self.rooms.get_room(&room_id); + if let Some(mut room) = room { + room.set_topic(self.player_id.clone(), new_topic.clone()) + .await; + } else { + tracing::info!("no room found"); + } + promise.send(()); + let update = Updates::RoomTopicChanged { room_id, new_topic }; + self.broadcast_update(update, connection_id); + } + } + } + + async fn broadcast_update(&self, update: Updates, except: ConnectionId) { + for (a, b) in &self.connections { + if ConnectionId(a) == except { + continue; + } + b.send(update.clone()).await; + } + } }