management api endpoints for rooms

This commit is contained in:
Nikita Vilunov 2024-05-01 17:30:31 +02:00
parent a87f7c9d73
commit 9a09ff717e
6 changed files with 94 additions and 10 deletions

1
Cargo.lock generated
View File

@ -1062,6 +1062,7 @@ version = "0.0.3-dev"
dependencies = [
"anyhow",
"assert_matches",
"chrono",
"clap",
"derive_more",
"figment",

View File

@ -64,6 +64,7 @@ opentelemetry-semantic-conventions = "0.14.0"
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
opentelemetry-otlp = "0.15.0"
tracing-opentelemetry = "0.23.0"
chrono.workspace = true
[dev-dependencies]
assert_matches.workspace = true

View File

@ -297,7 +297,7 @@ impl PlayerRegistry {
}
#[tracing::instrument(skip(self), name = "PlayerRegistry::get_or_launch_player")]
pub async fn get_or_launch_player(&mut self, id: &PlayerId) -> PlayerHandle {
pub async fn get_or_launch_player(&self, id: &PlayerId) -> PlayerHandle {
let inner = self.0.read().await;
if let Some((handle, _)) = inner.players.get(id) {
handle.clone()
@ -322,7 +322,7 @@ impl PlayerRegistry {
}
#[tracing::instrument(skip(self), name = "PlayerRegistry::connect_to_player")]
pub async fn connect_to_player(&mut self, id: &PlayerId) -> PlayerConnection {
pub async fn connect_to_player(&self, id: &PlayerId) -> PlayerConnection {
let player_handle = self.get_or_launch_player(id).await;
player_handle.subscribe().await
}

View File

@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize};
pub mod rooms;
#[derive(Serialize, Deserialize)]
pub struct ErrorResponse<'a> {
pub code: &'a str,

View File

@ -0,0 +1,24 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct SendMessageReq<'a> {
pub room_id: &'a str,
pub author_id: &'a str,
pub message: &'a str,
}
#[derive(Serialize, Deserialize)]
pub struct SetTopicReq<'a> {
pub room_id: &'a str,
pub author_id: &'a str,
pub topic: &'a str,
}
pub mod paths {
pub const SEND_MESSAGE: &'static str = "/mgmt/rooms/send_message";
pub const SET_TOPIC: &'static str = "/mgmt/rooms/set_topic";
}
pub mod errors {
pub const ROOM_NOT_FOUND: &'static str = "room_not_found";
}

View File

@ -13,10 +13,10 @@ use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use lavina_core::auth::{Authenticator, UpdatePasswordResult};
use lavina_core::player::{PlayerId, PlayerRegistry};
use lavina_core::player::{PlayerId, PlayerRegistry, SendMessageResult};
use lavina_core::prelude::*;
use lavina_core::repo::Storage;
use lavina_core::room::RoomRegistry;
use lavina_core::room::{RoomId, RoomRegistry};
use lavina_core::terminator::Terminator;
use lavina_core::LavinaCore;
@ -88,6 +88,8 @@ async fn route(
(&Method::POST, paths::CREATE_PLAYER) => endpoint_create_player(request, storage).await.or5xx(),
(&Method::POST, paths::STOP_PLAYER) => endpoint_stop_player(request, core.players).await.or5xx(),
(&Method::POST, paths::SET_PASSWORD) => endpoint_set_password(request, storage).await.or5xx(),
(&Method::POST, rooms::paths::SEND_MESSAGE) => endpoint_send_room_message(request, core).await.or5xx(),
(&Method::POST, rooms::paths::SET_TOPIC) => endpoint_set_room_topic(request, core).await.or5xx(),
_ => endpoint_not_found(),
};
Ok(res)
@ -139,9 +141,7 @@ async fn endpoint_stop_player(
let Some(()) = players.stop_player(&player_id).await? else {
return Ok(player_not_found());
};
let mut response = Response::new(Full::<Bytes>::default());
*response.status_mut() = StatusCode::NO_CONTENT;
Ok(response)
Ok(empty_204_request())
}
#[tracing::instrument(skip_all)]
@ -160,9 +160,48 @@ async fn endpoint_set_password(
return Ok(player_not_found());
}
}
let mut response = Response::new(Full::<Bytes>::default());
*response.status_mut() = StatusCode::NO_CONTENT;
Ok(response)
Ok(empty_204_request())
}
async fn endpoint_send_room_message(
request: Request<hyper::body::Incoming>,
mut core: LavinaCore,
) -> Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes();
let Ok(req) = serde_json::from_slice::<rooms::SendMessageReq>(&str[..]) else {
return Ok(malformed_request());
};
let Ok(room_id) = RoomId::from(req.room_id) else {
return Ok(room_not_found());
};
let Ok(player_id) = PlayerId::from(req.author_id) else {
return Ok(player_not_found());
};
let mut player = core.players.connect_to_player(&player_id).await;
let res = player.send_message(room_id, req.message.into()).await?;
match res {
SendMessageResult::NoSuchRoom => Ok(room_not_found()),
SendMessageResult::Success(_) => Ok(empty_204_request()),
}
}
async fn endpoint_set_room_topic(
request: Request<hyper::body::Incoming>,
core: LavinaCore,
) -> Result<Response<Full<Bytes>>> {
let str = request.collect().await?.to_bytes();
let Ok(req) = serde_json::from_slice::<rooms::SetTopicReq>(&str[..]) else {
return Ok(malformed_request());
};
let Ok(room_id) = RoomId::from(req.room_id) else {
return Ok(room_not_found());
};
let Ok(player_id) = PlayerId::from(req.author_id) else {
return Ok(player_not_found());
};
let mut player = core.players.connect_to_player(&player_id).await;
player.change_topic(room_id, req.topic.into()).await?;
Ok(empty_204_request())
}
fn endpoint_not_found() -> Response<Full<Bytes>> {
@ -188,6 +227,17 @@ fn player_not_found() -> Response<Full<Bytes>> {
response
}
fn room_not_found() -> Response<Full<Bytes>> {
let payload = ErrorResponse {
code: rooms::errors::ROOM_NOT_FOUND,
message: "No such room exists",
}
.to_body();
let mut response = Response::new(payload);
*response.status_mut() = StatusCode::UNPROCESSABLE_ENTITY;
response
}
fn malformed_request() -> Response<Full<Bytes>> {
let payload = ErrorResponse {
code: errors::MALFORMED_REQUEST,
@ -200,6 +250,12 @@ fn malformed_request() -> Response<Full<Bytes>> {
return response;
}
fn empty_204_request() -> Response<Full<Bytes>> {
let mut response = Response::new(Full::<Bytes>::default());
*response.status_mut() = StatusCode::NO_CONTENT;
response
}
trait Or5xx {
fn or5xx(self) -> Response<Full<Bytes>>;
}