forked from lavina/lavina
1
0
Fork 0

Compare commits

..

3 Commits

Author SHA1 Message Date
Nikita Vilunov 8832c579e4 clean up some stuff 2024-04-15 02:20:22 +02:00
Nikita Vilunov b3af10a70a persist room leaves 2024-04-15 01:54:21 +02:00
Nikita Vilunov 79d626330c load rooms at player actor start 2024-04-15 01:46:34 +02:00
7 changed files with 97 additions and 47 deletions

View File

@ -224,19 +224,19 @@ impl PlayerRegistry {
Ok(PlayerRegistry(Arc::new(RwLock::new(inner))))
}
pub async fn get_or_launch_player(&mut self, id: PlayerId) -> PlayerHandle {
pub async fn get_or_launch_player(&mut self, id: &PlayerId) -> PlayerHandle {
let mut inner = self.0.write().await;
if let Some((handle, _)) = inner.players.get(&id) {
if let Some((handle, _)) = inner.players.get(id) {
handle.clone()
} else {
let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.storage.clone()).await;
inner.players.insert(id, (handle.clone(), fiber));
inner.players.insert(id.clone(), (handle.clone(), fiber));
inner.metric_active_players.inc();
handle
}
}
pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection {
pub async fn connect_to_player(&mut self, id: &PlayerId) -> PlayerConnection {
let player_handle = self.get_or_launch_player(id).await;
player_handle.subscribe().await
}
@ -284,8 +284,11 @@ impl Player {
let player = Player {
player_id,
storage_id,
// connections are empty when the actor is just started
connections: AnonTable::new(),
// room handlers will be loaded later in the started task
my_rooms: HashMap::new(),
// TODO implement and load bans
banned_from: HashSet::new(),
rx,
handle,
@ -297,6 +300,15 @@ impl Player {
}
async fn main_loop(mut self) -> Self {
let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
for room_id in rooms {
let room = self.rooms.get_room(&room_id).await;
if let Some(room) = room {
self.my_rooms.insert(room_id, room);
} else {
tracing::error!("Room #{room_id:?} not found");
}
}
while let Some(cmd) = self.rx.recv().await {
match cmd {
ActorCommand::AddConnection { sender, promise } => {
@ -384,8 +396,8 @@ impl Player {
todo!();
}
};
room.add_member(self.player_id.clone(), self.storage_id).await;
room.subscribe(self.player_id.clone(), self.handle.clone()).await;
room.add_member(&self.player_id, self.storage_id).await;
room.subscribe(&self.player_id, self.handle.clone()).await;
self.my_rooms.insert(room_id.clone(), room.clone());
let room_info = room.get_room_info().await;
let update = Updates::RoomJoined {
@ -400,7 +412,7 @@ impl Player {
let room = self.my_rooms.remove(&room_id);
if let Some(room) = room {
room.unsubscribe(&self.player_id).await;
room.remove_member(&self.player_id).await;
room.remove_member(&self.player_id, self.storage_id).await;
}
let update = Updates::RoomLeft {
room_id,
@ -414,7 +426,7 @@ impl Player {
tracing::info!("no room found");
return;
};
room.send_message(self.player_id.clone(), body.clone()).await;
room.send_message(&self.player_id, body.clone()).await;
let update = Updates::NewMessage {
room_id,
author_id: self.player_id.clone(),
@ -428,7 +440,7 @@ impl Player {
tracing::info!("no room found");
return;
};
room.set_topic(self.player_id.clone(), new_topic.clone()).await;
room.set_topic(&self.player_id, new_topic.clone()).await;
let update = Updates::RoomTopicChanged { room_id, new_topic };
self.broadcast_update(update, connection_id).await;
}

View File

@ -51,7 +51,7 @@ impl Storage {
Ok(res)
}
pub async fn retrieve_room_by_name(&mut self, name: &str) -> Result<Option<StoredRoom>> {
pub async fn retrieve_room_by_name(&self, name: &str) -> Result<Option<StoredRoom>> {
let mut executor = self.conn.lock().await;
let res = sqlx::query_as(
"select id, name, topic, message_count

View File

@ -3,7 +3,7 @@ use anyhow::Result;
use crate::repo::Storage;
impl Storage {
pub async fn add_room_member(&mut self, room_id: &u32, player_id: &u32) -> Result<()> {
pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
let mut executor = self.conn.lock().await;
sqlx::query(
"insert into memberships(user_id, room_id, status)
@ -16,4 +16,18 @@ impl Storage {
Ok(())
}
pub async fn remove_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
let mut executor = self.conn.lock().await;
sqlx::query(
"delete from memberships
where user_id = ? and room_id = ?;",
)
.bind(player_id)
.bind(room_id)
.execute(&mut *executor)
.await?;
Ok(())
}
}

View File

@ -1,6 +1,7 @@
use anyhow::Result;
use crate::repo::Storage;
use crate::room::RoomId;
impl Storage {
pub async fn retrieve_user_id_by_name(&self, name: &str) -> Result<Option<u32>> {
@ -12,4 +13,18 @@ impl Storage {
Ok(res.map(|(id,)| id))
}
pub async fn get_rooms_of_a_user(&self, user_id: u32) -> Result<Vec<RoomId>> {
let mut executor = self.conn.lock().await;
let res: Vec<(String,)> = sqlx::query_as(
"select r.name
from memberships m inner join rooms r on m.room_id = r.id
where m.user_id = ?;",
)
.bind(user_id)
.fetch_all(&mut *executor)
.await?;
res.into_iter().map(|(room_id,)| RoomId::from(room_id)).collect()
}
}

View File

@ -49,28 +49,9 @@ impl RoomRegistry {
pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> {
let mut inner = self.0.write().await;
if let Some(room_handle) = inner.rooms.get(&room_id) {
// room was already loaded into memory
log::debug!("Room {} was loaded already", &room_id.0);
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? {
Ok(room_handle.clone())
} else if let Some(stored_room) = inner.storage.retrieve_room_by_name(&*room_id.0).await? {
// room exists, but was not loaded
log::debug!("Loading room {}...", &room_id.0);
let room = Room {
storage_id: stored_room.id,
room_id: room_id.clone(),
subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions
members: HashSet::new(), // TODO load members from storage
topic: stored_room.topic.into(),
message_count: stored_room.message_count,
storage: inner.storage.clone(),
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone());
inner.metric_active_rooms.inc();
Ok(room_handle)
} else {
// room does not exist, create it and load
log::debug!("Creating room {}...", &room_id.0);
let topic = "New room";
let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?;
@ -91,9 +72,8 @@ impl RoomRegistry {
}
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
let inner = self.0.read().await;
let res = inner.rooms.get(room_id);
res.map(|r| r.clone())
let mut inner = self.0.write().await;
inner.get_or_load_room(room_id).await.unwrap()
}
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
@ -116,26 +96,53 @@ struct RoomRegistryInner {
storage: Storage,
}
impl RoomRegistryInner {
async fn get_or_load_room(&mut self, room_id: &RoomId) -> Result<Option<RoomHandle>> {
if let Some(room_handle) = self.rooms.get(room_id) {
log::debug!("Room {} was loaded already", &room_id.0);
Ok(Some(room_handle.clone()))
} else if let Some(stored_room) = self.storage.retrieve_room_by_name(&*room_id.0).await? {
log::debug!("Loading room {}...", &room_id.0);
let room = Room {
storage_id: stored_room.id,
room_id: room_id.clone(),
subscriptions: HashMap::new(),
members: HashSet::new(), // TODO load members from storage
topic: stored_room.topic.into(),
message_count: stored_room.message_count,
storage: self.storage.clone(),
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
self.rooms.insert(room_id.clone(), room_handle.clone());
self.metric_active_rooms.inc();
Ok(Some(room_handle))
} else {
tracing::debug!("Room {} does not exist", &room_id.0);
Ok(None)
}
}
}
#[derive(Clone)]
pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
impl RoomHandle {
pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) {
pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) {
let mut lock = self.0.write().await;
tracing::info!("Adding a subscriber to a room");
lock.subscriptions.insert(player_id.clone(), player_handle);
}
pub async fn add_member(&self, player_id: PlayerId, player_storage_id: u32) {
pub async fn add_member(&self, player_id: &PlayerId, player_storage_id: u32) {
let mut lock = self.0.write().await;
tracing::info!("Adding a new member to a room");
let storage_id = lock.storage_id;
let room_storage_id = lock.storage_id;
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
lock.members.insert(player_id.clone());
lock.storage.add_room_member(&storage_id, &player_storage_id).await.unwrap();
let update = Updates::RoomJoined {
room_id: lock.room_id.clone(),
new_member_id: player_id.clone(),
};
lock.broadcast_update(update, &player_id).await;
lock.broadcast_update(update, player_id).await;
}
pub async fn unsubscribe(&self, player_id: &PlayerId) {
@ -143,9 +150,11 @@ impl RoomHandle {
lock.subscriptions.remove(player_id);
}
pub async fn remove_member(&self, player_id: &PlayerId) {
pub async fn remove_member(&self, player_id: &PlayerId, player_storage_id: u32) {
let mut lock = self.0.write().await;
tracing::info!("Removing a member from a room");
let room_storage_id = lock.storage_id;
lock.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap();
lock.members.remove(player_id);
let update = Updates::RoomLeft {
room_id: lock.room_id.clone(),
@ -154,7 +163,7 @@ impl RoomHandle {
lock.broadcast_update(update, player_id).await;
}
pub async fn send_message(&self, player_id: PlayerId, body: Str) {
pub async fn send_message(&self, player_id: &PlayerId, body: Str) {
let mut lock = self.0.write().await;
let res = lock.send_message(player_id, body).await;
if let Err(err) = res {
@ -171,14 +180,14 @@ impl RoomHandle {
}
}
pub async fn set_topic(&self, changer_id: PlayerId, new_topic: Str) {
pub async fn set_topic(&self, changer_id: &PlayerId, new_topic: Str) {
let mut lock = self.0.write().await;
lock.topic = new_topic.clone();
let update = Updates::RoomTopicChanged {
room_id: lock.room_id.clone(),
new_topic: new_topic.clone(),
};
lock.broadcast_update(update, &changer_id).await;
lock.broadcast_update(update, changer_id).await;
}
}
@ -197,7 +206,7 @@ struct Room {
storage: Storage,
}
impl Room {
async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> {
async fn send_message(&mut self, author_id: &PlayerId, body: Str) -> Result<()> {
tracing::info!("Adding a message to room");
self.storage.insert_message(self.storage_id, self.message_count, &body, &*author_id.as_inner()).await?;
self.message_count += 1;
@ -206,7 +215,7 @@ impl Room {
author_id: author_id.clone(),
body,
};
self.broadcast_update(update, &author_id).await;
self.broadcast_update(update, author_id).await;
Ok(())
}

View File

@ -392,7 +392,7 @@ async fn handle_registered_socket<'a>(
log::info!("Handling registered user: {user:?}");
let player_id = PlayerId::from(user.nickname.clone())?;
let mut connection = players.connect_to_player(player_id.clone()).await;
let mut connection = players.connect_to_player(&player_id).await;
let text: Str = format!("Welcome to {} Server", &config.server_name).into();
ServerMessage {

View File

@ -205,7 +205,7 @@ async fn handle_socket(
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage) => {
match authenticated {
Ok(authenticated) => {
let mut connection = players.connect_to_player(authenticated.player_id.clone()).await;
let mut connection = players.connect_to_player(&authenticated.player_id).await;
socket_final(
&mut xml_reader,
&mut xml_writer,