From 20b461e81c8d6ed9018a4862f4f04ffc9e690211 Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Mon, 13 Feb 2023 00:31:16 +0100 Subject: [PATCH] handle irc join commands --- src/core/player.rs | 42 +++++++++------------------------------- src/core/room.rs | 18 ++++++++--------- src/prelude.rs | 2 ++ src/projections/irc.rs | 16 ++++++++++++++- src/protos/irc/client.rs | 8 ++++++++ src/protos/irc/mod.rs | 3 +-- 6 files changed, 43 insertions(+), 46 deletions(-) diff --git a/src/core/player.rs b/src/core/player.rs index e6b0bda..c9cd050 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -25,8 +25,8 @@ use crate::{ }; /// Opaque player identifier. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct PlayerId(u64); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PlayerId(pub ByteVec); /// Handle to a player actor. #[derive(Clone)] @@ -40,15 +40,6 @@ impl PlayerHandle { rx } - pub async fn create_room(&mut self) { - match self.tx.send(PlayerCommand::CreateRoom).await { - Ok(_) => {} - Err(_) => { - panic!("unexpected err"); - } - }; - } - pub async fn send_message(&mut self, room_id: RoomId, body: String) { self.tx .send(PlayerCommand::SendMessage { room_id, body }) @@ -79,7 +70,6 @@ enum PlayerCommand { AddSocket { sender: Sender, }, - CreateRoom, JoinRoom { room_id: RoomId, }, @@ -106,7 +96,6 @@ impl PlayerRegistry { IntGauge::new("chat_players_active", "Number of alive player actors")?; metrics.register(Box::new(metric_active_players.clone()))?; let inner = PlayerRegistryInner { - next_id: PlayerId(0), room_registry, players: HashMap::new(), metric_active_players, @@ -114,23 +103,20 @@ impl PlayerRegistry { Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) } - pub async fn create_player(&mut self) -> (PlayerId, PlayerHandle) { + pub async fn get_or_create_player(&mut self, id: PlayerId) -> PlayerHandle { let player = Player { sockets: AnonTable::new(), }; let mut inner = self.0.write().unwrap(); - let id = inner.next_id; - inner.next_id.0 += 1; - let (handle, fiber) = player.launch(id, inner.room_registry.clone()); + let (handle, fiber) = player.launch(id.clone(), inner.room_registry.clone()); inner.players.insert(id, (handle.clone(), fiber)); inner.metric_active_players.inc(); - (id, handle) + handle } } /// The player registry state representation. struct PlayerRegistryInner { - next_id: PlayerId, room_registry: RoomRegistry, players: HashMap)>, metric_active_players: IntGauge, @@ -155,25 +141,15 @@ impl Player { PlayerCommand::AddSocket { sender } => { self.sockets.insert(sender); } - PlayerCommand::CreateRoom => { - let (room_id, room_handle) = rooms.create_room(); - } PlayerCommand::JoinRoom { room_id } => { - let room = rooms.get_room(room_id); - match room { - Some(mut room) => { - room.subscribe(player_id, handle.clone()).await; - } - None => { - tracing::info!("no room found"); - } - } + let mut room = rooms.get_or_create_room(room_id); + room.subscribe(player_id.clone(), handle.clone()).await; } PlayerCommand::SendMessage { room_id, body } => { let room = rooms.get_room(room_id); match room { Some(mut room) => { - room.send_message(player_id, body).await; + room.send_message(player_id.clone(), body).await; } None => { tracing::info!("no room found"); @@ -189,7 +165,7 @@ impl Player { for socket in &self.sockets { socket .send(Updates::NewMessage { - room_id, + room_id: room_id.clone(), body: body.clone(), }) .await; diff --git a/src/core/room.rs b/src/core/room.rs index 38a1cb0..46d4929 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -14,8 +14,8 @@ use crate::{ }; /// Opaque room id -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct RoomId(pub u64); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RoomId(pub ByteVec); /// Shared datastructure for storing metadata about rooms. #[derive(Clone)] @@ -26,23 +26,21 @@ impl RoomRegistry { IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; let inner = RoomRegistryInner { - next_room_id: RoomId(0), rooms: HashMap::new(), metric_active_rooms, }; Ok(RoomRegistry(Arc::new(RwLock::new(inner)))) } - pub fn create_room(&mut self) -> (RoomId, RoomHandle) { + pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle { let room = Room { subscriptions: HashMap::new(), }; let mut inner = self.0.write().unwrap(); - let room_id = inner.next_room_id; - inner.next_room_id.0 += 1; - let (room_handle, fiber) = room.launch(room_id); + let (room_handle, fiber) = room.launch(room_id.clone()); inner.rooms.insert(room_id, (room_handle.clone(), fiber)); - (room_id, room_handle) + inner.metric_active_rooms.inc(); + room_handle } pub fn get_room(&self, room_id: RoomId) -> Option { @@ -53,7 +51,6 @@ impl RoomRegistry { } struct RoomRegistryInner { - next_room_id: RoomId, rooms: HashMap)>, metric_active_rooms: IntGauge, } @@ -111,7 +108,8 @@ impl Room { RoomCommand::SendMessage { player_id, body } => { tracing::info!("Adding a message to room"); for (_, sub) in &mut self.subscriptions { - sub.receive_message(room_id, player_id, body.clone()).await; + sub.receive_message(room_id.clone(), player_id.clone(), body.clone()) + .await; } } } diff --git a/src/prelude.rs b/src/prelude.rs index fe8b582..c19614d 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -9,3 +9,5 @@ pub mod log { } pub type Result = std::result::Result; + +pub type ByteVec = Vec; diff --git a/src/projections/irc.rs b/src/projections/irc.rs index 8480d91..cfa7a89 100644 --- a/src/projections/irc.rs +++ b/src/projections/irc.rs @@ -7,10 +7,12 @@ use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot::channel; -use crate::core::player::PlayerRegistry; +use crate::core::player::{PlayerId, PlayerRegistry}; +use crate::core::room::RoomId; use crate::prelude::*; use crate::protos::irc::client::{client_message, ClientMessage}; use crate::protos::irc::server::{ServerMessage, ServerMessageBody}; +use crate::protos::irc::Chan; use crate::util::Terminator; #[derive(Deserialize, Debug, Clone)] @@ -129,6 +131,10 @@ async fn handle_registered_socket<'a>( let mut buffer = vec![]; log::info!("Handling registered user: {user:?}"); + let mut user_handle = players + .get_or_create_player(PlayerId(user.nickname.clone())) + .await; + ServerMessage { tags: vec![], sender: Some(config.server_name.as_bytes().to_vec()), @@ -208,6 +214,14 @@ async fn handle_registered_socket<'a>( }.write_async(&mut writer).await?; writer.flush().await?; }, + ClientMessage::Join(chan) => { + match chan { + Chan::Global(room) => { + user_handle.join_room(RoomId(room.clone())).await; + }, + Chan::Local(_) => {}, + }; + } _ => {}, } }, diff --git a/src/protos/irc/client.rs b/src/protos/irc/client.rs index fc653ab..3e56a12 100644 --- a/src/protos/irc/client.rs +++ b/src/protos/irc/client.rs @@ -23,6 +23,7 @@ pub enum ClientMessage { username: ByteVec, realname: ByteVec, }, + Join(Chan), Quit { reason: ByteVec, }, @@ -35,6 +36,7 @@ pub fn client_message(input: &[u8]) -> IResult<&[u8], ClientMessage> { client_message_pong, client_message_nick, client_message_user, + client_message_join, client_message_quit, ))(input) } @@ -98,6 +100,12 @@ fn client_message_user(input: &[u8]) -> IResult<&[u8], ClientMessage> { }, )) } +fn client_message_join(input: &[u8]) -> IResult<&[u8], ClientMessage> { + let (input, _) = tag("JOIN ")(input)?; + let (input, chan) = chan(input)?; + + Ok((input, ClientMessage::Join(chan))) +} fn client_message_quit(input: &[u8]) -> IResult<&[u8], ClientMessage> { let (input, _) = tag("QUIT :")(input)?; diff --git a/src/protos/irc/mod.rs b/src/protos/irc/mod.rs index 872a36a..06f5434 100644 --- a/src/protos/irc/mod.rs +++ b/src/protos/irc/mod.rs @@ -2,8 +2,6 @@ pub mod client; pub mod server; -use std::io::Write; - use nom::{ branch::alt, bytes::complete::{tag, take, take_while}, @@ -27,6 +25,7 @@ fn token(input: &[u8]) -> IResult<&[u8], &[u8]> { take_while(|i| i != b'\n' && i != b'\r')(input) } +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Chan { /// # — network-global channel, available from any server in the network. Global(ByteVec),