forked from lavina/lavina
1
0
Fork 0

Compare commits

..

No commits in common. "cb958c1e65e3315c43f71b9a9e9f92cc36836123" and "3e414f4038e0077c79c89c9863035ced0cf93fc0" have entirely different histories.

9 changed files with 19 additions and 231 deletions

View File

@ -629,6 +629,9 @@ impl Player {
RoomRef::Local(room) => { RoomRef::Local(room) => {
room.unsubscribe(&self.player_id).await; room.unsubscribe(&self.player_id).await;
room.remove_member(&self.player_id, self.storage_id).await; room.remove_member(&self.player_id, self.storage_id).await;
let room_storage_id =
self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
self.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap();
} }
RoomRef::Remote { node_id } => { RoomRef::Remote { node_id } => {
let req = LeaveRoomReq { let req = LeaveRoomReq {
@ -636,9 +639,6 @@ impl Player {
player_id: self.player_id.as_inner(), player_id: self.player_id.as_inner(),
}; };
self.cluster_client.leave_room(node_id, req).await.unwrap(); self.cluster_client.leave_room(node_id, req).await.unwrap();
let room_storage_id =
self.storage.create_or_retrieve_room_id_by_name(room_id.as_inner()).await.unwrap();
self.storage.remove_room_member(room_storage_id, self.storage_id).await.unwrap();
} }
} }
} }

View File

