From c983b59ddcbea2494b19a93a5cb8bcebefcb4572 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Mon, 22 Apr 2024 21:27:14 +0200 Subject: [PATCH] fix --- crates/lavina-core/src/dialog.rs | 82 ++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/crates/lavina-core/src/dialog.rs b/crates/lavina-core/src/dialog.rs index eae3915..f597295 100644 --- a/crates/lavina-core/src/dialog.rs +++ b/crates/lavina-core/src/dialog.rs @@ -60,51 +60,59 @@ impl DialogRegistry { body: Str, created_at: &DateTime, ) -> Result<()> { - let guard = self.0.read().await; + let mut guard = self.0.read().await; let id = DialogId::new(from.clone(), to.clone()); let dialog = guard.dialogs.get(&id); if let Some(d) = dialog { let mut d = d.write().await; guard.storage.increment_dialog_message_count(d.storage_id).await?; d.message_count += 1; - let Some(players) = &guard.players else { - tracing::error!("No player registry present"); - return Ok(()); - }; - let Some(player) = players.get_player(&to).await else { - tracing::debug!("Player {to:?} not active, not sending message"); - return Ok(()); - }; - let update = Updates::NewDialogMessage { - sender: from.clone(), - receiver: to.clone(), - body: body.clone(), - created_at: chrono::Utc::now(), // todo - }; - player.update(update).await; - return Ok(()); - } - drop(guard); - let mut guard = self.0.write().await; - let (p1, p2) = id.as_inner(); - tracing::info!("Dialog {id:?} not found locally, trying to load from storage"); - let stored_dialog = match guard.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? { - Some(t) => t, - None => { - tracing::info!("Dialog {id:?} does not exist, creating a new one in storage"); - guard.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await? + } else { + drop(guard); + let mut guard2 = self.0.write().await; + // double check in case concurrent access has loaded this dialog + if let Some(d) = guard2.dialogs.get(&id) { + let mut d = d.write().await; + guard2.storage.increment_dialog_message_count(d.storage_id).await?; + d.message_count += 1; + } else { + let (p1, p2) = id.as_inner(); + tracing::info!("Dialog {id:?} not found locally, trying to load from storage"); + let stored_dialog = match guard2.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? { + Some(t) => t, + None => { + tracing::info!("Dialog {id:?} does not exist, creating a new one in storage"); + guard2.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await? + } + }; + tracing::info!("Dialog {id:?} loaded"); + guard2.storage.increment_dialog_message_count(stored_dialog.id).await?; + let dialog = Dialog { + storage_id: stored_dialog.id, + player_storage_id_1: stored_dialog.participant_1, + player_storage_id_2: stored_dialog.participant_2, + message_count: stored_dialog.message_count + 1, + }; + guard2.dialogs.insert(id.clone(), AsyncRwLock::new(dialog)); } - }; - tracing::info!("Dialog {id:?} loaded"); - guard.storage.increment_dialog_message_count(stored_dialog.id).await?; - let dialog = Dialog { - storage_id: stored_dialog.id, - player_storage_id_1: stored_dialog.participant_1, - player_storage_id_2: stored_dialog.participant_2, - message_count: stored_dialog.message_count + 1, - }; - guard.dialogs.insert(id.clone(), AsyncRwLock::new(dialog)); + guard = guard2.downgrade(); + } // TODO send message to the other player and persist it + let Some(players) = &guard.players else { + tracing::error!("No player registry present"); + return Ok(()); + }; + let Some(player) = players.get_player(&to).await else { + tracing::debug!("Player {to:?} not active, not sending message"); + return Ok(()); + }; + let update = Updates::NewDialogMessage { + sender: from.clone(), + receiver: to.clone(), + body: body.clone(), + created_at: chrono::Utc::now(), // todo + }; + player.update(update).await; return Ok(()); } }