lavina/crates/lavina-core/src/clustering/broadcast.rs

54 lines
1.7 KiB
Rust

use std::collections::{HashMap, HashSet};
use chrono::{DateTime, Utc};
use tokio::sync::Mutex;
use crate::player::{PlayerId, Updates};
use crate::prelude::Str;
use crate::room::RoomId;
use crate::Services;
/// Receives updates from other nodes and broadcasts them to local player actors.
struct BroadcastingInner {
// Per-room set of subscribed players: `Services::subscribe` inserts into it and
// `Services::broadcast` looks it up by room id to find the recipients of an update.
subscriptions: HashMap<RoomId, HashSet<PlayerId>>,
}
/// Shared handle to the broadcasting state: a `BroadcastingInner` guarded by an async
/// (`tokio::sync`) `Mutex`, so it can be locked from `async` code through a shared reference.
pub struct Broadcasting(Mutex<BroadcastingInner>);
impl Broadcasting {
    /// Creates a broadcasting state that has no room subscriptions yet.
    pub fn new() -> Self {
        Self(Mutex::new(BroadcastingInner {
            subscriptions: HashMap::new(),
        }))
    }
}
impl Services {
    /// Broadcasts a new-message update to the player actors subscribed to `room_id` on the
    /// local node.
    ///
    /// The message author is never notified. If the room has no subscription entry, or nobody
    /// but the author is subscribed, the call returns without sending anything. Subscribers for
    /// which `get_player` yields `None` are skipped.
    ///
    /// `message` and `created_at` are excluded from the tracing span; `room_id` and `author_id`
    /// are recorded.
    #[tracing::instrument(skip(self, message, created_at))]
    pub async fn broadcast(&self, room_id: RoomId, author_id: PlayerId, message: Str, created_at: DateTime<Utc>) {
        // Snapshot the recipients inside a narrow scope so the subscriptions lock is released
        // before awaiting on any player actor. Holding the guard across those awaits would
        // stall every concurrent `subscribe`/`broadcast` call behind the slowest delivery.
        let recipients: Vec<PlayerId> = {
            let inner = self.broadcasting.0.lock().await;
            let Some(subscribers) = inner.subscriptions.get(&room_id) else {
                return;
            };
            subscribers
                .iter()
                .filter(|id| **id != author_id)
                .cloned()
                .collect()
        };
        if recipients.is_empty() {
            return;
        }

        // The arguments are owned and not needed afterwards, so move them into the update
        // instead of cloning them.
        let update = Updates::NewMessage {
            room_id,
            author_id,
            body: message,
            created_at,
        };
        for recipient in &recipients {
            let Some(player) = self.players.get_player(recipient).await else {
                continue;
            };
            player.update(update.clone()).await;
        }
    }

    /// Registers `subscriber` as a recipient of updates for `room_id`.
    ///
    /// Creates the room's subscriber set on first use; subscribing the same player to the same
    /// room again has no further effect because subscribers are kept in a set.
    pub async fn subscribe(&self, subscriber: PlayerId, room_id: RoomId) {
        let mut inner = self.broadcasting.0.lock().await;
        inner
            .subscriptions
            .entry(room_id)
            .or_default()
            .insert(subscriber);
    }
}