Nomos tracing crate (#849)
* tmp grafana stack * Testnet configure otlp tracing * tmp: Configurable loki and tempo in cfgsync * Rename nomos-log to nomos-tracing-service * nomos-tracing crate for tracing layer creation * Use multiple layers in nomos-tracing-service * Tracing in tests and testnet * Tempo volume instead of local host dir
This commit is contained in:
parent
733f50b521
commit
938bcd7926
|
@ -2,6 +2,7 @@
|
|||
members = [
|
||||
"nomos-core/chain-defs",
|
||||
"nomos-core/cl",
|
||||
"nomos-da/full-replication",
|
||||
"nomos-da/kzgrs",
|
||||
"nomos-da/kzgrs-backend",
|
||||
"nomos-da/storage",
|
||||
|
@ -10,7 +11,7 @@ members = [
|
|||
"nomos-da/network/subnetworks-assignations",
|
||||
"nomos-libp2p",
|
||||
"nomos-services/api",
|
||||
"nomos-services/log",
|
||||
"nomos-services/tracing",
|
||||
"nomos-services/network",
|
||||
"nomos-services/storage",
|
||||
"nomos-services/cryptarchia-consensus",
|
||||
|
@ -24,10 +25,10 @@ members = [
|
|||
"nomos-services/data-availability/dispersal",
|
||||
"nomos-services/data-availability/tests",
|
||||
"nomos-services/mix",
|
||||
"nomos-da/full-replication",
|
||||
"nomos-mix/message",
|
||||
"nomos-mix/network",
|
||||
"nomos-mix/queue",
|
||||
"nomos-tracing",
|
||||
"nomos-cli",
|
||||
"nomos-utils",
|
||||
"nodes/nomos-node",
|
||||
|
|
|
@ -91,6 +91,11 @@ services:
|
|||
grafana:
|
||||
container_name: grafana
|
||||
image: grafana/grafana:latest
|
||||
environment:
|
||||
- GF_AUTH_ANONYMOUS_ENABLED=true
|
||||
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
|
||||
- GF_AUTH_DISABLE_LOGIN_FORM=true
|
||||
- GF_FEATURE_TOGGLES_ENABLE=traceqlEditor traceQLStreaming metricsSummary
|
||||
env_file:
|
||||
- ./testnet/monitoring/grafana/plugins.env
|
||||
volumes:
|
||||
|
@ -101,3 +106,45 @@ services:
|
|||
restart: on-failure
|
||||
depends_on:
|
||||
- prometheus
|
||||
|
||||
loki:
|
||||
image: grafana/loki:2.9.2
|
||||
ports:
|
||||
- "3100:3100"
|
||||
command: -config.file=/etc/loki/local-config.yaml
|
||||
|
||||
# Tempo runs as an unprivileged user, so volumes need to be chowned before running.
|
||||
tempo-init:
|
||||
image: &tempoImage grafana/tempo:latest
|
||||
user: root
|
||||
entrypoint:
|
||||
- "chown"
|
||||
- "10001:10001"
|
||||
- "/var/tempo"
|
||||
volumes:
|
||||
- tempo-data:/var/tempo
|
||||
|
||||
memcached:
|
||||
image: memcached:1.6.29
|
||||
container_name: memcached
|
||||
ports:
|
||||
- "11211:11211"
|
||||
environment:
|
||||
- MEMCACHED_MAX_MEMORY=64m
|
||||
- MEMCACHED_THREADS=4
|
||||
|
||||
tempo:
|
||||
image: *tempoImage
|
||||
container_name: tempo
|
||||
command: [ "-config.file=/etc/tempo.yaml" ]
|
||||
volumes:
|
||||
- ./testnet/monitoring/tempo.yaml:/etc/tempo.yaml:z
|
||||
- tempo-data:/var/tempo
|
||||
ports:
|
||||
- "4317:4317" # otlp grpc
|
||||
depends_on:
|
||||
- tempo-init
|
||||
- memcached
|
||||
|
||||
volumes:
|
||||
tempo-data:
|
||||
|
|
|
@ -8,8 +8,8 @@ use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
|
|||
use nomos_mix_service::MixService;
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
use nomos_node::{
|
||||
config::{update_cryptarchia_consensus, update_log, update_mix, update_network, MixArgs},
|
||||
CryptarchiaArgs, HttpArgs, LogArgs, Logger, NetworkArgs, NetworkService, Wire,
|
||||
config::{update_cryptarchia_consensus, update_mix, update_network, update_tracing, MixArgs},
|
||||
CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, NetworkService, Tracing, Wire,
|
||||
};
|
||||
use nomos_storage::backends::rocksdb::RocksBackend;
|
||||
use overwatch_rs::services::ServiceData;
|
||||
|
@ -20,7 +20,7 @@ use crate::ExecutorApiService;
|
|||
|
||||
#[derive(Deserialize, Debug, Clone, Serialize)]
|
||||
pub struct Config {
|
||||
pub log: <Logger as ServiceData>::Settings,
|
||||
pub tracing: <Tracing as ServiceData>::Settings,
|
||||
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
|
||||
pub mix: <MixService<MixBackend, MixNetworkAdapter> as ServiceData>::Settings,
|
||||
pub da_dispersal: <crate::DaDispersal as ServiceData>::Settings,
|
||||
|
@ -43,7 +43,7 @@ impl Config {
|
|||
http_args: HttpArgs,
|
||||
cryptarchia_args: CryptarchiaArgs,
|
||||
) -> Result<Self> {
|
||||
update_log(&mut self.log, log_args)?;
|
||||
update_tracing(&mut self.tracing, log_args)?;
|
||||
update_network(&mut self.network, network_args)?;
|
||||
update_mix(&mut self.mix, mix_args)?;
|
||||
update_http(&mut self.http, http_args)?;
|
||||
|
|
|
@ -28,9 +28,9 @@ use nomos_node::MempoolNetworkAdapter;
|
|||
use nomos_node::Metrics;
|
||||
use nomos_node::NetworkBackend;
|
||||
use nomos_node::{
|
||||
BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier, Logger,
|
||||
NetworkService, NomosDaMembership, RocksBackend, StorageService, SystemSig, Tx, TxMempool,
|
||||
Wire, MB16,
|
||||
BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier,
|
||||
NetworkService, NomosDaMembership, RocksBackend, StorageService, SystemSig, Tracing, Tx,
|
||||
TxMempool, Wire, MB16,
|
||||
};
|
||||
use overwatch_derive::Services;
|
||||
use overwatch_rs::services::handle::ServiceHandle;
|
||||
|
@ -88,7 +88,7 @@ pub type ExecutorDaVerifier = DaVerifier<VerifierNetworkAdapter<NomosDaMembershi
|
|||
#[derive(Services)]
|
||||
pub struct NomosExecutor {
|
||||
#[cfg(feature = "tracing")]
|
||||
logging: ServiceHandle<Logger>,
|
||||
tracing: ServiceHandle<Tracing>,
|
||||
network: ServiceHandle<NetworkService<NetworkBackend>>,
|
||||
mix: ServiceHandle<MixService<MixBackend, MixNetworkAdapter>>,
|
||||
da_dispersal: ServiceHandle<DaDispersal>,
|
||||
|
|
|
@ -79,7 +79,7 @@ fn main() -> Result<()> {
|
|||
network: config.network,
|
||||
mix: config.mix,
|
||||
#[cfg(feature = "tracing")]
|
||||
logging: config.log,
|
||||
tracing: config.tracing,
|
||||
http: config.http,
|
||||
cl_mempool: TxMempoolSettings {
|
||||
backend: (),
|
||||
|
|
|
@ -30,7 +30,8 @@ nomos-da-network-service = { path = "../../nomos-services/data-availability/netw
|
|||
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
|
||||
nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] }
|
||||
nomos-api = { path = "../../nomos-services/api" }
|
||||
nomos-log = { path = "../../nomos-services/log" }
|
||||
nomos-tracing = { path = "../../nomos-tracing" }
|
||||
nomos-tracing-service = { path = "../../nomos-services/tracing" }
|
||||
nomos-mempool = { path = "../../nomos-services/mempool", features = [
|
||||
"mock",
|
||||
"libp2p",
|
||||
|
|
|
@ -8,26 +8,27 @@ use cl::{InputWitness, NoteWitness, NullifierSecret};
|
|||
use clap::{Parser, ValueEnum};
|
||||
use color_eyre::eyre::{eyre, Result};
|
||||
use hex::FromHex;
|
||||
use nomos_tracing::logging::{gelf::GelfConfig, local::FileConfig};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::Level;
|
||||
// internal
|
||||
use crate::{NomosApiService, NomosDaMembership, Wire};
|
||||
use nomos_core::staking::NMO_UNIT;
|
||||
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend;
|
||||
use nomos_da_network_service::NetworkService as DaNetworkService;
|
||||
use nomos_libp2p::{ed25519::SecretKey, Multiaddr};
|
||||
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
|
||||
use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
|
||||
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
|
||||
use nomos_mix_service::MixService;
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
use nomos_network::NetworkService;
|
||||
use nomos_storage::backends::rocksdb::RocksBackend;
|
||||
use nomos_tracing_service::{LoggerLayer, Tracing};
|
||||
use overwatch_rs::services::ServiceData;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use subnetworks_assignations::versions::v1::FillFromNodeList;
|
||||
use tracing::Level;
|
||||
// internal
|
||||
use crate::{NomosApiService, NomosDaMembership, Wire};
|
||||
|
||||
#[derive(ValueEnum, Clone, Debug, Default)]
|
||||
pub enum LoggerBackendType {
|
||||
pub enum LoggerLayerType {
|
||||
Gelf,
|
||||
File,
|
||||
#[default]
|
||||
|
@ -51,10 +52,7 @@ pub struct LogArgs {
|
|||
|
||||
/// Backend type
|
||||
#[clap(long = "log-backend", env = "LOG_BACKEND", value_enum)]
|
||||
backend: Option<LoggerBackendType>,
|
||||
|
||||
#[clap(long = "log-format", env = "LOG_FORMAT")]
|
||||
format: Option<String>,
|
||||
backend: Option<LoggerLayerType>,
|
||||
|
||||
#[clap(long = "log-level", env = "LOG_LEVEL")]
|
||||
level: Option<String>,
|
||||
|
@ -135,7 +133,7 @@ pub struct MetricsArgs {
|
|||
|
||||
#[derive(Deserialize, Debug, Clone, Serialize)]
|
||||
pub struct Config {
|
||||
pub log: <Logger as ServiceData>::Settings,
|
||||
pub tracing: <Tracing as ServiceData>::Settings,
|
||||
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
|
||||
pub mix: <MixService<MixBackend, MixNetworkAdapter> as ServiceData>::Settings,
|
||||
pub da_network:
|
||||
|
@ -159,7 +157,7 @@ impl Config {
|
|||
http_args: HttpArgs,
|
||||
cryptarchia_args: CryptarchiaArgs,
|
||||
) -> Result<Self> {
|
||||
update_log(&mut self.log, log_args)?;
|
||||
update_tracing(&mut self.tracing, log_args)?;
|
||||
update_network(&mut self.network, network_args)?;
|
||||
update_mix(&mut self.mix, mix_args)?;
|
||||
update_http(&mut self.http, http_args)?;
|
||||
|
@ -168,45 +166,39 @@ impl Config {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn update_log(log: &mut <Logger as ServiceData>::Settings, log_args: LogArgs) -> Result<()> {
|
||||
pub fn update_tracing(
|
||||
tracing: &mut <Tracing as ServiceData>::Settings,
|
||||
tracing_args: LogArgs,
|
||||
) -> Result<()> {
|
||||
let LogArgs {
|
||||
backend,
|
||||
log_addr: addr,
|
||||
directory,
|
||||
prefix,
|
||||
format,
|
||||
level,
|
||||
} = log_args;
|
||||
} = tracing_args;
|
||||
|
||||
// Override the file config with the one from env variables.
|
||||
if let Some(backend) = backend {
|
||||
log.backend = match backend {
|
||||
LoggerBackendType::Gelf => LoggerBackend::Gelf {
|
||||
tracing.logger = match backend {
|
||||
LoggerLayerType::Gelf => LoggerLayer::Gelf(GelfConfig {
|
||||
addr: addr
|
||||
.ok_or_else(|| eyre!("Gelf backend requires an address."))?
|
||||
.to_socket_addrs()?
|
||||
.next()
|
||||
.ok_or_else(|| eyre!("Invalid gelf address"))?,
|
||||
},
|
||||
LoggerBackendType::File => LoggerBackend::File {
|
||||
}),
|
||||
LoggerLayerType::File => LoggerLayer::File(FileConfig {
|
||||
directory: directory.ok_or_else(|| eyre!("File backend requires a directory."))?,
|
||||
prefix,
|
||||
},
|
||||
LoggerBackendType::Stdout => LoggerBackend::Stdout,
|
||||
LoggerBackendType::Stderr => LoggerBackend::Stderr,
|
||||
}),
|
||||
LoggerLayerType::Stdout => LoggerLayer::Stdout,
|
||||
LoggerLayerType::Stderr => LoggerLayer::Stderr,
|
||||
}
|
||||
};
|
||||
|
||||
// Update parts of the config.
|
||||
if let Some(format_str) = format {
|
||||
log.format = match format_str.as_str() {
|
||||
"Json" => LoggerFormat::Json,
|
||||
"Plain" => LoggerFormat::Plain,
|
||||
_ => return Err(eyre!("Invalid log format provided.")),
|
||||
};
|
||||
}
|
||||
if let Some(level_str) = level {
|
||||
log.level = match level_str.as_str() {
|
||||
tracing.level = match level_str.as_str() {
|
||||
"DEBUG" => Level::DEBUG,
|
||||
"INFO" => Level::INFO,
|
||||
"ERROR" => Level::ERROR,
|
||||
|
|
|
@ -29,8 +29,6 @@ use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
|
|||
use nomos_da_verifier::network::adapters::validator::Libp2pAdapter as VerifierNetworkAdapter;
|
||||
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter;
|
||||
use nomos_da_verifier::DaVerifierService;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use nomos_log::Logger;
|
||||
pub use nomos_mempool::da::service::{DaMempoolService, DaMempoolSettings};
|
||||
pub use nomos_mempool::network::adapters::libp2p::{
|
||||
Libp2pAdapter as MempoolNetworkAdapter, Settings as MempoolAdapterSettings,
|
||||
|
@ -53,6 +51,8 @@ pub use nomos_storage::{
|
|||
StorageService,
|
||||
};
|
||||
pub use nomos_system_sig::SystemSig;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use nomos_tracing_service::Tracing;
|
||||
use overwatch_derive::*;
|
||||
use overwatch_rs::services::handle::ServiceHandle;
|
||||
use rand_chacha::ChaCha20Rng;
|
||||
|
@ -161,7 +161,7 @@ pub type NodeDaVerifier = DaVerifier<VerifierNetworkAdapter<FillFromNodeList>>;
|
|||
#[derive(Services)]
|
||||
pub struct Nomos {
|
||||
#[cfg(feature = "tracing")]
|
||||
logging: ServiceHandle<Logger>,
|
||||
tracing: ServiceHandle<Tracing>,
|
||||
network: ServiceHandle<NetworkService<NetworkBackend>>,
|
||||
mix: ServiceHandle<MixService<MixBackend, MixNetworkAdapter>>,
|
||||
da_indexer: ServiceHandle<NodeDaIndexer>,
|
||||
|
|
|
@ -78,7 +78,7 @@ fn main() -> Result<()> {
|
|||
network: config.network,
|
||||
mix: config.mix,
|
||||
#[cfg(feature = "tracing")]
|
||||
logging: config.log,
|
||||
tracing: config.tracing,
|
||||
http: config.http,
|
||||
cl_mempool: nomos_mempool::TxMempoolSettings {
|
||||
backend: (),
|
||||
|
|
|
@ -22,7 +22,6 @@ nomos-da-storage = { path = "../../../nomos-da/storage" }
|
|||
nomos-node = { path = "../../../nodes/nomos-node" }
|
||||
nomos-mempool = { path = "../../../nomos-services/mempool" }
|
||||
nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb-backend"] }
|
||||
nomos-log = { path = "../../log" }
|
||||
nomos-network = { path = "../../network", features = ["mock"] }
|
||||
nomos-mix-service = { path = "../../mix" }
|
||||
nomos-libp2p = { path = "../../../nomos-libp2p" }
|
||||
|
|
|
@ -29,7 +29,7 @@ utoipa = { version = "4.0", optional = true }
|
|||
serde_json = { version = "1", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
nomos-log = { path = "../log" }
|
||||
nomos-tracing-service = { path = "../tracing" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
blake2 = "0.10"
|
||||
|
|
|
@ -2,11 +2,11 @@ use nomos_core::{
|
|||
header::HeaderId,
|
||||
tx::mock::{MockTransaction, MockTxId},
|
||||
};
|
||||
use nomos_log::{Logger, LoggerSettings};
|
||||
use nomos_network::{
|
||||
backends::mock::{Mock, MockBackendMessage, MockConfig, MockMessage},
|
||||
NetworkConfig, NetworkMsg, NetworkService,
|
||||
};
|
||||
use nomos_tracing_service::{Tracing, TracingSettings};
|
||||
use overwatch_derive::*;
|
||||
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
|
||||
|
||||
|
@ -18,7 +18,7 @@ use nomos_mempool::{
|
|||
|
||||
#[derive(Services)]
|
||||
struct MockPoolNode {
|
||||
logging: ServiceHandle<Logger>,
|
||||
logging: ServiceHandle<Tracing>,
|
||||
network: ServiceHandle<NetworkService<Mock>>,
|
||||
mockpool: ServiceHandle<
|
||||
TxMempoolService<MockAdapter, MockPool<HeaderId, MockTransaction<MockMessage>, MockTxId>>,
|
||||
|
@ -66,7 +66,7 @@ fn test_mockmempool() {
|
|||
network: (),
|
||||
registry: None,
|
||||
},
|
||||
logging: LoggerSettings::default(),
|
||||
logging: TracingSettings::default(),
|
||||
},
|
||||
None,
|
||||
)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
[package]
|
||||
name = "nomos-log"
|
||||
name = "nomos-tracing-service"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
|
@ -8,10 +8,16 @@ edition = "2021"
|
|||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
futures = "0.3"
|
||||
nomos-tracing = { path = "../../nomos-tracing" }
|
||||
opentelemetry = { version = "0.25" }
|
||||
opentelemetry-otlp = "0.25"
|
||||
opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
tokio = { version = "1", features = ["time"] }
|
||||
tracing = "0.1"
|
||||
tracing-appender = "0.2"
|
||||
tracing-subscriber = { version = "0.3", features = ["json"] }
|
||||
tracing-loki = "0.2.5"
|
||||
tracing-opentelemetry = "0.26"
|
||||
tracing-subscriber = { version = "0.3", features = ["json", "registry"] }
|
||||
tracing-gelf = "0.7"
|
|
@ -1,17 +1,12 @@
|
|||
// std
|
||||
use futures::StreamExt;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::io::Write;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
// crates
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::{error, Level};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
use tracing_subscriber::{filter::LevelFilter, prelude::*};
|
||||
// internal
|
||||
use futures::StreamExt;
|
||||
use nomos_tracing::logging::gelf::{create_gelf_layer, GelfConfig};
|
||||
use nomos_tracing::logging::local::{create_file_layer, create_writer_layer, FileConfig};
|
||||
use nomos_tracing::logging::loki::{create_loki_layer, LokiConfig};
|
||||
use overwatch_rs::services::life_cycle::LifecycleMessage;
|
||||
use overwatch_rs::services::{
|
||||
handle::ServiceStateHandle,
|
||||
|
@ -19,12 +14,18 @@ use overwatch_rs::services::{
|
|||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::{error, Level};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
use tracing_subscriber::filter::LevelFilter;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
// internal
|
||||
use nomos_tracing::tracing::otlp::{create_otlp_tracing_layer, OtlpTracingConfig};
|
||||
|
||||
const GELF_RECONNECT_INTERVAL: u64 = 10;
|
||||
|
||||
pub struct Logger {
|
||||
pub struct Tracing {
|
||||
service_state: ServiceStateHandle<Self>,
|
||||
worker_guard: Option<WorkerGuard>,
|
||||
logger_guard: Option<WorkerGuard>,
|
||||
}
|
||||
|
||||
/// This is a wrapper around a writer to allow cloning which is
|
||||
|
@ -67,14 +68,10 @@ impl Debug for SharedWriter {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum LoggerBackend {
|
||||
Gelf {
|
||||
addr: SocketAddr,
|
||||
},
|
||||
File {
|
||||
directory: PathBuf,
|
||||
prefix: Option<PathBuf>,
|
||||
},
|
||||
pub enum LoggerLayer {
|
||||
Gelf(GelfConfig),
|
||||
File(FileConfig),
|
||||
Loki(LokiConfig),
|
||||
Stdout,
|
||||
Stderr,
|
||||
#[serde(skip)]
|
||||
|
@ -84,68 +81,50 @@ pub enum LoggerBackend {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct LoggerSettings {
|
||||
pub backend: LoggerBackend,
|
||||
pub format: LoggerFormat,
|
||||
pub enum TracingLayer {
|
||||
Otlp(OtlpTracingConfig),
|
||||
None,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct TracingSettings {
|
||||
pub logger: LoggerLayer,
|
||||
pub tracing: TracingLayer,
|
||||
#[serde(with = "serde_level")]
|
||||
pub level: Level,
|
||||
}
|
||||
|
||||
impl Default for LoggerSettings {
|
||||
impl Default for TracingSettings {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
backend: LoggerBackend::Stdout,
|
||||
format: LoggerFormat::Json,
|
||||
logger: LoggerLayer::Stdout,
|
||||
tracing: TracingLayer::None,
|
||||
level: Level::DEBUG,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LoggerSettings {
|
||||
impl TracingSettings {
|
||||
#[inline]
|
||||
pub const fn new(backend: LoggerBackend, format: LoggerFormat, level: Level) -> Self {
|
||||
pub const fn new(logger: LoggerLayer, tracing: TracingLayer, level: Level) -> Self {
|
||||
Self {
|
||||
backend,
|
||||
format,
|
||||
logger,
|
||||
tracing,
|
||||
level,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum LoggerFormat {
|
||||
Json,
|
||||
Plain,
|
||||
}
|
||||
|
||||
impl ServiceData for Logger {
|
||||
const SERVICE_ID: &'static str = "Logger";
|
||||
type Settings = LoggerSettings;
|
||||
impl ServiceData for Tracing {
|
||||
const SERVICE_ID: &'static str = "Tracing";
|
||||
type Settings = TracingSettings;
|
||||
type State = NoState<Self::Settings>;
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
type Message = NoMessage;
|
||||
}
|
||||
|
||||
// a macro and not a function because it's a bit of a type
|
||||
// mess with `Layer<S>`
|
||||
macro_rules! registry_init {
|
||||
($layer:expr, $format:expr, $level:expr) => {
|
||||
if let LoggerFormat::Json = $format {
|
||||
tracing_subscriber::registry()
|
||||
.with(LevelFilter::from($level))
|
||||
.with($layer)
|
||||
.init();
|
||||
} else {
|
||||
tracing_subscriber::registry()
|
||||
.with(LevelFilter::from($level))
|
||||
.with($layer)
|
||||
.init();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ServiceCore for Logger {
|
||||
impl ServiceCore for Tracing {
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
#[cfg(test)]
|
||||
use std::sync::Once;
|
||||
|
@ -153,72 +132,79 @@ impl ServiceCore for Logger {
|
|||
static ONCE_INIT: Once = Once::new();
|
||||
|
||||
let config = service_state.settings_reader.get_updated_settings();
|
||||
let (non_blocking, _guard) = match config.backend {
|
||||
LoggerBackend::Gelf { addr } => {
|
||||
let (layer, mut task) = tracing_gelf::Logger::builder()
|
||||
.connect_tcp(addr)
|
||||
.expect("Connect to the graylog instance");
|
||||
service_state.overwatch_handle.runtime().spawn(async move {
|
||||
loop {
|
||||
if task.connect().await.0.is_empty() {
|
||||
break;
|
||||
} else {
|
||||
eprintln!("Failed to connect to graylog");
|
||||
let delay = Duration::from_secs(GELF_RECONNECT_INTERVAL);
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
#[cfg(test)]
|
||||
ONCE_INIT.call_once(move || {
|
||||
registry_init!(layer, config.format, config.level);
|
||||
});
|
||||
#[cfg(not(test))]
|
||||
registry_init!(layer, config.format, config.level);
|
||||
let mut layers: Vec<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> = vec![];
|
||||
|
||||
return Ok(Self {
|
||||
service_state,
|
||||
worker_guard: None,
|
||||
});
|
||||
let (logger_layer, logger_guard): (
|
||||
Box<dyn tracing_subscriber::Layer<_> + Send + Sync>,
|
||||
Option<WorkerGuard>,
|
||||
) = match config.logger {
|
||||
LoggerLayer::Gelf(config) => {
|
||||
let gelf_layer =
|
||||
create_gelf_layer(config, service_state.overwatch_handle.runtime())?;
|
||||
(Box::new(gelf_layer), None)
|
||||
}
|
||||
LoggerBackend::File { directory, prefix } => {
|
||||
let file_appender = tracing_appender::rolling::hourly(
|
||||
directory,
|
||||
prefix.unwrap_or_else(|| PathBuf::from("nomos.log")),
|
||||
);
|
||||
tracing_appender::non_blocking(file_appender)
|
||||
LoggerLayer::File(config) => {
|
||||
let (layer, guard) = create_file_layer(config);
|
||||
(Box::new(layer), Some(guard))
|
||||
}
|
||||
LoggerBackend::Stdout => tracing_appender::non_blocking(std::io::stdout()),
|
||||
LoggerBackend::Stderr => tracing_appender::non_blocking(std::io::stderr()),
|
||||
LoggerBackend::Writer(writer) => tracing_appender::non_blocking(writer),
|
||||
LoggerBackend::None => {
|
||||
return Ok(Self {
|
||||
service_state,
|
||||
worker_guard: None,
|
||||
})
|
||||
LoggerLayer::Loki(config) => {
|
||||
let loki_layer =
|
||||
create_loki_layer(config, service_state.overwatch_handle.runtime())?;
|
||||
(Box::new(loki_layer), None)
|
||||
}
|
||||
LoggerLayer::Stdout => {
|
||||
let (layer, guard) = create_writer_layer(std::io::stdout());
|
||||
(Box::new(layer), Some(guard))
|
||||
}
|
||||
LoggerLayer::Stderr => {
|
||||
let (layer, guard) = create_writer_layer(std::io::stderr());
|
||||
(Box::new(layer), Some(guard))
|
||||
}
|
||||
LoggerLayer::Writer(writer) => {
|
||||
let (layer, guard) = create_writer_layer(writer);
|
||||
(Box::new(layer), Some(guard))
|
||||
}
|
||||
LoggerLayer::None => (Box::new(tracing_subscriber::fmt::Layer::new()), None),
|
||||
};
|
||||
|
||||
let layer = tracing_subscriber::fmt::Layer::new()
|
||||
.with_level(true)
|
||||
.with_writer(non_blocking);
|
||||
layers.push(logger_layer);
|
||||
|
||||
if let TracingLayer::Otlp(config) = config.tracing {
|
||||
let tracing_layer = create_otlp_tracing_layer(config)?;
|
||||
layers.push(Box::new(tracing_layer));
|
||||
}
|
||||
|
||||
// If no layers are created, tracing subscriber is not required.
|
||||
if layers.is_empty() {
|
||||
return Ok(Self {
|
||||
service_state,
|
||||
logger_guard: None,
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
ONCE_INIT.call_once(move || {
|
||||
registry_init!(layer, config.format, config.level);
|
||||
tracing_subscriber::registry()
|
||||
.with(LevelFilter::from(config.level))
|
||||
.with(layers)
|
||||
.init();
|
||||
});
|
||||
#[cfg(not(test))]
|
||||
registry_init!(layer, config.format, config.level);
|
||||
tracing_subscriber::registry()
|
||||
.with(LevelFilter::from(config.level))
|
||||
.with(layers)
|
||||
.init();
|
||||
|
||||
Ok(Self {
|
||||
service_state,
|
||||
worker_guard: Some(_guard),
|
||||
logger_guard,
|
||||
})
|
||||
}
|
||||
|
||||
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
||||
let Self {
|
||||
service_state,
|
||||
worker_guard,
|
||||
logger_guard,
|
||||
} = self;
|
||||
// keep the handle alive without stressing the runtime
|
||||
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
|
||||
|
@ -227,7 +213,7 @@ impl ServiceCore for Logger {
|
|||
match msg {
|
||||
LifecycleMessage::Shutdown(sender) => {
|
||||
// flush pending logs before signaling message processing
|
||||
drop(worker_guard);
|
||||
drop(logger_guard);
|
||||
if sender.send(()).is_err() {
|
||||
error!(
|
||||
"Error sending successful shutdown signal from service {}",
|
|
@ -0,0 +1,18 @@
|
|||
[package]
|
||||
name = "nomos-tracing"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
opentelemetry = { version = "0.26" }
|
||||
opentelemetry-otlp = "0.26"
|
||||
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
tokio = "1"
|
||||
tracing = "0.1"
|
||||
tracing-appender = "0.2"
|
||||
tracing-loki = "0.2.5"
|
||||
tracing-opentelemetry = "0.27"
|
||||
tracing-subscriber = { version = "0.3", features = ["json", "registry"] }
|
||||
tracing-gelf = "0.7"
|
||||
url = { version = "2", features = ["serde"] }
|
|
@ -0,0 +1,2 @@
|
|||
pub mod logging;
|
||||
pub mod tracing;
|
|
@ -0,0 +1,36 @@
|
|||
// std
|
||||
use std::{error::Error, net::SocketAddr, time::Duration};
|
||||
// crates
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::runtime::Handle;
|
||||
// internal
|
||||
|
||||
const GELF_RECONNECT_INTERVAL: u64 = 10;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct GelfConfig {
|
||||
pub addr: SocketAddr,
|
||||
}
|
||||
|
||||
pub fn create_gelf_layer(
|
||||
config: GelfConfig,
|
||||
handle: &Handle,
|
||||
) -> Result<tracing_gelf::Logger, Box<dyn Error + Send + Sync>> {
|
||||
let (layer, mut task) = tracing_gelf::Logger::builder()
|
||||
.connect_tcp(config.addr)
|
||||
.expect("Connect to the graylog instance");
|
||||
|
||||
handle.spawn(async move {
|
||||
loop {
|
||||
if task.connect().await.0.is_empty() {
|
||||
break;
|
||||
} else {
|
||||
eprintln!("Failed to connect to graylog");
|
||||
let delay = Duration::from_secs(GELF_RECONNECT_INTERVAL);
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(layer)
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
// std
|
||||
use std::{io::Write, path::PathBuf};
|
||||
// crates
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
use tracing_subscriber::fmt::{
|
||||
format::{DefaultFields, Format},
|
||||
Layer,
|
||||
};
|
||||
// internal
|
||||
|
||||
pub type FmtLayer<S> = Layer<S, DefaultFields, Format, tracing_appender::non_blocking::NonBlocking>;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct FileConfig {
|
||||
pub directory: PathBuf,
|
||||
pub prefix: Option<PathBuf>,
|
||||
}
|
||||
|
||||
pub fn create_file_layer<S>(config: FileConfig) -> (FmtLayer<S>, WorkerGuard) {
|
||||
let file_appender = tracing_appender::rolling::hourly(
|
||||
config.directory,
|
||||
config.prefix.unwrap_or_else(|| PathBuf::from("nomos.log")),
|
||||
);
|
||||
|
||||
create_writer_layer(file_appender)
|
||||
}
|
||||
|
||||
pub fn create_writer_layer<S, W>(writer: W) -> (FmtLayer<S>, WorkerGuard)
|
||||
where
|
||||
W: Write + Send + 'static,
|
||||
{
|
||||
let (non_blocking, guard) = tracing_appender::non_blocking(writer);
|
||||
|
||||
let layer = tracing_subscriber::fmt::Layer::new()
|
||||
.with_level(true)
|
||||
.with_writer(non_blocking);
|
||||
|
||||
(layer, guard)
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
// std
|
||||
use std::error::Error;
|
||||
// crates
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::runtime::Handle;
|
||||
use url::Url;
|
||||
// internal
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct LokiConfig {
|
||||
pub endpoint: Url,
|
||||
pub host_identifier: String,
|
||||
}
|
||||
|
||||
pub fn create_loki_layer(
|
||||
config: LokiConfig,
|
||||
handle: &Handle,
|
||||
) -> Result<tracing_loki::Layer, Box<dyn Error + Send + Sync>> {
|
||||
let (loki_layer, task) = tracing_loki::layer(
|
||||
config.endpoint,
|
||||
vec![("host".into(), config.host_identifier)]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
Default::default(),
|
||||
)?;
|
||||
|
||||
handle.spawn(task);
|
||||
Ok(loki_layer)
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
pub mod gelf;
|
||||
pub mod local;
|
||||
pub mod loki;
|
|
@ -0,0 +1 @@
|
|||
pub mod otlp;
|
|
@ -0,0 +1,42 @@
|
|||
// std
|
||||
use std::error::Error;
|
||||
// crates
|
||||
use opentelemetry::{global, trace::TracerProvider as _};
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_sdk::trace::{BatchConfig, Sampler, Tracer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::Subscriber;
|
||||
use tracing_opentelemetry::OpenTelemetryLayer;
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
use url::Url;
|
||||
// internal
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct OtlpTracingConfig {
|
||||
pub endpoint: Url,
|
||||
pub sample_ratio: f64,
|
||||
}
|
||||
|
||||
pub fn create_otlp_tracing_layer<S>(
|
||||
config: OtlpTracingConfig,
|
||||
) -> Result<OpenTelemetryLayer<S, Tracer>, Box<dyn Error + Send + Sync>>
|
||||
where
|
||||
S: Subscriber + for<'span> LookupSpan<'span>,
|
||||
{
|
||||
let otel_exporter = opentelemetry_otlp::new_exporter()
|
||||
.tonic()
|
||||
.with_endpoint(config.endpoint);
|
||||
let tracer_provider = opentelemetry_otlp::new_pipeline()
|
||||
.tracing()
|
||||
.with_trace_config(opentelemetry_sdk::trace::Config::default().with_sampler(
|
||||
Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(config.sample_ratio))),
|
||||
))
|
||||
.with_batch_config(BatchConfig::default())
|
||||
.with_exporter(otel_exporter)
|
||||
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
|
||||
|
||||
global::set_tracer_provider(tracer_provider.clone());
|
||||
let tracer: opentelemetry_sdk::trace::Tracer = tracer_provider.tracer("NomosTracer");
|
||||
|
||||
Ok(OpenTelemetryLayer::new(tracer))
|
||||
}
|
|
@ -14,3 +14,7 @@ num_subnets: 2
|
|||
old_blobs_check_interval_secs: 5
|
||||
blobs_validity_duration_secs: 60
|
||||
global_params_path: "/kzgrs_test_params"
|
||||
|
||||
# Tracing params
|
||||
tempo_endpoint: "http://tempo:4317"
|
||||
loki_endpoint: "http://loki:3100"
|
||||
|
|
|
@ -9,10 +9,13 @@ clap = { version = "4", features = ["derive"] }
|
|||
nomos-executor = { path = "../../nodes/nomos-executor" }
|
||||
nomos-libp2p = { path = "../../nomos-libp2p" }
|
||||
nomos-node = { path = "../../nodes/nomos-node" }
|
||||
nomos-tracing = { path = "../../nomos-tracing" }
|
||||
nomos-tracing-service = { path = "../../nomos-services/tracing" }
|
||||
rand = "0.8"
|
||||
reqwest = { version = "0.11", features = ["json", "rustls-tls"] }
|
||||
tests = { path = "../../tests" }
|
||||
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||
tracing = "0.1"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
serde_yaml = "0.9"
|
||||
|
|
|
@ -10,6 +10,7 @@ use serde::{de::DeserializeOwned, Serialize};
|
|||
#[derive(Serialize)]
|
||||
struct ClientIp {
|
||||
ip: Ipv4Addr,
|
||||
identifier: String,
|
||||
}
|
||||
|
||||
fn parse_ip(ip_str: String) -> Ipv4Addr {
|
||||
|
@ -21,6 +22,7 @@ fn parse_ip(ip_str: String) -> Ipv4Addr {
|
|||
|
||||
async fn get_config<Config: Serialize + DeserializeOwned>(
|
||||
ip: Ipv4Addr,
|
||||
identifier: String,
|
||||
url: &str,
|
||||
config_file: &str,
|
||||
) -> Result<(), String> {
|
||||
|
@ -28,7 +30,7 @@ async fn get_config<Config: Serialize + DeserializeOwned>(
|
|||
|
||||
let response = client
|
||||
.post(url)
|
||||
.json(&ClientIp { ip })
|
||||
.json(&ClientIp { ip, identifier })
|
||||
.send()
|
||||
.await
|
||||
.map_err(|err| format!("Failed to send IP announcement: {}", err))?;
|
||||
|
@ -57,6 +59,8 @@ async fn main() {
|
|||
let config_file_path = env::var("CFG_FILE_PATH").unwrap_or("config.yaml".to_string());
|
||||
let server_addr = env::var("CFG_SERVER_ADDR").unwrap_or("http://127.0.0.1:4400".to_string());
|
||||
let ip = parse_ip(env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_string()));
|
||||
let identifier =
|
||||
env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_string());
|
||||
|
||||
let host_kind = env::var("CFG_HOST_KIND").unwrap_or_else(|_| "validator".to_string());
|
||||
|
||||
|
@ -67,9 +71,13 @@ async fn main() {
|
|||
|
||||
let config_result = match host_kind.as_str() {
|
||||
"executor" => {
|
||||
get_config::<ExecutorConfig>(ip, &node_config_endpoint, &config_file_path).await
|
||||
get_config::<ExecutorConfig>(ip, identifier, &node_config_endpoint, &config_file_path)
|
||||
.await
|
||||
}
|
||||
_ => {
|
||||
get_config::<ValidatorConfig>(ip, identifier, &node_config_endpoint, &config_file_path)
|
||||
.await
|
||||
}
|
||||
_ => get_config::<ValidatorConfig>(ip, &node_config_endpoint, &config_file_path).await,
|
||||
};
|
||||
|
||||
// Handle error if the config request fails
|
||||
|
|
|
@ -10,7 +10,9 @@ use axum::Json;
|
|||
use axum::{http::StatusCode, response::IntoResponse, routing::post, Router};
|
||||
use cfgsync::config::Host;
|
||||
use cfgsync::repo::{ConfigRepo, RepoResponse};
|
||||
use cfgsync::TracingParams;
|
||||
use clap::Parser;
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tests::nodes::executor::create_executor_config;
|
||||
use tests::nodes::validator::create_validator_config;
|
||||
|
@ -43,6 +45,10 @@ struct CfgSyncConfig {
|
|||
old_blobs_check_interval_secs: u64,
|
||||
blobs_validity_duration_secs: u64,
|
||||
global_params_path: String,
|
||||
|
||||
// Tracing params
|
||||
tempo_endpoint: Url,
|
||||
loki_endpoint: Url,
|
||||
}
|
||||
|
||||
impl CfgSyncConfig {
|
||||
|
@ -72,21 +78,29 @@ impl CfgSyncConfig {
|
|||
global_params_path: self.global_params_path.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn to_tracing_params(&self) -> TracingParams {
|
||||
TracingParams {
|
||||
tempo_endpoint: self.tempo_endpoint.clone(),
|
||||
loki_endpoint: self.loki_endpoint.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct ClientIp {
|
||||
ip: Ipv4Addr,
|
||||
identifier: String,
|
||||
}
|
||||
|
||||
async fn validator_config(
|
||||
State(config_repo): State<Arc<ConfigRepo>>,
|
||||
Json(payload): Json<ClientIp>,
|
||||
) -> impl IntoResponse {
|
||||
let ClientIp { ip } = payload;
|
||||
let ClientIp { ip, identifier } = payload;
|
||||
|
||||
let (reply_tx, reply_rx) = channel();
|
||||
config_repo.register(Host::default_validator_from_ip(ip), reply_tx);
|
||||
config_repo.register(Host::default_validator_from_ip(ip, identifier), reply_tx);
|
||||
|
||||
match reply_rx.await {
|
||||
Ok(config_response) => match config_response {
|
||||
|
@ -104,10 +118,10 @@ async fn executor_config(
|
|||
State(config_repo): State<Arc<ConfigRepo>>,
|
||||
Json(payload): Json<ClientIp>,
|
||||
) -> impl IntoResponse {
|
||||
let ClientIp { ip } = payload;
|
||||
let ClientIp { ip, identifier } = payload;
|
||||
|
||||
let (reply_tx, reply_rx) = channel();
|
||||
config_repo.register(Host::default_executor_from_ip(ip), reply_tx);
|
||||
config_repo.register(Host::default_executor_from_ip(ip, identifier), reply_tx);
|
||||
|
||||
match reply_rx.await {
|
||||
Ok(config_response) => match config_response {
|
||||
|
@ -129,13 +143,15 @@ async fn main() {
|
|||
eprintln!("{}", err);
|
||||
process::exit(1);
|
||||
});
|
||||
let consensus_config = config.to_consensus_params();
|
||||
let da_config = config.to_da_params();
|
||||
let consensus_params = config.to_consensus_params();
|
||||
let da_params = config.to_da_params();
|
||||
let tracing_params = config.to_tracing_params();
|
||||
|
||||
let config_repo = ConfigRepo::new(
|
||||
config.n_hosts,
|
||||
consensus_config,
|
||||
da_config,
|
||||
consensus_params,
|
||||
da_params,
|
||||
tracing_params,
|
||||
Duration::from_secs(config.timeout),
|
||||
);
|
||||
let app = Router::new()
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
use std::{collections::HashMap, net::Ipv4Addr, str::FromStr};
|
||||
// crates
|
||||
use nomos_libp2p::{Multiaddr, PeerId, Protocol};
|
||||
use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig};
|
||||
use nomos_tracing_service::{LoggerLayer, TracingSettings};
|
||||
use rand::{thread_rng, Rng};
|
||||
use tests::topology::configs::{
|
||||
api::GeneralApiConfig,
|
||||
|
@ -9,9 +11,12 @@ use tests::topology::configs::{
|
|||
da::{create_da_configs, DaParams},
|
||||
mix::create_mix_configs,
|
||||
network::create_network_configs,
|
||||
tracing::GeneralTracingConfig,
|
||||
GeneralConfig,
|
||||
};
|
||||
use tracing::Level;
|
||||
// internal
|
||||
use crate::TracingParams;
|
||||
|
||||
const DEFAULT_LIBP2P_NETWORK_PORT: u16 = 3000;
|
||||
const DEFAULT_DA_NETWORK_PORT: u16 = 3300;
|
||||
|
@ -28,26 +33,29 @@ pub enum HostKind {
|
|||
pub struct Host {
|
||||
pub kind: HostKind,
|
||||
pub ip: Ipv4Addr,
|
||||
pub identifier: String,
|
||||
pub network_port: u16,
|
||||
pub da_network_port: u16,
|
||||
pub mix_port: u16,
|
||||
}
|
||||
|
||||
impl Host {
|
||||
pub fn default_validator_from_ip(ip: Ipv4Addr) -> Self {
|
||||
pub fn default_validator_from_ip(ip: Ipv4Addr, identifier: String) -> Self {
|
||||
Self {
|
||||
kind: HostKind::Validator,
|
||||
ip,
|
||||
identifier,
|
||||
network_port: DEFAULT_LIBP2P_NETWORK_PORT,
|
||||
da_network_port: DEFAULT_DA_NETWORK_PORT,
|
||||
mix_port: DEFAULT_MIX_PORT,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default_executor_from_ip(ip: Ipv4Addr) -> Self {
|
||||
pub fn default_executor_from_ip(ip: Ipv4Addr, identifier: String) -> Self {
|
||||
Self {
|
||||
kind: HostKind::Executor,
|
||||
ip,
|
||||
identifier,
|
||||
network_port: DEFAULT_LIBP2P_NETWORK_PORT,
|
||||
da_network_port: DEFAULT_DA_NETWORK_PORT,
|
||||
mix_port: DEFAULT_MIX_PORT,
|
||||
|
@ -58,6 +66,7 @@ impl Host {
|
|||
pub fn create_node_configs(
|
||||
consensus_params: ConsensusParams,
|
||||
da_params: DaParams,
|
||||
tracing_params: TracingParams,
|
||||
hosts: Vec<Host>,
|
||||
) -> HashMap<Host, GeneralConfig> {
|
||||
let mut ids = vec![[0; 32]; consensus_params.n_participants];
|
||||
|
@ -115,6 +124,10 @@ pub fn create_node_configs(
|
|||
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap();
|
||||
mix_config.backend.membership = host_mix_membership.clone();
|
||||
|
||||
// Tracing config.
|
||||
let tracing_config =
|
||||
tracing_config_for_grafana(tracing_params.clone(), host.identifier.clone());
|
||||
|
||||
configured_hosts.insert(
|
||||
host.clone(),
|
||||
GeneralConfig {
|
||||
|
@ -123,6 +136,7 @@ pub fn create_node_configs(
|
|||
network_config,
|
||||
mix_config,
|
||||
api_config,
|
||||
tracing_config,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
@ -182,6 +196,22 @@ fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
|
|||
})
|
||||
}
|
||||
|
||||
fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> GeneralTracingConfig {
|
||||
GeneralTracingConfig {
|
||||
tracing_settings: TracingSettings {
|
||||
logger: LoggerLayer::Loki(LokiConfig {
|
||||
endpoint: params.loki_endpoint,
|
||||
host_identifier: identifier,
|
||||
}),
|
||||
tracing: nomos_tracing_service::TracingLayer::Otlp(OtlpTracingConfig {
|
||||
endpoint: params.tempo_endpoint,
|
||||
sample_ratio: 1.0,
|
||||
}),
|
||||
level: Level::INFO,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod cfgsync_tests {
|
||||
use std::str::FromStr;
|
||||
|
@ -191,6 +221,8 @@ mod cfgsync_tests {
|
|||
use tests::topology::configs::consensus::ConsensusParams;
|
||||
use tests::topology::configs::da::DaParams;
|
||||
|
||||
use crate::TracingParams;
|
||||
|
||||
use super::{create_node_configs, Host, HostKind};
|
||||
|
||||
#[test]
|
||||
|
@ -199,6 +231,7 @@ mod cfgsync_tests {
|
|||
.map(|i| Host {
|
||||
kind: HostKind::Validator,
|
||||
ip: Ipv4Addr::from_str(&format!("10.1.1.{i}")).unwrap(),
|
||||
identifier: "node".into(),
|
||||
network_port: 3000,
|
||||
da_network_port: 4044,
|
||||
mix_port: 5000,
|
||||
|
@ -220,6 +253,10 @@ mod cfgsync_tests {
|
|||
blobs_validity_duration: Duration::from_secs(u64::MAX),
|
||||
global_params_path: "".into(),
|
||||
},
|
||||
TracingParams {
|
||||
tempo_endpoint: "http://test.com".try_into().unwrap(),
|
||||
loki_endpoint: "http://test.com".try_into().unwrap(),
|
||||
},
|
||||
hosts,
|
||||
);
|
||||
|
||||
|
|
|
@ -1,2 +1,10 @@
|
|||
use reqwest::Url;
|
||||
|
||||
pub mod config;
|
||||
pub mod repo;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TracingParams {
|
||||
pub tempo_endpoint: Url,
|
||||
pub loki_endpoint: Url,
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ use tokio::time::timeout;
|
|||
// internal
|
||||
|
||||
use crate::config::{create_node_configs, Host};
|
||||
use crate::TracingParams;
|
||||
|
||||
pub enum RepoResponse {
|
||||
Config(Box<GeneralConfig>),
|
||||
|
@ -22,6 +23,7 @@ pub struct ConfigRepo {
|
|||
n_hosts: usize,
|
||||
consensus_params: ConsensusParams,
|
||||
da_params: DaParams,
|
||||
tracing_params: TracingParams,
|
||||
timeout_duration: Duration,
|
||||
}
|
||||
|
||||
|
@ -30,6 +32,7 @@ impl ConfigRepo {
|
|||
n_hosts: usize,
|
||||
consensus_params: ConsensusParams,
|
||||
da_params: DaParams,
|
||||
tracing_params: TracingParams,
|
||||
timeout_duration: Duration,
|
||||
) -> Arc<Self> {
|
||||
let repo = Arc::new(Self {
|
||||
|
@ -37,6 +40,7 @@ impl ConfigRepo {
|
|||
n_hosts,
|
||||
consensus_params,
|
||||
da_params,
|
||||
tracing_params,
|
||||
timeout_duration,
|
||||
});
|
||||
|
||||
|
@ -70,6 +74,7 @@ impl ConfigRepo {
|
|||
let configs = create_node_configs(
|
||||
self.consensus_params.clone(),
|
||||
self.da_params.clone(),
|
||||
self.tracing_params.clone(),
|
||||
hosts,
|
||||
);
|
||||
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
stream_over_http_enabled: true
|
||||
server:
|
||||
http_listen_port: 3200
|
||||
log_level: info
|
||||
|
||||
|
||||
cache:
|
||||
background:
|
||||
writeback_goroutines: 5
|
||||
caches:
|
||||
- roles:
|
||||
- frontend-search
|
||||
memcached:
|
||||
host: memcached:11211
|
||||
|
||||
query_frontend:
|
||||
search:
|
||||
duration_slo: 5s
|
||||
throughput_bytes_slo: 1.073741824e+09
|
||||
trace_by_id:
|
||||
duration_slo: 5s
|
||||
|
||||
distributor:
|
||||
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
|
||||
jaeger: # the receives all come from the OpenTelemetry collector. more configuration information can
|
||||
protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
|
||||
thrift_http: #
|
||||
grpc: # for a production deployment you should only enable the receivers you need!
|
||||
thrift_binary:
|
||||
thrift_compact:
|
||||
zipkin:
|
||||
otlp:
|
||||
protocols:
|
||||
http:
|
||||
grpc:
|
||||
opencensus:
|
||||
|
||||
ingester:
|
||||
max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally
|
||||
|
||||
compactor:
|
||||
compaction:
|
||||
block_retention: 1h # overall Tempo trace retention. set for demo purposes
|
||||
|
||||
metrics_generator:
|
||||
registry:
|
||||
external_labels:
|
||||
source: tempo
|
||||
cluster: docker-compose
|
||||
storage:
|
||||
path: /var/tempo/generator/wal
|
||||
remote_write:
|
||||
- url: http://prometheus:9090/api/v1/write
|
||||
send_exemplars: true
|
||||
traces_storage:
|
||||
path: /var/tempo/generator/traces
|
||||
|
||||
storage:
|
||||
trace:
|
||||
backend: local # backend configuration to use
|
||||
wal:
|
||||
path: /var/tempo/wal # where to store the wal locally
|
||||
local:
|
||||
path: /var/tempo/blocks
|
||||
|
||||
overrides:
|
||||
defaults:
|
||||
metrics_generator:
|
||||
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
|
||||
generate_native_histograms: both
|
||||
|
|
@ -12,7 +12,8 @@ nomos-executor = { path = "../nodes/nomos-executor", default-features = false }
|
|||
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
|
||||
nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] }
|
||||
cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
|
||||
nomos-log = { path = "../nomos-services/log" }
|
||||
nomos-tracing = { path = "../nomos-tracing" }
|
||||
nomos-tracing-service = { path = "../nomos-services/tracing" }
|
||||
nomos-api = { path = "../nomos-services/api" }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
nomos-core = { path = "../nomos-core/chain-defs" }
|
||||
|
|
|
@ -20,10 +20,11 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as Verif
|
|||
use nomos_da_verifier::DaVerifierServiceSettings;
|
||||
use nomos_executor::api::backend::AxumBackendSettings;
|
||||
use nomos_executor::config::Config;
|
||||
use nomos_log::{LoggerBackend, LoggerFormat};
|
||||
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
|
||||
use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE};
|
||||
use nomos_node::RocksBackendSettings;
|
||||
use nomos_tracing::logging::local::FileConfig;
|
||||
use nomos_tracing_service::LoggerLayer;
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
use crate::adjust_timeout;
|
||||
|
@ -62,11 +63,10 @@ impl Executor {
|
|||
let config_path = file.path().to_owned();
|
||||
|
||||
// setup logging so that we can intercept it later in testing
|
||||
config.log.backend = LoggerBackend::File {
|
||||
config.tracing.logger = LoggerLayer::File(FileConfig {
|
||||
directory: dir.path().to_owned(),
|
||||
prefix: Some(LOGS_PREFIX.into()),
|
||||
};
|
||||
config.log.format = LoggerFormat::Json;
|
||||
});
|
||||
|
||||
config.storage.db_path = dir.path().join("db");
|
||||
config
|
||||
|
@ -197,7 +197,7 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
|
|||
blob_storage_directory: "./".into(),
|
||||
},
|
||||
},
|
||||
log: Default::default(),
|
||||
tracing: config.tracing_config.tracing_settings,
|
||||
http: nomos_api::ApiServiceSettings {
|
||||
backend_settings: AxumBackendSettings {
|
||||
address: config.api_config.address,
|
||||
|
|
|
@ -13,7 +13,6 @@ use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as Sampl
|
|||
use nomos_da_sampling::{backend::kzgrs::KzgrsSamplingBackendSettings, DaSamplingServiceSettings};
|
||||
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
|
||||
use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings};
|
||||
use nomos_log::{LoggerBackend, LoggerFormat};
|
||||
use nomos_mempool::MempoolMetrics;
|
||||
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
|
||||
use nomos_node::api::paths::{
|
||||
|
@ -21,6 +20,8 @@ use nomos_node::api::paths::{
|
|||
};
|
||||
use nomos_node::{api::backend::AxumBackendSettings, Config, RocksBackendSettings};
|
||||
use nomos_node::{BlobInfo, HeaderId, Tx};
|
||||
use nomos_tracing::logging::local::FileConfig;
|
||||
use nomos_tracing_service::LoggerLayer;
|
||||
use reqwest::Url;
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
|
@ -65,11 +66,10 @@ impl Validator {
|
|||
let config_path = file.path().to_owned();
|
||||
|
||||
// setup logging so that we can intercept it later in testing
|
||||
config.log.backend = LoggerBackend::File {
|
||||
config.tracing.logger = LoggerLayer::File(FileConfig {
|
||||
directory: dir.path().to_owned(),
|
||||
prefix: Some(LOGS_PREFIX.into()),
|
||||
};
|
||||
config.log.format = LoggerFormat::Json;
|
||||
});
|
||||
|
||||
config.storage.db_path = dir.path().join("db");
|
||||
config
|
||||
|
@ -279,7 +279,7 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
|
|||
blob_storage_directory: "./".into(),
|
||||
},
|
||||
},
|
||||
log: Default::default(),
|
||||
tracing: config.tracing_config.tracing_settings,
|
||||
http: nomos_api::ApiServiceSettings {
|
||||
backend_settings: AxumBackendSettings {
|
||||
address: config.api_config.address,
|
||||
|
|
|
@ -3,12 +3,14 @@ pub mod consensus;
|
|||
pub mod da;
|
||||
pub mod mix;
|
||||
pub mod network;
|
||||
pub mod tracing;
|
||||
|
||||
use api::GeneralApiConfig;
|
||||
use consensus::GeneralConsensusConfig;
|
||||
use da::GeneralDaConfig;
|
||||
use mix::GeneralMixConfig;
|
||||
use network::GeneralNetworkConfig;
|
||||
use tracing::GeneralTracingConfig;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GeneralConfig {
|
||||
|
@ -17,4 +19,5 @@ pub struct GeneralConfig {
|
|||
pub da_config: GeneralDaConfig,
|
||||
pub network_config: GeneralNetworkConfig,
|
||||
pub mix_config: GeneralMixConfig,
|
||||
pub tracing_config: GeneralTracingConfig,
|
||||
}
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
use nomos_tracing_service::TracingSettings;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GeneralTracingConfig {
|
||||
pub tracing_settings: TracingSettings,
|
||||
}
|
||||
|
||||
pub fn create_tracing_configs(ids: &[[u8; 32]]) -> Vec<GeneralTracingConfig> {
|
||||
ids.iter()
|
||||
.map(|_| GeneralTracingConfig {
|
||||
tracing_settings: Default::default(),
|
||||
})
|
||||
.collect()
|
||||
}
|
|
@ -3,6 +3,7 @@ pub mod configs;
|
|||
use configs::{
|
||||
da::{create_da_configs, DaParams},
|
||||
network::{create_network_configs, NetworkParams},
|
||||
tracing::create_tracing_configs,
|
||||
GeneralConfig,
|
||||
};
|
||||
use rand::{thread_rng, Rng};
|
||||
|
@ -78,6 +79,7 @@ impl Topology {
|
|||
let network_configs = create_network_configs(&ids, config.network_params);
|
||||
let mix_configs = create_mix_configs(&ids);
|
||||
let api_configs = create_api_configs(&ids);
|
||||
let tracing_configs = create_tracing_configs(&ids);
|
||||
|
||||
let mut validators = Vec::new();
|
||||
for i in 0..config.n_validators {
|
||||
|
@ -87,6 +89,7 @@ impl Topology {
|
|||
network_config: network_configs[i].to_owned(),
|
||||
mix_config: mix_configs[i].to_owned(),
|
||||
api_config: api_configs[i].to_owned(),
|
||||
tracing_config: tracing_configs[i].to_owned(),
|
||||
});
|
||||
validators.push(Validator::spawn(config).await)
|
||||
}
|
||||
|
@ -99,6 +102,7 @@ impl Topology {
|
|||
network_config: network_configs[i].to_owned(),
|
||||
mix_config: mix_configs[i].to_owned(),
|
||||
api_config: api_configs[i].to_owned(),
|
||||
tracing_config: tracing_configs[i].to_owned(),
|
||||
});
|
||||
executors.push(Executor::spawn(config).await)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue