produce join messages on joins from other connections

This commit is contained in:
Nikita Vilunov 2023-02-14 18:53:43 +01:00
parent 7d6ae661c4
commit a8d6a98a5b
7 changed files with 318 additions and 96 deletions

View File

@ -20,7 +20,7 @@ use tokio::{
};
use crate::{
core::room::{RoomId, RoomRegistry, RoomInfo},
core::room::{RoomId, RoomInfo, RoomRegistry},
prelude::*,
util::table::{AnonTable, Key as AnonKey},
};
@ -45,7 +45,9 @@ impl PlayerConnection {
}
pub async fn join_room(&mut self, room_id: RoomId) -> Result<RoomInfo> {
self.player_handle.join_room(room_id).await
self.player_handle
.join_room(room_id, self.connection_id.clone())
.await
}
}
@ -69,12 +71,7 @@ impl PlayerHandle {
}
}
pub async fn send_message(
&self,
room_id: RoomId,
connection_id: ConnectionId,
body: String,
) {
pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: String) {
self.tx
.send(PlayerCommand::SendMessage {
room_id,
@ -84,9 +81,19 @@ impl PlayerHandle {
.await;
}
pub async fn join_room(&self, room_id: RoomId) -> Result<RoomInfo> {
pub async fn join_room(
&self,
room_id: RoomId,
connection_id: ConnectionId,
) -> Result<RoomInfo> {
let (promise, deferred) = oneshot();
self.tx.send(PlayerCommand::JoinRoom { room_id, promise }).await;
self.tx
.send(PlayerCommand::JoinRoom {
room_id,
connection_id,
promise,
})
.await;
Ok(deferred.await?)
}
@ -106,11 +113,17 @@ impl PlayerHandle {
})
.await;
}
pub async fn send(&self, command: PlayerCommand) {
self.tx.send(command).await;
}
}
/// Player update event type which is sent to a connection handler.
pub enum Updates {
RoomJoined {
player_id: PlayerId,
connection_id: ConnectionId,
room_id: RoomId,
},
NewMessage {
@ -120,13 +133,15 @@ pub enum Updates {
body: String,
},
}
enum PlayerCommand {
pub enum PlayerCommand {
/** Commands from connections */
AddSocket {
sender: Sender<Updates>,
promise: OneshotSender<ConnectionId>,
},
JoinRoom {
room_id: RoomId,
connection_id: ConnectionId,
promise: Promise<RoomInfo>,
},
SendMessage {
@ -134,12 +149,18 @@ enum PlayerCommand {
connection_id: ConnectionId,
body: String,
},
/** Events from rooms */
IncomingMessage {
room_id: RoomId,
connection_id: ConnectionId,
author: PlayerId,
body: String,
},
IncomingRoomJoined {
room_id: RoomId,
new_member_id: PlayerId,
connection_id: ConnectionId,
},
}
/// Handle to a player registry — a shared data structure containing information about players.
@ -209,18 +230,27 @@ impl Player {
let connection_id = self.sockets.insert(sender);
promise.send(ConnectionId(connection_id));
}
PlayerCommand::JoinRoom { room_id, promise } => {
PlayerCommand::JoinRoom {
room_id,
connection_id,
promise,
} => {
let mut room = rooms.get_or_create_room(room_id.clone());
room.subscribe(player_id.clone(), handle.clone()).await;
room.subscribe(player_id.clone(), connection_id, handle.clone())
.await;
let members = room.get_members().await;
promise.send(RoomInfo { id: room_id, members, topic: b"some topic lol".to_vec() });
promise.send(RoomInfo {
id: room_id,
members,
topic: b"some topic lol".to_vec(),
});
}
PlayerCommand::SendMessage {
room_id,
connection_id,
body,
} => {
let room = rooms.get_room(room_id);
let room = rooms.get_room(&room_id);
match room {
Some(mut room) => {
room.send_message(player_id.clone(), connection_id, body)
@ -253,6 +283,23 @@ impl Player {
.await;
}
}
PlayerCommand::IncomingRoomJoined {
room_id,
new_member_id,
connection_id,
} => {
for socket in &self.sockets {
let room_id = room_id.clone();
let connection_id = connection_id.clone();
socket
.send(Updates::RoomJoined {
player_id: new_member_id.clone(),
connection_id,
room_id,
})
.await;
}
}
}
}
self

View File

@ -9,7 +9,7 @@ use prometheus::{IntGauge, Registry as MetricRegistry};
use tokio::sync::RwLock as AsyncRwLock;
use crate::{
core::player::{PlayerHandle, PlayerId},
core::player::{PlayerCommand, PlayerHandle, PlayerId},
prelude::*,
};
@ -50,9 +50,9 @@ impl RoomRegistry {
}
}
pub fn get_room(&self, room_id: RoomId) -> Option<RoomHandle> {
pub fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
let inner = self.0.read().unwrap();
let res = inner.rooms.get(&room_id);
let res = inner.rooms.get(room_id);
res.map(|r| r.clone())
}
}
@ -65,9 +65,15 @@ struct RoomRegistryInner {
#[derive(Clone)]
pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
impl RoomHandle {
pub async fn subscribe(&mut self, player_id: PlayerId, player_handle: PlayerHandle) {
pub async fn subscribe(
&self,
player_id: PlayerId,
connection_id: ConnectionId,
player_handle: PlayerHandle,
) {
let mut lock = self.0.write().await;
lock.add_subscriber(player_id, player_handle);
lock.add_subscriber(player_id, connection_id, player_handle)
.await;
}
pub async fn send_message(
@ -82,7 +88,23 @@ impl RoomHandle {
pub async fn get_members(&self) -> Vec<PlayerId> {
let lock = self.0.read().await;
lock.subscriptions.keys().map(|x| x.clone()).collect::<Vec<_>>()
lock.subscriptions
.keys()
.map(|x| x.clone())
.collect::<Vec<_>>()
}
pub async fn get_room_info(&self) -> RoomInfo {
let lock = self.0.read().await;
RoomInfo {
id: lock.room_id.clone(),
members: lock
.subscriptions
.keys()
.map(|x| x.clone())
.collect::<Vec<_>>(),
topic: b"some topic lol".to_vec(),
}
}
}
@ -91,9 +113,22 @@ struct Room {
subscriptions: HashMap<PlayerId, PlayerHandle>,
}
impl Room {
fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) {
async fn add_subscriber(
&mut self,
player_id: PlayerId,
connection_id: ConnectionId,
player_handle: PlayerHandle,
) {
tracing::info!("Adding a subscriber to room");
self.subscriptions.insert(player_id, player_handle);
self.subscriptions.insert(player_id.clone(), player_handle);
for (_, sub) in &self.subscriptions {
sub.send(PlayerCommand::IncomingRoomJoined {
room_id: self.room_id.clone(),
new_member_id: player_id.clone(),
connection_id: connection_id.clone(),
})
.await;
}
}
async fn send_message(&self, player_id: PlayerId, connection_id: ConnectionId, body: String) {

View File

@ -43,7 +43,7 @@ async fn main() -> Result<()> {
let rooms = RoomRegistry::empty(&mut metrics)?;
let players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?;
let telemetry_terminator = util::telemetry::launch(telemetry_config, metrics.clone()).await?;
let irc = projections::irc::launch(irc_config, players, metrics.clone()).await?;
let irc = projections::irc::launch(irc_config, players, rooms.clone(), metrics.clone()).await?;
tracing::info!("Started");
sleep.await;

View File

@ -2,8 +2,8 @@ pub use std::future::Future;
pub use tokio::pin;
pub use tokio::select;
pub use tokio::task::JoinHandle;
pub use tokio::sync::oneshot::{channel as oneshot, Receiver as Deferred, Sender as Promise};
pub use tokio::task::JoinHandle;
pub mod log {
pub use tracing::{debug, error, info, warn};

View File

@ -10,7 +10,7 @@ use tokio::sync::oneshot::channel;
use crate::core::player::{
ConnectionId, PlayerConnection, PlayerHandle, PlayerId, PlayerRegistry, Updates,
};
use crate::core::room::RoomId;
use crate::core::room::{RoomId, RoomInfo, RoomRegistry};
use crate::prelude::*;
use crate::protos::irc::client::{client_message, ClientMessage};
use crate::protos::irc::server::{ServerMessage, ServerMessageBody};
@ -38,6 +38,7 @@ async fn handle_socket(
mut stream: TcpStream,
socket_addr: SocketAddr,
mut players: PlayerRegistry,
rooms: RoomRegistry,
) -> Result<()> {
let (reader, writer) = stream.split();
let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
@ -60,8 +61,16 @@ async fn handle_socket(
handle_registration(&mut reader, &mut writer).await;
match registered_user {
Ok(user) => {
handle_registered_socket(config, socket_addr, players, &mut reader, &mut writer, user)
.await?;
handle_registered_socket(
config,
socket_addr,
players,
rooms,
&mut reader,
&mut writer,
user,
)
.await?;
}
Err(_) => {}
}
@ -135,6 +144,7 @@ async fn handle_registered_socket<'a>(
config: ServerConfig,
socket_addr: SocketAddr,
mut players: PlayerRegistry,
rooms: RoomRegistry,
reader: &mut BufReader<ReadHalf<'a>>,
writer: &mut BufWriter<WriteHalf<'a>>,
user: RegisteredUser,
@ -214,7 +224,25 @@ async fn handle_registered_socket<'a>(
},
update = connection.receiver.recv() => {
match update.unwrap() {
Updates::RoomJoined { room_id } => {},
Updates::RoomJoined { player_id: author_id, connection_id, room_id } => {
if player_id == author_id {
if let Some(room) = rooms.get_room(&room_id) {
let room_info = room.get_room_info().await;
let chan = Chan::Global(room_id.0);
produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?;
writer.flush().await?;
} else {
log::warn!("Received join to a non-existant room");
}
} else {
ServerMessage {
tags: vec![],
sender: Some(author_id.0.clone()),
body: ServerMessageBody::Join(Chan::Global(room_id.0)),
}.write_async(writer).await?;
writer.flush().await?
}
},
Updates::NewMessage { author_id, connection_id, room_id, body } => {
if player_id != author_id || connection.connection_id != connection_id {
ServerMessage {
@ -288,65 +316,74 @@ async fn handle_join(
match chan {
Chan::Global(ref room) => {
let room_info = user_handle.join_room(RoomId(room.clone())).await?;
ServerMessage {
tags: vec![],
sender: Some(user.nickname.clone()),
body: ServerMessageBody::Join(chan.clone()),
}
.write_async(writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N332Topic {
client: user.nickname.clone(),
chat: chan.clone(),
topic: room_info.topic,
},
}
.write_async(writer)
.await?;
let mut members = if let Some(head) = room_info.members.first() {
head.0.clone()
} else {
user.nickname.clone()
};
for i in &room_info.members[1..] {
members.push(b' ');
members.extend(&i.0);
}
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N353NamesReply {
client: user.nickname.clone(),
chan: chan.clone(),
members,
},
}
.write_async(writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N366NamesReplyEnd {
client: user.nickname.clone(),
chan: chan.clone(),
},
}
.write_async(writer)
.await?;
writer.flush().await?;
}
Chan::Local(_) => {}
};
Ok(())
}
async fn produce_on_join_cmd_messages(
config: &ServerConfig,
user: &RegisteredUser,
chan: &Chan,
room_info: &RoomInfo,
writer: &mut (impl AsyncWrite + Unpin),
) -> Result<()> {
ServerMessage {
tags: vec![],
sender: Some(user.nickname.clone()),
body: ServerMessageBody::Join(chan.clone()),
}
.write_async(writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N332Topic {
client: user.nickname.clone(),
chat: chan.clone(),
topic: room_info.topic.clone(),
},
}
.write_async(writer)
.await?;
let mut members = if let Some(head) = room_info.members.first() {
head.0.clone()
} else {
user.nickname.clone()
};
for i in &room_info.members[1..] {
members.push(b' ');
members.extend(&i.0);
}
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N353NamesReply {
client: user.nickname.clone(),
chan: chan.clone(),
members,
},
}
.write_async(writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N366NamesReplyEnd {
client: user.nickname.clone(),
chan: chan.clone(),
},
}
.write_async(writer)
.await?;
Ok(())
}
pub async fn launch(
config: ServerConfig,
players: PlayerRegistry,
rooms: RoomRegistry,
metrics: MetricsRegistry,
) -> Result<Terminator> {
log::info!("Starting IRC projection");
@ -375,10 +412,11 @@ pub async fn launch(
total_connections.inc();
current_connections.inc();
log::debug!("Incoming connection from {socket_addr}");
let players_clone = players.clone();
let players = players.clone();
let rooms = rooms.clone();
let current_connections_clone = current_connections.clone();
let handle = tokio::task::spawn(async move {
match handle_socket(config, stream, socket_addr, players_clone).await {
match handle_socket(config, stream, socket_addr, players, rooms).await {
Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"),
}

View File

@ -51,20 +51,41 @@ async fn registration(scope: &mut TestScope<'_>, nickname: &str) -> Result<()> {
expect!(scope, ":irc.localhost NOTICE * :Welcome to my server!\n");
send!(scope, "NICK {nickname}\n");
send!(scope, "USER UserName 0 * :Real Name\n");
expect!(scope, ":irc.localhost 001 {nickname} :Welcome to Kek Server\n");
expect!(scope, ":irc.localhost 002 {nickname} :Welcome to Kek Server\n");
expect!(scope, ":irc.localhost 003 {nickname} :Welcome to Kek Server\n");
expect!(
scope,
":irc.localhost 001 {nickname} :Welcome to Kek Server\n"
);
expect!(
scope,
":irc.localhost 002 {nickname} :Welcome to Kek Server\n"
);
expect!(
scope,
":irc.localhost 003 {nickname} :Welcome to Kek Server\n"
);
expect!(scope, ":irc.localhost 004 {nickname} irc.localhost kek-0.1.alpha.3 DGMQRSZagiloswz CFILPQbcefgijklmnopqrstvz bkloveqjfI\n");
expect!(scope, ":irc.localhost 005 {nickname} CHANTYPES=# :are supported by this server\n");
expect!(
scope,
":irc.localhost 005 {nickname} CHANTYPES=# :are supported by this server\n"
);
Ok(())
}
async fn join(scope: &mut TestScope<'_>, nickname: &str) -> Result<()> {
send!(scope, "JOIN #channol\n");
expect!(scope, ":{nickname} JOIN #channol\n");
expect!(scope, ":irc.localhost 332 {nickname} #channol :chan topic lol\n");
expect!(scope, ":irc.localhost 353 {nickname} = #channol :{nickname}\n");
expect!(scope, ":irc.localhost 366 {nickname} #channol :End of /NAMES list\n");
expect!(
scope,
":irc.localhost 332 {nickname} #channol :chan topic lol\n"
);
expect!(
scope,
":irc.localhost 353 {nickname} = #channol :{nickname}\n"
);
expect!(
scope,
":irc.localhost 366 {nickname} #channol :End of /NAMES list\n"
);
Ok(())
}
@ -90,7 +111,10 @@ async fn test_two_connections_one_player() -> Result<()> {
join(&mut scope1, "NickName").await?;
join(&mut scope2, "NickName").await?;
send!(scope1, "PRIVMSG #channol :Chmoki vsem v etam chati!\n");
expect!(scope2, ":NickName PRIVMSG #channol :Chmoki vsem v etam chati!\n");
expect!(
scope2,
":NickName PRIVMSG #channol :Chmoki vsem v etam chati!\n"
);
send!(scope2, "PRIVMSG #channol :I tebe privetiki\n");
expect!(scope1, ":NickName PRIVMSG #channol :I tebe privetiki\n");
@ -109,7 +133,10 @@ async fn test_two_players() -> Result<()> {
join(&mut scope1, "NickName1").await?;
join(&mut scope2, "NickName2").await?;
send!(scope1, "PRIVMSG #channol :Chmoki vsem v etam chati!\n");
expect!(scope2, ":NickName1 PRIVMSG #channol :Chmoki vsem v etam chati!\n");
expect!(
scope2,
":NickName1 PRIVMSG #channol :Chmoki vsem v etam chati!\n"
);
send!(scope2, "PRIVMSG #channol :I tebe privetiki\n");
expect!(scope1, ":NickName2 PRIVMSG #channol :I tebe privetiki\n");

View File

@ -4,13 +4,21 @@ use super::*;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientMessage {
/// CAP. Capability-related commands.
Capability { subcommand: CapabilitySubcommand },
Capability {
subcommand: CapabilitySubcommand,
},
/// PING <token>
Ping { token: ByteVec },
Ping {
token: ByteVec,
},
/// PONG <token>
Pong { token: ByteVec },
Pong {
token: ByteVec,
},
/// NICK <nickname>
Nick { nickname: ByteVec },
Nick {
nickname: ByteVec,
},
/// USER <username> 0 * :<realname>
User {
username: ByteVec,
@ -18,10 +26,28 @@ pub enum ClientMessage {
},
/// JOIN <chan>
Join(Chan),
/// MODE <target>
Mode(Chan), // TODO support not only chan
/// WHO <target>
Who(Chan), // TODO support not only chan
/// TOPIC <chan> :<topic>
Topic {
chan: Chan,
topic: ByteVec,
},
Part {
chan: Chan,
message: ByteVec,
},
/// PRIVMSG <target> :<msg>
PrivateMessage { recipient: Recipient, body: ByteVec },
PrivateMessage {
recipient: Recipient,
body: ByteVec,
},
/// QUIT :<reason>
Quit { reason: ByteVec },
Quit {
reason: ByteVec,
},
}
pub fn client_message(input: &[u8]) -> IResult<&[u8], ClientMessage> {
@ -32,6 +58,10 @@ pub fn client_message(input: &[u8]) -> IResult<&[u8], ClientMessage> {
client_message_nick,
client_message_user,
client_message_join,
client_message_mode,
client_message_who,
client_message_topic,
client_message_part,
client_message_privmsg,
client_message_quit,
))(input)
@ -103,6 +133,40 @@ fn client_message_join(input: &[u8]) -> IResult<&[u8], ClientMessage> {
Ok((input, ClientMessage::Join(chan)))
}
fn client_message_mode(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("MODE ")(input)?;
let (input, chan) = chan(input)?;
Ok((input, ClientMessage::Mode(chan)))
}
fn client_message_who(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("WHO ")(input)?;
let (input, chan) = chan(input)?;
Ok((input, ClientMessage::Who(chan)))
}
fn client_message_topic(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("TOPIC ")(input)?;
let (input, chan) = chan(input)?;
let (input, _) = tag(b" :")(input)?;
let (input, topic) = token(input)?;
let topic = topic.to_vec();
Ok((input, ClientMessage::Topic { chan, topic }))
}
fn client_message_part(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("PART ")(input)?;
let (input, chan) = chan(input)?;
let (input, _) = tag(b" :")(input)?;
let (input, message) = token(input)?;
let message = message.to_vec();
Ok((input, ClientMessage::Part { chan, message }))
}
fn client_message_privmsg(input: &[u8]) -> IResult<&[u8], ClientMessage> {
let (input, _) = tag("PRIVMSG ")(input)?;
let (input, recipient) = recipient(input)?;
@ -219,6 +283,17 @@ mod test {
realname: b"Real Name".to_vec(),
};
let result = client_message(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));
}
#[test]
fn test_client_message_part() {
let input = b"PART #chan :Pokasiki !!!";
let expected = ClientMessage::Part {
chan: Chan::Global(b"chan".to_vec()),
message: b"Pokasiki !!!".to_vec(),
};
let result = client_message(input);
assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result));
}