From 8832c579e485fbda75c6358edbecca85c3242df0 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Mon, 15 Apr 2024 02:20:22 +0200 Subject: [PATCH] clean up some stuff --- crates/lavina-core/src/player.rs | 16 ++--- crates/lavina-core/src/repo/room.rs | 4 +- crates/lavina-core/src/room.rs | 94 +++++++++++++---------------- crates/projection-irc/src/lib.rs | 2 +- crates/projection-xmpp/src/lib.rs | 2 +- 5 files changed, 53 insertions(+), 65 deletions(-) diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 426cc00..0486808 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -224,19 +224,19 @@ impl PlayerRegistry { Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) } - pub async fn get_or_launch_player(&mut self, id: PlayerId) -> PlayerHandle { + pub async fn get_or_launch_player(&mut self, id: &PlayerId) -> PlayerHandle { let mut inner = self.0.write().await; - if let Some((handle, _)) = inner.players.get(&id) { + if let Some((handle, _)) = inner.players.get(id) { handle.clone() } else { let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.storage.clone()).await; - inner.players.insert(id, (handle.clone(), fiber)); + inner.players.insert(id.clone(), (handle.clone(), fiber)); inner.metric_active_players.inc(); handle } } - pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection { + pub async fn connect_to_player(&mut self, id: &PlayerId) -> PlayerConnection { let player_handle = self.get_or_launch_player(id).await; player_handle.subscribe().await } @@ -396,8 +396,8 @@ impl Player { todo!(); } }; - room.add_member(self.player_id.clone(), self.storage_id).await; - room.subscribe(self.player_id.clone(), self.handle.clone()).await; + room.add_member(&self.player_id, self.storage_id).await; + room.subscribe(&self.player_id, self.handle.clone()).await; self.my_rooms.insert(room_id.clone(), room.clone()); let room_info = room.get_room_info().await; let update = Updates::RoomJoined { @@ -426,7 +426,7 @@ impl Player { tracing::info!("no room found"); return; }; - room.send_message(self.player_id.clone(), body.clone()).await; + room.send_message(&self.player_id, body.clone()).await; let update = Updates::NewMessage { room_id, author_id: self.player_id.clone(), @@ -440,7 +440,7 @@ impl Player { tracing::info!("no room found"); return; }; - room.set_topic(self.player_id.clone(), new_topic.clone()).await; + room.set_topic(&self.player_id, new_topic.clone()).await; let update = Updates::RoomTopicChanged { room_id, new_topic }; self.broadcast_update(update, connection_id).await; } diff --git a/crates/lavina-core/src/repo/room.rs b/crates/lavina-core/src/repo/room.rs index 079deb6..b568b9d 100644 --- a/crates/lavina-core/src/repo/room.rs +++ b/crates/lavina-core/src/repo/room.rs @@ -3,7 +3,7 @@ use anyhow::Result; use crate::repo::Storage; impl Storage { - pub async fn add_room_member(&self, room_id: &u32, player_id: &u32) -> Result<()> { + pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { let mut executor = self.conn.lock().await; sqlx::query( "insert into memberships(user_id, room_id, status) @@ -17,7 +17,7 @@ impl Storage { Ok(()) } - pub async fn remove_room_member(&self, room_id: &u32, player_id: &u32) -> Result<()> { + pub async fn remove_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { let mut executor = self.conn.lock().await; sqlx::query( "delete from memberships diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index e7e8776..dbd14fd 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -49,28 +49,9 @@ impl RoomRegistry { pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result { let mut inner = self.0.write().await; - if let Some(room_handle) = inner.rooms.get(&room_id) { - // room was already loaded into memory - log::debug!("Room {} was loaded already", &room_id.0); + if let Some(room_handle) = inner.get_or_load_room(&room_id).await? { Ok(room_handle.clone()) - } else if let Some(stored_room) = inner.storage.retrieve_room_by_name(&*room_id.0).await? { - // room exists, but was not loaded - log::debug!("Loading room {}...", &room_id.0); - let room = Room { - storage_id: stored_room.id, - room_id: room_id.clone(), - subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions - members: HashSet::new(), // TODO load members from storage - topic: stored_room.topic.into(), - message_count: stored_room.message_count, - storage: inner.storage.clone(), - }; - let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); - inner.rooms.insert(room_id, room_handle.clone()); - inner.metric_active_rooms.inc(); - Ok(room_handle) } else { - // room does not exist, create it and load log::debug!("Creating room {}...", &room_id.0); let topic = "New room"; let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?; @@ -92,27 +73,7 @@ impl RoomRegistry { pub async fn get_room(&self, room_id: &RoomId) -> Option { let mut inner = self.0.write().await; - if let Some(room_handle) = inner.rooms.get(room_id) { - Some(room_handle.clone()) - } else if let Some(stored_room) = inner.storage.retrieve_room_by_name(&*room_id.0).await.unwrap() { - // room exists, but was not loaded - log::debug!("Loading room {}...", &room_id.0); - let room = Room { - storage_id: stored_room.id, - room_id: room_id.clone(), - subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions - members: HashSet::new(), // TODO load members from storage - topic: stored_room.topic.into(), - message_count: stored_room.message_count, - storage: inner.storage.clone(), - }; - let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); - inner.rooms.insert(room_id.clone(), room_handle.clone()); - inner.metric_active_rooms.inc(); - Some(room_handle) - } else { - None - } + inner.get_or_load_room(room_id).await.unwrap() } pub async fn get_all_rooms(&self) -> Vec { @@ -135,26 +96,53 @@ struct RoomRegistryInner { storage: Storage, } +impl RoomRegistryInner { + async fn get_or_load_room(&mut self, room_id: &RoomId) -> Result> { + if let Some(room_handle) = self.rooms.get(room_id) { + log::debug!("Room {} was loaded already", &room_id.0); + Ok(Some(room_handle.clone())) + } else if let Some(stored_room) = self.storage.retrieve_room_by_name(&*room_id.0).await? { + log::debug!("Loading room {}...", &room_id.0); + let room = Room { + storage_id: stored_room.id, + room_id: room_id.clone(), + subscriptions: HashMap::new(), + members: HashSet::new(), // TODO load members from storage + topic: stored_room.topic.into(), + message_count: stored_room.message_count, + storage: self.storage.clone(), + }; + let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); + self.rooms.insert(room_id.clone(), room_handle.clone()); + self.metric_active_rooms.inc(); + Ok(Some(room_handle)) + } else { + tracing::debug!("Room {} does not exist", &room_id.0); + Ok(None) + } + } +} + #[derive(Clone)] pub struct RoomHandle(Arc>); impl RoomHandle { - pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) { + pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) { let mut lock = self.0.write().await; tracing::info!("Adding a subscriber to a room"); lock.subscriptions.insert(player_id.clone(), player_handle); } - pub async fn add_member(&self, player_id: PlayerId, player_storage_id: u32) { + pub async fn add_member(&self, player_id: &PlayerId, player_storage_id: u32) { let mut lock = self.0.write().await; tracing::info!("Adding a new member to a room"); - let storage_id = lock.storage_id; + let room_storage_id = lock.storage_id; + lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap(); lock.members.insert(player_id.clone()); - lock.storage.add_room_member(&storage_id, &player_storage_id).await.unwrap(); let update = Updates::RoomJoined { room_id: lock.room_id.clone(), new_member_id: player_id.clone(), }; - lock.broadcast_update(update, &player_id).await; + lock.broadcast_update(update, player_id).await; } pub async fn unsubscribe(&self, player_id: &PlayerId) { @@ -165,9 +153,9 @@ impl RoomHandle { pub async fn remove_member(&self, player_id: &PlayerId, player_storage_id: u32) { let mut lock = self.0.write().await; tracing::info!("Removing a member from a room"); - let storage_id = lock.storage_id; + let room_storage_id = lock.storage_id; + lock.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap(); lock.members.remove(player_id); - lock.storage.remove_room_member(&storage_id, &player_storage_id).await.unwrap(); let update = Updates::RoomLeft { room_id: lock.room_id.clone(), former_member_id: player_id.clone(), @@ -175,7 +163,7 @@ impl RoomHandle { lock.broadcast_update(update, player_id).await; } - pub async fn send_message(&self, player_id: PlayerId, body: Str) { + pub async fn send_message(&self, player_id: &PlayerId, body: Str) { let mut lock = self.0.write().await; let res = lock.send_message(player_id, body).await; if let Err(err) = res { @@ -192,14 +180,14 @@ impl RoomHandle { } } - pub async fn set_topic(&self, changer_id: PlayerId, new_topic: Str) { + pub async fn set_topic(&self, changer_id: &PlayerId, new_topic: Str) { let mut lock = self.0.write().await; lock.topic = new_topic.clone(); let update = Updates::RoomTopicChanged { room_id: lock.room_id.clone(), new_topic: new_topic.clone(), }; - lock.broadcast_update(update, &changer_id).await; + lock.broadcast_update(update, changer_id).await; } } @@ -218,7 +206,7 @@ struct Room { storage: Storage, } impl Room { - async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> { + async fn send_message(&mut self, author_id: &PlayerId, body: Str) -> Result<()> { tracing::info!("Adding a message to room"); self.storage.insert_message(self.storage_id, self.message_count, &body, &*author_id.as_inner()).await?; self.message_count += 1; @@ -227,7 +215,7 @@ impl Room { author_id: author_id.clone(), body, }; - self.broadcast_update(update, &author_id).await; + self.broadcast_update(update, author_id).await; Ok(()) } diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index ee47f9b..e52e92a 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -392,7 +392,7 @@ async fn handle_registered_socket<'a>( log::info!("Handling registered user: {user:?}"); let player_id = PlayerId::from(user.nickname.clone())?; - let mut connection = players.connect_to_player(player_id.clone()).await; + let mut connection = players.connect_to_player(&player_id).await; let text: Str = format!("Welcome to {} Server", &config.server_name).into(); ServerMessage { diff --git a/crates/projection-xmpp/src/lib.rs b/crates/projection-xmpp/src/lib.rs index c659e5b..e17218d 100644 --- a/crates/projection-xmpp/src/lib.rs +++ b/crates/projection-xmpp/src/lib.rs @@ -205,7 +205,7 @@ async fn handle_socket( authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage) => { match authenticated { Ok(authenticated) => { - let mut connection = players.connect_to_player(authenticated.player_id.clone()).await; + let mut connection = players.connect_to_player(&authenticated.player_id).await; socket_final( &mut xml_reader, &mut xml_writer,