forked from lavina/lavina
1
0
Fork 0

add more instrumentation

This commit is contained in:
Nikita Vilunov 2024-04-26 02:21:40 +02:00
parent e76ebf060a
commit cca30d5152
6 changed files with 62 additions and 19 deletions

View File

@ -13,3 +13,7 @@ hostname = "localhost"
[storage] [storage]
db_path = "db.sqlite" db_path = "db.sqlite"
[tracing]
endpoint = "http://localhost:4317"
service_name = "lavina"

View File

@ -25,6 +25,7 @@ impl<'a> Authenticator<'a> {
Self { storage } Self { storage }
} }
#[tracing::instrument(skip(self, provided_password), name = "Authenticator::authenticate")]
pub async fn authenticate(&self, login: &str, provided_password: &str) -> Result<Verdict> { pub async fn authenticate(&self, login: &str, provided_password: &str) -> Result<Verdict> {
let Some(stored_user) = self.storage.retrieve_user_by_name(login).await? else { let Some(stored_user) = self.storage.retrieve_user_by_name(login).await? else {
return Ok(Verdict::UserNotFound); return Ok(Verdict::UserNotFound);
@ -46,6 +47,7 @@ impl<'a> Authenticator<'a> {
Ok(Verdict::InvalidPassword) Ok(Verdict::InvalidPassword)
} }
#[tracing::instrument(skip(self, provided_password), name = "Authenticator::set_password")]
pub async fn set_password(&self, login: &str, provided_password: &str) -> Result<UpdatePasswordResult> { pub async fn set_password(&self, login: &str, provided_password: &str) -> Result<UpdatePasswordResult> {
let Some(u) = self.storage.retrieve_user_by_name(login).await? else { let Some(u) = self.storage.retrieve_user_by_name(login).await? else {
return Ok(UpdatePasswordResult::UserNotFound); return Ok(UpdatePasswordResult::UserNotFound);

View File

@ -146,7 +146,7 @@ impl PlayerHandle {
} }
async fn send(&self, command: ActorCommand) { async fn send(&self, command: ActorCommand) {
let span = tracing::span!(tracing::Level::DEBUG, "PlayerHandle::send"); let span = tracing::span!(tracing::Level::INFO, "PlayerHandle::send");
// TODO either handle the error or doc why it is safe to ignore // TODO either handle the error or doc why it is safe to ignore
let _ = self.tx.send((command, span)).await; let _ = self.tx.send((command, span)).await;
} }

View File

@ -3,6 +3,7 @@ use anyhow::Result;
use crate::repo::Storage; use crate::repo::Storage;
impl Storage { impl Storage {
#[tracing::instrument(skip(self), name = "Storage::set_argon2_challenge")]
pub async fn set_argon2_challenge(&self, user_id: u32, hash: &str) -> Result<()> { pub async fn set_argon2_challenge(&self, user_id: u32, hash: &str) -> Result<()> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
sqlx::query( sqlx::query(

View File

@ -40,6 +40,7 @@ impl Storage {
Ok(Storage { conn }) Ok(Storage { conn })
} }
#[tracing::instrument(skip(self), name = "Storage::retrieve_user_by_name")]
pub async fn retrieve_user_by_name(&self, name: &str) -> Result<Option<StoredUser>> { pub async fn retrieve_user_by_name(&self, name: &str) -> Result<Option<StoredUser>> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let res = sqlx::query_as( let res = sqlx::query_as(
@ -55,6 +56,7 @@ impl Storage {
Ok(res) Ok(res)
} }
#[tracing::instrument(skip(self), name = "Storage::retrieve_room_by_name")]
pub async fn retrieve_room_by_name(&self, name: &str) -> Result<Option<StoredRoom>> { pub async fn retrieve_room_by_name(&self, name: &str) -> Result<Option<StoredRoom>> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let res = sqlx::query_as( let res = sqlx::query_as(
@ -69,6 +71,7 @@ impl Storage {
Ok(res) Ok(res)
} }
#[tracing::instrument(skip(self, topic), name = "Storage::create_new_room")]
pub async fn create_new_room(&mut self, name: &str, topic: &str) -> Result<u32> { pub async fn create_new_room(&mut self, name: &str, topic: &str) -> Result<u32> {
let mut executor = self.conn.lock().await; let mut executor = self.conn.lock().await;
let (id,): (u32,) = sqlx::query_as( let (id,): (u32,) = sqlx::query_as(
@ -128,6 +131,7 @@ impl Storage {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self), name = "Storage::create_user")]
pub async fn create_user(&mut self, name: &str) -> Result<()> { pub async fn create_user(&mut self, name: &str) -> Result<()> {
let query = sqlx::query( let query = sqlx::query(
"insert into users(name) "insert into users(name)
@ -140,6 +144,7 @@ impl Storage {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, pwd), name = "Storage::set_password")]
pub async fn set_password<'a>(&'a self, name: &'a str, pwd: &'a str) -> Result<Option<()>> { pub async fn set_password<'a>(&'a self, name: &'a str, pwd: &'a str) -> Result<Option<()>> {
async fn inner(txn: &mut Transaction<'_, Sqlite>, name: &str, pwd: &str) -> Result<Option<()>> { async fn inner(txn: &mut Transaction<'_, Sqlite>, name: &str, pwd: &str) -> Result<Option<()>> {
let id: Option<(u32,)> = sqlx::query_as("select * from users where name = ? limit 1;") let id: Option<(u32,)> = sqlx::query_as("select * from users where name = ? limit 1;")

View File

@ -14,7 +14,9 @@ use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use opentelemetry_semantic_conventions::SCHEMA_URL; use opentelemetry_semantic_conventions::SCHEMA_URL;
use prometheus::Registry as MetricsRegistry; use prometheus::Registry as MetricsRegistry;
use serde::Deserialize; use serde::Deserialize;
use tracing::level_filters::LevelFilter;
use tracing_opentelemetry::OpenTelemetryLayer; use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::prelude::*; use tracing_subscriber::prelude::*;
use lavina_core::prelude::*; use lavina_core::prelude::*;
@ -27,6 +29,13 @@ struct ServerConfig {
irc: projection_irc::ServerConfig, irc: projection_irc::ServerConfig,
xmpp: projection_xmpp::ServerConfig, xmpp: projection_xmpp::ServerConfig,
storage: lavina_core::repo::StorageConfig, storage: lavina_core::repo::StorageConfig,
tracing: Option<TracingConfig>,
}
#[derive(Deserialize, Debug)]
struct TracingConfig {
endpoint: String,
service_name: String,
} }
#[derive(Parser)] #[derive(Parser)]
@ -44,9 +53,9 @@ fn load_config() -> Result<ServerConfig> {
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
set_up_logging()?;
let sleep = ctrl_c()?; let sleep = ctrl_c()?;
let config = load_config()?; let config = load_config()?;
set_up_logging(&config.tracing)?;
tracing::info!("Booting up"); tracing::info!("Booting up");
tracing::info!("Loaded config: {config:?}"); tracing::info!("Loaded config: {config:?}");
@ -55,6 +64,7 @@ async fn main() -> Result<()> {
irc: irc_config, irc: irc_config,
xmpp: xmpp_config, xmpp: xmpp_config,
storage: storage_config, storage: storage_config,
tracing: _,
} = config; } = config;
let metrics = MetricsRegistry::new(); let metrics = MetricsRegistry::new();
let storage = Storage::open(storage_config).await?; let storage = Storage::open(storage_config).await?;
@ -95,23 +105,44 @@ fn ctrl_c() -> Result<impl Future<Output = ()>> {
Ok(recv(chan)) Ok(recv(chan))
} }
fn set_up_logging() -> Result<()> { fn set_up_logging(tracing_config: &Option<TracingConfig>) -> Result<()> {
let subscriber = tracing_subscriber::registry().with(tracing_subscriber::fmt::layer());
let targets = {
use std::{env, str::FromStr};
use tracing_subscriber::{filter::Targets, layer::SubscriberExt};
match env::var("RUST_LOG") {
Ok(var) => Targets::from_str(&var)
.map_err(|e| {
eprintln!("Ignoring `RUST_LOG={:?}`: {}", var, e);
})
.unwrap_or_default(),
Err(env::VarError::NotPresent) => Targets::new().with_default(Subscriber::DEFAULT_MAX_LEVEL),
Err(e) => {
eprintln!("Ignoring `RUST_LOG`: {}", e);
Targets::new().with_default(Subscriber::DEFAULT_MAX_LEVEL)
}
}
};
if let Some(config) = tracing_config {
let trace_config = opentelemetry_sdk::trace::Config::default() let trace_config = opentelemetry_sdk::trace::Config::default()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(1.0)))) .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(1.0))))
.with_id_generator(RandomIdGenerator::default()) .with_id_generator(RandomIdGenerator::default())
.with_resource(Resource::from_schema_url( .with_resource(Resource::from_schema_url(
[KeyValue::new(SERVICE_NAME, "lavina")], [KeyValue::new(SERVICE_NAME, config.service_name.to_string())],
SCHEMA_URL, SCHEMA_URL,
)); ));
let trace_exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint("http://localhost:4317"); let trace_exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(&config.endpoint);
let tracer = opentelemetry_otlp::new_pipeline() let tracer = opentelemetry_otlp::new_pipeline()
.tracing() .tracing()
.with_trace_config(trace_config) .with_trace_config(trace_config)
.with_batch_config(BatchConfig::default()) .with_batch_config(BatchConfig::default())
.with_exporter(trace_exporter) .with_exporter(trace_exporter)
.install_batch(runtime::Tokio) .install_batch(runtime::Tokio)?;
.unwrap(); let subscriber = subscriber.with(OpenTelemetryLayer::new(tracer));
targets.with_subscriber(subscriber).try_init()?;
tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).with(OpenTelemetryLayer::new(tracer)).init(); } else {
targets.with_subscriber(subscriber).try_init()?;
}
Ok(()) Ok(())
} }