forked from lavina/lavina
1
0
Fork 0
lavina/src/projections/irc.rs

127 lines
4.4 KiB
Rust
Raw Normal View History

2023-02-07 15:21:00 +00:00
use std::net::SocketAddr;
2023-02-09 19:01:21 +00:00
use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry};
use serde::Deserialize;
2023-02-10 17:09:29 +00:00
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
2023-02-07 15:21:00 +00:00
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot::channel;
2023-02-09 19:01:21 +00:00
use crate::core::player::PlayerRegistry;
2023-02-07 15:21:00 +00:00
use crate::prelude::*;
2023-02-10 17:09:29 +00:00
use crate::protos::irc::client::{client_message, ClientMessage};
use crate::protos::irc::server::{ServerMessage, ServerMessageBody};
2023-02-07 15:21:00 +00:00
use crate::util::Terminator;
2023-02-10 17:09:29 +00:00
#[derive(Deserialize, Debug, Clone)]
2023-02-07 15:21:00 +00:00
pub struct ServerConfig {
pub listen_on: SocketAddr,
2023-02-10 17:09:29 +00:00
pub server_name: String,
2023-02-07 15:21:00 +00:00
}
async fn handle_socket(
2023-02-10 17:09:29 +00:00
config: ServerConfig,
2023-02-07 15:21:00 +00:00
mut stream: TcpStream,
socket_addr: SocketAddr,
mut players: PlayerRegistry,
2023-02-09 19:01:21 +00:00
current_connections: IntGauge,
2023-02-07 15:21:00 +00:00
) {
let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
let mut buffer = vec![];
// let (player_id, mut player_handle) = players.create_player().await;
// let mut sub = player_handle.subscribe().await;
loop {
select! {
biased;
res = reader.read_until(b'\n', &mut buffer) => {
match res {
Ok(len) => if len == 0 {
log::info!("Terminating socket");
break;
},
Err(err) => log::warn!("Failed to read from socket: {err}"),
}
2023-02-10 17:09:29 +00:00
let parsed = client_message(&buffer[..]);
2023-02-07 15:21:00 +00:00
match parsed {
Ok((rest, msg)) => {
log::info!("Incoming IRC message: {msg:?}");
2023-02-10 17:09:29 +00:00
match msg {
ClientMessage::Ping { token } => {
let response = ServerMessage {
tags: vec![],
sender: None,
body: ServerMessageBody::Pong {
from: config.server_name.as_bytes().to_vec(),
token,
}
};
let mut buffer = vec![];
response.write(&mut buffer).unwrap();
writer.write_all(buffer.as_slice()).await;
writer.flush().await;
},
_ => {},
}
2023-02-07 15:21:00 +00:00
},
Err(err) => {
log::warn!("Failed to parse IRC message: {err}");
},
}
buffer.clear();
},
}
}
2023-02-09 19:01:21 +00:00
current_connections.dec();
2023-02-07 15:21:00 +00:00
}
2023-02-09 19:01:21 +00:00
pub async fn launch(
2023-02-10 17:09:29 +00:00
config: ServerConfig,
2023-02-09 19:01:21 +00:00
players: PlayerRegistry,
metrics: MetricsRegistry,
) -> Result<Terminator> {
2023-02-07 15:21:00 +00:00
log::info!("Starting IRC projection");
let (signal, mut rx) = channel();
2023-02-09 19:01:21 +00:00
let current_connections =
IntGauge::new("irc_current_connections", "Open and alive TCP connections")?;
let total_connections = IntCounter::new(
"irc_total_connections",
"Total number of opened connections",
)?;
metrics.register(Box::new(current_connections.clone()))?;
metrics.register(Box::new(total_connections.clone()))?;
2023-02-07 15:21:00 +00:00
let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started");
let handle = tokio::task::spawn(async move {
loop {
select! {
biased;
_ = &mut rx => break,
new_conn = listener.accept() => {
match new_conn {
Ok((stream, socket_addr)) => {
2023-02-10 17:09:29 +00:00
let config = config.clone();
2023-02-09 19:01:21 +00:00
total_connections.inc();
current_connections.inc();
2023-02-07 15:21:00 +00:00
log::debug!("Incoming connection from {socket_addr}");
2023-02-10 17:09:29 +00:00
let handle = tokio::task::spawn(handle_socket(config, stream, socket_addr, players.clone(), current_connections.clone()));
2023-02-07 15:21:00 +00:00
},
Err(err) => log::warn!("Failed to accept new connection: {err}"),
}
},
}
}
log::info!("Stopping IRC projection");
Ok(())
});
let terminator = Terminator::from_raw(signal, handle);
Ok(terminator)
}