forked from lavina/lavina
				
			split irc proejction into a separate crate
This commit is contained in:
		
							parent
							
								
									a1db17c779
								
							
						
					
					
						commit
						9ca81b3a2b
					
				|  | @ -857,8 +857,8 @@ dependencies = [ | |||
|  "lavina-core", | ||||
|  "mgmt-api", | ||||
|  "nonempty", | ||||
|  "projection-irc", | ||||
|  "prometheus", | ||||
|  "proto-irc", | ||||
|  "proto-xmpp", | ||||
|  "quick-xml", | ||||
|  "regex", | ||||
|  | @ -1252,6 +1252,21 @@ dependencies = [ | |||
|  "yansi", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "projection-irc" | ||||
| version = "0.0.1-dev" | ||||
| dependencies = [ | ||||
|  "anyhow", | ||||
|  "futures-util", | ||||
|  "lavina-core", | ||||
|  "nonempty", | ||||
|  "prometheus", | ||||
|  "proto-irc", | ||||
|  "serde", | ||||
|  "tokio", | ||||
|  "tracing", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "prometheus" | ||||
| version = "0.13.3" | ||||
|  |  | |||
|  | @ -3,6 +3,7 @@ members = [ | |||
|     ".", | ||||
|     "crates/lavina-core", | ||||
|     "crates/proto-irc", | ||||
|     "crates/projection-irc", | ||||
|     "crates/proto-xmpp", | ||||
|     "crates/mgmt-api", | ||||
| ] | ||||
|  | @ -25,6 +26,7 @@ clap = { version = "4.4.4", features = ["derive"] } | |||
| serde = { version = "1.0.152", features = ["rc", "serde_derive"] } | ||||
| tracing = "0.1.37" # logging & tracing api | ||||
| prometheus = { version = "0.13.3", default-features = false } | ||||
| lavina-core = { path = "crates/lavina-core" } | ||||
| 
 | ||||
| [package] | ||||
| name = "lavina" | ||||
|  | @ -50,8 +52,8 @@ rustls-pemfile = "1.0.2" | |||
| quick-xml.workspace = true | ||||
| derive_more.workspace = true | ||||
| uuid = { version = "1.3.0", features = ["v4"] } | ||||
| lavina-core = { path = "crates/lavina-core" } | ||||
| proto-irc = { path = "crates/proto-irc" } | ||||
| lavina-core.workspace = true | ||||
| projection-irc = { path = "crates/projection-irc" } | ||||
| proto-xmpp = { path = "crates/proto-xmpp" } | ||||
| mgmt-api = { path = "crates/mgmt-api" } | ||||
| clap.workspace = true | ||||
|  |  | |||
|  | @ -1,7 +1,8 @@ | |||
| //! Domain definitions and implementation of common chat logic.
 | ||||
| pub mod player; | ||||
| pub mod prelude; | ||||
| pub mod repo; | ||||
| pub mod room; | ||||
| pub mod terminator; | ||||
| 
 | ||||
| mod prelude; | ||||
| mod table; | ||||
|  |  | |||
|  | @ -0,0 +1,28 @@ | |||
| use crate::prelude::*; | ||||
| 
 | ||||
| pub struct Terminator { | ||||
|     signal: Promise<()>, | ||||
|     completion: JoinHandle<Result<()>>, | ||||
| } | ||||
| impl Terminator { | ||||
|     pub async fn terminate(self) -> Result<()> { | ||||
|         match self.signal.send(()) { | ||||
|             Ok(()) => {} | ||||
|             Err(_) => log::warn!("Termination channel is dropped"), | ||||
|         } | ||||
|         self.completion.await??; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Used to spawn managed tasks with support for graceful shutdown
 | ||||
|     pub fn spawn<Fun, Fut>(launcher: Fun) -> Terminator | ||||
|     where | ||||
|         Fun: FnOnce(Deferred<()>) -> Fut, | ||||
|         Fut: Future<Output = Result<()>> + Send + 'static, | ||||
|     { | ||||
|         let (signal, rx) = oneshot(); | ||||
|         let future = launcher(rx); | ||||
|         let completion = tokio::task::spawn(future); | ||||
|         Terminator { signal, completion } | ||||
|     } | ||||
| } | ||||
|  | @ -0,0 +1,15 @@ | |||
| [package] | ||||
| name = "projection-irc" | ||||
| edition = "2021" | ||||
| version.workspace = true | ||||
| 
 | ||||
| [dependencies] | ||||
| lavina-core.workspace = true | ||||
| tracing.workspace = true | ||||
| anyhow.workspace = true | ||||
| nonempty.workspace = true | ||||
| serde.workspace = true | ||||
| tokio.workspace = true | ||||
| prometheus.workspace = true | ||||
| proto-irc = { path = "../proto-irc" } | ||||
| futures-util.workspace = true | ||||
|  | @ -1,11 +1,11 @@ | |||
| use std::collections::HashMap; | ||||
| use std::net::SocketAddr; | ||||
| 
 | ||||
| use anyhow::{anyhow, Result}; | ||||
| use futures_util::future::join_all; | ||||
| use nonempty::nonempty; | ||||
| use nonempty::NonEmpty; | ||||
| use prometheus::{IntCounter, IntGauge, Registry as MetricsRegistry}; | ||||
| 
 | ||||
| use serde::Deserialize; | ||||
| use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; | ||||
| use tokio::net::tcp::{ReadHalf, WriteHalf}; | ||||
|  | @ -13,14 +13,14 @@ use tokio::net::{TcpListener, TcpStream}; | |||
| use tokio::sync::mpsc::channel; | ||||
| 
 | ||||
| use lavina_core::player::*; | ||||
| use lavina_core::prelude::*; | ||||
| use lavina_core::repo::Storage; | ||||
| use lavina_core::room::{RoomId, RoomInfo, RoomRegistry}; | ||||
| use crate::prelude::*; | ||||
| use lavina_core::terminator::Terminator; | ||||
| use proto_irc::client::{client_message, ClientMessage}; | ||||
| use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; | ||||
| use proto_irc::user::PrefixedNick; | ||||
| use proto_irc::{Chan, Recipient}; | ||||
| use proto_irc::user::{Prefix, PrefixedNick}; | ||||
| use crate::util::Terminator; | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod test; | ||||
|  | @ -70,14 +70,12 @@ async fn handle_socket( | |||
|     .await?; | ||||
|     writer.flush().await?; | ||||
| 
 | ||||
|     let registered_user: Result<RegisteredUser> = | ||||
|         handle_registration(&mut reader, &mut writer, &mut storage).await; | ||||
|     let registered_user: Result<RegisteredUser> = handle_registration(&mut reader, &mut writer, &mut storage).await; | ||||
| 
 | ||||
|     match registered_user { | ||||
|         Ok(user) => { | ||||
|             log::debug!("User registered"); | ||||
|             handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user) | ||||
|                 .await?; | ||||
|             handle_registered_socket(config, players, rooms, &mut reader, &mut writer, user).await?; | ||||
|         } | ||||
|         Err(_) => { | ||||
|             log::debug!("Registration failed"); | ||||
|  | @ -159,23 +157,22 @@ async fn handle_registration<'a>( | |||
|         buffer.clear(); | ||||
|     }?; | ||||
| 
 | ||||
| 
 | ||||
|     let stored_user = storage.retrieve_user_by_name(&*user.nickname).await?; | ||||
| 
 | ||||
|     let stored_user = match stored_user { | ||||
|         Some(u) => u, | ||||
|         None => { | ||||
|             log::info!("User '{}' not found", user.nickname); | ||||
|             return Err(fail("no user found")); | ||||
|             return Err(anyhow!("no user found")); | ||||
|         } | ||||
|     }; | ||||
|     if stored_user.password.is_none() { | ||||
|         log::info!("Password not defined for user '{}'", user.nickname); | ||||
|         return Err(fail("password is not defined")); | ||||
|         return Err(anyhow!("password is not defined")); | ||||
|     } | ||||
|     if stored_user.password.as_deref() != pass.as_deref() { | ||||
|         log::info!("Incorrect password supplied for user '{}'", user.nickname); | ||||
|         return Err(fail("passwords do not match")); | ||||
|         return Err(anyhow!("passwords do not match")); | ||||
|     } | ||||
|     // TODO properly implement session temination
 | ||||
| 
 | ||||
|  | @ -250,14 +247,7 @@ async fn handle_registered_socket<'a>( | |||
| 
 | ||||
|     let rooms_list = connection.get_rooms().await?; | ||||
|     for room in &rooms_list { | ||||
|         produce_on_join_cmd_messages( | ||||
|             &config, | ||||
|             &user, | ||||
|             &Chan::Global(room.id.as_inner().clone()), | ||||
|             room, | ||||
|             writer, | ||||
|         ) | ||||
|         .await?; | ||||
|         produce_on_join_cmd_messages(&config, &user, &Chan::Global(room.id.as_inner().clone()), room, writer).await?; | ||||
|     } | ||||
| 
 | ||||
|     writer.flush().await?; | ||||
|  | @ -314,10 +304,7 @@ async fn handle_update( | |||
| ) -> Result<()> { | ||||
|     log::debug!("Sending irc message to player {player_id:?} on update {update:?}"); | ||||
|     match update { | ||||
|         Updates::RoomJoined { | ||||
|             new_member_id, | ||||
|             room_id, | ||||
|         } => { | ||||
|         Updates::RoomJoined { new_member_id, room_id } => { | ||||
|             if player_id == &new_member_id { | ||||
|                 if let Some(room) = rooms.get_room(&room_id).await { | ||||
|                     let room_info = room.get_room_info().await; | ||||
|  | @ -438,16 +425,14 @@ async fn handle_incoming_message( | |||
|                 Recipient::Chan(Chan::Global(chan)) => { | ||||
|                     let room_id = RoomId::from(chan)?; | ||||
|                     user_handle.send_message(room_id, body).await?; | ||||
|                 }, | ||||
|                 } | ||||
|                 _ => log::warn!("Unsupported target type"), | ||||
|             }, | ||||
|             ClientMessage::Topic { chan, topic } => { | ||||
|                 match chan { | ||||
|                     Chan::Global(chan) => { | ||||
|                         let room_id = RoomId::from(chan)?; | ||||
|                         user_handle | ||||
|                             .change_topic(room_id.clone(), topic.clone()) | ||||
|                             .await?; | ||||
|                         user_handle.change_topic(room_id.clone(), topic.clone()).await?; | ||||
|                         ServerMessage { | ||||
|                             tags: vec![], | ||||
|                             sender: Some(config.server_name.clone()), | ||||
|  | @ -569,11 +554,7 @@ async fn handle_incoming_message( | |||
|     Ok(HandleResult::Continue) | ||||
| } | ||||
| 
 | ||||
| fn user_to_who_msg( | ||||
|     config: &ServerConfig, | ||||
|     requestor: &RegisteredUser, | ||||
|     target_user_nickname: &Str, | ||||
| ) -> ServerMessageBody { | ||||
| fn user_to_who_msg(config: &ServerConfig, requestor: &RegisteredUser, target_user_nickname: &Str) -> ServerMessageBody { | ||||
|     // Username is equal to nickname
 | ||||
|     let username = format!("~{target_user_nickname}").into(); | ||||
| 
 | ||||
|  | @ -674,8 +655,13 @@ async fn produce_on_join_cmd_messages( | |||
|     } | ||||
|     .write_async(writer) | ||||
|     .await?; | ||||
|     let prefixed_members: Vec<PrefixedNick> = room_info.members.iter().map(|member| PrefixedNick::from_str(member.clone().into_inner())).collect(); | ||||
|     let non_empty_members: NonEmpty<PrefixedNick> = NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); | ||||
|     let prefixed_members: Vec<PrefixedNick> = room_info | ||||
|         .members | ||||
|         .iter() | ||||
|         .map(|member| PrefixedNick::from_str(member.clone().into_inner())) | ||||
|         .collect(); | ||||
|     let non_empty_members: NonEmpty<PrefixedNick> = | ||||
|         NonEmpty::from_vec(prefixed_members).unwrap_or(nonempty![PrefixedNick::from_str(user.nickname.clone())]); | ||||
| 
 | ||||
|     ServerMessage { | ||||
|         tags: vec![], | ||||
|  | @ -710,12 +696,8 @@ pub async fn launch( | |||
| ) -> Result<Terminator> { | ||||
|     log::info!("Starting IRC projection"); | ||||
|     let (stopped_tx, mut stopped_rx) = channel(32); | ||||
|     let current_connections = | ||||
|         IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; | ||||
|     let total_connections = IntCounter::new( | ||||
|         "irc_total_connections", | ||||
|         "Total number of opened connections", | ||||
|     )?; | ||||
|     let current_connections = IntGauge::new("irc_current_connections", "Open and alive TCP connections")?; | ||||
|     let total_connections = IntCounter::new("irc_total_connections", "Total number of opened connections")?; | ||||
|     metrics.register(Box::new(current_connections.clone()))?; | ||||
|     metrics.register(Box::new(total_connections.clone()))?; | ||||
| 
 | ||||
|  | @ -780,7 +762,8 @@ pub async fn launch( | |||
|                     log::warn!("IRC connection to {socket_addr} finished with error: {err}") | ||||
|                 } | ||||
|             } | ||||
|         })).await; | ||||
|         })) | ||||
|         .await; | ||||
|         log::info!("Stopped IRC projection"); | ||||
|         Ok(()) | ||||
|     }); | ||||
|  | @ -2,7 +2,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; | |||
| use tokio::net::tcp::{ReadHalf, WriteHalf}; | ||||
| use tokio::net::TcpStream; | ||||
| 
 | ||||
| use crate::prelude::*; | ||||
| use lavina_core::prelude::*; | ||||
| 
 | ||||
| struct TestScope<'a> { | ||||
|     reader: BufReader<ReadHalf<'a>>, | ||||
|  | @ -35,11 +35,7 @@ async fn init_client(stream: &mut TcpStream) -> Result<TestScope> { | |||
|     let (reader, writer) = stream.split(); | ||||
|     let reader = BufReader::new(reader); | ||||
|     let buffer = vec![]; | ||||
|     Ok(TestScope { | ||||
|         reader, | ||||
|         writer, | ||||
|         buffer, | ||||
|     }) | ||||
|     Ok(TestScope { reader, writer, buffer }) | ||||
| } | ||||
| 
 | ||||
| macro_rules! send { | ||||
|  | @ -5,7 +5,6 @@ | |||
|     impl_trait_in_assoc_type | ||||
| )] | ||||
| 
 | ||||
| mod prelude; | ||||
| mod projections; | ||||
| mod util; | ||||
| 
 | ||||
|  | @ -19,14 +18,14 @@ use prometheus::Registry as MetricsRegistry; | |||
| use serde::Deserialize; | ||||
| 
 | ||||
| use lavina_core::player::PlayerRegistry; | ||||
| use lavina_core::prelude::*; | ||||
| use lavina_core::repo::Storage; | ||||
| use lavina_core::room::RoomRegistry; | ||||
| use crate::prelude::*; | ||||
| 
 | ||||
| #[derive(Deserialize, Debug)] | ||||
| struct ServerConfig { | ||||
|     telemetry: util::telemetry::ServerConfig, | ||||
|     irc: projections::irc::ServerConfig, | ||||
|     irc: projection_irc::ServerConfig, | ||||
|     xmpp: projections::xmpp::ServerConfig, | ||||
|     storage: lavina_core::repo::StorageConfig, | ||||
| } | ||||
|  | @ -64,7 +63,7 @@ async fn main() -> Result<()> { | |||
|     let mut players = PlayerRegistry::empty(rooms.clone(), &mut metrics)?; | ||||
|     let telemetry_terminator = | ||||
|         util::telemetry::launch(telemetry_config, metrics.clone(), rooms.clone(), storage.clone()).await?; | ||||
|     let irc = projections::irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; | ||||
|     let irc = projection_irc::launch(irc_config, players.clone(), rooms.clone(), metrics.clone(), storage.clone()).await?; | ||||
|     let xmpp = projections::xmpp::launch(xmpp_config, players.clone(), rooms.clone(), metrics.clone()).await?; | ||||
|     tracing::info!("Started"); | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,25 +0,0 @@ | |||
| pub use std::future::Future; | ||||
| 
 | ||||
| pub use tokio::pin; | ||||
| pub use tokio::select; | ||||
| pub use tokio::sync::oneshot::{channel as oneshot, Receiver as Deferred, Sender as Promise}; | ||||
| pub use tokio::task::JoinHandle; | ||||
| 
 | ||||
| pub mod log { | ||||
|     pub use tracing::{debug, error, info, warn}; | ||||
| } | ||||
| 
 | ||||
| pub type Result<T> = std::result::Result<T, anyhow::Error>; | ||||
| pub type Str = std::sync::Arc<str>; | ||||
| 
 | ||||
| pub fn fail(msg: &str) -> anyhow::Error { | ||||
|     anyhow::Error::msg(msg.to_owned()) | ||||
| } | ||||
| 
 | ||||
| macro_rules! ffail { | ||||
|     ($($arg:tt)*) => { | ||||
|         fail(&format!($($arg)*)) | ||||
|     }; | ||||
| } | ||||
| 
 | ||||
| pub(crate) use ffail; | ||||
|  | @ -1,3 +1,2 @@ | |||
| //! Protocol projections — implementations of public APIs.
 | ||||
| pub mod irc; | ||||
| pub mod xmpp; | ||||
|  |  | |||
|  | @ -20,8 +20,9 @@ use tokio_rustls::rustls::{Certificate, PrivateKey}; | |||
| use tokio_rustls::TlsAcceptor; | ||||
| 
 | ||||
| use lavina_core::player::{PlayerConnection, PlayerId, PlayerRegistry}; | ||||
| use lavina_core::prelude::*; | ||||
| use lavina_core::room::{RoomId, RoomRegistry}; | ||||
| use crate::prelude::*; | ||||
| use lavina_core::terminator::Terminator; | ||||
| use proto_xmpp::bind::{BindResponse, Jid, Name, Resource, Server}; | ||||
| use proto_xmpp::client::{Iq, Message, MessageType, Presence}; | ||||
| use proto_xmpp::disco::*; | ||||
|  | @ -29,7 +30,6 @@ use proto_xmpp::roster::RosterQuery; | |||
| use proto_xmpp::session::Session; | ||||
| use proto_xmpp::stream::*; | ||||
| use proto_xmpp::xml::{Continuation, FromXml, Parser, ToXml}; | ||||
| use crate::util::Terminator; | ||||
| 
 | ||||
| use self::proto::{ClientPacket, IqClientBody}; | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,7 +1,9 @@ | |||
| use anyhow::anyhow; | ||||
| use derive_more::From; | ||||
| use quick_xml::events::Event; | ||||
| use quick_xml::name::{Namespace, ResolveResult}; | ||||
| 
 | ||||
| use lavina_core::prelude::*; | ||||
| use proto_xmpp::bind::BindRequest; | ||||
| use proto_xmpp::client::{Iq, Message, Presence}; | ||||
| use proto_xmpp::disco::{InfoQuery, ItemQuery}; | ||||
|  | @ -9,8 +11,6 @@ use proto_xmpp::roster::RosterQuery; | |||
| use proto_xmpp::session::Session; | ||||
| use proto_xmpp::xml::*; | ||||
| 
 | ||||
| use crate::prelude::*; | ||||
| 
 | ||||
| #[derive(PartialEq, Eq, Debug, From)] | ||||
| pub enum IqClientBody { | ||||
|     Bind(BindRequest), | ||||
|  | @ -29,7 +29,7 @@ impl FromXml for IqClientBody { | |||
|             let bytes = match event { | ||||
|                 Event::Start(bytes) => bytes, | ||||
|                 Event::Empty(bytes) => bytes, | ||||
|                 _ => return Err(ffail!("Unexpected XML event: {event:?}")), | ||||
|                 _ => return Err(anyhow!("Unexpected XML event: {event:?}")), | ||||
|             }; | ||||
|             let name = bytes.name(); | ||||
|             match_parser!(name, namespace, event; | ||||
|  | @ -67,7 +67,7 @@ impl FromXml for ClientPacket { | |||
|                         Presence::<Ignore>, | ||||
|                         Message, | ||||
|                         { | ||||
|                             Err(ffail!( | ||||
|                             Err(anyhow!( | ||||
|                                 "Unexpected XML event of name {:?} in namespace {:?}", | ||||
|                                 name, | ||||
|                                 namespace | ||||
|  | @ -80,11 +80,11 @@ impl FromXml for ClientPacket { | |||
|                     if name.local_name().as_ref() == b"stream" { | ||||
|                         return Ok(ClientPacket::StreamEnd); | ||||
|                     } else { | ||||
|                         return Err(ffail!("Unexpected XML event: {event:?}")); | ||||
|                         return Err(anyhow!("Unexpected XML event: {event:?}")); | ||||
|                     } | ||||
|                 } | ||||
|                 _ => { | ||||
|                     return Err(ffail!("Unexpected XML event: {event:?}")); | ||||
|                     return Err(anyhow!("Unexpected XML event: {event:?}")); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  |  | |||
|  | @ -1,32 +1,3 @@ | |||
| use crate::prelude::*; | ||||
| 
 | ||||
| pub mod telemetry; | ||||
| #[cfg(test)] | ||||
| pub mod testkit; | ||||
| 
 | ||||
| pub struct Terminator { | ||||
|     signal: Promise<()>, | ||||
|     completion: JoinHandle<Result<()>>, | ||||
| } | ||||
| impl Terminator { | ||||
|     pub async fn terminate(self) -> Result<()> { | ||||
|         match self.signal.send(()) { | ||||
|             Ok(()) => {} | ||||
|             Err(_) => log::warn!("Termination channel is dropped"), | ||||
|         } | ||||
|         self.completion.await??; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Used to spawn managed tasks with support for graceful shutdown
 | ||||
|     pub fn spawn<Fun, Fut>(launcher: Fun) -> Terminator | ||||
|     where | ||||
|         Fun: FnOnce(Deferred<()>) -> Fut, | ||||
|         Fut: Future<Output = Result<()>> + Send + 'static, | ||||
|     { | ||||
|         let (signal, rx) = oneshot(); | ||||
|         let future = launcher(rx); | ||||
|         let completion = tokio::task::spawn(future); | ||||
|         Terminator { signal, completion } | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -11,10 +11,10 @@ use prometheus::{Encoder, Registry as MetricsRegistry, TextEncoder}; | |||
| use serde::{Deserialize, Serialize}; | ||||
| use tokio::net::TcpListener; | ||||
| 
 | ||||
| use lavina_core::prelude::*; | ||||
| use lavina_core::repo::Storage; | ||||
| use lavina_core::room::RoomRegistry; | ||||
| use crate::prelude::*; | ||||
| use crate::util::Terminator; | ||||
| use lavina_core::terminator::Terminator; | ||||
| 
 | ||||
| use mgmt_api::*; | ||||
| 
 | ||||
|  |  | |||
|  | @ -2,7 +2,7 @@ use std::task::{Context, Poll}; | |||
| 
 | ||||
| use futures_util::task::noop_waker_ref; | ||||
| 
 | ||||
| use crate::prelude::*; | ||||
| use lavina_core::prelude::*; | ||||
| 
 | ||||
| pub fn sync_future<T>(future: impl Future<Output = T>) -> Result<T> { | ||||
|     let waker = noop_waker_ref(); | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue