forked from lavina/lavina
1
0
Fork 0

use scoped for telemetry and all socket connections

This commit is contained in:
Nikita Vilunov 2023-09-06 17:55:24 +02:00
parent 1652333184
commit c114de7dfa
6 changed files with 91 additions and 111 deletions

View File

@ -227,8 +227,7 @@ pub enum Updates {
} }
/// Handle to a player registry — a shared data structure containing information about players. /// Handle to a player registry — a shared data structure containing information about players.
#[derive(Clone)] pub struct PlayerRegistry(RwLock<PlayerRegistryInner>);
pub struct PlayerRegistry(Arc<RwLock<PlayerRegistryInner>>);
impl PlayerRegistry { impl PlayerRegistry {
pub fn empty( pub fn empty(
room_registry: RoomRegistry, room_registry: RoomRegistry,
@ -242,10 +241,10 @@ impl PlayerRegistry {
players: HashMap::new(), players: HashMap::new(),
metric_active_players, metric_active_players,
}; };
Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) Ok(PlayerRegistry(RwLock::new(inner)))
} }
pub async fn get_or_create_player(&mut self, id: PlayerId) -> PlayerHandle { pub async fn get_or_create_player(&self, id: PlayerId) -> PlayerHandle {
let mut inner = self.0.write().unwrap(); let mut inner = self.0.write().unwrap();
if let Some((handle, _)) = inner.players.get(&id) { if let Some((handle, _)) = inner.players.get(&id) {
handle.clone() handle.clone()
@ -257,7 +256,7 @@ impl PlayerRegistry {
} }
} }
pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection { pub async fn connect_to_player(&self, id: PlayerId) -> PlayerConnection {
let player_handle = self.get_or_create_player(id).await; let player_handle = self.get_or_create_player(id).await;
player_handle.subscribe().await player_handle.subscribe().await
} }

View File

@ -33,7 +33,7 @@ impl Storage {
Ok(Storage { conn }) Ok(Storage { conn })
} }
pub async fn retrieve_user_by_name(&mut self, name: &str) -> Result<Option<StoredUser>> { pub async fn retrieve_user_by_name(&self, name: &str) -> Result<Option<StoredUser>> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let res = sqlx::query_as( let res = sqlx::query_as(
"select u.id, u.name, c.password "select u.id, u.name, c.password

View File

@ -55,11 +55,10 @@ async fn main() -> Result<()> {
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let rooms = RoomRegistry::new(&mut metrics, storage.clone())?;
let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?;
let telemetry_terminator =
util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone()).await?;
// unsafe: outer future is never dropped, scope is joined on `scope.collect` // unsafe: outer future is never dropped, scope is joined on `scope.collect`
let mut scope = unsafe { Scope::create() }; let mut scope = unsafe { Scope::create() };
let telemetry_terminator = util::telemetry::launch(telemetry_config, &metrics, &rooms, &mut scope).await?;
let irc = projections::irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?; let irc = projections::irc::launch(&irc_config, &players, &rooms, &metrics, &storage, &mut scope).await?;
let xmpp = projections::xmpp::launch(xmpp_config, &players, &rooms, &metrics, &mut scope).await?; let xmpp = projections::xmpp::launch(xmpp_config, &players, &rooms, &metrics, &mut scope).await?;
tracing::info!("Started"); tracing::info!("Started");
@ -69,10 +68,10 @@ async fn main() -> Result<()> {
tracing::info!("Begin shutdown"); tracing::info!("Begin shutdown");
let _ = xmpp.send(()); let _ = xmpp.send(());
let _ = irc.send(()); let _ = irc.send(());
let _ = telemetry_terminator.send(());
let _ = scope.collect().await; let _ = scope.collect().await;
drop(scope); drop(scope);
telemetry_terminator.terminate().await?;
players.shutdown_all().await?; players.shutdown_all().await?;
drop(players); drop(players);
drop(rooms); drop(rooms);

View File

@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures_util::FutureExt; use futures_util::FutureExt;
use futures_util::future::join_all;
use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry};
use serde::Deserialize; use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
@ -17,7 +16,6 @@ use crate::prelude::*;
use crate::protos::irc::client::{client_message, ClientMessage}; use crate::protos::irc::client::{client_message, ClientMessage};
use crate::protos::irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; use crate::protos::irc::server::{AwayStatus, ServerMessage, ServerMessageBody};
use crate::protos::irc::{Chan, Recipient}; use crate::protos::irc::{Chan, Recipient};
use crate::util::Terminator;
#[cfg(test)] #[cfg(test)]
mod test; mod test;
@ -44,10 +42,10 @@ async fn handle_socket(
config: ServerConfig, config: ServerConfig,
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
players: PlayerRegistry, players: &PlayerRegistry,
rooms: RoomRegistry, rooms: &RoomRegistry,
termination: Deferred<()>, // TODO use it to stop the connection gracefully termination: Deferred<()>, // TODO use it to stop the connection gracefully
mut storage: Storage, storage: &Storage,
) -> Result<()> { ) -> Result<()> {
let (reader, writer) = stream.split(); let (reader, writer) = stream.split();
let mut reader: BufReader<ReadHalf> = BufReader::new(reader); let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
@ -67,7 +65,7 @@ async fn handle_socket(
writer.flush().await?; writer.flush().await?;
let registered_user: Result<RegisteredUser> = let registered_user: Result<RegisteredUser> =
handle_registration(&mut reader, &mut writer, &mut storage).await; handle_registration(&mut reader, &mut writer, &storage).await;
match registered_user { match registered_user {
Ok(user) => { Ok(user) => {
@ -84,7 +82,7 @@ async fn handle_socket(
async fn handle_registration<'a>( async fn handle_registration<'a>(
reader: &mut BufReader<ReadHalf<'a>>, reader: &mut BufReader<ReadHalf<'a>>,
writer: &mut BufWriter<WriteHalf<'a>>, writer: &mut BufWriter<WriteHalf<'a>>,
storage: &mut Storage, storage: &Storage,
) -> Result<RegisteredUser> { ) -> Result<RegisteredUser> {
let mut buffer = vec![]; let mut buffer = vec![];
@ -177,8 +175,8 @@ async fn handle_registration<'a>(
async fn handle_registered_socket<'a>( async fn handle_registered_socket<'a>(
config: ServerConfig, config: ServerConfig,
mut players: PlayerRegistry, players: &PlayerRegistry,
rooms: RoomRegistry, rooms: &RoomRegistry,
reader: &mut BufReader<ReadHalf<'a>>, reader: &mut BufReader<ReadHalf<'a>>,
writer: &mut BufWriter<WriteHalf<'a>>, writer: &mut BufWriter<WriteHalf<'a>>,
user: RegisteredUser, user: RegisteredUser,
@ -716,6 +714,7 @@ pub async fn launch<'a>(
// TODO probably should separate logic for accepting new connection and storing them // TODO probably should separate logic for accepting new connection and storing them
// into two tasks so that they don't block each other // into two tasks so that they don't block each other
let mut actors = HashMap::new(); let mut actors = HashMap::new();
let mut scope = unsafe { Scope::create() };
loop { loop {
select! { select! {
biased; biased;
@ -738,23 +737,22 @@ pub async fn launch<'a>(
continue; continue;
} }
let terminator = Terminator::spawn(|termination| {
let players = players.clone(); let current_connections_clone = current_connections.clone();
let rooms = rooms.clone(); let stopped_tx = stopped_tx.clone();
let current_connections_clone = current_connections.clone();
let stopped_tx = stopped_tx.clone(); let (a, rx) = oneshot();
let storage = storage.clone(); let future = async move {
async move { match handle_socket(config, stream, &socket_addr, players, rooms, rx, storage).await {
match handle_socket(config, stream, &socket_addr, players, rooms, termination, storage).await { Ok(_) => log::info!("Connection terminated"),
Ok(_) => log::info!("Connection terminated"), Err(err) => log::warn!("Connection failed: {err}"),
Err(err) => log::warn!("Connection failed: {err}"),
}
current_connections_clone.dec();
stopped_tx.send(socket_addr).await?;
Ok(())
} }
}); current_connections_clone.dec();
actors.insert(socket_addr, terminator); stopped_tx.send(socket_addr).await?;
Ok(())
};
scope.spawn(future.map(|_: Result<()>| ()));
actors.insert(socket_addr, a);
}, },
Err(err) => log::warn!("Failed to accept new connection: {err}"), Err(err) => log::warn!("Failed to accept new connection: {err}"),
} }
@ -763,15 +761,14 @@ pub async fn launch<'a>(
} }
log::info!("Stopping IRC projection"); log::info!("Stopping IRC projection");
join_all(actors.into_iter().map(|(socket_addr, terminator)| async move { for (socket_addr, terminator) in actors {
log::debug!("Stopping IRC connection at {socket_addr}"); match terminator.send(()) {
match terminator.terminate().await { Ok(_) => log::debug!("Stopping IRC connection at {socket_addr}"),
Ok(_) => log::debug!("Stopped IRC connection at {socket_addr}"), Err(_) => log::debug!("IRC connection at {socket_addr} already stopped")
Err(err) => {
log::warn!("IRC connection to {socket_addr} finished with error: {err}")
}
} }
})).await; }
let _ = scope.collect().await;
drop(scope);
log::info!("Stopped IRC projection"); log::info!("Stopped IRC projection");
Ok(()) Ok(())
}; };

View File

@ -8,7 +8,6 @@ use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use futures_util::FutureExt; use futures_util::FutureExt;
use futures_util::future::join_all;
use prometheus::Registry as MetricsRegistry; use prometheus::Registry as MetricsRegistry;
use quick_xml::events::{BytesDecl, Event}; use quick_xml::events::{BytesDecl, Event};
use quick_xml::{NsReader, Writer}; use quick_xml::{NsReader, Writer};
@ -31,7 +30,6 @@ use crate::protos::xmpp::roster::RosterQuery;
use crate::protos::xmpp::session::Session; use crate::protos::xmpp::session::Session;
use crate::protos::xmpp::stream::*; use crate::protos::xmpp::stream::*;
use crate::util::xml::{Continuation, FromXml, Parser, ToXml}; use crate::util::xml::{Continuation, FromXml, Parser, ToXml};
use crate::util::Terminator;
use self::proto::{ClientPacket, IqClientBody}; use self::proto::{ClientPacket, IqClientBody};
@ -82,6 +80,7 @@ pub async fn launch<'a>(
let future = async move { let future = async move {
let (stopped_tx, mut stopped_rx) = channel(32); let (stopped_tx, mut stopped_rx) = channel(32);
let mut actors = HashMap::new(); let mut actors = HashMap::new();
let mut scope = unsafe { Scope::create() };
loop { loop {
select! { select! {
biased; biased;
@ -99,22 +98,20 @@ pub async fn launch<'a>(
// TODO kill the older connection and restart it // TODO kill the older connection and restart it
continue; continue;
} }
let players = players.clone(); let stopped_tx = stopped_tx.clone();
let rooms = rooms.clone(); let loaded_config = loaded_config.clone();
let terminator = Terminator::spawn(|termination| { let (a, rx) = oneshot();
let stopped_tx = stopped_tx.clone(); let future = async move {
let loaded_config = loaded_config.clone(); match handle_socket(loaded_config, stream, &socket_addr, players, rooms, rx).await {
async move {
match handle_socket(loaded_config, stream, &socket_addr, players, rooms, termination).await {
Ok(_) => log::info!("Connection terminated"), Ok(_) => log::info!("Connection terminated"),
Err(err) => log::warn!("Connection failed: {err}"), Err(err) => log::warn!("Connection failed: {err}"),
} }
stopped_tx.send(socket_addr).await?; stopped_tx.send(socket_addr).await?;
Ok(()) Ok(())
} };
}); scope.spawn(future.map(|_: Result<()>| ()));
actors.insert(socket_addr, terminator); actors.insert(socket_addr, a);
}, },
Err(err) => log::warn!("Failed to accept new connection: {err}"), Err(err) => log::warn!("Failed to accept new connection: {err}"),
} }
@ -122,22 +119,14 @@ pub async fn launch<'a>(
} }
} }
log::info!("Stopping XMPP projection"); log::info!("Stopping XMPP projection");
join_all( for (socket_addr, terminator) in actors {
actors match terminator.send(()) {
.into_iter() Ok(_) => log::debug!("Stopping XMPP connection at {socket_addr}"),
.map(|(socket_addr, terminator)| async move { Err(_) => log::debug!("XMPP connection at {socket_addr} already stopped")
log::debug!("Stopping XMPP connection at {socket_addr}"); }
match terminator.terminate().await { }
Ok(_) => log::debug!("Stopped XMPP connection at {socket_addr}"), let _ = scope.collect().await;
Err(err) => { drop(scope);
log::warn!(
"XMPP connection to {socket_addr} finished with error: {err}"
)
}
}
}),
)
.await;
log::info!("Stopped XMPP projection"); log::info!("Stopped XMPP projection");
Ok(()) Ok(())
}; };
@ -150,8 +139,8 @@ async fn handle_socket(
config: Arc<LoadedConfig>, config: Arc<LoadedConfig>,
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
mut players: PlayerRegistry, players: &PlayerRegistry,
rooms: RoomRegistry, rooms: &RoomRegistry,
termination: Deferred<()>, // TODO use it to stop the connection gracefully termination: Deferred<()>, // TODO use it to stop the connection gracefully
) -> Result<()> { ) -> Result<()> {
log::debug!("Received an XMPP connection from {socket_addr}"); log::debug!("Received an XMPP connection from {socket_addr}");

View File

@ -15,7 +15,6 @@ use crate::core::room::RoomRegistry;
use crate::prelude::*; use crate::prelude::*;
use crate::util::http::*; use crate::util::http::*;
use crate::util::Terminator;
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>; type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>;
type HttpResult<T> = std::result::Result<T, Infallible>; type HttpResult<T> = std::result::Result<T, Infallible>;
@ -25,51 +24,48 @@ pub struct ServerConfig {
pub listen_on: SocketAddr, pub listen_on: SocketAddr,
} }
pub async fn launch( pub async fn launch<'a>(
config: ServerConfig, config: ServerConfig,
metrics: MetricsRegistry, metrics: &'a MetricsRegistry,
rooms: RoomRegistry, rooms: &'a RoomRegistry,
) -> Result<Terminator> { scope: &mut Scope<'a>,
) -> Result<Promise<()>> {
log::info!("Starting the telemetry service"); log::info!("Starting the telemetry service");
let listener = TcpListener::bind(config.listen_on).await?; let listener = TcpListener::bind(config.listen_on).await?;
log::debug!("Listener started"); log::debug!("Listener started");
let terminator = Terminator::spawn(|rx| main_loop(listener, metrics, rooms, rx.map(|_| ())));
Ok(terminator)
}
async fn main_loop( let (signal, mut rx) = oneshot();
listener: TcpListener,
metrics: MetricsRegistry, let future = async move {
rooms: RoomRegistry, let mut scope = unsafe { Scope::create() };
termination: impl Future<Output = ()>, loop {
) -> Result<()> { select! {
pin!(termination); biased;
loop { _ = &mut rx => break,
select! { result = listener.accept() => {
biased; let (stream, _) = result?;
_ = &mut termination => break, scope.spawn(async move {
result = listener.accept() => { let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(metrics, rooms, r)));
let (stream, _) = result?; if let Err(err) = server.await {
let metrics = metrics.clone(); tracing::error!("Error serving connection: {:?}", err);
let rooms = rooms.clone(); }
tokio::task::spawn(async move { });
let registry = metrics.clone(); },
let rooms = rooms.clone(); }
let server = http1::Builder::new().serve_connection(stream, service_fn(move |r| route(registry.clone(), rooms.clone(), r)));
if let Err(err) = server.await {
tracing::error!("Error serving connection: {:?}", err);
}
});
},
} }
} let _ = scope.collect().await;
log::info!("Terminating the telemetry service"); drop(scope);
Ok(()) log::info!("Terminating the telemetry service");
Ok(())
};
scope.spawn(future.map(|_: Result<()>| ()));
Ok(signal)
} }
async fn route( async fn route(
registry: MetricsRegistry, registry: &MetricsRegistry,
rooms: RoomRegistry, rooms: &RoomRegistry,
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
) -> std::result::Result<Response<BoxBody>, Infallible> { ) -> std::result::Result<Response<BoxBody>, Infallible> {
match (request.method(), request.uri().path()) { match (request.method(), request.uri().path()) {
@ -79,7 +75,7 @@ async fn route(
} }
} }
fn endpoint_metrics(registry: MetricsRegistry) -> HttpResult<Response<Full<Bytes>>> { fn endpoint_metrics(registry: &MetricsRegistry) -> HttpResult<Response<Full<Bytes>>> {
let mf = registry.gather(); let mf = registry.gather();
let mut buffer = vec![]; let mut buffer = vec![];
TextEncoder TextEncoder
@ -88,7 +84,7 @@ fn endpoint_metrics(registry: MetricsRegistry) -> HttpResult<Response<Full<Bytes
Ok(Response::new(Full::new(Bytes::from(buffer)))) Ok(Response::new(Full::new(Bytes::from(buffer))))
} }
async fn endpoint_rooms(rooms: RoomRegistry) -> HttpResult<Response<Full<Bytes>>> { async fn endpoint_rooms(rooms: &RoomRegistry) -> HttpResult<Response<Full<Bytes>>> {
let room_list = rooms.get_all_rooms().await; let room_list = rooms.get_all_rooms().await;
let mut buffer = vec![]; let mut buffer = vec![];
serde_json::to_writer(&mut buffer, &room_list).expect("unexpected fail when writing to vec"); serde_json::to_writer(&mut buffer, &room_list).expect("unexpected fail when writing to vec");