//! Domain of chat participants.
//!
//! Player is a single user account, which is used to participate in chats,
//! including sending messages, receiving messaged, retrieving history and running privileged actions.
//! A player corresponds to a single user account. Usually a person has only one account,
//! but it is possible to have multiple accounts for one person and therefore multiple player entities.
//!
//! A player actor is a serial handler of commands from a single player. It is preferable to run all per-player validations in the player actor,
//! so that they don't overload the room actor.
use std::{
    collections::{HashMap, HashSet},
    sync::{Arc, RwLock},
};

use prometheus::{IntGauge, Registry as MetricsRegistry};
use serde::Serialize;
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
    task::JoinHandle,
};

use crate::{
    core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry},
    prelude::*,
    util::table::{AnonTable, Key as AnonKey},
};

/// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct PlayerId(ByteVec);
impl PlayerId {
    pub fn from_bytes(bytes: ByteVec) -> Result<PlayerId> {
        if bytes.len() > 32 {
            return Err(fail("Nickname cannot be longer than 32 symbols"));
        }
        if bytes.contains(&b' ') {
            return Err(anyhow::Error::msg("Nickname cannot contain spaces"));
        }
        Ok(PlayerId(bytes))
    }
    pub fn as_bytes(&self) -> &ByteVec {
        &self.0
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConnectionId(pub AnonKey);

pub struct PlayerConnection {
    pub connection_id: ConnectionId,
    pub receiver: Receiver<Updates>,
    player_handle: PlayerHandle,
}
impl PlayerConnection {
    pub async fn send_message(&mut self, room_id: RoomId, body: String) -> Result<()> {
        self.player_handle
            .send_message(room_id, self.connection_id.clone(), body)
            .await
    }

    pub async fn join_room(&mut self, room_id: RoomId) -> Result<JoinResult> {
        self.player_handle
            .join_room(room_id, self.connection_id.clone())
            .await
    }

    pub async fn change_topic(&mut self, room_id: RoomId, new_topic: ByteVec) -> Result<()> {
        let (promise, deferred) = oneshot();
        let cmd = Cmd::ChangeTopic {
            room_id,
            new_topic,
            promise,
        };
        self.player_handle
            .send(PlayerCommand::Cmd(cmd, self.connection_id.clone()))
            .await;
        Ok(deferred.await?)
    }

    pub async fn leave_room(&mut self, room_id: RoomId) -> Result<()> {
        let (promise, deferred) = oneshot();
        self.player_handle
            .send(PlayerCommand::Cmd(
                Cmd::LeaveRoom { room_id, promise },
                self.connection_id.clone(),
            ))
            .await;
        Ok(deferred.await?)
    }

    pub async fn terminate(self) {
        self.player_handle
            .send(PlayerCommand::TerminateConnection(self.connection_id))
            .await;
    }

    pub async fn get_rooms(&self) -> Result<Vec<RoomInfo>> {
        let (promise, deferred) = oneshot();
        self.player_handle
            .send(PlayerCommand::GetRooms(promise))
            .await;
        Ok(deferred.await?)
    }
}

/// Handle to a player actor.
#[derive(Clone)]
pub struct PlayerHandle {
    tx: Sender<PlayerCommand>,
}
impl PlayerHandle {
    pub async fn subscribe(&self) -> PlayerConnection {
        let (sender, receiver) = channel(32);
        let (promise, deferred) = oneshot();
        self.tx
            .send(PlayerCommand::AddConnection { sender, promise })
            .await;
        let connection_id = deferred.await.unwrap();
        PlayerConnection {
            connection_id,
            player_handle: self.clone(),
            receiver,
        }
    }

    pub async fn send_message(
        &self,
        room_id: RoomId,
        connection_id: ConnectionId,
        body: String,
    ) -> Result<()> {
        let (promise, deferred) = oneshot();
        let cmd = Cmd::SendMessage {
            room_id,
            body,
            promise,
        };
        self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await;
        Ok(deferred.await?)
    }

    pub async fn join_room(
        &self,
        room_id: RoomId,
        connection_id: ConnectionId,
    ) -> Result<JoinResult> {
        let (promise, deferred) = oneshot();
        let cmd = Cmd::JoinRoom {
            room_id,
            promise: promise,
        };
        self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await;
        Ok(deferred.await?)
    }

    async fn send(&self, command: PlayerCommand) {
        self.tx.send(command).await;
    }

    pub async fn update(&self, update: Updates) {
        self.send(PlayerCommand::Update(update)).await;
    }
}

enum PlayerCommand {
    /** Commands from connections */
    AddConnection {
        sender: Sender<Updates>,
        promise: Promise<ConnectionId>,
    },
    TerminateConnection(ConnectionId),
    Cmd(Cmd, ConnectionId),
    /// Query - responds with a list of rooms the player is a member of.
    GetRooms(Promise<Vec<RoomInfo>>),
    /** Events from rooms */
    Update(Updates),
}

pub enum Cmd {
    JoinRoom {
        room_id: RoomId,
        promise: Promise<JoinResult>,
    },
    LeaveRoom {
        room_id: RoomId,
        promise: Promise<()>,
    },
    SendMessage {
        room_id: RoomId,
        body: String,
        promise: Promise<()>,
    },
    ChangeTopic {
        room_id: RoomId,
        new_topic: ByteVec,
        promise: Promise<()>,
    },
}

pub enum JoinResult {
    Success(RoomInfo),
    Banned,
}

/// Player update event type which is sent to a player actor and from there to a connection handler.
#[derive(Clone)]
pub enum Updates {
    RoomTopicChanged {
        room_id: RoomId,
        new_topic: ByteVec,
    },
    NewMessage {
        room_id: RoomId,
        author_id: PlayerId,
        body: String,
    },
    RoomJoined {
        room_id: RoomId,
        new_member_id: PlayerId,
    },
    RoomLeft {
        room_id: RoomId,
        former_member_id: PlayerId,
    },
    /// The player was banned from the room and left it immediately.
    BannedFrom(RoomId),
}

/// Handle to a player registry — a shared data structure containing information about players.
#[derive(Clone)]
pub struct PlayerRegistry(Arc<RwLock<PlayerRegistryInner>>);
impl PlayerRegistry {
    pub fn empty(
        room_registry: RoomRegistry,
        metrics: &mut MetricsRegistry,
    ) -> Result<PlayerRegistry> {
        let metric_active_players =
            IntGauge::new("chat_players_active", "Number of alive player actors")?;
        metrics.register(Box::new(metric_active_players.clone()))?;
        let inner = PlayerRegistryInner {
            room_registry,
            players: HashMap::new(),
            metric_active_players,
        };
        Ok(PlayerRegistry(Arc::new(RwLock::new(inner))))
    }

    pub async fn get_or_create_player(&mut self, id: PlayerId) -> PlayerHandle {
        let mut inner = self.0.write().unwrap();
        if let Some((handle, _)) = inner.players.get(&id) {
            handle.clone()
        } else {
            let (handle, fiber) = Player::launch(id.clone(), inner.room_registry.clone());
            inner.players.insert(id, (handle.clone(), fiber));
            inner.metric_active_players.inc();
            handle
        }
    }

    pub async fn connect_to_player(&mut self, id: PlayerId) -> PlayerConnection {
        let player_handle = self.get_or_create_player(id).await;
        player_handle.subscribe().await
    }
}

/// The player registry state representation.
struct PlayerRegistryInner {
    room_registry: RoomRegistry,
    players: HashMap<PlayerId, (PlayerHandle, JoinHandle<Player>)>,
    metric_active_players: IntGauge,
}

/// Player actor inner state representation.
struct Player {
    player_id: PlayerId,
    connections: AnonTable<Sender<Updates>>,
    my_rooms: HashMap<RoomId, RoomHandle>,
    banned_from: HashSet<RoomId>,
    rx: Receiver<PlayerCommand>,
    handle: PlayerHandle,
    rooms: RoomRegistry,
}
impl Player {
    fn launch(player_id: PlayerId, rooms: RoomRegistry) -> (PlayerHandle, JoinHandle<Player>) {
        let (tx, rx) = channel(32);
        let handle = PlayerHandle { tx };
        let handle_clone = handle.clone();
        let player = Player {
            player_id,
            connections: AnonTable::new(),
            my_rooms: HashMap::new(),
            banned_from: HashSet::from([RoomId::from_bytes(b"empty".to_vec()).unwrap()]),
            rx,
            handle,
            rooms,
        };
        let fiber = tokio::task::spawn(player.main_loop());
        (handle_clone, fiber)
    }

    async fn main_loop(mut self) -> Self {
        while let Some(cmd) = self.rx.recv().await {
            match cmd {
                PlayerCommand::AddConnection { sender, promise } => {
                    let connection_id = self.connections.insert(sender);
                    if let Err(connection_id) = promise.send(ConnectionId(connection_id)) {
                        log::warn!("Connection {connection_id:?} terminated before finalization");
                        self.terminate_connection(connection_id);
                    }
                }
                PlayerCommand::TerminateConnection(connection_id) => {
                    self.terminate_connection(connection_id);
                }
                PlayerCommand::GetRooms(promise) => {
                    let mut response = vec![];
                    for (_, handle) in &self.my_rooms {
                        response.push(handle.get_room_info().await);
                    }
                    promise.send(response);
                }
                PlayerCommand::Update(update) => {
                    log::info!(
                        "Player received an update, broadcasting to {} connections",
                        self.connections.len()
                    );
                    match update {
                        Updates::BannedFrom(ref room_id) => {
                            self.banned_from.insert(room_id.clone());
                            self.my_rooms.remove(room_id);
                        }
                        _ => {}
                    }
                    for (_, connection) in &self.connections {
                        connection.send(update.clone()).await;
                    }
                }
                PlayerCommand::Cmd(cmd, connection_id) => self.handle_cmd(cmd, connection_id).await,
            }
        }
        self
    }

    fn terminate_connection(&mut self, connection_id: ConnectionId) {
        if let None = self.connections.pop(connection_id.0) {
            log::warn!("Connection {connection_id:?} already terminated");
        }
    }

    async fn handle_cmd(&mut self, cmd: Cmd, connection_id: ConnectionId) {
        match cmd {
            Cmd::JoinRoom { room_id, promise } => {
                if self.banned_from.contains(&room_id) {
                    promise.send(JoinResult::Banned);
                    return;
                }

                let room = self.rooms.get_or_create_room(room_id.clone());
                room.subscribe(self.player_id.clone(), self.handle.clone())
                    .await;
                self.my_rooms.insert(room_id.clone(), room.clone());
                let room_info = room.get_room_info().await;
                promise.send(JoinResult::Success(room_info));
                let update = Updates::RoomJoined {
                    room_id,
                    new_member_id: self.player_id.clone(),
                };
                self.broadcast_update(update, connection_id).await;
            }
            Cmd::LeaveRoom { room_id, promise } => {
                let room = self.my_rooms.remove(&room_id);
                if let Some(room) = room {
                    room.unsubscribe(&self.player_id).await;
                    let room_info = room.get_room_info().await;
                }
                promise.send(());
                let update = Updates::RoomLeft {
                    room_id,
                    former_member_id: self.player_id.clone(),
                };
                self.broadcast_update(update, connection_id).await;
            }
            Cmd::SendMessage {
                room_id,
                body,
                promise,
            } => {
                let room = self.rooms.get_room(&room_id);
                if let Some(room) = room {
                    room.send_message(self.player_id.clone(), body.clone())
                        .await;
                } else {
                    tracing::info!("no room found");
                }
                promise.send(());
                let update = Updates::NewMessage {
                    room_id,
                    author_id: self.player_id.clone(),
                    body,
                };
                self.broadcast_update(update, connection_id).await;
            }
            Cmd::ChangeTopic {
                room_id,
                new_topic,
                promise,
            } => {
                let room = self.rooms.get_room(&room_id);
                if let Some(mut room) = room {
                    room.set_topic(self.player_id.clone(), new_topic.clone())
                        .await;
                } else {
                    tracing::info!("no room found");
                }
                promise.send(());
                let update = Updates::RoomTopicChanged { room_id, new_topic };
                self.broadcast_update(update, connection_id).await;
            }
        }
    }

    async fn broadcast_update(&self, update: Updates, except: ConnectionId) {
        for (a, b) in &self.connections {
            if ConnectionId(a) == except {
                continue;
            }
            b.send(update.clone()).await;
        }
    }
}