From 2694936ca5523c3c1166b5416cd44e555d5a5dde Mon Sep 17 00:00:00 2001 From: Nikita Vilunov Date: Wed, 6 Sep 2023 19:10:27 +0200 Subject: [PATCH] de-arcify Storage --- src/core/player.rs | 24 +++++++++++++++--------- src/core/repo/mod.rs | 20 +++++++------------- src/core/room.rs | 32 ++++++++++++++++---------------- src/main.rs | 3 +-- src/projections/irc/mod.rs | 10 +++++----- src/projections/xmpp/mod.rs | 12 ++++++------ src/util/telemetry.rs | 6 +++--- 7 files changed, 53 insertions(+), 54 deletions(-) diff --git a/src/core/player.rs b/src/core/player.rs index 8386a9b..239b4ea 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -102,7 +102,7 @@ impl PlayerConnection { } /// Handle to a player actor. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct PlayerHandle { tx: Sender, } @@ -148,7 +148,9 @@ impl PlayerHandle { } async fn send(&self, command: PlayerCommand) { - let _ = self.tx.send(command).await; + if let Err(e) = self.tx.send(command).await { + log::warn!("Failed to send command to a player: {e:?}"); + } } pub async fn update(&self, update: Updates) { @@ -156,6 +158,7 @@ impl PlayerHandle { } } +#[derive(Debug)] enum PlayerCommand { /** Commands from connections */ AddConnection { @@ -168,8 +171,10 @@ enum PlayerCommand { GetRooms(Promise>), /** Events from rooms */ Update(Updates), + Stop, } +#[derive(Debug)] pub enum Cmd { JoinRoom { room_id: RoomId, @@ -191,6 +196,7 @@ pub enum Cmd { }, } +#[derive(Debug)] pub enum JoinResult { Success(RoomInfo), Banned, @@ -224,7 +230,7 @@ pub enum Updates { pub struct PlayerRegistry<'a>(RwLock>); impl<'a> PlayerRegistry<'a> { pub fn empty( - room_registry: &'a RoomRegistry, + room_registry: &'a RoomRegistry<'a>, metrics: &MetricsRegistry, ) -> Result> { let metric_active_players = @@ -259,9 +265,8 @@ impl<'a> PlayerRegistry<'a> { pub async fn shutdown_all(self) -> Result<()> { let mut inner = self.0.write().unwrap(); - let mut players = HashMap::new(); - std::mem::swap(&mut players, &mut inner.players); for (i, k) in inner.players.drain() { + k.send(PlayerCommand::Stop).await; drop(k); log::debug!("Stopping player #{i:?}") } @@ -273,7 +278,7 @@ impl<'a> PlayerRegistry<'a> { /// The player registry state representation. struct PlayerRegistryInner<'a> { - room_registry: &'a RoomRegistry, + room_registry: &'a RoomRegistry<'a>, players: HashMap, metric_active_players: IntGauge, scope: Scope<'a>, @@ -283,14 +288,14 @@ struct PlayerRegistryInner<'a> { struct Player<'a> { player_id: PlayerId, connections: AnonTable>, - my_rooms: HashMap, + my_rooms: HashMap>, banned_from: HashSet, rx: Receiver, handle: PlayerHandle, - rooms: &'a RoomRegistry, + rooms: &'a RoomRegistry<'a>, } impl<'a> Player<'a> { - fn launch(player_id: PlayerId, rooms: &'a RoomRegistry, scope: &mut Scope<'a>) -> PlayerHandle { + fn launch(player_id: PlayerId, rooms: &'a RoomRegistry<'a>, scope: &mut Scope<'a>) -> PlayerHandle { let (tx, rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); @@ -344,6 +349,7 @@ impl<'a> Player<'a> { } } PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await, + PlayerCommand::Stop => { break; } } } log::debug!("Shutting down player actor #{:?}", self.player_id); diff --git a/src/core/repo/mod.rs b/src/core/repo/mod.rs index 261c164..dfd4042 100644 --- a/src/core/repo/mod.rs +++ b/src/core/repo/mod.rs @@ -1,7 +1,6 @@ //! Storage and persistence logic. use std::str::FromStr; -use std::sync::Arc; use serde::Deserialize; use sqlx::sqlite::SqliteConnectOptions; @@ -15,9 +14,8 @@ pub struct StorageConfig { pub db_path: String, } -#[derive(Clone)] pub struct Storage { - conn: Arc>, + conn: Mutex, } impl Storage { pub async fn open(config: StorageConfig) -> Result { @@ -29,7 +27,7 @@ impl Storage { migrator.run(&mut conn).await?; log::info!("Migrations passed"); - let conn = Arc::new(Mutex::new(conn)); + let conn = Mutex::new(conn); Ok(Storage { conn }) } @@ -47,7 +45,7 @@ impl Storage { Ok(res) } - pub async fn retrieve_room_by_name(&mut self, name: &str) -> Result> { + pub async fn retrieve_room_by_name(&self, name: &str) -> Result> { let mut executor = self.conn.lock().await; let res = sqlx::query_as( "select id, name, topic, message_count @@ -61,7 +59,7 @@ impl Storage { Ok(res) } - pub async fn create_new_room(&mut self, name: &str, topic: &str) -> Result { + pub async fn create_new_room(&self, name: &str, topic: &str) -> Result { let mut executor = self.conn.lock().await; let (id,): (u32,) = sqlx::query_as( "insert into rooms(name, topic) @@ -76,7 +74,7 @@ impl Storage { Ok(id) } - pub async fn insert_message(&mut self, room_id: u32, id: u32, content: &str) -> Result<()> { + pub async fn insert_message(&self, room_id: u32, id: u32, content: &str) -> Result<()> { let mut executor = self.conn.lock().await; sqlx::query( "insert into messages(room_id, id, content) @@ -93,12 +91,8 @@ impl Storage { Ok(()) } - pub async fn close(mut self) -> Result<()> { - let res = match Arc::try_unwrap(self.conn) { - Ok(e) => e, - Err(e) => return Err(fail("failed to acquire DB ownership on shutdown")), - }; - let res = res.into_inner(); + pub async fn close(self) -> Result<()> { + let res = self.conn.into_inner(); res.close().await?; Ok(()) } diff --git a/src/core/room.rs b/src/core/room.rs index f4d7adb..68026b2 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -38,9 +38,9 @@ impl RoomId { } /// Shared datastructure for storing metadata about rooms. -pub struct RoomRegistry(AsyncRwLock); -impl RoomRegistry { - pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result { +pub struct RoomRegistry<'a>(AsyncRwLock>); +impl<'a> RoomRegistry<'a> { + pub fn new(metrics: &mut MetricRegistry, storage: &'a Storage) -> Result> { let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; metrics.register(Box::new(metric_active_rooms.clone()))?; @@ -52,7 +52,7 @@ impl RoomRegistry { Ok(RoomRegistry(AsyncRwLock::new(inner))) } - pub async fn get_or_create_room(&self, room_id: RoomId) -> Result { + pub async fn get_or_create_room(&self, room_id: RoomId) -> Result> { let mut inner = self.0.write().await; if let Some(room_handle) = inner.rooms.get(&room_id) { // room was already loaded into memory @@ -67,7 +67,7 @@ impl RoomRegistry { subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions topic: stored_room.topic.into(), message_count: stored_room.message_count, - storage: inner.storage.clone(), + storage: inner.storage, }; let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); inner.rooms.insert(room_id, room_handle.clone()); @@ -84,7 +84,7 @@ impl RoomRegistry { subscriptions: HashMap::new(), topic: topic.into(), message_count: 0, - storage: inner.storage.clone(), + storage: inner.storage, }; let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room))); inner.rooms.insert(room_id, room_handle.clone()); @@ -93,7 +93,7 @@ impl RoomRegistry { } } - pub async fn get_room(&self, room_id: &RoomId) -> Option { + pub async fn get_room(&self, room_id: &RoomId) -> Option> { let inner = self.0.read().await; let res = inner.rooms.get(room_id); res.map(|r| r.clone()) @@ -113,15 +113,15 @@ impl RoomRegistry { } } -struct RoomRegistryInner { - rooms: HashMap, +struct RoomRegistryInner<'a> { + rooms: HashMap>, metric_active_rooms: IntGauge, - storage: Storage, + storage: &'a Storage, } #[derive(Clone)] -pub struct RoomHandle(Arc>); -impl RoomHandle { +pub struct RoomHandle<'a>(Arc>>); +impl<'a> RoomHandle<'a> { pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) { let mut lock = self.0.write().await; lock.add_subscriber(player_id, player_handle).await; @@ -169,15 +169,15 @@ impl RoomHandle { } } -struct Room { +struct Room<'a> { storage_id: u32, room_id: RoomId, subscriptions: HashMap, message_count: u32, topic: Str, - storage: Storage, + storage: &'a Storage, } -impl Room { +impl<'a> Room<'a> { async fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { tracing::info!("Adding a subscriber to room"); self.subscriptions.insert(player_id.clone(), player_handle); @@ -213,7 +213,7 @@ impl Room { } } -#[derive(Serialize)] +#[derive(Serialize, Debug)] pub struct RoomInfo { pub id: RoomId, pub members: Vec, diff --git a/src/main.rs b/src/main.rs index e352ab4..e0a59a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,7 +53,7 @@ async fn main() -> Result<()> { } = config; let mut metrics = MetricsRegistry::new(); let storage = Storage::open(storage_config).await?; - let rooms = RoomRegistry::new(&mut metrics, storage.clone())?; + let rooms = RoomRegistry::new(&mut metrics, &storage)?; let players = PlayerRegistry::empty(&rooms, &metrics)?; // unsafe: outer future is never dropped, scope is joined on `scope.collect` @@ -73,7 +73,6 @@ async fn main() -> Result<()> { drop(scope); players.shutdown_all().await?; - drop(rooms); storage.close().await?; tracing::info!("Shutdown complete"); Ok(()) diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index 7a6fe97..2c020d0 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -43,7 +43,7 @@ async fn handle_socket( mut stream: TcpStream, socket_addr: &SocketAddr, players: &PlayerRegistry<'_>, - rooms: &RoomRegistry, + rooms: &RoomRegistry<'_>, termination: Deferred<()>, // TODO use it to stop the connection gracefully storage: &Storage, ) -> Result<()> { @@ -176,7 +176,7 @@ async fn handle_registration<'a>( async fn handle_registered_socket<'a>( config: ServerConfig, players: &PlayerRegistry<'_>, - rooms: &RoomRegistry, + rooms: &RoomRegistry<'_>, reader: &mut BufReader>, writer: &mut BufWriter>, user: RegisteredUser, @@ -300,7 +300,7 @@ async fn handle_update( user: &RegisteredUser, player_id: &PlayerId, writer: &mut (impl AsyncWrite + Unpin), - rooms: &RoomRegistry, + rooms: &RoomRegistry<'_>, update: Updates, ) -> Result<()> { log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); @@ -397,7 +397,7 @@ async fn handle_incoming_message( buffer: &str, config: &ServerConfig, user: &RegisteredUser, - rooms: &RoomRegistry, + rooms: &RoomRegistry<'_>, user_handle: &mut PlayerConnection, writer: &mut (impl AsyncWrite + Unpin), ) -> Result { @@ -691,7 +691,7 @@ async fn produce_on_join_cmd_messages( pub async fn launch<'a>( config: &'a ServerConfig, players: &'a PlayerRegistry<'_>, - rooms: &'a RoomRegistry, + rooms: &'a RoomRegistry<'_>, metrics: &'a MetricsRegistry, storage: &'a Storage, scope: &mut Scope<'a>, diff --git a/src/projections/xmpp/mod.rs b/src/projections/xmpp/mod.rs index ac592fd..a5b94d8 100644 --- a/src/projections/xmpp/mod.rs +++ b/src/projections/xmpp/mod.rs @@ -55,7 +55,7 @@ struct Authenticated { pub async fn launch<'a>( config: ServerConfig, players: &'a PlayerRegistry<'_>, - rooms: &'a RoomRegistry, + rooms: &'a RoomRegistry<'_>, metrics: &'a MetricsRegistry, scope: &mut Scope<'a>, ) -> Result> { @@ -140,7 +140,7 @@ async fn handle_socket( mut stream: TcpStream, socket_addr: &SocketAddr, players: &PlayerRegistry<'_>, - rooms: &RoomRegistry, + rooms: &RoomRegistry<'_>, termination: Deferred<()>, // TODO use it to stop the connection gracefully ) -> Result<()> { log::debug!("Received an XMPP connection from {socket_addr}"); @@ -265,7 +265,7 @@ async fn socket_final( reader_buf: &mut Vec, authenticated: &Authenticated, user_handle: &mut PlayerConnection, - rooms: &RoomRegistry, + rooms: &RoomRegistry<'_>, ) -> Result<()> { read_xml_header(xml_reader, reader_buf).await?; let _ = ClientStreamStart::parse(xml_reader, reader_buf).await?; @@ -375,7 +375,7 @@ async fn handle_packet( packet: ClientPacket, user: &Authenticated, user_handle: &mut PlayerConnection, - rooms: &RoomRegistry, + rooms: &RoomRegistry<'_>, ) -> Result { Ok(match packet { proto::ClientPacket::Iq(iq) => { @@ -472,7 +472,7 @@ async fn handle_packet( }) } -async fn handle_iq(output: &mut Vec>, iq: Iq, rooms: &RoomRegistry) { +async fn handle_iq(output: &mut Vec>, iq: Iq, rooms: &RoomRegistry<'_>) { match iq.body { proto::IqClientBody::Bind(b) => { let req = Iq { @@ -584,7 +584,7 @@ fn disco_info(to: Option<&str>, req: &InfoQuery) -> InfoQuery { } } -async fn disco_items(to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry) -> ItemQuery { +async fn disco_items(to: Option<&str>, req: &ItemQuery, rooms: &RoomRegistry<'_>) -> ItemQuery { let item = match to { Some("localhost") => { vec![Item { diff --git a/src/util/telemetry.rs b/src/util/telemetry.rs index 1994112..1483734 100644 --- a/src/util/telemetry.rs +++ b/src/util/telemetry.rs @@ -27,7 +27,7 @@ pub struct ServerConfig { pub async fn launch<'a>( config: ServerConfig, metrics: &'a MetricsRegistry, - rooms: &'a RoomRegistry, + rooms: &'a RoomRegistry<'_>, scope: &mut Scope<'a>, ) -> Result> { log::info!("Starting the telemetry service"); @@ -65,7 +65,7 @@ pub async fn launch<'a>( async fn route( registry: &MetricsRegistry, - rooms: &RoomRegistry, + rooms: &RoomRegistry<'_>, request: Request, ) -> std::result::Result, Infallible> { match (request.method(), request.uri().path()) { @@ -84,7 +84,7 @@ fn endpoint_metrics(registry: &MetricsRegistry) -> HttpResult HttpResult>> { +async fn endpoint_rooms(rooms: &RoomRegistry<'_>) -> HttpResult>> { let room_list = rooms.get_all_rooms().await; let mut buffer = vec![]; serde_json::to_writer(&mut buffer, &room_list).expect("unexpected fail when writing to vec");