forked from lavina/lavina
1
0
Fork 0

de-arcify Storage

This commit is contained in:
Nikita Vilunov 2023-09-06 19:10:27 +02:00
parent 99980720be
commit 2694936ca5
7 changed files with 53 additions and 54 deletions

View File

@ -102,7 +102,7 @@ impl PlayerConnection {
} }
/// Handle to a player actor. /// Handle to a player actor.
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct PlayerHandle { pub struct PlayerHandle {
tx: Sender<PlayerCommand>, tx: Sender<PlayerCommand>,
} }
@ -148,7 +148,9 @@ impl PlayerHandle {
} }
async fn send(&self, command: PlayerCommand) { async fn send(&self, command: PlayerCommand) {
let _ = self.tx.send(command).await; if let Err(e) = self.tx.send(command).await {
log::warn!("Failed to send command to a player: {e:?}");
}
} }
pub async fn update(&self, update: Updates) { pub async fn update(&self, update: Updates) {
@ -156,6 +158,7 @@ impl PlayerHandle {
} }
} }
#[derive(Debug)]
enum PlayerCommand { enum PlayerCommand {
/** Commands from connections */ /** Commands from connections */
AddConnection { AddConnection {
@ -168,8 +171,10 @@ enum PlayerCommand {
GetRooms(Promise<Vec<RoomInfo>>), GetRooms(Promise<Vec<RoomInfo>>),
/** Events from rooms */ /** Events from rooms */
Update(Updates), Update(Updates),
Stop,
} }
#[derive(Debug)]
pub enum Cmd { pub enum Cmd {
JoinRoom { JoinRoom {
room_id: RoomId, room_id: RoomId,
@ -191,6 +196,7 @@ pub enum Cmd {
}, },
} }
#[derive(Debug)]
pub enum JoinResult { pub enum JoinResult {
Success(RoomInfo), Success(RoomInfo),
Banned, Banned,
@ -224,7 +230,7 @@ pub enum Updates {
pub struct PlayerRegistry<'a>(RwLock<PlayerRegistryInner<'a>>); pub struct PlayerRegistry<'a>(RwLock<PlayerRegistryInner<'a>>);
impl<'a> PlayerRegistry<'a> { impl<'a> PlayerRegistry<'a> {
pub fn empty( pub fn empty(
room_registry: &'a RoomRegistry, room_registry: &'a RoomRegistry<'a>,
metrics: &MetricsRegistry, metrics: &MetricsRegistry,
) -> Result<PlayerRegistry<'a>> { ) -> Result<PlayerRegistry<'a>> {
let metric_active_players = let metric_active_players =
@ -259,9 +265,8 @@ impl<'a> PlayerRegistry<'a> {
pub async fn shutdown_all(self) -> Result<()> { pub async fn shutdown_all(self) -> Result<()> {
let mut inner = self.0.write().unwrap(); let mut inner = self.0.write().unwrap();
let mut players = HashMap::new();
std::mem::swap(&mut players, &mut inner.players);
for (i, k) in inner.players.drain() { for (i, k) in inner.players.drain() {
k.send(PlayerCommand::Stop).await;
drop(k); drop(k);
log::debug!("Stopping player #{i:?}") log::debug!("Stopping player #{i:?}")
} }
@ -273,7 +278,7 @@ impl<'a> PlayerRegistry<'a> {
/// The player registry state representation. /// The player registry state representation.
struct PlayerRegistryInner<'a> { struct PlayerRegistryInner<'a> {
room_registry: &'a RoomRegistry, room_registry: &'a RoomRegistry<'a>,
players: HashMap<PlayerId, PlayerHandle>, players: HashMap<PlayerId, PlayerHandle>,
metric_active_players: IntGauge, metric_active_players: IntGauge,
scope: Scope<'a>, scope: Scope<'a>,
@ -283,14 +288,14 @@ struct PlayerRegistryInner<'a> {
struct Player<'a> { struct Player<'a> {
player_id: PlayerId, player_id: PlayerId,
connections: AnonTable<Sender<Updates>>, connections: AnonTable<Sender<Updates>>,
my_rooms: HashMap<RoomId, RoomHandle>, my_rooms: HashMap<RoomId, RoomHandle<'a>>,
banned_from: HashSet<RoomId>, banned_from: HashSet<RoomId>,
rx: Receiver<PlayerCommand>, rx: Receiver<PlayerCommand>,
handle: PlayerHandle, handle: PlayerHandle,
rooms: &'a RoomRegistry, rooms: &'a RoomRegistry<'a>,
} }
impl<'a> Player<'a> { impl<'a> Player<'a> {
fn launch(player_id: PlayerId, rooms: &'a RoomRegistry, scope: &mut Scope<'a>) -> PlayerHandle { fn launch(player_id: PlayerId, rooms: &'a RoomRegistry<'a>, scope: &mut Scope<'a>) -> PlayerHandle {
let (tx, rx) = channel(32); let (tx, rx) = channel(32);
let handle = PlayerHandle { tx }; let handle = PlayerHandle { tx };
let handle_clone = handle.clone(); let handle_clone = handle.clone();
@ -344,6 +349,7 @@ impl<'a> Player<'a> {
} }
} }
PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await,
PlayerCommand::Stop => { break; }
} }
} }
log::debug!("Shutting down player actor #{:?}", self.player_id); log::debug!("Shutting down player actor #{:?}", self.player_id);

View File

@ -1,7 +1,6 @@
//! Storage and persistence logic. //! Storage and persistence logic.
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use serde::Deserialize; use serde::Deserialize;
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
@ -15,9 +14,8 @@ pub struct StorageConfig {
pub db_path: String, pub db_path: String,
} }
#[derive(Clone)]
pub struct Storage { pub struct Storage {
conn: Arc<Mutex<SqliteConnection>>, conn: Mutex<SqliteConnection>,
} }
impl Storage { impl Storage {
pub async fn open(config: StorageConfig) -> Result<Storage> { pub async fn open(config: StorageConfig) -> Result<Storage> {
@ -29,7 +27,7 @@ impl Storage {
migrator.run(&mut conn).await?; migrator.run(&mut conn).await?;
log::info!("Migrations passed"); log::info!("Migrations passed");
let conn = Arc::new(Mutex::new(conn)); let conn = Mutex::new(conn);
Ok(Storage { conn }) Ok(Storage { conn })
} }
@ -47,7 +45,7 @@ impl Storage {
Ok(res) Ok(res)
} }
pub async fn retrieve_room_by_name(&mut self, name: &str) -> Result<Option<StoredRoom>> { pub async fn retrieve_room_by_name(&self, name: &str) -> Result<Option<StoredRoom>> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let res = sqlx::query_as( let res = sqlx::query_as(
"select id, name, topic, message_count "select id, name, topic, message_count
@ -61,7 +59,7 @@ impl Storage {
Ok(res) Ok(res)
} }
pub async fn create_new_room(&mut self, name: &str, topic: &str) -> Result<u32> { pub async fn create_new_room(&self, name: &str, topic: &str) -> Result<u32> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let (id,): (u32,) = sqlx::query_as( let (id,): (u32,) = sqlx::query_as(
"insert into rooms(name, topic) "insert into rooms(name, topic)
@ -76,7 +74,7 @@ impl Storage {
Ok(id) Ok(id)
} }
pub async fn insert_message(&mut self, room_id: u32, id: u32, content: &str) -> Result<()> { pub async fn insert_message(&self, room_id: u32, id: u32, content: &str) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
sqlx::query( sqlx::query(
"insert into messages(room_id, id, content) "insert into messages(room_id, id, content)
@ -93,12 +91,8 @@ impl Storage {
Ok(()) Ok(())
} }
pub async fn close(mut self) -> Result<()> { pub async fn close(self) -> Result<()> {
let res = match Arc::try_unwrap(self.conn) { let res = self.conn.into_inner();
Ok(e) => e,
Err(e) => return Err(fail("failed to acquire DB ownership on shutdown")),
};
let res = res.into_inner();
res.close().await?; res.close().await?;
Ok(()) Ok(())
} }

View File

@ -38,9 +38,9 @@ impl RoomId {
} }
/// Shared datastructure for storing metadata about rooms. /// Shared datastructure for storing metadata about rooms.
pub struct RoomRegistry(AsyncRwLock<RoomRegistryInner>); pub struct RoomRegistry<'a>(AsyncRwLock<RoomRegistryInner<'a>>);
impl RoomRegistry { impl<'a> RoomRegistry<'a> {
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> { pub fn new(metrics: &mut MetricRegistry, storage: &'a Storage) -> Result<RoomRegistry<'a>> {
let metric_active_rooms = let metric_active_rooms =
IntGauge::new("chat_rooms_active", "Number of alive room actors")?; IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
metrics.register(Box::new(metric_active_rooms.clone()))?; metrics.register(Box::new(metric_active_rooms.clone()))?;
@ -52,7 +52,7 @@ impl RoomRegistry {
Ok(RoomRegistry(AsyncRwLock::new(inner))) Ok(RoomRegistry(AsyncRwLock::new(inner)))
} }
pub async fn get_or_create_room(&self, room_id: RoomId) -> Result<RoomHandle> { pub async fn get_or_create_room(&self, room_id: RoomId) -> Result<RoomHandle<'a>> {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
if let Some(room_handle) = inner.rooms.get(&room_id) { if let Some(room_handle) = inner.rooms.get(&room_id) {
// room was already loaded into memory // room was already loaded into memory
@ -67,7 +67,7 @@ impl RoomRegistry {
subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions
topic: stored_room.topic.into(), topic: stored_room.topic.into(),
message_count: stored_room.message_count, message_count: stored_room.message_count,
storage: inner.storage.clone(), storage: inner.storage,
}; };
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone()); inner.rooms.insert(room_id, room_handle.clone());
@ -84,7 +84,7 @@ impl RoomRegistry {
subscriptions: HashMap::new(), subscriptions: HashMap::new(),
topic: topic.into(), topic: topic.into(),
message_count: 0, message_count: 0,
storage: inner.storage.clone(), storage: inner.storage,
}; };
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone()); inner.rooms.insert(room_id, room_handle.clone());
@ -93,7 +93,7 @@ impl RoomRegistry {
} }
} }
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> { pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle<'a>> {
let inner = self.0.read().await; let inner = self.0.read().await;
let res = inner.rooms.get(room_id); let res = inner.rooms.get(room_id);
res.map(|r| r.clone()) res.map(|r| r.clone())
@ -113,15 +113,15 @@ impl RoomRegistry {
} }
} }
struct RoomRegistryInner { struct RoomRegistryInner<'a> {
rooms: HashMap<RoomId, RoomHandle>, rooms: HashMap<RoomId, RoomHandle<'a>>,
metric_active_rooms: IntGauge, metric_active_rooms: IntGauge,
storage: Storage, storage: &'a Storage,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct RoomHandle(Arc<AsyncRwLock<Room>>); pub struct RoomHandle<'a>(Arc<AsyncRwLock<Room<'a>>>);
impl RoomHandle { impl<'a> RoomHandle<'a> {
pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) { pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
lock.add_subscriber(player_id, player_handle).await; lock.add_subscriber(player_id, player_handle).await;
@ -169,15 +169,15 @@ impl RoomHandle {
} }
} }
struct Room { struct Room<'a> {
storage_id: u32, storage_id: u32,
room_id: RoomId, room_id: RoomId,
subscriptions: HashMap<PlayerId, PlayerHandle>, subscriptions: HashMap<PlayerId, PlayerHandle>,
message_count: u32, message_count: u32,
topic: Str, topic: Str,
storage: Storage, storage: &'a Storage,
} }
impl Room { impl<'a> Room<'a> {
async fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { async fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) {
tracing::info!("Adding a subscriber to room"); tracing::info!("Adding a subscriber to room");
self.subscriptions.insert(player_id.clone(), player_handle); self.subscriptions.insert(player_id.clone(), player_handle);
@ -213,7 +213,7 @@ impl Room {
} }
} }
#[derive(Serialize)] #[derive(Serialize, Debug)]
pub struct RoomInfo { pub struct RoomInfo {
pub id: RoomId, pub id: RoomId,
pub members: Vec<PlayerId>, pub members: Vec<PlayerId>,

View File

@ -53,7 +53,7 @@ async fn main() -> Result<()> {
} = config; } = config;
let mut metrics = MetricsRegistry::new(); let mut metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; let rooms = RoomRegistry::new(&mut metrics, &storage)?;
let players = PlayerRegistry::empty(&rooms, &metrics)?; let players = PlayerRegistry::empty(&rooms, &metrics)?;
// unsafe: outer future is never dropped, scope is joined on `scope.collect` // unsafe: outer future is never dropped, scope is joined on `scope.collect`
@ -73,7 +73,6 @@ async fn main() -> Result<()> {
drop(scope); drop(scope);
players.shutdown_all().await?; players.shutdown_all().await?;
drop(rooms);
storage.close().await?; storage.close().await?;
tracing::info!("Shutdown complete"); tracing::info!("Shutdown complete");
Ok(()) Ok(())

View File

@ -43,7 +43,7 @@ async fn handle_socket(
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
players: &PlayerRegistry<'_>, players: &PlayerRegistry<'_>,
rooms: &RoomRegistry, rooms: &RoomRegistry<'_>,
termination: Deferred<()>, // TODO use it to stop the connection gracefully termination: Deferred<()>, // TODO use it to stop the connection gracefully
storage: &Storage, storage: &Storage,
) -> Result<()> { ) -> Result<()> {
@ -176,7 +176,7 @@ async fn handle_registration<'a>(
async fn handle_registered_socket<'a>( async fn handle_registered_socket<'a>(
config: ServerConfig, config: ServerConfig,
players: &PlayerRegistry<'_>, players: &PlayerRegistry<'_>,
rooms: &RoomRegistry, rooms: &RoomRegistry<'_>,
reader: &mut BufReader<ReadHalf<'a>>, reader: &mut BufReader<ReadHalf<'a>>,
writer: &mut BufWriter<WriteHalf<'a>>, writer: &mut BufWriter<WriteHalf<'a>>,
user: RegisteredUser, user: RegisteredUser,
@ -300,7 +300,7 @@ async fn handle_update(
user: &RegisteredUser, user: &RegisteredUser,
player_id: &PlayerId, player_id: &PlayerId,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
rooms: &RoomRegistry, rooms: &RoomRegistry<'_>,
update: Updates, update: Updates,
) -> Result<()> { ) -> Result<()> {
log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); log::debug!("Sending irc message to player {player_id:?} on update {update:?}");
@ -397,7 +397,7 @@ async fn handle_incoming_message(
buffer: &str, buffer: &str,
config: &ServerConfig, config: &ServerConfig,
user: &RegisteredUser, user: &RegisteredUser,
rooms: &RoomRegistry, rooms: &RoomRegistry<'_>,
user_handle: &mut PlayerConnection, user_handle: &mut PlayerConnection,
writer: &mut (impl AsyncWrite + Unpin), writer: &mut (impl AsyncWrite + Unpin),
) -> Result<HandleResult> { ) -> Result<HandleResult> {
@ -691,7 +691,7 @@ async fn produce_on_join_cmd_messages(
pub async fn launch<'a>( pub async fn launch<'a>(
config: &'a ServerConfig, config: &'a ServerConfig,
players: &'a PlayerRegistry<'_>, players: &'a PlayerRegistry<'_>,
rooms: &'a RoomRegistry, rooms: &'a RoomRegistry<'_>,
metrics: &'a MetricsRegistry, metrics: &'a MetricsRegistry,
storage: &'a Storage, storage: &'a Storage,
scope: &mut Scope<'a>, scope: &mut Scope<'a>,

View File

@ -55,7 +55,7 @@ struct Authenticated {
pub async fn launch<'a>( pub async fn launch<'a>(
config: ServerConfig, config: ServerConfig,
players: &'a PlayerRegistry<'_>, players: &'a PlayerRegistry<'_>,
rooms: &'a RoomRegistry, rooms: &'a RoomRegistry<'_>,
metrics: &'a MetricsRegistry, metrics: &'a MetricsRegistry,
scope: &mut Scope<'a>, scope: &mut Scope<'a>,
) -> Result<Promise<()>> { ) -> Result<Promise<()>> {
@ -140,7 +140,7 @@ async fn handle_socket(
mut stream: TcpStream, mut stream: TcpStream,
socket_addr: &SocketAddr, socket_addr: &SocketAddr,
players: &PlayerRegistry<'_>, players: &PlayerRegistry<'_>,
rooms: &RoomRegistry, rooms: &RoomRegistry<'_>,
termination: Deferred<()>, // TODO use it to stop the connection gracefully termination: Deferred<()>, // TODO use it to stop the connection gracefully
) -> Result<()> { ) -> Result<()> {
log::debug!("Received an XMPP connection from {socket_addr}"); log::debug!("Received an XMPP connection from {socket_addr}");
@ -265,7 +265,7 @@ async fn socket_final(
reader_buf: &mut Vec<u8>, reader_buf: &mut Vec<u8>,
authenticated: &Authenticated, authenticated: &Authenticated,
user_handle: &mut PlayerConnection, user_handle: &mut PlayerConnection,
rooms: &RoomRegistry, rooms: &RoomRegistry<'_>,
) -> Result<()> { ) -> Result<()> {
read_xml_header(xml_reader, reader_buf).await?; read_xml_header(xml_reader, reader_buf).await?;
let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?; let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?;
@ -375,7 +375,7 @@ async fn handle_packet(
packet: ClientPacket, packet: ClientPacket,
user: &Authenticated, user: &Authenticated,
user_handle: &mut PlayerConnection, user_handle: &mut PlayerConnection,
rooms: &RoomRegistry, rooms: &RoomRegistry<'_>,
) -> Result<bool> { ) -> Result<bool> {
Ok(match packet { Ok(match packet {
proto::ClientPacket::Iq(iq) => { proto::ClientPacket::Iq(iq) => {
@ -472,7 +472,7 @@ async fn handle_packet(
}) })
} }
async fn handle_iq(output: &mut Vec<Event<'static>>, iq: Iq<IqClientBody>, rooms: &RoomRegistry) { async fn handle_iq(output: &mut Vec<Event<'static>>, iq: Iq<IqClientBody>, rooms: &RoomRegistry<'_>) {
match iq.body { match iq.body {
proto::IqClientBody::Bind(b) => { proto::IqClientBody::Bind(b) => {
let req = Iq { let req = Iq {
@ -584,7 +584,7 @@ fn disco_info(to: Option<&str>, req: &InfoQuery) -> InfoQuery {
} }
} }
async fn disco_items(to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery { async fn disco_items(to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry<'_>) -> ItemQuery {
let item = match to { let item = match to {
Some("localhost") => { Some("localhost") => {
vec![Item { vec![Item {

View File

@ -27,7 +27,7 @@ pub struct ServerConfig {
pub async fn launch<'a>( pub async fn launch<'a>(
config: ServerConfig, config: ServerConfig,
metrics: &'a MetricsRegistry, metrics: &'a MetricsRegistry,
rooms: &'a RoomRegistry, rooms: &'a RoomRegistry<'_>,
scope: &mut Scope<'a>, scope: &mut Scope<'a>,
) -> Result<Promise<()>> { ) -> Result<Promise<()>> {
log::info!("Starting the telemetry service"); log::info!("Starting the telemetry service");
@ -65,7 +65,7 @@ pub async fn launch<'a>(
async fn route( async fn route(
registry: &MetricsRegistry, registry: &MetricsRegistry,
rooms: &RoomRegistry, rooms: &RoomRegistry<'_>,
request: Request<hyper::body::Incoming>, request: Request<hyper::body::Incoming>,
) -> std::result::Result<Response<BoxBody>, Infallible> { ) -> std::result::Result<Response<BoxBody>, Infallible> {
match (request.method(), request.uri().path()) { match (request.method(), request.uri().path()) {
@ -84,7 +84,7 @@ fn endpoint_metrics(registry: &MetricsRegistry) -> HttpResult<Response<Full<Byte
Ok(Response::new(Full::new(Bytes::from(buffer)))) Ok(Response::new(Full::new(Bytes::from(buffer))))
} }
async fn endpoint_rooms(rooms: &RoomRegistry) -> HttpResult<Response<Full<Bytes>>> { async fn endpoint_rooms(rooms: &RoomRegistry<'_>) -> HttpResult<Response<Full<Bytes>>> {
let room_list = rooms.get_all_rooms().await; let room_list = rooms.get_all_rooms().await;
let mut buffer = vec![]; let mut buffer = vec![];
serde_json::to_writer(&mut buffer, &room_list).expect("unexpected fail when writing to vec"); serde_json::to_writer(&mut buffer, &room_list).expect("unexpected fail when writing to vec");