forked from lavina/lavina
				
			Compare commits
	
		
			2 Commits
		
	
	
		
			1b3551f108
			...
			9ca81b3a2b
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 9ca81b3a2b | |
|  | a1db17c779 | 
|  | @ -854,10 +854,11 @@ dependencies = [ | ||||||
|  "futures-util", |  "futures-util", | ||||||
|  "http-body-util", |  "http-body-util", | ||||||
|  "hyper 1.0.0-rc.3", |  "hyper 1.0.0-rc.3", | ||||||
|  |  "lavina-core", | ||||||
|  "mgmt-api", |  "mgmt-api", | ||||||
|  "nonempty", |  "nonempty", | ||||||
|  |  "projection-irc", | ||||||
|  "prometheus", |  "prometheus", | ||||||
|  "proto-irc", |  | ||||||
|  "proto-xmpp", |  "proto-xmpp", | ||||||
|  "quick-xml", |  "quick-xml", | ||||||
|  "regex", |  "regex", | ||||||
|  | @ -865,7 +866,6 @@ dependencies = [ | ||||||
|  "rustls-pemfile", |  "rustls-pemfile", | ||||||
|  "serde", |  "serde", | ||||||
|  "serde_json", |  "serde_json", | ||||||
|  "sqlx", |  | ||||||
|  "tokio", |  "tokio", | ||||||
|  "tokio-rustls", |  "tokio-rustls", | ||||||
|  "tracing", |  "tracing", | ||||||
|  | @ -873,6 +873,18 @@ dependencies = [ | ||||||
|  "uuid", |  "uuid", | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "lavina-core" | ||||||
|  | version = "0.0.1-dev" | ||||||
|  | dependencies = [ | ||||||
|  |  "anyhow", | ||||||
|  |  "prometheus", | ||||||
|  |  "serde", | ||||||
|  |  "sqlx", | ||||||
|  |  "tokio", | ||||||
|  |  "tracing", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "lazy_static" | name = "lazy_static" | ||||||
| version = "1.4.0" | version = "1.4.0" | ||||||
|  | @ -1240,6 +1252,21 @@ dependencies = [ | ||||||
|  "yansi", |  "yansi", | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "projection-irc" | ||||||
|  | version = "0.0.1-dev" | ||||||
|  | dependencies = [ | ||||||
|  |  "anyhow", | ||||||
|  |  "futures-util", | ||||||
|  |  "lavina-core", | ||||||
|  |  "nonempty", | ||||||
|  |  "prometheus", | ||||||
|  |  "proto-irc", | ||||||
|  |  "serde", | ||||||
|  |  "tokio", | ||||||
|  |  "tracing", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "prometheus" | name = "prometheus" | ||||||
| version = "0.13.3" | version = "0.13.3" | ||||||
|  |  | ||||||
							
								
								
									
										13
									
								
								Cargo.toml
								
								
								
								
							
							
						
						
									
										13
									
								
								Cargo.toml
								
								
								
								
							|  | @ -1,7 +1,9 @@ | ||||||
| [workspace] | [workspace] | ||||||
| members = [ | members = [ | ||||||
|     ".", |     ".", | ||||||
|  |     "crates/lavina-core", | ||||||
|     "crates/proto-irc", |     "crates/proto-irc", | ||||||
|  |     "crates/projection-irc", | ||||||
|     "crates/proto-xmpp", |     "crates/proto-xmpp", | ||||||
|     "crates/mgmt-api", |     "crates/mgmt-api", | ||||||
| ] | ] | ||||||
|  | @ -22,6 +24,9 @@ regex = "1.7.1" | ||||||
| derive_more = "0.99.17" | derive_more = "0.99.17" | ||||||
| clap = { version = "4.4.4", features = ["derive"] } | clap = { version = "4.4.4", features = ["derive"] } | ||||||
| serde = { version = "1.0.152", features = ["rc", "serde_derive"] } | serde = { version = "1.0.152", features = ["rc", "serde_derive"] } | ||||||
|  | tracing = "0.1.37" # logging & tracing api | ||||||
|  | prometheus = { version = "0.13.3", default-features = false } | ||||||
|  | lavina-core = { path = "crates/lavina-core" } | ||||||
| 
 | 
 | ||||||
| [package] | [package] | ||||||
| name = "lavina" | name = "lavina" | ||||||
|  | @ -37,18 +42,18 @@ http-body-util = "0.1.0-rc.3" | ||||||
| serde.workspace = true | serde.workspace = true | ||||||
| serde_json = "1.0.93" | serde_json = "1.0.93" | ||||||
| tokio.workspace = true | tokio.workspace = true | ||||||
| tracing = "0.1.37" # logging & tracing api | tracing.workspace = true | ||||||
| tracing-subscriber = "0.3.16" | tracing-subscriber = "0.3.16" | ||||||
| futures-util.workspace = true | futures-util.workspace = true | ||||||
| prometheus = { version = "0.13.3", default-features = false } | prometheus.workspace = true | ||||||
| nonempty.workspace = true | nonempty.workspace = true | ||||||
| tokio-rustls = "0.24.1" | tokio-rustls = "0.24.1" | ||||||
| rustls-pemfile = "1.0.2" | rustls-pemfile = "1.0.2" | ||||||
| quick-xml.workspace = true | quick-xml.workspace = true | ||||||
| derive_more.workspace = true | derive_more.workspace = true | ||||||
| uuid = { version = "1.3.0", features = ["v4"] } | uuid = { version = "1.3.0", features = ["v4"] } | ||||||
| sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } | lavina-core.workspace = true | ||||||
| proto-irc = { path = "crates/proto-irc" } | projection-irc = { path = "crates/projection-irc" } | ||||||
| proto-xmpp = { path = "crates/proto-xmpp" } | proto-xmpp = { path = "crates/proto-xmpp" } | ||||||
| mgmt-api = { path = "crates/mgmt-api" } | mgmt-api = { path = "crates/mgmt-api" } | ||||||
| clap.workspace = true | clap.workspace = true | ||||||
|  |  | ||||||
|  | @ -0,0 +1,12 @@ | ||||||
|  | [package] | ||||||
|  | name = "lavina-core" | ||||||
|  | edition = "2021" | ||||||
|  | version.workspace = true | ||||||
|  | 
 | ||||||
|  | [dependencies] | ||||||
|  | anyhow.workspace = true | ||||||
|  | sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } | ||||||
|  | serde.workspace = true | ||||||
|  | tokio.workspace = true | ||||||
|  | tracing.workspace = true | ||||||
|  | prometheus.workspace = true | ||||||
|  | @ -1,4 +1,8 @@ | ||||||
| //! Domain definitions and implementation of common chat logic.
 | //! Domain definitions and implementation of common chat logic.
 | ||||||
| pub mod player; | pub mod player; | ||||||
|  | pub mod prelude; | ||||||
| pub mod repo; | pub mod repo; | ||||||
| pub mod room; | pub mod room; | ||||||
|  | pub mod terminator; | ||||||
|  | 
 | ||||||
|  | mod table; | ||||||
|  | @ -19,11 +19,9 @@ use tokio::{ | ||||||
|     task::JoinHandle, |     task::JoinHandle, | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| use crate::{ | use crate::prelude::*; | ||||||
|     core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}, | use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; | ||||||
|     prelude::*, | use crate::table::{AnonTable, Key as AnonKey}; | ||||||
|     util::table::{AnonTable, Key as AnonKey}, |  | ||||||
| }; |  | ||||||
| 
 | 
 | ||||||
| /// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
 | /// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
 | ||||||
| #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] | #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] | ||||||
|  | @ -63,9 +61,7 @@ impl PlayerConnection { | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn join_room(&mut self, room_id: RoomId) -> Result<JoinResult> { |     pub async fn join_room(&mut self, room_id: RoomId) -> Result<JoinResult> { | ||||||
|         self.player_handle |         self.player_handle.join_room(room_id, self.connection_id.clone()).await | ||||||
|             .join_room(room_id, self.connection_id.clone()) |  | ||||||
|             .await |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { |     pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { | ||||||
|  | @ -100,9 +96,7 @@ impl PlayerConnection { | ||||||
| 
 | 
 | ||||||
|     pub async fn get_rooms(&self) -> Result<Vec<RoomInfo>> { |     pub async fn get_rooms(&self) -> Result<Vec<RoomInfo>> { | ||||||
|         let (promise, deferred) = oneshot(); |         let (promise, deferred) = oneshot(); | ||||||
|         self.player_handle |         self.player_handle.send(PlayerCommand::GetRooms(promise)).await; | ||||||
|             .send(PlayerCommand::GetRooms(promise)) |  | ||||||
|             .await; |  | ||||||
|         Ok(deferred.await?) |         Ok(deferred.await?) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | @ -126,27 +120,14 @@ impl PlayerHandle { | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn send_message( |     pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: Str) -> Result<()> { | ||||||
|         &self, |  | ||||||
|         room_id: RoomId, |  | ||||||
|         connection_id: ConnectionId, |  | ||||||
|         body: Str, |  | ||||||
|     ) -> Result<()> { |  | ||||||
|         let (promise, deferred) = oneshot(); |         let (promise, deferred) = oneshot(); | ||||||
|         let cmd = Cmd::SendMessage { |         let cmd = Cmd::SendMessage { room_id, body, promise }; | ||||||
|             room_id, |  | ||||||
|             body, |  | ||||||
|             promise, |  | ||||||
|         }; |  | ||||||
|         let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; |         let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; | ||||||
|         Ok(deferred.await?) |         Ok(deferred.await?) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn join_room( |     pub async fn join_room(&self, room_id: RoomId, connection_id: ConnectionId) -> Result<JoinResult> { | ||||||
|         &self, |  | ||||||
|         room_id: RoomId, |  | ||||||
|         connection_id: ConnectionId, |  | ||||||
|     ) -> Result<JoinResult> { |  | ||||||
|         let (promise, deferred) = oneshot(); |         let (promise, deferred) = oneshot(); | ||||||
|         let cmd = Cmd::JoinRoom { room_id, promise }; |         let cmd = Cmd::JoinRoom { room_id, promise }; | ||||||
|         let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; |         let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; | ||||||
|  | @ -230,12 +211,8 @@ pub enum Updates { | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| pub struct PlayerRegistry(Arc<RwLock<PlayerRegistryInner>>); | pub struct PlayerRegistry(Arc<RwLock<PlayerRegistryInner>>); | ||||||
| impl PlayerRegistry { | impl PlayerRegistry { | ||||||
|     pub fn empty( |     pub fn empty(room_registry: RoomRegistry, metrics: &mut MetricsRegistry) -> Result<PlayerRegistry> { | ||||||
|         room_registry: RoomRegistry, |         let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; | ||||||
|         metrics: &mut MetricsRegistry, |  | ||||||
|     ) -> Result<PlayerRegistry> { |  | ||||||
|         let metric_active_players = |  | ||||||
|             IntGauge::new("chat_players_active", "Number of alive player actors")?; |  | ||||||
|         metrics.register(Box::new(metric_active_players.clone()))?; |         metrics.register(Box::new(metric_active_players.clone()))?; | ||||||
|         let inner = PlayerRegistryInner { |         let inner = PlayerRegistryInner { | ||||||
|             room_registry, |             room_registry, | ||||||
|  | @ -375,8 +352,7 @@ impl Player { | ||||||
|                         return; |                         return; | ||||||
|                     } |                     } | ||||||
|                 }; |                 }; | ||||||
|                 room.subscribe(self.player_id.clone(), self.handle.clone()) |                 room.subscribe(self.player_id.clone(), self.handle.clone()).await; | ||||||
|                     .await; |  | ||||||
|                 self.my_rooms.insert(room_id.clone(), room.clone()); |                 self.my_rooms.insert(room_id.clone(), room.clone()); | ||||||
|                 let room_info = room.get_room_info().await; |                 let room_info = room.get_room_info().await; | ||||||
|                 let _ = promise.send(JoinResult::Success(room_info)); |                 let _ = promise.send(JoinResult::Success(room_info)); | ||||||
|  | @ -399,15 +375,10 @@ impl Player { | ||||||
|                 }; |                 }; | ||||||
|                 self.broadcast_update(update, connection_id).await; |                 self.broadcast_update(update, connection_id).await; | ||||||
|             } |             } | ||||||
|             Cmd::SendMessage { |             Cmd::SendMessage { room_id, body, promise } => { | ||||||
|                 room_id, |  | ||||||
|                 body, |  | ||||||
|                 promise, |  | ||||||
|             } => { |  | ||||||
|                 let room = self.rooms.get_room(&room_id).await; |                 let room = self.rooms.get_room(&room_id).await; | ||||||
|                 if let Some(room) = room { |                 if let Some(room) = room { | ||||||
|                     room.send_message(self.player_id.clone(), body.clone()) |                     room.send_message(self.player_id.clone(), body.clone()).await; | ||||||
|                         .await; |  | ||||||
|                 } else { |                 } else { | ||||||
|                     tracing::info!("no room found"); |                     tracing::info!("no room found"); | ||||||
|                 } |                 } | ||||||
|  | @ -426,8 +397,7 @@ impl Player { | ||||||
|             } => { |             } => { | ||||||
|                 let room = self.rooms.get_room(&room_id).await; |                 let room = self.rooms.get_room(&room_id).await; | ||||||
|                 if let Some(mut room) = room { |                 if let Some(mut room) = room { | ||||||
|                     room.set_topic(self.player_id.clone(), new_topic.clone()) |                     room.set_topic(self.player_id.clone(), new_topic.clone()).await; | ||||||
|                         .await; |  | ||||||
|                 } else { |                 } else { | ||||||
|                     tracing::info!("no room found"); |                     tracing::info!("no room found"); | ||||||
|                 } |                 } | ||||||
|  | @ -15,11 +15,3 @@ pub type Str = std::sync::Arc<str>; | ||||||
| pub fn fail(msg: &str) -> anyhow::Error { | pub fn fail(msg: &str) -> anyhow::Error { | ||||||
|     anyhow::Error::msg(msg.to_owned()) |     anyhow::Error::msg(msg.to_owned()) | ||||||
| } | } | ||||||
| 
 |  | ||||||
| macro_rules! ffail { |  | ||||||
|     ($($arg:tt)*) => { |  | ||||||
|         fail(&format!($($arg)*)) |  | ||||||
|     }; |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| pub(crate) use ffail; |  | ||||||
|  | @ -93,10 +93,10 @@ impl Storage { | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn close(mut self) -> Result<()> { |     pub async fn close(self) -> Result<()> { | ||||||
|         let res = match Arc::try_unwrap(self.conn) { |         let res = match Arc::try_unwrap(self.conn) { | ||||||
|             Ok(e) => e, |             Ok(e) => e, | ||||||
|             Err(e) => return Err(fail("failed to acquire DB ownership on shutdown")), |             Err(_) => return Err(fail("failed to acquire DB ownership on shutdown")), | ||||||
|         }; |         }; | ||||||
|         let res = res.into_inner(); |         let res = res.into_inner(); | ||||||
|         res.close().await?; |         res.close().await?; | ||||||
|  | @ -1,17 +1,13 @@ | ||||||
| //! Domain of rooms — chats with multiple participants.
 | //! Domain of rooms — chats with multiple participants.
 | ||||||
| use std::{ | use std::{collections::HashMap, hash::Hash, sync::Arc}; | ||||||
|     collections::HashMap, |  | ||||||
|     hash::Hash, |  | ||||||
|     sync::Arc, |  | ||||||
| }; |  | ||||||
| 
 | 
 | ||||||
| use prometheus::{IntGauge, Registry as MetricRegistry}; | use prometheus::{IntGauge, Registry as MetricRegistry}; | ||||||
| use serde::Serialize; | use serde::Serialize; | ||||||
| use tokio::sync::RwLock as AsyncRwLock; | use tokio::sync::RwLock as AsyncRwLock; | ||||||
| 
 | 
 | ||||||
| use crate::core::player::{PlayerHandle, PlayerId, Updates}; | use crate::player::{PlayerHandle, PlayerId, Updates}; | ||||||
| use crate::core::repo::Storage; |  | ||||||
| use crate::prelude::*; | use crate::prelude::*; | ||||||
|  | use crate::repo::Storage; | ||||||
| 
 | 
 | ||||||
| /// Opaque room id
 | /// Opaque room id
 | ||||||
| #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] | #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] | ||||||
|  | @ -20,9 +16,7 @@ impl RoomId { | ||||||
|     pub fn from(str: impl Into<Str>) -> Result<RoomId> { |     pub fn from(str: impl Into<Str>) -> Result<RoomId> { | ||||||
|         let bytes = str.into(); |         let bytes = str.into(); | ||||||
|         if bytes.len() > 32 { |         if bytes.len() > 32 { | ||||||
|             return Err(anyhow::Error::msg( |             return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); | ||||||
|                 "Room name cannot be longer than 32 symbols", |  | ||||||
|             )); |  | ||||||
|         } |         } | ||||||
|         if bytes.contains(' ') { |         if bytes.contains(' ') { | ||||||
|             return Err(anyhow::Error::msg("Room name cannot contain spaces")); |             return Err(anyhow::Error::msg("Room name cannot contain spaces")); | ||||||
|  | @ -42,8 +36,7 @@ impl RoomId { | ||||||
| pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>); | pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>); | ||||||
| impl RoomRegistry { | impl RoomRegistry { | ||||||
|     pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> { |     pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> { | ||||||
|         let metric_active_rooms = |         let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; | ||||||
|             IntGauge::new("chat_rooms_active", "Number of alive room actors")?; |  | ||||||
|         metrics.register(Box::new(metric_active_rooms.clone()))?; |         metrics.register(Box::new(metric_active_rooms.clone()))?; | ||||||
|         let inner = RoomRegistryInner { |         let inner = RoomRegistryInner { | ||||||
|             rooms: HashMap::new(), |             rooms: HashMap::new(), | ||||||
|  | @ -140,7 +133,7 @@ impl RoomHandle { | ||||||
| 
 | 
 | ||||||
|     pub async fn send_message(&self, player_id: PlayerId, body: Str) { |     pub async fn send_message(&self, player_id: PlayerId, body: Str) { | ||||||
|         let mut lock = self.0.write().await; |         let mut lock = self.0.write().await; | ||||||
|         let res =  lock.send_message(player_id, body).await; |         let res = lock.send_message(player_id, body).await; | ||||||
|         if let Err(err) = res { |         if let Err(err) = res { | ||||||
|             log::warn!("Failed to send message: {err:?}"); |             log::warn!("Failed to send message: {err:?}"); | ||||||
|         } |         } | ||||||
|  | @ -150,11 +143,7 @@ impl RoomHandle { | ||||||
|         let lock = self.0.read().await; |         let lock = self.0.read().await; | ||||||
|         RoomInfo { |         RoomInfo { | ||||||
|             id: lock.room_id.clone(), |             id: lock.room_id.clone(), | ||||||
|             members: lock |             members: lock.subscriptions.keys().map(|x| x.clone()).collect::<Vec<_>>(), | ||||||
|                 .subscriptions |  | ||||||
|                 .keys() |  | ||||||
|                 .map(|x| x.clone()) |  | ||||||
|                 .collect::<Vec<_>>(), |  | ||||||
|             topic: lock.topic.clone(), |             topic: lock.topic.clone(), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  | @ -191,7 +180,9 @@ impl Room { | ||||||
| 
 | 
 | ||||||
|     async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> { |     async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> { | ||||||
|         tracing::info!("Adding a message to room"); |         tracing::info!("Adding a message to room"); | ||||||
|         self.storage.insert_message(self.storage_id, self.message_count, &body).await?; |         self.storage | ||||||
|  |             .insert_message(self.storage_id, self.message_count, &body) | ||||||
|  |             .await?; | ||||||
|         self.message_count += 1; |         self.message_count += 1; | ||||||
|         let update = Updates::NewMessage { |         let update = Updates::NewMessage { | ||||||
|             room_id: self.room_id.clone(), |             room_id: self.room_id.clone(), | ||||||
|  | @ -0,0 +1,28 @@ | ||||||
|  | use crate::prelude::*; | ||||||
|  | 
 | ||||||
|  | pub struct Terminator { | ||||||
|  |     signal: Promise<()>, | ||||||
|  |     completion: JoinHandle<Result<()>>, | ||||||
|  | } | ||||||
|  | impl Terminator { | ||||||
|  |     pub async fn terminate(self) -> Result<()> { | ||||||
|  |         match self.signal.send(()) { | ||||||
|  |             Ok(()) => {} | ||||||
|  |             Err(_) => log::warn!("Termination channel is dropped"), | ||||||
|  |         } | ||||||
|  |         self.completion.await??; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Used to spawn managed tasks with support for graceful shutdown
 | ||||||
|  |     pub fn spawn<Fun, Fut>(launcher: Fun) -> Terminator | ||||||
|  |     where | ||||||
|  |         Fun: FnOnce(Deferred<()>) -> Fut, | ||||||
|  |         Fut: Future<Output = Result<()>> + Send + 'static, | ||||||
|  |     { | ||||||
|  |         let (signal, rx) = oneshot(); | ||||||
|  |         let future = launcher(rx); | ||||||
|  |         let completion = tokio::task::spawn(future); | ||||||
|  |         Terminator { signal, completion } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | @ -0,0 +1,15 @@ | ||||||
|  | [package] | ||||||
|  | name = "projection-irc" | ||||||
|  | edition = "2021" | ||||||
|  | version.workspace = true | ||||||
|  | 
 | ||||||
|  | [dependencies] | ||||||
|  | lavina-core.workspace = true | ||||||
|  | tracing.workspace = true | ||||||
|  | anyhow.workspace = true | ||||||
|  | nonempty.workspace = true | ||||||
|  | serde.workspace = true | ||||||
|  | tokio.workspace = true | ||||||
|  | prometheus.workspace = true | ||||||
|  | proto-irc = { path = "../proto-irc" } | ||||||
|  | futures-util.workspace = true | ||||||
|  | @ -1,26 +1,26 @@ | ||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
| use std::net::SocketAddr; | use std::net::SocketAddr; | ||||||
| 
 | 
 | ||||||
|  | use anyhow::{anyhow, Result}; | ||||||
| use futures_util::future::join_all; | use futures_util::future::join_all; | ||||||
| use nonempty::nonempty; | use nonempty::nonempty; | ||||||
| use nonempty::NonEmpty; | use nonempty::NonEmpty; | ||||||
| use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; | use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; | ||||||
| 
 |  | ||||||
| use serde::Deserialize; | use serde::Deserialize; | ||||||
| use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; | use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; | ||||||
| use tokio::net::tcp::{ReadHalf, WriteHalf}; | use tokio::net::tcp::{ReadHalf, WriteHalf}; | ||||||
| use tokio::net::{TcpListener, TcpStream}; | use tokio::net::{TcpListener, TcpStream}; | ||||||
| use tokio::sync::mpsc::channel; | use tokio::sync::mpsc::channel; | ||||||
| 
 | 
 | ||||||
| use crate::core::player::*; | use lavina_core::player::*; | ||||||
| use crate::core::repo::Storage; | use lavina_core::prelude::*; | ||||||
| use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; | use lavina_core::repo::Storage; | ||||||
| use crate::prelude::*; | use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; | ||||||
|  | use lavina_core::terminator::Terminator; | ||||||
| use proto_irc::client::{client_message, ClientMessage}; | use proto_irc::client::{client_message, ClientMessage}; | ||||||
| use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; | use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; | ||||||
|  | use proto_irc::user::PrefixedNick; | ||||||
| use proto_irc::{Chan, Recipient}; | use proto_irc::{Chan, Recipient}; | ||||||
| use proto_irc::user::{Prefix, PrefixedNick}; |  | ||||||
| use crate::util::Terminator; |  | ||||||
| 
 | 
 | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| mod test; | mod test; | ||||||
|  | @ -70,14 +70,12 @@ async fn handle_socket( | ||||||
|     .await?; |     .await?; | ||||||
|     writer.flush().await?; |     writer.flush().await?; | ||||||
| 
 | 
 | ||||||
|     let registered_user: Result<RegisteredUser> = |     let registered_user: Result<RegisteredUser> = handle_registration(&mut reader, &mut writer, &mut storage).await; | ||||||
|         handle_registration(&mut reader, &mut writer, &mut storage).await; |  | ||||||
| 
 | 
 | ||||||
|     match registered_user { |     match registered_user { | ||||||
|         Ok(user) => { |         Ok(user) => { | ||||||
|             log::debug!("User registered"); |             log::debug!("User registered"); | ||||||
|             handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user) |             handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?; | ||||||
|                 .await?; |  | ||||||
|         } |         } | ||||||
|         Err(_) => { |         Err(_) => { | ||||||
|             log::debug!("Registration failed"); |             log::debug!("Registration failed"); | ||||||
|  | @ -159,23 +157,22 @@ async fn handle_registration<'a>( | ||||||
|         buffer.clear(); |         buffer.clear(); | ||||||
|     }?; |     }?; | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
|     let stored_user = storage.retrieve_user_by_name(&*user.nickname).await?; |     let stored_user = storage.retrieve_user_by_name(&*user.nickname).await?; | ||||||
| 
 | 
 | ||||||
|     let stored_user = match stored_user { |     let stored_user = match stored_user { | ||||||
|         Some(u) => u, |         Some(u) => u, | ||||||
|         None => { |         None => { | ||||||
|             log::info!("User '{}' not found", user.nickname); |             log::info!("User '{}' not found", user.nickname); | ||||||
|             return Err(fail("no user found")); |             return Err(anyhow!("no user found")); | ||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
|     if stored_user.password.is_none() { |     if stored_user.password.is_none() { | ||||||
|         log::info!("Password not defined for user '{}'", user.nickname); |         log::info!("Password not defined for user '{}'", user.nickname); | ||||||
|         return Err(fail("password is not defined")); |         return Err(anyhow!("password is not defined")); | ||||||
|     } |     } | ||||||
|     if stored_user.password.as_deref() != pass.as_deref() { |     if stored_user.password.as_deref() != pass.as_deref() { | ||||||
|         log::info!("Incorrect password supplied for user '{}'", user.nickname); |         log::info!("Incorrect password supplied for user '{}'", user.nickname); | ||||||
|         return Err(fail("passwords do not match")); |         return Err(anyhow!("passwords do not match")); | ||||||
|     } |     } | ||||||
|     // TODO properly implement session temination
 |     // TODO properly implement session temination
 | ||||||
| 
 | 
 | ||||||
|  | @ -250,14 +247,7 @@ async fn handle_registered_socket<'a>( | ||||||
| 
 | 
 | ||||||
|     let rooms_list = connection.get_rooms().await?; |     let rooms_list = connection.get_rooms().await?; | ||||||
|     for room in &rooms_list { |     for room in &rooms_list { | ||||||
|         produce_on_join_cmd_messages( |         produce_on_join_cmd_messages(&config, &user, &Chan::Global(room.id.as_inner().clone()), room, writer).await?; | ||||||
|             &config, |  | ||||||
|             &user, |  | ||||||
|             &Chan::Global(room.id.as_inner().clone()), |  | ||||||
|             room, |  | ||||||
|             writer, |  | ||||||
|         ) |  | ||||||
|         .await?; |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     writer.flush().await?; |     writer.flush().await?; | ||||||
|  | @ -314,10 +304,7 @@ async fn handle_update( | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); |     log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); | ||||||
|     match update { |     match update { | ||||||
|         Updates::RoomJoined { |         Updates::RoomJoined { new_member_id, room_id } => { | ||||||
|             new_member_id, |  | ||||||
|             room_id, |  | ||||||
|         } => { |  | ||||||
|             if player_id == &new_member_id { |             if player_id == &new_member_id { | ||||||
|                 if let Some(room) = rooms.get_room(&room_id).await { |                 if let Some(room) = rooms.get_room(&room_id).await { | ||||||
|                     let room_info = room.get_room_info().await; |                     let room_info = room.get_room_info().await; | ||||||
|  | @ -438,16 +425,14 @@ async fn handle_incoming_message( | ||||||
|                 Recipient::Chan(Chan::Global(chan)) => { |                 Recipient::Chan(Chan::Global(chan)) => { | ||||||
|                     let room_id = RoomId::from(chan)?; |                     let room_id = RoomId::from(chan)?; | ||||||
|                     user_handle.send_message(room_id, body).await?; |                     user_handle.send_message(room_id, body).await?; | ||||||
|                 }, |                 } | ||||||
|                 _ => log::warn!("Unsupported target type"), |                 _ => log::warn!("Unsupported target type"), | ||||||
|             }, |             }, | ||||||
|             ClientMessage::Topic { chan, topic } => { |             ClientMessage::Topic { chan, topic } => { | ||||||
|                 match chan { |                 match chan { | ||||||
|                     Chan::Global(chan) => { |                     Chan::Global(chan) => { | ||||||
|                         let room_id = RoomId::from(chan)?; |                         let room_id = RoomId::from(chan)?; | ||||||
|                         user_handle |                         user_handle.change_topic(room_id.clone(), topic.clone()).await?; | ||||||
|                             .change_topic(room_id.clone(), topic.clone()) |  | ||||||
|                             .await?; |  | ||||||
|                         ServerMessage { |                         ServerMessage { | ||||||
|                             tags: vec![], |                             tags: vec![], | ||||||
|                             sender: Some(config.server_name.clone()), |                             sender: Some(config.server_name.clone()), | ||||||
|  | @ -569,11 +554,7 @@ async fn handle_incoming_message( | ||||||
|     Ok(HandleResult::Continue) |     Ok(HandleResult::Continue) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| fn user_to_who_msg( | fn user_to_who_msg(config: &ServerConfig, requestor: &RegisteredUser, target_user_nickname: &Str) -> ServerMessageBody { | ||||||
|     config: &ServerConfig, |  | ||||||
|     requestor: &RegisteredUser, |  | ||||||
|     target_user_nickname: &Str, |  | ||||||
| ) -> ServerMessageBody { |  | ||||||
|     // Username is equal to nickname
 |     // Username is equal to nickname
 | ||||||
|     let username = format!("~{target_user_nickname}").into(); |     let username = format!("~{target_user_nickname}").into(); | ||||||
| 
 | 
 | ||||||
|  | @ -674,8 +655,13 @@ async fn produce_on_join_cmd_messages( | ||||||
|     } |     } | ||||||
|     .write_async(writer) |     .write_async(writer) | ||||||
|     .await?; |     .await?; | ||||||
|     let prefixed_members: Vec<PrefixedNick> = room_info.members.iter().map(|member| PrefixedNick::from_str(member.clone().into_inner())).collect(); |     let prefixed_members: Vec<PrefixedNick> = room_info | ||||||
|     let non_empty_members: NonEmpty<PrefixedNick> = NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); |         .members | ||||||
|  |         .iter() | ||||||
|  |         .map(|member| PrefixedNick::from_str(member.clone().into_inner())) | ||||||
|  |         .collect(); | ||||||
|  |     let non_empty_members: NonEmpty<PrefixedNick> = | ||||||
|  |         NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); | ||||||
| 
 | 
 | ||||||
|     ServerMessage { |     ServerMessage { | ||||||
|         tags: vec![], |         tags: vec![], | ||||||
|  | @ -710,12 +696,8 @@ pub async fn launch( | ||||||
| ) -> Result<Terminator> { | ) -> Result<Terminator> { | ||||||
|     log::info!("Starting IRC projection"); |     log::info!("Starting IRC projection"); | ||||||
|     let (stopped_tx, mut stopped_rx) = channel(32); |     let (stopped_tx, mut stopped_rx) = channel(32); | ||||||
|     let current_connections = |     let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; | ||||||
|         IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; |     let total_connections = IntCounter::new("irc_total_connections", "Total number of opened connections")?; | ||||||
|     let total_connections = IntCounter::new( |  | ||||||
|         "irc_total_connections", |  | ||||||
|         "Total number of opened connections", |  | ||||||
|     )?; |  | ||||||
|     metrics.register(Box::new(current_connections.clone()))?; |     metrics.register(Box::new(current_connections.clone()))?; | ||||||
|     metrics.register(Box::new(total_connections.clone()))?; |     metrics.register(Box::new(total_connections.clone()))?; | ||||||
| 
 | 
 | ||||||
|  | @ -780,7 +762,8 @@ pub async fn launch( | ||||||
|                     log::warn!("IRC connection to {socket_addr} finished with error: {err}") |                     log::warn!("IRC connection to {socket_addr} finished with error: {err}") | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         })).await; |         })) | ||||||
|  |         .await; | ||||||
|         log::info!("Stopped IRC projection"); |         log::info!("Stopped IRC projection"); | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     }); |     }); | ||||||
|  | @ -2,7 +2,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; | ||||||
| use tokio::net::tcp::{ReadHalf, WriteHalf}; | use tokio::net::tcp::{ReadHalf, WriteHalf}; | ||||||
| use tokio::net::TcpStream; | use tokio::net::TcpStream; | ||||||
| 
 | 
 | ||||||
| use crate::prelude::*; | use lavina_core::prelude::*; | ||||||
| 
 | 
 | ||||||
| struct TestScope<'a> { | struct TestScope<'a> { | ||||||
|     reader: BufReader<ReadHalf<'a>>, |     reader: BufReader<ReadHalf<'a>>, | ||||||
|  | @ -35,11 +35,7 @@ async fn init_client(stream: &mut TcpStream) -> Result<TestScope> { | ||||||
|     let (reader, writer) = stream.split(); |     let (reader, writer) = stream.split(); | ||||||
|     let reader = BufReader::new(reader); |     let reader = BufReader::new(reader); | ||||||
|     let buffer = vec![]; |     let buffer = vec![]; | ||||||
|     Ok(TestScope { |     Ok(TestScope { reader, writer, buffer }) | ||||||
|         reader, |  | ||||||
|         writer, |  | ||||||
|         buffer, |  | ||||||
|     }) |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| macro_rules! send { | macro_rules! send { | ||||||
							
								
								
									
										16
									
								
								src/main.rs
								
								
								
								
							
							
						
						
									
										16
									
								
								src/main.rs
								
								
								
								
							|  | @ -5,8 +5,6 @@ | ||||||
|     impl_trait_in_assoc_type |     impl_trait_in_assoc_type | ||||||
| )] | )] | ||||||
| 
 | 
 | ||||||
| mod core; |  | ||||||
| mod prelude; |  | ||||||
| mod projections; | mod projections; | ||||||
| mod util; | mod util; | ||||||
| 
 | 
 | ||||||
|  | @ -19,17 +17,17 @@ use figment::{providers::Toml, Figment}; | ||||||
| use prometheus::Registry as MetricsRegistry; | use prometheus::Registry as MetricsRegistry; | ||||||
| use serde::Deserialize; | use serde::Deserialize; | ||||||
| 
 | 
 | ||||||
| use crate::core::player::PlayerRegistry; | use lavina_core::player::PlayerRegistry; | ||||||
| use crate::core::repo::Storage; | use lavina_core::prelude::*; | ||||||
| use crate::core::room::RoomRegistry; | use lavina_core::repo::Storage; | ||||||
| use crate::prelude::*; | use lavina_core::room::RoomRegistry; | ||||||
| 
 | 
 | ||||||
| #[derive(Deserialize, Debug)] | #[derive(Deserialize, Debug)] | ||||||
| struct ServerConfig { | struct ServerConfig { | ||||||
|     telemetry: util::telemetry::ServerConfig, |     telemetry: util::telemetry::ServerConfig, | ||||||
|     irc: projections::irc::ServerConfig, |     irc: projection_irc::ServerConfig, | ||||||
|     xmpp: projections::xmpp::ServerConfig, |     xmpp: projections::xmpp::ServerConfig, | ||||||
|     storage: core::repo::StorageConfig, |     storage: lavina_core::repo::StorageConfig, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Parser)] | #[derive(Parser)] | ||||||
|  | @ -65,7 +63,7 @@ async fn main() -> Result<()> { | ||||||
|     let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; |     let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; | ||||||
|     let telemetry_terminator = |     let telemetry_terminator = | ||||||
|         util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; |         util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; | ||||||
|     let irc = projections::irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; |     let irc = projection_irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; | ||||||
|     let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; |     let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; | ||||||
|     tracing::info!("Started"); |     tracing::info!("Started"); | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -1,3 +1,2 @@ | ||||||
| //! Protocol projections — implementations of public APIs.
 | //! Protocol projections — implementations of public APIs.
 | ||||||
| pub mod irc; |  | ||||||
| pub mod xmpp; | pub mod xmpp; | ||||||
|  |  | ||||||
|  | @ -19,9 +19,10 @@ use tokio::sync::mpsc::channel; | ||||||
| use tokio_rustls::rustls::{Certificate, PrivateKey}; | use tokio_rustls::rustls::{Certificate, PrivateKey}; | ||||||
| use tokio_rustls::TlsAcceptor; | use tokio_rustls::TlsAcceptor; | ||||||
| 
 | 
 | ||||||
| use crate::core::player::{PlayerConnection, PlayerId, PlayerRegistry}; | use lavina_core::player::{PlayerConnection, PlayerId, PlayerRegistry}; | ||||||
| use crate::core::room::{RoomId, RoomRegistry}; | use lavina_core::prelude::*; | ||||||
| use crate::prelude::*; | use lavina_core::room::{RoomId, RoomRegistry}; | ||||||
|  | use lavina_core::terminator::Terminator; | ||||||
| use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; | use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; | ||||||
| use proto_xmpp::client::{Iq, Message, MessageType, Presence}; | use proto_xmpp::client::{Iq, Message, MessageType, Presence}; | ||||||
| use proto_xmpp::disco::*; | use proto_xmpp::disco::*; | ||||||
|  | @ -29,7 +30,6 @@ use proto_xmpp::roster::RosterQuery; | ||||||
| use proto_xmpp::session::Session; | use proto_xmpp::session::Session; | ||||||
| use proto_xmpp::stream::*; | use proto_xmpp::stream::*; | ||||||
| use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; | use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; | ||||||
| use crate::util::Terminator; |  | ||||||
| 
 | 
 | ||||||
| use self::proto::{ClientPacket, IqClientBody}; | use self::proto::{ClientPacket, IqClientBody}; | ||||||
| 
 | 
 | ||||||
|  | @ -332,7 +332,7 @@ async fn socket_final( | ||||||
|             update = user_handle.receiver.recv() => { |             update = user_handle.receiver.recv() => { | ||||||
|                 if let Some(update) = update { |                 if let Some(update) = update { | ||||||
|                     match update { |                     match update { | ||||||
|                         crate::core::player::Updates::NewMessage { room_id, author_id, body } => { |                         lavina_core::player::Updates::NewMessage { room_id, author_id, body } => { | ||||||
|                             Message { |                             Message { | ||||||
|                                 to: Some(Jid { |                                 to: Some(Jid { | ||||||
|                                     name: Some(authenticated.xmpp_name.clone()), |                                     name: Some(authenticated.xmpp_name.clone()), | ||||||
|  |  | ||||||
|  | @ -1,7 +1,9 @@ | ||||||
|  | use anyhow::anyhow; | ||||||
| use derive_more::From; | use derive_more::From; | ||||||
| use quick_xml::events::Event; | use quick_xml::events::Event; | ||||||
| use quick_xml::name::{Namespace, ResolveResult}; | use quick_xml::name::{Namespace, ResolveResult}; | ||||||
| 
 | 
 | ||||||
|  | use lavina_core::prelude::*; | ||||||
| use proto_xmpp::bind::BindRequest; | use proto_xmpp::bind::BindRequest; | ||||||
| use proto_xmpp::client::{Iq, Message, Presence}; | use proto_xmpp::client::{Iq, Message, Presence}; | ||||||
| use proto_xmpp::disco::{InfoQuery, ItemQuery}; | use proto_xmpp::disco::{InfoQuery, ItemQuery}; | ||||||
|  | @ -9,8 +11,6 @@ use proto_xmpp::roster::RosterQuery; | ||||||
| use proto_xmpp::session::Session; | use proto_xmpp::session::Session; | ||||||
| use proto_xmpp::xml::*; | use proto_xmpp::xml::*; | ||||||
| 
 | 
 | ||||||
| use crate::prelude::*; |  | ||||||
| 
 |  | ||||||
| #[derive(PartialEq, Eq, Debug, From)] | #[derive(PartialEq, Eq, Debug, From)] | ||||||
| pub enum IqClientBody { | pub enum IqClientBody { | ||||||
|     Bind(BindRequest), |     Bind(BindRequest), | ||||||
|  | @ -29,7 +29,7 @@ impl FromXml for IqClientBody { | ||||||
|             let bytes = match event { |             let bytes = match event { | ||||||
|                 Event::Start(bytes) => bytes, |                 Event::Start(bytes) => bytes, | ||||||
|                 Event::Empty(bytes) => bytes, |                 Event::Empty(bytes) => bytes, | ||||||
|                 _ => return Err(ffail!("Unexpected XML event: {event:?}")), |                 _ => return Err(anyhow!("Unexpected XML event: {event:?}")), | ||||||
|             }; |             }; | ||||||
|             let name = bytes.name(); |             let name = bytes.name(); | ||||||
|             match_parser!(name, namespace, event; |             match_parser!(name, namespace, event; | ||||||
|  | @ -67,7 +67,7 @@ impl FromXml for ClientPacket { | ||||||
|                         Presence::<Ignore>, |                         Presence::<Ignore>, | ||||||
|                         Message, |                         Message, | ||||||
|                         { |                         { | ||||||
|                             Err(ffail!( |                             Err(anyhow!( | ||||||
|                                 "Unexpected XML event of name {:?} in namespace {:?}", |                                 "Unexpected XML event of name {:?} in namespace {:?}", | ||||||
|                                 name, |                                 name, | ||||||
|                                 namespace |                                 namespace | ||||||
|  | @ -80,11 +80,11 @@ impl FromXml for ClientPacket { | ||||||
|                     if name.local_name().as_ref() == b"stream" { |                     if name.local_name().as_ref() == b"stream" { | ||||||
|                         return Ok(ClientPacket::StreamEnd); |                         return Ok(ClientPacket::StreamEnd); | ||||||
|                     } else { |                     } else { | ||||||
|                         return Err(ffail!("Unexpected XML event: {event:?}")); |                         return Err(anyhow!("Unexpected XML event: {event:?}")); | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|                 _ => { |                 _ => { | ||||||
|                     return Err(ffail!("Unexpected XML event: {event:?}")); |                     return Err(anyhow!("Unexpected XML event: {event:?}")); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  | @ -1,33 +1,3 @@ | ||||||
| use crate::prelude::*; |  | ||||||
| 
 |  | ||||||
| pub mod table; |  | ||||||
| pub mod telemetry; | pub mod telemetry; | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| pub mod testkit; | pub mod testkit; | ||||||
| 
 |  | ||||||
| pub struct Terminator { |  | ||||||
|     signal: Promise<()>, |  | ||||||
|     completion: JoinHandle<Result<()>>, |  | ||||||
| } |  | ||||||
| impl Terminator { |  | ||||||
|     pub async fn terminate(self) -> Result<()> { |  | ||||||
|         match self.signal.send(()) { |  | ||||||
|             Ok(()) => {} |  | ||||||
|             Err(_) => log::warn!("Termination channel is dropped"), |  | ||||||
|         } |  | ||||||
|         self.completion.await??; |  | ||||||
|         Ok(()) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /// Used to spawn managed tasks with support for graceful shutdown
 |  | ||||||
|     pub fn spawn<Fun, Fut>(launcher: Fun) -> Terminator |  | ||||||
|     where |  | ||||||
|         Fun: FnOnce(Deferred<()>) -> Fut, |  | ||||||
|         Fut: Future<Output = Result<()>> + Send + 'static, |  | ||||||
|     { |  | ||||||
|         let (signal, rx) = oneshot(); |  | ||||||
|         let future = launcher(rx); |  | ||||||
|         let completion = tokio::task::spawn(future); |  | ||||||
|         Terminator { signal, completion } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  | @ -11,10 +11,10 @@ use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; | ||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
| use tokio::net::TcpListener; | use tokio::net::TcpListener; | ||||||
| 
 | 
 | ||||||
| use crate::core::repo::Storage; | use lavina_core::prelude::*; | ||||||
| use crate::core::room::RoomRegistry; | use lavina_core::repo::Storage; | ||||||
| use crate::prelude::*; | use lavina_core::room::RoomRegistry; | ||||||
| use crate::util::Terminator; | use lavina_core::terminator::Terminator; | ||||||
| 
 | 
 | ||||||
| use mgmt_api::*; | use mgmt_api::*; | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -2,7 +2,7 @@ use std::task::{Context, Poll}; | ||||||
| 
 | 
 | ||||||
| use futures_util::task::noop_waker_ref; | use futures_util::task::noop_waker_ref; | ||||||
| 
 | 
 | ||||||
| use crate::prelude::*; | use lavina_core::prelude::*; | ||||||
| 
 | 
 | ||||||
| pub fn sync_future<T>(future: impl Future<Output = T>) -> Result<T> { | pub fn sync_future<T>(future: impl Future<Output = T>) -> Result<T> { | ||||||
|     let waker = noop_waker_ref(); |     let waker = noop_waker_ref(); | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue