implemen sending messages to a chan

This commit is contained in:
Nikita Vilunov 2023-02-13 19:32:52 +01:00
parent 89f85b4fee
commit d661f68fb6
5 changed files with 204 additions and 37 deletions

View File

@ -15,34 +15,51 @@ use std::{
use prometheus::{IntGauge, Registry as MetricsRegistry};
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
sync::oneshot::{channel as oneshot, Sender as OneshotSender},
task::JoinHandle,
};
use crate::{
core::room::{RoomId, RoomRegistry},
prelude::*,
util::table::AnonTable,
util::table::{AnonTable, Key as AnonKey},
};
/// Opaque player identifier.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PlayerId(pub ByteVec);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConnectionId(pub AnonKey);
/// Handle to a player actor.
#[derive(Clone)]
pub struct PlayerHandle {
tx: Sender<PlayerCommand>,
}
impl PlayerHandle {
pub async fn subscribe(&mut self) -> Receiver<Updates> {
pub async fn subscribe(&mut self) -> (ConnectionId, Receiver<Updates>) {
let (sender, rx) = channel(32);
self.tx.send(PlayerCommand::AddSocket { sender }).await;
rx
let (promise, deferred) = oneshot();
self.tx
.send(PlayerCommand::AddSocket { sender, promise })
.await;
let connection_id = deferred.await.unwrap();
(connection_id, rx)
}
pub async fn send_message(&mut self, room_id: RoomId, body: String) {
pub async fn send_message(
&mut self,
room_id: RoomId,
connection_id: ConnectionId,
body: String,
) {
self.tx
.send(PlayerCommand::SendMessage { room_id, body })
.send(PlayerCommand::SendMessage {
room_id,
connection_id,
body,
})
.await;
}
@ -50,11 +67,18 @@ impl PlayerHandle {
self.tx.send(PlayerCommand::JoinRoom { room_id }).await;
}
pub async fn receive_message(&mut self, room_id: RoomId, author: PlayerId, body: String) {
pub async fn receive_message(
&mut self,
room_id: RoomId,
author: PlayerId,
connection_id: ConnectionId,
body: String,
) {
self.tx
.send(PlayerCommand::IncomingMessage {
room_id,
author,
connection_id,
body,
})
.await;
@ -63,22 +87,32 @@ impl PlayerHandle {
/// Player update event type which is sent to a connection handler.
pub enum Updates {
RoomJoined { room_id: RoomId },
NewMessage { room_id: RoomId, body: String },
RoomJoined {
room_id: RoomId,
},
NewMessage {
author_id: PlayerId,
connection_id: ConnectionId,
room_id: RoomId,
body: String,
},
}
enum PlayerCommand {
AddSocket {
sender: Sender<Updates>,
promise: OneshotSender<ConnectionId>,
},
JoinRoom {
room_id: RoomId,
},
SendMessage {
room_id: RoomId,
connection_id: ConnectionId,
body: String,
},
IncomingMessage {
room_id: RoomId,
connection_id: ConnectionId,
author: PlayerId,
body: String,
},
@ -108,11 +142,15 @@ impl PlayerRegistry {
sockets: AnonTable::new(),
};
let mut inner = self.0.write().unwrap();
if let Some((handle, _)) = inner.players.get(&id) {
handle.clone()
} else {
let (handle, fiber) = player.launch(id.clone(), inner.room_registry.clone());
inner.players.insert(id, (handle.clone(), fiber));
inner.metric_active_players.inc();
handle
}
}
}
/// The player registry state representation.
@ -138,18 +176,24 @@ impl Player {
let fiber = tokio::task::spawn(async move {
while let Some(cmd) = rx.recv().await {
match cmd {
PlayerCommand::AddSocket { sender } => {
self.sockets.insert(sender);
PlayerCommand::AddSocket { sender, promise } => {
let connection_id = self.sockets.insert(sender);
promise.send(ConnectionId(connection_id));
}
PlayerCommand::JoinRoom { room_id } => {
let mut room = rooms.get_or_create_room(room_id);
room.subscribe(player_id.clone(), handle.clone()).await;
}
PlayerCommand::SendMessage { room_id, body } => {
PlayerCommand::SendMessage {
room_id,
connection_id,
body,
} => {
let room = rooms.get_room(room_id);
match room {
Some(mut room) => {
room.send_message(player_id.clone(), body).await;
room.send_message(player_id.clone(), connection_id, body)
.await;
}
None => {
tracing::info!("no room found");
@ -159,12 +203,16 @@ impl Player {
PlayerCommand::IncomingMessage {
room_id,
author,
connection_id,
body,
} => {
tracing::info!("Handling incoming message");
for socket in &self.sockets {
log::info!("Send message to socket");
socket
.send(Updates::NewMessage {
author_id: author.clone(),
connection_id: connection_id.clone(),
room_id: room_id.clone(),
body: body.clone(),
})

View File

@ -13,6 +13,8 @@ use crate::{
prelude::*,
};
use super::player::ConnectionId;
/// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RoomId(pub ByteVec);
@ -37,11 +39,15 @@ impl RoomRegistry {
subscriptions: HashMap::new(),
};
let mut inner = self.0.write().unwrap();
if let Some((room_handle, _)) = inner.rooms.get(&room_id) {
room_handle.clone()
} else {
let (room_handle, fiber) = room.launch(room_id.clone());
inner.rooms.insert(room_id, (room_handle.clone(), fiber));
inner.metric_active_rooms.inc();
room_handle
}
}
pub fn get_room(&self, room_id: RoomId) -> Option<RoomHandle> {
let inner = self.0.read().unwrap();
@ -73,9 +79,18 @@ impl RoomHandle {
};
}
pub async fn send_message(&mut self, player_id: PlayerId, body: String) {
pub async fn send_message(
&mut self,
player_id: PlayerId,
connection_id: ConnectionId,
body: String,
) {
self.tx
.send(RoomCommand::SendMessage { player_id, body })
.send(RoomCommand::SendMessage {
player_id,
connection_id,
body,
})
.await;
}
}
@ -87,6 +102,7 @@ enum RoomCommand {
},
SendMessage {
player_id: PlayerId,
connection_id: ConnectionId,
body: String,
},
}
@ -105,10 +121,20 @@ impl Room {
tracing::info!("Adding a subscriber to room");
self.subscriptions.insert(player_id, player);
}
RoomCommand::SendMessage { player_id, body } => {
RoomCommand::SendMessage {
player_id,
connection_id,
body,
} => {
tracing::info!("Adding a message to room");
for (_, sub) in &mut self.subscriptions {
sub.receive_message(room_id.clone(), player_id.clone(), body.clone())
log::info!("Sending a message from room to player");
sub.receive_message(
room_id.clone(),
player_id.clone(),
connection_id.clone(),
body.clone(),
)
.await;
}
}

View File

@ -134,7 +134,7 @@ async fn handle_registered_socket<'a>(
let mut user_handle = players
.get_or_create_player(PlayerId(user.nickname.clone()))
.await;
let mut connnection = user_handle.subscribe().await;
let (my_connection_id, mut connnection) = user_handle.subscribe().await;
ServerMessage {
tags: vec![],
@ -217,8 +217,49 @@ async fn handle_registered_socket<'a>(
},
ClientMessage::Join(chan) => {
match chan {
Chan::Global(room) => {
Chan::Global(ref room) => {
user_handle.join_room(RoomId(room.clone())).await;
ServerMessage {
tags: vec![],
sender: Some(user.nickname.clone()),
body: ServerMessageBody::Join(chan.clone()),
}
.write_async(&mut writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N332Topic {
client: user.nickname.clone(),
chat: chan.clone(),
topic: b"chan topic lol".to_vec(),
},
}
.write_async(&mut writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N353NamesReply {
client: user.nickname.clone(),
chan: chan.clone(),
members: user.nickname.clone(),
},
}
.write_async(&mut writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N366NamesReplyEnd {
client: user.nickname.clone(),
chan: chan.clone(),
},
}
.write_async(&mut writer)
.await?;
writer.flush().await?;
},
Chan::Local(_) => {},
};
@ -227,7 +268,7 @@ async fn handle_registered_socket<'a>(
match recipient {
Recipient::Chan(Chan::Global(room)) => {
match String::from_utf8(body) {
Ok(body) => user_handle.send_message(RoomId(room.clone()), body.clone()).await,
Ok(body) => user_handle.send_message(RoomId(room.clone()), my_connection_id.clone(), body.clone()).await,
Err(err) => log::warn!("failed to parse incoming message: {err}"),
}
},
@ -246,13 +287,15 @@ async fn handle_registered_socket<'a>(
update = connnection.recv() => {
match update.unwrap() {
Updates::RoomJoined { room_id } => {},
Updates::NewMessage { room_id, body } => {
Updates::NewMessage { author_id, connection_id, room_id, body } => {
if my_connection_id != connection_id {
ServerMessage {
tags: vec![],
sender: None,
sender: Some(author_id.0.clone()),
body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.0)), body: body.as_bytes().to_vec() }
}.write_async(&mut writer).await?;
writer.flush().await?
}
},
}
}

View File

@ -59,6 +59,7 @@ pub enum ServerMessageBody {
target: Recipient,
body: ByteVec,
},
Join(Chan),
N001Welcome {
client: ByteVec,
text: ByteVec,
@ -81,6 +82,20 @@ pub enum ServerMessageBody {
client: ByteVec,
params: ByteVec, // TODO make this a datatype
},
N332Topic {
client: ByteVec,
chat: Chan,
topic: ByteVec,
},
N353NamesReply {
client: ByteVec,
chan: Chan,
members: ByteVec, // TODO make this a non-empty list with prefixes
},
N366NamesReplyEnd {
client: ByteVec,
chan: Chan,
},
}
impl ServerMessageBody {
@ -112,6 +127,10 @@ impl ServerMessageBody {
writer.write_all(b" :").await?;
writer.write_all(&body).await?;
}
ServerMessageBody::Join(chan) => {
writer.write_all(b"JOIN ").await?;
chan.write_async(writer).await?;
}
ServerMessageBody::N001Welcome { client, text } => {
writer.write_all(b"001 ").await?;
writer.write_all(&client).await?;
@ -153,6 +172,37 @@ impl ServerMessageBody {
writer.write_all(&params).await?;
writer.write_all(b" :are supported by this server").await?;
}
ServerMessageBody::N332Topic {
client,
chat,
topic,
} => {
writer.write_all(b"332 ").await?;
writer.write_all(&client).await?;
writer.write_all(b" ").await?;
chat.write_async(writer).await?;
writer.write_all(b" :").await?;
writer.write_all(&topic).await?;
}
ServerMessageBody::N353NamesReply {
client,
chan,
members,
} => {
writer.write_all(b"353 ").await?;
writer.write_all(&client).await?;
writer.write_all(b" = ").await?;
chan.write_async(writer).await?;
writer.write_all(b" :").await?;
writer.write_all(&members).await?;
}
ServerMessageBody::N366NamesReplyEnd { client, chan } => {
writer.write_all(b"366 ").await?;
writer.write_all(&client).await?;
writer.write_all(b" ").await?;
chan.write_async(writer).await?;
writer.write_all(b" :End of /NAMES list").await?;
}
}
Ok(())
}

View File

@ -1,6 +1,6 @@
use std::collections::HashMap;
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
#[derive(PartialEq, Eq, Debug, Clone, Copy, Hash)]
pub struct Key(u32);
/// Hash map with auto-generated surrogate key.