forked from lavina/lavina
1
0
Fork 0
This commit is contained in:
Nikita Vilunov 2024-05-05 15:56:01 +02:00
parent cc7f282d92
commit 05adfe4920
2 changed files with 89 additions and 37 deletions

View File

@ -39,6 +39,13 @@ pub struct SendMessageReq<'a> {
pub created_at: &'a str, pub created_at: &'a str,
} }
#[derive(Serialize, Deserialize, Debug)]
pub struct SetRoomTopicReq<'a> {
pub room_id: &'a str,
pub player_id: &'a str,
pub topic: &'a str,
}
impl LavinaClient { impl LavinaClient {
pub fn new(addresses: Addresses) -> Self { pub fn new(addresses: Addresses) -> Self {
let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::<DefaultSpanBackend>::new()).build(); let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::<DefaultSpanBackend>::new()).build();
@ -66,4 +73,22 @@ impl LavinaClient {
} }
} }
} }
pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> Result<()> {
tracing::info!("Setting the topic of a room on a remote node");
let Some(address) = self.addresses.get(node_id as usize) else {
tracing::error!("Failed");
return Err(anyhow!("Unknown node"));
};
match self.client.post(format!("http://{}/cluster/rooms/set_topic", address)).json(&req).send().await {
Ok(_) => {
tracing::info!("Room topic set");
Ok(())
}
Err(e) => {
tracing::error!("Failed to set room topic: {e:?}");
Err(e.into())
}
}
}
} }

View File

