forked from lavina/lavina
1
0
Fork 0

introduce Updates as a common player and connection event

This commit is contained in:
Nikita Vilunov 2023-02-14 20:42:52 +01:00
parent 265b78dc51
commit 57ea2dd2d7
4 changed files with 63 additions and 117 deletions

View File

@ -31,14 +31,16 @@ pub struct PlayerId(ByteVec);
impl PlayerId { impl PlayerId {
pub fn from_bytes(bytes: ByteVec) -> Result<PlayerId> { pub fn from_bytes(bytes: ByteVec) -> Result<PlayerId> {
if bytes.len() > 32 { if bytes.len() > 32 {
return Err(anyhow::Error::msg("Nickname cannot be longer than 32 symbols")); return Err(fail("Nickname cannot be longer than 32 symbols"));
} }
if bytes.contains(&b' ') { if bytes.contains(&b' ') {
return Err(anyhow::Error::msg("Nickname cannot contain spaces")); return Err(anyhow::Error::msg("Nickname cannot contain spaces"));
} }
Ok(PlayerId(bytes)) Ok(PlayerId(bytes))
} }
pub fn as_bytes(&self) -> &ByteVec { &self.0 } pub fn as_bytes(&self) -> &ByteVec {
&self.0
}
} }
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@ -113,43 +115,15 @@ impl PlayerHandle {
Ok(deferred.await?) Ok(deferred.await?)
} }
pub async fn receive_message(
&self,
room_id: RoomId,
author: PlayerId,
connection_id: ConnectionId,
body: String,
) {
self.tx
.send(PlayerCommand::IncomingMessage {
room_id,
author,
connection_id,
body,
})
.await;
}
pub async fn send(&self, command: PlayerCommand) { pub async fn send(&self, command: PlayerCommand) {
self.tx.send(command).await; self.tx.send(command).await;
} }
pub async fn update(&self, update: Updates) {
self.send(PlayerCommand::Update(update)).await;
}
} }
/// Player update event type which is sent to a connection handler.
pub enum Updates {
RoomJoined {
player_id: PlayerId,
connection_id: ConnectionId,
room_id: RoomId,
},
NewMessage {
author_id: PlayerId,
connection_id: ConnectionId,
room_id: RoomId,
body: String,
},
IncomingUpdate(IncomingPlayerEvent),
}
pub enum PlayerCommand { pub enum PlayerCommand {
/** Commands from connections */ /** Commands from connections */
AddSocket { AddSocket {
@ -168,23 +142,27 @@ pub enum PlayerCommand {
}, },
GetRooms(Promise<Vec<RoomInfo>>), GetRooms(Promise<Vec<RoomInfo>>),
/** Events from rooms */ /** Events from rooms */
IncomingMessage { Update(Updates),
}
/// Player update event type which is sent to a player actor and from there to a connection handler.
#[derive(Clone)]
pub enum Updates {
RoomTopicChanged {
room_id: RoomId,
new_topic: ByteVec,
},
NewMessage {
room_id: RoomId, room_id: RoomId,
connection_id: ConnectionId, connection_id: ConnectionId,
author: PlayerId, author_id: PlayerId,
body: String, body: String,
}, },
IncomingRoomJoined { RoomJoined {
room_id: RoomId, room_id: RoomId,
new_member_id: PlayerId, new_member_id: PlayerId,
connection_id: ConnectionId, connection_id: ConnectionId,
}, },
Event(IncomingPlayerEvent),
}
#[derive(Clone)]
pub enum IncomingPlayerEvent {
IncomingRoomTopicChanged { room_id: RoomId, new_topic: ByteVec },
} }
/// Handle to a player registry — a shared data structure containing information about players. /// Handle to a player registry — a shared data structure containing information about players.
@ -294,48 +272,13 @@ impl Player {
} }
promise.send(response); promise.send(response);
} }
PlayerCommand::IncomingMessage { PlayerCommand::Update(update) => {
room_id, log::info!(
author, "Player received an update, broadcasting to {} connections",
connection_id, self.sockets.len()
body,
} => {
tracing::info!(
"Handling incoming message, player_id={:?}",
player_id.clone()
); );
for socket in &self.sockets { for socket in &self.sockets {
log::info!("Send message to socket"); socket.send(update.clone()).await;
socket
.send(Updates::NewMessage {
author_id: author.clone(),
connection_id: connection_id.clone(),
room_id: room_id.clone(),
body: body.clone(),
})
.await;
}
}
PlayerCommand::IncomingRoomJoined {
room_id,
new_member_id,
connection_id,
} => {
for socket in &self.sockets {
let room_id = room_id.clone();
let connection_id = connection_id.clone();
socket
.send(Updates::RoomJoined {
player_id: new_member_id.clone(),
connection_id,
room_id,
})
.await;
}
}
PlayerCommand::Event(event) => {
for socket in &self.sockets {
socket.send(Updates::IncomingUpdate(event.clone())).await;
} }
} }
} }

View File

