forked from lavina/lavina
				
			Compare commits
	
		
			4 Commits
		
	
	
		
			775f43a1f3
			...
			f3fd15b781
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | f3fd15b781 | |
|  | 8dd3ee4443 | |
|  | 061c3a1093 | |
|  | 5512a74999 | 
|  | @ -1,8 +1,33 @@ | |||
| use crate::player::PlayerConnection; | ||||
| use anyhow::Result; | ||||
| 
 | ||||
| use crate::repo::Storage; | ||||
| 
 | ||||
| impl Storage { | ||||
|     #[tracing::instrument(skip(self), name = "Storage::is_room_member")] | ||||
|     pub async fn is_room_member(&self, room_id: u32, player_id: u32) -> Result<bool> { | ||||
|         let mut executor = self.conn.lock().await; | ||||
|         let res: (u32,) = sqlx::query_as( | ||||
|             " | ||||
|                 select | ||||
|                     count(*) | ||||
|                 from | ||||
|                     memberships | ||||
|                 where | ||||
|                     user_id = ? and room_id = ?; | ||||
| <<<<<<< HEAD | ||||
| 
 | ||||
| ======= | ||||
| >>>>>>> 775f43a (Fix query) | ||||
|                 ",
 | ||||
|         ) | ||||
|         .bind(player_id) | ||||
|         .bind(room_id) | ||||
|         .fetch_one(&mut *executor) | ||||
|         .await?; | ||||
|         Ok(res.0 > 0) | ||||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument(skip(self), name = "Storage::add_room_member")] | ||||
|     pub async fn add_room_member(&self, room_id: u32, player_id: u32) -> Result<()> { | ||||
|         let mut executor = self.conn.lock().await; | ||||
|  |  | |||
|  | @ -14,6 +14,7 @@ use crate::repo::Storage; | |||
| /// Opaque room id
 | ||||
| #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] | ||||
| pub struct RoomId(Str); | ||||
| 
 | ||||
| impl RoomId { | ||||
|     pub fn from(str: impl Into<Str>) -> Result<RoomId> { | ||||
|         let bytes = str.into(); | ||||
|  | @ -36,6 +37,7 @@ impl RoomId { | |||
| /// Shared data structure for storing metadata about rooms.
 | ||||
| #[derive(Clone)] | ||||
| pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>); | ||||
| 
 | ||||
| impl RoomRegistry { | ||||
|     pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> { | ||||
|         let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; | ||||
|  | @ -141,6 +143,7 @@ impl RoomRegistryInner { | |||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct RoomHandle(Arc<AsyncRwLock<Room>>); | ||||
| 
 | ||||
| impl RoomHandle { | ||||
|     #[tracing::instrument(skip(self, player_handle), name = "RoomHandle::subscribe")] | ||||
|     pub async fn subscribe(&self, player_id: &PlayerId, player_handle: PlayerHandle) { | ||||
|  | @ -154,7 +157,11 @@ impl RoomHandle { | |||
|         let mut lock = self.0.write().await; | ||||
|         tracing::info!("Adding a new member to a room"); | ||||
|         let room_storage_id = lock.storage_id; | ||||
|         lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap(); | ||||
|         if !lock.storage.is_room_member(room_storage_id, player_storage_id).await.unwrap() { | ||||
|             lock.storage.add_room_member(room_storage_id, player_storage_id).await.unwrap(); | ||||
|         } else { | ||||
|             tracing::warn!("User {} has already been added to the room.", player_id); | ||||
|         } | ||||
|         lock.members.insert(player_id.clone()); | ||||
|         let update = Updates::RoomJoined { | ||||
|             room_id: lock.room_id.clone(), | ||||
|  | @ -230,6 +237,7 @@ struct Room { | |||
|     topic: Str, | ||||
|     storage: Storage, | ||||
| } | ||||
| 
 | ||||
| impl Room { | ||||
|     #[tracing::instrument(skip(self, body, created_at), name = "Room::send_message")] | ||||
|     async fn send_message(&mut self, author_id: &PlayerId, body: Str, created_at: DateTime<Utc>) -> Result<()> { | ||||
|  |  | |||
|  | @ -3,8 +3,8 @@ | |||
| use quick_xml::events::Event; | ||||
| 
 | ||||
| use lavina_core::room::{RoomId, RoomRegistry}; | ||||
| use proto_xmpp::bind::{BindResponse, Jid, Name, Server}; | ||||
| use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType, Message, MessageType}; | ||||
| use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Server}; | ||||
| use proto_xmpp::client::{Iq, IqError, IqErrorType, IqType}; | ||||
| use proto_xmpp::disco::{Feature, Identity, InfoQuery, Item, ItemQuery}; | ||||
| use proto_xmpp::mam::{Fin, Set}; | ||||
| use proto_xmpp::roster::RosterQuery; | ||||
|  | @ -17,17 +17,13 @@ use crate::XmppConnection; | |||
| impl<'a> XmppConnection<'a> { | ||||
|     pub async fn handle_iq(&self, output: &mut Vec<Event<'static>>, iq: Iq<IqClientBody>) { | ||||
|         match iq.body { | ||||
|             IqClientBody::Bind(_) => { | ||||
|             IqClientBody::Bind(req) => { | ||||
|                 let req = Iq { | ||||
|                     from: None, | ||||
|                     id: iq.id, | ||||
|                     to: None, | ||||
|                     r#type: IqType::Result, | ||||
|                     body: BindResponse(Jid { | ||||
|                         name: Some(self.user.xmpp_name.clone()), | ||||
|                         server: Server(self.hostname.clone()), | ||||
|                         resource: Some(self.user.xmpp_resource.clone()), | ||||
|                     }), | ||||
|                     body: self.bind(&req).await, | ||||
|                 }; | ||||
|                 req.serialize(output); | ||||
|             } | ||||
|  | @ -114,6 +110,14 @@ impl<'a> XmppConnection<'a> { | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub(crate) async fn bind(&self, req: &BindRequest) -> BindResponse { | ||||
|         BindResponse(Jid { | ||||
|             name: Some(self.user.xmpp_name.clone()), | ||||
|             server: Server(self.hostname.clone()), | ||||
|             resource: Some(self.user.xmpp_resource.clone()), | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     async fn disco_info(&self, to: Option<&Jid>, req: &InfoQuery) -> Result<InfoQuery, IqError> { | ||||
|         let identity; | ||||
|         let feature; | ||||
|  |  | |||
|  | @ -41,6 +41,9 @@ mod message; | |||
| mod presence; | ||||
| mod updates; | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod testkit; | ||||
| 
 | ||||
| #[derive(Deserialize, Debug, Clone)] | ||||
| pub struct ServerConfig { | ||||
|     pub listen_on: SocketAddr, | ||||
|  |  | |||
|  | @ -22,7 +22,8 @@ impl<'a> XmppConnection<'a> { | |||
|                 // resources in MUCs are members' personas – not implemented (yet?)
 | ||||
|                 resource: Some(_), | ||||
|             }) if server.0 == self.hostname_rooms => { | ||||
|                 self.muc_presence(name, output).await?; | ||||
|                 let response = self.muc_presence(&name).await?; | ||||
|                 response.serialize(output); | ||||
|             } | ||||
|             _ => { | ||||
|                 // TODO other presence cases
 | ||||
|  | @ -58,7 +59,8 @@ impl<'a> XmppConnection<'a> { | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     async fn muc_presence(&mut self, name: Name, output: &mut Vec<Event<'static>>) -> Result<()> { | ||||
|     // todo: return Presence and serialize on the outside.
 | ||||
|     async fn muc_presence(&mut self, name: &Name) -> Result<(Presence<()>)> { | ||||
|         let a = self.user_handle.join_room(RoomId::from(name.0.clone())?).await?; | ||||
|         // TODO handle bans
 | ||||
|         let response = Presence::<()> { | ||||
|  | @ -74,7 +76,101 @@ impl<'a> XmppConnection<'a> { | |||
|             }), | ||||
|             ..Default::default() | ||||
|         }; | ||||
|         response.serialize(output); | ||||
|         Ok(()) | ||||
|         Ok(response) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| // todo: set up so that the user has been previously joined.
 | ||||
| // todo: first call to muc_presence is OK, next one is OK too.
 | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use crate::testkit::{expect_user_authenticated, TestServer}; | ||||
|     use crate::{Authenticated, XmppConnection}; | ||||
|     use lavina_core::player::PlayerId; | ||||
|     use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server}; | ||||
|     use proto_xmpp::client::Presence; | ||||
| 
 | ||||
|     #[tokio::test] | ||||
|     async fn test_muc_joining() { | ||||
|         let server = TestServer::start().await.unwrap(); | ||||
| 
 | ||||
|         server.storage.create_user("tester").await.unwrap(); | ||||
| 
 | ||||
|         let player_id = PlayerId::from("tester").unwrap(); | ||||
|         let user = Authenticated { | ||||
|             player_id, | ||||
|             xmpp_name: Name("tester".into()), | ||||
|             xmpp_resource: Resource("tester".into()), | ||||
|             xmpp_muc_name: Resource("tester".into()), | ||||
|         }; | ||||
| 
 | ||||
|         let mut player_conn = server.core.players.connect_to_player(&user.player_id).await; | ||||
|         let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); | ||||
| 
 | ||||
|         let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); | ||||
|         let expected = Presence::<()> { | ||||
|             to: Some(Jid { | ||||
|                 name: Some(conn.user.xmpp_name.clone()), | ||||
|                 server: Server(conn.hostname.clone()), | ||||
|                 resource: Some(conn.user.xmpp_resource.clone()), | ||||
|             }), | ||||
|             from: Some(Jid { | ||||
|                 name: Some(user.xmpp_name.clone()), | ||||
|                 server: Server(conn.hostname_rooms.clone()), | ||||
|                 resource: Some(conn.user.xmpp_muc_name.clone()), | ||||
|             }), | ||||
|             ..Default::default() | ||||
|         }; | ||||
|         assert_eq!(expected, response); | ||||
| 
 | ||||
|         server.shutdown().await.unwrap(); | ||||
|     } | ||||
| 
 | ||||
|     // Test that joining a room second time after a server restart,
 | ||||
|     // i.e. in-memory cache of memberships is cleaned, does not cause any issues.
 | ||||
|     #[tokio::test] | ||||
|     async fn test_muc_joining_twice() { | ||||
|         let server = TestServer::start().await.unwrap(); | ||||
| 
 | ||||
|         server.storage.create_user("tester").await.unwrap(); | ||||
| 
 | ||||
|         let player_id = PlayerId::from("tester").unwrap(); | ||||
|         let user = Authenticated { | ||||
|             player_id, | ||||
|             xmpp_name: Name("tester".into()), | ||||
|             xmpp_resource: Resource("tester".into()), | ||||
|             xmpp_muc_name: Resource("tester".into()), | ||||
|         }; | ||||
| 
 | ||||
|         let mut player_conn = server.core.players.connect_to_player(&user.player_id).await; | ||||
|         let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); | ||||
| 
 | ||||
|         let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); | ||||
|         let expected = Presence::<()> { | ||||
|             to: Some(Jid { | ||||
|                 name: Some(conn.user.xmpp_name.clone()), | ||||
|                 server: Server(conn.hostname.clone()), | ||||
|                 resource: Some(conn.user.xmpp_resource.clone()), | ||||
|             }), | ||||
|             from: Some(Jid { | ||||
|                 name: Some(user.xmpp_name.clone()), | ||||
|                 server: Server(conn.hostname_rooms.clone()), | ||||
|                 resource: Some(conn.user.xmpp_muc_name.clone()), | ||||
|             }), | ||||
|             ..Default::default() | ||||
|         }; | ||||
|         assert_eq!(expected, response); | ||||
| 
 | ||||
|         drop(conn); | ||||
|         let server = server.reboot().await.unwrap(); | ||||
| 
 | ||||
|         let mut player_conn = server.core.players.connect_to_player(&user.player_id).await; | ||||
|         let mut conn = expect_user_authenticated(&server, &user, &mut player_conn).await.unwrap(); | ||||
| 
 | ||||
|         let response = conn.muc_presence(&user.xmpp_name).await.unwrap(); | ||||
|         assert_eq!(expected, response); | ||||
| 
 | ||||
|         server.shutdown().await.unwrap(); | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,66 @@ | |||
| use crate::{Authenticated, XmppConnection}; | ||||
| use lavina_core::player::{PlayerConnection, PlayerId}; | ||||
| use lavina_core::repo::{Storage, StorageConfig}; | ||||
| use lavina_core::LavinaCore; | ||||
| use prometheus::Registry as MetricsRegistry; | ||||
| use proto_xmpp::bind::{BindRequest, BindResponse, Jid, Name, Resource, Server}; | ||||
| 
 | ||||
| pub(crate) struct TestServer { | ||||
|     pub metrics: MetricsRegistry, | ||||
|     pub storage: Storage, | ||||
|     pub core: LavinaCore, | ||||
| } | ||||
| 
 | ||||
| impl TestServer { | ||||
|     pub async fn start() -> anyhow::Result<TestServer> { | ||||
|         let _ = tracing_subscriber::fmt::try_init(); | ||||
|         let metrics = MetricsRegistry::new(); | ||||
|         let storage = Storage::open(StorageConfig { | ||||
|             db_path: ":memory:".into(), | ||||
|         }) | ||||
|         .await?; | ||||
|         let core = LavinaCore::new(metrics.clone(), storage.clone()).await?; | ||||
|         Ok(TestServer { metrics, storage, core }) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn reboot(self) -> anyhow::Result<TestServer> { | ||||
|         self.core.shutdown().await?; | ||||
| 
 | ||||
|         let metrics = MetricsRegistry::new(); | ||||
|         let core = LavinaCore::new(metrics.clone(), self.storage.clone()).await?; | ||||
| 
 | ||||
|         Ok(TestServer { | ||||
|             metrics, | ||||
|             storage: self.storage.clone(), | ||||
|             core, | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn shutdown(self) -> anyhow::Result<()> { | ||||
|         self.core.shutdown().await?; | ||||
|         self.storage.close().await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub async fn expect_user_authenticated<'a>( | ||||
|     server: &'a TestServer, | ||||
|     user: &'a Authenticated, | ||||
|     conn: &'a mut PlayerConnection, | ||||
| ) -> anyhow::Result<XmppConnection<'a>> { | ||||
|     let conn = XmppConnection { | ||||
|         user: &user, | ||||
|         user_handle: conn, | ||||
|         rooms: &server.core.rooms, | ||||
|         hostname: "localhost".into(), | ||||
|         hostname_rooms: "rooms.localhost".into(), | ||||
|     }; | ||||
|     let result = conn.bind(&BindRequest(Resource("whatever".into()))).await; | ||||
|     let expected = BindResponse(Jid { | ||||
|         name: Some(Name("tester".into())), | ||||
|         server: Server("localhost".into()), | ||||
|         resource: Some(Resource("tester".into())), | ||||
|     }); | ||||
|     assert_eq!(expected, result); | ||||
|     Ok(conn) | ||||
| } | ||||
|  | @ -127,12 +127,16 @@ impl FromXml for BindRequest { | |||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(PartialEq, Eq, Debug)] | ||||
| pub struct BindResponse(pub Jid); | ||||
| 
 | ||||
| impl ToXml for BindResponse { | ||||
|     fn serialize(&self, events: &mut Vec<Event<'static>>) { | ||||
|         events.extend_from_slice(&[ | ||||
|             Event::Start(BytesStart::new(r#"bind xmlns="urn:ietf:params:xml:ns:xmpp-bind""#)), | ||||
|             Event::Start(BytesStart::from_content( | ||||
|                 r#"bind xmlns="urn:ietf:params:xml:ns:xmpp-bind""#, | ||||
|                 4, | ||||
|             )), | ||||
|             Event::Start(BytesStart::new(r#"jid"#)), | ||||
|             Event::Text(BytesText::new(self.0.to_string().as_str()).into_owned()), | ||||
|             Event::End(BytesEnd::new("jid")), | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue