forked from lavina/lavina
1
0
Fork 0
This commit is contained in:
Nikita Vilunov 2024-04-22 21:27:14 +02:00
parent c3a709424e
commit c983b59ddc
1 changed files with 45 additions and 37 deletions

View File

@ -60,51 +60,59 @@ impl DialogRegistry {
body: Str, body: Str,
created_at: &DateTime<Utc>, created_at: &DateTime<Utc>,
) -> Result<()> { ) -> Result<()> {
let guard = self.0.read().await; let mut guard = self.0.read().await;
let id = DialogId::new(from.clone(), to.clone()); let id = DialogId::new(from.clone(), to.clone());
let dialog = guard.dialogs.get(&id); let dialog = guard.dialogs.get(&id);
if let Some(d) = dialog { if let Some(d) = dialog {
let mut d = d.write().await; let mut d = d.write().await;
guard.storage.increment_dialog_message_count(d.storage_id).await?; guard.storage.increment_dialog_message_count(d.storage_id).await?;
d.message_count += 1; d.message_count += 1;
let Some(players) = &guard.players else { } else {
tracing::error!("No player registry present"); drop(guard);
return Ok(()); let mut guard2 = self.0.write().await;
}; // double check in case concurrent access has loaded this dialog
let Some(player) = players.get_player(&to).await else { if let Some(d) = guard2.dialogs.get(&id) {
tracing::debug!("Player {to:?} not active, not sending message"); let mut d = d.write().await;
return Ok(()); guard2.storage.increment_dialog_message_count(d.storage_id).await?;
}; d.message_count += 1;
let update = Updates::NewDialogMessage { } else {
sender: from.clone(), let (p1, p2) = id.as_inner();
receiver: to.clone(), tracing::info!("Dialog {id:?} not found locally, trying to load from storage");
body: body.clone(), let stored_dialog = match guard2.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? {
created_at: chrono::Utc::now(), // todo Some(t) => t,
}; None => {
player.update(update).await; tracing::info!("Dialog {id:?} does not exist, creating a new one in storage");
return Ok(()); guard2.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await?
} }
drop(guard); };
let mut guard = self.0.write().await; tracing::info!("Dialog {id:?} loaded");
let (p1, p2) = id.as_inner(); guard2.storage.increment_dialog_message_count(stored_dialog.id).await?;
tracing::info!("Dialog {id:?} not found locally, trying to load from storage"); let dialog = Dialog {
let stored_dialog = match guard.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? { storage_id: stored_dialog.id,
Some(t) => t, player_storage_id_1: stored_dialog.participant_1,
None => { player_storage_id_2: stored_dialog.participant_2,
tracing::info!("Dialog {id:?} does not exist, creating a new one in storage"); message_count: stored_dialog.message_count + 1,
guard.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await? };
guard2.dialogs.insert(id.clone(), AsyncRwLock::new(dialog));
} }
}; guard = guard2.downgrade();
tracing::info!("Dialog {id:?} loaded"); }
guard.storage.increment_dialog_message_count(stored_dialog.id).await?;
let dialog = Dialog {
storage_id: stored_dialog.id,
player_storage_id_1: stored_dialog.participant_1,
player_storage_id_2: stored_dialog.participant_2,
message_count: stored_dialog.message_count + 1,
};
guard.dialogs.insert(id.clone(), AsyncRwLock::new(dialog));
// TODO send message to the other player and persist it // TODO send message to the other player and persist it
let Some(players) = &guard.players else {
tracing::error!("No player registry present");
return Ok(());
};
let Some(player) = players.get_player(&to).await else {
tracing::debug!("Player {to:?} not active, not sending message");
return Ok(());
};
let update = Updates::NewDialogMessage {
sender: from.clone(),
receiver: to.clone(),
body: body.clone(),
created_at: chrono::Utc::now(), // todo
};
player.update(update).await;
return Ok(()); return Ok(());
} }
} }