@ -1,29 +1,8 @@
use crate::player::PlayerConnection;
use anyhow::Result; use anyhow::Result;
use crate::repo::Storage; use crate::repo::Storage;
impl Storage { impl Storage {
#[tracing::instrument(skip(self), name = "Storage::is_room_member")]
pub async fn is_room_member(&self, room_id: u32, player_id: u32) -> Result<bool> {
let mut executor = self.conn.lock().await;
let res: (u32,) = sqlx::query_as(
"
select
count(*)
from
memberships
where
user_id = ? and room_id = ?;
",
)
.bind(player_id)
.bind(room_id)
.fetch_one(&mut *executor)
.await?;
Ok(res.0 > 0)
}
#[tracing::instrument(skip(self), name = "Storage::add_room_member")] #[tracing::instrument(skip(self), name = "Storage::add_room_member")]
pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
@ -76,7 +55,7 @@ impl Storage {
let res: (u32,) = sqlx::query_as( let res: (u32,) = sqlx::query_as(
"insert into rooms(name, topic) "insert into rooms(name, topic)
values (?, '') values (?, '')
on conflict(name) do update set name = excluded.name on conflict(name) do nothing
returning id;", returning id;",
) )
.bind(name) .bind(name)

View File

@ -14,7 +14,6 @@ use crate::repo::Storage;
/// Opaque room id /// Opaque room id
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct RoomId(Str); pub struct RoomId(Str);
impl RoomId { impl RoomId {
pub fn from(str: impl Into<Str>) -> Result<RoomId> { pub fn from(str: impl Into<Str>) -> Result<RoomId> {
let bytes = str.into(); let bytes = str.into();
@ -37,7 +36,6 @@ impl RoomId {
/// Shared data structure for storing metadata about rooms. /// Shared data structure for storing metadata about rooms.
#[derive(Clone)] #[derive(Clone)]
pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>); pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>);
impl RoomRegistry { impl RoomRegistry {
pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> { pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> {
let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?;
@ -143,7 +141,6 @@ impl RoomRegistryInner {
#[derive(Clone)] #[derive(Clone)]
pub struct RoomHandle(Arc<AsyncRwLock<Room>>); pub struct RoomHandle(Arc<AsyncRwLock<Room>>);
impl RoomHandle { impl RoomHandle {
#[tracing::instrument(skip(self, player_handle), name = "RoomHandle::subscribe")] #[tracing::instrument(skip(self, player_handle), name = "RoomHandle::subscribe")]
pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) { pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) {
@ -157,11 +154,7 @@ impl RoomHandle {
let mut lock = self.0.write().await; let mut lock = self.0.write().await;
tracing::info!("Adding a new member to a room"); tracing::info!("Adding a new member to a room");
let room_storage_id = lock.storage_id; let room_storage_id = lock.storage_id;
if !lock.storage.is_room_member(room_storage_id, player_storage_id).await.unwrap() { lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap();
} else {
tracing::warn!("User {:#?} has already been added to the room.", player_id);
}
lock.members.insert(player_id.clone()); lock.members.insert(player_id.clone());
let update = Updates::RoomJoined { let update = Updates::RoomJoined {
room_id: lock.room_id.clone(), room_id: lock.room_id.clone(),
@ -237,7 +230,6 @@ struct Room {
topic: Str, topic: Str,
storage: Storage, storage: Storage,
} }
impl Room { impl Room {
#[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")] #[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")]
async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime<Utc>) -> Result<()> { async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime<Utc>) -> Result<()> {

View File

@ -3,8 +3,8 @@
use quick_xml::events::Event; use quick_xml::events::Event;
use lavina_core::room::{RoomId, RoomRegistry}; use lavina_core::room::{RoomId, RoomRegistry};
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Server}; use proto_xmpp::bind::{BindResponse, Jid, Name, Server};
use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType}; use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType, Message, MessageType};
use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery}; use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery};
use proto_xmpp::mam::{Fin, Set}; use proto_xmpp::mam::{Fin, Set};
use proto_xmpp::roster::RosterQuery; use proto_xmpp::roster::RosterQuery;
@ -17,13 +17,17 @@ use crate::XmppConnection;
impl<'a> XmppConnection<'a> { impl<'a> XmppConnection<'a> {
pub async fn handle_iq(&self, output: &mut Vec<Event<'static>>, iq: Iq<IqClientBody>) { pub async fn handle_iq(&self, output: &mut Vec<Event<'static>>, iq: Iq<IqClientBody>) {
match iq.body { match iq.body {
IqClientBody::Bind(req) => { IqClientBody::Bind(_) => {
let req = Iq { let req = Iq {
from: None, from: None,
id: iq.id, id: iq.id,
to: None, to: None,
r#type: IqType::Result, r#type: IqType::Result,
body: self.bind(&req).await, body: BindResponse(Jid {
name: Some(self.user.xmpp_name.clone()),
server: Server(self.hostname.clone()),
resource: Some(self.user.xmpp_resource.clone()),
}),
}; };
req.serialize(output); req.serialize(output);
} }
@ -110,14 +114,6 @@ impl<'a> XmppConnection<'a> {
} }
} }
pub(crate) async fn bind(&self, req: &BindRequest) -> BindResponse {
BindResponse(Jid {
name: Some(self.user.xmpp_name.clone()),
server: Server(self.hostname.clone()),
resource: Some(self.user.xmpp_resource.clone()),
})
}
async fn disco_info(&self, to: Option<&Jid>, req: &InfoQuery) -> Result<InfoQuery, IqError> { async fn disco_info(&self, to: Option<&Jid>, req: &InfoQuery) -> Result<InfoQuery, IqError> {
let identity; let identity;
let feature; let feature;

View File

@ -41,9 +41,6 @@ mod message;
mod presence; mod presence;
mod updates; mod updates;
#[cfg(test)]
mod testkit;
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct ServerConfig { pub struct ServerConfig {
pub listen_on: SocketAddr, pub listen_on: SocketAddr,

View File

@ -22,8 +22,7 @@ impl<'a> XmppConnection<'a> {
// resources in MUCs are members' personas not implemented (yet?) // resources in MUCs are members' personas not implemented (yet?)
resource: Some(_), resource: Some(_),
}) if server.0 == self.hostname_rooms => { }) if server.0 == self.hostname_rooms => {
let response = self.muc_presence(&name).await?; self.muc_presence(name, output).await?;
response.serialize(output);
} }
_ => { _ => {
// TODO other presence cases // TODO other presence cases
@ -59,8 +58,7 @@ impl<'a> XmppConnection<'a> {
} }
} }
// todo: return Presence and serialize on the outside. async fn muc_presence(&mut self, name: Name, output: &mut Vec<Event<'static>>) -> Result<()> {
async fn muc_presence(&mut self, name: &Name) -> Result<(Presence<()>)> {
let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?; let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?;
// TODO handle bans // TODO handle bans
let response = Presence::<()> { let response = Presence::<()> {
@ -76,101 +74,7 @@ impl<'a> XmppConnection<'a> {
}), }),
..Default::default() ..Default::default()
}; };
Ok(response) response.serialize(output);
} Ok(())
}
// todo: set up so that the user has been previously joined.
// todo: first call to muc_presence is OK, next one is OK too.
#[cfg(test)]
mod tests {
use crate::testkit::{expect_user_authenticated, TestServer};
use crate::{Authenticated, XmppConnection};
use lavina_core::player::PlayerId;
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server};
use proto_xmpp::client::Presence;
#[tokio::test]
async fn test_muc_joining() {
let server = TestServer::start().await.unwrap();
server.storage.create_user("tester").await.unwrap();
let player_id = PlayerId::from("tester").unwrap();
let user = Authenticated {
player_id,
xmpp_name: Name("tester".into()),
xmpp_resource: Resource("tester".into()),
xmpp_muc_name: Resource("tester".into()),
};
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
let expected = Presence::<()> {
to: Some(Jid {
name: Some(conn.user.xmpp_name.clone()),
server: Server(conn.hostname.clone()),
resource: Some(conn.user.xmpp_resource.clone()),
}),
from: Some(Jid {
name: Some(user.xmpp_name.clone()),
server: Server(conn.hostname_rooms.clone()),
resource: Some(conn.user.xmpp_muc_name.clone()),
}),
..Default::default()
};
assert_eq!(expected, response);
server.shutdown().await.unwrap();
}
// Test that joining a room second time after a server restart,
// i.e. in-memory cache of memberships is cleaned, does not cause any issues.
#[tokio::test]
async fn test_muc_joining_twice() {
let server = TestServer::start().await.unwrap();
server.storage.create_user("tester").await.unwrap();
let player_id = PlayerId::from("tester").unwrap();
let user = Authenticated {
player_id,
xmpp_name: Name("tester".into()),
xmpp_resource: Resource("tester".into()),
xmpp_muc_name: Resource("tester".into()),
};
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
let expected = Presence::<()> {
to: Some(Jid {
name: Some(conn.user.xmpp_name.clone()),
server: Server(conn.hostname.clone()),
resource: Some(conn.user.xmpp_resource.clone()),
}),
from: Some(Jid {
name: Some(user.xmpp_name.clone()),
server: Server(conn.hostname_rooms.clone()),
resource: Some(conn.user.xmpp_muc_name.clone()),
}),
..Default::default()
};
assert_eq!(expected, response);
drop(conn);
let server = server.reboot().await.unwrap();
let mut player_conn = server.core.players.connect_to_player(&user.player_id).await;
let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap();
let response = conn.muc_presence(&user.xmpp_name).await.unwrap();
assert_eq!(expected, response);
server.shutdown().await.unwrap();
} }
} }

View File

@ -1,66 +0,0 @@
use crate::{Authenticated, XmppConnection};
use lavina_core::player::{PlayerConnection, PlayerId};
use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::LavinaCore;
use prometheus::Registry as MetricsRegistry;
use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server};
pub(crate) struct TestServer {
pub metrics: MetricsRegistry,
pub storage: Storage,
pub core: LavinaCore,
}
impl TestServer {
pub async fn start() -> anyhow::Result<TestServer> {
let _ = tracing_subscriber::fmt::try_init();
let metrics = MetricsRegistry::new();
let storage = Storage::open(StorageConfig {
db_path: ":memory:".into(),
})
.await?;
let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
Ok(TestServer { metrics, storage, core })
}
pub async fn reboot(self) -> anyhow::Result<TestServer> {
self.core.shutdown().await?;
let metrics = MetricsRegistry::new();
let core = LavinaCore::new(metrics.clone(), self.storage.clone()).await?;
Ok(TestServer {
metrics,
storage: self.storage.clone(),
core,
})
}
pub async fn shutdown(self) -> anyhow::Result<()> {
self.core.shutdown().await?;
self.storage.close().await?;
Ok(())
}
}
pub async fn expect_user_authenticated<'a>(
server: &'a TestServer,
user: &'a Authenticated,
conn: &'a mut PlayerConnection,
) -> anyhow::Result<XmppConnection<'a>> {
let conn = XmppConnection {
user: &user,
user_handle: conn,
rooms: &server.core.rooms,
hostname: "localhost".into(),
hostname_rooms: "rooms.localhost".into(),
};
let result = conn.bind(&BindRequest(Resource("whatever".into()))).await;
let expected = BindResponse(Jid {
name: Some(Name("tester".into())),
server: Server("localhost".into()),
resource: Some(Resource("tester".into())),
});
assert_eq!(expected, result);
Ok(conn)
}

View File

@ -19,7 +19,6 @@ use tokio_rustls::rustls::{ClientConfig, ServerName};
use tokio_rustls::TlsConnector; use tokio_rustls::TlsConnector;
use lavina_core::auth::Authenticator; use lavina_core::auth::Authenticator;
use lavina_core::clustering::{ClusterConfig, ClusterMetadata};
use lavina_core::repo::{Storage, StorageConfig}; use lavina_core::repo::{Storage, StorageConfig};
use lavina_core::LavinaCore; use lavina_core::LavinaCore;
use projection_xmpp::{launch, RunningServer, ServerConfig}; use projection_xmpp::{launch, RunningServer, ServerConfig};
@ -162,16 +161,7 @@ impl TestServer {
db_path: ":memory:".into(), db_path: ":memory:".into(),
}) })
.await?; .await?;
let cluster_config = ClusterConfig { let core = LavinaCore::new(metrics.clone(), storage.clone()).await?;
addresses: vec![],
metadata: ClusterMetadata {
node_id: 0,
main_owner: 0,
test_owner: 0,
test2_owner: 0,
},
};
let core = LavinaCore::new(metrics.clone(), cluster_config, storage.clone()).await?;
let server = launch(config, core.clone(), metrics.clone()).await.unwrap(); let server = launch(config, core.clone(), metrics.clone()).await.unwrap();
Ok(TestServer { Ok(TestServer {
metrics, metrics,

View File

@ -127,16 +127,12 @@ impl FromXml for BindRequest {
} }
} }
#[derive(PartialEq, Eq, Debug)]
pub struct BindResponse(pub Jid); pub struct BindResponse(pub Jid);
impl ToXml for BindResponse { impl ToXml for BindResponse {
fn serialize(&self, events: &mut Vec<Event<'static>>) { fn serialize(&self, events: &mut Vec<Event<'static>>) {
events.extend_from_slice(&[ events.extend_from_slice(&[
Event::Start(BytesStart::from_content( Event::Start(BytesStart::new(r#"bind xmlns="urn:ietf:params:xml:ns:xmpp-bind""#)),
r#"bind xmlns="urn:ietf:params:xml:ns:xmpp-bind""#,
4,
)),
Event::Start(BytesStart::new(r#"jid"#)), Event::Start(BytesStart::new(r#"jid"#)),
Event::Text(BytesText::new(self.0.to_string().as_str()).into_owned()), Event::Text(BytesText::new(self.0.to_string().as_str()).into_owned()),
Event::End(BytesEnd::new("jid")), Event::End(BytesEnd::new("jid")),