forked from lavina/lavina
termination usage for socket graceful shutdown
This commit is contained in:
parent
d76e786ceb
commit
07e26ee77a
|
@ -54,7 +54,7 @@ async fn handle_socket(
|
||||||
socket_addr: &SocketAddr,
|
socket_addr: &SocketAddr,
|
||||||
players: PlayerRegistry,
|
players: PlayerRegistry,
|
||||||
rooms: RoomRegistry,
|
rooms: RoomRegistry,
|
||||||
termination: Deferred<()>, // TODO use it to stop the connection gracefully
|
termination: Deferred<()>,
|
||||||
mut storage: Storage,
|
mut storage: Storage,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
log::info!("Received an IRC connection from {socket_addr}");
|
log::info!("Received an IRC connection from {socket_addr}");
|
||||||
|
@ -62,19 +62,24 @@ async fn handle_socket(
|
||||||
let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
|
let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
|
||||||
let mut writer = BufWriter::new(writer);
|
let mut writer = BufWriter::new(writer);
|
||||||
|
|
||||||
let registered_user: Result<RegisteredUser> =
|
pin!(termination);
|
||||||
handle_registration(&mut reader, &mut writer, &mut storage, &config).await;
|
select! {
|
||||||
|
biased;
|
||||||
match registered_user {
|
_ = &mut termination =>{
|
||||||
Ok(user) => {
|
log::info!("Socket handling was terminated");
|
||||||
log::debug!("User registered");
|
return Ok(())
|
||||||
handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?;
|
},
|
||||||
}
|
registered_user = handle_registration(&mut reader, &mut writer, &mut storage, &config) =>
|
||||||
Err(err) => {
|
match registered_user {
|
||||||
log::debug!("Registration failed: {err}");
|
Ok(user) => {
|
||||||
}
|
log::debug!("User registered");
|
||||||
|
handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::debug!("Registration failed: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.shutdown().await?;
|
stream.shutdown().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
|
@ -220,8 +221,9 @@ async fn scenario_cap_short_negotiation() -> Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn scenario_cap_sasl_fail() -> Result<()> {
|
async fn terminate_socket_scenario() -> Result<()> {
|
||||||
let mut server = TestServer::start().await?;
|
let mut server = TestServer::start().await?;
|
||||||
|
let address: SocketAddr = ("127.0.0.1:0".parse().unwrap());
|
||||||
|
|
||||||
// test scenario
|
// test scenario
|
||||||
|
|
||||||
|
@ -231,38 +233,16 @@ async fn scenario_cap_sasl_fail() -> Result<()> {
|
||||||
let mut stream = TcpStream::connect(server.server.addr).await?;
|
let mut stream = TcpStream::connect(server.server.addr).await?;
|
||||||
let mut s = TestScope::new(&mut stream);
|
let mut s = TestScope::new(&mut stream);
|
||||||
|
|
||||||
s.send("CAP LS 302").await?;
|
|
||||||
s.send("NICK tester").await?;
|
s.send("NICK tester").await?;
|
||||||
s.send("USER UserName 0 * :Real Name").await?;
|
|
||||||
s.expect(":testserver CAP * LS :sasl=PLAIN").await?;
|
|
||||||
s.send("CAP REQ :sasl").await?;
|
s.send("CAP REQ :sasl").await?;
|
||||||
|
s.send("USER UserName 0 * :Real Name").await?;
|
||||||
s.expect(":testserver CAP tester ACK :sasl").await?;
|
s.expect(":testserver CAP tester ACK :sasl").await?;
|
||||||
s.send("AUTHENTICATE SHA256").await?;
|
|
||||||
s.expect(":testserver 904 tester :Unsupported mechanism").await?;
|
|
||||||
s.send("AUTHENTICATE PLAIN").await?;
|
s.send("AUTHENTICATE PLAIN").await?;
|
||||||
s.expect(":testserver AUTHENTICATE +").await?;
|
s.expect(":testserver AUTHENTICATE +").await?;
|
||||||
s.send("AUTHENTICATE dGVzdGVyAHRlc3RlcgBwYXNzd29yZDE=").await?;
|
|
||||||
s.expect(":testserver 904 tester :Bad credentials").await?;
|
|
||||||
s.send("AUTHENTICATE dGVzdGVyAHRlc3RlcgBwYXNzd29yZA==").await?; // base64-encoded 'tester\x00tester\x00password'
|
|
||||||
s.expect(":testserver 900 tester tester tester :You are now logged in as tester").await?;
|
|
||||||
s.expect(":testserver 903 tester :SASL authentication successful").await?;
|
|
||||||
|
|
||||||
s.send("CAP END").await?;
|
|
||||||
|
|
||||||
s.expect(":testserver 001 tester :Welcome to Kek Server").await?;
|
|
||||||
s.expect(":testserver 002 tester :Welcome to Kek Server").await?;
|
|
||||||
s.expect(":testserver 003 tester :Welcome to Kek Server").await?;
|
|
||||||
s.expect(":testserver 004 tester testserver kek-0.1.alpha.3 r CFILPQbcefgijklmnopqrstvz").await?;
|
|
||||||
s.expect(":testserver 005 tester CHANTYPES=# :are supported by this server").await?;
|
|
||||||
s.expect_nothing().await?;
|
|
||||||
s.send("QUIT :Leaving").await?;
|
|
||||||
s.expect(":testserver ERROR :Leaving the server").await?;
|
|
||||||
s.expect_eof().await?;
|
|
||||||
|
|
||||||
stream.shutdown().await?;
|
stream.shutdown().await?;
|
||||||
|
|
||||||
// wrap up
|
|
||||||
|
|
||||||
server.server.terminate().await?;
|
server.server.terminate().await?;
|
||||||
|
|
||||||
|
assert!(TcpStream::connect(&address).await.is_err());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,7 +162,7 @@ async fn handle_socket(
|
||||||
mut players: PlayerRegistry,
|
mut players: PlayerRegistry,
|
||||||
rooms: RoomRegistry,
|
rooms: RoomRegistry,
|
||||||
mut storage: Storage,
|
mut storage: Storage,
|
||||||
termination: Deferred<()>, // TODO use it to stop the connection gracefully
|
termination: Deferred<()>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
log::info!("Received an XMPP connection from {socket_addr}");
|
log::info!("Received an XMPP connection from {socket_addr}");
|
||||||
let mut reader_buf = vec![];
|
let mut reader_buf = vec![];
|
||||||
|
@ -187,18 +187,34 @@ async fn handle_socket(
|
||||||
let mut xml_reader = NsReader::from_reader(BufReader::new(a));
|
let mut xml_reader = NsReader::from_reader(BufReader::new(a));
|
||||||
let mut xml_writer = Writer::new(b);
|
let mut xml_writer = Writer::new(b);
|
||||||
|
|
||||||
let authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage).await?;
|
pin!(termination);
|
||||||
log::debug!("User authenticated");
|
select! {
|
||||||
let mut connection = players.connect_to_player(authenticated.player_id.clone()).await;
|
biased;
|
||||||
socket_final(
|
_ = &mut termination =>{
|
||||||
&mut xml_reader,
|
log::info!("Socket handling was terminated");
|
||||||
&mut xml_writer,
|
return Ok(())
|
||||||
&mut reader_buf,
|
},
|
||||||
&authenticated,
|
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage) => {
|
||||||
&mut connection,
|
match authenticated {
|
||||||
&rooms,
|
Ok(authenticated) => {
|
||||||
)
|
let mut connection = players.connect_to_player(authenticated.player_id.clone()).await;
|
||||||
.await?;
|
socket_final(
|
||||||
|
&mut xml_reader,
|
||||||
|
&mut xml_writer,
|
||||||
|
&mut reader_buf,
|
||||||
|
&authenticated,
|
||||||
|
&mut connection,
|
||||||
|
&rooms,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
log::error!("Authentication error: {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
let a = xml_reader.into_inner().into_inner();
|
let a = xml_reader.into_inner().into_inner();
|
||||||
let b = xml_writer.into_inner();
|
let b = xml_writer.into_inner();
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -184,3 +185,69 @@ async fn scenario_basic() -> Result<()> {
|
||||||
server.terminate().await?;
|
server.terminate().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn terminate_socket() -> Result<()> {
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
let config = ServerConfig {
|
||||||
|
listen_on: "127.0.0.1:0".parse().unwrap(),
|
||||||
|
cert: "tests/certs/xmpp.pem".parse().unwrap(),
|
||||||
|
key: "tests/certs/xmpp.key".parse().unwrap(),
|
||||||
|
};
|
||||||
|
let mut metrics = MetricsRegistry::new();
|
||||||
|
let mut storage = Storage::open(StorageConfig {
|
||||||
|
db_path: ":memory:".into(),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let rooms = RoomRegistry::new(&mut metrics, storage.clone()).unwrap();
|
||||||
|
let players = PlayerRegistry::empty(rooms.clone(), &mut metrics).unwrap();
|
||||||
|
let server = launch(config, players, rooms, metrics, storage.clone()).await.unwrap();
|
||||||
|
let address: SocketAddr = ("127.0.0.1:0".parse().unwrap());
|
||||||
|
// test scenario
|
||||||
|
|
||||||
|
storage.create_user("tester").await?;
|
||||||
|
storage.set_password("tester", "password").await?;
|
||||||
|
|
||||||
|
let mut stream = TcpStream::connect(server.addr).await?;
|
||||||
|
let mut s = TestScope::new(&mut stream);
|
||||||
|
tracing::info!("TCP connection established");
|
||||||
|
|
||||||
|
s.send(r#"<?xml version="1.0"?>"#).await?;
|
||||||
|
|
||||||
|
s.send(r#"<stream:stream xmlns:stream="http://etherx.jabber.org/streams" to="127.0.0.1" xml:lang="en" xmlns:xml="http://www.w3.org/XML/1998/namespace" xmlns="jabber:client" version="1.0">"#).await?;
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Decl(_) => {});
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Start(b) => assert_eq!(b.local_name().into_inner(), b"stream"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Start(b) => assert_eq!(b.local_name().into_inner(), b"features"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Start(b) => assert_eq!(b.local_name().into_inner(), b"starttls"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Empty(b) => assert_eq!(b.local_name().into_inner(), b"required"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::End(b) => assert_eq!(b.local_name().into_inner(), b"starttls"));
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::End(b) => assert_eq!(b.local_name().into_inner(), b"features"));
|
||||||
|
s.send(r#"<starttls/>"#).await?;
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Empty(b) => assert_eq!(b.local_name().into_inner(), b"proceed"));
|
||||||
|
let buffer = s.buffer;
|
||||||
|
|
||||||
|
let connector = TlsConnector::from(Arc::new(
|
||||||
|
ClientConfig::builder()
|
||||||
|
.with_safe_defaults()
|
||||||
|
.with_custom_certificate_verifier(Arc::new(IgnoreCertVerification))
|
||||||
|
.with_no_client_auth(),
|
||||||
|
));
|
||||||
|
tracing::info!("Initiating TLS connection...");
|
||||||
|
let mut stream = connector.connect(ServerName::IpAddress(server.addr.ip()), stream).await?;
|
||||||
|
tracing::info!("TLS connection established");
|
||||||
|
|
||||||
|
let mut s = TestScopeTls::new(&mut stream, buffer);
|
||||||
|
|
||||||
|
s.send(r#"<?xml version="1.0"?>"#).await?;
|
||||||
|
s.send(r#"<stream:stream xmlns:stream="http://etherx.jabber.org/streams" to="127.0.0.1" xml:lang="en" xmlns:xml="http://www.w3.org/XML/1998/namespace" xmlns="jabber:client" version="1.0">"#).await?;
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Decl(_) => {});
|
||||||
|
assert_matches!(s.next_xml_event().await?, Event::Start(b) => assert_eq!(b.local_name().into_inner(), b"stream"));
|
||||||
|
|
||||||
|
stream.shutdown().await?;
|
||||||
|
server.terminate().await?;
|
||||||
|
|
||||||
|
assert!(TcpStream::connect(&address).await.is_err());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue