From 12d30ca5c204c38170e92f27f3ec1ad3cabd3dfb Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Sun, 21 Apr 2024 21:00:44 +0000 Subject: [PATCH] irc: implement server-time capability for incoming messages (#52) Spec: https://ircv3.net/specs/extensions/server-time Reviewed-on: https://git.vilunov.me/lavina/lavina/pulls/52 --- Cargo.lock | 1 + Cargo.toml | 1 + crates/lavina-core/Cargo.toml | 2 +- crates/lavina-core/src/player.rs | 24 +++++--- crates/lavina-core/src/repo/mod.rs | 12 +++- crates/lavina-core/src/room.rs | 18 ++++-- crates/projection-irc/Cargo.toml | 1 + crates/projection-irc/src/cap.rs | 3 +- crates/projection-irc/src/lib.rs | 40 +++++++++++-- crates/projection-irc/tests/lib.rs | 85 +++++++++++++++++++++++++-- crates/projection-xmpp/src/updates.rs | 1 + crates/proto-irc/src/lib.rs | 15 ++++- crates/proto-irc/src/server.rs | 7 +++ 13 files changed, 183 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a3e6e02..a586d14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1263,6 +1263,7 @@ version = "0.0.2-dev" dependencies = [ "anyhow", "bitflags 2.5.0", + "chrono", "futures-util", "lavina-core", "nonempty", diff --git a/Cargo.toml b/Cargo.toml index 7234d24..0158e42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ base64 = "0.22.0" lavina-core = { path = "crates/lavina-core" } tracing-subscriber = "0.3.16" sasl = { path = "crates/sasl" } +chrono = "0.4.37" [package] name = "lavina" diff --git a/crates/lavina-core/Cargo.toml b/crates/lavina-core/Cargo.toml index 531e959..92bf798 100644 --- a/crates/lavina-core/Cargo.toml +++ b/crates/lavina-core/Cargo.toml @@ -10,4 +10,4 @@ serde.workspace = true tokio.workspace = true tracing.workspace = true prometheus.workspace = true -chrono = "0.4.37" +chrono.workspace = true diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 8693752..9925709 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -10,6 +10,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use chrono::{DateTime, Utc}; use prometheus::{IntGauge, Registry as MetricsRegistry}; use serde::Serialize; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -57,7 +58,7 @@ pub struct PlayerConnection { } impl PlayerConnection { /// Handled in [Player::send_message]. - pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result<()> { + pub async fn send_message(&mut self, room_id: RoomId, body: Str) -> Result { let (promise, deferred) = oneshot(); let cmd = ClientCommand::SendMessage { room_id, body, promise }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; @@ -163,7 +164,7 @@ pub enum ClientCommand { SendMessage { room_id: RoomId, body: Str, - promise: Promise<()>, + promise: Promise, }, ChangeTopic { room_id: RoomId, @@ -181,6 +182,11 @@ pub enum JoinResult { Banned, } +pub enum SendMessageResult { + Success(DateTime), + NoSuchRoom, +} + /// Player update event type which is sent to a player actor and from there to a connection handler. #[derive(Clone, Debug)] pub enum Updates { @@ -192,6 +198,7 @@ pub enum Updates { room_id: RoomId, author_id: PlayerId, body: Str, + created_at: DateTime, }, RoomJoined { room_id: RoomId, @@ -367,8 +374,8 @@ impl Player { let _ = promise.send(()); } ClientCommand::SendMessage { room_id, body, promise } => { - self.send_message(connection_id, room_id, body).await; - let _ = promise.send(()); + let result = self.send_message(connection_id, room_id, body).await; + let _ = promise.send(result); } ClientCommand::ChangeTopic { room_id, @@ -425,18 +432,21 @@ impl Player { self.broadcast_update(update, connection_id).await; } - async fn send_message(&mut self, connection_id: ConnectionId, room_id: RoomId, body: Str) { + async fn send_message(&mut self, connection_id: ConnectionId, room_id: RoomId, body: Str) -> SendMessageResult { let Some(room) = self.my_rooms.get(&room_id) else { tracing::info!("no room found"); - return; + return SendMessageResult::NoSuchRoom; }; - room.send_message(&self.player_id, body.clone()).await; + let created_at = chrono::Utc::now(); + room.send_message(&self.player_id, body.clone(), created_at.clone()).await; let update = Updates::NewMessage { room_id, author_id: self.player_id.clone(), body, + created_at, }; self.broadcast_update(update, connection_id).await; + SendMessageResult::Success(created_at) } async fn change_topic(&mut self, connection_id: ConnectionId, room_id: RoomId, new_topic: Str) { diff --git a/crates/lavina-core/src/repo/mod.rs b/crates/lavina-core/src/repo/mod.rs index e8e3854..645c764 100644 --- a/crates/lavina-core/src/repo/mod.rs +++ b/crates/lavina-core/src/repo/mod.rs @@ -4,6 +4,7 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::anyhow; +use chrono::{DateTime, Utc}; use serde::Deserialize; use sqlx::sqlite::SqliteConnectOptions; use sqlx::{ConnectOptions, Connection, FromRow, Sqlite, SqliteConnection, Transaction}; @@ -80,7 +81,14 @@ impl Storage { Ok(id) } - pub async fn insert_message(&mut self, room_id: u32, id: u32, content: &str, author_id: &str) -> Result<()> { + pub async fn insert_message( + &mut self, + room_id: u32, + id: u32, + content: &str, + author_id: &str, + created_at: &DateTime, + ) -> Result<()> { let mut executor = self.conn.lock().await; let res: Option<(u32,)> = sqlx::query_as("select id from users where name = ?;") .bind(author_id) @@ -98,7 +106,7 @@ impl Storage { .bind(id) .bind(content) .bind(author_id) - .bind(chrono::Utc::now().to_string()) + .bind(created_at.to_string()) .bind(room_id) .execute(&mut *executor) .await?; diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index a5e2dab..52ac7c4 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; use std::{collections::HashMap, hash::Hash, sync::Arc}; +use chrono::{DateTime, Utc}; use prometheus::{IntGauge, Registry as MetricRegistry}; use serde::Serialize; use tokio::sync::RwLock as AsyncRwLock; @@ -163,9 +164,9 @@ impl RoomHandle { lock.broadcast_update(update, player_id).await; } - pub async fn send_message(&self, player_id: &PlayerId, body: Str) { + pub async fn send_message(&self, player_id: &PlayerId, body: Str, created_at: DateTime) { let mut lock = self.0.write().await; - let res = lock.send_message(player_id, body).await; + let res = lock.send_message(player_id, body, created_at).await; if let Err(err) = res { log::warn!("Failed to send message: {err:?}"); } @@ -208,14 +209,23 @@ struct Room { storage: Storage, } impl Room { - async fn send_message(&mut self, author_id: &PlayerId, body: Str) -> Result<()> { + async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime) -> Result<()> { tracing::info!("Adding a message to room"); - self.storage.insert_message(self.storage_id, self.message_count, &body, &*author_id.as_inner()).await?; + self.storage + .insert_message( + self.storage_id, + self.message_count, + &body, + &*author_id.as_inner(), + &created_at, + ) + .await?; self.message_count += 1; let update = Updates::NewMessage { room_id: self.room_id.clone(), author_id: author_id.clone(), body, + created_at, }; self.broadcast_update(update, author_id).await; Ok(()) diff --git a/crates/projection-irc/Cargo.toml b/crates/projection-irc/Cargo.toml index 3135280..7275f69 100644 --- a/crates/projection-irc/Cargo.toml +++ b/crates/projection-irc/Cargo.toml @@ -12,6 +12,7 @@ tokio.workspace = true prometheus.workspace = true futures-util.workspace = true nonempty.workspace = true +chrono.workspace = true bitflags = "2.4.1" proto-irc = { path = "../proto-irc" } sasl = { path = "../sasl" } diff --git a/crates/projection-irc/src/cap.rs b/crates/projection-irc/src/cap.rs index af0e3ff..83f1e24 100644 --- a/crates/projection-irc/src/cap.rs +++ b/crates/projection-irc/src/cap.rs @@ -1,9 +1,10 @@ use bitflags::bitflags; bitflags! { - #[derive(Debug)] + #[derive(Debug, Clone, Copy)] pub struct Capabilities: u32 { const None = 0; const Sasl = 1 << 0; + const ServerTime = 1 << 1; } } diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 1513546..ce450c1 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use anyhow::{anyhow, Result}; +use chrono::SecondsFormat; use futures_util::future::join_all; use nonempty::nonempty; use nonempty::NonEmpty; @@ -24,7 +25,7 @@ use proto_irc::client::{client_message, ClientMessage}; use proto_irc::server::CapSubBody; use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; use proto_irc::user::PrefixedNick; -use proto_irc::{Chan, Recipient}; +use proto_irc::{Chan, Recipient, Tag}; use sasl::AuthBody; mod cap; @@ -49,6 +50,7 @@ struct RegisteredUser { */ username: Str, realname: Str, + enabled_capabilities: Capabilities, } async fn handle_socket( @@ -136,7 +138,7 @@ impl RegistrationState { sender: Some(config.server_name.clone().into()), body: ServerMessageBody::Cap { target: self.future_nickname.clone().unwrap_or_else(|| "*".into()), - subcmd: CapSubBody::Ls("sasl=PLAIN".into()), + subcmd: CapSubBody::Ls("sasl=PLAIN server-time".into()), }, } .write_async(writer) @@ -156,16 +158,30 @@ impl RegistrationState { self.enabled_capabilities |= Capabilities::Sasl; } acked.push(cap); + } else if &*cap.name == "server-time" { + if cap.to_disable { + self.enabled_capabilities &= !Capabilities::ServerTime; + } else { + self.enabled_capabilities |= Capabilities::ServerTime; + } + acked.push(cap); } else { naked.push(cap); } } let mut ack_body = String::new(); - for cap in acked { - if cap.to_disable { + if let Some((first, tail)) = acked.split_first() { + if first.to_disable { ack_body.push('-'); } - ack_body += &*cap.name; + ack_body += &*first.name; + for cap in tail { + ack_body.push(' '); + if cap.to_disable { + ack_body.push('-'); + } + ack_body += &*cap.name; + } } ServerMessage { tags: vec![], @@ -195,6 +211,7 @@ impl RegistrationState { nickname: nickname.clone(), username, realname, + enabled_capabilities: self.enabled_capabilities, }; self.finalize_auth(candidate_user, writer, storage, config).await } @@ -208,6 +225,7 @@ impl RegistrationState { nickname: nickname.clone(), username: username.clone(), realname: realname.clone(), + enabled_capabilities: self.enabled_capabilities, }; self.finalize_auth(candidate_user, writer, storage, config).await } else { @@ -224,6 +242,7 @@ impl RegistrationState { nickname: nickname.clone(), username, realname, + enabled_capabilities: self.enabled_capabilities, }; self.finalize_auth(candidate_user, writer, storage, config).await } else { @@ -587,9 +606,18 @@ async fn handle_update( author_id, room_id, body, + created_at, } => { + let mut tags = vec![]; + if user.enabled_capabilities.contains(Capabilities::ServerTime) { + let tag = Tag { + key: "time".into(), + value: Some(created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()), + }; + tags.push(tag); + } ServerMessage { - tags: vec![], + tags, sender: Some(author_id.as_inner().clone()), body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_inner().clone())), diff --git a/crates/projection-irc/tests/lib.rs b/crates/projection-irc/tests/lib.rs index f2f4505..5a4eb7c 100644 --- a/crates/projection-irc/tests/lib.rs +++ b/crates/projection-irc/tests/lib.rs @@ -1,17 +1,20 @@ use std::io::ErrorKind; -use std::net::SocketAddr; use std::time::Duration; use anyhow::{anyhow, Result}; +use chrono::{DateTime, SecondsFormat}; use prometheus::Registry as MetricsRegistry; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::TcpStream; +use lavina_core::player::{JoinResult, PlayerId, SendMessageResult}; use lavina_core::repo::{Storage, StorageConfig}; +use lavina_core::room::RoomId; use lavina_core::LavinaCore; use projection_irc::APP_VERSION; use projection_irc::{launch, read_irc_message, RunningServer, ServerConfig}; + struct TestScope<'a> { reader: BufReader>, writer: WriteHalf<'a>, @@ -89,6 +92,11 @@ impl<'a> TestScope<'a> { Err(_) => Ok(()), } } + + async fn expect_cap_ls(&mut self) -> Result<()> { + self.expect(":testserver CAP * LS :sasl=PLAIN server-time").await?; + Ok(()) + } } struct TestServer { @@ -388,7 +396,7 @@ async fn scenario_cap_full_negotiation() -> Result<()> { s.send("CAP LS 302").await?; s.send("NICK tester").await?; s.send("USER UserName 0 * :Real Name").await?; - s.expect(":testserver CAP * LS :sasl=PLAIN").await?; + s.expect_cap_ls().await?; s.send("CAP REQ :sasl").await?; s.expect(":testserver CAP tester ACK :sasl").await?; s.send("AUTHENTICATE PLAIN").await?; @@ -426,7 +434,7 @@ async fn scenario_cap_full_negotiation_nick_last() -> Result<()> { let mut s = TestScope::new(&mut stream); s.send("CAP LS 302").await?; - s.expect(":testserver CAP * LS :sasl=PLAIN").await?; + s.expect_cap_ls().await?; s.send("CAP REQ :sasl").await?; s.expect(":testserver CAP * ACK :sasl").await?; s.send("AUTHENTICATE PLAIN").await?; @@ -505,7 +513,7 @@ async fn scenario_cap_sasl_fail() -> Result<()> { s.send("CAP LS 302").await?; s.send("NICK tester").await?; s.send("USER UserName 0 * :Real Name").await?; - s.expect(":testserver CAP * LS :sasl=PLAIN").await?; + s.expect_cap_ls().await?; s.send("CAP REQ :sasl").await?; s.expect(":testserver CAP tester ACK :sasl").await?; s.send("AUTHENTICATE SHA256").await?; @@ -558,3 +566,72 @@ async fn terminate_socket_scenario() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn server_time_capability() -> Result<()> { + let mut server = TestServer::start().await?; + + // test scenario + + server.storage.create_user("tester").await?; + server.storage.set_password("tester", "password").await?; + + let mut stream = TcpStream::connect(server.server.addr).await?; + let mut s = TestScope::new(&mut stream); + + s.send("CAP LS 302").await?; + s.send("NICK tester").await?; + s.send("USER UserName 0 * :Real Name").await?; + s.expect_cap_ls().await?; + s.send("CAP REQ :sasl server-time").await?; + s.expect(":testserver CAP tester ACK :sasl server-time").await?; + s.send("AUTHENTICATE PLAIN").await?; + s.expect(":testserver AUTHENTICATE +").await?; + s.send("AUTHENTICATE dGVzdGVyAHRlc3RlcgBwYXNzd29yZA==").await?; // base64-encoded 'tester\x00tester\x00password' + s.expect(":testserver 900 tester tester tester :You are now logged in as tester").await?; + s.expect(":testserver 903 tester :SASL authentication successful").await?; + s.send("CAP END").await?; + s.expect_server_introduction("tester").await?; + s.expect_nothing().await?; + s.send("JOIN #test").await?; + s.expect(":tester JOIN #test").await?; + s.expect(":testserver 332 tester #test :New room").await?; + s.expect(":testserver 353 tester = #test :tester").await?; + s.expect(":testserver 366 tester #test :End of /NAMES list").await?; + + server.storage.create_user("some_guy").await?; + let mut conn = server.core.players.connect_to_player(&PlayerId::from("some_guy").unwrap()).await; + let res = conn.join_room(RoomId::from("test").unwrap()).await?; + let JoinResult::Success(_) = res else { + panic!("Failed to join room"); + }; + + s.expect(":some_guy JOIN #test").await?; + + let SendMessageResult::Success(res) = conn.send_message(RoomId::from("test").unwrap(), "Hello".into()).await? + else { + panic!("Failed to send message"); + }; + s.expect(&format!( + "@time={} :some_guy PRIVMSG #test :Hello", + res.to_rfc3339_opts(SecondsFormat::Millis, true) + )) + .await?; + + // formatting check + assert_eq!( + DateTime::parse_from_rfc3339(&"2024-01-01T10:00:32.123Z").unwrap().to_rfc3339_opts(SecondsFormat::Millis, true), + "2024-01-01T10:00:32.123Z" + ); + + s.send("QUIT :Leaving").await?; + s.expect(":testserver ERROR :Leaving the server").await?; + s.expect_eof().await?; + + stream.shutdown().await?; + + // wrap up + + server.server.terminate().await?; + Ok(()) +} diff --git a/crates/projection-xmpp/src/updates.rs b/crates/projection-xmpp/src/updates.rs index 0161b3f..fcc62b6 100644 --- a/crates/projection-xmpp/src/updates.rs +++ b/crates/projection-xmpp/src/updates.rs @@ -17,6 +17,7 @@ impl<'a> XmppConnection<'a> { room_id, author_id, body, + created_at: _, } => { Message::<()> { to: Some(Jid { diff --git a/crates/proto-irc/src/lib.rs b/crates/proto-irc/src/lib.rs index 54ff676..3c7ade2 100644 --- a/crates/proto-irc/src/lib.rs +++ b/crates/proto-irc/src/lib.rs @@ -18,8 +18,19 @@ use tokio::io::{AsyncWrite, AsyncWriteExt}; /// Single message tag value. #[derive(Clone, Debug, PartialEq, Eq)] pub struct Tag { - key: Str, - value: Option, + pub key: Str, + pub value: Option, +} + +impl Tag { + pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> { + writer.write_all(self.key.as_bytes()).await?; + if let Some(value) = &self.value { + writer.write_all(b"=").await?; + writer.write_all(value.as_bytes()).await?; + } + Ok(()) + } } fn receiver(input: &str) -> IResult<&str, &str> { diff --git a/crates/proto-irc/src/server.rs b/crates/proto-irc/src/server.rs index c751e23..53cc05d 100644 --- a/crates/proto-irc/src/server.rs +++ b/crates/proto-irc/src/server.rs @@ -19,6 +19,13 @@ pub struct ServerMessage { impl ServerMessage { pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> { + if !self.tags.is_empty() { + for tag in &self.tags { + writer.write_all(b"@").await?; + tag.write_async(writer).await?; + writer.write_all(b" ").await?; + } + } match &self.sender { Some(ref sender) => { writer.write_all(b":").await?;