From 7f5fa955ec8434c992348cdfafd18cf4a6842df5 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Mon, 13 Feb 2023 20:16:00 +0100 Subject: [PATCH] introduce player connection --- src/core/player.rs | 32 +++++++++++++++++++++++++++++--- src/projections/irc.rs | 26 +++++++++++--------------- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/src/core/player.rs b/src/core/player.rs index b7ba5ba..3ba8795 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -32,20 +32,41 @@ pub struct PlayerId(pub ByteVec); #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionId(pub AnonKey); +pub struct PlayerConnection { + pub connection_id: ConnectionId, + pub receiver: Receiver, + player_handle: PlayerHandle, +} +impl PlayerConnection { + pub async fn send_message(&mut self, room_id: RoomId, body: String) { + self.player_handle + .send_message(room_id, self.connection_id.clone(), body) + .await + } + + pub async fn join_room(&mut self, room_id: RoomId) { + self.player_handle.join_room(room_id).await + } +} + /// Handle to a player actor. #[derive(Clone)] pub struct PlayerHandle { tx: Sender, } impl PlayerHandle { - pub async fn subscribe(&mut self) -> (ConnectionId, Receiver) { - let (sender, rx) = channel(32); + pub async fn subscribe(&mut self) -> PlayerConnection { + let (sender, receiver) = channel(32); let (promise, deferred) = oneshot(); self.tx .send(PlayerCommand::AddSocket { sender, promise }) .await; let connection_id = deferred.await.unwrap(); - (connection_id, rx) + PlayerConnection { + connection_id, + player_handle: self.clone(), + receiver, + } } pub async fn send_message( @@ -151,6 +172,11 @@ impl PlayerRegistry { handle } } + + pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection { + let mut player_handle = self.get_or_create_player(id).await; + player_handle.subscribe().await + } } /// The player registry state representation. diff --git a/src/projections/irc.rs b/src/projections/irc.rs index 465a516..78688b2 100644 --- a/src/projections/irc.rs +++ b/src/projections/irc.rs @@ -7,7 +7,9 @@ use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot::channel; -use crate::core::player::{ConnectionId, PlayerHandle, PlayerId, PlayerRegistry, Updates}; +use crate::core::player::{ + ConnectionId, PlayerConnection, PlayerHandle, PlayerId, PlayerRegistry, Updates, +}; use crate::core::room::RoomId; use crate::prelude::*; use crate::protos::irc::client::{client_message, ClientMessage}; @@ -137,10 +139,9 @@ async fn handle_registered_socket<'a>( let mut buffer = vec![]; log::info!("Handling registered user: {user:?}"); - let mut user_handle = players - .get_or_create_player(PlayerId(user.nickname.clone())) + let mut connection = players + .connect_to_player(PlayerId(user.nickname.clone())) .await; - let (my_connection_id, mut connnection) = user_handle.subscribe().await; ServerMessage { tags: vec![], @@ -206,14 +207,14 @@ async fn handle_registered_socket<'a>( } else { len }; - handle_incoming_message(&buffer[0..len], &config, &user, &mut user_handle, &my_connection_id, writer).await?; + handle_incoming_message(&buffer[0..len], &config, &user, &mut connection, writer).await?; buffer.clear(); }, - update = connnection.recv() => { + update = connection.receiver.recv() => { match update.unwrap() { Updates::RoomJoined { room_id } => {}, Updates::NewMessage { author_id, connection_id, room_id, body } => { - if my_connection_id != connection_id { + if connection.connection_id != connection_id { ServerMessage { tags: vec![], sender: Some(author_id.0.clone()), @@ -233,8 +234,7 @@ async fn handle_incoming_message( buffer: &[u8], config: &ServerConfig, user: &RegisteredUser, - user_handle: &mut PlayerHandle, - my_connection_id: &ConnectionId, + user_handle: &mut PlayerConnection, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { let parsed = client_message(buffer); @@ -260,11 +260,7 @@ async fn handle_incoming_message( Recipient::Chan(Chan::Global(room)) => match String::from_utf8(body) { Ok(body) => { user_handle - .send_message( - RoomId(room.clone()), - my_connection_id.clone(), - body.clone(), - ) + .send_message(RoomId(room.clone()), body.clone()) .await } Err(err) => log::warn!("failed to parse incoming message: {err}"), @@ -283,7 +279,7 @@ async fn handle_incoming_message( async fn handle_join( config: &ServerConfig, user: &RegisteredUser, - user_handle: &mut PlayerHandle, + user_handle: &mut PlayerConnection, chan: &Chan, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> {