forked from lavina/lavina
181 lines
5.9 KiB
Rust
181 lines
5.9 KiB
Rust
//! Domain of dialogs – conversations between two participants.
|
||
//!
|
||
//! Dialogs are different from rooms in that they are always between two participants.
|
||
//! There are no admins or other roles in dialogs, both participants have equal rights.
|
||
|
||
use std::collections::HashMap;
|
||
use std::sync::Arc;
|
||
|
||
use chrono::{DateTime, Utc};
|
||
use tokio::sync::RwLock as AsyncRwLock;
|
||
|
||
use crate::player::{PlayerId, PlayerRegistry, Updates};
|
||
use crate::prelude::*;
|
||
use crate::repo::Storage;
|
||
|
||
/// Id of a conversation between two players.
|
||
///
|
||
/// Dialogs are identified by the pair of participants' ids. The order of ids does not matter.
|
||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||
pub struct DialogId(PlayerId, PlayerId);
|
||
impl DialogId {
|
||
pub fn new(a: PlayerId, b: PlayerId) -> DialogId {
|
||
if a.as_inner() < b.as_inner() {
|
||
DialogId(a, b)
|
||
} else {
|
||
DialogId(b, a)
|
||
}
|
||
}
|
||
|
||
pub fn as_inner(&self) -> (&PlayerId, &PlayerId) {
|
||
(&self.0, &self.1)
|
||
}
|
||
|
||
pub fn into_inner(self) -> (PlayerId, PlayerId) {
|
||
(self.0, self.1)
|
||
}
|
||
}
|
||
|
||
struct Dialog {
|
||
storage_id: u32,
|
||
player_storage_id_1: u32,
|
||
player_storage_id_2: u32,
|
||
message_count: u32,
|
||
}
|
||
|
||
struct DialogRegistryInner {
|
||
dialogs: HashMap<DialogId, AsyncRwLock<Dialog>>,
|
||
players: Option<PlayerRegistry>,
|
||
storage: Storage,
|
||
}
|
||
|
||
#[derive(Clone)]
|
||
pub struct DialogRegistry(Arc<AsyncRwLock<DialogRegistryInner>>);
|
||
|
||
impl DialogRegistry {
|
||
pub async fn send_message(
|
||
&self,
|
||
from: PlayerId,
|
||
to: PlayerId,
|
||
body: Str,
|
||
created_at: &DateTime<Utc>,
|
||
) -> Result<()> {
|
||
let mut guard = self.0.read().await;
|
||
let id = DialogId::new(from.clone(), to.clone());
|
||
let dialog = guard.dialogs.get(&id);
|
||
if let Some(d) = dialog {
|
||
let mut d = d.write().await;
|
||
guard
|
||
.storage
|
||
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
|
||
.await?;
|
||
d.message_count += 1;
|
||
} else {
|
||
drop(guard);
|
||
let mut guard2 = self.0.write().await;
|
||
// double check in case concurrent access has loaded this dialog
|
||
if let Some(d) = guard2.dialogs.get(&id) {
|
||
let mut d = d.write().await;
|
||
guard2
|
||
.storage
|
||
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
|
||
.await?;
|
||
d.message_count += 1;
|
||
} else {
|
||
let (p1, p2) = id.as_inner();
|
||
tracing::info!("Dialog {id:?} not found locally, trying to load from storage");
|
||
let stored_dialog = match guard2.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? {
|
||
Some(t) => t,
|
||
None => {
|
||
tracing::info!("Dialog {id:?} does not exist, creating a new one in storage");
|
||
guard2.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await?
|
||
}
|
||
};
|
||
tracing::info!("Dialog {id:?} loaded");
|
||
guard2
|
||
.storage
|
||
.insert_dialog_message(
|
||
stored_dialog.id,
|
||
stored_dialog.message_count,
|
||
from.as_inner(),
|
||
&body,
|
||
created_at,
|
||
)
|
||
.await?;
|
||
let dialog = Dialog {
|
||
storage_id: stored_dialog.id,
|
||
player_storage_id_1: stored_dialog.participant_1,
|
||
player_storage_id_2: stored_dialog.participant_2,
|
||
message_count: stored_dialog.message_count + 1,
|
||
};
|
||
guard2.dialogs.insert(id.clone(), AsyncRwLock::new(dialog));
|
||
}
|
||
guard = guard2.downgrade();
|
||
}
|
||
// TODO send message to the other player and persist it
|
||
let Some(players) = &guard.players else {
|
||
tracing::error!("No player registry present");
|
||
return Ok(());
|
||
};
|
||
let Some(player) = players.get_player(&to).await else {
|
||
tracing::debug!("Player {to:?} not active, not sending message");
|
||
return Ok(());
|
||
};
|
||
let update = Updates::NewDialogMessage {
|
||
sender: from.clone(),
|
||
receiver: to.clone(),
|
||
body: body.clone(),
|
||
created_at: created_at.clone(),
|
||
};
|
||
player.update(update).await;
|
||
return Ok(());
|
||
}
|
||
}
|
||
|
||
impl DialogRegistry {
|
||
pub fn new(storage: Storage) -> DialogRegistry {
|
||
DialogRegistry(Arc::new(AsyncRwLock::new(DialogRegistryInner {
|
||
dialogs: HashMap::new(),
|
||
players: None,
|
||
storage,
|
||
})))
|
||
}
|
||
|
||
pub async fn set_players(&self, players: PlayerRegistry) {
|
||
let mut guard = self.0.write().await;
|
||
guard.players = Some(players);
|
||
}
|
||
|
||
pub async fn unset_players(&self) {
|
||
let mut guard = self.0.write().await;
|
||
guard.players = None;
|
||
}
|
||
|
||
pub fn shutdown(self) -> Result<()> {
|
||
let res = match Arc::try_unwrap(self.0) {
|
||
Ok(e) => e,
|
||
Err(_) => return Err(fail("failed to acquire dialogs ownership on shutdown")),
|
||
};
|
||
let res = res.into_inner();
|
||
drop(res);
|
||
Ok(())
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
|
||
#[test]
|
||
fn test_dialog_id_new() {
|
||
let a = PlayerId::from("a").unwrap();
|
||
let b = PlayerId::from("b").unwrap();
|
||
let id1 = DialogId::new(a.clone(), b.clone());
|
||
let id2 = DialogId::new(a.clone(), b.clone());
|
||
// Dialog ids are invariant with respect to the order of participants
|
||
assert_eq!(id1, id2);
|
||
assert_eq!(id1.as_inner(), (&a, &b));
|
||
assert_eq!(id2.as_inner(), (&a, &b));
|
||
}
|
||
}
|