refactor player actor a bit

This commit is contained in:
Nikita Vilunov 2023-02-14 23:38:40 +01:00
parent 05f8c5e502
commit 3950ee1d7a
1 changed files with 112 additions and 104 deletions

View File

@ -208,14 +208,11 @@ impl PlayerRegistry {
}
pub async fn get_or_create_player(&mut self, id: PlayerId) -> PlayerHandle {
let player = Player {
connections: AnonTable::new(),
};
let mut inner = self.0.write().unwrap();
if let Some((handle, _)) = inner.players.get(&id) {
handle.clone()
} else {
let (handle, fiber) = player.launch(id.clone(), inner.room_registry.clone());
let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone());
inner.players.insert(id, (handle.clone(), fiber));
inner.metric_active_players.inc();
handle
@ -237,112 +234,123 @@ struct PlayerRegistryInner {
/// Player actor inner state representation.
struct Player {
player_id: PlayerId,
connections: AnonTable<Sender<Updates>>,
my_rooms: HashMap<RoomId, RoomHandle>,
rx: Receiver<PlayerCommand>,
handle: PlayerHandle,
rooms: RoomRegistry,
}
impl Player {
fn launch(
mut self,
player_id: PlayerId,
mut rooms: RoomRegistry,
) -> (PlayerHandle, JoinHandle<Player>) {
fn launch(player_id: PlayerId, rooms: RoomRegistry) -> (PlayerHandle, JoinHandle<Player>) {
let (tx, mut rx) = channel(32);
let handle = PlayerHandle { tx };
let handle_clone = handle.clone();
let mut my_rooms: HashMap<RoomId, RoomHandle> = HashMap::new();
let fiber = tokio::task::spawn(async move {
while let Some(cmd) = rx.recv().await {
match cmd {
PlayerCommand::AddConnection { sender, promise } => {
let connection_id = self.connections.insert(sender);
promise.send(ConnectionId(connection_id));
}
PlayerCommand::GetRooms(promise) => {
let mut response = vec![];
for (_, handle) in &my_rooms {
response.push(handle.get_room_info().await);
}
promise.send(response);
}
PlayerCommand::Update(update) => {
log::info!(
"Player received an update, broadcasting to {} connections",
self.connections.len()
);
for (_, connection) in &self.connections {
connection.send(update.clone()).await;
}
}
PlayerCommand::Cmd(cmd, connection_id) => match cmd {
Cmd::JoinRoom { room_id, promise } => {
let mut room = rooms.get_or_create_room(room_id.clone());
room.subscribe(player_id.clone(), handle.clone()).await;
my_rooms.insert(room_id.clone(), room.clone());
let members = room.get_members().await;
promise.send(RoomInfo {
id: room_id.clone(),
members,
topic: b"some topic lol".to_vec(),
});
let update = Updates::RoomJoined {
room_id,
new_member_id: player_id.clone(),
};
for (a, b) in &self.connections {
if ConnectionId(a) == connection_id {
continue;
}
b.send(update.clone()).await;
}
}
Cmd::SendMessage {
room_id,
body,
promise,
} => {
let room = rooms.get_room(&room_id);
if let Some(room) = room {
room.send_message(player_id.clone(), body.clone()).await;
} else {
tracing::info!("no room found");
}
promise.send(());
let update = Updates::NewMessage {
room_id,
author_id: player_id.clone(),
body,
};
for (a, b) in &self.connections {
if ConnectionId(a) == connection_id {
continue;
}
b.send(update.clone()).await;
}
}
Cmd::ChangeTopic {
room_id,
new_topic,
promise,
} => {
let room = rooms.get_room(&room_id);
if let Some(mut room) = room {
room.set_topic(player_id.clone(), new_topic.clone()).await;
} else {
tracing::info!("no room found");
}
promise.send(());
let update = Updates::RoomTopicChanged { room_id, new_topic };
for (a, b) in &self.connections {
if ConnectionId(a) == connection_id {
continue;
}
b.send(update.clone()).await;
}
}
},
}
}
self
});
let player = Player {
player_id,
connections: AnonTable::new(),
my_rooms: HashMap::new(),
rx,
handle,
rooms,
};
let fiber = tokio::task::spawn(player.main_loop());
(handle_clone, fiber)
}
async fn main_loop(mut self) -> Self {
while let Some(cmd) = self.rx.recv().await {
match cmd {
PlayerCommand::AddConnection { sender, promise } => {
let connection_id = self.connections.insert(sender);
promise.send(ConnectionId(connection_id));
}
PlayerCommand::GetRooms(promise) => {
let mut response = vec![];
for (_, handle) in &self.my_rooms {
response.push(handle.get_room_info().await);
}
promise.send(response);
}
PlayerCommand::Update(update) => {
log::info!(
"Player received an update, broadcasting to {} connections",
self.connections.len()
);
for (_, connection) in &self.connections {
connection.send(update.clone()).await;
}
}
PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await,
}
}
self
}
async fn handle_cmd(&mut self, cmd: Cmd, connection_id: ConnectionId) {
match cmd {
Cmd::JoinRoom { room_id, promise } => {
let mut room = self.rooms.get_or_create_room(room_id.clone());
room.subscribe(self.player_id.clone(), self.handle.clone())
.await;
self.my_rooms.insert(room_id.clone(), room.clone());
let members = room.get_members().await;
promise.send(RoomInfo {
id: room_id.clone(),
members,
topic: b"some topic lol".to_vec(),
});
let update = Updates::RoomJoined {
room_id,
new_member_id: self.player_id.clone(),
};
self.broadcast_update(update, connection_id);
}
Cmd::SendMessage {
room_id,
body,
promise,
} => {
let room = self.rooms.get_room(&room_id);
if let Some(room) = room {
room.send_message(self.player_id.clone(), body.clone())
.await;
} else {
tracing::info!("no room found");
}
promise.send(());
let update = Updates::NewMessage {
room_id,
author_id: self.player_id.clone(),
body,
};
self.broadcast_update(update, connection_id);
}
Cmd::ChangeTopic {
room_id,
new_topic,
promise,
} => {
let room = self.rooms.get_room(&room_id);
if let Some(mut room) = room {
room.set_topic(self.player_id.clone(), new_topic.clone())
.await;
} else {
tracing::info!("no room found");
}
promise.send(());
let update = Updates::RoomTopicChanged { room_id, new_topic };
self.broadcast_update(update, connection_id);
}
}
}
async fn broadcast_update(&self, update: Updates, except: ConnectionId) {
for (a, b) in &self.connections {
if ConnectionId(a) == except {
continue;
}
b.send(update.clone()).await;
}
}
}