@ -9,11 +9,11 @@ use prometheus::{IntGauge, Registry as MetricRegistry};
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::RwLock as AsyncRwLock;
use crate::{ use crate::{
core::player::{PlayerCommand, PlayerHandle, PlayerId}, core::player::{PlayerHandle, PlayerId},
prelude::*, prelude::*,
}; };
use super::player::{ConnectionId, IncomingPlayerEvent}; use super::player::{ConnectionId, Updates};
/// Opaque room id /// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@ -21,14 +21,18 @@ pub struct RoomId(ByteVec);
impl RoomId { impl RoomId {
pub fn from_bytes(bytes: ByteVec) -> Result<RoomId> { pub fn from_bytes(bytes: ByteVec) -> Result<RoomId> {
if bytes.len() > 32 { if bytes.len() > 32 {
return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); return Err(anyhow::Error::msg(
"Room name cannot be longer than 32 symbols",
));
} }
if bytes.contains(&b' ') { if bytes.contains(&b' ') {
return Err(anyhow::Error::msg("Room name cannot contain spaces")); return Err(anyhow::Error::msg("Room name cannot contain spaces"));
} }
Ok(RoomId(bytes)) Ok(RoomId(bytes))
} }
pub fn as_bytes(&self) -> &ByteVec { &self.0 } pub fn as_bytes(&self) -> &ByteVec {
&self.0
}
} }
/// Shared datastructure for storing metadata about rooms. /// Shared datastructure for storing metadata about rooms.
@ -123,15 +127,12 @@ impl RoomHandle {
pub async fn set_topic(&mut self, new_topic: ByteVec) { pub async fn set_topic(&mut self, new_topic: ByteVec) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
lock.topic = new_topic.clone(); lock.topic = new_topic.clone();
for (player_id, player_handle) in &lock.subscriptions { for (_, player_handle) in &lock.subscriptions {
let msg = player_handle let update = Updates::RoomTopicChanged {
.send(PlayerCommand::Event( room_id: lock.room_id.clone(),
IncomingPlayerEvent::IncomingRoomTopicChanged { new_topic: new_topic.clone(),
room_id: lock.room_id.clone(), };
new_topic: new_topic.clone(), player_handle.update(update.clone()).await;
},
))
.await;
} }
} }
} }
@ -150,27 +151,27 @@ impl Room {
) { ) {
tracing::info!("Adding a subscriber to room"); tracing::info!("Adding a subscriber to room");
self.subscriptions.insert(player_id.clone(), player_handle); self.subscriptions.insert(player_id.clone(), player_handle);
let update = Updates::RoomJoined {
room_id: self.room_id.clone(),
new_member_id: player_id.clone(),
connection_id: connection_id.clone(),
};
for (_, sub) in &self.subscriptions { for (_, sub) in &self.subscriptions {
sub.send(PlayerCommand::IncomingRoomJoined { sub.update(update.clone()).await;
room_id: self.room_id.clone(),
new_member_id: player_id.clone(),
connection_id: connection_id.clone(),
})
.await;
} }
} }
async fn send_message(&self, player_id: PlayerId, connection_id: ConnectionId, body: String) { async fn send_message(&self, author_id: PlayerId, connection_id: ConnectionId, body: String) {
tracing::info!("Adding a message to room"); tracing::info!("Adding a message to room");
let update = Updates::NewMessage {
room_id: self.room_id.clone(),
connection_id,
author_id,
body,
};
for (_, sub) in &self.subscriptions { for (_, sub) in &self.subscriptions {
log::info!("Sending a message from room to player"); log::info!("Sending a message from room to player");
sub.receive_message( sub.update(update.clone()).await;
self.room_id.clone(),
player_id.clone(),
connection_id.clone(),
body.clone(),
)
.await;
} }
} }
} }

View File

@ -12,3 +12,7 @@ pub mod log {
pub type Result<T> = std::result::Result<T, anyhow::Error>; pub type Result<T> = std::result::Result<T, anyhow::Error>;
pub type ByteVec = Vec<u8>; pub type ByteVec = Vec<u8>;
pub fn fail(msg: &'static str) -> anyhow::Error {
anyhow::Error::msg(msg)
}

View File

@ -7,9 +7,7 @@ use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot::channel; use tokio::sync::oneshot::channel;
use crate::core::player::{ use crate::core::player::{PlayerCommand, PlayerConnection, PlayerId, PlayerRegistry, Updates};
IncomingPlayerEvent, PlayerCommand, PlayerConnection, PlayerId, PlayerRegistry, Updates,
};
use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; use crate::core::room::{RoomId, RoomInfo, RoomRegistry};
use crate::prelude::*; use crate::prelude::*;
use crate::protos::irc::client::{client_message, ClientMessage}; use crate::protos::irc::client::{client_message, ClientMessage};
@ -239,8 +237,8 @@ async fn handle_registered_socket<'a>(
}, },
update = connection.receiver.recv() => { update = connection.receiver.recv() => {
match update.unwrap() { match update.unwrap() {
Updates::RoomJoined { player_id: author_id, connection_id, room_id } => { Updates::RoomJoined { new_member_id, connection_id, room_id } => {
if player_id == author_id { if player_id == new_member_id {
if let Some(room) = rooms.get_room(&room_id) { if let Some(room) = rooms.get_room(&room_id) {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
let chan = Chan::Global(room_id.as_bytes().clone()); let chan = Chan::Global(room_id.as_bytes().clone());
@ -252,7 +250,7 @@ async fn handle_registered_socket<'a>(
} else { } else {
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(author_id.as_bytes().clone()), sender: Some(new_member_id.as_bytes().clone()),
body: ServerMessageBody::Join(Chan::Global(room_id.as_bytes().clone())), body: ServerMessageBody::Join(Chan::Global(room_id.as_bytes().clone())),
}.write_async(writer).await?; }.write_async(writer).await?;
writer.flush().await? writer.flush().await?
@ -268,7 +266,7 @@ async fn handle_registered_socket<'a>(
writer.flush().await? writer.flush().await?
} }
}, },
Updates::IncomingUpdate(IncomingPlayerEvent::IncomingRoomTopicChanged { room_id, new_topic }) => { Updates::RoomTopicChanged { room_id, new_topic } => {
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()), sender: Some(config.server_name.as_bytes().to_vec()),