Bind traces to log entries

This commit is contained in:
Gusto 2024-12-20 12:53:04 +02:00
parent 598509a637
commit fc507e010c
24 changed files with 247 additions and 137 deletions

View File

@ -48,17 +48,6 @@ services:
- "chown"
- "10001:10001"
- "/var/tempo"
volumes:
- tempo-data:/var/tempo
memcached:
image: memcached:1.6.29
container_name: memcached
ports:
- "11211:11211"
environment:
- MEMCACHED_MAX_MEMORY=64m
- MEMCACHED_THREADS=4
tempo:
image: *tempoImage
@ -66,12 +55,7 @@ services:
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ./testnet/monitoring/tempo.yaml:/etc/tempo.yaml:z
- tempo-data:/var/tempo
ports:
- "4317:4317" # otlp grpc
depends_on:
- tempo-init
- memcached
volumes:
tempo-data:

View File

@ -53,13 +53,6 @@ fn main() -> Result<()> {
cryptarchia_args,
)?;
#[cfg(debug_assertions)]
let debug_span = {
let debug_id = Uuid::new_v4();
span!(Level::DEBUG, "Nomos", debug_id = debug_id.to_string())
};
#[cfg(debug_assertions)]
let _guard = debug_span.enter();
let app = OverwatchRunner::<NomosExecutor>::run(
NomosExecutorServiceSettings {
network: config.network,

View File

@ -53,13 +53,6 @@ fn main() -> Result<()> {
cryptarchia_args,
)?;
#[cfg(debug_assertions)]
let debug_span = {
let debug_id = Uuid::new_v4();
span!(Level::DEBUG, "Nomos", debug_id = debug_id.to_string())
};
#[cfg(debug_assertions)]
let _guard = debug_span.enter();
let app = OverwatchRunner::<Nomos>::run(
NomosServiceSettings {
network: config.network,

View File

@ -2,7 +2,10 @@ use crate::adapters::{mempool::DaMempoolAdapter, network::DispersalNetworkAdapte
use std::time::Duration;
use nomos_core::da::{blob::metadata, DaDispersal, DaEncoder};
use nomos_tracing::info_with_id;
use overwatch_rs::DynError;
use std::fmt::Debug;
use tracing::instrument;
pub mod kzgrs;
@ -13,8 +16,8 @@ pub trait DispersalBackend {
type Dispersal: DaDispersal<EncodedData = <Self::Encoder as DaEncoder>::EncodedData>;
type NetworkAdapter: DispersalNetworkAdapter;
type MempoolAdapter: DaMempoolAdapter;
type Metadata: metadata::Metadata + Send;
type BlobId: Send;
type Metadata: Debug + metadata::Metadata + Send;
type BlobId: AsRef<[u8]> + Send;
fn init(
config: Self::Settings,
@ -36,12 +39,14 @@ pub trait DispersalBackend {
metadata: Self::Metadata,
) -> Result<(), DynError>;
#[instrument(skip_all)]
async fn process_dispersal(
&self,
data: Vec<u8>,
metadata: Self::Metadata,
) -> Result<(), DynError> {
let (blob_id, encoded_data) = self.encode(data).await?;
info_with_id!(blob_id.as_ref(), "ProcessDispersal");
self.disperse(encoded_data).await?;
// let disperse and replication happen before pushing to mempool
tokio::time::sleep(Duration::from_secs(1)).await;

View File

@ -4,7 +4,7 @@ use std::marker::PhantomData;
// crates
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tracing::{log::error, span, Level};
use tracing::{error, instrument, span, Instrument, Level};
// internal
use crate::adapters::mempool::DaMempoolAdapter;
use crate::adapters::network::DispersalNetworkAdapter;
@ -121,9 +121,6 @@ where
..
} = self;
let trace_span = span!(Level::INFO, "service", service = DA_DISPERSAL_TAG);
let _trace_guard = trace_span.enter();
let DispersalServiceSettings {
backend: backend_settings,
} = service_state.settings_reader.get_updated_settings();
@ -133,21 +130,25 @@ where
let mempool_adapter = MempoolAdapter::new(mempool_relay);
let backend = Backend::init(backend_settings, network_adapter, mempool_adapter);
let mut inbound_relay = service_state.inbound_relay;
while let Some(dispersal_msg) = inbound_relay.recv().await {
match dispersal_msg {
DaDispersalMsg::Disperse {
data,
metadata,
reply_channel,
} => {
if let Err(Err(e)) =
reply_channel.send(backend.process_dispersal(data, metadata).await)
{
error!("Error forwarding dispersal response: {e}");
async {
while let Some(dispersal_msg) = inbound_relay.recv().await {
match dispersal_msg {
DaDispersalMsg::Disperse {
data,
metadata,
reply_channel,
} => {
if let Err(Err(e)) =
reply_channel.send(backend.process_dispersal(data, metadata).await)
{
error!("Error forwarding dispersal response: {e}");
}
}
}
}
}
.await;
Ok(())
}
}

View File

@ -14,6 +14,7 @@ nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-da-sampling = { path = "../sampling" }
nomos-storage = { path = "../../../nomos-services/storage" }
nomos-mempool = { path = "../../../nomos-services/mempool" }
nomos-tracing = { path = "../../../nomos-tracing" }
cryptarchia-consensus = { path = "../../../nomos-services/cryptarchia-consensus" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }

View File

@ -17,6 +17,7 @@ use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter};
use nomos_storage::backends::StorageBackend;
use nomos_storage::StorageService;
use nomos_tracing::info_with_id;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::{Relay, RelayMessage};
@ -28,7 +29,7 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use storage::DaStorageAdapter;
use tokio::sync::oneshot::Sender;
use tracing::{error, span, Level};
use tracing::{error, instrument, span, Level};
const DA_INDEXER_TAG: ServiceId = "DA-Indexer";
@ -269,7 +270,8 @@ where
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
ClPool::Item: Clone + Eq + Hash + Debug + 'static,
ClPool::Key: Debug + 'static,
DaPool::Item: Metadata + Clone + Eq + Hash + Debug + 'static,
DaPool::Item: DispersedBlobInfo + Metadata + Clone + Eq + Hash + Debug + 'static,
<DaPool::Item as DispersedBlobInfo>::BlobId: AsRef<[u8]>,
DaPool::Key: Debug + 'static,
<DaPool::Item as Metadata>::Index: Send + Sync,
A::Backend: 'static,
@ -285,22 +287,28 @@ where
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
#[instrument(skip_all)]
async fn handle_new_block(
storage_adapter: &DaStorage,
block: Block<ClPool::Item, DaPool::Item>,
) -> Result<(), DynError> {
for info in block.blobs() {
info_with_id!(info.blob_id().as_ref(), "HandleNewBlock");
storage_adapter.add_index(info).await?;
}
Ok(())
}
#[instrument(skip_all)]
async fn handle_da_msg(
storage_adapter: &DaStorage,
msg: DaMsg<B, DaPool::Item>,
) -> Result<(), DynError> {
match msg {
DaMsg::AddIndex { info } => storage_adapter.add_index(&info).await,
DaMsg::AddIndex { info } => {
info_with_id!(info.blob_id().as_ref(), "AddIndex");
storage_adapter.add_index(&info).await
}
DaMsg::GetRange {
app_id,
range,
@ -403,7 +411,7 @@ where
+ 'static,
<DaPool::Item as Metadata>::AppId: Send + Sync,
<DaPool::Item as Metadata>::Index: Send + Sync,
<<DaPool as MemPool>::Item as DispersedBlobInfo>::BlobId: AsRef<[u8]>,
A::Backend: 'static,
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobSelect<BlobId = DaPool::Item>,
@ -437,9 +445,6 @@ where
storage_relay,
} = self;
let trace_span = span!(Level::INFO, "service", service = DA_INDEXER_TAG);
let _trace_guard = trace_span.enter();
let consensus_relay = consensus_relay
.connect()
.await
@ -458,25 +463,29 @@ where
.await;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(block) = consensus_blocks.next() => {
if let Err(e) = Self::handle_new_block(&storage_adapter, block).await {
tracing::debug!("Failed to add a new received block: {e:?}");
async {
loop {
tokio::select! {
Some(block) = consensus_blocks.next() => {
if let Err(e) = Self::handle_new_block(&storage_adapter, block).await {
tracing::debug!("Failed to add a new received block: {e:?}");
}
}
}
Some(msg) = service_state.inbound_relay.recv() => {
if let Err(e) = Self::handle_da_msg(&storage_adapter, msg).await {
tracing::debug!("Failed to handle da msg: {e:?}");
Some(msg) = service_state.inbound_relay.recv() => {
if let Err(e) = Self::handle_da_msg(&storage_adapter, msg).await {
tracing::debug!("Failed to handle da msg: {e:?}");
}
}
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
}
.await;
Ok(())
}
}

View File

@ -11,6 +11,7 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd"
nomos-core = { path = "../../../nomos-core/chain-defs" }
nomos-libp2p = { path = "../../../nomos-libp2p" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-tracing = { path = "../../../nomos-tracing" }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
libp2p = { version = "0.53", features = ["ed25519"] }
serde = { version = "1.0", features = ["derive"] }

View File

@ -16,6 +16,7 @@ use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalE
use nomos_da_network_core::swarm::executor::ExecutorSwarm;
use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::ed25519;
use nomos_tracing::info_with_id;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
@ -29,6 +30,7 @@ use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream};
use tracing::instrument;
/// Message that the backend replies to
#[derive(Debug)]
@ -217,18 +219,21 @@ where
executor_handle.abort();
}
#[instrument(skip_all)]
async fn process(&self, msg: Self::Message) {
match msg {
ExecutorDaNetworkMessage::RequestSample {
subnetwork_id,
blob_id,
} => {
info_with_id!(&blob_id, "RequestSample");
handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await;
}
ExecutorDaNetworkMessage::RequestDispersal {
subnetwork_id,
da_blob,
} => {
info_with_id!(&da_blob.id(), "RequestDispersal");
if let Err(e) = self.dispersal_blobs_sender.send((subnetwork_id, *da_blob)) {
error!("Could not send internal blob to underlying dispersal behaviour: {e}");
}

View File

@ -11,6 +11,7 @@ use nomos_core::da::BlobId;
use nomos_da_network_core::swarm::validator::ValidatorSwarm;
use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::ed25519;
use nomos_tracing::info_with_id;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use std::fmt::Debug;
@ -21,6 +22,7 @@ use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
use tracing::instrument;
/// Message that the backend replies to
#[derive(Debug)]
@ -148,12 +150,14 @@ where
replies_handle.abort();
}
#[instrument(skip_all)]
async fn process(&self, msg: Self::Message) {
match msg {
DaNetworkMessage::RequestSample {
subnetwork_id,
blob_id,
} => {
info_with_id!(&blob_id, "RequestSample");
handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await;
}
}

View File

@ -16,7 +16,7 @@ use overwatch_rs::services::{
};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tracing::{error, span, Level};
use tracing::{error, instrument, span, Instrument, Level};
// internal
const DA_NETWORK_TAG: ServiceId = "DA-Network";
@ -97,23 +97,24 @@ where
mut backend,
} = self;
let trace_span = span!(Level::INFO, "service", service = DA_NETWORK_TAG);
let _trace_guard = trace_span.enter();
let mut lifecycle_stream = lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = inbound_relay.recv() => {
Self::handle_network_service_message(msg, &mut backend).await;
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
backend.shutdown();
break;
async {
loop {
tokio::select! {
Some(msg) = inbound_relay.recv() => {
Self::handle_network_service_message(msg, &mut backend).await;
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
backend.shutdown();
break;
}
}
}
}
}
.await;
Ok(())
}
}

View File

@ -15,6 +15,7 @@ nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-storage = { path = "../../../nomos-services/storage" }
nomos-tracing = { path = "../../../nomos-tracing" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
serde = { version = "1.0", features = ["derive"] }

View File

@ -9,8 +9,10 @@ use rand::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::time;
use tokio::time::Interval;
use tracing::instrument;
use kzgrs_backend::common::blob::DaBlob;
use nomos_tracing::info_with_id;
//
// internal
@ -71,8 +73,10 @@ impl<R: Rng + Sync + Send> DaSamplingServiceBackend<R> for KzgrsSamplingBackend<
self.validated_blobs.clone()
}
#[instrument(skip_all)]
async fn mark_completed(&mut self, blobs_ids: &[Self::BlobId]) {
for id in blobs_ids {
info_with_id!(id, "MarkInBlock");
self.pending_sampling_blobs.remove(id);
self.validated_blobs.remove(id);
}

View File

@ -22,9 +22,10 @@ use rand::{RngCore, SeedableRng};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use tracing::{error, span, Instrument, Level};
use tracing::{error, instrument, span, Instrument, Level};
// internal
use backend::{DaSamplingServiceBackend, SamplingState};
use nomos_tracing::{error_with_id, info_with_id};
use storage::DaStorageAdapter;
const DA_SAMPLING_TAG: ServiceId = "DA-Sampling";
@ -93,6 +94,7 @@ where
}
}
#[instrument(skip_all)]
async fn handle_service_message(
msg: <Self as ServiceData>::Message,
network_adapter: &mut DaNetwork,
@ -102,13 +104,14 @@ where
DaSamplingServiceMsg::TriggerSampling { blob_id } => {
if let SamplingState::Init(sampling_subnets) = sampler.init_sampling(blob_id).await
{
info_with_id!(blob_id, "TriggerSampling");
if let Err(e) = network_adapter
.start_sampling(blob_id, &sampling_subnets)
.await
{
// we can short circuit the failure from beginning
sampler.handle_sampling_error(blob_id).await;
error!("Error sampling for BlobId: {blob_id:?}: {e}");
error_with_id!(blob_id, "Error sampling for BlobId: {blob_id:?}: {e}");
}
}
}
@ -124,6 +127,7 @@ where
}
}
#[instrument(skip_all)]
async fn handle_sampling_message(
event: SamplingEvent,
sampler: &mut Backend,
@ -131,10 +135,12 @@ where
) {
match event {
SamplingEvent::SamplingSuccess { blob_id, blob } => {
info_with_id!(blob_id, "SamplingSuccess");
sampler.handle_sampling_success(blob_id, *blob).await;
}
SamplingEvent::SamplingError { error } => {
if let Some(blob_id) = error.blob_id() {
info_with_id!(blob_id, "SamplingError");
sampler.handle_sampling_error(*blob_id).await;
return;
}
@ -145,6 +151,7 @@ where
column_idx,
response_sender,
} => {
info_with_id!(blob_id, "SamplingRequest");
let maybe_blob = storage_adapter
.get_blob(blob_id, column_idx)
.await
@ -226,9 +233,6 @@ where
mut sampler,
} = self;
let trace_span = span!(Level::INFO, "service", service = DA_SAMPLING_TAG);
let _trace_guard = trace_span.enter();
let DaSamplingServiceSettings {
storage_adapter_settings,
..
@ -266,7 +270,6 @@ where
}
}
}
.instrument(span!(Level::TRACE, DA_SAMPLING_TAG))
.await;
Ok(())

View File

@ -16,13 +16,14 @@ nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
nomos-storage = { path = "../../../nomos-services/storage" }
nomos-tracing = { path = "../../../nomos-tracing" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
serde = { version = "1.0", features = ["derive"] }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1.15"
tracing = "0.1"
tracing = { version = "0.1", features = ["attributes"] }
[features]
rocksdb-backend = ["nomos-storage/rocksdb-backend"]

View File

@ -18,10 +18,12 @@ use overwatch_rs::DynError;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot::Sender;
use tokio_stream::StreamExt;
use tracing::instrument;
use tracing::{error, span, Instrument, Level};
// internal
use backend::VerifierBackend;
use network::NetworkAdapter;
use nomos_tracing::info_with_id;
use storage::DaStorageAdapter;
const DA_VERIFIER_TAG: ServiceId = "DA-Verifier";
@ -66,10 +68,12 @@ where
Backend::DaBlob: Debug + Send,
Backend::Error: Error + Send + Sync,
Backend::Settings: Clone,
<Backend::DaBlob as Blob>::BlobId: AsRef<[u8]>,
N: NetworkAdapter<Blob = Backend::DaBlob> + Send + 'static,
N::Settings: Clone,
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + 'static,
{
#[instrument(skip_all)]
async fn handle_new_blob(
verifier: &Backend,
storage_adapter: &S,
@ -80,8 +84,10 @@ where
.await?
.is_some()
{
info_with_id!(blob.id().as_ref(), "VerifierBlobExists");
Ok(())
} else {
info_with_id!(blob.id().as_ref(), "VerifierAddBlob");
verifier.verify(blob)?;
storage_adapter.add_blob(blob, &()).await?;
Ok(())
@ -128,6 +134,7 @@ where
Backend::Settings: Clone + Send + Sync + 'static,
Backend::DaBlob: Debug + Send + Sync + 'static,
Backend::Error: Error + Send + Sync + 'static,
<Backend::DaBlob as Blob>::BlobId: AsRef<[u8]>,
N: NetworkAdapter<Blob = Backend::DaBlob> + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + Sync + 'static,
@ -159,9 +166,6 @@ where
verifier,
} = self;
let trace_span = span!(Level::INFO, "service", service = DA_VERIFIER_TAG);
let _trace_guard = trace_span.enter();
let DaVerifierServiceSettings {
network_adapter_settings,
storage_adapter_settings,
@ -187,8 +191,10 @@ where
Some(msg) = service_state.inbound_relay.recv() => {
let DaVerifierMsg::AddBlob { blob, reply_channel } = msg;
match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await {
Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) {
error!("Error replying attestation {err:?}");
Ok(attestation) => {
if let Err(err) = reply_channel.send(Some(attestation)) {
error!("Error replying attestation {err:?}");
}
},
Err(err) => {
error!("Error handling blob {blob:?} due to {err:?}");
@ -205,7 +211,7 @@ where
}
}
}
}.instrument(span!(Level::TRACE, DA_VERIFIER_TAG)).await;
}.await;
Ok(())
}

View File

@ -9,6 +9,7 @@ opentelemetry-otlp = { version = "0.26", features = ["grpc-tonic", "http-proto",
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] }
opentelemetry-http = { version = "0.26", features = ["reqwest"] }
opentelemetry-semantic-conventions = "0.26"
rand = "0.8"
reqwest = "0.12"
serde = { version = "1.0", features = ["derive"] }
tokio = "1"

View File

@ -1,30 +1,75 @@
#[macro_export]
macro_rules! trace_with_parent {
($idbytes:expr, $msg:expr $(, $key:ident = $value:expr)*) => {{
use opentelemetry::trace::{SpanContext, SpanId, TraceId, TraceState};
use opentelemetry::Context;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
pub mod prelude {
use std::collections::HashMap;
// Derive trace_id from bytes.
let trace_id = TraceId::from_bytes($idbytes);
pub use opentelemetry::{
global,
trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
Context,
};
pub use rand;
pub use tracing::Span;
pub use tracing_opentelemetry::OpenTelemetrySpanExt;
// Create a SpanContext with the derived trace_id.
let span_id = SpanId::new();
let span_context = SpanContext::new(
trace_id,
span_id,
0, // Smapled
false, // Not a remote span
TraceState::default(),
// In some places it makes sense to use third party ids such as blob_id or tx_id as a tracing
// id. This allows to track the time during which the entity is propagated throughout the
// system.
//
// Opentelemetry tracing standard has a specific remote context format which is supported by
// most tracing software.
// More information at https://www.w3.org/TR/trace-context/#traceparent-header
pub fn set_remote_context(trace_id: TraceId, span_id: SpanId) -> HashMap<String, String> {
let mut carrier = HashMap::new();
carrier.insert(
"traceparent".to_string(),
format!("00-{trace_id}-{span_id}-01"),
);
let otel_context = Context::current_with_span_context(span_context);
carrier
}
}
#[macro_export]
macro_rules! info_with_id {
($idbytes:expr, $msg:expr $(, $key:ident = $value:expr)*) => {{
use $crate::tracing::macros::prelude::*;
use std::convert::TryInto;
let trace_id = TraceId::from_bytes($idbytes[..16].try_into().unwrap());
let span_id = SpanId::from_bytes(rand::random::<[u8; 8]>());
let parent_context = global::get_text_map_propagator(|propagator| {
propagator.extract(&set_remote_context(trace_id, span_id))
});
let current_span = Span::current();
current_span.set_parent(otel_context);
current_span.set_parent(parent_context);
// Call tracing with the updated parent span id.
tracing::info!($msg $(, $key = $value)*);
tracing::info!(
trace_id = %trace_id,
$msg $(, $key = $value)*
);
}};
}
#[macro_export]
macro_rules! error_with_id {
($idbytes:expr, $msg:expr $(, $key:ident = $value:expr)*) => {{
use $crate::tracing::macros::prelude::*;
use std::convert::TryInto;
let trace_id = TraceId::from_bytes($idbytes[..16].try_into().unwrap());
let span_id = SpanId::from_bytes(rand::random::<[u8; 8]>());
let parent_context = global::get_text_map_propagator(|propagator| {
propagator.extract(&set_remote_context(trace_id, span_id))
});
let current_span = Span::current();
current_span.set_parent(parent_context);
tracing::error!(
trace_id = %trace_id,
$msg $(, $key = $value)*
);
}};
}

View File

@ -1,9 +1,14 @@
// std
use std::error::Error;
// crates
use opentelemetry::{global, trace::TracerProvider as _};
use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::{BatchConfig, Sampler, Tracer};
use opentelemetry_sdk::{
propagation::TraceContextPropagator,
trace::{BatchConfig, Sampler, Tracer},
Resource,
};
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use serde::{Deserialize, Serialize};
use tracing::Subscriber;
use tracing_opentelemetry::OpenTelemetryLayer;
@ -15,6 +20,7 @@ use url::Url;
pub struct OtlpTracingConfig {
pub endpoint: Url,
pub sample_ratio: f64,
pub service_name: String,
}
pub fn create_otlp_tracing_layer<S>(
@ -26,15 +32,21 @@ where
let otel_exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(config.endpoint);
let resource = Resource::new(vec![KeyValue::new(SERVICE_NAME, config.service_name)]);
let tracer_provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(opentelemetry_sdk::trace::Config::default().with_sampler(
Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(config.sample_ratio))),
))
.with_trace_config(
opentelemetry_sdk::trace::Config::default()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
config.sample_ratio,
))))
.with_resource(resource),
)
.with_batch_config(BatchConfig::default())
.with_exporter(otel_exporter)
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
global::set_text_map_propagator(TraceContextPropagator::new());
global::set_tracer_provider(tracer_provider.clone());
let tracer: opentelemetry_sdk::trace::Tracer = tracer_provider.tracer("NomosTracer");

View File

@ -200,6 +200,7 @@ fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> Gene
tracing: nomos_tracing_service::TracingLayer::Otlp(OtlpTracingConfig {
endpoint: params.tempo_endpoint,
sample_ratio: 1.0,
service_name: identifier.clone(),
}),
filter: FilterLayer::None,
metrics: MetricsLayer::Otlp(OtlpMetricsConfig {

View File

@ -18,6 +18,7 @@ datasources:
is_default: false
version: 1
editable: true
uid: tempods
- name: Loki
type: loki
@ -27,4 +28,10 @@ datasources:
is_default: false
version: 1
editable: true
jsonData:
derivedFields:
- name: trace_id
matcherRegex: "\"trace_id\":\"(\\w+)\""
url: "$${__value.raw}"
datasourceUid: tempods

View File

@ -3,16 +3,6 @@ server:
http_listen_port: 3200
log_level: info
cache:
background:
writeback_goroutines: 5
caches:
- roles:
- frontend-search
memcached:
host: memcached:11211
query_frontend:
search:
duration_slo: 5s

View File

@ -12,7 +12,11 @@ use tests::topology::TopologyConfig;
const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";
async fn disseminate(executor: &Executor, data: &[u8]) {
async fn disseminate_with_metadata(
executor: &Executor,
data: &[u8],
metadata: kzgrs_backend::dispersal::Metadata,
) {
let executor_config = executor.config();
let client = ClientBuilder::new()
@ -23,8 +27,6 @@ async fn disseminate(executor: &Executor, data: &[u8]) {
let exec_url = Url::parse(&format!("http://{}", backend_address)).unwrap();
let client = ExecutorHttpClient::new(client, exec_url);
let app_id = hex::decode(APP_ID).unwrap();
let metadata = kzgrs_backend::dispersal::Metadata::new(app_id.try_into().unwrap(), 0u64.into());
client.publish_blob(data.to_vec(), metadata).await.unwrap();
}
@ -34,15 +36,18 @@ async fn disseminate_and_retrieve() {
let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;
let executor = &topology.executors()[0];
let validator = &topology.validators()[0];
let data = [1u8; 31];
let app_id = hex::decode(APP_ID).unwrap();
let metadata =
kzgrs_backend::dispersal::Metadata::new(app_id.clone().try_into().unwrap(), 0u64.into());
tokio::time::sleep(Duration::from_secs(15)).await;
disseminate(executor, &data).await;
disseminate_with_metadata(executor, &data, metadata).await;
tokio::time::sleep(Duration::from_secs(20)).await;
let from = 0u64.to_be_bytes();
let to = 1u64.to_be_bytes();
let app_id = hex::decode(APP_ID).unwrap();
let executor_blobs = executor
.get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
@ -80,15 +85,18 @@ async fn disseminate_and_retrieve() {
async fn disseminate_retrieve_reconstruct() {
let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;
let executor = &topology.executors()[0];
let data = [1u8; 31];
let app_id = hex::decode(APP_ID).unwrap();
let metadata =
kzgrs_backend::dispersal::Metadata::new(app_id.clone().try_into().unwrap(), 0u64.into());
tokio::time::sleep(Duration::from_secs(15)).await;
disseminate(executor, &data).await;
disseminate_with_metadata(executor, &data, metadata).await;
tokio::time::sleep(Duration::from_secs(20)).await;
let from = 0u64.to_be_bytes();
let to = 1u64.to_be_bytes();
let app_id = hex::decode(APP_ID).unwrap();
let executor_blobs = executor
.get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
@ -106,3 +114,36 @@ async fn disseminate_retrieve_reconstruct() {
let reconstructed = reconstruct_without_missing_data(&blobs);
assert_eq!(reconstructed, data);
}
#[ignore = "for local debugging"]
#[tokio::test]
async fn local_testnet() {
let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;
let executor = &topology.executors()[0];
let app_id = hex::decode(APP_ID).expect("Invalid APP_ID");
let mut index = 0u64;
loop {
disseminate_with_metadata(
executor,
&generate_data(index),
create_metadata(&app_id, index),
)
.await;
index += 1;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
fn generate_data(index: u64) -> Vec<u8> {
(index as u8..index as u8 + 31).collect()
}
fn create_metadata(app_id: &[u8], index: u64) -> kzgrs_backend::dispersal::Metadata {
kzgrs_backend::dispersal::Metadata::new(
app_id.try_into().expect("Failed to convert APP_ID"),
index.into(),
)
}

View File

@ -23,7 +23,8 @@ impl GeneralTracingConfig {
}),
tracing: TracingLayer::Otlp(OtlpTracingConfig {
endpoint: "http://localhost:4317".try_into().unwrap(),
sample_ratio: 0.1,
sample_ratio: 0.5,
service_name: host_identifier.clone(),
}),
filter: FilterLayer::EnvFilter(nomos_tracing::filter::envfilter::EnvFilterConfig {
// Allow events only from modules that matches the regex, if it matches - use