rework commands and updates.

updates from rooms are send only to users other than the initiator.
updates from player are send only to connections other than the one the command was sent from.
This commit is contained in:
Nikita Vilunov 2023-02-14 23:22:04 +01:00
parent 39fed80106
commit 05f8c5e502
4 changed files with 180 additions and 127 deletions

View File

@ -15,12 +15,11 @@ use std::{
use prometheus::{IntGauge, Registry as MetricsRegistry}; use prometheus::{IntGauge, Registry as MetricsRegistry};
use tokio::{ use tokio::{
sync::mpsc::{channel, Receiver, Sender}, sync::mpsc::{channel, Receiver, Sender},
sync::oneshot::{channel as oneshot, Sender as OneshotSender},
task::JoinHandle, task::JoinHandle,
}; };
use crate::{ use crate::{
core::room::{RoomId, RoomInfo, RoomRegistry}, core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry},
prelude::*, prelude::*,
util::table::{AnonTable, Key as AnonKey}, util::table::{AnonTable, Key as AnonKey},
}; };
@ -52,7 +51,7 @@ pub struct PlayerConnection {
player_handle: PlayerHandle, player_handle: PlayerHandle,
} }
impl PlayerConnection { impl PlayerConnection {
pub async fn send_message(&mut self, room_id: RoomId, body: String) { pub async fn send_message(&mut self, room_id: RoomId, body: String) -> Result<()> {
self.player_handle self.player_handle
.send_message(room_id, self.connection_id.clone(), body) .send_message(room_id, self.connection_id.clone(), body)
.await .await
@ -64,6 +63,19 @@ impl PlayerConnection {
.await .await
} }
pub async fn change_topic(&mut self, room_id: RoomId, new_topic: ByteVec) -> Result<()> {
let (promise, deferred) = oneshot();
let cmd = Cmd::ChangeTopic {
room_id,
new_topic,
promise,
};
self.player_handle
.send(PlayerCommand::Cmd(cmd, self.connection_id.clone()))
.await;
Ok(deferred.await?)
}
pub async fn send(&self, command: PlayerCommand) { pub async fn send(&self, command: PlayerCommand) {
self.player_handle.send(command).await; self.player_handle.send(command).await;
} }
@ -89,14 +101,20 @@ impl PlayerHandle {
} }
} }
pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: String) { pub async fn send_message(
self.tx &self,
.send(PlayerCommand::SendMessage { room_id: RoomId,
connection_id: ConnectionId,
body: String,
) -> Result<()> {
let (promise, deferred) = oneshot();
let cmd = Cmd::SendMessage {
room_id, room_id,
connection_id,
body, body,
}) promise,
.await; };
self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await;
Ok(deferred.await?)
} }
pub async fn join_room( pub async fn join_room(
@ -105,17 +123,15 @@ impl PlayerHandle {
connection_id: ConnectionId, connection_id: ConnectionId,
) -> Result<RoomInfo> { ) -> Result<RoomInfo> {
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
self.tx let cmd = Cmd::JoinRoom {
.send(PlayerCommand::JoinRoom {
room_id, room_id,
connection_id, promise: promise,
promise, };
}) self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await;
.await;
Ok(deferred.await?) Ok(deferred.await?)
} }
pub async fn send(&self, command: PlayerCommand) { async fn send(&self, command: PlayerCommand) {
self.tx.send(command).await; self.tx.send(command).await;
} }
@ -128,21 +144,30 @@ pub enum PlayerCommand {
/** Commands from connections */ /** Commands from connections */
AddConnection { AddConnection {
sender: Sender<Updates>, sender: Sender<Updates>,
promise: OneshotSender<ConnectionId>, promise: Promise<ConnectionId>,
}, },
Cmd(Cmd, ConnectionId),
/// Query - responds with a list of rooms the player is a member of.
GetRooms(Promise<Vec<RoomInfo>>),
/** Events from rooms */
Update(Updates),
}
pub enum Cmd {
JoinRoom { JoinRoom {
room_id: RoomId, room_id: RoomId,
connection_id: ConnectionId,
promise: Promise<RoomInfo>, promise: Promise<RoomInfo>,
}, },
SendMessage { SendMessage {
room_id: RoomId, room_id: RoomId,
connection_id: ConnectionId,
body: String, body: String,
promise: Promise<()>,
},
ChangeTopic {
room_id: RoomId,
new_topic: ByteVec,
promise: Promise<()>,
}, },
GetRooms(Promise<Vec<RoomInfo>>),
/** Events from rooms */
Update(Updates),
} }
/// Player update event type which is sent to a player actor and from there to a connection handler. /// Player update event type which is sent to a player actor and from there to a connection handler.
@ -154,14 +179,12 @@ pub enum Updates {
}, },
NewMessage { NewMessage {
room_id: RoomId, room_id: RoomId,
connection_id: ConnectionId,
author_id: PlayerId, author_id: PlayerId,
body: String, body: String,
}, },
RoomJoined { RoomJoined {
room_id: RoomId, room_id: RoomId,
new_member_id: PlayerId, new_member_id: PlayerId,
connection_id: ConnectionId,
}, },
} }
@ -225,7 +248,7 @@ impl Player {
let (tx, mut rx) = channel(32); let (tx, mut rx) = channel(32);
let handle = PlayerHandle { tx }; let handle = PlayerHandle { tx };
let handle_clone = handle.clone(); let handle_clone = handle.clone();
let mut my_rooms = HashMap::new(); let mut my_rooms: HashMap<RoomId, RoomHandle> = HashMap::new();
let fiber = tokio::task::spawn(async move { let fiber = tokio::task::spawn(async move {
while let Some(cmd) = rx.recv().await { while let Some(cmd) = rx.recv().await {
match cmd { match cmd {
@ -233,38 +256,6 @@ impl Player {
let connection_id = self.connections.insert(sender); let connection_id = self.connections.insert(sender);
promise.send(ConnectionId(connection_id)); promise.send(ConnectionId(connection_id));
} }
PlayerCommand::JoinRoom {
room_id,
connection_id,
promise,
} => {
let mut room = rooms.get_or_create_room(room_id.clone());
room.subscribe(player_id.clone(), connection_id, handle.clone())
.await;
my_rooms.insert(room_id.clone(), room.clone());
let members = room.get_members().await;
promise.send(RoomInfo {
id: room_id,
members,
topic: b"some topic lol".to_vec(),
});
}
PlayerCommand::SendMessage {
room_id,
connection_id,
body,
} => {
let room = rooms.get_room(&room_id);
match room {
Some(mut room) => {
room.send_message(player_id.clone(), connection_id, body)
.await;
}
None => {
tracing::info!("no room found");
}
}
}
PlayerCommand::GetRooms(promise) => { PlayerCommand::GetRooms(promise) => {
let mut response = vec![]; let mut response = vec![];
for (_, handle) in &my_rooms { for (_, handle) in &my_rooms {
@ -277,10 +268,77 @@ impl Player {
"Player received an update, broadcasting to {} connections", "Player received an update, broadcasting to {} connections",
self.connections.len() self.connections.len()
); );
for connection in &self.connections { for (_, connection) in &self.connections {
connection.send(update.clone()).await; connection.send(update.clone()).await;
} }
} }
PlayerCommand::Cmd(cmd, connection_id) => match cmd {
Cmd::JoinRoom { room_id, promise } => {
let mut room = rooms.get_or_create_room(room_id.clone());
room.subscribe(player_id.clone(), handle.clone()).await;
my_rooms.insert(room_id.clone(), room.clone());
let members = room.get_members().await;
promise.send(RoomInfo {
id: room_id.clone(),
members,
topic: b"some topic lol".to_vec(),
});
let update = Updates::RoomJoined {
room_id,
new_member_id: player_id.clone(),
};
for (a, b) in &self.connections {
if ConnectionId(a) == connection_id {
continue;
}
b.send(update.clone()).await;
}
}
Cmd::SendMessage {
room_id,
body,
promise,
} => {
let room = rooms.get_room(&room_id);
if let Some(room) = room {
room.send_message(player_id.clone(), body.clone()).await;
} else {
tracing::info!("no room found");
}
promise.send(());
let update = Updates::NewMessage {
room_id,
author_id: player_id.clone(),
body,
};
for (a, b) in &self.connections {
if ConnectionId(a) == connection_id {
continue;
}
b.send(update.clone()).await;
}
}
Cmd::ChangeTopic {
room_id,
new_topic,
promise,
} => {
let room = rooms.get_room(&room_id);
if let Some(mut room) = room {
room.set_topic(player_id.clone(), new_topic.clone()).await;
} else {
tracing::info!("no room found");
}
promise.send(());
let update = Updates::RoomTopicChanged { room_id, new_topic };
for (a, b) in &self.connections {
if ConnectionId(a) == connection_id {
continue;
}
b.send(update.clone()).await;
}
}
},
} }
} }
self self

View File

@ -9,12 +9,10 @@ use prometheus::{IntGauge, Registry as MetricRegistry};
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::RwLock as AsyncRwLock;
use crate::{ use crate::{
core::player::{PlayerHandle, PlayerId}, core::player::{PlayerHandle, PlayerId, Updates},
prelude::*, prelude::*,
}; };
use super::player::{ConnectionId, Updates};
/// Opaque room id /// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RoomId(ByteVec); pub struct RoomId(ByteVec);
@ -82,25 +80,14 @@ struct RoomRegistryInner {
#[derive(Clone)] #[derive(Clone)]
pub struct RoomHandle(Arc<AsyncRwLock<Room>>); pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
impl RoomHandle { impl RoomHandle {
pub async fn subscribe( pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) {
&self,
player_id: PlayerId,
connection_id: ConnectionId,
player_handle: PlayerHandle,
) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
lock.add_subscriber(player_id, connection_id, player_handle) lock.add_subscriber(player_id, player_handle).await;
.await;
} }
pub async fn send_message( pub async fn send_message(&self, player_id: PlayerId, body: String) {
&self,
player_id: PlayerId,
connection_id: ConnectionId,
body: String,
) {
let lock = self.0.read().await; let lock = self.0.read().await;
lock.send_message(player_id, connection_id, body).await; lock.send_message(player_id, body).await;
} }
pub async fn get_members(&self) -> Vec<PlayerId> { pub async fn get_members(&self) -> Vec<PlayerId> {
@ -124,16 +111,14 @@ impl RoomHandle {
} }
} }
pub async fn set_topic(&mut self, new_topic: ByteVec) { pub async fn set_topic(&mut self, changer_id: PlayerId, new_topic: ByteVec) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
lock.topic = new_topic.clone(); lock.topic = new_topic.clone();
for (_, player_handle) in &lock.subscriptions {
let update = Updates::RoomTopicChanged { let update = Updates::RoomTopicChanged {
room_id: lock.room_id.clone(), room_id: lock.room_id.clone(),
new_topic: new_topic.clone(), new_topic: new_topic.clone(),
}; };
player_handle.update(update.clone()).await; lock.broadcast_update(update, &changer_id).await;
}
} }
} }
@ -143,33 +128,31 @@ struct Room {
topic: ByteVec, topic: ByteVec,
} }
impl Room { impl Room {
async fn add_subscriber( async fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) {
&mut self,
player_id: PlayerId,
connection_id: ConnectionId,
player_handle: PlayerHandle,
) {
tracing::info!("Adding a subscriber to room"); tracing::info!("Adding a subscriber to room");
self.subscriptions.insert(player_id.clone(), player_handle); self.subscriptions.insert(player_id.clone(), player_handle);
let update = Updates::RoomJoined { let update = Updates::RoomJoined {
room_id: self.room_id.clone(), room_id: self.room_id.clone(),
new_member_id: player_id.clone(), new_member_id: player_id.clone(),
connection_id: connection_id.clone(),
}; };
for (_, sub) in &self.subscriptions { self.broadcast_update(update, &player_id).await;
sub.update(update.clone()).await;
}
} }
async fn send_message(&self, author_id: PlayerId, connection_id: ConnectionId, body: String) { async fn send_message(&self, author_id: PlayerId, body: String) {
tracing::info!("Adding a message to room"); tracing::info!("Adding a message to room");
let update = Updates::NewMessage { let update = Updates::NewMessage {
room_id: self.room_id.clone(), room_id: self.room_id.clone(),
connection_id, author_id: author_id.clone(),
author_id,
body, body,
}; };
for (_, sub) in &self.subscriptions { self.broadcast_update(update, &author_id).await;
}
async fn broadcast_update(&self, update: Updates, except: &PlayerId) {
for (player_id, sub) in &self.subscriptions {
if player_id == except {
continue;
}
log::info!("Sending a message from room to player"); log::info!("Sending a message from room to player");
sub.update(update.clone()).await; sub.update(update.clone()).await;
} }

View File

@ -232,12 +232,12 @@ async fn handle_registered_socket<'a>(
} else { } else {
len len
}; };
handle_incoming_message(&buffer[0..len], &config, &user, &rooms, &mut connection, writer).await?; handle_incoming_message(&buffer[0..len], &config, &user, &rooms, &mut connection, &player_id, writer).await?;
buffer.clear(); buffer.clear();
}, },
update = connection.receiver.recv() => { update = connection.receiver.recv() => {
match update.unwrap() { match update.unwrap() {
Updates::RoomJoined { new_member_id, connection_id, room_id } => { Updates::RoomJoined { new_member_id, room_id } => {
if player_id == new_member_id { if player_id == new_member_id {
if let Some(room) = rooms.get_room(&room_id) { if let Some(room) = rooms.get_room(&room_id) {
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
@ -256,15 +256,13 @@ async fn handle_registered_socket<'a>(
writer.flush().await? writer.flush().await?
} }
}, },
Updates::NewMessage { author_id, connection_id, room_id, body } => { Updates::NewMessage { author_id, room_id, body } => {
if player_id != author_id || connection.connection_id != connection_id {
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(author_id.as_bytes().clone()), sender: Some(author_id.as_bytes().clone()),
body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_bytes().clone())), body: body.as_bytes().to_vec() } body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_bytes().clone())), body: body.as_bytes().to_vec() }
}.write_async(writer).await?; }.write_async(writer).await?;
writer.flush().await? writer.flush().await?
}
}, },
Updates::RoomTopicChanged { room_id, new_topic } => { Updates::RoomTopicChanged { room_id, new_topic } => {
ServerMessage { ServerMessage {
@ -291,6 +289,7 @@ async fn handle_incoming_message(
user: &RegisteredUser, user: &RegisteredUser,
rooms: &RoomRegistry, rooms: &RoomRegistry,
user_handle: &mut PlayerConnection, user_handle: &mut PlayerConnection,
player_id: &PlayerId,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> { ) -> Result<()> {
let parsed = client_message(buffer); let parsed = client_message(buffer);
@ -316,7 +315,7 @@ async fn handle_incoming_message(
Recipient::Chan(Chan::Global(chan)) => match String::from_utf8(body) { Recipient::Chan(Chan::Global(chan)) => match String::from_utf8(body) {
Ok(body) => { Ok(body) => {
let room_id = RoomId::from_bytes(chan)?; let room_id = RoomId::from_bytes(chan)?;
user_handle.send_message(room_id, body.clone()).await user_handle.send_message(room_id, body.clone()).await?;
} }
Err(err) => log::warn!("failed to parse incoming message: {err}"), Err(err) => log::warn!("failed to parse incoming message: {err}"),
}, },
@ -326,14 +325,25 @@ async fn handle_incoming_message(
match chan { match chan {
Chan::Global(chan) => { Chan::Global(chan) => {
let room_id = RoomId::from_bytes(chan)?; let room_id = RoomId::from_bytes(chan)?;
let room = rooms.get_room(&room_id); user_handle
if let Some(mut room) = room { .change_topic(room_id.clone(), topic.clone())
room.set_topic(topic).await; .await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N332Topic {
client: user.nickname.clone(),
chat: Chan::Global(room_id.as_bytes().clone()),
topic,
},
} }
.write_async(writer)
.await?;
writer.flush().await?;
} }
Chan::Local(_) => {} Chan::Local(_) => {}
}; };
}, }
cmd => { cmd => {
log::warn!("Not implemented handler for client command: {cmd:?}"); log::warn!("Not implemented handler for client command: {cmd:?}");
} }
@ -353,9 +363,11 @@ async fn handle_join(
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> { ) -> Result<()> {
match chan { match chan {
Chan::Global(chan) => { Chan::Global(chan_name) => {
let room_id = RoomId::from_bytes(chan.clone())?; let room_id = RoomId::from_bytes(chan_name.clone())?;
let room_info = user_handle.join_room(room_id).await?; let room_info = user_handle.join_room(room_id).await?;
produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?;
writer.flush().await?;
} }
Chan::Local(_) => {} Chan::Local(_) => {}
}; };

View File

@ -44,15 +44,15 @@ impl<V> AnonTable<V> {
pub struct AnonTableIterator<'a, V>(<&'a HashMap<u32, V> as IntoIterator>::IntoIter); pub struct AnonTableIterator<'a, V>(<&'a HashMap<u32, V> as IntoIterator>::IntoIter);
impl<'a, V> Iterator for AnonTableIterator<'a, V> { impl<'a, V> Iterator for AnonTableIterator<'a, V> {
type Item = &'a V; type Item = (Key, &'a V);
fn next(&mut self) -> Option<&'a V> { fn next(&mut self) -> Option<(Key, &'a V)> {
self.0.next().map(|a| a.1) self.0.next().map(|(k, v)| (Key(*k), v))
} }
} }
impl<'a, V> IntoIterator for &'a AnonTable<V> { impl<'a, V> IntoIterator for &'a AnonTable<V> {
type Item = &'a V; type Item = (Key, &'a V);
type IntoIter = AnonTableIterator<'a, V>; type IntoIter = AnonTableIterator<'a, V>;
@ -63,15 +63,15 @@ impl<'a, V> IntoIterator for &'a AnonTable<V> {
pub struct AnonTableMutIterator<'a, V>(<&'a mut HashMap<u32, V> as IntoIterator>::IntoIter); pub struct AnonTableMutIterator<'a, V>(<&'a mut HashMap<u32, V> as IntoIterator>::IntoIter);
impl<'a, V> Iterator for AnonTableMutIterator<'a, V> { impl<'a, V> Iterator for AnonTableMutIterator<'a, V> {
type Item = &'a mut V; type Item = (Key, &'a mut V);
fn next(&mut self) -> Option<&'a mut V> { fn next(&mut self) -> Option<(Key, &'a mut V)> {
self.0.next().map(|a| a.1) self.0.next().map(|(k, v)| (Key(*k), v))
} }
} }
impl<'a, V> IntoIterator for &'a mut AnonTable<V> { impl<'a, V> IntoIterator for &'a mut AnonTable<V> {
type Item = &'a mut V; type Item = (Key, &'a mut V);
type IntoIter = AnonTableMutIterator<'a, V>; type IntoIter = AnonTableMutIterator<'a, V>;