forked from lavina/lavina
1
0
Fork 0

Compare commits

..

No commits in common. "8832c579e485fbda75c6358edbecca85c3242df0" and "694d35dbf1bfec81db08093abef0e592186169bb" have entirely different histories.

7 changed files with 47 additions and 97 deletions

View File

@ -224,19 +224,19 @@ impl PlayerRegistry {
Ok(PlayerRegistry(Arc::new(RwLock::new(inner)))) Ok(PlayerRegistry(Arc::new(RwLock::new(inner))))
} }
pub async fn get_or_launch_player(&mut self, id: &PlayerId) -> PlayerHandle { pub async fn get_or_launch_player(&mut self, id: PlayerId) -> PlayerHandle {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
if let Some((handle, _)) = inner.players.get(id) { if let Some((handle, _)) = inner.players.get(&id) {
handle.clone() handle.clone()
} else { } else {
let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.storage.clone()).await; let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.storage.clone()).await;
inner.players.insert(id.clone(), (handle.clone(), fiber)); inner.players.insert(id, (handle.clone(), fiber));
inner.metric_active_players.inc(); inner.metric_active_players.inc();
handle handle
} }
} }
pub async fn connect_to_player(&mut self, id: &PlayerId) -> PlayerConnection { pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection {
let player_handle = self.get_or_launch_player(id).await; let player_handle = self.get_or_launch_player(id).await;
player_handle.subscribe().await player_handle.subscribe().await
} }
@ -284,11 +284,8 @@ impl Player {
let player = Player { let player = Player {
player_id, player_id,
storage_id, storage_id,
// connections are empty when the actor is just started
connections: AnonTable::new(), connections: AnonTable::new(),
// room handlers will be loaded later in the started task
my_rooms: HashMap::new(), my_rooms: HashMap::new(),
// TODO implement and load bans
banned_from: HashSet::new(), banned_from: HashSet::new(),
rx, rx,
handle, handle,
@ -300,15 +297,6 @@ impl Player {
} }
async fn main_loop(mut self) -> Self { async fn main_loop(mut self) -> Self {
let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
for room_id in rooms {
let room = self.rooms.get_room(&room_id).await;
if let Some(room) = room {
self.my_rooms.insert(room_id, room);
} else {
tracing::error!("Room #{room_id:?} not found");
}
}
while let Some(cmd) = self.rx.recv().await { while let Some(cmd) = self.rx.recv().await {
match cmd { match cmd {
ActorCommand::AddConnection { sender, promise } => { ActorCommand::AddConnection { sender, promise } => {
@ -396,8 +384,8 @@ impl Player {
todo!(); todo!();
} }
}; };
room.add_member(&self.player_id, self.storage_id).await; room.add_member(self.player_id.clone(), self.storage_id).await;
room.subscribe(&self.player_id, self.handle.clone()).await; room.subscribe(self.player_id.clone(), self.handle.clone()).await;
self.my_rooms.insert(room_id.clone(), room.clone()); self.my_rooms.insert(room_id.clone(), room.clone());
let room_info = room.get_room_info().await; let room_info = room.get_room_info().await;
let update = Updates::RoomJoined { let update = Updates::RoomJoined {
@ -412,7 +400,7 @@ impl Player {
let room = self.my_rooms.remove(&room_id); let room = self.my_rooms.remove(&room_id);
if let Some(room) = room { if let Some(room) = room {
room.unsubscribe(&self.player_id).await; room.unsubscribe(&self.player_id).await;
room.remove_member(&self.player_id, self.storage_id).await; room.remove_member(&self.player_id).await;
} }
let update = Updates::RoomLeft { let update = Updates::RoomLeft {
room_id, room_id,
@ -426,7 +414,7 @@ impl Player {
tracing::info!("no room found"); tracing::info!("no room found");
return; return;
}; };
room.send_message(&self.player_id, body.clone()).await; room.send_message(self.player_id.clone(), body.clone()).await;
let update = Updates::NewMessage { let update = Updates::NewMessage {
room_id, room_id,
author_id: self.player_id.clone(), author_id: self.player_id.clone(),
@ -440,7 +428,7 @@ impl Player {
tracing::info!("no room found"); tracing::info!("no room found");
return; return;
}; };
room.set_topic(&self.player_id, new_topic.clone()).await; room.set_topic(self.player_id.clone(), new_topic.clone()).await;
let update = Updates::RoomTopicChanged { room_id, new_topic }; let update = Updates::RoomTopicChanged { room_id, new_topic };
self.broadcast_update(update, connection_id).await; self.broadcast_update(update, connection_id).await;
} }

View File

@ -51,7 +51,7 @@ impl Storage {
Ok(res) Ok(res)
} }
pub async fn retrieve_room_by_name(&self, name: &str) -> Result<Option<StoredRoom>> { pub async fn retrieve_room_by_name(&mut self, name: &str) -> Result<Option<StoredRoom>> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let res = sqlx::query_as( let res = sqlx::query_as(
"select id, name, topic, message_count "select id, name, topic, message_count

View File

@ -3,7 +3,7 @@ use anyhow::Result;
use crate::repo::Storage; use crate::repo::Storage;
impl Storage { impl Storage {
pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { pub async fn add_room_member(&mut self, room_id: &u32, player_id: &u32) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
sqlx::query( sqlx::query(
"insert into memberships(user_id, room_id, status) "insert into memberships(user_id, room_id, status)
@ -16,18 +16,4 @@ impl Storage {
Ok(()) Ok(())
} }
pub async fn remove_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
let mut executor = self.conn.lock().await;
sqlx::query(
"delete from memberships
where user_id = ? and room_id = ?;",
)
.bind(player_id)
.bind(room_id)
.execute(&mut *executor)
.await?;
Ok(())
}
} }

View File

@ -1,7 +1,6 @@
use anyhow::Result; use anyhow::Result;
use crate::repo::Storage; use crate::repo::Storage;
use crate::room::RoomId;
impl Storage { impl Storage {
pub async fn retrieve_user_id_by_name(&self, name: &str) -> Result<Option<u32>> { pub async fn retrieve_user_id_by_name(&self, name: &str) -> Result<Option<u32>> {
@ -13,18 +12,4 @@ impl Storage {
Ok(res.map(|(id,)| id)) Ok(res.map(|(id,)| id))
} }
pub async fn get_rooms_of_a_user(&self, user_id: u32) -> Result<Vec<RoomId>> {
let mut executor = self.conn.lock().await;
let res: Vec<(String,)> = sqlx::query_as(
"select r.name
from memberships m inner join rooms r on m.room_id = r.id
where m.user_id = ?;",
)
.bind(user_id)
.fetch_all(&mut *executor)
.await?;
res.into_iter().map(|(room_id,)| RoomId::from(room_id)).collect()
}
} }

View File

@ -49,9 +49,28 @@ impl RoomRegistry {
pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> { pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> {
let mut inner = self.0.write().await; let mut inner = self.0.write().await;
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? { if let Some(room_handle) = inner.rooms.get(&room_id) {
// room was already loaded into memory
log::debug!("Room {} was loaded already", &room_id.0);
Ok(room_handle.clone()) Ok(room_handle.clone())
} else if let Some(stored_room) = inner.storage.retrieve_room_by_name(&*room_id.0).await? {
// room exists, but was not loaded
log::debug!("Loading room {}...", &room_id.0);
let room = Room {
storage_id: stored_room.id,
room_id: room_id.clone(),
subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions
members: HashSet::new(), // TODO load members from storage
topic: stored_room.topic.into(),
message_count: stored_room.message_count,
storage: inner.storage.clone(),
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
inner.rooms.insert(room_id, room_handle.clone());
inner.metric_active_rooms.inc();
Ok(room_handle)
} else { } else {
// room does not exist, create it and load
log::debug!("Creating room {}...", &room_id.0); log::debug!("Creating room {}...", &room_id.0);
let topic = "New room"; let topic = "New room";
let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?; let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?;
@ -72,8 +91,9 @@ impl RoomRegistry {
} }
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> { pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
let mut inner = self.0.write().await; let inner = self.0.read().await;
inner.get_or_load_room(room_id).await.unwrap() let res = inner.rooms.get(room_id);
res.map(|r| r.clone())
} }
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> { pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
@ -96,53 +116,26 @@ struct RoomRegistryInner {
storage: Storage, storage: Storage,
} }
impl RoomRegistryInner {
async fn get_or_load_room(&mut self, room_id: &RoomId) -> Result<Option<RoomHandle>> {
if let Some(room_handle) = self.rooms.get(room_id) {
log::debug!("Room {} was loaded already", &room_id.0);
Ok(Some(room_handle.clone()))
} else if let Some(stored_room) = self.storage.retrieve_room_by_name(&*room_id.0).await? {
log::debug!("Loading room {}...", &room_id.0);
let room = Room {
storage_id: stored_room.id,
room_id: room_id.clone(),
subscriptions: HashMap::new(),
members: HashSet::new(), // TODO load members from storage
topic: stored_room.topic.into(),
message_count: stored_room.message_count,
storage: self.storage.clone(),
};
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
self.rooms.insert(room_id.clone(), room_handle.clone());
self.metric_active_rooms.inc();
Ok(Some(room_handle))
} else {
tracing::debug!("Room {} does not exist", &room_id.0);
Ok(None)
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct RoomHandle(Arc<AsyncRwLock<Room>>); pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
impl RoomHandle { impl RoomHandle {
pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) { pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
tracing::info!("Adding a subscriber to a room"); tracing::info!("Adding a subscriber to a room");
lock.subscriptions.insert(player_id.clone(), player_handle); lock.subscriptions.insert(player_id.clone(), player_handle);
} }
pub async fn add_member(&self, player_id: &PlayerId, player_storage_id: u32) { pub async fn add_member(&self, player_id: PlayerId, player_storage_id: u32) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
tracing::info!("Adding a new member to a room"); tracing::info!("Adding a new member to a room");
let room_storage_id = lock.storage_id; let storage_id = lock.storage_id;
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
lock.members.insert(player_id.clone()); lock.members.insert(player_id.clone());
lock.storage.add_room_member(&storage_id, &player_storage_id).await.unwrap();
let update = Updates::RoomJoined { let update = Updates::RoomJoined {
room_id: lock.room_id.clone(), room_id: lock.room_id.clone(),
new_member_id: player_id.clone(), new_member_id: player_id.clone(),
}; };
lock.broadcast_update(update, player_id).await; lock.broadcast_update(update, &player_id).await;
} }
pub async fn unsubscribe(&self, player_id: &PlayerId) { pub async fn unsubscribe(&self, player_id: &PlayerId) {
@ -150,11 +143,9 @@ impl RoomHandle {
lock.subscriptions.remove(player_id); lock.subscriptions.remove(player_id);
} }
pub async fn remove_member(&self, player_id: &PlayerId, player_storage_id: u32) { pub async fn remove_member(&self, player_id: &PlayerId) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
tracing::info!("Removing a member from a room"); tracing::info!("Removing a member from a room");
let room_storage_id = lock.storage_id;
lock.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap();
lock.members.remove(player_id); lock.members.remove(player_id);
let update = Updates::RoomLeft { let update = Updates::RoomLeft {
room_id: lock.room_id.clone(), room_id: lock.room_id.clone(),
@ -163,7 +154,7 @@ impl RoomHandle {
lock.broadcast_update(update, player_id).await; lock.broadcast_update(update, player_id).await;
} }
pub async fn send_message(&self, player_id: &PlayerId, body: Str) { pub async fn send_message(&self, player_id: PlayerId, body: Str) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
let res = lock.send_message(player_id, body).await; let res = lock.send_message(player_id, body).await;
if let Err(err) = res { if let Err(err) = res {
@ -180,14 +171,14 @@ impl RoomHandle {
} }
} }
pub async fn set_topic(&self, changer_id: &PlayerId, new_topic: Str) { pub async fn set_topic(&self, changer_id: PlayerId, new_topic: Str) {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
lock.topic = new_topic.clone(); lock.topic = new_topic.clone();
let update = Updates::RoomTopicChanged { let update = Updates::RoomTopicChanged {
room_id: lock.room_id.clone(), room_id: lock.room_id.clone(),
new_topic: new_topic.clone(), new_topic: new_topic.clone(),
}; };
lock.broadcast_update(update, changer_id).await; lock.broadcast_update(update, &changer_id).await;
} }
} }
@ -206,7 +197,7 @@ struct Room {
storage: Storage, storage: Storage,
} }
impl Room { impl Room {
async fn send_message(&mut self, author_id: &PlayerId, body: Str) -> Result<()> { async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> {
tracing::info!("Adding a message to room"); tracing::info!("Adding a message to room");
self.storage.insert_message(self.storage_id, self.message_count, &body, &*author_id.as_inner()).await?; self.storage.insert_message(self.storage_id, self.message_count, &body, &*author_id.as_inner()).await?;
self.message_count += 1; self.message_count += 1;
@ -215,7 +206,7 @@ impl Room {
author_id: author_id.clone(), author_id: author_id.clone(),
body, body,
}; };
self.broadcast_update(update, author_id).await; self.broadcast_update(update, &author_id).await;
Ok(()) Ok(())
} }

View File

@ -392,7 +392,7 @@ async fn handle_registered_socket<'a>(
log::info!("Handling registered user: {user:?}"); log::info!("Handling registered user: {user:?}");
let player_id = PlayerId::from(user.nickname.clone())?; let player_id = PlayerId::from(user.nickname.clone())?;
let mut connection = players.connect_to_player(&player_id).await; let mut connection = players.connect_to_player(player_id.clone()).await;
let text: Str = format!("Welcome to {} Server", &config.server_name).into(); let text: Str = format!("Welcome to {} Server", &config.server_name).into();
ServerMessage { ServerMessage {

View File

@ -205,7 +205,7 @@ async fn handle_socket(
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage) => { authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage) => {
match authenticated { match authenticated {
Ok(authenticated) => { Ok(authenticated) => {
let mut connection = players.connect_to_player(&authenticated.player_id).await; let mut connection = players.connect_to_player(authenticated.player_id.clone()).await;
socket_final( socket_final(
&mut xml_reader, &mut xml_reader,
&mut xml_writer, &mut xml_writer,