forked from lavina/lavina
1
0
Fork 0

move methods to services

This commit is contained in:
Nikita Vilunov 2024-05-13 00:28:24 +02:00
parent 825d41fa3a
commit 51c6e51d9c
4 changed files with 28 additions and 45 deletions

View File

@ -3,9 +3,10 @@ use std::collections::{HashMap, HashSet};
use chrono::{DateTime, Utc};
use tokio::sync::Mutex;
use crate::player::{PlayerId, PlayerRegistry, Updates};
use crate::player::{PlayerId, Updates};
use crate::prelude::Str;
use crate::room::RoomId;
use crate::Services;
/// Receives updates from other nodes and broadcasts them to local player actors.
struct BroadcastingInner {
@ -20,18 +21,12 @@ impl Broadcasting {
};
Self(Mutex::new(inner))
}
}
impl Services {
/// Broadcasts the given update to subscribed player actors on local node.
#[tracing::instrument(skip(self, players, message, created_at), name = "Broadcasting::broadcast")]
pub async fn broadcast(
&self,
players: &PlayerRegistry,
room_id: RoomId,
author_id: PlayerId,
message: Str,
created_at: DateTime<Utc>,
) {
let inner = self.0.lock().await;
#[tracing::instrument(skip(self, message, created_at))]
pub async fn broadcast(&self, room_id: RoomId, author_id: PlayerId, message: Str, created_at: DateTime<Utc>) {
let inner = self.broadcasting.0.lock().await;
let Some(subscribers) = inner.subscriptions.get(&room_id) else {
return;
};
@ -45,7 +40,7 @@ impl Broadcasting {
if i == &author_id {
continue;
}
let Some(player) = players.get_player(i).await else {
let Some(player) = self.players.get_player(i).await else {
continue;
};
player.update(update.clone()).await;
@ -53,6 +48,6 @@ impl Broadcasting {
}
pub async fn subscribe(&self, subscriber: PlayerId, room_id: RoomId) {
self.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber);
self.broadcasting.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber);
}
}

View File

@ -43,22 +43,22 @@ pub struct SetRoomTopicReq<'a> {
impl LavinaClient {
#[tracing::instrument(skip(self, req), name = "LavinaClient::join_room")]
pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> anyhow::Result<()> {
pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> Result<()> {
self.send_request(node_id, paths::JOIN, req).await
}
#[tracing::instrument(skip(self, req), name = "LavinaClient::leave_room")]
pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> anyhow::Result<()> {
pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> Result<()> {
self.send_request(node_id, paths::LEAVE, req).await
}
#[tracing::instrument(skip(self, req), name = "LavinaClient::send_room_message")]
pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> anyhow::Result<()> {
pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> Result<()> {
self.send_request(node_id, paths::ADD_MESSAGE, req).await
}
#[tracing::instrument(skip(self, req), name = "LavinaClient::set_room_topic")]
pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> anyhow::Result<()> {
pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> Result<()> {
self.send_request(node_id, paths::SET_TOPIC, req).await
}
}

View File

@ -48,49 +48,46 @@ struct DialogRegistryInner {
pub(crate) struct DialogRegistry(AsyncRwLock<DialogRegistryInner>);
impl DialogRegistry {
pub async fn send_message(
impl Services {
#[tracing::instrument(skip(self, body, created_at))]
pub async fn send_dialog_message(
&self,
services: &Services,
from: PlayerId,
to: PlayerId,
body: Str,
created_at: &DateTime<Utc>,
) -> Result<()> {
let guard = self.0.read().await;
let guard = self.dialogs.0.read().await;
let id = DialogId::new(from.clone(), to.clone());
let dialog = guard.dialogs.get(&id);
if let Some(d) = dialog {
let mut d = d.write().await;
services
.storage
self.storage
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
.await?;
d.message_count += 1;
} else {
drop(guard);
let mut guard2 = self.0.write().await;
let mut guard2 = self.dialogs.0.write().await;
// double check in case concurrent access has loaded this dialog
if let Some(d) = guard2.dialogs.get(&id) {
let mut d = d.write().await;
services
.storage
self.storage
.insert_dialog_message(d.storage_id, d.message_count, from.as_inner(), &body, created_at)
.await?;
d.message_count += 1;
} else {
let (p1, p2) = id.as_inner();
tracing::info!("Dialog {id:?} not found locally, trying to load from storage");
let stored_dialog = match services.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? {
let stored_dialog = match self.storage.retrieve_dialog(p1.as_inner(), p2.as_inner()).await? {
Some(t) => t,
None => {
tracing::info!("Dialog {id:?} does not exist, creating a new one in storage");
services.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await?
self.storage.initialize_dialog(p1.as_inner(), p2.as_inner(), created_at).await?
}
};
tracing::info!("Dialog {id:?} loaded");
services
.storage
self.storage
.insert_dialog_message(
stored_dialog.id,
stored_dialog.message_count,
@ -110,7 +107,7 @@ impl DialogRegistry {
drop(guard2);
}
// TODO send message to the other player and persist it
let Some(player) = services.players.get_player(&to).await else {
let Some(player) = self.players.get_player(&to).await else {
tracing::debug!("Player {to:?} not active, not sending message");
return Ok(());
};

View File

@ -398,7 +398,7 @@ impl Player {
for room_id in rooms {
if let Some(remote_node) = self.room_location(&room_id) {
self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node });
self.services.broadcasting.subscribe(self.player_id.clone(), room_id).await;
self.services.subscribe(self.player_id.clone(), room_id).await;
} else {
let room = self.services.rooms.get_room(&self.services, &room_id).await;
if let Some(room) = room {
@ -596,7 +596,7 @@ impl Player {
tracing::info!("no room found");
return SendMessageResult::NoSuchRoom;
};
let created_at = chrono::Utc::now();
let created_at = Utc::now();
match room {
RoomRef::Local(room) => {
room.send_message(&self.services, &self.player_id, body.clone(), created_at.clone()).await;
@ -610,9 +610,7 @@ impl Player {
};
self.services.client.send_room_message(*node_id, req).await.unwrap();
self.services
.broadcasting
.broadcast(
&self.services.players,
room_id.clone(),
self.player_id.clone(),
body.clone(),
@ -677,16 +675,9 @@ impl Player {
#[tracing::instrument(skip(self, body), name = "Player::send_dialog_message")]
async fn send_dialog_message(&self, connection_id: ConnectionId, recipient: PlayerId, body: Str) {
let created_at = chrono::Utc::now();
let created_at = Utc::now();
self.services
.dialogs
.send_message(
&self.services,
self.player_id.clone(),
recipient.clone(),
body.clone(),
&created_at,
)
.send_dialog_message(self.player_id.clone(), recipient.clone(), body.clone(), &created_at)
.await
.unwrap();
let update = Updates::NewDialogMessage {