@ -17,7 +17,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
use crate::clustering::{ClusterMetadata, LavinaClient, SendMessageReq}; use crate::clustering::{ClusterMetadata, LavinaClient, SendMessageReq, SetRoomTopicReq};
use crate::dialog::DialogRegistry; use crate::dialog::DialogRegistry;
use crate::prelude::*; use crate::prelude::*;
use crate::repo::Storage; use crate::repo::Storage;
@ -60,7 +60,7 @@ pub struct PlayerConnection {
player_handle: PlayerHandle, player_handle: PlayerHandle,
} }
impl PlayerConnection { impl PlayerConnection {
/// Handled in [Player::send_message]. /// Handled in [Player::send_room_message].
#[tracing::instrument(skip(self, body), name = "PlayerConnection::send_message")] #[tracing::instrument(skip(self, body), name = "PlayerConnection::send_message")]
pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<SendMessageResult> { pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<SendMessageResult> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
@ -78,7 +78,7 @@ impl PlayerConnection {
Ok(deferred.await?) Ok(deferred.await?)
} }
/// Handled in [Player::change_topic]. /// Handled in [Player::change_room_topic].
#[tracing::instrument(skip(self, new_topic), name = "PlayerConnection::change_topic")] #[tracing::instrument(skip(self, new_topic), name = "PlayerConnection::change_topic")]
pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
@ -413,23 +413,31 @@ impl Player {
(handle_clone, fiber) (handle_clone, fiber)
} }
fn room_location(&self, room_id: &RoomId) -> Option<u32> {
let node = match &**room_id.as_inner() {
"aaaaa" => self.cluster_metadata.test_owner,
"test" => self.cluster_metadata.test2_owner,
_ => self.cluster_metadata.main_owner,
};
if node == self.cluster_metadata.node_id {
None
} else {
Some(node)
}
}
async fn main_loop(mut self) -> Self { async fn main_loop(mut self) -> Self {
let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap(); let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
for room_id in rooms { for room_id in rooms {
let node = match &**room_id.as_inner() { if let Some(remote_node) = self.room_location(&room_id) {
"aaaaa" => self.cluster_metadata.test_owner, self.my_rooms.insert(room_id, RoomRef::Remote { node_id: remote_node });
"test" => self.cluster_metadata.test2_owner, } else {
_ => self.cluster_metadata.main_owner,
};
if node == self.cluster_metadata.node_id {
let room = self.rooms.get_room(&room_id).await; let room = self.rooms.get_room(&room_id).await;
if let Some(room) = room { if let Some(room) = room {
self.my_rooms.insert(room_id, RoomRef::Local(room)); self.my_rooms.insert(room_id, RoomRef::Local(room));
} else { } else {
tracing::error!("Room #{room_id:?} not found"); tracing::error!("Room #{room_id:?} not found");
} }
} else {
self.my_rooms.insert(room_id, RoomRef::Remote { node_id: node });
} }
} }
while let Some(cmd) = self.rx.recv().await { while let Some(cmd) = self.rx.recv().await {
@ -506,7 +514,7 @@ impl Player {
let _ = promise.send(()); let _ = promise.send(());
} }
ClientCommand::SendMessage { room_id, body, promise } => { ClientCommand::SendMessage { room_id, body, promise } => {
let result = self.send_message(connection_id, room_id, body).await; let result = self.send_room_message(connection_id, room_id, body).await;
let _ = promise.send(result); let _ = promise.send(result);
} }
ClientCommand::ChangeTopic { ClientCommand::ChangeTopic {
@ -514,7 +522,7 @@ impl Player {
new_topic, new_topic,
promise, promise,
} => { } => {
self.change_topic(connection_id, room_id, new_topic).await; self.change_room_topic(connection_id, room_id, new_topic).await;
let _ = promise.send(()); let _ = promise.send(());
} }
ClientCommand::GetRooms { promise } => { ClientCommand::GetRooms { promise } => {
@ -541,24 +549,27 @@ impl Player {
return JoinResult::AlreadyJoined; return JoinResult::AlreadyJoined;
} }
let room = match self.rooms.get_or_create_room(room_id.clone()).await { if let Some(remote_node) = self.room_location(&room_id) {
Ok(room) => room, todo!()
Err(e) => { } else {
log::error!("Failed to get or create room: {e}"); let room = match self.rooms.get_or_create_room(room_id.clone()).await {
todo!(); Ok(room) => room,
} Err(e) => {
}; log::error!("Failed to get or create room: {e}");
room.add_member(&self.player_id, self.storage_id).await; todo!();
room.subscribe(&self.player_id, self.handle.clone()).await; }
// self.my_rooms.insert(room_id.clone(), room.clone()); };
panic!(); room.add_member(&self.player_id, self.storage_id).await;
let room_info = room.get_room_info().await; room.subscribe(&self.player_id, self.handle.clone()).await;
let update = Updates::RoomJoined { self.my_rooms.insert(room_id.clone(), RoomRef::Local(room.clone()));
room_id, let room_info = room.get_room_info().await;
new_member_id: self.player_id.clone(), let update = Updates::RoomJoined {
}; room_id,
self.broadcast_update(update, connection_id).await; new_member_id: self.player_id.clone(),
JoinResult::Success(room_info) };
self.broadcast_update(update, connection_id).await;
JoinResult::Success(room_info)
}
} }
#[tracing::instrument(skip(self), name = "Player::leave_room")] #[tracing::instrument(skip(self), name = "Player::leave_room")]
@ -576,8 +587,13 @@ impl Player {
self.broadcast_update(update, connection_id).await; self.broadcast_update(update, connection_id).await;
} }
#[tracing::instrument(skip(self, body), name = "Player::send_message")] #[tracing::instrument(skip(self, body), name = "Player::send_room_message")]
async fn send_message(&mut self, connection_id: ConnectionId, room_id: RoomId, body: Str) -> SendMessageResult { async fn send_room_message(
&mut self,
connection_id: ConnectionId,
room_id: RoomId,
body: Str,
) -> SendMessageResult {
let Some(room) = self.my_rooms.get(&room_id) else { let Some(room) = self.my_rooms.get(&room_id) else {
tracing::info!("no room found"); tracing::info!("no room found");
return SendMessageResult::NoSuchRoom; return SendMessageResult::NoSuchRoom;
@ -607,14 +623,25 @@ impl Player {
SendMessageResult::Success(created_at) SendMessageResult::Success(created_at)
} }
#[tracing::instrument(skip(self, new_topic), name = "Player::change_topic")] #[tracing::instrument(skip(self, new_topic), name = "Player::change_room_topic")]
async fn change_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { async fn change_room_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) {
let Some(room) = self.my_rooms.get(&room_id) else { let Some(room) = self.my_rooms.get(&room_id) else {
tracing::info!("no room found"); tracing::info!("no room found");
return; return;
}; };
// room.set_topic(&self.player_id, new_topic.clone()).await; match room {
todo!(); RoomRef::Local(room) => {
room.set_topic(&self.player_id, new_topic.clone()).await;
}
RoomRef::Remote { node_id } => {
let req = SetRoomTopicReq {
room_id: room_id.as_inner(),
player_id: self.player_id.as_inner(),
topic: &*new_topic,
};
self.cluster_client.set_room_topic(*node_id, req).await.unwrap();
}
}
let update = Updates::RoomTopicChanged { room_id, new_topic }; let update = Updates::RoomTopicChanged { room_id, new_topic };
self.broadcast_update(update, connection_id).await; self.broadcast_update(update, connection_id).await;
} }