forked from lavina/lavina
				
			Compare commits
	
		
			2 Commits
		
	
	
		
			1b3551f108
			...
			9ca81b3a2b
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 9ca81b3a2b | |
|  | a1db17c779 | 
|  | @ -854,10 +854,11 @@ dependencies = [ | |||
|  "futures-util", | ||||
|  "http-body-util", | ||||
|  "hyper 1.0.0-rc.3", | ||||
|  "lavina-core", | ||||
|  "mgmt-api", | ||||
|  "nonempty", | ||||
|  "projection-irc", | ||||
|  "prometheus", | ||||
|  "proto-irc", | ||||
|  "proto-xmpp", | ||||
|  "quick-xml", | ||||
|  "regex", | ||||
|  | @ -865,7 +866,6 @@ dependencies = [ | |||
|  "rustls-pemfile", | ||||
|  "serde", | ||||
|  "serde_json", | ||||
|  "sqlx", | ||||
|  "tokio", | ||||
|  "tokio-rustls", | ||||
|  "tracing", | ||||
|  | @ -873,6 +873,18 @@ dependencies = [ | |||
|  "uuid", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "lavina-core" | ||||
| version = "0.0.1-dev" | ||||
| dependencies = [ | ||||
|  "anyhow", | ||||
|  "prometheus", | ||||
|  "serde", | ||||
|  "sqlx", | ||||
|  "tokio", | ||||
|  "tracing", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "lazy_static" | ||||
| version = "1.4.0" | ||||
|  | @ -1240,6 +1252,21 @@ dependencies = [ | |||
|  "yansi", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "projection-irc" | ||||
| version = "0.0.1-dev" | ||||
| dependencies = [ | ||||
|  "anyhow", | ||||
|  "futures-util", | ||||
|  "lavina-core", | ||||
|  "nonempty", | ||||
|  "prometheus", | ||||
|  "proto-irc", | ||||
|  "serde", | ||||
|  "tokio", | ||||
|  "tracing", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "prometheus" | ||||
| version = "0.13.3" | ||||
|  |  | |||
							
								
								
									
										13
									
								
								Cargo.toml
								
								
								
								
							
							
						
						
									
										13
									
								
								Cargo.toml
								
								
								
								
							|  | @ -1,7 +1,9 @@ | |||
| [workspace] | ||||
| members = [ | ||||
|     ".", | ||||
|     "crates/lavina-core", | ||||
|     "crates/proto-irc", | ||||
|     "crates/projection-irc", | ||||
|     "crates/proto-xmpp", | ||||
|     "crates/mgmt-api", | ||||
| ] | ||||
|  | @ -22,6 +24,9 @@ regex = "1.7.1" | |||
| derive_more = "0.99.17" | ||||
| clap = { version = "4.4.4", features = ["derive"] } | ||||
| serde = { version = "1.0.152", features = ["rc", "serde_derive"] } | ||||
| tracing = "0.1.37" # logging & tracing api | ||||
| prometheus = { version = "0.13.3", default-features = false } | ||||
| lavina-core = { path = "crates/lavina-core" } | ||||
| 
 | ||||
| [package] | ||||
| name = "lavina" | ||||
|  | @ -37,18 +42,18 @@ http-body-util = "0.1.0-rc.3" | |||
| serde.workspace = true | ||||
| serde_json = "1.0.93" | ||||
| tokio.workspace = true | ||||
| tracing = "0.1.37" # logging & tracing api | ||||
| tracing.workspace = true | ||||
| tracing-subscriber = "0.3.16" | ||||
| futures-util.workspace = true | ||||
| prometheus = { version = "0.13.3", default-features = false } | ||||
| prometheus.workspace = true | ||||
| nonempty.workspace = true | ||||
| tokio-rustls = "0.24.1" | ||||
| rustls-pemfile = "1.0.2" | ||||
| quick-xml.workspace = true | ||||
| derive_more.workspace = true | ||||
| uuid = { version = "1.3.0", features = ["v4"] } | ||||
| sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } | ||||
| proto-irc = { path = "crates/proto-irc" } | ||||
| lavina-core.workspace = true | ||||
| projection-irc = { path = "crates/projection-irc" } | ||||
| proto-xmpp = { path = "crates/proto-xmpp" } | ||||
| mgmt-api = { path = "crates/mgmt-api" } | ||||
| clap.workspace = true | ||||
|  |  | |||
|  | @ -0,0 +1,12 @@ | |||
| [package] | ||||
| name = "lavina-core" | ||||
| edition = "2021" | ||||
| version.workspace = true | ||||
| 
 | ||||
| [dependencies] | ||||
| anyhow.workspace = true | ||||
| sqlx = { version = "0.7.0-alpha.2", features = ["sqlite", "migrate"] } | ||||
| serde.workspace = true | ||||
| tokio.workspace = true | ||||
| tracing.workspace = true | ||||
| prometheus.workspace = true | ||||
|  | @ -1,4 +1,8 @@ | |||
| //! Domain definitions and implementation of common chat logic.
 | ||||
| pub mod player; | ||||
| pub mod prelude; | ||||
| pub mod repo; | ||||
| pub mod room; | ||||
| pub mod terminator; | ||||
| 
 | ||||
| mod table; | ||||
|  | @ -19,11 +19,9 @@ use tokio::{ | |||
|     task::JoinHandle, | ||||
| }; | ||||
| 
 | ||||
| use crate::{ | ||||
|     core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}, | ||||
|     prelude::*, | ||||
|     util::table::{AnonTable, Key as AnonKey}, | ||||
| }; | ||||
| use crate::prelude::*; | ||||
| use crate::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}; | ||||
| use crate::table::{AnonTable, Key as AnonKey}; | ||||
| 
 | ||||
| /// Opaque player identifier. Cannot contain spaces, must be shorter than 32.
 | ||||
| #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] | ||||
|  | @ -63,9 +61,7 @@ impl PlayerConnection { | |||
|     } | ||||
| 
 | ||||
|     pub async fn join_room(&mut self, room_id: RoomId) -> Result<JoinResult> { | ||||
|         self.player_handle | ||||
|             .join_room(room_id, self.connection_id.clone()) | ||||
|             .await | ||||
|         self.player_handle.join_room(room_id, self.connection_id.clone()).await | ||||
|     } | ||||
| 
 | ||||
|     pub async fn change_topic(&mut self, room_id: RoomId, new_topic: Str) -> Result<()> { | ||||
|  | @ -100,9 +96,7 @@ impl PlayerConnection { | |||
| 
 | ||||
|     pub async fn get_rooms(&self) -> Result<Vec<RoomInfo>> { | ||||
|         let (promise, deferred) = oneshot(); | ||||
|         self.player_handle | ||||
|             .send(PlayerCommand::GetRooms(promise)) | ||||
|             .await; | ||||
|         self.player_handle.send(PlayerCommand::GetRooms(promise)).await; | ||||
|         Ok(deferred.await?) | ||||
|     } | ||||
| } | ||||
|  | @ -126,27 +120,14 @@ impl PlayerHandle { | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub async fn send_message( | ||||
|         &self, | ||||
|         room_id: RoomId, | ||||
|         connection_id: ConnectionId, | ||||
|         body: Str, | ||||
|     ) -> Result<()> { | ||||
|     pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: Str) -> Result<()> { | ||||
|         let (promise, deferred) = oneshot(); | ||||
|         let cmd = Cmd::SendMessage { | ||||
|             room_id, | ||||
|             body, | ||||
|             promise, | ||||
|         }; | ||||
|         let cmd = Cmd::SendMessage { room_id, body, promise }; | ||||
|         let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; | ||||
|         Ok(deferred.await?) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn join_room( | ||||
|         &self, | ||||
|         room_id: RoomId, | ||||
|         connection_id: ConnectionId, | ||||
|     ) -> Result<JoinResult> { | ||||
|     pub async fn join_room(&self, room_id: RoomId, connection_id: ConnectionId) -> Result<JoinResult> { | ||||
|         let (promise, deferred) = oneshot(); | ||||
|         let cmd = Cmd::JoinRoom { room_id, promise }; | ||||
|         let _ = self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; | ||||
|  | @ -230,12 +211,8 @@ pub enum Updates { | |||
| #[derive(Clone)] | ||||
| pub struct PlayerRegistry(Arc<RwLock<PlayerRegistryInner>>); | ||||
| impl PlayerRegistry { | ||||
|     pub fn empty( | ||||
|         room_registry: RoomRegistry, | ||||
|         metrics: &mut MetricsRegistry, | ||||
|     ) -> Result<PlayerRegistry> { | ||||
|         let metric_active_players = | ||||
|             IntGauge::new("chat_players_active", "Number of alive player actors")?; | ||||
|     pub fn empty(room_registry: RoomRegistry, metrics: &mut MetricsRegistry) -> Result<PlayerRegistry> { | ||||
|         let metric_active_players = IntGauge::new("chat_players_active", "Number of alive player actors")?; | ||||
|         metrics.register(Box::new(metric_active_players.clone()))?; | ||||
|         let inner = PlayerRegistryInner { | ||||
|             room_registry, | ||||
|  | @ -375,8 +352,7 @@ impl Player { | |||
|                         return; | ||||
|                     } | ||||
|                 }; | ||||
|                 room.subscribe(self.player_id.clone(), self.handle.clone()) | ||||
|                     .await; | ||||
|                 room.subscribe(self.player_id.clone(), self.handle.clone()).await; | ||||
|                 self.my_rooms.insert(room_id.clone(), room.clone()); | ||||
|                 let room_info = room.get_room_info().await; | ||||
|                 let _ = promise.send(JoinResult::Success(room_info)); | ||||
|  | @ -399,15 +375,10 @@ impl Player { | |||
|                 }; | ||||
|                 self.broadcast_update(update, connection_id).await; | ||||
|             } | ||||
|             Cmd::SendMessage { | ||||
|                 room_id, | ||||
|                 body, | ||||
|                 promise, | ||||
|             } => { | ||||
|             Cmd::SendMessage { room_id, body, promise } => { | ||||
|                 let room = self.rooms.get_room(&room_id).await; | ||||
|                 if let Some(room) = room { | ||||
|                     room.send_message(self.player_id.clone(), body.clone()) | ||||
|                         .await; | ||||
|                     room.send_message(self.player_id.clone(), body.clone()).await; | ||||
|                 } else { | ||||
|                     tracing::info!("no room found"); | ||||
|                 } | ||||
|  | @ -426,8 +397,7 @@ impl Player { | |||
|             } => { | ||||
|                 let room = self.rooms.get_room(&room_id).await; | ||||
|                 if let Some(mut room) = room { | ||||
|                     room.set_topic(self.player_id.clone(), new_topic.clone()) | ||||
|                         .await; | ||||
|                     room.set_topic(self.player_id.clone(), new_topic.clone()).await; | ||||
|                 } else { | ||||
|                     tracing::info!("no room found"); | ||||
|                 } | ||||
|  | @ -15,11 +15,3 @@ pub type Str = std::sync::Arc<str>; | |||
| pub fn fail(msg: &str) -> anyhow::Error { | ||||
|     anyhow::Error::msg(msg.to_owned()) | ||||
| } | ||||
| 
 | ||||
| macro_rules! ffail { | ||||
|     ($($arg:tt)*) => { | ||||
|         fail(&format!($($arg)*)) | ||||
|     }; | ||||
| } | ||||
| 
 | ||||
| pub(crate) use ffail; | ||||
|  | @ -93,10 +93,10 @@ impl Storage { | |||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn close(mut self) -> Result<()> { | ||||
|     pub async fn close(self) -> Result<()> { | ||||
|         let res = match Arc::try_unwrap(self.conn) { | ||||
|             Ok(e) => e, | ||||
|             Err(e) => return Err(fail("failed to acquire DB ownership on shutdown")), | ||||
|             Err(_) => return Err(fail("failed to acquire DB ownership on shutdown")), | ||||
|         }; | ||||
|         let res = res.into_inner(); | ||||
|         res.close().await?; | ||||
|  | @ -1,17 +1,13 @@ | |||
| //! Domain of rooms — chats with multiple participants.
 | ||||
| use std::{ | ||||
|     collections::HashMap, | ||||
|     hash::Hash, | ||||
|     sync::Arc, | ||||
| }; | ||||
| use std::{collections::HashMap, hash::Hash, sync::Arc}; | ||||
| 
 | ||||
| use prometheus::{IntGauge, Registry as MetricRegistry}; | ||||
| use serde::Serialize; | ||||
| use tokio::sync::RwLock as AsyncRwLock; | ||||
| 
 | ||||
| use crate::core::player::{PlayerHandle, PlayerId, Updates}; | ||||
| use crate::core::repo::Storage; | ||||
| use crate::player::{PlayerHandle, PlayerId, Updates}; | ||||
| use crate::prelude::*; | ||||
| use crate::repo::Storage; | ||||
| 
 | ||||
| /// Opaque room id
 | ||||
| #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] | ||||
|  | @ -20,9 +16,7 @@ impl RoomId { | |||
|     pub fn from(str: impl Into<Str>) -> Result<RoomId> { | ||||
|         let bytes = str.into(); | ||||
|         if bytes.len() > 32 { | ||||
|             return Err(anyhow::Error::msg( | ||||
|                 "Room name cannot be longer than 32 symbols", | ||||
|             )); | ||||
|             return Err(anyhow::Error::msg("Room name cannot be longer than 32 symbols")); | ||||
|         } | ||||
|         if bytes.contains(' ') { | ||||
|             return Err(anyhow::Error::msg("Room name cannot contain spaces")); | ||||
|  | @ -42,8 +36,7 @@ impl RoomId { | |||
| pub struct RoomRegistry(Arc<AsyncRwLock<RoomRegistryInner>>); | ||||
| impl RoomRegistry { | ||||
|     pub fn new(metrics: &mut MetricRegistry, storage: Storage) -> Result<RoomRegistry> { | ||||
|         let metric_active_rooms = | ||||
|             IntGauge::new("chat_rooms_active", "Number of alive room actors")?; | ||||
|         let metric_active_rooms = IntGauge::new("chat_rooms_active", "Number of alive room actors")?; | ||||
|         metrics.register(Box::new(metric_active_rooms.clone()))?; | ||||
|         let inner = RoomRegistryInner { | ||||
|             rooms: HashMap::new(), | ||||
|  | @ -150,11 +143,7 @@ impl RoomHandle { | |||
|         let lock = self.0.read().await; | ||||
|         RoomInfo { | ||||
|             id: lock.room_id.clone(), | ||||
|             members: lock | ||||
|                 .subscriptions | ||||
|                 .keys() | ||||
|                 .map(|x| x.clone()) | ||||
|                 .collect::<Vec<_>>(), | ||||
|             members: lock.subscriptions.keys().map(|x| x.clone()).collect::<Vec<_>>(), | ||||
|             topic: lock.topic.clone(), | ||||
|         } | ||||
|     } | ||||
|  | @ -191,7 +180,9 @@ impl Room { | |||
| 
 | ||||
|     async fn send_message(&mut self, author_id: PlayerId, body: Str) -> Result<()> { | ||||
|         tracing::info!("Adding a message to room"); | ||||
|         self.storage.insert_message(self.storage_id, self.message_count, &body).await?; | ||||
|         self.storage | ||||
|             .insert_message(self.storage_id, self.message_count, &body) | ||||
|             .await?; | ||||
|         self.message_count += 1; | ||||
|         let update = Updates::NewMessage { | ||||
|             room_id: self.room_id.clone(), | ||||
|  | @ -0,0 +1,28 @@ | |||
| use crate::prelude::*; | ||||
| 
 | ||||
| pub struct Terminator { | ||||
|     signal: Promise<()>, | ||||
|     completion: JoinHandle<Result<()>>, | ||||
| } | ||||
| impl Terminator { | ||||
|     pub async fn terminate(self) -> Result<()> { | ||||
|         match self.signal.send(()) { | ||||
|             Ok(()) => {} | ||||
|             Err(_) => log::warn!("Termination channel is dropped"), | ||||
|         } | ||||
|         self.completion.await??; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Used to spawn managed tasks with support for graceful shutdown
 | ||||
|     pub fn spawn<Fun, Fut>(launcher: Fun) -> Terminator | ||||
|     where | ||||
|         Fun: FnOnce(Deferred<()>) -> Fut, | ||||
|         Fut: Future<Output = Result<()>> + Send + 'static, | ||||
|     { | ||||
|         let (signal, rx) = oneshot(); | ||||
|         let future = launcher(rx); | ||||
|         let completion = tokio::task::spawn(future); | ||||
|         Terminator { signal, completion } | ||||
|     } | ||||
| } | ||||
|  | @ -0,0 +1,15 @@ | |||
| [package] | ||||
| name = "projection-irc" | ||||
| edition = "2021" | ||||
| version.workspace = true | ||||
| 
 | ||||
| [dependencies] | ||||
| lavina-core.workspace = true | ||||
| tracing.workspace = true | ||||
| anyhow.workspace = true | ||||
| nonempty.workspace = true | ||||
| serde.workspace = true | ||||
| tokio.workspace = true | ||||
| prometheus.workspace = true | ||||
| proto-irc = { path = "../proto-irc" } | ||||
| futures-util.workspace = true | ||||
|  | @ -1,26 +1,26 @@ | |||
| use std::collections::HashMap; | ||||
| use std::net::SocketAddr; | ||||
| 
 | ||||
| use anyhow::{anyhow, Result}; | ||||
| use futures_util::future::join_all; | ||||
| use nonempty::nonempty; | ||||
| use nonempty::NonEmpty; | ||||
| use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; | ||||
| 
 | ||||
| use serde::Deserialize; | ||||
| use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; | ||||
| use tokio::net::tcp::{ReadHalf, WriteHalf}; | ||||
| use tokio::net::{TcpListener, TcpStream}; | ||||
| use tokio::sync::mpsc::channel; | ||||
| 
 | ||||
| use crate::core::player::*; | ||||
| use crate::core::repo::Storage; | ||||
| use crate::core::room::{RoomId, RoomInfo, RoomRegistry}; | ||||
| use crate::prelude::*; | ||||
| use lavina_core::player::*; | ||||
| use lavina_core::prelude::*; | ||||
| use lavina_core::repo::Storage; | ||||
| use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; | ||||
| use lavina_core::terminator::Terminator; | ||||
| use proto_irc::client::{client_message, ClientMessage}; | ||||
| use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; | ||||
| use proto_irc::user::PrefixedNick; | ||||
| use proto_irc::{Chan, Recipient}; | ||||
| use proto_irc::user::{Prefix, PrefixedNick}; | ||||
| use crate::util::Terminator; | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod test; | ||||
|  | @ -70,14 +70,12 @@ async fn handle_socket( | |||
|     .await?; | ||||
|     writer.flush().await?; | ||||
| 
 | ||||
|     let registered_user: Result<RegisteredUser> = | ||||
|         handle_registration(&mut reader, &mut writer, &mut storage).await; | ||||
|     let registered_user: Result<RegisteredUser> = handle_registration(&mut reader, &mut writer, &mut storage).await; | ||||
| 
 | ||||
|     match registered_user { | ||||
|         Ok(user) => { | ||||
|             log::debug!("User registered"); | ||||
|             handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user) | ||||
|                 .await?; | ||||
|             handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?; | ||||
|         } | ||||
|         Err(_) => { | ||||
|             log::debug!("Registration failed"); | ||||
|  | @ -159,23 +157,22 @@ async fn handle_registration<'a>( | |||
|         buffer.clear(); | ||||
|     }?; | ||||
| 
 | ||||
| 
 | ||||
|     let stored_user = storage.retrieve_user_by_name(&*user.nickname).await?; | ||||
| 
 | ||||
|     let stored_user = match stored_user { | ||||
|         Some(u) => u, | ||||
|         None => { | ||||
|             log::info!("User '{}' not found", user.nickname); | ||||
|             return Err(fail("no user found")); | ||||
|             return Err(anyhow!("no user found")); | ||||
|         } | ||||
|     }; | ||||
|     if stored_user.password.is_none() { | ||||
|         log::info!("Password not defined for user '{}'", user.nickname); | ||||
|         return Err(fail("password is not defined")); | ||||
|         return Err(anyhow!("password is not defined")); | ||||
|     } | ||||
|     if stored_user.password.as_deref() != pass.as_deref() { | ||||
|         log::info!("Incorrect password supplied for user '{}'", user.nickname); | ||||
|         return Err(fail("passwords do not match")); | ||||
|         return Err(anyhow!("passwords do not match")); | ||||
|     } | ||||
|     // TODO properly implement session temination
 | ||||
| 
 | ||||
|  | @ -250,14 +247,7 @@ async fn handle_registered_socket<'a>( | |||
| 
 | ||||
|     let rooms_list = connection.get_rooms().await?; | ||||
|     for room in &rooms_list { | ||||
|         produce_on_join_cmd_messages( | ||||
|             &config, | ||||
|             &user, | ||||
|             &Chan::Global(room.id.as_inner().clone()), | ||||
|             room, | ||||
|             writer, | ||||
|         ) | ||||
|         .await?; | ||||
|         produce_on_join_cmd_messages(&config, &user, &Chan::Global(room.id.as_inner().clone()), room, writer).await?; | ||||
|     } | ||||
| 
 | ||||
|     writer.flush().await?; | ||||
|  | @ -314,10 +304,7 @@ async fn handle_update( | |||
| ) -> Result<()> { | ||||
|     log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); | ||||
|     match update { | ||||
|         Updates::RoomJoined { | ||||
|             new_member_id, | ||||
|             room_id, | ||||
|         } => { | ||||
|         Updates::RoomJoined { new_member_id, room_id } => { | ||||
|             if player_id == &new_member_id { | ||||
|                 if let Some(room) = rooms.get_room(&room_id).await { | ||||
|                     let room_info = room.get_room_info().await; | ||||
|  | @ -438,16 +425,14 @@ async fn handle_incoming_message( | |||
|                 Recipient::Chan(Chan::Global(chan)) => { | ||||
|                     let room_id = RoomId::from(chan)?; | ||||
|                     user_handle.send_message(room_id, body).await?; | ||||
|                 }, | ||||
|                 } | ||||
|                 _ => log::warn!("Unsupported target type"), | ||||
|             }, | ||||
|             ClientMessage::Topic { chan, topic } => { | ||||
|                 match chan { | ||||
|                     Chan::Global(chan) => { | ||||
|                         let room_id = RoomId::from(chan)?; | ||||
|                         user_handle | ||||
|                             .change_topic(room_id.clone(), topic.clone()) | ||||
|                             .await?; | ||||
|                         user_handle.change_topic(room_id.clone(), topic.clone()).await?; | ||||
|                         ServerMessage { | ||||
|                             tags: vec![], | ||||
|                             sender: Some(config.server_name.clone()), | ||||
|  | @ -569,11 +554,7 @@ async fn handle_incoming_message( | |||
|     Ok(HandleResult::Continue) | ||||
| } | ||||
| 
 | ||||
| fn user_to_who_msg( | ||||
|     config: &ServerConfig, | ||||
|     requestor: &RegisteredUser, | ||||
|     target_user_nickname: &Str, | ||||
| ) -> ServerMessageBody { | ||||
| fn user_to_who_msg(config: &ServerConfig, requestor: &RegisteredUser, target_user_nickname: &Str) -> ServerMessageBody { | ||||
|     // Username is equal to nickname
 | ||||
|     let username = format!("~{target_user_nickname}").into(); | ||||
| 
 | ||||
|  | @ -674,8 +655,13 @@ async fn produce_on_join_cmd_messages( | |||
|     } | ||||
|     .write_async(writer) | ||||
|     .await?; | ||||
|     let prefixed_members: Vec<PrefixedNick> = room_info.members.iter().map(|member| PrefixedNick::from_str(member.clone().into_inner())).collect(); | ||||
|     let non_empty_members: NonEmpty<PrefixedNick> = NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); | ||||
|     let prefixed_members: Vec<PrefixedNick> = room_info | ||||
|         .members | ||||
|         .iter() | ||||
|         .map(|member| PrefixedNick::from_str(member.clone().into_inner())) | ||||
|         .collect(); | ||||
|     let non_empty_members: NonEmpty<PrefixedNick> = | ||||
|         NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); | ||||
| 
 | ||||
|     ServerMessage { | ||||
|         tags: vec![], | ||||
|  | @ -710,12 +696,8 @@ pub async fn launch( | |||
| ) -> Result<Terminator> { | ||||
|     log::info!("Starting IRC projection"); | ||||
|     let (stopped_tx, mut stopped_rx) = channel(32); | ||||
|     let current_connections = | ||||
|         IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; | ||||
|     let total_connections = IntCounter::new( | ||||
|         "irc_total_connections", | ||||
|         "Total number of opened connections", | ||||
|     )?; | ||||
|     let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; | ||||
|     let total_connections = IntCounter::new("irc_total_connections", "Total number of opened connections")?; | ||||
|     metrics.register(Box::new(current_connections.clone()))?; | ||||
|     metrics.register(Box::new(total_connections.clone()))?; | ||||
| 
 | ||||
|  | @ -780,7 +762,8 @@ pub async fn launch( | |||
|                     log::warn!("IRC connection to {socket_addr} finished with error: {err}") | ||||
|                 } | ||||
|             } | ||||
|         })).await; | ||||
|         })) | ||||
|         .await; | ||||
|         log::info!("Stopped IRC projection"); | ||||
|         Ok(()) | ||||
|     }); | ||||
|  | @ -2,7 +2,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; | |||
| use tokio::net::tcp::{ReadHalf, WriteHalf}; | ||||
| use tokio::net::TcpStream; | ||||
| 
 | ||||
| use crate::prelude::*; | ||||
| use lavina_core::prelude::*; | ||||
| 
 | ||||
| struct TestScope<'a> { | ||||
|     reader: BufReader<ReadHalf<'a>>, | ||||
|  | @ -35,11 +35,7 @@ async fn init_client(stream: &mut TcpStream) -> Result<TestScope> { | |||
|     let (reader, writer) = stream.split(); | ||||
|     let reader = BufReader::new(reader); | ||||
|     let buffer = vec![]; | ||||
|     Ok(TestScope { | ||||
|         reader, | ||||
|         writer, | ||||
|         buffer, | ||||
|     }) | ||||
|     Ok(TestScope { reader, writer, buffer }) | ||||
| } | ||||
| 
 | ||||
| macro_rules! send { | ||||
							
								
								
									
										16
									
								
								src/main.rs
								
								
								
								
							
							
						
						
									
										16
									
								
								src/main.rs
								
								
								
								
							|  | @ -5,8 +5,6 @@ | |||
|     impl_trait_in_assoc_type | ||||
| )] | ||||
| 
 | ||||
| mod core; | ||||
| mod prelude; | ||||
| mod projections; | ||||
| mod util; | ||||
| 
 | ||||
|  | @ -19,17 +17,17 @@ use figment::{providers::Toml, Figment}; | |||
| use prometheus::Registry as MetricsRegistry; | ||||
| use serde::Deserialize; | ||||
| 
 | ||||
| use crate::core::player::PlayerRegistry; | ||||
| use crate::core::repo::Storage; | ||||
| use crate::core::room::RoomRegistry; | ||||
| use crate::prelude::*; | ||||
| use lavina_core::player::PlayerRegistry; | ||||
| use lavina_core::prelude::*; | ||||
| use lavina_core::repo::Storage; | ||||
| use lavina_core::room::RoomRegistry; | ||||
| 
 | ||||
| #[derive(Deserialize, Debug)] | ||||
| struct ServerConfig { | ||||
|     telemetry: util::telemetry::ServerConfig, | ||||
|     irc: projections::irc::ServerConfig, | ||||
|     irc: projection_irc::ServerConfig, | ||||
|     xmpp: projections::xmpp::ServerConfig, | ||||
|     storage: core::repo::StorageConfig, | ||||
|     storage: lavina_core::repo::StorageConfig, | ||||
| } | ||||
| 
 | ||||
| #[derive(Parser)] | ||||
|  | @ -65,7 +63,7 @@ async fn main() -> Result<()> { | |||
|     let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; | ||||
|     let telemetry_terminator = | ||||
|         util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; | ||||
|     let irc = projections::irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; | ||||
|     let irc = projection_irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; | ||||
|     let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; | ||||
|     tracing::info!("Started"); | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,3 +1,2 @@ | |||
| //! Protocol projections — implementations of public APIs.
 | ||||
| pub mod irc; | ||||
| pub mod xmpp; | ||||
|  |  | |||
|  | @ -19,9 +19,10 @@ use tokio::sync::mpsc::channel; | |||
| use tokio_rustls::rustls::{Certificate, PrivateKey}; | ||||
| use tokio_rustls::TlsAcceptor; | ||||
| 
 | ||||
| use crate::core::player::{PlayerConnection, PlayerId, PlayerRegistry}; | ||||
| use crate::core::room::{RoomId, RoomRegistry}; | ||||
| use crate::prelude::*; | ||||
| use lavina_core::player::{PlayerConnection, PlayerId, PlayerRegistry}; | ||||
| use lavina_core::prelude::*; | ||||
| use lavina_core::room::{RoomId, RoomRegistry}; | ||||
| use lavina_core::terminator::Terminator; | ||||
| use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; | ||||
| use proto_xmpp::client::{Iq, Message, MessageType, Presence}; | ||||
| use proto_xmpp::disco::*; | ||||
|  | @ -29,7 +30,6 @@ use proto_xmpp::roster::RosterQuery; | |||
| use proto_xmpp::session::Session; | ||||
| use proto_xmpp::stream::*; | ||||
| use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; | ||||
| use crate::util::Terminator; | ||||
| 
 | ||||
| use self::proto::{ClientPacket, IqClientBody}; | ||||
| 
 | ||||
|  | @ -332,7 +332,7 @@ async fn socket_final( | |||
|             update = user_handle.receiver.recv() => { | ||||
|                 if let Some(update) = update { | ||||
|                     match update { | ||||
|                         crate::core::player::Updates::NewMessage { room_id, author_id, body } => { | ||||
|                         lavina_core::player::Updates::NewMessage { room_id, author_id, body } => { | ||||
|                             Message { | ||||
|                                 to: Some(Jid { | ||||
|                                     name: Some(authenticated.xmpp_name.clone()), | ||||
|  |  | |||
|  | @ -1,7 +1,9 @@ | |||
| use anyhow::anyhow; | ||||
| use derive_more::From; | ||||
| use quick_xml::events::Event; | ||||
| use quick_xml::name::{Namespace, ResolveResult}; | ||||
| 
 | ||||
| use lavina_core::prelude::*; | ||||
| use proto_xmpp::bind::BindRequest; | ||||
| use proto_xmpp::client::{Iq, Message, Presence}; | ||||
| use proto_xmpp::disco::{InfoQuery, ItemQuery}; | ||||
|  | @ -9,8 +11,6 @@ use proto_xmpp::roster::RosterQuery; | |||
| use proto_xmpp::session::Session; | ||||
| use proto_xmpp::xml::*; | ||||
| 
 | ||||
| use crate::prelude::*; | ||||
| 
 | ||||
| #[derive(PartialEq, Eq, Debug, From)] | ||||
| pub enum IqClientBody { | ||||
|     Bind(BindRequest), | ||||
|  | @ -29,7 +29,7 @@ impl FromXml for IqClientBody { | |||
|             let bytes = match event { | ||||
|                 Event::Start(bytes) => bytes, | ||||
|                 Event::Empty(bytes) => bytes, | ||||
|                 _ => return Err(ffail!("Unexpected XML event: {event:?}")), | ||||
|                 _ => return Err(anyhow!("Unexpected XML event: {event:?}")), | ||||
|             }; | ||||
|             let name = bytes.name(); | ||||
|             match_parser!(name, namespace, event; | ||||
|  | @ -67,7 +67,7 @@ impl FromXml for ClientPacket { | |||
|                         Presence::<Ignore>, | ||||
|                         Message, | ||||
|                         { | ||||
|                             Err(ffail!( | ||||
|                             Err(anyhow!( | ||||
|                                 "Unexpected XML event of name {:?} in namespace {:?}", | ||||
|                                 name, | ||||
|                                 namespace | ||||
|  | @ -80,11 +80,11 @@ impl FromXml for ClientPacket { | |||
|                     if name.local_name().as_ref() == b"stream" { | ||||
|                         return Ok(ClientPacket::StreamEnd); | ||||
|                     } else { | ||||
|                         return Err(ffail!("Unexpected XML event: {event:?}")); | ||||
|                         return Err(anyhow!("Unexpected XML event: {event:?}")); | ||||
|                     } | ||||
|                 } | ||||
|                 _ => { | ||||
|                     return Err(ffail!("Unexpected XML event: {event:?}")); | ||||
|                     return Err(anyhow!("Unexpected XML event: {event:?}")); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  |  | |||
|  | @ -1,33 +1,3 @@ | |||
| use crate::prelude::*; | ||||
| 
 | ||||
| pub mod table; | ||||
| pub mod telemetry; | ||||
| #[cfg(test)] | ||||
| pub mod testkit; | ||||
| 
 | ||||
| pub struct Terminator { | ||||
|     signal: Promise<()>, | ||||
|     completion: JoinHandle<Result<()>>, | ||||
| } | ||||
| impl Terminator { | ||||
|     pub async fn terminate(self) -> Result<()> { | ||||
|         match self.signal.send(()) { | ||||
|             Ok(()) => {} | ||||
|             Err(_) => log::warn!("Termination channel is dropped"), | ||||
|         } | ||||
|         self.completion.await??; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Used to spawn managed tasks with support for graceful shutdown
 | ||||
|     pub fn spawn<Fun, Fut>(launcher: Fun) -> Terminator | ||||
|     where | ||||
|         Fun: FnOnce(Deferred<()>) -> Fut, | ||||
|         Fut: Future<Output = Result<()>> + Send + 'static, | ||||
|     { | ||||
|         let (signal, rx) = oneshot(); | ||||
|         let future = launcher(rx); | ||||
|         let completion = tokio::task::spawn(future); | ||||
|         Terminator { signal, completion } | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -11,10 +11,10 @@ use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; | |||
| use serde::{Deserialize, Serialize}; | ||||
| use tokio::net::TcpListener; | ||||
| 
 | ||||
| use crate::core::repo::Storage; | ||||
| use crate::core::room::RoomRegistry; | ||||
| use crate::prelude::*; | ||||
| use crate::util::Terminator; | ||||
| use lavina_core::prelude::*; | ||||
| use lavina_core::repo::Storage; | ||||
| use lavina_core::room::RoomRegistry; | ||||
| use lavina_core::terminator::Terminator; | ||||
| 
 | ||||
| use mgmt_api::*; | ||||
| 
 | ||||
|  |  | |||
|  | @ -2,7 +2,7 @@ use std::task::{Context, Poll}; | |||
| 
 | ||||
| use futures_util::task::noop_waker_ref; | ||||
| 
 | ||||
| use crate::prelude::*; | ||||
| use lavina_core::prelude::*; | ||||
| 
 | ||||
| pub fn sync_future<T>(future: impl Future<Output = T>) -> Result<T> { | ||||
|     let waker = noop_waker_ref(); | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue