diff --git a/compose.debug.yml b/compose.debug.yml index aea73219..3d8eceac 100644 --- a/compose.debug.yml +++ b/compose.debug.yml @@ -48,17 +48,6 @@ services: - "chown" - "10001:10001" - "/var/tempo" - volumes: - - tempo-data:/var/tempo - - memcached: - image: memcached:1.6.29 - container_name: memcached - ports: - - "11211:11211" - environment: - - MEMCACHED_MAX_MEMORY=64m - - MEMCACHED_THREADS=4 tempo: image: *tempoImage @@ -66,12 +55,7 @@ services: command: [ "-config.file=/etc/tempo.yaml" ] volumes: - ./testnet/monitoring/tempo.yaml:/etc/tempo.yaml:z - - tempo-data:/var/tempo ports: - "4317:4317" # otlp grpc depends_on: - tempo-init - - memcached - -volumes: - tempo-data: diff --git a/compose.static.yml b/compose.static.yml index e9dfb168..779210ba 100644 --- a/compose.static.yml +++ b/compose.static.yml @@ -122,17 +122,6 @@ services: - "chown" - "10001:10001" - "/var/tempo" - volumes: - - tempo-data:/var/tempo - - memcached: - image: memcached:1.6.29 - container_name: memcached - ports: - - "11211:11211" - environment: - - MEMCACHED_MAX_MEMORY=64m - - MEMCACHED_THREADS=4 tempo: image: *tempoImage @@ -140,12 +129,7 @@ services: command: [ "-config.file=/etc/tempo.yaml" ] volumes: - ./testnet/monitoring/tempo.yaml:/etc/tempo.yaml:z - - tempo-data:/var/tempo ports: - "4317:4317" # otlp grpc depends_on: - tempo-init - - memcached - -volumes: - tempo-data: diff --git a/nodes/nomos-executor/src/main.rs b/nodes/nomos-executor/src/main.rs index fcd3e66d..d783132b 100644 --- a/nodes/nomos-executor/src/main.rs +++ b/nodes/nomos-executor/src/main.rs @@ -10,8 +10,6 @@ use nomos_node::{ DA_TOPIC, }; use overwatch_rs::overwatch::*; -use tracing::{span, Level}; -use uuid::Uuid; // internal #[derive(Parser, Debug)] @@ -53,13 +51,6 @@ fn main() -> Result<()> { cryptarchia_args, )?; - #[cfg(debug_assertions)] - let debug_span = { - let debug_id = Uuid::new_v4(); - span!(Level::DEBUG, "Nomos", debug_id = debug_id.to_string()) - }; - #[cfg(debug_assertions)] - let _guard = debug_span.enter(); let app = OverwatchRunner::::run( NomosExecutorServiceSettings { network: config.network, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 165a12c2..56b1e7a8 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -11,8 +11,6 @@ use nomos_core::{da::blob::info::DispersedBlobInfo, tx::Transaction}; use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; use overwatch_rs::overwatch::*; -use tracing::{span, Level}; -use uuid::Uuid; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -53,13 +51,6 @@ fn main() -> Result<()> { cryptarchia_args, )?; - #[cfg(debug_assertions)] - let debug_span = { - let debug_id = Uuid::new_v4(); - span!(Level::DEBUG, "Nomos", debug_id = debug_id.to_string()) - }; - #[cfg(debug_assertions)] - let _guard = debug_span.enter(); let app = OverwatchRunner::::run( NomosServiceSettings { network: config.network, diff --git a/nomos-services/data-availability/dispersal/Cargo.toml b/nomos-services/data-availability/dispersal/Cargo.toml index ecf8f1cc..b95c7ea0 100644 --- a/nomos-services/data-availability/dispersal/Cargo.toml +++ b/nomos-services/data-availability/dispersal/Cargo.toml @@ -12,6 +12,7 @@ nomos-da-network-core = { path = "../../../nomos-da/network/core" } nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" } nomos-da-sampling = { path = "../sampling" } nomos-mempool = { path = "../../mempool", features = ["libp2p"] } +nomos-tracing = { path = "../../../nomos-tracing" } kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } rand = "0.8" diff --git a/nomos-services/data-availability/dispersal/src/backend/mod.rs b/nomos-services/data-availability/dispersal/src/backend/mod.rs index c09d59cb..aed65f98 100644 --- a/nomos-services/data-availability/dispersal/src/backend/mod.rs +++ b/nomos-services/data-availability/dispersal/src/backend/mod.rs @@ -2,7 +2,10 @@ use crate::adapters::{mempool::DaMempoolAdapter, network::DispersalNetworkAdapte use std::time::Duration; use nomos_core::da::{blob::metadata, DaDispersal, DaEncoder}; +use nomos_tracing::info_with_id; use overwatch_rs::DynError; +use std::fmt::Debug; +use tracing::instrument; pub mod kzgrs; @@ -13,8 +16,8 @@ pub trait DispersalBackend { type Dispersal: DaDispersal::EncodedData>; type NetworkAdapter: DispersalNetworkAdapter; type MempoolAdapter: DaMempoolAdapter; - type Metadata: metadata::Metadata + Send; - type BlobId: Send; + type Metadata: Debug + metadata::Metadata + Send; + type BlobId: AsRef<[u8]> + Send; fn init( config: Self::Settings, @@ -36,12 +39,14 @@ pub trait DispersalBackend { metadata: Self::Metadata, ) -> Result<(), DynError>; + #[instrument(skip_all)] async fn process_dispersal( &self, data: Vec, metadata: Self::Metadata, ) -> Result<(), DynError> { let (blob_id, encoded_data) = self.encode(data).await?; + info_with_id!(blob_id.as_ref(), "ProcessDispersal"); self.disperse(encoded_data).await?; // let disperse and replication happen before pushing to mempool tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/nomos-services/data-availability/dispersal/src/lib.rs b/nomos-services/data-availability/dispersal/src/lib.rs index 50d041e8..ef122376 100644 --- a/nomos-services/data-availability/dispersal/src/lib.rs +++ b/nomos-services/data-availability/dispersal/src/lib.rs @@ -4,7 +4,7 @@ use std::marker::PhantomData; // crates use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; -use tracing::log::error; +use tracing::error; // internal use crate::adapters::mempool::DaMempoolAdapter; use crate::adapters::network::DispersalNetworkAdapter; @@ -120,6 +120,7 @@ where service_state, .. } = self; + let DispersalServiceSettings { backend: backend_settings, } = service_state.settings_reader.get_updated_settings(); @@ -144,6 +145,7 @@ where } } } + Ok(()) } } diff --git a/nomos-services/data-availability/indexer/Cargo.toml b/nomos-services/data-availability/indexer/Cargo.toml index d8200742..5048eaab 100644 --- a/nomos-services/data-availability/indexer/Cargo.toml +++ b/nomos-services/data-availability/indexer/Cargo.toml @@ -14,6 +14,7 @@ nomos-da-storage = { path = "../../../nomos-da/storage" } nomos-da-sampling = { path = "../sampling" } nomos-storage = { path = "../../../nomos-services/storage" } nomos-mempool = { path = "../../../nomos-services/mempool" } +nomos-tracing = { path = "../../../nomos-tracing" } cryptarchia-consensus = { path = "../../../nomos-services/cryptarchia-consensus" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } diff --git a/nomos-services/data-availability/indexer/src/lib.rs b/nomos-services/data-availability/indexer/src/lib.rs index 8a094806..4cc65faf 100644 --- a/nomos-services/data-availability/indexer/src/lib.rs +++ b/nomos-services/data-availability/indexer/src/lib.rs @@ -17,6 +17,7 @@ use nomos_da_sampling::backend::DaSamplingServiceBackend; use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter}; use nomos_storage::backends::StorageBackend; use nomos_storage::StorageService; +use nomos_tracing::info_with_id; use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::{Relay, RelayMessage}; @@ -28,7 +29,9 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use storage::DaStorageAdapter; use tokio::sync::oneshot::Sender; -use tracing::error; +use tracing::{error, instrument}; + +const DA_INDEXER_TAG: ServiceId = "DA-Indexer"; pub type ConsensusRelay< A, @@ -213,7 +216,7 @@ where SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { - const SERVICE_ID: ServiceId = "DaIndexer"; + const SERVICE_ID: ServiceId = DA_INDEXER_TAG; type Settings = IndexerSettings; type State = NoState; type StateOperator = NoOperator; @@ -267,7 +270,8 @@ where DaPoolAdapter::Payload: DispersedBlobInfo + Into + Debug, ClPool::Item: Clone + Eq + Hash + Debug + 'static, ClPool::Key: Debug + 'static, - DaPool::Item: Metadata + Clone + Eq + Hash + Debug + 'static, + DaPool::Item: DispersedBlobInfo + Metadata + Clone + Eq + Hash + Debug + 'static, + ::BlobId: AsRef<[u8]>, DaPool::Key: Debug + 'static, ::Index: Send + Sync, A::Backend: 'static, @@ -283,22 +287,28 @@ where SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { + #[instrument(skip_all)] async fn handle_new_block( storage_adapter: &DaStorage, block: Block, ) -> Result<(), DynError> { for info in block.blobs() { + info_with_id!(info.blob_id().as_ref(), "HandleNewBlock"); storage_adapter.add_index(info).await?; } Ok(()) } + #[instrument(skip_all)] async fn handle_da_msg( storage_adapter: &DaStorage, msg: DaMsg, ) -> Result<(), DynError> { match msg { - DaMsg::AddIndex { info } => storage_adapter.add_index(&info).await, + DaMsg::AddIndex { info } => { + info_with_id!(info.blob_id().as_ref(), "AddIndex"); + storage_adapter.add_index(&info).await + } DaMsg::GetRange { app_id, range, @@ -401,7 +411,7 @@ where + 'static, ::AppId: Send + Sync, ::Index: Send + Sync, - + <::Item as DispersedBlobInfo>::BlobId: AsRef<[u8]>, A::Backend: 'static, TxS: TxSelect, BS: BlobSelect, @@ -434,6 +444,7 @@ where consensus_relay, storage_relay, } = self; + let consensus_relay = consensus_relay .connect() .await @@ -471,6 +482,7 @@ where } } } + Ok(()) } } diff --git a/nomos-services/data-availability/network/Cargo.toml b/nomos-services/data-availability/network/Cargo.toml index 43a468ef..ebf6f9a2 100644 --- a/nomos-services/data-availability/network/Cargo.toml +++ b/nomos-services/data-availability/network/Cargo.toml @@ -11,6 +11,7 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" nomos-core = { path = "../../../nomos-core/chain-defs" } nomos-libp2p = { path = "../../../nomos-libp2p" } nomos-da-network-core = { path = "../../../nomos-da/network/core" } +nomos-tracing = { path = "../../../nomos-tracing" } subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" } libp2p = { version = "0.53", features = ["ed25519"] } serde = { version = "1.0", features = ["derive"] } diff --git a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs index 02f79490..d03d60b6 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs @@ -16,6 +16,7 @@ use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalE use nomos_da_network_core::swarm::executor::ExecutorSwarm; use nomos_da_network_core::SubnetworkId; use nomos_libp2p::ed25519; +use nomos_tracing::info_with_id; use overwatch_rs::overwatch::handle::OverwatchHandle; use overwatch_rs::services::state::NoState; use serde::{Deserialize, Serialize}; @@ -29,6 +30,7 @@ use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; +use tracing::instrument; /// Message that the backend replies to #[derive(Debug)] @@ -217,18 +219,21 @@ where executor_handle.abort(); } + #[instrument(skip_all)] async fn process(&self, msg: Self::Message) { match msg { ExecutorDaNetworkMessage::RequestSample { subnetwork_id, blob_id, } => { + info_with_id!(&blob_id, "RequestSample"); handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await; } ExecutorDaNetworkMessage::RequestDispersal { subnetwork_id, da_blob, } => { + info_with_id!(&da_blob.id(), "RequestDispersal"); if let Err(e) = self.dispersal_blobs_sender.send((subnetwork_id, *da_blob)) { error!("Could not send internal blob to underlying dispersal behaviour: {e}"); } diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs index 5b033306..d516ce5c 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs @@ -11,6 +11,7 @@ use nomos_core::da::BlobId; use nomos_da_network_core::swarm::validator::ValidatorSwarm; use nomos_da_network_core::SubnetworkId; use nomos_libp2p::ed25519; +use nomos_tracing::info_with_id; use overwatch_rs::overwatch::handle::OverwatchHandle; use overwatch_rs::services::state::NoState; use std::fmt::Debug; @@ -21,6 +22,7 @@ use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::BroadcastStream; +use tracing::instrument; /// Message that the backend replies to #[derive(Debug)] @@ -148,12 +150,14 @@ where replies_handle.abort(); } + #[instrument(skip_all)] async fn process(&self, msg: Self::Message) { match msg { DaNetworkMessage::RequestSample { subnetwork_id, blob_id, } => { + info_with_id!(&blob_id, "RequestSample"); handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await; } } diff --git a/nomos-services/data-availability/network/src/lib.rs b/nomos-services/data-availability/network/src/lib.rs index a7ea6e4b..3f18bd59 100644 --- a/nomos-services/data-availability/network/src/lib.rs +++ b/nomos-services/data-availability/network/src/lib.rs @@ -19,6 +19,8 @@ use tokio::sync::oneshot; use tracing::error; // internal +const DA_NETWORK_TAG: ServiceId = "DA-Network"; + pub enum DaNetworkMsg { Process(B::Message), Subscribe { @@ -61,7 +63,7 @@ pub struct NetworkState { } impl ServiceData for NetworkService { - const SERVICE_ID: ServiceId = "DaNetwork"; + const SERVICE_ID: ServiceId = DA_NETWORK_TAG; type Settings = NetworkConfig; type State = NetworkState; type StateOperator = NoOperator; @@ -94,6 +96,7 @@ where }, mut backend, } = self; + let mut lifecycle_stream = lifecycle_handle.message_stream(); loop { tokio::select! { @@ -108,6 +111,7 @@ where } } } + Ok(()) } } diff --git a/nomos-services/data-availability/sampling/Cargo.toml b/nomos-services/data-availability/sampling/Cargo.toml index b515da4d..4121cca4 100644 --- a/nomos-services/data-availability/sampling/Cargo.toml +++ b/nomos-services/data-availability/sampling/Cargo.toml @@ -15,6 +15,7 @@ nomos-da-network-core = { path = "../../../nomos-da/network/core" } nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" } nomos-da-storage = { path = "../../../nomos-da/storage" } nomos-storage = { path = "../../../nomos-services/storage" } +nomos-tracing = { path = "../../../nomos-tracing" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } serde = { version = "1.0", features = ["derive"] } diff --git a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs index 48fafdcd..c2fd24a0 100644 --- a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs +++ b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs @@ -9,8 +9,10 @@ use rand::prelude::*; use serde::{Deserialize, Serialize}; use tokio::time; use tokio::time::Interval; +use tracing::instrument; use kzgrs_backend::common::blob::DaBlob; +use nomos_tracing::info_with_id; // // internal @@ -71,8 +73,10 @@ impl DaSamplingServiceBackend for KzgrsSamplingBackend< self.validated_blobs.clone() } + #[instrument(skip_all)] async fn mark_completed(&mut self, blobs_ids: &[Self::BlobId]) { for id in blobs_ids { + info_with_id!(id, "MarkInBlock"); self.pending_sampling_blobs.remove(id); self.validated_blobs.remove(id); } diff --git a/nomos-services/data-availability/sampling/src/lib.rs b/nomos-services/data-availability/sampling/src/lib.rs index 9e785534..92054b04 100644 --- a/nomos-services/data-availability/sampling/src/lib.rs +++ b/nomos-services/data-availability/sampling/src/lib.rs @@ -22,9 +22,10 @@ use rand::{RngCore, SeedableRng}; use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; use tokio_stream::StreamExt; -use tracing::{error, span, Instrument, Level}; +use tracing::{error, instrument}; // internal use backend::{DaSamplingServiceBackend, SamplingState}; +use nomos_tracing::{error_with_id, info_with_id}; use storage::DaStorageAdapter; const DA_SAMPLING_TAG: ServiceId = "DA-Sampling"; @@ -93,6 +94,7 @@ where } } + #[instrument(skip_all)] async fn handle_service_message( msg: ::Message, network_adapter: &mut DaNetwork, @@ -102,13 +104,14 @@ where DaSamplingServiceMsg::TriggerSampling { blob_id } => { if let SamplingState::Init(sampling_subnets) = sampler.init_sampling(blob_id).await { + info_with_id!(blob_id, "TriggerSampling"); if let Err(e) = network_adapter .start_sampling(blob_id, &sampling_subnets) .await { // we can short circuit the failure from beginning sampler.handle_sampling_error(blob_id).await; - error!("Error sampling for BlobId: {blob_id:?}: {e}"); + error_with_id!(blob_id, "Error sampling for BlobId: {blob_id:?}: {e}"); } } } @@ -124,6 +127,7 @@ where } } + #[instrument(skip_all)] async fn handle_sampling_message( event: SamplingEvent, sampler: &mut Backend, @@ -131,10 +135,12 @@ where ) { match event { SamplingEvent::SamplingSuccess { blob_id, blob } => { + info_with_id!(blob_id, "SamplingSuccess"); sampler.handle_sampling_success(blob_id, *blob).await; } SamplingEvent::SamplingError { error } => { if let Some(blob_id) = error.blob_id() { + info_with_id!(blob_id, "SamplingError"); sampler.handle_sampling_error(*blob_id).await; return; } @@ -145,6 +151,7 @@ where column_idx, response_sender, } => { + info_with_id!(blob_id, "SamplingRequest"); let maybe_blob = storage_adapter .get_blob(blob_id, column_idx) .await @@ -225,6 +232,7 @@ where mut service_state, mut sampler, } = self; + let DaSamplingServiceSettings { storage_adapter_settings, .. @@ -240,30 +248,26 @@ where let storage_adapter = DaStorage::new(storage_adapter_settings, storage_relay).await; let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); - async { - loop { - tokio::select! { - Some(service_message) = service_state.inbound_relay.recv() => { - Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await; - } - Some(sampling_message) = sampling_message_stream.next() => { - Self::handle_sampling_message(sampling_message, &mut sampler, &storage_adapter).await; - } - Some(msg) = lifecycle_stream.next() => { - if Self::should_stop_service(msg).await { - break; - } - } - // cleanup not on time samples - _ = next_prune_tick.tick() => { - sampler.prune(); - } - + loop { + tokio::select! { + Some(service_message) = service_state.inbound_relay.recv() => { + Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await; } + Some(sampling_message) = sampling_message_stream.next() => { + Self::handle_sampling_message(sampling_message, &mut sampler, &storage_adapter).await; + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + // cleanup not on time samples + _ = next_prune_tick.tick() => { + sampler.prune(); + } + } } - .instrument(span!(Level::TRACE, DA_SAMPLING_TAG)) - .await; Ok(()) } diff --git a/nomos-services/data-availability/verifier/Cargo.toml b/nomos-services/data-availability/verifier/Cargo.toml index 7ef4fccf..bf018500 100644 --- a/nomos-services/data-availability/verifier/Cargo.toml +++ b/nomos-services/data-availability/verifier/Cargo.toml @@ -16,13 +16,14 @@ nomos-da-storage = { path = "../../../nomos-da/storage" } nomos-da-network-core = { path = "../../../nomos-da/network/core" } nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" } nomos-storage = { path = "../../../nomos-services/storage" } +nomos-tracing = { path = "../../../nomos-tracing" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } serde = { version = "1.0", features = ["derive"] } subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" } tokio = { version = "1", features = ["sync", "macros"] } tokio-stream = "0.1.15" -tracing = "0.1" +tracing = { version = "0.1", features = ["attributes"] } [features] rocksdb-backend = ["nomos-storage/rocksdb-backend"] diff --git a/nomos-services/data-availability/verifier/src/lib.rs b/nomos-services/data-availability/verifier/src/lib.rs index ff8645e4..5df7115b 100644 --- a/nomos-services/data-availability/verifier/src/lib.rs +++ b/nomos-services/data-availability/verifier/src/lib.rs @@ -18,10 +18,12 @@ use overwatch_rs::DynError; use serde::{Deserialize, Serialize}; use tokio::sync::oneshot::Sender; use tokio_stream::StreamExt; -use tracing::{error, span, Instrument, Level}; +use tracing::error; +use tracing::instrument; // internal use backend::VerifierBackend; use network::NetworkAdapter; +use nomos_tracing::info_with_id; use storage::DaStorageAdapter; const DA_VERIFIER_TAG: ServiceId = "DA-Verifier"; @@ -66,10 +68,12 @@ where Backend::DaBlob: Debug + Send, Backend::Error: Error + Send + Sync, Backend::Settings: Clone, + ::BlobId: AsRef<[u8]>, N: NetworkAdapter + Send + 'static, N::Settings: Clone, S: DaStorageAdapter + Send + 'static, { + #[instrument(skip_all)] async fn handle_new_blob( verifier: &Backend, storage_adapter: &S, @@ -80,8 +84,10 @@ where .await? .is_some() { + info_with_id!(blob.id().as_ref(), "VerifierBlobExists"); Ok(()) } else { + info_with_id!(blob.id().as_ref(), "VerifierAddBlob"); verifier.verify(blob)?; storage_adapter.add_blob(blob, &()).await?; Ok(()) @@ -128,6 +134,7 @@ where Backend::Settings: Clone + Send + Sync + 'static, Backend::DaBlob: Debug + Send + Sync + 'static, Backend::Error: Error + Send + Sync + 'static, + ::BlobId: AsRef<[u8]>, N: NetworkAdapter + Send + Sync + 'static, N::Settings: Clone + Send + Sync + 'static, S: DaStorageAdapter + Send + Sync + 'static, @@ -173,36 +180,36 @@ where let storage_adapter = S::new(storage_adapter_settings, storage_relay).await; let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); - async { - loop { - tokio::select! { - Some(blob) = blob_stream.next() => { - if let Err(err) = Self::handle_new_blob(&verifier,&storage_adapter, &blob).await { - error!("Error handling blob {blob:?} due to {err:?}"); - } + loop { + tokio::select! { + Some(blob) = blob_stream.next() => { + if let Err(err) = Self::handle_new_blob(&verifier,&storage_adapter, &blob).await { + error!("Error handling blob {blob:?} due to {err:?}"); } - Some(msg) = service_state.inbound_relay.recv() => { - let DaVerifierMsg::AddBlob { blob, reply_channel } = msg; - match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await { - Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) { + } + Some(msg) = service_state.inbound_relay.recv() => { + let DaVerifierMsg::AddBlob { blob, reply_channel } = msg; + match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await { + Ok(attestation) => { + if let Err(err) = reply_channel.send(Some(attestation)) { error!("Error replying attestation {err:?}"); - }, - Err(err) => { - error!("Error handling blob {blob:?} due to {err:?}"); - if let Err(err) = reply_channel.send(None) { - error!("Error replying attestation {err:?}"); - } - }, - }; - } - Some(msg) = lifecycle_stream.next() => { - if Self::should_stop_service(msg).await { - break; - } + } + }, + Err(err) => { + error!("Error handling blob {blob:?} due to {err:?}"); + if let Err(err) = reply_channel.send(None) { + error!("Error replying attestation {err:?}"); + } + }, + }; + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; } } } - }.instrument(span!(Level::TRACE, DA_VERIFIER_TAG)).await; + } Ok(()) } diff --git a/nomos-tracing/Cargo.toml b/nomos-tracing/Cargo.toml index 9b584c2f..e4a34c6b 100644 --- a/nomos-tracing/Cargo.toml +++ b/nomos-tracing/Cargo.toml @@ -9,6 +9,7 @@ opentelemetry-otlp = { version = "0.26", features = ["grpc-tonic", "http-proto", opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] } opentelemetry-http = { version = "0.26", features = ["reqwest"] } opentelemetry-semantic-conventions = "0.26" +rand = "0.8" reqwest = "0.12" serde = { version = "1.0", features = ["derive"] } tokio = "1" diff --git a/nomos-tracing/src/tracing/macros.rs b/nomos-tracing/src/tracing/macros.rs new file mode 100644 index 00000000..8d93d7fa --- /dev/null +++ b/nomos-tracing/src/tracing/macros.rs @@ -0,0 +1,75 @@ +pub mod prelude { + use std::collections::HashMap; + + pub use opentelemetry::{ + global, + trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState}, + Context, + }; + pub use rand; + pub use tracing::Span; + pub use tracing_opentelemetry::OpenTelemetrySpanExt; + + // In some places it makes sense to use third party ids such as blob_id or tx_id as a tracing + // id. This allows to track the time during which the entity is propagated throughout the + // system. + // + // Opentelemetry tracing standard has a specific remote context format which is supported by + // most tracing software. + // More information at https://www.w3.org/TR/trace-context/#traceparent-header + pub fn set_remote_context(trace_id: TraceId, span_id: SpanId) -> HashMap { + let mut carrier = HashMap::new(); + carrier.insert( + "traceparent".to_string(), + format!("00-{trace_id}-{span_id}-01"), + ); + + carrier + } +} + +#[macro_export] +macro_rules! info_with_id { + ($idbytes:expr, $msg:expr $(, $key:ident = $value:expr)*) => {{ + use $crate::tracing::macros::prelude::*; + use std::convert::TryInto; + + let trace_id = TraceId::from_bytes($idbytes[..16].try_into().unwrap()); + let span_id = SpanId::from_bytes(rand::random::<[u8; 8]>()); + + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&set_remote_context(trace_id, span_id)) + }); + + let current_span = Span::current(); + current_span.set_parent(parent_context); + + tracing::info!( + trace_id = %trace_id, + $msg $(, $key = $value)* + ); + }}; +} + +#[macro_export] +macro_rules! error_with_id { + ($idbytes:expr, $msg:expr $(, $key:ident = $value:expr)*) => {{ + use $crate::tracing::macros::prelude::*; + use std::convert::TryInto; + + let trace_id = TraceId::from_bytes($idbytes[..16].try_into().unwrap()); + let span_id = SpanId::from_bytes(rand::random::<[u8; 8]>()); + + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&set_remote_context(trace_id, span_id)) + }); + + let current_span = Span::current(); + current_span.set_parent(parent_context); + + tracing::error!( + trace_id = %trace_id, + $msg $(, $key = $value)* + ); + }}; +} diff --git a/nomos-tracing/src/tracing/mod.rs b/nomos-tracing/src/tracing/mod.rs index 95eb89aa..e53f5fe3 100644 --- a/nomos-tracing/src/tracing/mod.rs +++ b/nomos-tracing/src/tracing/mod.rs @@ -1 +1,2 @@ +pub mod macros; pub mod otlp; diff --git a/nomos-tracing/src/tracing/otlp.rs b/nomos-tracing/src/tracing/otlp.rs index 851aa07c..420783ab 100644 --- a/nomos-tracing/src/tracing/otlp.rs +++ b/nomos-tracing/src/tracing/otlp.rs @@ -1,9 +1,14 @@ // std use std::error::Error; // crates -use opentelemetry::{global, trace::TracerProvider as _}; +use opentelemetry::{global, trace::TracerProvider as _, KeyValue}; use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk::trace::{BatchConfig, Sampler, Tracer}; +use opentelemetry_sdk::{ + propagation::TraceContextPropagator, + trace::{BatchConfig, Sampler, Tracer}, + Resource, +}; +use opentelemetry_semantic_conventions::resource::SERVICE_NAME; use serde::{Deserialize, Serialize}; use tracing::Subscriber; use tracing_opentelemetry::OpenTelemetryLayer; @@ -15,6 +20,7 @@ use url::Url; pub struct OtlpTracingConfig { pub endpoint: Url, pub sample_ratio: f64, + pub service_name: String, } pub fn create_otlp_tracing_layer( @@ -26,15 +32,21 @@ where let otel_exporter = opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(config.endpoint); + let resource = Resource::new(vec![KeyValue::new(SERVICE_NAME, config.service_name)]); let tracer_provider = opentelemetry_otlp::new_pipeline() .tracing() - .with_trace_config(opentelemetry_sdk::trace::Config::default().with_sampler( - Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(config.sample_ratio))), - )) + .with_trace_config( + opentelemetry_sdk::trace::Config::default() + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + config.sample_ratio, + )))) + .with_resource(resource), + ) .with_batch_config(BatchConfig::default()) .with_exporter(otel_exporter) .install_batch(opentelemetry_sdk::runtime::Tokio)?; + global::set_text_map_propagator(TraceContextPropagator::new()); global::set_tracer_provider(tracer_provider.clone()); let tracer: opentelemetry_sdk::trace::Tracer = tracer_provider.tracer("NomosTracer"); diff --git a/testnet/cfgsync/src/config.rs b/testnet/cfgsync/src/config.rs index 4268ce23..83efbe64 100644 --- a/testnet/cfgsync/src/config.rs +++ b/testnet/cfgsync/src/config.rs @@ -200,6 +200,7 @@ fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> Gene tracing: nomos_tracing_service::TracingLayer::Otlp(OtlpTracingConfig { endpoint: params.tempo_endpoint, sample_ratio: 1.0, + service_name: identifier.clone(), }), filter: FilterLayer::None, metrics: MetricsLayer::Otlp(OtlpMetricsConfig { diff --git a/testnet/monitoring/grafana/datasources.yaml b/testnet/monitoring/grafana/datasources.yaml index 37b2fc7f..945f1489 100644 --- a/testnet/monitoring/grafana/datasources.yaml +++ b/testnet/monitoring/grafana/datasources.yaml @@ -18,6 +18,7 @@ datasources: is_default: false version: 1 editable: true + uid: tempods - name: Loki type: loki @@ -27,4 +28,10 @@ datasources: is_default: false version: 1 editable: true + jsonData: + derivedFields: + - name: trace_id + matcherRegex: "\"trace_id\":\"(\\w+)\"" + url: "$${__value.raw}" + datasourceUid: tempods diff --git a/testnet/monitoring/tempo.yaml b/testnet/monitoring/tempo.yaml index 52667e2c..6e7594ec 100644 --- a/testnet/monitoring/tempo.yaml +++ b/testnet/monitoring/tempo.yaml @@ -3,16 +3,6 @@ server: http_listen_port: 3200 log_level: info - -cache: - background: - writeback_goroutines: 5 - caches: - - roles: - - frontend-search - memcached: - host: memcached:11211 - query_frontend: search: duration_slo: 5s diff --git a/tests/src/tests/da.rs b/tests/src/tests/da.rs index 3e7c103b..4d0da864 100644 --- a/tests/src/tests/da.rs +++ b/tests/src/tests/da.rs @@ -12,7 +12,11 @@ use tests::topology::TopologyConfig; const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"; -async fn disseminate(executor: &Executor, data: &[u8]) { +async fn disseminate_with_metadata( + executor: &Executor, + data: &[u8], + metadata: kzgrs_backend::dispersal::Metadata, +) { let executor_config = executor.config(); let client = ClientBuilder::new() @@ -23,8 +27,6 @@ async fn disseminate(executor: &Executor, data: &[u8]) { let exec_url = Url::parse(&format!("http://{}", backend_address)).unwrap(); let client = ExecutorHttpClient::new(client, exec_url); - let app_id = hex::decode(APP_ID).unwrap(); - let metadata = kzgrs_backend::dispersal::Metadata::new(app_id.try_into().unwrap(), 0u64.into()); client.publish_blob(data.to_vec(), metadata).await.unwrap(); } @@ -34,15 +36,18 @@ async fn disseminate_and_retrieve() { let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await; let executor = &topology.executors()[0]; let validator = &topology.validators()[0]; + let data = [1u8; 31]; + let app_id = hex::decode(APP_ID).unwrap(); + let metadata = + kzgrs_backend::dispersal::Metadata::new(app_id.clone().try_into().unwrap(), 0u64.into()); tokio::time::sleep(Duration::from_secs(15)).await; - disseminate(executor, &data).await; + disseminate_with_metadata(executor, &data, metadata).await; tokio::time::sleep(Duration::from_secs(20)).await; let from = 0u64.to_be_bytes(); let to = 1u64.to_be_bytes(); - let app_id = hex::decode(APP_ID).unwrap(); let executor_blobs = executor .get_indexer_range(app_id.clone().try_into().unwrap(), from..to) @@ -80,15 +85,18 @@ async fn disseminate_and_retrieve() { async fn disseminate_retrieve_reconstruct() { let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await; let executor = &topology.executors()[0]; + let data = [1u8; 31]; + let app_id = hex::decode(APP_ID).unwrap(); + let metadata = + kzgrs_backend::dispersal::Metadata::new(app_id.clone().try_into().unwrap(), 0u64.into()); tokio::time::sleep(Duration::from_secs(15)).await; - disseminate(executor, &data).await; + disseminate_with_metadata(executor, &data, metadata).await; tokio::time::sleep(Duration::from_secs(20)).await; let from = 0u64.to_be_bytes(); let to = 1u64.to_be_bytes(); - let app_id = hex::decode(APP_ID).unwrap(); let executor_blobs = executor .get_indexer_range(app_id.clone().try_into().unwrap(), from..to) @@ -106,3 +114,35 @@ async fn disseminate_retrieve_reconstruct() { let reconstructed = reconstruct_without_missing_data(&blobs); assert_eq!(reconstructed, data); } + +#[ignore = "for local debugging"] +#[tokio::test] +async fn local_testnet() { + let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await; + let executor = &topology.executors()[0]; + let app_id = hex::decode(APP_ID).expect("Invalid APP_ID"); + + let mut index = 0u64; + loop { + disseminate_with_metadata( + executor, + &generate_data(index), + create_metadata(&app_id, index), + ) + .await; + + index += 1; + tokio::time::sleep(Duration::from_secs(10)).await; + } +} + +fn generate_data(index: u64) -> Vec { + (index as u8..index as u8 + 31).collect() +} + +fn create_metadata(app_id: &[u8], index: u64) -> kzgrs_backend::dispersal::Metadata { + kzgrs_backend::dispersal::Metadata::new( + app_id.try_into().expect("Failed to convert APP_ID"), + index.into(), + ) +} diff --git a/tests/src/topology/configs/tracing.rs b/tests/src/topology/configs/tracing.rs index 7c67f3f7..1b47f0e0 100644 --- a/tests/src/topology/configs/tracing.rs +++ b/tests/src/topology/configs/tracing.rs @@ -23,7 +23,8 @@ impl GeneralTracingConfig { }), tracing: TracingLayer::Otlp(OtlpTracingConfig { endpoint: "http://localhost:4317".try_into().unwrap(), - sample_ratio: 0.1, + sample_ratio: 0.5, + service_name: host_identifier.clone(), }), filter: FilterLayer::EnvFilter(nomos_tracing::filter::envfilter::EnvFilterConfig { // Allow events only from modules that matches the regex, if it matches - use