forked from lavina/lavina
Compare commits
No commits in common. "8832c579e485fbda75c6358edbecca85c3242df0" and "694d35dbf1bfec81db08093abef0e592186169bb" have entirely different histories.
8832c579e4
...
694d35dbf1
|
@ -224,19 +224,19 @@ impl PlayerRegistry {
|
|||
Ok(PlayerRegistry(Arc::new(RwLock::new(inner))))
|
||||
}
|
||||
|
||||
pub async fn get_or_launch_player(&mut self, id: &PlayerId) -> PlayerHandle {
|
||||
pub async fn get_or_launch_player(&mut self, id: PlayerId) -> PlayerHandle {
|
||||
let mut inner = self.0.write().await;
|
||||
if let Some((handle, _)) = inner.players.get(id) {
|
||||
if let Some((handle, _)) = inner.players.get(&id) {
|
||||
handle.clone()
|
||||
} else {
|
||||
let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone(), inner.storage.clone()).await;
|
||||
inner.players.insert(id.clone(), (handle.clone(), fiber));
|
||||
inner.players.insert(id, (handle.clone(), fiber));
|
||||
inner.metric_active_players.inc();
|
||||
handle
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect_to_player(&mut self, id: &PlayerId) -> PlayerConnection {
|
||||
pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection {
|
||||
let player_handle = self.get_or_launch_player(id).await;
|
||||
player_handle.subscribe().await
|
||||
}
|
||||
|
@ -284,11 +284,8 @@ impl Player {
|
|||
let player = Player {
|
||||
player_id,
|
||||
storage_id,
|
||||
// connections are empty when the actor is just started
|
||||
connections: AnonTable::new(),
|
||||
// room handlers will be loaded later in the started task
|
||||
my_rooms: HashMap::new(),
|
||||
// TODO implement and load bans
|
||||
banned_from: HashSet::new(),
|
||||
rx,
|
||||
handle,
|
||||
|
@ -300,15 +297,6 @@ impl Player {
|
|||
}
|
||||
|
||||
async fn main_loop(mut self) -> Self {
|
||||
let rooms = self.storage.get_rooms_of_a_user(self.storage_id).await.unwrap();
|
||||
for room_id in rooms {
|
||||
let room = self.rooms.get_room(&room_id).await;
|
||||
if let Some(room) = room {
|
||||
self.my_rooms.insert(room_id, room);
|
||||
} else {
|
||||
tracing::error!("Room #{room_id:?} not found");
|
||||
}
|
||||
}
|
||||
while let Some(cmd) = self.rx.recv().await {
|
||||
match cmd {
|
||||
ActorCommand::AddConnection { sender, promise } => {
|
||||
|
@ -396,8 +384,8 @@ impl Player {
|
|||
todo!();
|
||||
}
|
||||
};
|
||||
room.add_member(&self.player_id, self.storage_id).await;
|
||||
room.subscribe(&self.player_id, self.handle.clone()).await;
|
||||
room.add_member(self.player_id.clone(), self.storage_id).await;
|
||||
room.subscribe(self.player_id.clone(), self.handle.clone()).await;
|
||||
self.my_rooms.insert(room_id.clone(), room.clone());
|
||||
let room_info = room.get_room_info().await;
|
||||
let update = Updates::RoomJoined {
|
||||
|
@ -412,7 +400,7 @@ impl Player {
|
|||
let room = self.my_rooms.remove(&room_id);
|
||||
if let Some(room) = room {
|
||||
room.unsubscribe(&self.player_id).await;
|
||||
room.remove_member(&self.player_id, self.storage_id).await;
|
||||
room.remove_member(&self.player_id).await;
|
||||
}
|
||||
let update = Updates::RoomLeft {
|
||||
room_id,
|
||||
|
@ -426,7 +414,7 @@ impl Player {
|
|||
tracing::info!("no room found");
|
||||
return;
|
||||
};
|
||||
room.send_message(&self.player_id, body.clone()).await;
|
||||
room.send_message(self.player_id.clone(), body.clone()).await;
|
||||
let update = Updates::NewMessage {
|
||||
room_id,
|
||||
author_id: self.player_id.clone(),
|
||||
|
@ -440,7 +428,7 @@ impl Player {
|
|||
tracing::info!("no room found");
|
||||
return;
|
||||
};
|
||||
room.set_topic(&self.player_id, new_topic.clone()).await;
|
||||
room.set_topic(self.player_id.clone(), new_topic.clone()).await;
|
||||
let update = Updates::RoomTopicChanged { room_id, new_topic };
|
||||
self.broadcast_update(update, connection_id).await;
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ impl Storage {
|
|||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn retrieve_room_by_name(&self, name: &str) -> Result<Option<StoredRoom>> {
|
||||
pub async fn retrieve_room_by_name(&mut self, name: &str) -> Result<Option<StoredRoom>> {
|
||||
let mut executor = self.conn.lock().await;
|
||||
let res = sqlx::query_as(
|
||||
"select id, name, topic, message_count
|
||||
|
|
|
@ -3,7 +3,7 @@ use anyhow::Result;
|
|||
use crate::repo::Storage;
|
||||
|
||||
impl Storage {
|
||||
pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
|
||||
pub async fn add_room_member(&mut self, room_id: &u32, player_id: &u32) -> Result<()> {
|
||||
let mut executor = self.conn.lock().await;
|
||||
sqlx::query(
|
||||
"insert into memberships(user_id, room_id, status)
|
||||
|
@ -16,18 +16,4 @@ impl Storage {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
|
||||
let mut executor = self.conn.lock().await;
|
||||
sqlx::query(
|
||||
"delete from memberships
|
||||
where user_id = ? and room_id = ?;",
|
||||
)
|
||||
.bind(player_id)
|
||||
.bind(room_id)
|
||||
.execute(&mut *executor)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use anyhow::Result;
|
||||
|
||||
use crate::repo::Storage;
|
||||
use crate::room::RoomId;
|
||||
|
||||
impl Storage {
|
||||
pub async fn retrieve_user_id_by_name(&self, name: &str) -> Result<Option<u32>> {
|
||||
|
@ -13,18 +12,4 @@ impl Storage {
|
|||
|
||||
Ok(res.map(|(id,)| id))
|
||||
}
|
||||
|
||||
pub async fn get_rooms_of_a_user(&self, user_id: u32) -> Result<Vec<RoomId>> {
|
||||
let mut executor = self.conn.lock().await;
|
||||
let res: Vec<(String,)> = sqlx::query_as(
|
||||
"select r.name
|
||||
from memberships m inner join rooms r on m.room_id = r.id
|
||||
where m.user_id = ?;",
|
||||
)
|
||||
.bind(user_id)
|
||||
.fetch_all(&mut *executor)
|
||||
.await?;
|
||||
|
||||
res.into_iter().map(|(room_id,)| RoomId::from(room_id)).collect()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,9 +49,28 @@ impl RoomRegistry {
|
|||
|
||||
pub async fn get_or_create_room(&mut self, room_id: RoomId) -> Result<RoomHandle> {
|
||||
let mut inner = self.0.write().await;
|
||||
if let Some(room_handle) = inner.get_or_load_room(&room_id).await? {
|
||||
if let Some(room_handle) = inner.rooms.get(&room_id) {
|
||||
// room was already loaded into memory
|
||||
log::debug!("Room {} was loaded already", &room_id.0);
|
||||
Ok(room_handle.clone())
|
||||
} else if let Some(stored_room) = inner.storage.retrieve_room_by_name(&*room_id.0).await? {
|
||||
// room exists, but was not loaded
|
||||
log::debug!("Loading room {}...", &room_id.0);
|
||||
let room = Room {
|
||||
storage_id: stored_room.id,
|
||||
room_id: room_id.clone(),
|
||||
subscriptions: HashMap::new(), // TODO figure out how to populate subscriptions
|
||||
members: HashSet::new(), // TODO load members from storage
|
||||
topic: stored_room.topic.into(),
|
||||
message_count: stored_room.message_count,
|
||||
storage: inner.storage.clone(),
|
||||
};
|
||||
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
|
||||
inner.rooms.insert(room_id, room_handle.clone());
|
||||
inner.metric_active_rooms.inc();
|
||||
Ok(room_handle)
|
||||
} else {
|
||||
// room does not exist, create it and load
|
||||
log::debug!("Creating room {}...", &room_id.0);
|
||||
let topic = "New room";
|
||||
let id = inner.storage.create_new_room(&*room_id.0, &*topic).await?;
|
||||
|
@ -72,8 +91,9 @@ impl RoomRegistry {
|
|||
}
|
||||
|
||||
pub async fn get_room(&self, room_id: &RoomId) -> Option<RoomHandle> {
|
||||
let mut inner = self.0.write().await;
|
||||
inner.get_or_load_room(room_id).await.unwrap()
|
||||
let inner = self.0.read().await;
|
||||
let res = inner.rooms.get(room_id);
|
||||
res.map(|r| r.clone())
|
||||
}
|
||||
|
||||
pub async fn get_all_rooms(&self) -> Vec<RoomInfo> {
|
||||
|
@ -96,53 +116,26 @@ struct RoomRegistryInner {
|
|||
storage: Storage,
|
||||
}
|
||||
|
||||
impl RoomRegistryInner {
|
||||
async fn get_or_load_room(&mut self, room_id: &RoomId) -> Result<Option<RoomHandle>> {
|
||||
if let Some(room_handle) = self.rooms.get(room_id) {
|
||||
log::debug!("Room {} was loaded already", &room_id.0);
|
||||
Ok(Some(room_handle.clone()))
|
||||
} else if let Some(stored_room) = self.storage.retrieve_room_by_name(&*room_id.0).await? {
|
||||
log::debug!("Loading room {}...", &room_id.0);
|
||||
let room = Room {
|
||||
storage_id: stored_room.id,
|
||||
room_id: room_id.clone(),
|
||||
subscriptions: HashMap::new(),
|
||||
members: HashSet::new(), // TODO load members from storage
|
||||
topic: stored_room.topic.into(),
|
||||
message_count: stored_room.message_count,
|
||||
storage: self.storage.clone(),
|
||||
};
|
||||
let room_handle = RoomHandle(Arc::new(AsyncRwLock::new(room)));
|
||||
self.rooms.insert(room_id.clone(), room_handle.clone());
|
||||
self.metric_active_rooms.inc();
|
||||
Ok(Some(room_handle))
|
||||
} else {
|
||||
tracing::debug!("Room {} does not exist", &room_id.0);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
|
||||
impl RoomHandle {
|
||||
pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) {
|
||||
pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) {
|
||||
let mut lock = self.0.write().await;
|
||||
tracing::info!("Adding a subscriber to a room");
|
||||
lock.subscriptions.insert(player_id.clone(), player_handle);
|
||||
}
|
||||
|
||||
pub async fn add_member(&self, player_id: &PlayerId, player_storage_id: u32) {
|
||||
pub async fn add_member(&self, player_id: PlayerId, player_storage_id: u32) {
|
||||
let mut lock = self.0.write().await;
|
||||
tracing::info!("Adding a new member to a room");
|
||||
let room_storage_id = lock.storage_id;
|
||||
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
|
||||
let storage_id = lock.storage_id;
|
||||
lock.members.insert(player_id.clone());
|
||||
lock.storage.add_room_member(&storage_id, &player_storage_id).await.unwrap();
|
||||
let update = Updates::RoomJoined {
|
||||
room_id: lock.room_id.clone(),
|
||||
new_member_id: player_id.clone(),
|
||||
};
|
||||
lock.broadcast_update(update, player_id).await;
|
||||
lock.broadcast_update(update, &player_id).await;
|
||||
}
|
||||
|
||||
pub async fn unsubscribe(&self, player_id: &PlayerId) {
|
||||
|
@ -150,11 +143,9 @@ impl RoomHandle {
|
|||
lock.subscriptions.remove(player_id);
|
||||
}
|
||||
|
||||
pub async fn remove_member(&self, player_id: &PlayerId, player_storage_id: u32) {
|
||||
pub async fn remove_member(&self, player_id: &PlayerId) {
|
||||
let mut lock = self.0.write().await;
|
||||
tracing::info!("Removing a member from a room");
|
||||
let room_storage_id = lock.storage_id;
|
||||
lock.storage.remove_room_member(room_storage_id, player_storage_id).await.unwrap();
|
||||
lock.members.remove(player_id);
|
||||
let update = Updates::RoomLeft {
|
||||
room_id: lock.room_id.clone(),
|
||||
|
@ -163,7 +154,7 @@ impl RoomHandle {
|
|||
lock.broadcast_update(update, player_id).await;
|
||||
}
|
||||
|
||||
pub async fn send_message(&self, player_id: &PlayerId, body: Str) {
|
||||
pub async fn send_message(&self, player_id: PlayerId, body: Str) {
|
||||
let mut lock = self.0.write().await;
|
||||
let res = lock.send_message(player_id, body).await;
|
||||
if let Err(err) = res {
|
||||
|
@ -180,14 +171,14 @@ impl RoomHandle {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn set_topic(&self, changer_id: &PlayerId, new_topic: Str) {
|
||||
pub async fn set_topic(&self, changer_id: PlayerId, new_topic: Str) {
|
||||
let mut lock = self.0.write().await;
|
||||
lock.topic = new_topic.clone();
|
||||
let update = Updates::RoomTopicChanged {
|
||||
room_id: lock.room_id.clone(),
|
||||
new_topic: new_topic.clone(),
|
||||
};
|
||||
lock.broadcast_update(update, changer_id).await;
|
||||
lock.broadcast_update(update, &changer_id).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -206,7 +197,7 @@ struct Room {
|
|||
storage: Storage,
|
||||
}
|
||||
impl Room {
|
||||
async fn send_message(&mut self, author_id: &PlayerId, body: Str) -> Result<()> {
|
||||
async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> {
|
||||
tracing::info!("Adding a message to room");
|
||||
self.storage.insert_message(self.storage_id, self.message_count, &body, &*author_id.as_inner()).await?;
|
||||
self.message_count += 1;
|
||||
|
@ -215,7 +206,7 @@ impl Room {
|
|||
author_id: author_id.clone(),
|
||||
body,
|
||||
};
|
||||
self.broadcast_update(update, author_id).await;
|
||||
self.broadcast_update(update, &author_id).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -392,7 +392,7 @@ async fn handle_registered_socket<'a>(
|
|||
log::info!("Handling registered user: {user:?}");
|
||||
|
||||
let player_id = PlayerId::from(user.nickname.clone())?;
|
||||
let mut connection = players.connect_to_player(&player_id).await;
|
||||
let mut connection = players.connect_to_player(player_id.clone()).await;
|
||||
let text: Str = format!("Welcome to {} Server", &config.server_name).into();
|
||||
|
||||
ServerMessage {
|
||||
|
|
|
@ -205,7 +205,7 @@ async fn handle_socket(
|
|||
authenticated = socket_auth(&mut xml_reader, &mut xml_writer, &mut reader_buf, &mut storage) => {
|
||||
match authenticated {
|
||||
Ok(authenticated) => {
|
||||
let mut connection = players.connect_to_player(&authenticated.player_id).await;
|
||||
let mut connection = players.connect_to_player(authenticated.player_id.clone()).await;
|
||||
socket_final(
|
||||
&mut xml_reader,
|
||||
&mut xml_writer,
|
||||
|
|
Loading…
Reference in New Issue