forked from lavina/lavina
1
0
Fork 0

introduce player connection

This commit is contained in:
Nikita Vilunov 2023-02-13 20:16:00 +01:00
parent 6d330c0fcd
commit 7f5fa955ec
2 changed files with 40 additions and 18 deletions

View File

@ -32,20 +32,41 @@ pub struct PlayerId(pub ByteVec);
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConnectionId(pub AnonKey); pub struct ConnectionId(pub AnonKey);
pub struct PlayerConnection {
pub connection_id: ConnectionId,
pub receiver: Receiver<Updates>,
player_handle: PlayerHandle,
}
impl PlayerConnection {
pub async fn send_message(&mut self, room_id: RoomId, body: String) {
self.player_handle
.send_message(room_id, self.connection_id.clone(), body)
.await
}
pub async fn join_room(&mut self, room_id: RoomId) {
self.player_handle.join_room(room_id).await
}
}
/// Handle to a player actor. /// Handle to a player actor.
#[derive(Clone)] #[derive(Clone)]
pub struct PlayerHandle { pub struct PlayerHandle {
tx: Sender<PlayerCommand>, tx: Sender<PlayerCommand>,
} }
impl PlayerHandle { impl PlayerHandle {
pub async fn subscribe(&mut self) -> (ConnectionId, Receiver<Updates>) { pub async fn subscribe(&mut self) -> PlayerConnection {
let (sender, rx) = channel(32); let (sender, receiver) = channel(32);
let (promise, deferred) = oneshot(); let (promise, deferred) = oneshot();
self.tx self.tx
.send(PlayerCommand::AddSocket { sender, promise }) .send(PlayerCommand::AddSocket { sender, promise })
.await; .await;
let connection_id = deferred.await.unwrap(); let connection_id = deferred.await.unwrap();
(connection_id, rx) PlayerConnection {
connection_id,
player_handle: self.clone(),
receiver,
}
} }
pub async fn send_message( pub async fn send_message(
@ -151,6 +172,11 @@ impl PlayerRegistry {
handle handle
} }
} }
pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection {
let mut player_handle = self.get_or_create_player(id).await;
player_handle.subscribe().await
}
} }
/// The player registry state representation. /// The player registry state representation.

View File

@ -7,7 +7,9 @@ use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot::channel; use tokio::sync::oneshot::channel;
use crate::core::player::{ConnectionId, PlayerHandle, PlayerId, PlayerRegistry, Updates}; use crate::core::player::{
ConnectionId, PlayerConnection, PlayerHandle, PlayerId, PlayerRegistry, Updates,
};
use crate::core::room::RoomId; use crate::core::room::RoomId;
use crate::prelude::*; use crate::prelude::*;
use crate::protos::irc::client::{client_message, ClientMessage}; use crate::protos::irc::client::{client_message, ClientMessage};
@ -137,10 +139,9 @@ async fn handle_registered_socket<'a>(
let mut buffer = vec![]; let mut buffer = vec![];
log::info!("Handling registered user: {user:?}"); log::info!("Handling registered user: {user:?}");
let mut user_handle = players let mut connection = players
.get_or_create_player(PlayerId(user.nickname.clone())) .connect_to_player(PlayerId(user.nickname.clone()))
.await; .await;
let (my_connection_id, mut connnection) = user_handle.subscribe().await;
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
@ -206,14 +207,14 @@ async fn handle_registered_socket<'a>(
} else { } else {
len len
}; };
handle_incoming_message(&buffer[0..len], &config, &user, &mut user_handle, &my_connection_id, writer).await?; handle_incoming_message(&buffer[0..len], &config, &user, &mut connection, writer).await?;
buffer.clear(); buffer.clear();
}, },
update = connnection.recv() => { update = connection.receiver.recv() => {
match update.unwrap() { match update.unwrap() {
Updates::RoomJoined { room_id } => {}, Updates::RoomJoined { room_id } => {},
Updates::NewMessage { author_id, connection_id, room_id, body } => { Updates::NewMessage { author_id, connection_id, room_id, body } => {
if my_connection_id != connection_id { if connection.connection_id != connection_id {
ServerMessage { ServerMessage {
tags: vec![], tags: vec![],
sender: Some(author_id.0.clone()), sender: Some(author_id.0.clone()),
@ -233,8 +234,7 @@ async fn handle_incoming_message(
buffer: &[u8], buffer: &[u8],
config: &ServerConfig, config: &ServerConfig,
user: &RegisteredUser, user: &RegisteredUser,
user_handle: &mut PlayerHandle, user_handle: &mut PlayerConnection,
my_connection_id: &ConnectionId,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> { ) -> Result<()> {
let parsed = client_message(buffer); let parsed = client_message(buffer);
@ -260,11 +260,7 @@ async fn handle_incoming_message(
Recipient::Chan(Chan::Global(room)) => match String::from_utf8(body) { Recipient::Chan(Chan::Global(room)) => match String::from_utf8(body) {
Ok(body) => { Ok(body) => {
user_handle user_handle
.send_message( .send_message(RoomId(room.clone()), body.clone())
RoomId(room.clone()),
my_connection_id.clone(),
body.clone(),
)
.await .await
} }
Err(err) => log::warn!("failed to parse incoming message: {err}"), Err(err) => log::warn!("failed to parse incoming message: {err}"),
@ -283,7 +279,7 @@ async fn handle_incoming_message(
async fn handle_join( async fn handle_join(
config: &ServerConfig, config: &ServerConfig,
user: &RegisteredUser, user: &RegisteredUser,
user_handle: &mut PlayerHandle, user_handle: &mut PlayerConnection,
chan: &Chan, chan: &Chan,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> { ) -> Result<()> {