forked from lavina/lavina
Compare commits
2 Commits
486bc17ec5
...
cb889193c7
Author | SHA1 | Date |
---|---|---|
Nikita Vilunov | cb889193c7 | |
Nikita Vilunov | 23a59bc303 |
|
@ -1,4 +1,4 @@
|
||||||
/target
|
/target
|
||||||
/db.sqlite
|
*.sqlite
|
||||||
.idea/
|
.idea/
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
|
|
@ -6,7 +6,8 @@ use serde::{Deserialize, Serialize};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
mod broadcast;
|
pub mod broadcast;
|
||||||
|
pub mod room;
|
||||||
|
|
||||||
type Addresses = Vec<SocketAddr>;
|
type Addresses = Vec<SocketAddr>;
|
||||||
|
|
||||||
|
@ -33,21 +34,6 @@ pub struct LavinaClient {
|
||||||
client: ClientWithMiddleware,
|
client: ClientWithMiddleware,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
|
||||||
pub struct SendMessageReq<'a> {
|
|
||||||
pub room_id: &'a str,
|
|
||||||
pub player_id: &'a str,
|
|
||||||
pub message: &'a str,
|
|
||||||
pub created_at: &'a str,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
|
||||||
pub struct SetRoomTopicReq<'a> {
|
|
||||||
pub room_id: &'a str,
|
|
||||||
pub player_id: &'a str,
|
|
||||||
pub topic: &'a str,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LavinaClient {
|
impl LavinaClient {
|
||||||
pub fn new(addresses: Addresses) -> Self {
|
pub fn new(addresses: Addresses) -> Self {
|
||||||
let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::<DefaultSpanBackend>::new()).build();
|
let client = ClientBuilder::new(Client::new()).with(TracingMiddleware::<DefaultSpanBackend>::new()).build();
|
||||||
|
@ -57,40 +43,13 @@ impl LavinaClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, req), name = "LavinaClient::send_room_message")]
|
async fn send_request(&self, node_id: u32, path: &str, req: impl Serialize) -> Result<()> {
|
||||||
pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> Result<()> {
|
|
||||||
tracing::info!("Sending a message to a room on a remote node");
|
|
||||||
let Some(address) = self.addresses.get(node_id as usize) else {
|
let Some(address) = self.addresses.get(node_id as usize) else {
|
||||||
tracing::error!("Failed");
|
|
||||||
return Err(anyhow!("Unknown node"));
|
return Err(anyhow!("Unknown node"));
|
||||||
};
|
};
|
||||||
match self.client.post(format!("http://{}/cluster/rooms/add_message", address)).json(&req).send().await {
|
match self.client.post(format!("http://{}{}", address, path)).json(&req).send().await {
|
||||||
Ok(_) => {
|
Ok(_) => Ok(()),
|
||||||
tracing::info!("Message sent");
|
Err(e) => Err(e.into()),
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("Failed to send message: {e:?}");
|
|
||||||
Err(e.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> Result<()> {
|
|
||||||
tracing::info!("Setting the topic of a room on a remote node");
|
|
||||||
let Some(address) = self.addresses.get(node_id as usize) else {
|
|
||||||
tracing::error!("Failed");
|
|
||||||
return Err(anyhow!("Unknown node"));
|
|
||||||
};
|
|
||||||
match self.client.post(format!("http://{}/cluster/rooms/set_topic", address)).json(&req).send().await {
|
|
||||||
Ok(_) => {
|
|
||||||
tracing::info!("Room topic set");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("Failed to set room topic: {e:?}");
|
|
||||||
Err(e.into())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,29 +1,61 @@
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use crate::player::PlayerId;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
use crate::player::{PlayerId, PlayerRegistry, Updates};
|
||||||
use crate::prelude::Str;
|
use crate::prelude::Str;
|
||||||
use crate::room::RoomId;
|
use crate::room::RoomId;
|
||||||
|
|
||||||
/// Receives updates from other nodes and broadcasts them to local player actors.
|
/// Receives updates from other nodes and broadcasts them to local player actors.
|
||||||
struct Broadcasting {
|
struct BroadcastingInner {
|
||||||
subscriptions: HashMap<RoomId, HashSet<PlayerId>>,
|
subscriptions: HashMap<RoomId, HashSet<PlayerId>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Broadcasting {}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Broadcasting(Arc<Mutex<BroadcastingInner>>);
|
||||||
impl Broadcasting {
|
impl Broadcasting {
|
||||||
/// Creates a new broadcasting instance.
|
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
let inner = BroadcastingInner {
|
||||||
subscriptions: HashMap::new(),
|
subscriptions: HashMap::new(),
|
||||||
|
};
|
||||||
|
Self(Arc::new(Mutex::new(inner)))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Broadcasts the given update to subscribed player actors on local node.
|
||||||
|
pub async fn broadcast(
|
||||||
|
&self,
|
||||||
|
players: &PlayerRegistry,
|
||||||
|
room_id: RoomId,
|
||||||
|
author_id: PlayerId,
|
||||||
|
message: Str,
|
||||||
|
created_at: DateTime<Utc>,
|
||||||
|
) {
|
||||||
|
let inner = self.0.lock().await;
|
||||||
|
let Some(subscribers) = inner.subscriptions.get(&room_id) else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let update = Updates::NewMessage {
|
||||||
|
room_id: room_id.clone(),
|
||||||
|
author_id: author_id.clone(),
|
||||||
|
body: message.clone(),
|
||||||
|
created_at: created_at.clone(),
|
||||||
|
};
|
||||||
|
for i in subscribers {
|
||||||
|
if i == &author_id {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let Some(player) = players.get_player(i).await else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
player.update(update.clone()).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Broadcasts the given update to player actors.
|
pub async fn subscribe(&self, subscriber: PlayerId, room_id: RoomId) {
|
||||||
pub fn broadcast(&self, room_id: RoomId, author_id: PlayerId, message: Str) {
|
self.0.lock().await.subscriptions.entry(room_id).or_insert_with(HashSet::new).insert(subscriber);
|
||||||
self.subscriptions.get(&room_id).map(|players| {
|
|
||||||
players.iter().for_each(|player_id| {
|
|
||||||
// Send the message to the player actor.
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::clustering::LavinaClient;
|
||||||
|
|
||||||
|
pub mod paths {
|
||||||
|
pub const JOIN: &'static str = "/cluster/rooms/join";
|
||||||
|
pub const LEAVE: &'static str = "/cluster/rooms/leave";
|
||||||
|
pub const ADD_MESSAGE: &'static str = "/cluster/rooms/add_message";
|
||||||
|
pub const SET_TOPIC: &'static str = "/cluster/rooms/set_topic";
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct JoinRoomReq<'a> {
|
||||||
|
pub room_id: &'a str,
|
||||||
|
pub player_id: &'a str,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct LeaveRoomReq<'a> {
|
||||||
|
pub room_id: &'a str,
|
||||||
|
pub player_id: &'a str,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct SendMessageReq<'a> {
|
||||||
|
pub room_id: &'a str,
|
||||||
|
pub player_id: &'a str,
|
||||||
|
pub message: &'a str,
|
||||||
|
pub created_at: &'a str,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct SetRoomTopicReq<'a> {
|
||||||
|
pub room_id: &'a str,
|
||||||
|
pub player_id: &'a str,
|
||||||
|
pub topic: &'a str,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LavinaClient {
|
||||||
|
#[tracing::instrument(skip(self, req), name = "LavinaClient::join_room")]
|
||||||
|
pub async fn join_room(&self, node_id: u32, req: JoinRoomReq<'_>) -> anyhow::Result<()> {
|
||||||
|
self.send_request(node_id, paths::JOIN, req).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, req), name = "LavinaClient::leave_room")]
|
||||||
|
pub async fn leave_room(&self, node_id: u32, req: LeaveRoomReq<'_>) -> anyhow::Result<()> {
|
||||||
|
self.send_request(node_id, paths::LEAVE, req).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, req), name = "LavinaClient::send_room_message")]
|
||||||
|
pub async fn send_room_message(&self, node_id: u32, req: SendMessageReq<'_>) -> anyhow::Result<()> {
|
||||||
|
self.send_request(node_id, paths::ADD_MESSAGE, req).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, req), name = "LavinaClient::set_room_topic")]
|
||||||
|
pub async fn set_room_topic(&self, node_id: u32, req: SetRoomTopicReq<'_>) -> anyhow::Result<()> {
|
||||||
|
self.send_request(node_id, paths::SET_TOPIC, req).await
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,8 +3,10 @@ use crate::clustering::{ClusterConfig, LavinaClient};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use prometheus::Registry as MetricsRegistry;
|
use prometheus::Registry as MetricsRegistry;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use crate::auth::Authenticator;
|
use crate::auth::Authenticator;
|
||||||
|
use crate::clustering::broadcast::Broadcasting;
|
||||||
use crate::dialog::DialogRegistry;
|
use crate::dialog::DialogRegistry;
|
||||||
use crate::player::PlayerRegistry;
|
use crate::player::PlayerRegistry;
|
||||||
use crate::repo::Storage;
|
use crate::repo::Storage;
|
||||||
|
@ -26,6 +28,7 @@ pub struct LavinaCore {
|
||||||
pub players: PlayerRegistry,
|
pub players: PlayerRegistry,
|
||||||
pub rooms: RoomRegistry,
|
pub rooms: RoomRegistry,
|
||||||
pub dialogs: DialogRegistry,
|
pub dialogs: DialogRegistry,
|
||||||
|
pub broadcasting: Broadcasting,
|
||||||
pub authenticator: Authenticator,
|
pub authenticator: Authenticator,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +39,7 @@ impl LavinaCore {
|
||||||
storage: Storage,
|
storage: Storage,
|
||||||
) -> Result<LavinaCore> {
|
) -> Result<LavinaCore> {
|
||||||
// TODO shutdown all services in reverse order on error
|
// TODO shutdown all services in reverse order on error
|
||||||
|
let broadcasting = Broadcasting::new();
|
||||||
let client = LavinaClient::new(cluster_config.addresses.clone());
|
let client = LavinaClient::new(cluster_config.addresses.clone());
|
||||||
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
|
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
|
||||||
let dialogs = DialogRegistry::new(storage.clone());
|
let dialogs = DialogRegistry::new(storage.clone());
|
||||||
|
@ -46,6 +50,7 @@ impl LavinaCore {
|
||||||
&mut metrics,
|
&mut metrics,
|
||||||
Arc::new(cluster_config.metadata),
|
Arc::new(cluster_config.metadata),
|
||||||
client,
|
client,
|
||||||
|
broadcasting.clone(),
|
||||||
)?;
|
)?;
|
||||||
dialogs.set_players(players.clone()).await;
|
dialogs.set_players(players.clone()).await;
|
||||||
let authenticator = Authenticator::new(storage.clone());
|
let authenticator = Authenticator::new(storage.clone());
|
||||||
|
@ -53,6 +58,7 @@ impl LavinaCore {
|
||||||
players,
|
players,
|
||||||
rooms,
|
rooms,
|
||||||
dialogs,
|
dialogs,
|
||||||
|
broadcasting,
|
||||||
authenticator,
|
authenticator,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,9 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tracing::{Instrument, Span};
|
use tracing::{Instrument, Span};
|
||||||
|
|
||||||
use crate::clustering::{ClusterMetadata, LavinaClient, SendMessageReq, SetRoomTopicReq};
|
use crate::clustering::broadcast::Broadcasting;
|
||||||
|
use crate::clustering::room::*;
|
||||||
|
use crate::clustering::{ClusterMetadata, LavinaClient};
|
||||||
use crate::dialog::DialogRegistry;
|
use crate::dialog::DialogRegistry;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::repo::Storage;
|
use crate::repo::Storage;
|
||||||
|
@ -275,6 +277,7 @@ impl PlayerRegistry {
|
||||||
metrics: &mut MetricsRegistry,
|
metrics: &mut MetricsRegistry,
|
||||||
cluster_metadata: Arc<ClusterMetadata>,
|
cluster_metadata: Arc<ClusterMetadata>,
|
||||||
cluster_client: LavinaClient,
|
cluster_client: LavinaClient,
|
||||||
|
broadcasting: Broadcasting,
|
||||||
) -> Result<PlayerRegistry> {
|
) -> Result<PlayerRegistry> {
|
||||||
let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?;
|
let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?;
|
||||||
metrics.register(Box::new(metric_active_players.clone()))?;
|
metrics.register(Box::new(metric_active_players.clone()))?;
|
||||||
|
@ -284,6 +287,7 @@ impl PlayerRegistry {
|
||||||
storage,
|
storage,
|
||||||
cluster_metadata,
|
cluster_metadata,
|
||||||
cluster_client,
|
cluster_client,
|
||||||
|
broadcasting,
|
||||||
players: HashMap::new(),
|
players: HashMap::new(),
|
||||||
metric_active_players,
|
metric_active_players,
|
||||||
};
|
};
|
||||||
|
@ -333,10 +337,12 @@ impl PlayerRegistry {
|
||||||
} else {
|
} else {
|
||||||
let (handle, fiber) = Player::launch(
|
let (handle, fiber) = Player::launch(
|
||||||
id.clone(),
|
id.clone(),
|
||||||
|
self.clone(),
|
||||||
inner.room_registry.clone(),
|
inner.room_registry.clone(),
|
||||||
inner.dialogs.clone(),
|
inner.dialogs.clone(),
|
||||||
inner.cluster_metadata.clone(),
|
inner.cluster_metadata.clone(),
|
||||||
inner.cluster_client.clone(),
|
inner.cluster_client.clone(),
|
||||||
|
inner.broadcasting.clone(),
|
||||||
inner.storage.clone(),
|
inner.storage.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
@ -373,6 +379,7 @@ struct PlayerRegistryInner {
|
||||||
storage: Storage,
|
storage: Storage,
|
||||||
cluster_metadata: Arc<ClusterMetadata>,
|
cluster_metadata: Arc<ClusterMetadata>,
|
||||||
cluster_client: LavinaClient,
|
cluster_client: LavinaClient,
|
||||||
|
broadcasting: Broadcasting,
|
||||||
/// Active player actors.
|
/// Active player actors.
|
||||||
players: HashMap<PlayerId, (PlayerHandle, JoinHandle<Player>)>,
|
players: HashMap<PlayerId, (PlayerHandle, JoinHandle<Player>)>,
|
||||||
metric_active_players: IntGauge,
|
metric_active_players: IntGauge,
|
||||||
|
@ -392,19 +399,23 @@ struct Player {
|
||||||
banned_from: HashSet<RoomId>,
|
banned_from: HashSet<RoomId>,
|
||||||
rx: Receiver<(ActorCommand, Span)>,
|
rx: Receiver<(ActorCommand, Span)>,
|
||||||
handle: PlayerHandle,
|
handle: PlayerHandle,
|
||||||
|
players: PlayerRegistry,
|
||||||
rooms: RoomRegistry,
|
rooms: RoomRegistry,
|
||||||
dialogs: DialogRegistry,
|
dialogs: DialogRegistry,
|
||||||
storage: Storage,
|
storage: Storage,
|
||||||
cluster_metadata: Arc<ClusterMetadata>,
|
cluster_metadata: Arc<ClusterMetadata>,
|
||||||
cluster_client: LavinaClient,
|
cluster_client: LavinaClient,
|
||||||
|
broadcasting: Broadcasting,
|
||||||
}
|
}
|
||||||
impl Player {
|
impl Player {
|
||||||
async fn launch(
|
async fn launch(
|
||||||
player_id: PlayerId,
|
player_id: PlayerId,
|
||||||
|
players: PlayerRegistry,
|
||||||
rooms: RoomRegistry,
|
rooms: RoomRegistry,
|
||||||
dialogs: DialogRegistry,
|
dialogs: DialogRegistry,
|
||||||
cluster_metadata: Arc<ClusterMetadata>,
|
cluster_metadata: Arc<ClusterMetadata>,
|
||||||
cluster_client: LavinaClient,
|
cluster_client: LavinaClient,
|
||||||
|
broadcasting: Broadcasting,
|
||||||
storage: Storage,
|
storage: Storage,
|
||||||
) -> (PlayerHandle, JoinHandle<Player>) {
|
) -> (PlayerHandle, JoinHandle<Player>) {
|
||||||
let (tx, rx) = channel(32);
|
let (tx, rx) = channel(32);
|
||||||
|
@ -422,11 +433,13 @@ impl Player {
|
||||||
banned_from: HashSet::new(),
|
banned_from: HashSet::new(),
|
||||||
rx,
|
rx,
|
||||||
handle,
|
handle,
|
||||||
|
players,
|
||||||
rooms,
|
rooms,
|
||||||
dialogs,
|
dialogs,
|
||||||
storage,
|
storage,
|
||||||
cluster_metadata,
|
cluster_metadata,
|
||||||
cluster_client,
|
cluster_client,
|
||||||
|
broadcasting,
|
||||||
};
|
};
|
||||||
let fiber = tokio::task::spawn(player.main_loop());
|
let fiber = tokio::task::spawn(player.main_loop());
|
||||||
(handle_clone, fiber)
|
(handle_clone, fiber)
|
||||||
|
@ -449,7 +462,8 @@ impl Player {
|
||||||
let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
|
let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
|
||||||
for room_id in rooms {
|
for room_id in rooms {
|
||||||
if let Some(remote_node) = self.room_location(&room_id) {
|
if let Some(remote_node) = self.room_location(&room_id) {
|
||||||
self.my_rooms.insert(room_id, RoomRef::Remote { node_id: remote_node });
|
self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node });
|
||||||
|
self.broadcasting.subscribe(self.player_id.clone(), room_id).await;
|
||||||
} else {
|
} else {
|
||||||
let room = self.rooms.get_room(&room_id).await;
|
let room = self.rooms.get_room(&room_id).await;
|
||||||
if let Some(room) = room {
|
if let Some(room) = room {
|
||||||
|
@ -573,7 +587,19 @@ impl Player {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(remote_node) = self.room_location(&room_id) {
|
if let Some(remote_node) = self.room_location(&room_id) {
|
||||||
todo!()
|
let req = JoinRoomReq {
|
||||||
|
room_id: room_id.as_inner(),
|
||||||
|
player_id: self.player_id.as_inner(),
|
||||||
|
};
|
||||||
|
self.cluster_client.join_room(remote_node, req).await.unwrap();
|
||||||
|
let room_storage_id = self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
|
||||||
|
self.storage.add_room_member(room_storage_id, self.storage_id).await.unwrap();
|
||||||
|
self.my_rooms.insert(room_id.clone(), RoomRef::Remote { node_id: remote_node });
|
||||||
|
JoinResult::Success(RoomInfo {
|
||||||
|
id: room_id,
|
||||||
|
topic: "unknown".into(),
|
||||||
|
members: vec![],
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
let room = match self.rooms.get_or_create_room(room_id.clone()).await {
|
let room = match self.rooms.get_or_create_room(room_id.clone()).await {
|
||||||
Ok(room) => room,
|
Ok(room) => room,
|
||||||
|
@ -599,9 +625,22 @@ impl Player {
|
||||||
async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) {
|
async fn leave_room(&mut self, connection_id: ConnectionId, room_id: RoomId) {
|
||||||
let room = self.my_rooms.remove(&room_id);
|
let room = self.my_rooms.remove(&room_id);
|
||||||
if let Some(room) = room {
|
if let Some(room) = room {
|
||||||
panic!();
|
match room {
|
||||||
// room.unsubscribe(&self.player_id).await;
|
RoomRef::Local(room) => {
|
||||||
// room.remove_member(&self.player_id, self.storage_id).await;
|
room.unsubscribe(&self.player_id).await;
|
||||||
|
room.remove_member(&self.player_id, self.storage_id).await;
|
||||||
|
let room_storage_id =
|
||||||
|
self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
|
||||||
|
self.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap();
|
||||||
|
}
|
||||||
|
RoomRef::Remote { node_id } => {
|
||||||
|
let req = LeaveRoomReq {
|
||||||
|
room_id: room_id.as_inner(),
|
||||||
|
player_id: self.player_id.as_inner(),
|
||||||
|
};
|
||||||
|
self.cluster_client.leave_room(node_id, req).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let update = Updates::RoomLeft {
|
let update = Updates::RoomLeft {
|
||||||
room_id,
|
room_id,
|
||||||
|
@ -634,6 +673,15 @@ impl Player {
|
||||||
created_at: &*created_at.to_rfc3339(),
|
created_at: &*created_at.to_rfc3339(),
|
||||||
};
|
};
|
||||||
self.cluster_client.send_room_message(*node_id, req).await.unwrap();
|
self.cluster_client.send_room_message(*node_id, req).await.unwrap();
|
||||||
|
self.broadcasting
|
||||||
|
.broadcast(
|
||||||
|
&self.players,
|
||||||
|
room_id.clone(),
|
||||||
|
self.player_id.clone(),
|
||||||
|
body.clone(),
|
||||||
|
created_at.clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let update = Updates::NewMessage {
|
let update = Updates::NewMessage {
|
||||||
|
|
|
@ -48,4 +48,19 @@ impl Storage {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn create_or_retrieve_room_id_by_name(&self, name: &str) -> Result<u32> {
|
||||||
|
let mut executor = self.conn.lock().await;
|
||||||
|
let res: (u32,) = sqlx::query_as(
|
||||||
|
"insert into rooms(name, topic)
|
||||||
|
values (?, '')
|
||||||
|
on conflict(name) do nothing
|
||||||
|
returning id;",
|
||||||
|
)
|
||||||
|
.bind(name)
|
||||||
|
.fetch_one(&mut *executor)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(res.0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,21 @@ impl Storage {
|
||||||
Ok(res.map(|(id,)| id))
|
Ok(res.map(|(id,)| id))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn create_or_retrieve_user_id_by_name(&self, name: &str) -> Result<u32> {
|
||||||
|
let mut executor = self.conn.lock().await;
|
||||||
|
let res: (u32,) = sqlx::query_as(
|
||||||
|
"insert into users(name)
|
||||||
|
values (?)
|
||||||
|
on conflict(name) do update set name = excluded.name
|
||||||
|
returning id;",
|
||||||
|
)
|
||||||
|
.bind(name)
|
||||||
|
.fetch_one(&mut *executor)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(res.0)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_rooms_of_a_user(&self, user_id: u32) -> Result<Vec<RoomId>> {
|
pub async fn get_rooms_of_a_user(&self, user_id: u32) -> Result<Vec<RoomId>> {
|
||||||
let mut executor = self.conn.lock().await;
|
let mut executor = self.conn.lock().await;
|
||||||
let res: Vec<(String,)> = sqlx::query_as(
|
let res: Vec<(String,)> = sqlx::query_as(
|
||||||
|
|
|
@ -60,7 +60,7 @@ impl RoomRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")]
|
#[tracing::instrument(skip(self), name = "RoomRegistry::get_or_create_room")]
|
||||||
pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> {
|
pub async fn get_or_create_room(&self, room_id: RoomId) -> Result<RoomHandle> {
|
||||||
let mut inner = self.0.write().await;
|
let mut inner = self.0.write().await;
|
||||||
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? {
|
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? {
|
||||||
Ok(room_handle.clone())
|
Ok(room_handle.clone())
|
||||||
|
|
|
@ -9,6 +9,7 @@ use tokio::net::tcp::{ReadHalf, WriteHalf};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
use lavina_core::auth::Authenticator;
|
use lavina_core::auth::Authenticator;
|
||||||
|
use lavina_core::clustering::{ClusterConfig, ClusterMetadata};
|
||||||
use lavina_core::player::{JoinResult, PlayerId, SendMessageResult};
|
use lavina_core::player::{JoinResult, PlayerId, SendMessageResult};
|
||||||
use lavina_core::repo::{Storage, StorageConfig};
|
use lavina_core::repo::{Storage, StorageConfig};
|
||||||
use lavina_core::room::RoomId;
|
use lavina_core::room::RoomId;
|
||||||
|
@ -118,7 +119,16 @@ impl TestServer {
|
||||||
db_path: ":memory:".into(),
|
db_path: ":memory:".into(),
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
|
let cluster_config = ClusterConfig {
|
||||||
|
addresses: vec![],
|
||||||
|
metadata: ClusterMetadata {
|
||||||
|
node_id: 0,
|
||||||
|
main_owner: 0,
|
||||||
|
test_owner: 0,
|
||||||
|
test2_owner: 0,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?;
|
||||||
let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
|
let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
|
||||||
Ok(TestServer {
|
Ok(TestServer {
|
||||||
metrics,
|
metrics,
|
||||||
|
@ -133,6 +143,15 @@ impl TestServer {
|
||||||
listen_on: "127.0.0.1:0".parse().unwrap(),
|
listen_on: "127.0.0.1:0".parse().unwrap(),
|
||||||
server_name: "testserver".into(),
|
server_name: "testserver".into(),
|
||||||
};
|
};
|
||||||
|
let cluster_config = ClusterConfig {
|
||||||
|
addresses: vec![],
|
||||||
|
metadata: ClusterMetadata {
|
||||||
|
node_id: 0,
|
||||||
|
main_owner: 0,
|
||||||
|
test_owner: 0,
|
||||||
|
test2_owner: 0,
|
||||||
|
},
|
||||||
|
};
|
||||||
let TestServer {
|
let TestServer {
|
||||||
metrics: _,
|
metrics: _,
|
||||||
storage,
|
storage,
|
||||||
|
@ -142,7 +161,7 @@ impl TestServer {
|
||||||
server.terminate().await?;
|
server.terminate().await?;
|
||||||
core.shutdown().await?;
|
core.shutdown().await?;
|
||||||
let metrics = MetricsRegistry::new();
|
let metrics = MetricsRegistry::new();
|
||||||
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
|
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?;
|
||||||
let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
|
let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
|
||||||
Ok(TestServer {
|
Ok(TestServer {
|
||||||
metrics,
|
metrics,
|
||||||
|
|
37
src/http.rs
37
src/http.rs
|
@ -13,16 +13,16 @@ use serde::{Deserialize, Serialize};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
use lavina_core::auth::UpdatePasswordResult;
|
use lavina_core::auth::UpdatePasswordResult;
|
||||||
use lavina_core::clustering::SendMessageReq;
|
|
||||||
use lavina_core::player::{PlayerId, PlayerRegistry, SendMessageResult};
|
use lavina_core::player::{PlayerId, PlayerRegistry, SendMessageResult};
|
||||||
use lavina_core::prelude::*;
|
use lavina_core::prelude::*;
|
||||||
use lavina_core::repo::Storage;
|
use lavina_core::repo::Storage;
|
||||||
use lavina_core::room::{RoomId, RoomRegistry};
|
use lavina_core::room::{RoomId, RoomRegistry};
|
||||||
use lavina_core::terminator::Terminator;
|
use lavina_core::terminator::Terminator;
|
||||||
use lavina_core::LavinaCore;
|
use lavina_core::LavinaCore;
|
||||||
|
|
||||||
use mgmt_api::*;
|
use mgmt_api::*;
|
||||||
|
|
||||||
|
mod clustering;
|
||||||
|
|
||||||
type HttpResult<T> = std::result::Result<T, Infallible>;
|
type HttpResult<T> = std::result::Result<T, Infallible>;
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
|
@ -91,8 +91,7 @@ async fn route(
|
||||||
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, core).await.or5xx(),
|
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, core).await.or5xx(),
|
||||||
(&Method::POST, rooms::paths::SEND_MESSAGE) => endpoint_send_room_message(request, core).await.or5xx(),
|
(&Method::POST, rooms::paths::SEND_MESSAGE) => endpoint_send_room_message(request, core).await.or5xx(),
|
||||||
(&Method::POST, rooms::paths::SET_TOPIC) => endpoint_set_room_topic(request, core).await.or5xx(),
|
(&Method::POST, rooms::paths::SET_TOPIC) => endpoint_set_room_topic(request, core).await.or5xx(),
|
||||||
(&Method::POST, "/cluster/rooms/add_message") => endpoint_cluster_add_message(request, core).await.or5xx(),
|
_ => clustering::route(core, storage, request).await.unwrap_or_else(endpoint_not_found),
|
||||||
_ => endpoint_not_found(),
|
|
||||||
};
|
};
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
@ -208,36 +207,6 @@ async fn endpoint_set_room_topic(
|
||||||
Ok(empty_204_request())
|
Ok(empty_204_request())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all, name = "endpoint_cluster_add_message")]
|
|
||||||
async fn endpoint_cluster_add_message(
|
|
||||||
request: Request<hyper::body::Incoming>,
|
|
||||||
core: &LavinaCore,
|
|
||||||
) -> Result<Response<Full<Bytes>>> {
|
|
||||||
let str = request.collect().await?.to_bytes();
|
|
||||||
let Ok(req) = serde_json::from_slice::<SendMessageReq>(&str[..]) else {
|
|
||||||
return Ok(malformed_request());
|
|
||||||
};
|
|
||||||
tracing::info!("Incoming request: {:?}", &req);
|
|
||||||
let Ok(created_at) = chrono::DateTime::parse_from_rfc3339(req.created_at) else {
|
|
||||||
dbg!(&req.created_at);
|
|
||||||
return Ok(malformed_request());
|
|
||||||
};
|
|
||||||
let Ok(room_id) = RoomId::from(req.room_id) else {
|
|
||||||
dbg!(&req.room_id);
|
|
||||||
return Ok(room_not_found());
|
|
||||||
};
|
|
||||||
let Ok(player_id) = PlayerId::from(req.player_id) else {
|
|
||||||
dbg!(&req.player_id);
|
|
||||||
return Ok(player_not_found());
|
|
||||||
};
|
|
||||||
let Some(room_handle) = core.rooms.get_room(&room_id).await else {
|
|
||||||
dbg!(&room_id);
|
|
||||||
return Ok(room_not_found());
|
|
||||||
};
|
|
||||||
room_handle.send_message(&player_id, req.message.into(), created_at.to_utc()).await;
|
|
||||||
Ok(empty_204_request())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn endpoint_not_found() -> Response<Full<Bytes>> {
|
fn endpoint_not_found() -> Response<Full<Bytes>> {
|
||||||
let payload = ErrorResponse {
|
let payload = ErrorResponse {
|
||||||
code: errors::INVALID_PATH,
|
code: errors::INVALID_PATH,
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
use http_body_util::{BodyExt, Full};
|
||||||
|
use hyper::body::Bytes;
|
||||||
|
use hyper::{Method, Request, Response};
|
||||||
|
|
||||||
|
use super::Or5xx;
|
||||||
|
use crate::http::{empty_204_request, malformed_request, player_not_found, room_not_found};
|
||||||
|
use lavina_core::clustering::room::{paths, JoinRoomReq, SendMessageReq};
|
||||||
|
use lavina_core::player::PlayerId;
|
||||||
|
use lavina_core::repo::Storage;
|
||||||
|
use lavina_core::room::RoomId;
|
||||||
|
use lavina_core::LavinaCore;
|
||||||
|
|
||||||
|
pub async fn route(
|
||||||
|
core: &LavinaCore,
|
||||||
|
storage: &Storage,
|
||||||
|
request: Request<hyper::body::Incoming>,
|
||||||
|
) -> Option<Response<Full<Bytes>>> {
|
||||||
|
match (request.method(), request.uri().path()) {
|
||||||
|
(&Method::POST, paths::JOIN) => Some(endpoint_cluster_join_room(request, core, storage).await.or5xx()),
|
||||||
|
(&Method::POST, paths::ADD_MESSAGE) => Some(endpoint_cluster_add_message(request, core).await.or5xx()),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all, name = "endpoint_cluster_join_room")]
|
||||||
|
async fn endpoint_cluster_join_room(
|
||||||
|
request: Request<hyper::body::Incoming>,
|
||||||
|
core: &LavinaCore,
|
||||||
|
storage: &Storage,
|
||||||
|
) -> lavina_core::prelude::Result<Response<Full<Bytes>>> {
|
||||||
|
let str = request.collect().await?.to_bytes();
|
||||||
|
let Ok(req) = serde_json::from_slice::<JoinRoomReq>(&str[..]) else {
|
||||||
|
return Ok(malformed_request());
|
||||||
|
};
|
||||||
|
tracing::info!("Incoming request: {:?}", &req);
|
||||||
|
let Ok(room_id) = RoomId::from(req.room_id) else {
|
||||||
|
dbg!(&req.room_id);
|
||||||
|
return Ok(room_not_found());
|
||||||
|
};
|
||||||
|
let Ok(player_id) = PlayerId::from(req.player_id) else {
|
||||||
|
dbg!(&req.player_id);
|
||||||
|
return Ok(player_not_found());
|
||||||
|
};
|
||||||
|
let room_handle = core.rooms.get_or_create_room(room_id).await.unwrap();
|
||||||
|
let storage_id = storage.create_or_retrieve_user_id_by_name(req.player_id).await?;
|
||||||
|
room_handle.add_member(&player_id, storage_id).await;
|
||||||
|
Ok(empty_204_request())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all, name = "endpoint_cluster_add_message")]
|
||||||
|
async fn endpoint_cluster_add_message(
|
||||||
|
request: Request<hyper::body::Incoming>,
|
||||||
|
core: &LavinaCore,
|
||||||
|
) -> lavina_core::prelude::Result<Response<Full<Bytes>>> {
|
||||||
|
let str = request.collect().await?.to_bytes();
|
||||||
|
let Ok(req) = serde_json::from_slice::<SendMessageReq>(&str[..]) else {
|
||||||
|
return Ok(malformed_request());
|
||||||
|
};
|
||||||
|
tracing::info!("Incoming request: {:?}", &req);
|
||||||
|
let Ok(created_at) = chrono::DateTime::parse_from_rfc3339(req.created_at) else {
|
||||||
|
dbg!(&req.created_at);
|
||||||
|
return Ok(malformed_request());
|
||||||
|
};
|
||||||
|
let Ok(room_id) = RoomId::from(req.room_id) else {
|
||||||
|
dbg!(&req.room_id);
|
||||||
|
return Ok(room_not_found());
|
||||||
|
};
|
||||||
|
let Ok(player_id) = PlayerId::from(req.player_id) else {
|
||||||
|
dbg!(&req.player_id);
|
||||||
|
return Ok(player_not_found());
|
||||||
|
};
|
||||||
|
let Some(room_handle) = core.rooms.get_room(&room_id).await else {
|
||||||
|
dbg!(&room_id);
|
||||||
|
return Ok(room_not_found());
|
||||||
|
};
|
||||||
|
room_handle.send_message(&player_id, req.message.into(), created_at.to_utc()).await;
|
||||||
|
Ok(empty_204_request())
|
||||||
|
}
|
Loading…
Reference in New Issue