rewrite server message writes as async, introduce error handling

This commit is contained in:
Nikita Vilunov 2023-02-10 22:27:29 +01:00
parent 69406cb33b
commit d5d0c6e73e
2 changed files with 122 additions and 137 deletions

View File

@ -31,14 +31,12 @@ async fn handle_socket(
mut stream: TcpStream,
socket_addr: SocketAddr,
mut players: PlayerRegistry,
current_connections: IntGauge,
) {
) -> Result<()> {
let (reader, writer) = stream.split();
let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
{
let notice = ServerMessage {
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::Notice {
@ -46,12 +44,10 @@ async fn handle_socket(
rest_targets: vec![],
text: b"Welcome to my server!".to_vec(),
},
};
let mut buffer = vec![];
notice.write(&mut buffer).unwrap();
writer.write_all(buffer.as_slice()).await;
writer.flush().await;
}
.write_async(&mut writer)
.await?;
writer.flush().await?;
let mut buffer = vec![];
@ -113,12 +109,13 @@ async fn handle_socket(
};
match registered_user {
Ok(user) => {
handle_registered_socket(config, socket_addr, players, reader, writer, user).await
handle_registered_socket(config, socket_addr, players, reader, writer, user).await?;
}
Err(_) => {}
}
current_connections.dec();
stream.shutdown().await?;
Ok(())
}
async fn handle_registered_socket<'a>(
@ -128,57 +125,41 @@ async fn handle_registered_socket<'a>(
mut reader: BufReader<ReadHalf<'a>>,
mut writer: BufWriter<WriteHalf<'a>>,
user: RegisteredUser,
) {
) -> Result<()> {
let mut buffer = vec![];
log::info!("Handling registered user: {user:?}");
{
let notice = ServerMessage {
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N001Welcome {
client: user.nickname.clone(),
text: b"Welcome to Kek Server".to_vec(),
},
};
let mut buffer = vec![];
notice.write(&mut buffer).unwrap();
writer.write_all(buffer.as_slice()).await;
writer.flush().await;
}
{
let notice = ServerMessage {
.write_async(&mut writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N002YourHost {
client: user.nickname.clone(),
text: b"Welcome to Kek Server".to_vec(),
},
};
let mut buffer = vec![];
notice.write(&mut buffer).unwrap();
writer.write_all(buffer.as_slice()).await;
writer.flush().await;
}
{
let notice = ServerMessage {
.write_async(&mut writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N003Created {
client: user.nickname.clone(),
text: b"Welcome to Kek Server".to_vec(),
},
};
let mut buffer = vec![];
notice.write(&mut buffer).unwrap();
writer.write_all(buffer.as_slice()).await;
writer.flush().await;
}
{
let notice = ServerMessage {
.write_async(&mut writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N004MyInfo {
@ -186,27 +167,20 @@ async fn handle_registered_socket<'a>(
hostname: config.server_name.as_bytes().to_vec(),
softname: b"kek-0.1.alpha.3".to_vec(),
},
};
let mut buffer = vec![];
notice.write(&mut buffer).unwrap();
writer.write_all(buffer.as_slice()).await;
writer.flush().await;
}
{
let notice = ServerMessage {
.write_async(&mut writer)
.await?;
ServerMessage {
tags: vec![],
sender: Some(config.server_name.as_bytes().to_vec()),
body: ServerMessageBody::N005ISupport {
client: user.nickname.clone(),
params: b"CHANTYPES=#".to_vec(),
},
};
let mut buffer = vec![];
notice.write(&mut buffer).unwrap();
writer.write_all(buffer.as_slice()).await;
writer.flush().await;
}
.write_async(&mut writer)
.await?;
writer.flush().await?;
loop {
select! {
@ -224,18 +198,15 @@ async fn handle_registered_socket<'a>(
Ok((rest, msg)) => {
match msg {
ClientMessage::Ping { token } => {
let response = ServerMessage {
ServerMessage {
tags: vec![],
sender: None,
body: ServerMessageBody::Pong {
from: config.server_name.as_bytes().to_vec(),
token,
}
};
let mut buffer = vec![];
response.write(&mut buffer).unwrap();
writer.write_all(buffer.as_slice()).await;
writer.flush().await;
}.write_async(&mut writer).await?;
writer.flush().await?;
},
_ => {},
}
@ -248,6 +219,7 @@ async fn handle_registered_socket<'a>(
},
}
}
Ok(())
}
pub async fn launch(
@ -281,7 +253,15 @@ pub async fn launch(
total_connections.inc();
current_connections.inc();
log::debug!("Incoming connection from {socket_addr}");
let handle = tokio::task::spawn(handle_socket(config, stream, socket_addr, players.clone(), current_connections.clone()));
let players_clone = players.clone();
let current_connections_clone = current_connections.clone();
let handle = tokio::task::spawn(async move {
match handle_socket(config, stream, socket_addr, players_clone).await {
Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"),
}
current_connections_clone.dec();
});
},
Err(err) => log::warn!("Failed to accept new connection: {err}"),
}

View File

@ -1,3 +1,6 @@
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use super::*;
/// Server-to-client message.
@ -11,17 +14,17 @@ pub struct ServerMessage {
}
impl ServerMessage {
pub fn write(&self, writer: &mut impl Write) -> std::io::Result<()> {
pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> {
match &self.sender {
Some(ref sender) => {
writer.write(b":")?;
writer.write(sender.as_slice())?;
writer.write(b" ")?;
writer.write_all(b":").await?;
writer.write_all(sender.as_slice()).await?;
writer.write_all(b" ").await?;
}
None => {}
}
self.body.write(writer)?;
writer.write(b"\n")?;
self.body.write_async(writer).await?;
writer.write_all(b"\n").await?;
Ok(())
}
}
@ -77,66 +80,68 @@ pub enum ServerMessageBody {
}
impl ServerMessageBody {
pub fn write(&self, writer: &mut impl Write) -> std::io::Result<()> {
pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> {
match self {
ServerMessageBody::Notice {
first_target,
rest_targets,
text,
} => {
writer.write(b"NOTICE ")?;
writer.write(&first_target)?;
writer.write(b" :")?;
writer.write(&text)?;
writer.write_all(b"NOTICE ").await?;
writer.write_all(&first_target).await?;
writer.write_all(b" :").await?;
writer.write_all(&text).await?;
}
ServerMessageBody::Ping { token } => {
writer.write(b"PING ")?;
writer.write(&token)?;
writer.write_all(b"PING ").await?;
writer.write_all(&token).await?;
}
ServerMessageBody::Pong { from, token } => {
writer.write(b"PONG ")?;
writer.write(&from)?;
writer.write(b" :")?;
writer.write(&token)?;
writer.write_all(b"PONG ").await?;
writer.write_all(&from).await?;
writer.write_all(b" :").await?;
writer.write_all(&token).await?;
}
ServerMessageBody::N001Welcome { client, text } => {
writer.write(b"001 ")?;
writer.write(&client)?;
writer.write(b" :")?;
writer.write(text)?;
writer.write_all(b"001 ").await?;
writer.write_all(&client).await?;
writer.write_all(b" :").await?;
writer.write_all(text).await?;
}
ServerMessageBody::N002YourHost { client, text } => {
writer.write(b"002 ")?;
writer.write(&client)?;
writer.write(b" :")?;
writer.write(text)?;
writer.write_all(b"002 ").await?;
writer.write_all(&client).await?;
writer.write_all(b" :").await?;
writer.write_all(text).await?;
}
ServerMessageBody::N003Created { client, text } => {
writer.write(b"003 ")?;
writer.write(&client)?;
writer.write(b" :")?;
writer.write(text)?;
writer.write_all(b"003 ").await?;
writer.write_all(&client).await?;
writer.write_all(b" :").await?;
writer.write_all(text).await?;
}
ServerMessageBody::N004MyInfo {
client,
hostname,
softname,
} => {
writer.write(b"004 ")?;
writer.write(&client)?;
writer.write(b" ")?;
writer.write(&hostname)?;
writer.write(b" ")?;
writer.write(&softname)?;
writer.write(b" DGMQRSZagiloswz CFILPQbcefgijklmnopqrstvz bkloveqjfI")?;
writer.write_all(b"004 ").await?;
writer.write_all(&client).await?;
writer.write_all(b" ").await?;
writer.write_all(&hostname).await?;
writer.write_all(b" ").await?;
writer.write_all(&softname).await?;
writer
.write_all(b" DGMQRSZagiloswz CFILPQbcefgijklmnopqrstvz bkloveqjfI")
.await?;
// TODO remove hardcoded modes
}
ServerMessageBody::N005ISupport { client, params } => {
writer.write(b"005 ")?;
writer.write(&client)?;
writer.write(b" ")?;
writer.write(&params)?;
writer.write(b" :are supported by this server")?;
writer.write_all(b"005 ").await?;
writer.write_all(&client).await?;
writer.write_all(b" ").await?;
writer.write_all(&params).await?;
writer.write_all(b" :are supported by this server").await?;
}
}
Ok(())