forked from lavina/lavina
Compare commits
4 Commits
775f43a1f3
...
f3fd15b781
Author | SHA1 | Date |
---|---|---|
Mikhail | f3fd15b781 | |
Mikhail | 8dd3ee4443 | |
Mikhail | 061c3a1093 | |
Mikhail | 5512a74999 |
|
@ -1,8 +1,33 @@
|
||||||
|
use crate::player::PlayerConnection;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
||||||
use crate::repo::Storage;
|
use crate::repo::Storage;
|
||||||
|
|
||||||
impl Storage {
|
impl Storage {
|
||||||
|
#[tracing::instrument(skip(self), name = "Storage::is_room_member")]
|
||||||
|
pub async fn is_room_member(&self, room_id: u32, player_id: u32) -> Result<bool> {
|
||||||
|
let mut executor = self.conn.lock().await;
|
||||||
|
let res: (u32,) = sqlx::query_as(
|
||||||
|
"
|
||||||
|
select
|
||||||
|
count(*)
|
||||||
|
from
|
||||||
|
memberships
|
||||||
|
where
|
||||||
|
user_id = ? and room_id = ?;
|
||||||
|
<<<<<<< HEAD
|
||||||
|
|
||||||
|
=======
|
||||||
|
>>>>>>> 775f43a (Fix query)
|
||||||
|
",
|
||||||
|
)
|
||||||
|
.bind(player_id)
|
||||||
|
.bind(room_id)
|
||||||
|
.fetch_one(&mut *executor)
|
||||||
|
.await?;
|
||||||
|
Ok(res.0 > 0)
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self), name = "Storage::add_room_member")]
|
#[tracing::instrument(skip(self), name = "Storage::add_room_member")]
|
||||||
pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
|
pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
|
||||||
let mut executor = self.conn.lock().await;
|
let mut executor = self.conn.lock().await;
|
||||||
|
|
|
@ -14,6 +14,7 @@ use crate::repo::Storage;
|
||||||
/// Opaque room id
|
/// Opaque room id
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
|
||||||
pub struct RoomId(Str);
|
pub struct RoomId(Str);
|
||||||
|
|
||||||
impl RoomId {
|
impl RoomId {
|
||||||
pub fn from(str: impl Into<Str>) -> Result<RoomId> {
|
pub fn from(str: impl Into<Str>) -> Result<RoomId> {
|
||||||
let bytes = str.into();
|
let bytes = str.into();
|
||||||
|
@ -36,6 +37,7 @@ impl RoomId {
|
||||||
/// Shared data structure for storing metadata about rooms.
|
/// Shared data structure for storing metadata about rooms.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
|
pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
|
||||||
|
|
||||||
impl RoomRegistry {
|
impl RoomRegistry {
|
||||||
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> {
|
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> {
|
||||||
let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
|
let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
|
||||||
|
@ -141,6 +143,7 @@ impl RoomRegistryInner {
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
|
pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
|
||||||
|
|
||||||
impl RoomHandle {
|
impl RoomHandle {
|
||||||
#[tracing::instrument(skip(self, player_handle), name = "RoomHandle::subscribe")]
|
#[tracing::instrument(skip(self, player_handle), name = "RoomHandle::subscribe")]
|
||||||
pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) {
|
pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) {
|
||||||
|
@ -154,7 +157,11 @@ impl RoomHandle {
|
||||||
let mut lock = self.0.write().await;
|
let mut lock = self.0.write().await;
|
||||||
tracing::info!("Adding a new member to a room");
|
tracing::info!("Adding a new member to a room");
|
||||||
let room_storage_id = lock.storage_id;
|
let room_storage_id = lock.storage_id;
|
||||||
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
|
if !lock.storage.is_room_member(room_storage_id, player_storage_id).await.unwrap() {
|
||||||
|
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
|
||||||
|
} else {
|
||||||
|
tracing::warn!("User {} has already been added to the room.", player_id);
|
||||||
|
}
|
||||||
lock.members.insert(player_id.clone());
|
lock.members.insert(player_id.clone());
|
||||||
let update = Updates::RoomJoined {
|
let update = Updates::RoomJoined {
|
||||||
room_id: lock.room_id.clone(),
|
room_id: lock.room_id.clone(),
|
||||||
|
@ -230,6 +237,7 @@ struct Room {
|
||||||
topic: Str,
|
topic: Str,
|
||||||
storage: Storage,
|
storage: Storage,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Room {
|
impl Room {
|
||||||
#[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")]
|
#[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")]
|
||||||
async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime<Utc>) -> Result<()> {
|
async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime<Utc>) -> Result<()> {
|
||||||
|
|
|
@ -3,8 +3,8 @@
|
||||||
use quick_xml::events::Event;
|
use quick_xml::events::Event;
|
||||||
|
|
||||||
use lavina_core::room::{RoomId, RoomRegistry};
|
use lavina_core::room::{RoomId, RoomRegistry};
|
||||||
use proto_xmpp::bind::{BindResponse, Jid, Name, Server};
|
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Server};
|
||||||
use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType, Message, MessageType};
|
use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType};
|
||||||
use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery};
|
use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery};
|
||||||
use proto_xmpp::mam::{Fin, Set};
|
use proto_xmpp::mam::{Fin, Set};
|
||||||
use proto_xmpp::roster::RosterQuery;
|
use proto_xmpp::roster::RosterQuery;
|
||||||
|
@ -17,17 +17,13 @@ use crate::XmppConnection;
|
||||||
impl<'a> XmppConnection<'a> {
|
impl<'a> XmppConnection<'a> {
|
||||||
pub async fn handle_iq(&self, output: &mut Vec<Event<'static>>, iq: Iq<IqClientBody>) {
|
pub async fn handle_iq(&self, output: &mut Vec<Event<'static>>, iq: Iq<IqClientBody>) {
|
||||||
match iq.body {
|
match iq.body {
|
||||||
IqClientBody::Bind(_) => {
|
IqClientBody::Bind(req) => {
|
||||||
let req = Iq {
|
let req = Iq {
|
||||||
from: None,
|
from: None,
|
||||||
id: iq.id,
|
id: iq.id,
|
||||||
to: None,
|
to: None,
|
||||||
r#type: IqType::Result,
|
r#type: IqType::Result,
|
||||||
body: BindResponse(Jid {
|
body: self.bind(&req).await,
|
||||||
name: Some(self.user.xmpp_name.clone()),
|
|
||||||
server: Server(self.hostname.clone()),
|
|
||||||
resource: Some(self.user.xmpp_resource.clone()),
|
|
||||||
}),
|
|
||||||
};
|
};
|
||||||
req.serialize(output);
|
req.serialize(output);
|
||||||
}
|
}
|
||||||
|
@ -114,6 +110,14 @@ impl<'a> XmppConnection<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn bind(&self, req: &BindRequest) -> BindResponse {
|
||||||
|
BindResponse(Jid {
|
||||||
|
name: Some(self.user.xmpp_name.clone()),
|
||||||
|
server: Server(self.hostname.clone()),
|
||||||
|
resource: Some(self.user.xmpp_resource.clone()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
async fn disco_info(&self, to: Option<&Jid>, req: &InfoQuery) -> Result<InfoQuery, IqError> {
|
async fn disco_info(&self, to: Option<&Jid>, req: &InfoQuery) -> Result<InfoQuery, IqError> {
|
||||||
let identity;
|
let identity;
|
||||||
let feature;
|
let feature;
|
||||||
|
|
|
@ -41,6 +41,9 @@ mod message;
|
||||||
mod presence;
|
mod presence;
|
||||||
mod updates;
|
mod updates;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod testkit;
|
||||||
|
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct ServerConfig {
|
pub struct ServerConfig {
|
||||||
pub listen_on: SocketAddr,
|
pub listen_on: SocketAddr,
|
||||||
|
|
|
@ -22,7 +22,8 @@ impl<'a> XmppConnection<'a> {
|
||||||
// resources in MUCs are members' personas – not implemented (yet?)
|
// resources in MUCs are members' personas – not implemented (yet?)
|
||||||
resource: Some(_),
|
resource: Some(_),
|
||||||
}) if server.0 == self.hostname_rooms => {
|
}) if server.0 == self.hostname_rooms => {
|
||||||
self.muc_presence(name, output).await?;
|
let response = self.muc_presence(&name).await?;
|
||||||
|
response.serialize(output);
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
// TODO other presence cases
|
// TODO other presence cases
|
||||||
|
@ -58,7 +59,8 @@ impl<'a> XmppConnection<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn muc_presence(&mut self, name: Name, output: &mut Vec<Event<'static>>) -> Result<()> {
|
// todo: return Presence and serialize on the outside.
|
||||||
|
async fn muc_presence(&mut self, name: &Name) -> Result<(Presence<()>)> {
|
||||||
let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?;
|
let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?;
|
||||||
// TODO handle bans
|
// TODO handle bans
|
||||||
let response = Presence::<()> {
|
let response = Presence::<()> {
|
||||||
|
@ -74,7 +76,101 @@ impl<'a> XmppConnection<'a> {
|
||||||
}),
|
}),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
response.serialize(output);
|
Ok(response)
|
||||||
Ok(())
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo: set up so that the user has been previously joined.
|
||||||
|
// todo: first call to muc_presence is OK, next one is OK too.
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::testkit::{expect_user_authenticated, TestServer};
|
||||||
|
use crate::{Authenticated, XmppConnection};
|
||||||
|
use lavina_core::player::PlayerId;
|
||||||
|
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server};
|
||||||
|
use proto_xmpp::client::Presence;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_muc_joining() {
|
||||||
|
let server = TestServer::start().await.unwrap();
|
||||||
|
|
||||||
|
server.storage.create_user("tester").await.unwrap();
|
||||||
|
|
||||||
|
let player_id = PlayerId::from("tester").unwrap();
|
||||||
|
let user = Authenticated {
|
||||||
|
player_id,
|
||||||
|
xmpp_name: Name("tester".into()),
|
||||||
|
xmpp_resource: Resource("tester".into()),
|
||||||
|
xmpp_muc_name: Resource("tester".into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
|
||||||
|
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
|
||||||
|
|
||||||
|
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
|
||||||
|
let expected = Presence::<()> {
|
||||||
|
to: Some(Jid {
|
||||||
|
name: Some(conn.user.xmpp_name.clone()),
|
||||||
|
server: Server(conn.hostname.clone()),
|
||||||
|
resource: Some(conn.user.xmpp_resource.clone()),
|
||||||
|
}),
|
||||||
|
from: Some(Jid {
|
||||||
|
name: Some(user.xmpp_name.clone()),
|
||||||
|
server: Server(conn.hostname_rooms.clone()),
|
||||||
|
resource: Some(conn.user.xmpp_muc_name.clone()),
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
assert_eq!(expected, response);
|
||||||
|
|
||||||
|
server.shutdown().await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that joining a room second time after a server restart,
|
||||||
|
// i.e. in-memory cache of memberships is cleaned, does not cause any issues.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_muc_joining_twice() {
|
||||||
|
let server = TestServer::start().await.unwrap();
|
||||||
|
|
||||||
|
server.storage.create_user("tester").await.unwrap();
|
||||||
|
|
||||||
|
let player_id = PlayerId::from("tester").unwrap();
|
||||||
|
let user = Authenticated {
|
||||||
|
player_id,
|
||||||
|
xmpp_name: Name("tester".into()),
|
||||||
|
xmpp_resource: Resource("tester".into()),
|
||||||
|
xmpp_muc_name: Resource("tester".into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
|
||||||
|
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
|
||||||
|
|
||||||
|
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
|
||||||
|
let expected = Presence::<()> {
|
||||||
|
to: Some(Jid {
|
||||||
|
name: Some(conn.user.xmpp_name.clone()),
|
||||||
|
server: Server(conn.hostname.clone()),
|
||||||
|
resource: Some(conn.user.xmpp_resource.clone()),
|
||||||
|
}),
|
||||||
|
from: Some(Jid {
|
||||||
|
name: Some(user.xmpp_name.clone()),
|
||||||
|
server: Server(conn.hostname_rooms.clone()),
|
||||||
|
resource: Some(conn.user.xmpp_muc_name.clone()),
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
assert_eq!(expected, response);
|
||||||
|
|
||||||
|
drop(conn);
|
||||||
|
let server = server.reboot().await.unwrap();
|
||||||
|
|
||||||
|
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
|
||||||
|
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
|
||||||
|
|
||||||
|
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
|
||||||
|
assert_eq!(expected, response);
|
||||||
|
|
||||||
|
server.shutdown().await.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
use crate::{Authenticated, XmppConnection};
|
||||||
|
use lavina_core::player::{PlayerConnection, PlayerId};
|
||||||
|
use lavina_core::repo::{Storage, StorageConfig};
|
||||||
|
use lavina_core::LavinaCore;
|
||||||
|
use prometheus::Registry as MetricsRegistry;
|
||||||
|
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server};
|
||||||
|
|
||||||
|
pub(crate) struct TestServer {
|
||||||
|
pub metrics: MetricsRegistry,
|
||||||
|
pub storage: Storage,
|
||||||
|
pub core: LavinaCore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestServer {
|
||||||
|
pub async fn start() -> anyhow::Result<TestServer> {
|
||||||
|
let _ = tracing_subscriber::fmt::try_init();
|
||||||
|
let metrics = MetricsRegistry::new();
|
||||||
|
let storage = Storage::open(StorageConfig {
|
||||||
|
db_path: ":memory:".into(),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
|
||||||
|
Ok(TestServer { metrics, storage, core })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn reboot(self) -> anyhow::Result<TestServer> {
|
||||||
|
self.core.shutdown().await?;
|
||||||
|
|
||||||
|
let metrics = MetricsRegistry::new();
|
||||||
|
let core = LavinaCore::new(metrics.clone(), self.storage.clone()).await?;
|
||||||
|
|
||||||
|
Ok(TestServer {
|
||||||
|
metrics,
|
||||||
|
storage: self.storage.clone(),
|
||||||
|
core,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn shutdown(self) -> anyhow::Result<()> {
|
||||||
|
self.core.shutdown().await?;
|
||||||
|
self.storage.close().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn expect_user_authenticated<'a>(
|
||||||
|
server: &'a TestServer,
|
||||||
|
user: &'a Authenticated,
|
||||||
|
conn: &'a mut PlayerConnection,
|
||||||
|
) -> anyhow::Result<XmppConnection<'a>> {
|
||||||
|
let conn = XmppConnection {
|
||||||
|
user: &user,
|
||||||
|
user_handle: conn,
|
||||||
|
rooms: &server.core.rooms,
|
||||||
|
hostname: "localhost".into(),
|
||||||
|
hostname_rooms: "rooms.localhost".into(),
|
||||||
|
};
|
||||||
|
let result = conn.bind(&BindRequest(Resource("whatever".into()))).await;
|
||||||
|
let expected = BindResponse(Jid {
|
||||||
|
name: Some(Name("tester".into())),
|
||||||
|
server: Server("localhost".into()),
|
||||||
|
resource: Some(Resource("tester".into())),
|
||||||
|
});
|
||||||
|
assert_eq!(expected, result);
|
||||||
|
Ok(conn)
|
||||||
|
}
|
|
@ -127,12 +127,16 @@ impl FromXml for BindRequest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, Debug)]
|
||||||
pub struct BindResponse(pub Jid);
|
pub struct BindResponse(pub Jid);
|
||||||
|
|
||||||
impl ToXml for BindResponse {
|
impl ToXml for BindResponse {
|
||||||
fn serialize(&self, events: &mut Vec<Event<'static>>) {
|
fn serialize(&self, events: &mut Vec<Event<'static>>) {
|
||||||
events.extend_from_slice(&[
|
events.extend_from_slice(&[
|
||||||
Event::Start(BytesStart::new(r#"bind xmlns="urn:ietf:params:xml:ns:xmpp-bind""#)),
|
Event::Start(BytesStart::from_content(
|
||||||
|
r#"bind xmlns="urn:ietf:params:xml:ns:xmpp-bind""#,
|
||||||
|
4,
|
||||||
|
)),
|
||||||
Event::Start(BytesStart::new(r#"jid"#)),
|
Event::Start(BytesStart::new(r#"jid"#)),
|
||||||
Event::Text(BytesText::new(self.0.to_string().as_str()).into_owned()),
|
Event::Text(BytesText::new(self.0.to_string().as_str()).into_owned()),
|
||||||
Event::End(BytesEnd::new("jid")),
|
Event::End(BytesEnd::new("jid")),
|
||||||
|
|
Loading…
Reference in New Issue