handle irc join commands

This commit is contained in:
Nikita Vilunov 2023-02-13 00:31:16 +01:00
parent ae27c04b0a
commit 20b461e81c
6 changed files with 43 additions and 46 deletions

View File

@ -25,8 +25,8 @@ use crate::{
};
/// Opaque player identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PlayerId(u64);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PlayerId(pub ByteVec);
/// Handle to a player actor.
#[derive(Clone)]
@ -40,15 +40,6 @@ impl PlayerHandle {
rx
}
pub async fn create_room(&mut self) {
match self.tx.send(PlayerCommand::CreateRoom).await {
Ok(_) => {}
Err(_) => {
panic!("unexpected err");
}
};
}
pub async fn send_message(&mut self, room_id: RoomId, body: String) {
self.tx
.send(PlayerCommand::SendMessage { room_id, body })
@ -79,7 +70,6 @@ enum PlayerCommand {
AddSocket {
sender: Sender<Updates>,
},
CreateRoom,
JoinRoom {
room_id: RoomId,
},
@ -106,7 +96,6 @@ impl PlayerRegistry {
IntGauge::new("chat_players_active", "Number of alive player actors")?;
metrics.register(Box::new(metric_active_players.clone()))?;
let inner = PlayerRegistryInner {
next_id: PlayerId(0),
room_registry,
players: HashMap::new(),
metric_active_players,
@ -114,23 +103,20 @@ impl PlayerRegistry {
Ok(PlayerRegistry(Arc::new(RwLock::new(inner))))
}
pub async fn create_player(&mut self) -> (PlayerId, PlayerHandle) {
pub async fn get_or_create_player(&mut self, id: PlayerId) -> PlayerHandle {
let player = Player {
sockets: AnonTable::new(),
};
let mut inner = self.0.write().unwrap();
let id = inner.next_id;
inner.next_id.0 += 1;
let (handle, fiber) = player.launch(id, inner.room_registry.clone());
let (handle, fiber) = player.launch(id.clone(), inner.room_registry.clone());
inner.players.insert(id, (handle.clone(), fiber));
inner.metric_active_players.inc();
(id, handle)
handle
}
}
/// The player registry state representation.
struct PlayerRegistryInner {
next_id: PlayerId,
room_registry: RoomRegistry,
players: HashMap<PlayerId, (PlayerHandle, JoinHandle<Player>)>,
metric_active_players: IntGauge,
@ -155,25 +141,15 @@ impl Player {
PlayerCommand::AddSocket { sender } => {
self.sockets.insert(sender);
}
PlayerCommand::CreateRoom => {
let (room_id, room_handle) = rooms.create_room();
}
PlayerCommand::JoinRoom { room_id } => {
let room = rooms.get_room(room_id);
match room {
Some(mut room) => {
room.subscribe(player_id, handle.clone()).await;
}
None => {
tracing::info!("no room found");
}
}
let mut room = rooms.get_or_create_room(room_id);
room.subscribe(player_id.clone(), handle.clone()).await;
}
PlayerCommand::SendMessage { room_id, body } => {
let room = rooms.get_room(room_id);
match room {
Some(mut room) => {
room.send_message(player_id, body).await;
room.send_message(player_id.clone(), body).await;
}
None => {
tracing::info!("no room found");
@ -189,7 +165,7 @@ impl Player {
for socket in &self.sockets {
socket
.send(Updates::NewMessage {
room_id,
room_id: room_id.clone(),
body: body.clone(),
})
.await;

View File

@ -14,8 +14,8 @@ use crate::{
};
/// Opaque room id
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RoomId(pub u64);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RoomId(pub ByteVec);
/// Shared datastructure for storing metadata about rooms.
#[derive(Clone)]
@ -26,23 +26,21 @@ impl RoomRegistry {
IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
metrics.register(Box::new(metric_active_rooms.clone()))?;
let inner = RoomRegistryInner {
next_room_id: RoomId(0),
rooms: HashMap::new(),
metric_active_rooms,
};
Ok(RoomRegistry(Arc::new(RwLock::new(inner))))
}
pub fn create_room(&mut self) -> (RoomId, RoomHandle) {
pub fn get_or_create_room(&mut self, room_id: RoomId) -> RoomHandle {
let room = Room {
subscriptions: HashMap::new(),
};
let mut inner = self.0.write().unwrap();
let room_id = inner.next_room_id;
inner.next_room_id.0 += 1;
let (room_handle, fiber) = room.launch(room_id);
let (room_handle, fiber) = room.launch(room_id.clone());
inner.rooms.insert(room_id, (room_handle.clone(), fiber));
(room_id, room_handle)
inner.metric_active_rooms.inc();
room_handle
}
pub fn get_room(&self, room_id: RoomId) -> Option<RoomHandle> {
@ -53,7 +51,6 @@ impl RoomRegistry {
}
struct RoomRegistryInner {
next_room_id: RoomId,
rooms: HashMap<RoomId, (RoomHandle, JoinHandle<Room>)>,
metric_active_rooms: IntGauge,
}
@ -111,7 +108,8 @@ impl Room {
RoomCommand::SendMessage { player_id, body } => {
tracing::info!("Adding a message to room");
for (_, sub) in &mut self.subscriptions {
sub.receive_message(room_id, player_id, body.clone()).await;
sub.receive_message(room_id.clone(), player_id.clone(), body.clone())
.await;
}
}
}

View File

@ -9,3 +9,5 @@ pub mod log {
}
pub type Result<T> = std::result::Result<T, anyhow::Error>;
pub type ByteVec = Vec<u8>;

View File

@ -7,10 +7,12 @@ use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot::channel;
use crate::core::player::PlayerRegistry;
use crate::core::player::{PlayerId, PlayerRegistry};
use crate::core::room::RoomId;
use crate::prelude::*;
use crate::protos::irc::client::{client_message, ClientMessage};
use crate::protos::irc::server::{ServerMessage, ServerMessageBody};
use crate::protos::irc::Chan;
use crate::util::Terminator;
#[derive(Deserialize, Debug, Clone)]
@ -129,6 +131,10 @@ async fn handle_registered_socket<'a>(
let mut buffer = vec![];
log::info!("Handling registered user: {user:?}");
let mut user_handle = players
.get_or_create_player(PlayerId(user.nickname.clone()))
.await;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
@ -208,6 +214,14 @@ async fn handle_registered_socket<'a>(
}.write_async(&mut writer).await?;
writer.flush().await?;
},
ClientMessage::Join(chan) => {
match chan {
Chan::Global(room) => {
user_handle.join_room(RoomId(room.clone())).await;
},
Chan::Local(_) => {},
};
}
_ => {},
}
},

View File

@ -23,6 +23,7 @@ pub enum ClientMessage {
username: ByteVec,
realname: ByteVec,
},
Join(Chan),
Quit {
reason: ByteVec,
},
@ -35,6 +36,7 @@ pub fn client_message(input: &[u8]) -> IResult<&[u8], ClientMessage> {
client_message_pong,
client_message_nick,
client_message_user,
client_message_join,
client_message_quit,
))(input)
}
@ -98,6 +100,12 @@ fn client_message_user(input: &[u8]) -> IResult<&[u8], ClientMessage> {
},
))
}
fn client_message_join(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("JOIN ")(input)?;
let (input, chan) = chan(input)?;
Ok((input, ClientMessage::Join(chan)))
}
fn client_message_quit(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("QUIT :")(input)?;

View File

@ -2,8 +2,6 @@
pub mod client;
pub mod server;
use std::io::Write;
use nom::{
branch::alt,
bytes::complete::{tag, take, take_while},
@ -27,6 +25,7 @@ fn token(input: &[u8]) -> IResult<&[u8], &[u8]> {
take_while(|i| i != b'\n' && i != b'\r')(input)
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Chan {
/// #<name> — network-global channel, available from any server in the network.
Global(ByteVec),