forked from lavina/lavina
1
0
Fork 0
This commit is contained in:
Nikita Vilunov 2023-12-18 16:49:41 +01:00
parent 59ff6060c5
commit cfa1e051fb
1 changed files with 42 additions and 36 deletions

View File

@ -6,7 +6,7 @@ use prost::Message;
use serde::Deserialize; use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::spawn; use tokio::{select, pin, spawn};
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
@ -75,7 +75,7 @@ impl ClusterRegistry {
let handshake_response = read_message::<proto::HandshakeResponse>(&mut stream).await?; let handshake_response = read_message::<proto::HandshakeResponse>(&mut stream).await?;
info!("Connection to the bootstrap server succeeded"); info!("Connection to the bootstrap server succeeded");
peer_launch(stream , inner.clone()).await; peer_launch(stream, inner.clone()).await;
} }
Ok(ClusterRegistry { Ok(ClusterRegistry {
@ -93,49 +93,55 @@ impl ClusterRegistry {
if let Err(e) = k.send(PeerMsg::Close).await { if let Err(e) = k.send(PeerMsg::Close).await {
warn!("Failed to stop peer actor"); warn!("Failed to stop peer actor");
} }
} }
Ok(()) Ok(())
} }
} }
async fn start_connection_listener(listener: TcpListener, cluster: Arc<RwLock<ClusterRegistryInner>>) -> Terminator { async fn start_connection_listener(listener: TcpListener, cluster: Arc<RwLock<ClusterRegistryInner>>) -> Terminator {
Terminator::spawn(|_| async move { Terminator::spawn(|terminator| async move {
pin!(terminator);
loop { loop {
let (mut stream, addr) = match listener.accept().await { select! {
Ok(stream) => stream, _ = &mut terminator => break,
Err(e) => { res = listener.accept() => {
warn!("Error accepting connection: {}", e); let (mut stream, addr) = match res {
continue; Ok(stream) => stream,
Err(e) => {
warn!("Error accepting connection: {}", e);
continue;
}
};
info!("Incoming connection from {addr}");
// We handle the handshaking in order to not block other incoming connections.
let cluster = cluster.clone();
spawn(async move {
// All incoming connections expect the handshake to be initiated by the other peer.
// The peer sends a `HandshakeRequest` message and we respond to it with a `HandshakeResponse` message.
// After that a new task is launched for peer communication, which is the same fiber as the one started when we initiate the connection.
let handshake = read_message::<proto::HandshakeRequest>(&mut stream).await;
let handshake = match handshake {
Ok(handshake) => handshake,
Err(e) => {
warn!("Error reading from socket: {}", e);
return;
}
};
let handshake_response = proto::HandshakeResponse {};
let mut buf = vec![];
handshake_response.encode(&mut buf).unwrap();
stream.flush().await;
if let Err(e) = stream.write_all(&buf).await {
warn!("Failed to reply with HandshakeResponse: {}", e);
} else {
info!("Connection from {addr} accepted");
peer_launch(stream , cluster).await;
}
});
} }
}; }
info!("Incoming connection from {addr}");
// We handle the handshaking in order to not block other incoming connections.
let cluster = cluster.clone();
spawn(async move {
// All incoming connections expect the handshake to be initiated by the other peer.
// The peer sends a `HandshakeRequest` message and we respond to it with a `HandshakeResponse` message.
// After that a new task is launched for peer communication, which is the same fiber as the one started when we initiate the connection.
let handshake = read_message::<proto::HandshakeRequest>(&mut stream).await;
let handshake = match handshake {
Ok(handshake) => handshake,
Err(e) => {
warn!("Error reading from socket: {}", e);
return;
}
};
let handshake_response = proto::HandshakeResponse {};
let mut buf = vec![];
handshake_response.encode(&mut buf).unwrap();
stream.flush().await;
if let Err(e) = stream.write_all(&buf).await {
warn!("Failed to reply with HandshakeResponse: {}", e);
} else {
info!("Connection from {addr} accepted");
peer_launch(stream , cluster).await;
}
});
} }
Ok(())
}) })
} }