diff --git a/crates/lavina-core/src/clustering/mod.rs b/crates/lavina-core/src/clustering/mod.rs index 936fb96..1df0be2 100644 --- a/crates/lavina-core/src/clustering/mod.rs +++ b/crates/lavina-core/src/clustering/mod.rs @@ -6,7 +6,7 @@ use prost::Message; use serde::Deserialize; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::spawn; +use tokio::{select, pin, spawn}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use tokio::task::JoinHandle; @@ -75,7 +75,7 @@ impl ClusterRegistry { let handshake_response = read_message::(&mut stream).await?; info!("Connection to the bootstrap server succeeded"); - peer_launch(stream , inner.clone()).await; + peer_launch(stream, inner.clone()).await; } Ok(ClusterRegistry { @@ -93,49 +93,55 @@ impl ClusterRegistry { if let Err(e) = k.send(PeerMsg::Close).await { warn!("Failed to stop peer actor"); } - } Ok(()) } } async fn start_connection_listener(listener: TcpListener, cluster: Arc>) -> Terminator { - Terminator::spawn(|_| async move { + Terminator::spawn(|terminator| async move { + pin!(terminator); loop { - let (mut stream, addr) = match listener.accept().await { - Ok(stream) => stream, - Err(e) => { - warn!("Error accepting connection: {}", e); - continue; + select! { + _ = &mut terminator => break, + res = listener.accept() => { + let (mut stream, addr) = match res { + Ok(stream) => stream, + Err(e) => { + warn!("Error accepting connection: {}", e); + continue; + } + }; + info!("Incoming connection from {addr}"); + // We handle the handshaking in order to not block other incoming connections. + let cluster = cluster.clone(); + spawn(async move { + // All incoming connections expect the handshake to be initiated by the other peer. + // The peer sends a `HandshakeRequest` message and we respond to it with a `HandshakeResponse` message. + // After that a new task is launched for peer communication, which is the same fiber as the one started when we initiate the connection. + let handshake = read_message::(&mut stream).await; + let handshake = match handshake { + Ok(handshake) => handshake, + Err(e) => { + warn!("Error reading from socket: {}", e); + return; + } + }; + let handshake_response = proto::HandshakeResponse {}; + let mut buf = vec![]; + handshake_response.encode(&mut buf).unwrap(); + stream.flush().await; + if let Err(e) = stream.write_all(&buf).await { + warn!("Failed to reply with HandshakeResponse: {}", e); + } else { + info!("Connection from {addr} accepted"); + peer_launch(stream , cluster).await; + } + }); } - }; - info!("Incoming connection from {addr}"); - // We handle the handshaking in order to not block other incoming connections. - let cluster = cluster.clone(); - spawn(async move { - // All incoming connections expect the handshake to be initiated by the other peer. - // The peer sends a `HandshakeRequest` message and we respond to it with a `HandshakeResponse` message. - // After that a new task is launched for peer communication, which is the same fiber as the one started when we initiate the connection. - let handshake = read_message::(&mut stream).await; - let handshake = match handshake { - Ok(handshake) => handshake, - Err(e) => { - warn!("Error reading from socket: {}", e); - return; - } - }; - let handshake_response = proto::HandshakeResponse {}; - let mut buf = vec![]; - handshake_response.encode(&mut buf).unwrap(); - stream.flush().await; - if let Err(e) = stream.write_all(&buf).await { - warn!("Failed to reply with HandshakeResponse: {}", e); - } else { - info!("Connection from {addr} accepted"); - peer_launch(stream , cluster).await; - } - }); + } } + Ok(()) }) }