From 938bcd79260f88b2598c3642ab13b0ee5561e63b Mon Sep 17 00:00:00 2001 From: gusto Date: Thu, 24 Oct 2024 19:33:04 +0300 Subject: [PATCH] Nomos tracing crate (#849) * tmp grafana stack * Testnet configure otlp tracing * tmp: Configurable loki and tempo in cfgsync * Rename nomos-log to nomos-tracing-service * nomos-tracing crate for tracing layer creation * Use multiple layers in nomos-tracing-service * Tracing in tests and testnet * Tempo volume instead of local host dir --- Cargo.toml | 5 +- compose.static.yml | 47 ++++ nodes/nomos-executor/src/config.rs | 8 +- nodes/nomos-executor/src/lib.rs | 8 +- nodes/nomos-executor/src/main.rs | 2 +- nodes/nomos-node/Cargo.toml | 3 +- nodes/nomos-node/src/config.rs | 54 ++--- nodes/nomos-node/src/lib.rs | 6 +- nodes/nomos-node/src/main.rs | 2 +- .../data-availability/tests/Cargo.toml | 1 - nomos-services/mempool/Cargo.toml | 2 +- nomos-services/mempool/tests/mock.rs | 6 +- nomos-services/{log => tracing}/Cargo.toml | 10 +- nomos-services/{log => tracing}/src/lib.rs | 200 ++++++++---------- nomos-tracing/Cargo.toml | 18 ++ nomos-tracing/src/lib.rs | 2 + nomos-tracing/src/logging/gelf.rs | 36 ++++ nomos-tracing/src/logging/local.rs | 40 ++++ nomos-tracing/src/logging/loki.rs | 29 +++ nomos-tracing/src/logging/mod.rs | 3 + nomos-tracing/src/tracing/mod.rs | 1 + nomos-tracing/src/tracing/otlp.rs | 42 ++++ testnet/cfgsync.yaml | 4 + testnet/cfgsync/Cargo.toml | 3 + testnet/cfgsync/src/bin/cfgsync-client.rs | 14 +- testnet/cfgsync/src/bin/cfgsync-server.rs | 32 ++- testnet/cfgsync/src/config.rs | 41 +++- testnet/cfgsync/src/lib.rs | 8 + testnet/cfgsync/src/repo.rs | 5 + testnet/monitoring/tempo.yaml | 71 +++++++ tests/Cargo.toml | 3 +- tests/src/nodes/executor.rs | 10 +- tests/src/nodes/validator.rs | 10 +- tests/src/topology/configs/mod.rs | 3 + tests/src/topology/configs/tracing.rs | 14 ++ tests/src/topology/mod.rs | 4 + 36 files changed, 562 insertions(+), 185 deletions(-) rename nomos-services/{log => tracing}/Cargo.toml (56%) rename nomos-services/{log => tracing}/src/lib.rs (54%) create mode 100644 nomos-tracing/Cargo.toml create mode 100644 nomos-tracing/src/lib.rs create mode 100644 nomos-tracing/src/logging/gelf.rs create mode 100644 nomos-tracing/src/logging/local.rs create mode 100644 nomos-tracing/src/logging/loki.rs create mode 100644 nomos-tracing/src/logging/mod.rs create mode 100644 nomos-tracing/src/tracing/mod.rs create mode 100644 nomos-tracing/src/tracing/otlp.rs create mode 100644 testnet/monitoring/tempo.yaml create mode 100644 tests/src/topology/configs/tracing.rs diff --git a/Cargo.toml b/Cargo.toml index 92fa376f..3a37ddfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "nomos-core/chain-defs", "nomos-core/cl", + "nomos-da/full-replication", "nomos-da/kzgrs", "nomos-da/kzgrs-backend", "nomos-da/storage", @@ -10,7 +11,7 @@ members = [ "nomos-da/network/subnetworks-assignations", "nomos-libp2p", "nomos-services/api", - "nomos-services/log", + "nomos-services/tracing", "nomos-services/network", "nomos-services/storage", "nomos-services/cryptarchia-consensus", @@ -24,10 +25,10 @@ members = [ "nomos-services/data-availability/dispersal", "nomos-services/data-availability/tests", "nomos-services/mix", - "nomos-da/full-replication", "nomos-mix/message", "nomos-mix/network", "nomos-mix/queue", + "nomos-tracing", "nomos-cli", "nomos-utils", "nodes/nomos-node", diff --git a/compose.static.yml b/compose.static.yml index 4fc71db3..77df2e45 100644 --- a/compose.static.yml +++ b/compose.static.yml @@ -91,6 +91,11 @@ 
services: grafana: container_name: grafana image: grafana/grafana:latest + environment: + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + - GF_AUTH_DISABLE_LOGIN_FORM=true + - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor traceQLStreaming metricsSummary env_file: - ./testnet/monitoring/grafana/plugins.env volumes: @@ -101,3 +106,45 @@ services: restart: on-failure depends_on: - prometheus + + loki: + image: grafana/loki:2.9.2 + ports: + - "3100:3100" + command: -config.file=/etc/loki/local-config.yaml + + # Tempo runs as an unprivileged user; volumes need to be chowned before running. + tempo-init: + image: &tempoImage grafana/tempo:latest + user: root + entrypoint: + - "chown" + - "10001:10001" + - "/var/tempo" + volumes: + - tempo-data:/var/tempo + + memcached: + image: memcached:1.6.29 + container_name: memcached + ports: + - "11211:11211" + environment: + - MEMCACHED_MAX_MEMORY=64m + - MEMCACHED_THREADS=4 + + tempo: + image: *tempoImage + container_name: tempo + command: [ "-config.file=/etc/tempo.yaml" ] + volumes: + - ./testnet/monitoring/tempo.yaml:/etc/tempo.yaml:z + - tempo-data:/var/tempo + ports: + - "4317:4317" # otlp grpc + depends_on: + - tempo-init + - memcached + +volumes: + tempo-data: diff --git a/nodes/nomos-executor/src/config.rs b/nodes/nomos-executor/src/config.rs index 46c66f49..26a4d296 100644 --- a/nodes/nomos-executor/src/config.rs +++ b/nodes/nomos-executor/src/config.rs @@ -8,8 +8,8 @@ use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; use nomos_mix_service::MixService; use nomos_network::backends::libp2p::Libp2p as NetworkBackend; use nomos_node::{ - config::{update_cryptarchia_consensus, update_log, update_mix, update_network, MixArgs}, - CryptarchiaArgs, HttpArgs, LogArgs, Logger, NetworkArgs, NetworkService, Wire, + config::{update_cryptarchia_consensus, update_mix, update_network, update_tracing, MixArgs}, + CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, NetworkService, Tracing, Wire, }; use nomos_storage::backends::rocksdb::RocksBackend; use overwatch_rs::services::ServiceData; @@ -20,7 +20,7 @@ use crate::ExecutorApiService; #[derive(Deserialize, Debug, Clone, Serialize)] pub struct Config { - pub log: ::Settings, + pub tracing: ::Settings, pub network: as ServiceData>::Settings, pub mix: as ServiceData>::Settings, pub da_dispersal: ::Settings, @@ -43,7 +43,7 @@ impl Config { http_args: HttpArgs, cryptarchia_args: CryptarchiaArgs, ) -> Result { - update_log(&mut self.log, log_args)?; + update_tracing(&mut self.tracing, log_args)?; update_network(&mut self.network, network_args)?; update_mix(&mut self.mix, mix_args)?; update_http(&mut self.http, http_args)?; diff --git a/nodes/nomos-executor/src/lib.rs b/nodes/nomos-executor/src/lib.rs index 7642dbae..db8f34dc 100644 --- a/nodes/nomos-executor/src/lib.rs +++ b/nodes/nomos-executor/src/lib.rs @@ -28,9 +28,9 @@ use nomos_node::MempoolNetworkAdapter; use nomos_node::Metrics; use nomos_node::NetworkBackend; use nomos_node::{ - BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier, Logger, - NetworkService, NomosDaMembership, RocksBackend, StorageService, SystemSig, Tx, TxMempool, - Wire, MB16, + BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier, + NetworkService, NomosDaMembership, RocksBackend, StorageService, SystemSig, Tracing, Tx, + TxMempool, Wire, MB16, }; use overwatch_derive::Services; use overwatch_rs::services::handle::ServiceHandle; @@ -88,7 +88,7 @@ pub type ExecutorDaVerifier = DaVerifier,
+ tracing: ServiceHandle, network: ServiceHandle>, mix: ServiceHandle>, da_dispersal: ServiceHandle, diff --git a/nodes/nomos-executor/src/main.rs b/nodes/nomos-executor/src/main.rs index 88a626df..64b49a3c 100644 --- a/nodes/nomos-executor/src/main.rs +++ b/nodes/nomos-executor/src/main.rs @@ -79,7 +79,7 @@ fn main() -> Result<()> { network: config.network, mix: config.mix, #[cfg(feature = "tracing")] - logging: config.log, + tracing: config.tracing, http: config.http, cl_mempool: TxMempoolSettings { backend: (), diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 1d165839..3688e2f0 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -30,7 +30,8 @@ nomos-da-network-service = { path = "../../nomos-services/data-availability/netw nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] } nomos-api = { path = "../../nomos-services/api" } -nomos-log = { path = "../../nomos-services/log" } +nomos-tracing = { path = "../../nomos-tracing" } +nomos-tracing-service = { path = "../../nomos-services/tracing" } nomos-mempool = { path = "../../nomos-services/mempool", features = [ "mock", "libp2p", diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 80477700..0fa8e053 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -8,26 +8,27 @@ use cl::{InputWitness, NoteWitness, NullifierSecret}; use clap::{Parser, ValueEnum}; use color_eyre::eyre::{eyre, Result}; use hex::FromHex; +use nomos_tracing::logging::{gelf::GelfConfig, local::FileConfig}; +use serde::{Deserialize, Serialize}; +use tracing::Level; +// internal +use crate::{NomosApiService, NomosDaMembership, Wire}; use nomos_core::staking::NMO_UNIT; use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend; use nomos_da_network_service::NetworkService as DaNetworkService; use nomos_libp2p::{ed25519::SecretKey, Multiaddr}; -use nomos_log::{Logger, LoggerBackend, LoggerFormat}; use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend; use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; use nomos_mix_service::MixService; use nomos_network::backends::libp2p::Libp2p as NetworkBackend; use nomos_network::NetworkService; use nomos_storage::backends::rocksdb::RocksBackend; +use nomos_tracing_service::{LoggerLayer, Tracing}; use overwatch_rs::services::ServiceData; -use serde::{Deserialize, Serialize}; use subnetworks_assignations::versions::v1::FillFromNodeList; -use tracing::Level; -// internal -use crate::{NomosApiService, NomosDaMembership, Wire}; #[derive(ValueEnum, Clone, Debug, Default)] -pub enum LoggerBackendType { +pub enum LoggerLayerType { Gelf, File, #[default] @@ -51,10 +52,7 @@ pub struct LogArgs { /// Backend type #[clap(long = "log-backend", env = "LOG_BACKEND", value_enum)] - backend: Option, - - #[clap(long = "log-format", env = "LOG_FORMAT")] - format: Option, + backend: Option, #[clap(long = "log-level", env = "LOG_LEVEL")] level: Option, @@ -135,7 +133,7 @@ pub struct MetricsArgs { #[derive(Deserialize, Debug, Clone, Serialize)] pub struct Config { - pub log: ::Settings, + pub tracing: ::Settings, pub network: as ServiceData>::Settings, pub mix: as ServiceData>::Settings, pub da_network: @@ -159,7 +157,7 @@ impl Config { http_args: HttpArgs, cryptarchia_args: CryptarchiaArgs, ) -> Result { - update_log(&mut self.log, log_args)?; + update_tracing(&mut 
self.tracing, log_args)?; update_network(&mut self.network, network_args)?; update_mix(&mut self.mix, mix_args)?; update_http(&mut self.http, http_args)?; @@ -168,45 +166,39 @@ impl Config { } } -pub fn update_log(log: &mut ::Settings, log_args: LogArgs) -> Result<()> { +pub fn update_tracing( + tracing: &mut ::Settings, + tracing_args: LogArgs, +) -> Result<()> { let LogArgs { backend, log_addr: addr, directory, prefix, - format, level, - } = log_args; + } = tracing_args; // Override the file config with the one from env variables. if let Some(backend) = backend { - log.backend = match backend { - LoggerBackendType::Gelf => LoggerBackend::Gelf { + tracing.logger = match backend { + LoggerLayerType::Gelf => LoggerLayer::Gelf(GelfConfig { addr: addr .ok_or_else(|| eyre!("Gelf backend requires an address."))? .to_socket_addrs()? .next() .ok_or_else(|| eyre!("Invalid gelf address"))?, - }, - LoggerBackendType::File => LoggerBackend::File { + }), + LoggerLayerType::File => LoggerLayer::File(FileConfig { directory: directory.ok_or_else(|| eyre!("File backend requires a directory."))?, prefix, - }, - LoggerBackendType::Stdout => LoggerBackend::Stdout, - LoggerBackendType::Stderr => LoggerBackend::Stderr, + }), + LoggerLayerType::Stdout => LoggerLayer::Stdout, + LoggerLayerType::Stderr => LoggerLayer::Stderr, } }; - // Update parts of the config. - if let Some(format_str) = format { - log.format = match format_str.as_str() { - "Json" => LoggerFormat::Json, - "Plain" => LoggerFormat::Plain, - _ => return Err(eyre!("Invalid log format provided.")), - }; - } if let Some(level_str) = level { - log.level = match level_str.as_str() { + tracing.level = match level_str.as_str() { "DEBUG" => Level::DEBUG, "INFO" => Level::INFO, "ERROR" => Level::ERROR, diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index dd0af5bb..0856688e 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -29,8 +29,6 @@ use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; use nomos_da_verifier::network::adapters::validator::Libp2pAdapter as VerifierNetworkAdapter; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter; use nomos_da_verifier::DaVerifierService; -#[cfg(feature = "tracing")] -pub use nomos_log::Logger; pub use nomos_mempool::da::service::{DaMempoolService, DaMempoolSettings}; pub use nomos_mempool::network::adapters::libp2p::{ Libp2pAdapter as MempoolNetworkAdapter, Settings as MempoolAdapterSettings, @@ -53,6 +51,8 @@ pub use nomos_storage::{ StorageService, }; pub use nomos_system_sig::SystemSig; +#[cfg(feature = "tracing")] +pub use nomos_tracing_service::Tracing; use overwatch_derive::*; use overwatch_rs::services::handle::ServiceHandle; use rand_chacha::ChaCha20Rng; @@ -161,7 +161,7 @@ pub type NodeDaVerifier = DaVerifier>; #[derive(Services)] pub struct Nomos { #[cfg(feature = "tracing")] - logging: ServiceHandle, + tracing: ServiceHandle, network: ServiceHandle>, mix: ServiceHandle>, da_indexer: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 1bac2f2f..1f54d228 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -78,7 +78,7 @@ fn main() -> Result<()> { network: config.network, mix: config.mix, #[cfg(feature = "tracing")] - logging: config.log, + tracing: config.tracing, http: config.http, cl_mempool: nomos_mempool::TxMempoolSettings { backend: (), diff --git a/nomos-services/data-availability/tests/Cargo.toml 
b/nomos-services/data-availability/tests/Cargo.toml index 004bf944..e08bfd36 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -22,7 +22,6 @@ nomos-da-storage = { path = "../../../nomos-da/storage" } nomos-node = { path = "../../../nodes/nomos-node" } nomos-mempool = { path = "../../../nomos-services/mempool" } nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb-backend"] } -nomos-log = { path = "../../log" } nomos-network = { path = "../../network", features = ["mock"] } nomos-mix-service = { path = "../../mix" } nomos-libp2p = { path = "../../../nomos-libp2p" } diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index c479d529..8a5ec8e9 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -29,7 +29,7 @@ utoipa = { version = "4.0", optional = true } serde_json = { version = "1", optional = true } [dev-dependencies] -nomos-log = { path = "../log" } +nomos-tracing-service = { path = "../tracing" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tokio = { version = "1", features = ["full"] } blake2 = "0.10" diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index acbbcc3b..b9436f82 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -2,11 +2,11 @@ use nomos_core::{ header::HeaderId, tx::mock::{MockTransaction, MockTxId}, }; -use nomos_log::{Logger, LoggerSettings}; use nomos_network::{ backends::mock::{Mock, MockBackendMessage, MockConfig, MockMessage}, NetworkConfig, NetworkMsg, NetworkService, }; +use nomos_tracing_service::{Tracing, TracingSettings}; use overwatch_derive::*; use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; @@ -18,7 +18,7 @@ use nomos_mempool::{ #[derive(Services)] struct MockPoolNode { - logging: ServiceHandle, + logging: ServiceHandle, network: ServiceHandle>, mockpool: ServiceHandle< TxMempoolService, MockTxId>>, @@ -66,7 +66,7 @@ fn test_mockmempool() { network: (), registry: None, }, - logging: LoggerSettings::default(), + logging: TracingSettings::default(), }, None, ) diff --git a/nomos-services/log/Cargo.toml b/nomos-services/tracing/Cargo.toml similarity index 56% rename from nomos-services/log/Cargo.toml rename to nomos-services/tracing/Cargo.toml index 7a70fb13..c828e217 100644 --- a/nomos-services/log/Cargo.toml +++ b/nomos-services/tracing/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "nomos-log" +name = "nomos-tracing-service" version = "0.1.0" edition = "2021" @@ -8,10 +8,16 @@ edition = "2021" [dependencies] async-trait = "0.1" futures = "0.3" +nomos-tracing = { path = "../../nomos-tracing" } +opentelemetry = { version = "0.25" } +opentelemetry-otlp = "0.25" +opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } serde = { version = "1.0", features = ["derive"] } tokio = { version = "1", features = ["time"] } tracing = "0.1" tracing-appender = "0.2" -tracing-subscriber = { version = "0.3", features = ["json"] } +tracing-loki = "0.2.5" +tracing-opentelemetry = "0.26" +tracing-subscriber = { version = "0.3", features = ["json", "registry"] } tracing-gelf = "0.7" diff --git a/nomos-services/log/src/lib.rs b/nomos-services/tracing/src/lib.rs similarity index 54% rename from nomos-services/log/src/lib.rs rename to nomos-services/tracing/src/lib.rs index 
8d7648ee..f4fb6cbd 100644 --- a/nomos-services/log/src/lib.rs +++ b/nomos-services/tracing/src/lib.rs @@ -1,17 +1,12 @@ // std -use futures::StreamExt; use std::fmt::{Debug, Formatter}; use std::io::Write; -use std::net::SocketAddr; -use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::time::Duration; // crates -use serde::{Deserialize, Serialize}; -use tracing::{error, Level}; -use tracing_appender::non_blocking::WorkerGuard; -use tracing_subscriber::{filter::LevelFilter, prelude::*}; -// internal +use futures::StreamExt; +use nomos_tracing::logging::gelf::{create_gelf_layer, GelfConfig}; +use nomos_tracing::logging::local::{create_file_layer, create_writer_layer, FileConfig}; +use nomos_tracing::logging::loki::{create_loki_layer, LokiConfig}; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -19,12 +14,18 @@ use overwatch_rs::services::{ state::{NoOperator, NoState}, ServiceCore, ServiceData, }; +use serde::{Deserialize, Serialize}; +use tracing::{error, Level}; +use tracing_appender::non_blocking::WorkerGuard; +use tracing_subscriber::filter::LevelFilter; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +// internal +use nomos_tracing::tracing::otlp::{create_otlp_tracing_layer, OtlpTracingConfig}; -const GELF_RECONNECT_INTERVAL: u64 = 10; - -pub struct Logger { +pub struct Tracing { service_state: ServiceStateHandle, - worker_guard: Option, + logger_guard: Option, } /// This is a wrapper around a writer to allow cloning which is @@ -67,14 +68,10 @@ impl Debug for SharedWriter { } #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum LoggerBackend { - Gelf { - addr: SocketAddr, - }, - File { - directory: PathBuf, - prefix: Option, - }, +pub enum LoggerLayer { + Gelf(GelfConfig), + File(FileConfig), + Loki(LokiConfig), Stdout, Stderr, #[serde(skip)] @@ -84,68 +81,50 @@ pub enum LoggerBackend { } #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct LoggerSettings { - pub backend: LoggerBackend, - pub format: LoggerFormat, +pub enum TracingLayer { + Otlp(OtlpTracingConfig), + None, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TracingSettings { + pub logger: LoggerLayer, + pub tracing: TracingLayer, #[serde(with = "serde_level")] pub level: Level, } -impl Default for LoggerSettings { +impl Default for TracingSettings { fn default() -> Self { Self { - backend: LoggerBackend::Stdout, - format: LoggerFormat::Json, + logger: LoggerLayer::Stdout, + tracing: TracingLayer::None, level: Level::DEBUG, } } } -impl LoggerSettings { +impl TracingSettings { #[inline] - pub const fn new(backend: LoggerBackend, format: LoggerFormat, level: Level) -> Self { + pub const fn new(logger: LoggerLayer, tracing: TracingLayer, level: Level) -> Self { Self { - backend, - format, + logger, + tracing, level, } } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum LoggerFormat { - Json, - Plain, -} - -impl ServiceData for Logger { - const SERVICE_ID: &'static str = "Logger"; - type Settings = LoggerSettings; +impl ServiceData for Tracing { + const SERVICE_ID: &'static str = "Tracing"; + type Settings = TracingSettings; type State = NoState; type StateOperator = NoOperator; type Message = NoMessage; } -// a macro and not a function because it's a bit of a type -// mess with `Layer` -macro_rules! 
registry_init { - ($layer:expr, $format:expr, $level:expr) => { - if let LoggerFormat::Json = $format { - tracing_subscriber::registry() - .with(LevelFilter::from($level)) - .with($layer) - .init(); - } else { - tracing_subscriber::registry() - .with(LevelFilter::from($level)) - .with($layer) - .init(); - } - }; -} - #[async_trait::async_trait] -impl ServiceCore for Logger { +impl ServiceCore for Tracing { fn init(service_state: ServiceStateHandle) -> Result { #[cfg(test)] use std::sync::Once; @@ -153,72 +132,79 @@ impl ServiceCore for Logger { static ONCE_INIT: Once = Once::new(); let config = service_state.settings_reader.get_updated_settings(); - let (non_blocking, _guard) = match config.backend { - LoggerBackend::Gelf { addr } => { - let (layer, mut task) = tracing_gelf::Logger::builder() - .connect_tcp(addr) - .expect("Connect to the graylog instance"); - service_state.overwatch_handle.runtime().spawn(async move { - loop { - if task.connect().await.0.is_empty() { - break; - } else { - eprintln!("Failed to connect to graylog"); - let delay = Duration::from_secs(GELF_RECONNECT_INTERVAL); - tokio::time::sleep(delay).await; - } - } - }); - #[cfg(test)] - ONCE_INIT.call_once(move || { - registry_init!(layer, config.format, config.level); - }); - #[cfg(not(test))] - registry_init!(layer, config.format, config.level); + let mut layers: Vec + Send + Sync>> = vec![]; - return Ok(Self { - service_state, - worker_guard: None, - }); + let (logger_layer, logger_guard): ( + Box + Send + Sync>, + Option, + ) = match config.logger { + LoggerLayer::Gelf(config) => { + let gelf_layer = + create_gelf_layer(config, service_state.overwatch_handle.runtime())?; + (Box::new(gelf_layer), None) } - LoggerBackend::File { directory, prefix } => { - let file_appender = tracing_appender::rolling::hourly( - directory, - prefix.unwrap_or_else(|| PathBuf::from("nomos.log")), - ); - tracing_appender::non_blocking(file_appender) + LoggerLayer::File(config) => { + let (layer, guard) = create_file_layer(config); + (Box::new(layer), Some(guard)) } - LoggerBackend::Stdout => tracing_appender::non_blocking(std::io::stdout()), - LoggerBackend::Stderr => tracing_appender::non_blocking(std::io::stderr()), - LoggerBackend::Writer(writer) => tracing_appender::non_blocking(writer), - LoggerBackend::None => { - return Ok(Self { - service_state, - worker_guard: None, - }) + LoggerLayer::Loki(config) => { + let loki_layer = + create_loki_layer(config, service_state.overwatch_handle.runtime())?; + (Box::new(loki_layer), None) } + LoggerLayer::Stdout => { + let (layer, guard) = create_writer_layer(std::io::stdout()); + (Box::new(layer), Some(guard)) + } + LoggerLayer::Stderr => { + let (layer, guard) = create_writer_layer(std::io::stderr()); + (Box::new(layer), Some(guard)) + } + LoggerLayer::Writer(writer) => { + let (layer, guard) = create_writer_layer(writer); + (Box::new(layer), Some(guard)) + } + LoggerLayer::None => (Box::new(tracing_subscriber::fmt::Layer::new()), None), }; - let layer = tracing_subscriber::fmt::Layer::new() - .with_level(true) - .with_writer(non_blocking); + layers.push(logger_layer); + + if let TracingLayer::Otlp(config) = config.tracing { + let tracing_layer = create_otlp_tracing_layer(config)?; + layers.push(Box::new(tracing_layer)); + } + + // If no layers are created, tracing subscriber is not required. 
+ if layers.is_empty() { + return Ok(Self { + service_state, + logger_guard: None, + }); + } + #[cfg(test)] ONCE_INIT.call_once(move || { - registry_init!(layer, config.format, config.level); + tracing_subscriber::registry() + .with(LevelFilter::from(config.level)) + .with(layers) + .init(); }); #[cfg(not(test))] - registry_init!(layer, config.format, config.level); + tracing_subscriber::registry() + .with(LevelFilter::from(config.level)) + .with(layers) + .init(); Ok(Self { service_state, - worker_guard: Some(_guard), + logger_guard, }) } async fn run(self) -> Result<(), overwatch_rs::DynError> { let Self { service_state, - worker_guard, + logger_guard, } = self; // keep the handle alive without stressing the runtime let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); @@ -227,7 +213,7 @@ impl ServiceCore for Logger { match msg { LifecycleMessage::Shutdown(sender) => { // flush pending logs before signaling message processing - drop(worker_guard); + drop(logger_guard); if sender.send(()).is_err() { error!( "Error sending successful shutdown signal from service {}", diff --git a/nomos-tracing/Cargo.toml b/nomos-tracing/Cargo.toml new file mode 100644 index 00000000..b05382af --- /dev/null +++ b/nomos-tracing/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "nomos-tracing" +version = "0.1.0" +edition = "2021" + +[dependencies] +opentelemetry = { version = "0.26" } +opentelemetry-otlp = "0.26" +opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] } +serde = { version = "1.0", features = ["derive"] } +tokio = "1" +tracing = "0.1" +tracing-appender = "0.2" +tracing-loki = "0.2.5" +tracing-opentelemetry = "0.27" +tracing-subscriber = { version = "0.3", features = ["json", "registry"] } +tracing-gelf = "0.7" +url = { version = "2", features = ["serde"] } diff --git a/nomos-tracing/src/lib.rs b/nomos-tracing/src/lib.rs new file mode 100644 index 00000000..8f89f062 --- /dev/null +++ b/nomos-tracing/src/lib.rs @@ -0,0 +1,2 @@ +pub mod logging; +pub mod tracing; diff --git a/nomos-tracing/src/logging/gelf.rs b/nomos-tracing/src/logging/gelf.rs new file mode 100644 index 00000000..1b217b85 --- /dev/null +++ b/nomos-tracing/src/logging/gelf.rs @@ -0,0 +1,36 @@ +// std +use std::{error::Error, net::SocketAddr, time::Duration}; +// crates +use serde::{Deserialize, Serialize}; +use tokio::runtime::Handle; +// internal + +const GELF_RECONNECT_INTERVAL: u64 = 10; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GelfConfig { + pub addr: SocketAddr, +} + +pub fn create_gelf_layer( + config: GelfConfig, + handle: &Handle, +) -> Result> { + let (layer, mut task) = tracing_gelf::Logger::builder() + .connect_tcp(config.addr) + .expect("Connect to the graylog instance"); + + handle.spawn(async move { + loop { + if task.connect().await.0.is_empty() { + break; + } else { + eprintln!("Failed to connect to graylog"); + let delay = Duration::from_secs(GELF_RECONNECT_INTERVAL); + tokio::time::sleep(delay).await; + } + } + }); + + Ok(layer) +} diff --git a/nomos-tracing/src/logging/local.rs b/nomos-tracing/src/logging/local.rs new file mode 100644 index 00000000..f9798d8e --- /dev/null +++ b/nomos-tracing/src/logging/local.rs @@ -0,0 +1,40 @@ +// std +use std::{io::Write, path::PathBuf}; +// crates +use serde::{Deserialize, Serialize}; +use tracing_appender::non_blocking::WorkerGuard; +use tracing_subscriber::fmt::{ + format::{DefaultFields, Format}, + Layer, +}; +// internal + +pub type FmtLayer = Layer; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct 
FileConfig { + pub directory: PathBuf, + pub prefix: Option, +} + +pub fn create_file_layer(config: FileConfig) -> (FmtLayer, WorkerGuard) { + let file_appender = tracing_appender::rolling::hourly( + config.directory, + config.prefix.unwrap_or_else(|| PathBuf::from("nomos.log")), + ); + + create_writer_layer(file_appender) +} + +pub fn create_writer_layer(writer: W) -> (FmtLayer, WorkerGuard) +where + W: Write + Send + 'static, +{ + let (non_blocking, guard) = tracing_appender::non_blocking(writer); + + let layer = tracing_subscriber::fmt::Layer::new() + .with_level(true) + .with_writer(non_blocking); + + (layer, guard) +} diff --git a/nomos-tracing/src/logging/loki.rs b/nomos-tracing/src/logging/loki.rs new file mode 100644 index 00000000..5f00aa9f --- /dev/null +++ b/nomos-tracing/src/logging/loki.rs @@ -0,0 +1,29 @@ +// std +use std::error::Error; +// crates +use serde::{Deserialize, Serialize}; +use tokio::runtime::Handle; +use url::Url; +// internal + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LokiConfig { + pub endpoint: Url, + pub host_identifier: String, +} + +pub fn create_loki_layer( + config: LokiConfig, + handle: &Handle, +) -> Result> { + let (loki_layer, task) = tracing_loki::layer( + config.endpoint, + vec![("host".into(), config.host_identifier)] + .into_iter() + .collect(), + Default::default(), + )?; + + handle.spawn(task); + Ok(loki_layer) +} diff --git a/nomos-tracing/src/logging/mod.rs b/nomos-tracing/src/logging/mod.rs new file mode 100644 index 00000000..cd6970e4 --- /dev/null +++ b/nomos-tracing/src/logging/mod.rs @@ -0,0 +1,3 @@ +pub mod gelf; +pub mod local; +pub mod loki; diff --git a/nomos-tracing/src/tracing/mod.rs b/nomos-tracing/src/tracing/mod.rs new file mode 100644 index 00000000..95eb89aa --- /dev/null +++ b/nomos-tracing/src/tracing/mod.rs @@ -0,0 +1 @@ +pub mod otlp; diff --git a/nomos-tracing/src/tracing/otlp.rs b/nomos-tracing/src/tracing/otlp.rs new file mode 100644 index 00000000..851aa07c --- /dev/null +++ b/nomos-tracing/src/tracing/otlp.rs @@ -0,0 +1,42 @@ +// std +use std::error::Error; +// crates +use opentelemetry::{global, trace::TracerProvider as _}; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::trace::{BatchConfig, Sampler, Tracer}; +use serde::{Deserialize, Serialize}; +use tracing::Subscriber; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::registry::LookupSpan; +use url::Url; +// internal + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OtlpTracingConfig { + pub endpoint: Url, + pub sample_ratio: f64, +} + +pub fn create_otlp_tracing_layer( + config: OtlpTracingConfig, +) -> Result, Box> +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + let otel_exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(config.endpoint); + let tracer_provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config(opentelemetry_sdk::trace::Config::default().with_sampler( + Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(config.sample_ratio))), + )) + .with_batch_config(BatchConfig::default()) + .with_exporter(otel_exporter) + .install_batch(opentelemetry_sdk::runtime::Tokio)?; + + global::set_tracer_provider(tracer_provider.clone()); + let tracer: opentelemetry_sdk::trace::Tracer = tracer_provider.tracer("NomosTracer"); + + Ok(OpenTelemetryLayer::new(tracer)) +} diff --git a/testnet/cfgsync.yaml b/testnet/cfgsync.yaml index 2878d9a0..750658aa 100644 --- a/testnet/cfgsync.yaml +++ b/testnet/cfgsync.yaml @@ -14,3 +14,7 @@ 
num_subnets: 2 old_blobs_check_interval_secs: 5 blobs_validity_duration_secs: 60 global_params_path: "/kzgrs_test_params" + +# Tracing params +tempo_endpoint: "http://tempo:4317" +loki_endpoint: "http://loki:3100" diff --git a/testnet/cfgsync/Cargo.toml b/testnet/cfgsync/Cargo.toml index 65442152..4229e187 100644 --- a/testnet/cfgsync/Cargo.toml +++ b/testnet/cfgsync/Cargo.toml @@ -9,10 +9,13 @@ clap = { version = "4", features = ["derive"] } nomos-executor = { path = "../../nodes/nomos-executor" } nomos-libp2p = { path = "../../nomos-libp2p" } nomos-node = { path = "../../nodes/nomos-node" } +nomos-tracing = { path = "../../nomos-tracing" } +nomos-tracing-service = { path = "../../nomos-services/tracing" } rand = "0.8" reqwest = { version = "0.11", features = ["json", "rustls-tls"] } tests = { path = "../../tests" } tokio = { version = "1", features = ["rt-multi-thread"] } +tracing = "0.1" serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml = "0.9" diff --git a/testnet/cfgsync/src/bin/cfgsync-client.rs b/testnet/cfgsync/src/bin/cfgsync-client.rs index 931e51b7..34ab2847 100644 --- a/testnet/cfgsync/src/bin/cfgsync-client.rs +++ b/testnet/cfgsync/src/bin/cfgsync-client.rs @@ -10,6 +10,7 @@ use serde::{de::DeserializeOwned, Serialize}; #[derive(Serialize)] struct ClientIp { ip: Ipv4Addr, + identifier: String, } fn parse_ip(ip_str: String) -> Ipv4Addr { @@ -21,6 +22,7 @@ fn parse_ip(ip_str: String) -> Ipv4Addr { async fn get_config( ip: Ipv4Addr, + identifier: String, url: &str, config_file: &str, ) -> Result<(), String> { @@ -28,7 +30,7 @@ async fn get_config( let response = client .post(url) - .json(&ClientIp { ip }) + .json(&ClientIp { ip, identifier }) .send() .await .map_err(|err| format!("Failed to send IP announcement: {}", err))?; @@ -57,6 +59,8 @@ async fn main() { let config_file_path = env::var("CFG_FILE_PATH").unwrap_or("config.yaml".to_string()); let server_addr = env::var("CFG_SERVER_ADDR").unwrap_or("http://127.0.0.1:4400".to_string()); let ip = parse_ip(env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_string())); + let identifier = + env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_string()); let host_kind = env::var("CFG_HOST_KIND").unwrap_or_else(|_| "validator".to_string()); @@ -67,9 +71,13 @@ async fn main() { let config_result = match host_kind.as_str() { "executor" => { - get_config::(ip, &node_config_endpoint, &config_file_path).await + get_config::(ip, identifier, &node_config_endpoint, &config_file_path) + .await + } + _ => { + get_config::(ip, identifier, &node_config_endpoint, &config_file_path) + .await } - _ => get_config::(ip, &node_config_endpoint, &config_file_path).await, }; // Handle error if the config request fails diff --git a/testnet/cfgsync/src/bin/cfgsync-server.rs b/testnet/cfgsync/src/bin/cfgsync-server.rs index 0a38bbfb..276ef96b 100644 --- a/testnet/cfgsync/src/bin/cfgsync-server.rs +++ b/testnet/cfgsync/src/bin/cfgsync-server.rs @@ -10,7 +10,9 @@ use axum::Json; use axum::{http::StatusCode, response::IntoResponse, routing::post, Router}; use cfgsync::config::Host; use cfgsync::repo::{ConfigRepo, RepoResponse}; +use cfgsync::TracingParams; use clap::Parser; +use reqwest::Url; use serde::{Deserialize, Serialize}; use tests::nodes::executor::create_executor_config; use tests::nodes::validator::create_validator_config; @@ -43,6 +45,10 @@ struct CfgSyncConfig { old_blobs_check_interval_secs: u64, blobs_validity_duration_secs: u64, global_params_path: String, + + // Tracing params + 
tempo_endpoint: Url, + loki_endpoint: Url, } impl CfgSyncConfig { @@ -72,21 +78,29 @@ impl CfgSyncConfig { global_params_path: self.global_params_path.clone(), } } + + fn to_tracing_params(&self) -> TracingParams { + TracingParams { + tempo_endpoint: self.tempo_endpoint.clone(), + loki_endpoint: self.loki_endpoint.clone(), + } + } } #[derive(Serialize, Deserialize)] struct ClientIp { ip: Ipv4Addr, + identifier: String, } async fn validator_config( State(config_repo): State>, Json(payload): Json, ) -> impl IntoResponse { - let ClientIp { ip } = payload; + let ClientIp { ip, identifier } = payload; let (reply_tx, reply_rx) = channel(); - config_repo.register(Host::default_validator_from_ip(ip), reply_tx); + config_repo.register(Host::default_validator_from_ip(ip, identifier), reply_tx); match reply_rx.await { Ok(config_response) => match config_response { @@ -104,10 +118,10 @@ async fn executor_config( State(config_repo): State>, Json(payload): Json, ) -> impl IntoResponse { - let ClientIp { ip } = payload; + let ClientIp { ip, identifier } = payload; let (reply_tx, reply_rx) = channel(); - config_repo.register(Host::default_executor_from_ip(ip), reply_tx); + config_repo.register(Host::default_executor_from_ip(ip, identifier), reply_tx); match reply_rx.await { Ok(config_response) => match config_response { @@ -129,13 +143,15 @@ async fn main() { eprintln!("{}", err); process::exit(1); }); - let consensus_config = config.to_consensus_params(); - let da_config = config.to_da_params(); + let consensus_params = config.to_consensus_params(); + let da_params = config.to_da_params(); + let tracing_params = config.to_tracing_params(); let config_repo = ConfigRepo::new( config.n_hosts, - consensus_config, - da_config, + consensus_params, + da_params, + tracing_params, Duration::from_secs(config.timeout), ); let app = Router::new() diff --git a/testnet/cfgsync/src/config.rs b/testnet/cfgsync/src/config.rs index f9fa10b8..357998c1 100644 --- a/testnet/cfgsync/src/config.rs +++ b/testnet/cfgsync/src/config.rs @@ -2,6 +2,8 @@ use std::{collections::HashMap, net::Ipv4Addr, str::FromStr}; // crates use nomos_libp2p::{Multiaddr, PeerId, Protocol}; +use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig}; +use nomos_tracing_service::{LoggerLayer, TracingSettings}; use rand::{thread_rng, Rng}; use tests::topology::configs::{ api::GeneralApiConfig, @@ -9,9 +11,12 @@ use tests::topology::configs::{ da::{create_da_configs, DaParams}, mix::create_mix_configs, network::create_network_configs, + tracing::GeneralTracingConfig, GeneralConfig, }; +use tracing::Level; // internal +use crate::TracingParams; const DEFAULT_LIBP2P_NETWORK_PORT: u16 = 3000; const DEFAULT_DA_NETWORK_PORT: u16 = 3300; @@ -28,26 +33,29 @@ pub enum HostKind { pub struct Host { pub kind: HostKind, pub ip: Ipv4Addr, + pub identifier: String, pub network_port: u16, pub da_network_port: u16, pub mix_port: u16, } impl Host { - pub fn default_validator_from_ip(ip: Ipv4Addr) -> Self { + pub fn default_validator_from_ip(ip: Ipv4Addr, identifier: String) -> Self { Self { kind: HostKind::Validator, ip, + identifier, network_port: DEFAULT_LIBP2P_NETWORK_PORT, da_network_port: DEFAULT_DA_NETWORK_PORT, mix_port: DEFAULT_MIX_PORT, } } - pub fn default_executor_from_ip(ip: Ipv4Addr) -> Self { + pub fn default_executor_from_ip(ip: Ipv4Addr, identifier: String) -> Self { Self { kind: HostKind::Executor, ip, + identifier, network_port: DEFAULT_LIBP2P_NETWORK_PORT, da_network_port: DEFAULT_DA_NETWORK_PORT, mix_port: DEFAULT_MIX_PORT, @@ 
-58,6 +66,7 @@ impl Host { pub fn create_node_configs( consensus_params: ConsensusParams, da_params: DaParams, + tracing_params: TracingParams, hosts: Vec, ) -> HashMap { let mut ids = vec![[0; 32]; consensus_params.n_participants]; @@ -115,6 +124,10 @@ pub fn create_node_configs( Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap(); mix_config.backend.membership = host_mix_membership.clone(); + // Tracing config. + let tracing_config = + tracing_config_for_grafana(tracing_params.clone(), host.identifier.clone()); + configured_hosts.insert( host.clone(), GeneralConfig { @@ -123,6 +136,7 @@ pub fn create_node_configs( network_config, mix_config, api_config, + tracing_config, }, ); } @@ -182,6 +196,22 @@ fn extract_peer_id(multiaddr: &Multiaddr) -> Option { }) } +fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> GeneralTracingConfig { + GeneralTracingConfig { + tracing_settings: TracingSettings { + logger: LoggerLayer::Loki(LokiConfig { + endpoint: params.loki_endpoint, + host_identifier: identifier, + }), + tracing: nomos_tracing_service::TracingLayer::Otlp(OtlpTracingConfig { + endpoint: params.tempo_endpoint, + sample_ratio: 1.0, + }), + level: Level::INFO, + }, + } +} + #[cfg(test)] mod cfgsync_tests { use std::str::FromStr; @@ -191,6 +221,8 @@ mod cfgsync_tests { use tests::topology::configs::consensus::ConsensusParams; use tests::topology::configs::da::DaParams; + use crate::TracingParams; + use super::{create_node_configs, Host, HostKind}; #[test] @@ -199,6 +231,7 @@ mod cfgsync_tests { .map(|i| Host { kind: HostKind::Validator, ip: Ipv4Addr::from_str(&format!("10.1.1.{i}")).unwrap(), + identifier: "node".into(), network_port: 3000, da_network_port: 4044, mix_port: 5000, @@ -220,6 +253,10 @@ mod cfgsync_tests { blobs_validity_duration: Duration::from_secs(u64::MAX), global_params_path: "".into(), }, + TracingParams { + tempo_endpoint: "http://test.com".try_into().unwrap(), + loki_endpoint: "http://test.com".try_into().unwrap(), + }, hosts, ); diff --git a/testnet/cfgsync/src/lib.rs b/testnet/cfgsync/src/lib.rs index 551ac1a8..a432324e 100644 --- a/testnet/cfgsync/src/lib.rs +++ b/testnet/cfgsync/src/lib.rs @@ -1,2 +1,10 @@ +use reqwest::Url; + pub mod config; pub mod repo; + +#[derive(Clone)] +pub struct TracingParams { + pub tempo_endpoint: Url, + pub loki_endpoint: Url, +} diff --git a/testnet/cfgsync/src/repo.rs b/testnet/cfgsync/src/repo.rs index ee0fffc0..d6f1f38a 100644 --- a/testnet/cfgsync/src/repo.rs +++ b/testnet/cfgsync/src/repo.rs @@ -11,6 +11,7 @@ use tokio::time::timeout; // internal use crate::config::{create_node_configs, Host}; +use crate::TracingParams; pub enum RepoResponse { Config(Box), @@ -22,6 +23,7 @@ pub struct ConfigRepo { n_hosts: usize, consensus_params: ConsensusParams, da_params: DaParams, + tracing_params: TracingParams, timeout_duration: Duration, } @@ -30,6 +32,7 @@ impl ConfigRepo { n_hosts: usize, consensus_params: ConsensusParams, da_params: DaParams, + tracing_params: TracingParams, timeout_duration: Duration, ) -> Arc { let repo = Arc::new(Self { @@ -37,6 +40,7 @@ impl ConfigRepo { n_hosts, consensus_params, da_params, + tracing_params, timeout_duration, }); @@ -70,6 +74,7 @@ impl ConfigRepo { let configs = create_node_configs( self.consensus_params.clone(), self.da_params.clone(), + self.tracing_params.clone(), hosts, ); diff --git a/testnet/monitoring/tempo.yaml b/testnet/monitoring/tempo.yaml new file mode 100644 index 00000000..52667e2c --- /dev/null +++ b/testnet/monitoring/tempo.yaml 
@@ -0,0 +1,71 @@ +stream_over_http_enabled: true +server: + http_listen_port: 3200 + log_level: info + + +cache: + background: + writeback_goroutines: 5 + caches: + - roles: + - frontend-search + memcached: + host: memcached:11211 + +query_frontend: + search: + duration_slo: 5s + throughput_bytes_slo: 1.073741824e+09 + trace_by_id: + duration_slo: 5s + +distributor: + receivers: # this configuration will listen on all ports and protocols that tempo is capable of. + jaeger: # the receivers all come from the OpenTelemetry collector. more configuration information can + protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver + thrift_http: # + grpc: # for a production deployment you should only enable the receivers you need! + thrift_binary: + thrift_compact: + zipkin: + otlp: + protocols: + http: + grpc: + opencensus: + +ingester: + max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally + +compactor: + compaction: + block_retention: 1h # overall Tempo trace retention. set for demo purposes + +metrics_generator: + registry: + external_labels: + source: tempo + cluster: docker-compose + storage: + path: /var/tempo/generator/wal + remote_write: + - url: http://prometheus:9090/api/v1/write + send_exemplars: true + traces_storage: + path: /var/tempo/generator/traces + +storage: + trace: + backend: local # backend configuration to use + wal: + path: /var/tempo/wal # where to store the wal locally + local: + path: /var/tempo/blocks + +overrides: + defaults: + metrics_generator: + processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator + generate_native_histograms: both + diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 36f0993d..7159c30f 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -12,7 +12,8 @@ nomos-executor = { path = "../nodes/nomos-executor", default-features = false } nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] } cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" } -nomos-log = { path = "../nomos-services/log" } +nomos-tracing = { path = "../nomos-tracing" } +nomos-tracing-service = { path = "../nomos-services/tracing" } nomos-api = { path = "../nomos-services/api" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } nomos-core = { path = "../nomos-core/chain-defs" } diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs index 88fc2a84..ff79573f 100644 --- a/tests/src/nodes/executor.rs +++ b/tests/src/nodes/executor.rs @@ -20,10 +20,11 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as Verif use nomos_da_verifier::DaVerifierServiceSettings; use nomos_executor::api::backend::AxumBackendSettings; use nomos_executor::config::Config; -use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE}; use nomos_node::RocksBackendSettings; +use nomos_tracing::logging::local::FileConfig; +use nomos_tracing_service::LoggerLayer; use tempfile::NamedTempFile; use crate::adjust_timeout; @@ -62,11 +63,10 @@ impl Executor { let config_path = file.path().to_owned(); // setup logging so that we can intercept it later in testing - config.log.backend = LoggerBackend::File { + config.tracing.logger =
LoggerLayer::File(FileConfig { directory: dir.path().to_owned(), prefix: Some(LOGS_PREFIX.into()), - }; - config.log.format = LoggerFormat::Json; + }); config.storage.db_path = dir.path().join("db"); config @@ -197,7 +197,7 @@ pub fn create_executor_config(config: GeneralConfig) -> Config { blob_storage_directory: "./".into(), }, }, - log: Default::default(), + tracing: config.tracing_config.tracing_settings, http: nomos_api::ApiServiceSettings { backend_settings: AxumBackendSettings { address: config.api_config.address, diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs index 072a69ba..77ca63e4 100644 --- a/tests/src/nodes/validator.rs +++ b/tests/src/nodes/validator.rs @@ -13,7 +13,6 @@ use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as Sampl use nomos_da_sampling::{backend::kzgrs::KzgrsSamplingBackendSettings, DaSamplingServiceSettings}; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings; use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings}; -use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_mempool::MempoolMetrics; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::api::paths::{ @@ -21,6 +20,8 @@ use nomos_node::api::paths::{ }; use nomos_node::{api::backend::AxumBackendSettings, Config, RocksBackendSettings}; use nomos_node::{BlobInfo, HeaderId, Tx}; +use nomos_tracing::logging::local::FileConfig; +use nomos_tracing_service::LoggerLayer; use reqwest::Url; use tempfile::NamedTempFile; @@ -65,11 +66,10 @@ impl Validator { let config_path = file.path().to_owned(); // setup logging so that we can intercept it later in testing - config.log.backend = LoggerBackend::File { + config.tracing.logger = LoggerLayer::File(FileConfig { directory: dir.path().to_owned(), prefix: Some(LOGS_PREFIX.into()), - }; - config.log.format = LoggerFormat::Json; + }); config.storage.db_path = dir.path().join("db"); config @@ -279,7 +279,7 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { blob_storage_directory: "./".into(), }, }, - log: Default::default(), + tracing: config.tracing_config.tracing_settings, http: nomos_api::ApiServiceSettings { backend_settings: AxumBackendSettings { address: config.api_config.address, diff --git a/tests/src/topology/configs/mod.rs b/tests/src/topology/configs/mod.rs index 2860e00e..b4b62d03 100644 --- a/tests/src/topology/configs/mod.rs +++ b/tests/src/topology/configs/mod.rs @@ -3,12 +3,14 @@ pub mod consensus; pub mod da; pub mod mix; pub mod network; +pub mod tracing; use api::GeneralApiConfig; use consensus::GeneralConsensusConfig; use da::GeneralDaConfig; use mix::GeneralMixConfig; use network::GeneralNetworkConfig; +use tracing::GeneralTracingConfig; #[derive(Clone)] pub struct GeneralConfig { @@ -17,4 +19,5 @@ pub struct GeneralConfig { pub da_config: GeneralDaConfig, pub network_config: GeneralNetworkConfig, pub mix_config: GeneralMixConfig, + pub tracing_config: GeneralTracingConfig, } diff --git a/tests/src/topology/configs/tracing.rs b/tests/src/topology/configs/tracing.rs new file mode 100644 index 00000000..cf87a797 --- /dev/null +++ b/tests/src/topology/configs/tracing.rs @@ -0,0 +1,14 @@ +use nomos_tracing_service::TracingSettings; + +#[derive(Clone)] +pub struct GeneralTracingConfig { + pub tracing_settings: TracingSettings, +} + +pub fn create_tracing_configs(ids: &[[u8; 32]]) -> Vec { + ids.iter() + .map(|_| GeneralTracingConfig { + tracing_settings: 
Default::default(), + }) + .collect() +} diff --git a/tests/src/topology/mod.rs b/tests/src/topology/mod.rs index cc719a91..b2d15529 100644 --- a/tests/src/topology/mod.rs +++ b/tests/src/topology/mod.rs @@ -3,6 +3,7 @@ pub mod configs; use configs::{ da::{create_da_configs, DaParams}, network::{create_network_configs, NetworkParams}, + tracing::create_tracing_configs, GeneralConfig, }; use rand::{thread_rng, Rng}; @@ -78,6 +79,7 @@ impl Topology { let network_configs = create_network_configs(&ids, config.network_params); let mix_configs = create_mix_configs(&ids); let api_configs = create_api_configs(&ids); + let tracing_configs = create_tracing_configs(&ids); let mut validators = Vec::new(); for i in 0..config.n_validators { @@ -87,6 +89,7 @@ impl Topology { network_config: network_configs[i].to_owned(), mix_config: mix_configs[i].to_owned(), api_config: api_configs[i].to_owned(), + tracing_config: tracing_configs[i].to_owned(), }); validators.push(Validator::spawn(config).await) } @@ -99,6 +102,7 @@ impl Topology { network_config: network_configs[i].to_owned(), mix_config: mix_configs[i].to_owned(), api_config: api_configs[i].to_owned(), + tracing_config: tracing_configs[i].to_owned(), }); executors.push(Executor::spawn(config).await) }
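
Note (not part of the patch): the snippet below is an illustrative sketch of wiring the new tracing stack by hand, mirroring tracing_config_for_grafana in testnet/cfgsync/src/config.rs above. The endpoint URLs and the host identifier are placeholder values for a local Grafana/Loki/Tempo stack (ports taken from compose.static.yml), not values defined by this PR.

// Illustrative only: building the new TracingSettings directly, assuming the
// nomos-tracing and nomos-tracing-service crates added in this patch.
use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig};
use nomos_tracing_service::{LoggerLayer, TracingLayer, TracingSettings};
use tracing::Level;

fn example_tracing_settings() -> TracingSettings {
    TracingSettings::new(
        // Ship logs to Loki, labelled with a per-host identifier.
        LoggerLayer::Loki(LokiConfig {
            endpoint: "http://localhost:3100".parse().expect("valid Loki URL"),
            host_identifier: "example-node".to_string(),
        }),
        // Export spans to Tempo over OTLP gRPC, sampling every trace.
        TracingLayer::Otlp(OtlpTracingConfig {
            endpoint: "http://localhost:4317".parse().expect("valid OTLP URL"),
            sample_ratio: 1.0,
        }),
        Level::INFO,
    )
}

In the testnet these settings are produced by cfgsync and handed to each node as the `tracing` section of its config; locally, TracingSettings::default() (LoggerLayer::Stdout with TracingLayer::None) keeps the previous stdout-only behaviour.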