forked from lavina/lavina
Compare commits
7 Commits
d436631450
...
b0721160ff
Author | SHA1 | Date |
---|---|---|
Nikita Vilunov | b0721160ff | |
Nikita Vilunov | 2ed70a4885 | |
Nikita Vilunov | 36b0d50d51 | |
Nikita Vilunov | adece11fef | |
Nikita Vilunov | ab61e975bf | |
Nikita Vilunov | fd437df67e | |
Nikita Vilunov | a325c7307c |
|
@ -9,6 +9,7 @@ server_name = "irc.localhost"
|
||||||
listen_on = "127.0.0.1:5222"
|
listen_on = "127.0.0.1:5222"
|
||||||
cert = "./certs/xmpp.pem"
|
cert = "./certs/xmpp.pem"
|
||||||
key = "./certs/xmpp.key"
|
key = "./certs/xmpp.key"
|
||||||
|
hostname = "localhost"
|
||||||
|
|
||||||
[storage]
|
[storage]
|
||||||
db_path = "db.sqlite"
|
db_path = "db.sqlite"
|
||||||
|
|
|
@ -48,6 +48,31 @@ impl<'a> TestScope<'a> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn expect_that(&mut self, validate: impl FnOnce(&str) -> bool) -> Result<()> {
|
||||||
|
let len = tokio::time::timeout(self.timeout, read_irc_message(&mut self.reader, &mut self.buffer)).await??;
|
||||||
|
let msg = std::str::from_utf8(&self.buffer[..len - 2])?;
|
||||||
|
if !validate(msg) {
|
||||||
|
return Err(anyhow!("unexpected message: {:?}", msg));
|
||||||
|
}
|
||||||
|
self.buffer.clear();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn expect_server_introduction(&mut self, nick: &str) -> Result<()> {
|
||||||
|
self.expect(&format!(":testserver 001 {nick} :Welcome to testserver Server")).await?;
|
||||||
|
self.expect(&format!(":testserver 002 {nick} :Welcome to testserver Server")).await?;
|
||||||
|
self.expect(&format!(":testserver 003 {nick} :Welcome to testserver Server")).await?;
|
||||||
|
self.expect(&format!(
|
||||||
|
":testserver 004 {nick} testserver {APP_VERSION} r CFILPQbcefgijklmnopqrstvz"
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
self.expect(&format!(
|
||||||
|
":testserver 005 {nick} CHANTYPES=# :are supported by this server"
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn expect_eof(&mut self) -> Result<()> {
|
async fn expect_eof(&mut self) -> Result<()> {
|
||||||
let mut buf = [0; 1];
|
let mut buf = [0; 1];
|
||||||
let len = tokio::time::timeout(self.timeout, self.reader.read(&mut buf)).await??;
|
let len = tokio::time::timeout(self.timeout, self.reader.read(&mut buf)).await??;
|
||||||
|
@ -113,18 +138,7 @@ async fn scenario_basic() -> Result<()> {
|
||||||
s.send("PASS password").await?;
|
s.send("PASS password").await?;
|
||||||
s.send("NICK tester").await?;
|
s.send("NICK tester").await?;
|
||||||
s.send("USER UserName 0 * :Real Name").await?;
|
s.send("USER UserName 0 * :Real Name").await?;
|
||||||
s.expect(":testserver 001 tester :Welcome to testserver Server").await?;
|
s.expect_server_introduction("tester").await?;
|
||||||
s.expect(":testserver 002 tester :Welcome to testserver Server").await?;
|
|
||||||
s.expect(":testserver 003 tester :Welcome to testserver Server").await?;
|
|
||||||
s.expect(
|
|
||||||
format!(
|
|
||||||
":testserver 004 tester testserver {} r CFILPQbcefgijklmnopqrstvz",
|
|
||||||
&APP_VERSION
|
|
||||||
)
|
|
||||||
.as_str(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
s.expect(":testserver 005 tester CHANTYPES=# :are supported by this server").await?;
|
|
||||||
s.expect_nothing().await?;
|
s.expect_nothing().await?;
|
||||||
s.send("QUIT :Leaving").await?;
|
s.send("QUIT :Leaving").await?;
|
||||||
s.expect(":testserver ERROR :Leaving the server").await?;
|
s.expect(":testserver ERROR :Leaving the server").await?;
|
||||||
|
@ -138,6 +152,132 @@ async fn scenario_basic() -> Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn scenario_force_join_msg() -> Result<()> {
|
||||||
|
let mut server = TestServer::start().await?;
|
||||||
|
|
||||||
|
// test scenario
|
||||||
|
|
||||||
|
server.storage.create_user("tester").await?;
|
||||||
|
server.storage.set_password("tester", "password").await?;
|
||||||
|
|
||||||
|
let mut stream1 = TcpStream::connect(server.server.addr).await?;
|
||||||
|
let mut s1 = TestScope::new(&mut stream1);
|
||||||
|
let mut stream2 = TcpStream::connect(server.server.addr).await?;
|
||||||
|
let mut s2 = TestScope::new(&mut stream2);
|
||||||
|
|
||||||
|
s1.send("PASS password").await?;
|
||||||
|
s1.send("NICK tester").await?;
|
||||||
|
s1.send("USER UserName 0 * :Real Name").await?;
|
||||||
|
s1.expect_server_introduction("tester").await?;
|
||||||
|
s1.expect_nothing().await?;
|
||||||
|
|
||||||
|
s2.send("PASS password").await?;
|
||||||
|
s2.send("NICK tester").await?;
|
||||||
|
s2.send("USER UserName 0 * :Real Name").await?;
|
||||||
|
s2.expect_server_introduction("tester").await?;
|
||||||
|
s2.expect_nothing().await?;
|
||||||
|
|
||||||
|
// We join the channel from the first connection
|
||||||
|
s1.send("JOIN #test").await?;
|
||||||
|
s1.expect(":tester JOIN #test").await?;
|
||||||
|
s1.expect(":testserver 332 tester #test :New room").await?;
|
||||||
|
s1.expect(":testserver 353 tester = #test :tester").await?;
|
||||||
|
s1.expect(":testserver 366 tester #test :End of /NAMES list").await?;
|
||||||
|
|
||||||
|
// And the second connection should receive the JOIN message (forced JOIN)
|
||||||
|
s2.expect(":tester JOIN #test").await?;
|
||||||
|
s2.expect(":testserver 332 tester #test :New room").await?;
|
||||||
|
s2.expect(":testserver 353 tester = #test :tester").await?;
|
||||||
|
s2.expect(":testserver 366 tester #test :End of /NAMES list").await?;
|
||||||
|
|
||||||
|
// We send a message to the channel from the second connection
|
||||||
|
s2.send("PRIVMSG #test :Hello").await?;
|
||||||
|
// We should not receive an acknowledgement from the server
|
||||||
|
s2.expect_nothing().await?;
|
||||||
|
// But we should receive this message from the first connection
|
||||||
|
s1.expect(":tester PRIVMSG #test :Hello").await?;
|
||||||
|
|
||||||
|
s1.send("QUIT :Leaving").await?;
|
||||||
|
s1.expect(":testserver ERROR :Leaving the server").await?;
|
||||||
|
s1.expect_eof().await?;
|
||||||
|
|
||||||
|
// Closing a connection does not kick you from the channel on a different connection
|
||||||
|
s2.expect_nothing().await?;
|
||||||
|
|
||||||
|
s2.send("QUIT :Leaving").await?;
|
||||||
|
s2.expect(":testserver ERROR :Leaving the server").await?;
|
||||||
|
s2.expect_eof().await?;
|
||||||
|
|
||||||
|
stream1.shutdown().await?;
|
||||||
|
stream2.shutdown().await?;
|
||||||
|
|
||||||
|
// wrap up
|
||||||
|
|
||||||
|
server.server.terminate().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn scenario_two_users() -> Result<()> {
|
||||||
|
let mut server = TestServer::start().await?;
|
||||||
|
|
||||||
|
// test scenario
|
||||||
|
|
||||||
|
server.storage.create_user("tester1").await?;
|
||||||
|
server.storage.set_password("tester1", "password").await?;
|
||||||
|
server.storage.create_user("tester2").await?;
|
||||||
|
server.storage.set_password("tester2", "password").await?;
|
||||||
|
|
||||||
|
let mut stream1 = TcpStream::connect(server.server.addr).await?;
|
||||||
|
let mut s1 = TestScope::new(&mut stream1);
|
||||||
|
let mut stream2 = TcpStream::connect(server.server.addr).await?;
|
||||||
|
let mut s2 = TestScope::new(&mut stream2);
|
||||||
|
|
||||||
|
s1.send("PASS password").await?;
|
||||||
|
s1.send("NICK tester1").await?;
|
||||||
|
s1.send("USER UserName 0 * :Real Name").await?;
|
||||||
|
s1.expect_server_introduction("tester1").await?;
|
||||||
|
s1.expect_nothing().await?;
|
||||||
|
|
||||||
|
s2.send("PASS password").await?;
|
||||||
|
s2.send("NICK tester2").await?;
|
||||||
|
s2.send("USER UserName 0 * :Real Name").await?;
|
||||||
|
s2.expect_server_introduction("tester2").await?;
|
||||||
|
s2.expect_nothing().await?;
|
||||||
|
|
||||||
|
// Join the channel from the first user
|
||||||
|
s1.send("JOIN #test").await?;
|
||||||
|
s1.expect(":tester1 JOIN #test").await?;
|
||||||
|
s1.expect(":testserver 332 tester1 #test :New room").await?;
|
||||||
|
s1.expect(":testserver 353 tester1 = #test :tester1").await?;
|
||||||
|
s1.expect(":testserver 366 tester1 #test :End of /NAMES list").await?;
|
||||||
|
// Then join the channel from the second user
|
||||||
|
s2.send("JOIN #test").await?;
|
||||||
|
s2.expect(":tester2 JOIN #test").await?;
|
||||||
|
s2.expect(":testserver 332 tester2 #test :New room").await?;
|
||||||
|
s2.expect_that(|msg| {
|
||||||
|
msg == ":testserver 353 tester2 = #test :tester1 tester2"
|
||||||
|
|| msg == ":testserver 353 tester2 = #test :tester2 tester1"
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
s2.expect(":testserver 366 tester2 #test :End of /NAMES list").await?;
|
||||||
|
// The first user should receive the JOIN message from the second user
|
||||||
|
s1.expect(":tester2 JOIN #test").await?;
|
||||||
|
s1.expect_nothing().await?;
|
||||||
|
s2.expect_nothing().await?;
|
||||||
|
// Send a message from the second user
|
||||||
|
s2.send("PRIVMSG #test :Hello").await?;
|
||||||
|
// The first user should receive the message
|
||||||
|
s1.expect(":tester2 PRIVMSG #test :Hello").await?;
|
||||||
|
// Leave the channel from the first user
|
||||||
|
s1.send("PART #test").await?;
|
||||||
|
s1.expect(":tester1 PART #test").await?;
|
||||||
|
// The second user should receive the PART message
|
||||||
|
s2.expect(":tester1 PART #test").await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
IRC SASL doc: https://ircv3.net/specs/extensions/sasl-3.1.html
|
IRC SASL doc: https://ircv3.net/specs/extensions/sasl-3.1.html
|
||||||
AUTHENTICATE doc: https://modern.ircdocs.horse/#authenticate-message
|
AUTHENTICATE doc: https://modern.ircdocs.horse/#authenticate-message
|
||||||
|
@ -168,18 +308,7 @@ async fn scenario_cap_full_negotiation() -> Result<()> {
|
||||||
|
|
||||||
s.send("CAP END").await?;
|
s.send("CAP END").await?;
|
||||||
|
|
||||||
s.expect(":testserver 001 tester :Welcome to testserver Server").await?;
|
s.expect_server_introduction("tester").await?;
|
||||||
s.expect(":testserver 002 tester :Welcome to testserver Server").await?;
|
|
||||||
s.expect(":testserver 003 tester :Welcome to testserver Server").await?;
|
|
||||||
s.expect(
|
|
||||||
format!(
|
|
||||||
":testserver 004 tester testserver {} r CFILPQbcefgijklmnopqrstvz",
|
|
||||||
&APP_VERSION
|
|
||||||
)
|
|
||||||
.as_str(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
s.expect(":testserver 005 tester CHANTYPES=# :are supported by this server").await?;
|
|
||||||
s.expect_nothing().await?;
|
s.expect_nothing().await?;
|
||||||
s.send("QUIT :Leaving").await?;
|
s.send("QUIT :Leaving").await?;
|
||||||
s.expect(":testserver ERROR :Leaving the server").await?;
|
s.expect(":testserver ERROR :Leaving the server").await?;
|
||||||
|
@ -217,18 +346,7 @@ async fn scenario_cap_short_negotiation() -> Result<()> {
|
||||||
|
|
||||||
s.send("CAP END").await?;
|
s.send("CAP END").await?;
|
||||||
|
|
||||||
s.expect(":testserver 001 tester :Welcome to testserver Server").await?;
|
s.expect_server_introduction("tester").await?;
|
||||||
s.expect(":testserver 002 tester :Welcome to testserver Server").await?;
|
|
||||||
s.expect(":testserver 003 tester :Welcome to testserver Server").await?;
|
|
||||||
s.expect(
|
|
||||||
format!(
|
|
||||||
":testserver 004 tester testserver {} r CFILPQbcefgijklmnopqrstvz",
|
|
||||||
&APP_VERSION
|
|
||||||
)
|
|
||||||
.as_str(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
s.expect(":testserver 005 tester CHANTYPES=# :are supported by this server").await?;
|
|
||||||
s.expect_nothing().await?;
|
s.expect_nothing().await?;
|
||||||
s.send("QUIT :Leaving").await?;
|
s.send("QUIT :Leaving").await?;
|
||||||
s.expect(":testserver ERROR :Leaving the server").await?;
|
s.expect(":testserver ERROR :Leaving the server").await?;
|
||||||
|
@ -272,18 +390,7 @@ async fn scenario_cap_sasl_fail() -> Result<()> {
|
||||||
|
|
||||||
s.send("CAP END").await?;
|
s.send("CAP END").await?;
|
||||||
|
|
||||||
s.expect(":testserver 001 tester :Welcome to testserver Server").await?;
|
s.expect_server_introduction("tester").await?;
|
||||||
s.expect(":testserver 002 tester :Welcome to testserver Server").await?;
|
|
||||||
s.expect(":testserver 003 tester :Welcome to testserver Server").await?;
|
|
||||||
s.expect(
|
|
||||||
format!(
|
|
||||||
":testserver 004 tester testserver {} r CFILPQbcefgijklmnopqrstvz",
|
|
||||||
&APP_VERSION
|
|
||||||
)
|
|
||||||
.as_str(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
s.expect(":testserver 005 tester CHANTYPES=# :are supported by this server").await?;
|
|
||||||
s.expect_nothing().await?;
|
s.expect_nothing().await?;
|
||||||
s.send("QUIT :Leaving").await?;
|
s.send("QUIT :Leaving").await?;
|
||||||
s.expect(":testserver ERROR :Leaving the server").await?;
|
s.expect(":testserver ERROR :Leaving the server").await?;
|
||||||
|
|
|
@ -4,7 +4,7 @@ use quick_xml::events::Event;
|
||||||
|
|
||||||
use lavina_core::room::RoomRegistry;
|
use lavina_core::room::RoomRegistry;
|
||||||
use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server};
|
use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server};
|
||||||
use proto_xmpp::client::{Iq, IqType};
|
use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType};
|
||||||
use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery};
|
use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery};
|
||||||
use proto_xmpp::roster::RosterQuery;
|
use proto_xmpp::roster::RosterQuery;
|
||||||
use proto_xmpp::session::Session;
|
use proto_xmpp::session::Session;
|
||||||
|
@ -25,7 +25,7 @@ impl<'a> XmppConnection<'a> {
|
||||||
r#type: IqType::Result,
|
r#type: IqType::Result,
|
||||||
body: BindResponse(Jid {
|
body: BindResponse(Jid {
|
||||||
name: Some(Name("darova".into())),
|
name: Some(Name("darova".into())),
|
||||||
server: Server("localhost".into()),
|
server: Server(self.hostname.clone()),
|
||||||
resource: Some(Resource("kek".into())),
|
resource: Some(Resource("kek".into())),
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
@ -52,7 +52,7 @@ impl<'a> XmppConnection<'a> {
|
||||||
req.serialize(output);
|
req.serialize(output);
|
||||||
}
|
}
|
||||||
IqClientBody::DiscoInfo(info) => {
|
IqClientBody::DiscoInfo(info) => {
|
||||||
let response = disco_info(iq.to.as_deref(), &info);
|
let response = self.disco_info(iq.to.as_deref(), &info);
|
||||||
let req = Iq {
|
let req = Iq {
|
||||||
from: iq.to,
|
from: iq.to,
|
||||||
id: iq.id,
|
id: iq.id,
|
||||||
|
@ -63,7 +63,7 @@ impl<'a> XmppConnection<'a> {
|
||||||
req.serialize(output);
|
req.serialize(output);
|
||||||
}
|
}
|
||||||
IqClientBody::DiscoItem(item) => {
|
IqClientBody::DiscoItem(item) => {
|
||||||
let response = disco_items(iq.to.as_deref(), &item, self.rooms).await;
|
let response = self.disco_items(iq.to.as_deref(), &item, self.rooms).await;
|
||||||
let req = Iq {
|
let req = Iq {
|
||||||
from: iq.to,
|
from: iq.to,
|
||||||
id: iq.id,
|
id: iq.id,
|
||||||
|
@ -79,84 +79,87 @@ impl<'a> XmppConnection<'a> {
|
||||||
id: iq.id,
|
id: iq.id,
|
||||||
to: None,
|
to: None,
|
||||||
r#type: IqType::Error,
|
r#type: IqType::Error,
|
||||||
body: (),
|
body: IqError {
|
||||||
|
r#type: IqErrorType::Cancel,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
req.serialize(output);
|
req.serialize(output);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
fn disco_info(to: Option<&str>, req: &InfoQuery) -> InfoQuery {
|
fn disco_info(&self, to: Option<&str>, req: &InfoQuery) -> InfoQuery {
|
||||||
let identity;
|
let identity;
|
||||||
let feature;
|
let feature;
|
||||||
match to {
|
|
||||||
Some("localhost") => {
|
|
||||||
identity = vec![Identity {
|
|
||||||
category: "server".into(),
|
|
||||||
name: None,
|
|
||||||
r#type: "im".into(),
|
|
||||||
}];
|
|
||||||
feature = vec![
|
|
||||||
Feature::new("http://jabber.org/protocol/disco#info"),
|
|
||||||
Feature::new("http://jabber.org/protocol/disco#items"),
|
|
||||||
Feature::new("iq"),
|
|
||||||
Feature::new("presence"),
|
|
||||||
]
|
|
||||||
}
|
|
||||||
Some("rooms.localhost") => {
|
|
||||||
identity = vec![Identity {
|
|
||||||
category: "conference".into(),
|
|
||||||
name: Some("Chat rooms".into()),
|
|
||||||
r#type: "text".into(),
|
|
||||||
}];
|
|
||||||
feature = vec![
|
|
||||||
Feature::new("http://jabber.org/protocol/disco#info"),
|
|
||||||
Feature::new("http://jabber.org/protocol/disco#items"),
|
|
||||||
Feature::new("http://jabber.org/protocol/muc"),
|
|
||||||
]
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
identity = vec![];
|
|
||||||
feature = vec![];
|
|
||||||
}
|
|
||||||
};
|
|
||||||
InfoQuery {
|
|
||||||
node: None,
|
|
||||||
identity,
|
|
||||||
feature,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn disco_items(to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery {
|
match to {
|
||||||
let item = match to {
|
Some(r) if r == &*self.hostname => {
|
||||||
Some("localhost") => {
|
identity = vec![Identity {
|
||||||
vec![Item {
|
category: "server".into(),
|
||||||
jid: Jid {
|
|
||||||
name: None,
|
name: None,
|
||||||
server: Server("rooms.localhost".into()),
|
r#type: "im".into(),
|
||||||
resource: None,
|
}];
|
||||||
},
|
feature = vec![
|
||||||
name: None,
|
Feature::new("http://jabber.org/protocol/disco#info"),
|
||||||
node: None,
|
Feature::new("http://jabber.org/protocol/disco#items"),
|
||||||
}]
|
Feature::new("iq"),
|
||||||
|
Feature::new("presence"),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
Some(r) if r == &*self.hostname_rooms => {
|
||||||
|
identity = vec![Identity {
|
||||||
|
category: "conference".into(),
|
||||||
|
name: Some("Chat rooms".into()),
|
||||||
|
r#type: "text".into(),
|
||||||
|
}];
|
||||||
|
feature = vec![
|
||||||
|
Feature::new("http://jabber.org/protocol/disco#info"),
|
||||||
|
Feature::new("http://jabber.org/protocol/disco#items"),
|
||||||
|
Feature::new("http://jabber.org/protocol/muc"),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
identity = vec![];
|
||||||
|
feature = vec![];
|
||||||
|
}
|
||||||
|
};
|
||||||
|
InfoQuery {
|
||||||
|
node: None,
|
||||||
|
identity,
|
||||||
|
feature,
|
||||||
}
|
}
|
||||||
Some("rooms.localhost") => {
|
}
|
||||||
let room_list = rooms.get_all_rooms().await;
|
|
||||||
room_list
|
async fn disco_items(&self, to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery {
|
||||||
.into_iter()
|
let item = match to {
|
||||||
.map(|room_info| Item {
|
Some(r) if r == &*self.hostname => {
|
||||||
|
vec![Item {
|
||||||
jid: Jid {
|
jid: Jid {
|
||||||
name: Some(Name(room_info.id.into_inner())),
|
name: None,
|
||||||
server: Server("rooms.localhost".into()),
|
server: Server(self.hostname_rooms.clone()),
|
||||||
resource: None,
|
resource: None,
|
||||||
},
|
},
|
||||||
name: None,
|
name: None,
|
||||||
node: None,
|
node: None,
|
||||||
})
|
}]
|
||||||
.collect()
|
}
|
||||||
}
|
Some(r) if r == &*self.hostname_rooms => {
|
||||||
_ => vec![],
|
let room_list = rooms.get_all_rooms().await;
|
||||||
};
|
room_list
|
||||||
ItemQuery { item }
|
.into_iter()
|
||||||
|
.map(|room_info| Item {
|
||||||
|
jid: Jid {
|
||||||
|
name: Some(Name(room_info.id.into_inner())),
|
||||||
|
server: Server(self.hostname_rooms.clone()),
|
||||||
|
resource: None,
|
||||||
|
},
|
||||||
|
name: None,
|
||||||
|
node: None,
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
_ => vec![],
|
||||||
|
};
|
||||||
|
ItemQuery { item }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::anyhow;
|
|
||||||
use futures_util::future::join_all;
|
use futures_util::future::join_all;
|
||||||
use prometheus::Registry as MetricsRegistry;
|
use prometheus::Registry as MetricsRegistry;
|
||||||
use quick_xml::events::{BytesDecl, Event};
|
use quick_xml::events::{BytesDecl, Event};
|
||||||
|
@ -44,6 +43,7 @@ pub struct ServerConfig {
|
||||||
pub listen_on: SocketAddr,
|
pub listen_on: SocketAddr,
|
||||||
pub cert: PathBuf,
|
pub cert: PathBuf,
|
||||||
pub key: PathBuf,
|
pub key: PathBuf,
|
||||||
|
pub hostname: Str,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct LoadedConfig {
|
struct LoadedConfig {
|
||||||
|
@ -117,11 +117,12 @@ pub async fn launch(
|
||||||
let players = players.clone();
|
let players = players.clone();
|
||||||
let rooms = rooms.clone();
|
let rooms = rooms.clone();
|
||||||
let storage = storage.clone();
|
let storage = storage.clone();
|
||||||
|
let hostname = config.hostname.clone();
|
||||||
let terminator = Terminator::spawn(|termination| {
|
let terminator = Terminator::spawn(|termination| {
|
||||||
let stopped_tx = stopped_tx.clone();
|
let stopped_tx = stopped_tx.clone();
|
||||||
let loaded_config = loaded_config.clone();
|
let loaded_config = loaded_config.clone();
|
||||||
async move {
|
async move {
|
||||||
match handle_socket(loaded_config, stream, &socket_addr, players, rooms, storage, termination).await {
|
match handle_socket(loaded_config, stream, &socket_addr, players, rooms, storage, hostname, termination).await {
|
||||||
Ok(_) => log::info!("Connection terminated"),
|
Ok(_) => log::info!("Connection terminated"),
|
||||||
Err(err) => log::warn!("Connection failed: {err}"),
|
Err(err) => log::warn!("Connection failed: {err}"),
|
||||||
}
|
}
|
||||||
|
@ -156,12 +157,13 @@ pub async fn launch(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_socket(
|
async fn handle_socket(
|
||||||
config: Arc<LoadedConfig>,
|
cert_config: Arc<LoadedConfig>,
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
socket_addr: &SocketAddr,
|
socket_addr: &SocketAddr,
|
||||||
mut players: PlayerRegistry,
|
mut players: PlayerRegistry,
|
||||||
rooms: RoomRegistry,
|
rooms: RoomRegistry,
|
||||||
mut storage: Storage,
|
mut storage: Storage,
|
||||||
|
hostname: Str,
|
||||||
termination: Deferred<()>, // TODO use it to stop the connection gracefully
|
termination: Deferred<()>, // TODO use it to stop the connection gracefully
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
log::info!("Received an XMPP connection from {socket_addr}");
|
log::info!("Received an XMPP connection from {socket_addr}");
|
||||||
|
@ -170,12 +172,12 @@ async fn handle_socket(
|
||||||
let mut buf_reader = BufReader::new(reader);
|
let mut buf_reader = BufReader::new(reader);
|
||||||
let mut buf_writer = BufWriter::new(writer);
|
let mut buf_writer = BufWriter::new(writer);
|
||||||
|
|
||||||
socket_force_tls(&mut buf_reader, &mut buf_writer, &mut reader_buf).await?;
|
socket_force_tls(&mut buf_reader, &mut buf_writer, &mut reader_buf, &hostname).await?;
|
||||||
|
|
||||||
let mut config = tokio_rustls::rustls::ServerConfig::builder()
|
let mut config = tokio_rustls::rustls::ServerConfig::builder()
|
||||||
.with_safe_defaults()
|
.with_safe_defaults()
|
||||||
.with_no_client_auth()
|
.with_no_client_auth()
|
||||||
.with_single_cert(vec![config.cert.clone()], config.key.clone())?;
|
.with_single_cert(vec![cert_config.cert.clone()], cert_config.key.clone())?;
|
||||||
config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new());
|
config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new());
|
||||||
|
|
||||||
log::debug!("Accepting TLS connection...");
|
log::debug!("Accepting TLS connection...");
|
||||||
|
@ -185,7 +187,7 @@ async fn handle_socket(
|
||||||
|
|
||||||
let (a, b) = tokio::io::split(new_stream);
|
let (a, b) = tokio::io::split(new_stream);
|
||||||
let mut xml_reader = NsReader::from_reader(BufReader::new(a));
|
let mut xml_reader = NsReader::from_reader(BufReader::new(a));
|
||||||
let mut xml_writer = Writer::new(b);
|
let mut xml_writer = Writer::new(BufWriter::new(b));
|
||||||
|
|
||||||
pin!(termination);
|
pin!(termination);
|
||||||
select! {
|
select! {
|
||||||
|
@ -194,7 +196,7 @@ async fn handle_socket(
|
||||||
log::info!("Socket handling was terminated");
|
log::info!("Socket handling was terminated");
|
||||||
return Ok(())
|
return Ok(())
|
||||||
},
|
},
|
||||||
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage) => {
|
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage, &hostname) => {
|
||||||
match authenticated {
|
match authenticated {
|
||||||
Ok(authenticated) => {
|
Ok(authenticated) => {
|
||||||
let mut connection = players.connect_to_player(authenticated.player_id.clone()).await;
|
let mut connection = players.connect_to_player(authenticated.player_id.clone()).await;
|
||||||
|
@ -205,6 +207,7 @@ async fn handle_socket(
|
||||||
&authenticated,
|
&authenticated,
|
||||||
&mut connection,
|
&mut connection,
|
||||||
&rooms,
|
&rooms,
|
||||||
|
&hostname,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
},
|
},
|
||||||
|
@ -216,7 +219,7 @@ async fn handle_socket(
|
||||||
}
|
}
|
||||||
|
|
||||||
let a = xml_reader.into_inner().into_inner();
|
let a = xml_reader.into_inner().into_inner();
|
||||||
let b = xml_writer.into_inner();
|
let b = xml_writer.into_inner().into_inner();
|
||||||
a.unsplit(b).shutdown().await?;
|
a.unsplit(b).shutdown().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -225,17 +228,18 @@ async fn socket_force_tls(
|
||||||
reader: &mut (impl AsyncBufRead + Unpin),
|
reader: &mut (impl AsyncBufRead + Unpin),
|
||||||
writer: &mut (impl AsyncWrite + Unpin),
|
writer: &mut (impl AsyncWrite + Unpin),
|
||||||
reader_buf: &mut Vec<u8>,
|
reader_buf: &mut Vec<u8>,
|
||||||
|
hostname: &Str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
use proto_xmpp::tls::*;
|
use proto_xmpp::tls::*;
|
||||||
let xml_reader = &mut NsReader::from_reader(reader);
|
let xml_reader = &mut NsReader::from_reader(reader);
|
||||||
let xml_writer = &mut Writer::new(writer);
|
let xml_writer = &mut Writer::new(writer);
|
||||||
read_xml_header(xml_reader, reader_buf).await?;
|
// TODO validate the server hostname received in the stream start
|
||||||
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
|
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
|
||||||
|
|
||||||
let event = Event::Decl(BytesDecl::new("1.0", None, None));
|
let event = Event::Decl(BytesDecl::new("1.0", None, None));
|
||||||
xml_writer.write_event_async(event).await?;
|
xml_writer.write_event_async(event).await?;
|
||||||
let msg = ServerStreamStart {
|
let msg = ServerStreamStart {
|
||||||
from: "localhost".into(),
|
from: hostname.to_string(),
|
||||||
lang: "en".into(),
|
lang: "en".into(),
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
id: uuid::Uuid::new_v4().to_string(),
|
||||||
version: "1.0".into(),
|
version: "1.0".into(),
|
||||||
|
@ -260,13 +264,14 @@ async fn socket_auth(
|
||||||
xml_writer: &mut Writer<(impl AsyncWrite + Unpin)>,
|
xml_writer: &mut Writer<(impl AsyncWrite + Unpin)>,
|
||||||
reader_buf: &mut Vec<u8>,
|
reader_buf: &mut Vec<u8>,
|
||||||
storage: &mut Storage,
|
storage: &mut Storage,
|
||||||
|
hostname: &Str,
|
||||||
) -> Result<Authenticated> {
|
) -> Result<Authenticated> {
|
||||||
read_xml_header(xml_reader, reader_buf).await?;
|
// TODO validate the server hostname received in the stream start
|
||||||
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
|
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
|
||||||
|
|
||||||
xml_writer.write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))).await?;
|
xml_writer.write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))).await?;
|
||||||
ServerStreamStart {
|
ServerStreamStart {
|
||||||
from: "localhost".into(),
|
from: hostname.to_string(),
|
||||||
lang: "en".into(),
|
lang: "en".into(),
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
id: uuid::Uuid::new_v4().to_string(),
|
||||||
version: "1.0".into(),
|
version: "1.0".into(),
|
||||||
|
@ -284,6 +289,7 @@ async fn socket_auth(
|
||||||
|
|
||||||
let auth: proto_xmpp::sasl::Auth = proto_xmpp::sasl::Auth::parse(xml_reader, reader_buf).await?;
|
let auth: proto_xmpp::sasl::Auth = proto_xmpp::sasl::Auth::parse(xml_reader, reader_buf).await?;
|
||||||
proto_xmpp::sasl::Success.write_xml(xml_writer).await?;
|
proto_xmpp::sasl::Success.write_xml(xml_writer).await?;
|
||||||
|
xml_writer.get_mut().flush().await?;
|
||||||
|
|
||||||
match AuthBody::from_str(&auth.body) {
|
match AuthBody::from_str(&auth.body) {
|
||||||
Ok(logopass) => {
|
Ok(logopass) => {
|
||||||
|
@ -326,13 +332,14 @@ async fn socket_final(
|
||||||
authenticated: &Authenticated,
|
authenticated: &Authenticated,
|
||||||
user_handle: &mut PlayerConnection,
|
user_handle: &mut PlayerConnection,
|
||||||
rooms: &RoomRegistry,
|
rooms: &RoomRegistry,
|
||||||
|
hostname: &Str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
read_xml_header(xml_reader, reader_buf).await?;
|
// TODO validate the server hostname received in the stream start
|
||||||
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
|
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
|
||||||
|
|
||||||
xml_writer.write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))).await?;
|
xml_writer.write_event_async(Event::Decl(BytesDecl::new("1.0", None, None))).await?;
|
||||||
ServerStreamStart {
|
ServerStreamStart {
|
||||||
from: "localhost".into(),
|
from: hostname.to_string(),
|
||||||
lang: "en".into(),
|
lang: "en".into(),
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
id: uuid::Uuid::new_v4().to_string(),
|
||||||
version: "1.0".into(),
|
version: "1.0".into(),
|
||||||
|
@ -358,6 +365,8 @@ async fn socket_final(
|
||||||
user: authenticated,
|
user: authenticated,
|
||||||
user_handle,
|
user_handle,
|
||||||
rooms,
|
rooms,
|
||||||
|
hostname: hostname.clone(),
|
||||||
|
hostname_rooms: format!("rooms.{}", hostname).into(),
|
||||||
};
|
};
|
||||||
let should_recreate_xml_future = select! {
|
let should_recreate_xml_future = select! {
|
||||||
biased;
|
biased;
|
||||||
|
@ -414,12 +423,14 @@ struct XmppConnection<'a> {
|
||||||
user: &'a Authenticated,
|
user: &'a Authenticated,
|
||||||
user_handle: &'a mut PlayerConnection,
|
user_handle: &'a mut PlayerConnection,
|
||||||
rooms: &'a RoomRegistry,
|
rooms: &'a RoomRegistry,
|
||||||
|
hostname: Str,
|
||||||
|
hostname_rooms: Str,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> XmppConnection<'a> {
|
impl<'a> XmppConnection<'a> {
|
||||||
async fn handle_packet(&mut self, output: &mut Vec<Event<'static>>, packet: ClientPacket) -> Result<bool> {
|
async fn handle_packet(&mut self, output: &mut Vec<Event<'static>>, packet: ClientPacket) -> Result<bool> {
|
||||||
let res = match packet {
|
let res = match packet {
|
||||||
proto::ClientPacket::Iq(iq) => {
|
ClientPacket::Iq(iq) => {
|
||||||
self.handle_iq(output, iq).await;
|
self.handle_iq(output, iq).await;
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
@ -427,11 +438,11 @@ impl<'a> XmppConnection<'a> {
|
||||||
self.handle_message(output, m).await?;
|
self.handle_message(output, m).await?;
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
proto::ClientPacket::Presence(p) => {
|
ClientPacket::Presence(p) => {
|
||||||
self.handle_presence(output, p).await?;
|
self.handle_presence(output, p).await?;
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
proto::ClientPacket::StreamEnd => {
|
ClientPacket::StreamEnd => {
|
||||||
ServerStreamEnd.serialize(output);
|
ServerStreamEnd.serialize(output);
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
@ -439,25 +450,3 @@ impl<'a> XmppConnection<'a> {
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_xml_header(
|
|
||||||
xml_reader: &mut NsReader<(impl AsyncBufRead + Unpin)>,
|
|
||||||
reader_buf: &mut Vec<u8>,
|
|
||||||
) -> Result<()> {
|
|
||||||
if let Event::Decl(bytes) = xml_reader.read_event_into_async(reader_buf).await? {
|
|
||||||
// this is <?xml ...> header
|
|
||||||
if let Some(encoding) = bytes.encoding() {
|
|
||||||
let encoding = encoding?;
|
|
||||||
if &*encoding == b"UTF-8" {
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
Err(anyhow!("Unsupported encoding: {encoding:?}"))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Err(fail("No XML encoding provided"))
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Err(anyhow!("Expected XML header"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -18,17 +18,17 @@ impl<'a> XmppConnection<'a> {
|
||||||
resource: _,
|
resource: _,
|
||||||
}) = m.to
|
}) = m.to
|
||||||
{
|
{
|
||||||
if server.0.as_ref() == "rooms.localhost" && m.r#type == MessageType::Groupchat {
|
if server.0.as_ref() == &*self.hostname_rooms && m.r#type == MessageType::Groupchat {
|
||||||
self.user_handle.send_message(RoomId::from(name.0.clone())?, m.body.clone().into()).await?;
|
self.user_handle.send_message(RoomId::from(name.0.clone())?, m.body.clone().into()).await?;
|
||||||
Message::<()> {
|
Message::<()> {
|
||||||
to: Some(Jid {
|
to: Some(Jid {
|
||||||
name: Some(self.user.xmpp_name.clone()),
|
name: Some(self.user.xmpp_name.clone()),
|
||||||
server: Server("localhost".into()),
|
server: Server(self.hostname.clone()),
|
||||||
resource: Some(self.user.xmpp_resource.clone()),
|
resource: Some(self.user.xmpp_resource.clone()),
|
||||||
}),
|
}),
|
||||||
from: Some(Jid {
|
from: Some(Jid {
|
||||||
name: Some(name),
|
name: Some(name),
|
||||||
server: Server("rooms.localhost".into()),
|
server: Server(self.hostname_rooms.clone()),
|
||||||
resource: Some(self.user.xmpp_muc_name.clone()),
|
resource: Some(self.user.xmpp_muc_name.clone()),
|
||||||
}),
|
}),
|
||||||
id: m.id,
|
id: m.id,
|
||||||
|
|
|
@ -16,12 +16,12 @@ impl<'a> XmppConnection<'a> {
|
||||||
Presence::<()> {
|
Presence::<()> {
|
||||||
to: Some(Jid {
|
to: Some(Jid {
|
||||||
name: Some(self.user.xmpp_name.clone()),
|
name: Some(self.user.xmpp_name.clone()),
|
||||||
server: Server("localhost".into()),
|
server: Server(self.hostname.clone()),
|
||||||
resource: Some(self.user.xmpp_resource.clone()),
|
resource: Some(self.user.xmpp_resource.clone()),
|
||||||
}),
|
}),
|
||||||
from: Some(Jid {
|
from: Some(Jid {
|
||||||
name: Some(self.user.xmpp_name.clone()),
|
name: Some(self.user.xmpp_name.clone()),
|
||||||
server: Server("localhost".into()),
|
server: Server(self.hostname.clone()),
|
||||||
resource: Some(self.user.xmpp_resource.clone()),
|
resource: Some(self.user.xmpp_resource.clone()),
|
||||||
}),
|
}),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
|
@ -36,12 +36,12 @@ impl<'a> XmppConnection<'a> {
|
||||||
Presence::<()> {
|
Presence::<()> {
|
||||||
to: Some(Jid {
|
to: Some(Jid {
|
||||||
name: Some(self.user.xmpp_name.clone()),
|
name: Some(self.user.xmpp_name.clone()),
|
||||||
server: Server("localhost".into()),
|
server: Server(self.hostname.clone()),
|
||||||
resource: Some(self.user.xmpp_resource.clone()),
|
resource: Some(self.user.xmpp_resource.clone()),
|
||||||
}),
|
}),
|
||||||
from: Some(Jid {
|
from: Some(Jid {
|
||||||
name: Some(name.clone()),
|
name: Some(name.clone()),
|
||||||
server: Server("rooms.localhost".into()),
|
server: Server(self.hostname_rooms.clone()),
|
||||||
resource: Some(self.user.xmpp_muc_name.clone()),
|
resource: Some(self.user.xmpp_muc_name.clone()),
|
||||||
}),
|
}),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
|
|
|
@ -21,12 +21,12 @@ impl<'a> XmppConnection<'a> {
|
||||||
Message::<()> {
|
Message::<()> {
|
||||||
to: Some(Jid {
|
to: Some(Jid {
|
||||||
name: Some(self.user.xmpp_name.clone()),
|
name: Some(self.user.xmpp_name.clone()),
|
||||||
server: Server("localhost".into()),
|
server: Server(self.hostname.clone()),
|
||||||
resource: Some(self.user.xmpp_resource.clone()),
|
resource: Some(self.user.xmpp_resource.clone()),
|
||||||
}),
|
}),
|
||||||
from: Some(Jid {
|
from: Some(Jid {
|
||||||
name: Some(Name(room_id.into_inner().into())),
|
name: Some(Name(room_id.into_inner().into())),
|
||||||
server: Server("rooms.localhost".into()),
|
server: Server(self.hostname_rooms.clone()),
|
||||||
resource: Some(Resource(author_id.into_inner().into())),
|
resource: Some(Resource(author_id.into_inner().into())),
|
||||||
}),
|
}),
|
||||||
id: None,
|
id: None,
|
||||||
|
|
|
@ -187,6 +187,69 @@ async fn scenario_basic() -> Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn scenario_basic_without_headers() -> Result<()> {
|
||||||
|
tracing_subscriber::fmt::try_init();
|
||||||
|
let config = ServerConfig {
|
||||||
|
listen_on: "127.0.0.1:0".parse().unwrap(),
|
||||||
|
cert: "tests/certs/xmpp.pem".parse().unwrap(),
|
||||||
|
key: "tests/certs/xmpp.key".parse().unwrap(),
|
||||||
|
};
|
||||||
|
let mut metrics = MetricsRegistry::new();
|
||||||
|
let mut storage = Storage::open(StorageConfig {
|
||||||
|
db_path: ":memory:".into(),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
|
||||||
|
let players = PlayerRegistry::empty(rooms.clone(), &mut metrics).unwrap();
|
||||||
|
let server = launch(config, players, rooms, metrics, storage.clone()).await.unwrap();
|
||||||
|
|
||||||
|
// test scenario
|
||||||
|
|
||||||
|
storage.create_user("tester").await?;
|
||||||
|
storage.set_password("tester", "password").await?;
|
||||||
|
|
||||||
|
let mut stream = TcpStream::connect(server.addr).await?;
|
||||||
|
let mut s = TestScope::new(&mut stream);
|
||||||
|
tracing::info!("TCP connection established");
|
||||||
|
|
||||||
|
s.send(r#"<stream:stream xmlns:stream="http://etherx.jabber.org/streams" to="127.0.0.1" xml:lang="en" xmlns:xml="http://www.w3.org/XML/1998/namespace" xmlns="jabber:client" version="1.0">"#).await?;
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Decl(_) => {});
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Start(b) => assert_eq!(b.local_name().into_inner(), b"stream"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Start(b) => assert_eq!(b.local_name().into_inner(), b"features"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Start(b) => assert_eq!(b.local_name().into_inner(), b"starttls"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Empty(b) => assert_eq!(b.local_name().into_inner(), b"required"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::End(b) => assert_eq!(b.local_name().into_inner(), b"starttls"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::End(b) => assert_eq!(b.local_name().into_inner(), b"features"));
|
||||||
|
s.send(r#"<starttls/>"#).await?;
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Empty(b) => assert_eq!(b.local_name().into_inner(), b"proceed"));
|
||||||
|
let buffer = s.buffer;
|
||||||
|
tracing::info!("TLS feature negotiation complete");
|
||||||
|
|
||||||
|
let connector = TlsConnector::from(Arc::new(
|
||||||
|
ClientConfig::builder()
|
||||||
|
.with_safe_defaults()
|
||||||
|
.with_custom_certificate_verifier(Arc::new(IgnoreCertVerification))
|
||||||
|
.with_no_client_auth(),
|
||||||
|
));
|
||||||
|
tracing::info!("Initiating TLS connection...");
|
||||||
|
let mut stream = connector.connect(ServerName::IpAddress(server.addr.ip()), stream).await?;
|
||||||
|
tracing::info!("TLS connection established");
|
||||||
|
|
||||||
|
let mut s = TestScopeTls::new(&mut stream, buffer);
|
||||||
|
|
||||||
|
s.send(r#"<stream:stream xmlns:stream="http://etherx.jabber.org/streams" to="127.0.0.1" xml:lang="en" xmlns:xml="http://www.w3.org/XML/1998/namespace" xmlns="jabber:client" version="1.0">"#).await?;
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Decl(_) => {});
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Start(b) => assert_eq!(b.local_name().into_inner(), b"stream"));
|
||||||
|
|
||||||
|
stream.shutdown().await?;
|
||||||
|
|
||||||
|
// wrap up
|
||||||
|
|
||||||
|
server.terminate().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn terminate_socket() -> Result<()> {
|
async fn terminate_socket() -> Result<()> {
|
||||||
tracing_subscriber::fmt::try_init();
|
tracing_subscriber::fmt::try_init();
|
||||||
|
|
|
@ -49,7 +49,7 @@ pub enum ClientMessage {
|
||||||
},
|
},
|
||||||
Part {
|
Part {
|
||||||
chan: Chan,
|
chan: Chan,
|
||||||
message: Str,
|
message: Option<Str>,
|
||||||
},
|
},
|
||||||
/// `PRIVMSG <target> :<msg>`
|
/// `PRIVMSG <target> :<msg>`
|
||||||
PrivateMessage {
|
PrivateMessage {
|
||||||
|
@ -194,14 +194,20 @@ fn client_message_topic(input: &str) -> IResult<&str, ClientMessage> {
|
||||||
fn client_message_part(input: &str) -> IResult<&str, ClientMessage> {
|
fn client_message_part(input: &str) -> IResult<&str, ClientMessage> {
|
||||||
let (input, _) = tag("PART ")(input)?;
|
let (input, _) = tag("PART ")(input)?;
|
||||||
let (input, chan) = chan(input)?;
|
let (input, chan) = chan(input)?;
|
||||||
let (input, _) = tag(" ")(input)?;
|
let (input, t) = opt(tag(" "))(input)?;
|
||||||
|
match t {
|
||||||
|
Some(_) => (),
|
||||||
|
None => {
|
||||||
|
return Ok((input, ClientMessage::Part { chan, message: None }));
|
||||||
|
}
|
||||||
|
}
|
||||||
let (input, r) = opt(tag(":"))(input)?;
|
let (input, r) = opt(tag(":"))(input)?;
|
||||||
let (input, message) = match r {
|
let (input, message) = match r {
|
||||||
Some(_) => token(input)?,
|
Some(_) => token(input)?,
|
||||||
None => receiver(input)?,
|
None => receiver(input)?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let message = message.into();
|
let message = Some(message.into());
|
||||||
Ok((input, ClientMessage::Part { chan, message }))
|
Ok((input, ClientMessage::Part { chan, message }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,7 +375,18 @@ mod test {
|
||||||
let input = "PART #chan :Pokasiki !!!";
|
let input = "PART #chan :Pokasiki !!!";
|
||||||
let expected = ClientMessage::Part {
|
let expected = ClientMessage::Part {
|
||||||
chan: Chan::Global("chan".into()),
|
chan: Chan::Global("chan".into()),
|
||||||
message: "Pokasiki !!!".into(),
|
message: Some("Pokasiki !!!".into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = client_message(input);
|
||||||
|
assert_matches!(result, Ok(result) => assert_eq!(expected, result));
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_client_message_part_empty() {
|
||||||
|
let input = "PART #chan";
|
||||||
|
let expected = ClientMessage::Part {
|
||||||
|
chan: Chan::Global("chan".into()),
|
||||||
|
message: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = client_message(input);
|
let result = client_message(input);
|
||||||
|
|
|
@ -317,10 +317,15 @@ impl ServerMessageBody {
|
||||||
writer.write_all(b" = ").await?;
|
writer.write_all(b" = ").await?;
|
||||||
chan.write_async(writer).await?;
|
chan.write_async(writer).await?;
|
||||||
writer.write_all(b" :").await?;
|
writer.write_all(b" :").await?;
|
||||||
for member in members {
|
{
|
||||||
|
let member = &members.head;
|
||||||
writer.write_all(member.prefix.to_string().as_bytes()).await?;
|
writer.write_all(member.prefix.to_string().as_bytes()).await?;
|
||||||
writer.write_all(member.nick.as_bytes()).await?;
|
writer.write_all(member.nick.as_bytes()).await?;
|
||||||
|
}
|
||||||
|
for member in &members.tail {
|
||||||
writer.write_all(b" ").await?;
|
writer.write_all(b" ").await?;
|
||||||
|
writer.write_all(member.prefix.to_string().as_bytes()).await?;
|
||||||
|
writer.write_all(member.nick.as_bytes()).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ServerMessageBody::N366NamesReplyEnd { client, chan } => {
|
ServerMessageBody::N366NamesReplyEnd { client, chan } => {
|
||||||
|
|
|
@ -255,6 +255,44 @@ impl MessageType {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Error response to an IQ request.
|
||||||
|
///
|
||||||
|
/// https://xmpp.org/rfcs/rfc6120.html#stanzas-error
|
||||||
|
pub struct IqError {
|
||||||
|
pub r#type: IqErrorType,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum IqErrorType {
|
||||||
|
/// Retry after providing credentials
|
||||||
|
Auth,
|
||||||
|
/// Do not retry (the error cannot be remedied)
|
||||||
|
Cancel,
|
||||||
|
/// Proceed (the condition was only a warning)
|
||||||
|
Continue,
|
||||||
|
/// Retry after changing the data sent
|
||||||
|
Modify,
|
||||||
|
/// Retry after waiting (the error is temporary)
|
||||||
|
Wait,
|
||||||
|
}
|
||||||
|
impl IqErrorType {
|
||||||
|
pub fn as_str(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
IqErrorType::Auth => "auth",
|
||||||
|
IqErrorType::Cancel => "cancel",
|
||||||
|
IqErrorType::Continue => "continue",
|
||||||
|
IqErrorType::Modify => "modify",
|
||||||
|
IqErrorType::Wait => "wait",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ToXml for IqError {
|
||||||
|
fn serialize(&self, events: &mut Vec<Event<'static>>) {
|
||||||
|
let bytes = BytesStart::new(format!(r#"error xmlns="{}" type="{}""#, XMLNS, self.r#type.as_str()));
|
||||||
|
events.push(Event::Empty(bytes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Debug)]
|
#[derive(PartialEq, Eq, Debug)]
|
||||||
pub struct Iq<T> {
|
pub struct Iq<T> {
|
||||||
pub from: Option<String>,
|
pub from: Option<String>,
|
||||||
|
|
|
@ -24,7 +24,17 @@ impl ClientStreamStart {
|
||||||
reader: &mut NsReader<impl AsyncBufRead + Unpin>,
|
reader: &mut NsReader<impl AsyncBufRead + Unpin>,
|
||||||
buf: &mut Vec<u8>,
|
buf: &mut Vec<u8>,
|
||||||
) -> Result<ClientStreamStart> {
|
) -> Result<ClientStreamStart> {
|
||||||
let incoming = skip_text!(reader, buf);
|
let mut incoming = skip_text!(reader, buf);
|
||||||
|
if let Event::Decl(bytes) = incoming {
|
||||||
|
// this is <?xml ...> header
|
||||||
|
if let Some(encoding) = bytes.encoding() {
|
||||||
|
let encoding = encoding?;
|
||||||
|
if &*encoding != b"UTF-8" {
|
||||||
|
return Err(anyhow!("Unsupported encoding: {encoding:?}"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
incoming = skip_text!(reader, buf);
|
||||||
|
}
|
||||||
if let Event::Start(e) = incoming {
|
if let Event::Start(e) = incoming {
|
||||||
let (ns, local) = reader.resolve_element(e.name());
|
let (ns, local) = reader.resolve_element(e.name());
|
||||||
if ns != ResolveResult::Bound(Namespace(XMLNS.as_bytes())) {
|
if ns != ResolveResult::Bound(Namespace(XMLNS.as_bytes())) {
|
||||||
|
|
|
@ -19,6 +19,7 @@ server_name = "irc.localhost"
|
||||||
listen_on = "127.0.0.1:5222"
|
listen_on = "127.0.0.1:5222"
|
||||||
cert = "./certs/xmpp.pem"
|
cert = "./certs/xmpp.pem"
|
||||||
key = "./certs/xmpp.key"
|
key = "./certs/xmpp.key"
|
||||||
|
hostname = "localhost"
|
||||||
|
|
||||||
[storage]
|
[storage]
|
||||||
db_path = "db.sqlite"
|
db_path = "db.sqlite"
|
||||||
|
|
Loading…
Reference in New Issue