feature(tracing): Trace blob by id in network (#968)
* Set parent trace id from blob id
* Bind traces to log entries
* Same config for testnet and local dev environment
This commit is contained in:
parent
9001b44079
commit
f83e725b0d
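Editor's note: the core mechanism of this commit, condensed from the `info_with_id!` macro added to nomos-tracing below. This is a sketch; the helper name `bind_span_to_blob` is ours, not the crate's.

    use opentelemetry::{global, trace::{SpanId, TraceId}};
    use std::collections::HashMap;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    fn bind_span_to_blob(blob_id: &[u8]) {
        // The first 16 bytes of the blob id become the trace id, so every node
        // that handles the same blob reports into the same trace.
        let trace_id = TraceId::from_bytes(blob_id[..16].try_into().unwrap());
        let span_id = SpanId::from_bytes(rand::random::<[u8; 8]>());

        // W3C trace-context carrier: "00-{trace-id}-{parent-id}-{flags}"; "01" marks it sampled.
        let mut carrier = HashMap::new();
        carrier.insert(
            "traceparent".to_string(),
            format!("00-{trace_id}-{span_id}-01"),
        );

        // Reparent the current span under the synthesized remote context.
        let parent = global::get_text_map_propagator(|p| p.extract(&carrier));
        tracing::Span::current().set_parent(parent);
    }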
@@ -48,17 +48,6 @@ services:
      - "chown"
      - "10001:10001"
      - "/var/tempo"
    volumes:
      - tempo-data:/var/tempo

  memcached:
    image: memcached:1.6.29
    container_name: memcached
    ports:
      - "11211:11211"
    environment:
      - MEMCACHED_MAX_MEMORY=64m
      - MEMCACHED_THREADS=4

  tempo:
    image: *tempoImage
@@ -66,12 +55,7 @@ services:
    command: [ "-config.file=/etc/tempo.yaml" ]
    volumes:
      - ./testnet/monitoring/tempo.yaml:/etc/tempo.yaml:z
      - tempo-data:/var/tempo
    ports:
      - "4317:4317" # otlp grpc
    depends_on:
      - tempo-init
      - memcached

volumes:
  tempo-data:

@@ -122,17 +122,6 @@ services:
      - "chown"
      - "10001:10001"
      - "/var/tempo"
    volumes:
      - tempo-data:/var/tempo

  memcached:
    image: memcached:1.6.29
    container_name: memcached
    ports:
      - "11211:11211"
    environment:
      - MEMCACHED_MAX_MEMORY=64m
      - MEMCACHED_THREADS=4

  tempo:
    image: *tempoImage
@@ -140,12 +129,7 @@ services:
    command: [ "-config.file=/etc/tempo.yaml" ]
    volumes:
      - ./testnet/monitoring/tempo.yaml:/etc/tempo.yaml:z
      - tempo-data:/var/tempo
    ports:
      - "4317:4317" # otlp grpc
    depends_on:
      - tempo-init
      - memcached

volumes:
  tempo-data:

@@ -10,8 +10,6 @@ use nomos_node::{
    DA_TOPIC,
};
use overwatch_rs::overwatch::*;
use tracing::{span, Level};
use uuid::Uuid;
// internal

#[derive(Parser, Debug)]
@@ -53,13 +51,6 @@ fn main() -> Result<()> {
        cryptarchia_args,
    )?;

    #[cfg(debug_assertions)]
    let debug_span = {
        let debug_id = Uuid::new_v4();
        span!(Level::DEBUG, "Nomos", debug_id = debug_id.to_string())
    };
    #[cfg(debug_assertions)]
    let _guard = debug_span.enter();
    let app = OverwatchRunner::<NomosExecutor>::run(
        NomosExecutorServiceSettings {
            network: config.network,

@@ -11,8 +11,6 @@ use nomos_core::{da::blob::info::DispersedBlobInfo, tx::Transaction};
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;

use overwatch_rs::overwatch::*;
use tracing::{span, Level};
use uuid::Uuid;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
@@ -53,13 +51,6 @@ fn main() -> Result<()> {
        cryptarchia_args,
    )?;

    #[cfg(debug_assertions)]
    let debug_span = {
        let debug_id = Uuid::new_v4();
        span!(Level::DEBUG, "Nomos", debug_id = debug_id.to_string())
    };
    #[cfg(debug_assertions)]
    let _guard = debug_span.enter();
    let app = OverwatchRunner::<Nomos>::run(
        NomosServiceSettings {
            network: config.network,

@@ -12,6 +12,7 @@ nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
nomos-da-sampling = { path = "../sampling" }
nomos-mempool = { path = "../../mempool", features = ["libp2p"] }
nomos-tracing = { path = "../../../nomos-tracing" }
kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
rand = "0.8"

@@ -2,7 +2,10 @@ use crate::adapters::{mempool::DaMempoolAdapter, network::DispersalNetworkAdapte
use std::time::Duration;

use nomos_core::da::{blob::metadata, DaDispersal, DaEncoder};
use nomos_tracing::info_with_id;
use overwatch_rs::DynError;
use std::fmt::Debug;
use tracing::instrument;

pub mod kzgrs;

@@ -13,8 +16,8 @@ pub trait DispersalBackend {
    type Dispersal: DaDispersal<EncodedData = <Self::Encoder as DaEncoder>::EncodedData>;
    type NetworkAdapter: DispersalNetworkAdapter;
    type MempoolAdapter: DaMempoolAdapter;
    type Metadata: metadata::Metadata + Send;
    type BlobId: Send;
    type Metadata: Debug + metadata::Metadata + Send;
    type BlobId: AsRef<[u8]> + Send;

    fn init(
        config: Self::Settings,
@@ -36,12 +39,14 @@ pub trait DispersalBackend {
        metadata: Self::Metadata,
    ) -> Result<(), DynError>;

    #[instrument(skip_all)]
    async fn process_dispersal(
        &self,
        data: Vec<u8>,
        metadata: Self::Metadata,
    ) -> Result<(), DynError> {
        let (blob_id, encoded_data) = self.encode(data).await?;
        info_with_id!(blob_id.as_ref(), "ProcessDispersal");
        self.disperse(encoded_data).await?;
        // let dispersal and replication happen before pushing to the mempool
        tokio::time::sleep(Duration::from_secs(1)).await;
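
Editor's note: the tightened `type BlobId: AsRef<[u8]> + Send;` bound above is what lets `info_with_id!(blob_id.as_ref(), ...)` reuse the blob id as a trace id. A sketch of a conforming id type (the name `MyBlobId` is hypothetical, for illustration only):

    struct MyBlobId([u8; 32]);

    impl AsRef<[u8]> for MyBlobId {
        // Exposes the raw bytes; the tracing macros consume the first 16 of them as the trace id.
        fn as_ref(&self) -> &[u8] {
            &self.0
        }
    }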

@@ -4,7 +4,7 @@ use std::marker::PhantomData;
// crates
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tracing::log::error;
use tracing::error;
// internal
use crate::adapters::mempool::DaMempoolAdapter;
use crate::adapters::network::DispersalNetworkAdapter;
@@ -120,6 +120,7 @@ where
            service_state,
            ..
        } = self;

        let DispersalServiceSettings {
            backend: backend_settings,
        } = service_state.settings_reader.get_updated_settings();
@@ -144,6 +145,7 @@ where
                }
            }
        }

        Ok(())
    }
}

@@ -14,6 +14,7 @@ nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-da-sampling = { path = "../sampling" }
nomos-storage = { path = "../../../nomos-services/storage" }
nomos-mempool = { path = "../../../nomos-services/mempool" }
nomos-tracing = { path = "../../../nomos-tracing" }
cryptarchia-consensus = { path = "../../../nomos-services/cryptarchia-consensus" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }

@@ -17,6 +17,7 @@ use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter};
use nomos_storage::backends::StorageBackend;
use nomos_storage::StorageService;
use nomos_tracing::info_with_id;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::{Relay, RelayMessage};
@@ -28,7 +29,9 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use storage::DaStorageAdapter;
use tokio::sync::oneshot::Sender;
use tracing::error;
use tracing::{error, instrument};

const DA_INDEXER_TAG: ServiceId = "DA-Indexer";

pub type ConsensusRelay<
    A,
@@ -213,7 +216,7 @@ where
    SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
    SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
    const SERVICE_ID: ServiceId = "DaIndexer";
    const SERVICE_ID: ServiceId = DA_INDEXER_TAG;
    type Settings = IndexerSettings<DaStorage::Settings>;
    type State = NoState<Self::Settings>;
    type StateOperator = NoOperator<Self::State>;
@@ -267,7 +270,8 @@ where
    DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
    ClPool::Item: Clone + Eq + Hash + Debug + 'static,
    ClPool::Key: Debug + 'static,
    DaPool::Item: Metadata + Clone + Eq + Hash + Debug + 'static,
    DaPool::Item: DispersedBlobInfo + Metadata + Clone + Eq + Hash + Debug + 'static,
    <DaPool::Item as DispersedBlobInfo>::BlobId: AsRef<[u8]>,
    DaPool::Key: Debug + 'static,
    <DaPool::Item as Metadata>::Index: Send + Sync,
    A::Backend: 'static,
@@ -283,22 +287,28 @@ where
    SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
    SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
    #[instrument(skip_all)]
    async fn handle_new_block(
        storage_adapter: &DaStorage,
        block: Block<ClPool::Item, DaPool::Item>,
    ) -> Result<(), DynError> {
        for info in block.blobs() {
            info_with_id!(info.blob_id().as_ref(), "HandleNewBlock");
            storage_adapter.add_index(info).await?;
        }
        Ok(())
    }

    #[instrument(skip_all)]
    async fn handle_da_msg(
        storage_adapter: &DaStorage,
        msg: DaMsg<B, DaPool::Item>,
    ) -> Result<(), DynError> {
        match msg {
            DaMsg::AddIndex { info } => storage_adapter.add_index(&info).await,
            DaMsg::AddIndex { info } => {
                info_with_id!(info.blob_id().as_ref(), "AddIndex");
                storage_adapter.add_index(&info).await
            }
            DaMsg::GetRange {
                app_id,
                range,
@@ -401,7 +411,7 @@ where
        + 'static,
    <DaPool::Item as Metadata>::AppId: Send + Sync,
    <DaPool::Item as Metadata>::Index: Send + Sync,

    <<DaPool as MemPool>::Item as DispersedBlobInfo>::BlobId: AsRef<[u8]>,
    A::Backend: 'static,
    TxS: TxSelect<Tx = ClPool::Item>,
    BS: BlobSelect<BlobId = DaPool::Item>,
@@ -434,6 +444,7 @@ where
            consensus_relay,
            storage_relay,
        } = self;

        let consensus_relay = consensus_relay
            .connect()
            .await
@@ -471,6 +482,7 @@ where
                }
            }
        }

        Ok(())
    }
}

@@ -11,6 +11,7 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd"
nomos-core = { path = "../../../nomos-core/chain-defs" }
nomos-libp2p = { path = "../../../nomos-libp2p" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-tracing = { path = "../../../nomos-tracing" }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
libp2p = { version = "0.53", features = ["ed25519"] }
serde = { version = "1.0", features = ["derive"] }

@@ -16,6 +16,7 @@ use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalE
use nomos_da_network_core::swarm::executor::ExecutorSwarm;
use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::ed25519;
use nomos_tracing::info_with_id;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
@@ -29,6 +30,7 @@ use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream};
use tracing::instrument;

/// Message that the backend replies to
#[derive(Debug)]
@@ -217,18 +219,21 @@ where
        executor_handle.abort();
    }

    #[instrument(skip_all)]
    async fn process(&self, msg: Self::Message) {
        match msg {
            ExecutorDaNetworkMessage::RequestSample {
                subnetwork_id,
                blob_id,
            } => {
                info_with_id!(&blob_id, "RequestSample");
                handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await;
            }
            ExecutorDaNetworkMessage::RequestDispersal {
                subnetwork_id,
                da_blob,
            } => {
                info_with_id!(&da_blob.id(), "RequestDispersal");
                if let Err(e) = self.dispersal_blobs_sender.send((subnetwork_id, *da_blob)) {
                    error!("Could not send internal blob to underlying dispersal behaviour: {e}");
                }

@@ -11,6 +11,7 @@ use nomos_core::da::BlobId;
use nomos_da_network_core::swarm::validator::ValidatorSwarm;
use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::ed25519;
use nomos_tracing::info_with_id;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use std::fmt::Debug;
@@ -21,6 +22,7 @@ use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
use tracing::instrument;

/// Message that the backend replies to
#[derive(Debug)]
@@ -148,12 +150,14 @@ where
        replies_handle.abort();
    }

    #[instrument(skip_all)]
    async fn process(&self, msg: Self::Message) {
        match msg {
            DaNetworkMessage::RequestSample {
                subnetwork_id,
                blob_id,
            } => {
                info_with_id!(&blob_id, "RequestSample");
                handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await;
            }
        }

@@ -19,6 +19,8 @@ use tokio::sync::oneshot;
use tracing::error;
// internal

const DA_NETWORK_TAG: ServiceId = "DA-Network";

pub enum DaNetworkMsg<B: NetworkBackend> {
    Process(B::Message),
    Subscribe {
@@ -61,7 +63,7 @@ pub struct NetworkState<B: NetworkBackend> {
}

impl<B: NetworkBackend + 'static + Send> ServiceData for NetworkService<B> {
    const SERVICE_ID: ServiceId = "DaNetwork";
    const SERVICE_ID: ServiceId = DA_NETWORK_TAG;
    type Settings = NetworkConfig<B>;
    type State = NetworkState<B>;
    type StateOperator = NoOperator<Self::State>;
@@ -94,6 +96,7 @@ where
            },
            mut backend,
        } = self;

        let mut lifecycle_stream = lifecycle_handle.message_stream();
        loop {
            tokio::select! {
@@ -108,6 +111,7 @@ where
                }
            }
        }

        Ok(())
    }
}

@@ -15,6 +15,7 @@ nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-storage = { path = "../../../nomos-services/storage" }
nomos-tracing = { path = "../../../nomos-tracing" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
serde = { version = "1.0", features = ["derive"] }

@@ -9,8 +9,10 @@ use rand::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::time;
use tokio::time::Interval;
use tracing::instrument;

use kzgrs_backend::common::blob::DaBlob;
use nomos_tracing::info_with_id;

//
// internal
@@ -71,8 +73,10 @@ impl<R: Rng + Sync + Send> DaSamplingServiceBackend<R> for KzgrsSamplingBackend<
        self.validated_blobs.clone()
    }

    #[instrument(skip_all)]
    async fn mark_completed(&mut self, blobs_ids: &[Self::BlobId]) {
        for id in blobs_ids {
            info_with_id!(id, "MarkInBlock");
            self.pending_sampling_blobs.remove(id);
            self.validated_blobs.remove(id);
        }

@@ -22,9 +22,10 @@ use rand::{RngCore, SeedableRng};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use tracing::{error, span, Instrument, Level};
use tracing::{error, instrument};
// internal
use backend::{DaSamplingServiceBackend, SamplingState};
use nomos_tracing::{error_with_id, info_with_id};
use storage::DaStorageAdapter;

const DA_SAMPLING_TAG: ServiceId = "DA-Sampling";
@@ -93,6 +94,7 @@ where
        }
    }

    #[instrument(skip_all)]
    async fn handle_service_message(
        msg: <Self as ServiceData>::Message,
        network_adapter: &mut DaNetwork,
@@ -102,13 +104,14 @@ where
            DaSamplingServiceMsg::TriggerSampling { blob_id } => {
                if let SamplingState::Init(sampling_subnets) = sampler.init_sampling(blob_id).await
                {
                    info_with_id!(blob_id, "TriggerSampling");
                    if let Err(e) = network_adapter
                        .start_sampling(blob_id, &sampling_subnets)
                        .await
                    {
                        // we can short-circuit the failure from the beginning
                        sampler.handle_sampling_error(blob_id).await;
                        error!("Error sampling for BlobId: {blob_id:?}: {e}");
                        error_with_id!(blob_id, "Error sampling for BlobId: {blob_id:?}: {e}");
                    }
                }
            }
@@ -124,6 +127,7 @@ where
        }
    }

    #[instrument(skip_all)]
    async fn handle_sampling_message(
        event: SamplingEvent,
        sampler: &mut Backend,
@@ -131,10 +135,12 @@ where
    ) {
        match event {
            SamplingEvent::SamplingSuccess { blob_id, blob } => {
                info_with_id!(blob_id, "SamplingSuccess");
                sampler.handle_sampling_success(blob_id, *blob).await;
            }
            SamplingEvent::SamplingError { error } => {
                if let Some(blob_id) = error.blob_id() {
                    info_with_id!(blob_id, "SamplingError");
                    sampler.handle_sampling_error(*blob_id).await;
                    return;
                }
@@ -145,6 +151,7 @@ where
                column_idx,
                response_sender,
            } => {
                info_with_id!(blob_id, "SamplingRequest");
                let maybe_blob = storage_adapter
                    .get_blob(blob_id, column_idx)
                    .await
@@ -225,6 +232,7 @@ where
            mut service_state,
            mut sampler,
        } = self;

        let DaSamplingServiceSettings {
            storage_adapter_settings,
            ..
@@ -240,30 +248,26 @@ where
        let storage_adapter = DaStorage::new(storage_adapter_settings, storage_relay).await;

        let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
        async {
            loop {
                tokio::select! {
                    Some(service_message) = service_state.inbound_relay.recv() => {
                        Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await;
                    }
                    Some(sampling_message) = sampling_message_stream.next() => {
                        Self::handle_sampling_message(sampling_message, &mut sampler, &storage_adapter).await;
                    }
                    Some(msg) = lifecycle_stream.next() => {
                        if Self::should_stop_service(msg).await {
                            break;
                        }
                    }
                    // clean up samples that did not arrive on time
                    _ = next_prune_tick.tick() => {
                        sampler.prune();
                    }

        loop {
            tokio::select! {
                Some(service_message) = service_state.inbound_relay.recv() => {
                    Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await;
                }
                Some(sampling_message) = sampling_message_stream.next() => {
                    Self::handle_sampling_message(sampling_message, &mut sampler, &storage_adapter).await;
                }
                Some(msg) = lifecycle_stream.next() => {
                    if Self::should_stop_service(msg).await {
                        break;
                    }
                }
                // clean up samples that did not arrive on time
                _ = next_prune_tick.tick() => {
                    sampler.prune();
                }

            }
        }
        .instrument(span!(Level::TRACE, DA_SAMPLING_TAG))
        .await;

        Ok(())
    }

@@ -16,13 +16,14 @@ nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
nomos-storage = { path = "../../../nomos-services/storage" }
nomos-tracing = { path = "../../../nomos-tracing" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" }
serde = { version = "1.0", features = ["derive"] }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1.15"
tracing = "0.1"
tracing = { version = "0.1", features = ["attributes"] }

[features]
rocksdb-backend = ["nomos-storage/rocksdb-backend"]

@@ -18,10 +18,12 @@ use overwatch_rs::DynError;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot::Sender;
use tokio_stream::StreamExt;
use tracing::{error, span, Instrument, Level};
use tracing::error;
use tracing::instrument;
// internal
use backend::VerifierBackend;
use network::NetworkAdapter;
use nomos_tracing::info_with_id;
use storage::DaStorageAdapter;

const DA_VERIFIER_TAG: ServiceId = "DA-Verifier";
@@ -66,10 +68,12 @@ where
    Backend::DaBlob: Debug + Send,
    Backend::Error: Error + Send + Sync,
    Backend::Settings: Clone,
    <Backend::DaBlob as Blob>::BlobId: AsRef<[u8]>,
    N: NetworkAdapter<Blob = Backend::DaBlob> + Send + 'static,
    N::Settings: Clone,
    S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + 'static,
{
    #[instrument(skip_all)]
    async fn handle_new_blob(
        verifier: &Backend,
        storage_adapter: &S,
@@ -80,8 +84,10 @@ where
            .await?
            .is_some()
        {
            info_with_id!(blob.id().as_ref(), "VerifierBlobExists");
            Ok(())
        } else {
            info_with_id!(blob.id().as_ref(), "VerifierAddBlob");
            verifier.verify(blob)?;
            storage_adapter.add_blob(blob, &()).await?;
            Ok(())
@@ -128,6 +134,7 @@ where
    Backend::Settings: Clone + Send + Sync + 'static,
    Backend::DaBlob: Debug + Send + Sync + 'static,
    Backend::Error: Error + Send + Sync + 'static,
    <Backend::DaBlob as Blob>::BlobId: AsRef<[u8]>,
    N: NetworkAdapter<Blob = Backend::DaBlob> + Send + Sync + 'static,
    N::Settings: Clone + Send + Sync + 'static,
    S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + Sync + 'static,
@@ -173,36 +180,36 @@ where
        let storage_adapter = S::new(storage_adapter_settings, storage_relay).await;

        let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
        async {
            loop {
                tokio::select! {
                    Some(blob) = blob_stream.next() => {
                        if let Err(err) = Self::handle_new_blob(&verifier,&storage_adapter, &blob).await {
                            error!("Error handling blob {blob:?} due to {err:?}");
                        }
        loop {
            tokio::select! {
                Some(blob) = blob_stream.next() => {
                    if let Err(err) = Self::handle_new_blob(&verifier,&storage_adapter, &blob).await {
                        error!("Error handling blob {blob:?} due to {err:?}");
                    }
                    Some(msg) = service_state.inbound_relay.recv() => {
                        let DaVerifierMsg::AddBlob { blob, reply_channel } = msg;
                        match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await {
                            Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) {
                }
                Some(msg) = service_state.inbound_relay.recv() => {
                    let DaVerifierMsg::AddBlob { blob, reply_channel } = msg;
                    match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await {
                        Ok(attestation) => {
                            if let Err(err) = reply_channel.send(Some(attestation)) {
                                error!("Error replying attestation {err:?}");
                            },
                            Err(err) => {
                                error!("Error handling blob {blob:?} due to {err:?}");
                                if let Err(err) = reply_channel.send(None) {
                                    error!("Error replying attestation {err:?}");
                                }
                            },
                        };
                    }
                    Some(msg) = lifecycle_stream.next() => {
                        if Self::should_stop_service(msg).await {
                            break;
                        }
                    }
                        },
                        Err(err) => {
                            error!("Error handling blob {blob:?} due to {err:?}");
                            if let Err(err) = reply_channel.send(None) {
                                error!("Error replying attestation {err:?}");
                            }
                        },
                    };
                }
                Some(msg) = lifecycle_stream.next() => {
                    if Self::should_stop_service(msg).await {
                        break;
                    }
                }
            }
        }
        }.instrument(span!(Level::TRACE, DA_VERIFIER_TAG)).await;
        }

        Ok(())
    }

@@ -9,6 +9,7 @@ opentelemetry-otlp = { version = "0.26", features = ["grpc-tonic", "http-proto",
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] }
opentelemetry-http = { version = "0.26", features = ["reqwest"] }
opentelemetry-semantic-conventions = "0.26"
rand = "0.8"
reqwest = "0.12"
serde = { version = "1.0", features = ["derive"] }
tokio = "1"

@@ -0,0 +1,75 @@
pub mod prelude {
    use std::collections::HashMap;

    pub use opentelemetry::{
        global,
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        Context,
    };
    pub use rand;
    pub use tracing::Span;
    pub use tracing_opentelemetry::OpenTelemetrySpanExt;

    // In some places it makes sense to use third-party ids such as blob_id or tx_id as a tracing
    // id. This allows tracking the time during which the entity is propagated throughout the
    // system.
    //
    // The OpenTelemetry tracing standard has a specific remote context format which is supported
    // by most tracing software.
    // More information at https://www.w3.org/TR/trace-context/#traceparent-header
    pub fn set_remote_context(trace_id: TraceId, span_id: SpanId) -> HashMap<String, String> {
        let mut carrier = HashMap::new();
        carrier.insert(
            "traceparent".to_string(),
            format!("00-{trace_id}-{span_id}-01"),
        );

        carrier
    }
}

#[macro_export]
macro_rules! info_with_id {
    ($idbytes:expr, $msg:expr $(, $key:ident = $value:expr)*) => {{
        use $crate::tracing::macros::prelude::*;
        use std::convert::TryInto;

        let trace_id = TraceId::from_bytes($idbytes[..16].try_into().unwrap());
        let span_id = SpanId::from_bytes(rand::random::<[u8; 8]>());

        let parent_context = global::get_text_map_propagator(|propagator| {
            propagator.extract(&set_remote_context(trace_id, span_id))
        });

        let current_span = Span::current();
        current_span.set_parent(parent_context);

        tracing::info!(
            trace_id = %trace_id,
            $msg $(, $key = $value)*
        );
    }};
}

#[macro_export]
macro_rules! error_with_id {
    ($idbytes:expr, $msg:expr $(, $key:ident = $value:expr)*) => {{
        use $crate::tracing::macros::prelude::*;
        use std::convert::TryInto;

        let trace_id = TraceId::from_bytes($idbytes[..16].try_into().unwrap());
        let span_id = SpanId::from_bytes(rand::random::<[u8; 8]>());

        let parent_context = global::get_text_map_propagator(|propagator| {
            propagator.extract(&set_remote_context(trace_id, span_id))
        });

        let current_span = Span::current();
        current_span.set_parent(parent_context);

        tracing::error!(
            trace_id = %trace_id,
            $msg $(, $key = $value)*
        );
    }};
}
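
Editor's note: a hypothetical call site for the macros above, assuming an id of at least 16 bytes (the macros slice `$idbytes[..16]`):

    let blob_id: [u8; 32] = [0u8; 32]; // stand-in for a real blob id
    info_with_id!(blob_id, "ProcessDispersal");

The emitted event carries a `trace_id` field, which is what the Grafana derived field later in this diff extracts from Loki log lines, and the current span is reparented so Tempo groups all events for the same blob into one trace.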

@@ -1 +1,2 @@
pub mod macros;
pub mod otlp;

@@ -1,9 +1,14 @@
// std
use std::error::Error;
// crates
use opentelemetry::{global, trace::TracerProvider as _};
use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::{BatchConfig, Sampler, Tracer};
use opentelemetry_sdk::{
    propagation::TraceContextPropagator,
    trace::{BatchConfig, Sampler, Tracer},
    Resource,
};
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use serde::{Deserialize, Serialize};
use tracing::Subscriber;
use tracing_opentelemetry::OpenTelemetryLayer;
@@ -15,6 +20,7 @@ use url::Url;
pub struct OtlpTracingConfig {
    pub endpoint: Url,
    pub sample_ratio: f64,
    pub service_name: String,
}

pub fn create_otlp_tracing_layer<S>(
@@ -26,15 +32,21 @@ where
    let otel_exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(config.endpoint);
    let resource = Resource::new(vec![KeyValue::new(SERVICE_NAME, config.service_name)]);
    let tracer_provider = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_trace_config(opentelemetry_sdk::trace::Config::default().with_sampler(
            Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(config.sample_ratio))),
        ))
        .with_trace_config(
            opentelemetry_sdk::trace::Config::default()
                .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
                    config.sample_ratio,
                ))))
                .with_resource(resource),
        )
        .with_batch_config(BatchConfig::default())
        .with_exporter(otel_exporter)
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;

    global::set_text_map_propagator(TraceContextPropagator::new());
    global::set_tracer_provider(tracer_provider.clone());
    let tracer: opentelemetry_sdk::trace::Tracer = tracer_provider.tracer("NomosTracer");
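
Editor's note: a minimal sketch of how this layer might be installed on a subscriber. We assume `create_otlp_tracing_layer` returns a layer compatible with `tracing_subscriber::Registry`; the full signature sits outside this hunk.

    use tracing_subscriber::layer::SubscriberExt;

    let layer = create_otlp_tracing_layer(OtlpTracingConfig {
        endpoint: "http://localhost:4317".try_into().unwrap(),
        sample_ratio: 0.5,
        service_name: "nomos-node-0".into(), // hypothetical identifier
    })?;
    tracing::subscriber::set_global_default(tracing_subscriber::registry().with(layer))?;

Note the sampler semantics: `ParentBased(TraceIdRatioBased(ratio))` applies the ratio only to trace roots, while spans whose remote parent is marked sampled (the "01" flag written by `set_remote_context`) are always recorded.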
@@ -200,6 +200,7 @@ fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> Gene
        tracing: nomos_tracing_service::TracingLayer::Otlp(OtlpTracingConfig {
            endpoint: params.tempo_endpoint,
            sample_ratio: 1.0,
            service_name: identifier.clone(),
        }),
        filter: FilterLayer::None,
        metrics: MetricsLayer::Otlp(OtlpMetricsConfig {

@@ -18,6 +18,7 @@ datasources:
    is_default: false
    version: 1
    editable: true
    uid: tempods

  - name: Loki
    type: loki
@@ -27,4 +28,10 @@ datasources:
    is_default: false
    version: 1
    editable: true
    jsonData:
      derivedFields:
        - name: trace_id
          matcherRegex: "\"trace_id\":\"(\\w+)\""
          url: "$${__value.raw}"
          datasourceUid: tempods
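
Editor's note: this derived field is what binds traces to log entries. Grafana scans each Loki log line with `matcherRegex` and turns the captured id into a link to the Tempo datasource (`uid: tempods`). A hypothetical JSON log line it would match (the exact field layout depends on the configured log formatter):

    {"level":"INFO","message":"ProcessDispersal","trace_id":"4ba9f78ccf4a49e7a63dbd921d6a392e"}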

@@ -3,16 +3,6 @@ server:
  http_listen_port: 3200
  log_level: info


cache:
  background:
    writeback_goroutines: 5
  caches:
    - roles:
        - frontend-search
      memcached:
        host: memcached:11211

query_frontend:
  search:
    duration_slo: 5s

@@ -12,7 +12,11 @@ use tests::topology::TopologyConfig;

const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";

async fn disseminate(executor: &Executor, data: &[u8]) {
async fn disseminate_with_metadata(
    executor: &Executor,
    data: &[u8],
    metadata: kzgrs_backend::dispersal::Metadata,
) {
    let executor_config = executor.config();

    let client = ClientBuilder::new()
@@ -23,8 +27,6 @@ async fn disseminate(executor: &Executor, data: &[u8]) {
    let exec_url = Url::parse(&format!("http://{}", backend_address)).unwrap();
    let client = ExecutorHttpClient::new(client, exec_url);

    let app_id = hex::decode(APP_ID).unwrap();
    let metadata = kzgrs_backend::dispersal::Metadata::new(app_id.try_into().unwrap(), 0u64.into());
    client.publish_blob(data.to_vec(), metadata).await.unwrap();
}

@@ -34,15 +36,18 @@ async fn disseminate_and_retrieve() {
    let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;
    let executor = &topology.executors()[0];
    let validator = &topology.validators()[0];

    let data = [1u8; 31];
    let app_id = hex::decode(APP_ID).unwrap();
    let metadata =
        kzgrs_backend::dispersal::Metadata::new(app_id.clone().try_into().unwrap(), 0u64.into());

    tokio::time::sleep(Duration::from_secs(15)).await;
    disseminate(executor, &data).await;
    disseminate_with_metadata(executor, &data, metadata).await;
    tokio::time::sleep(Duration::from_secs(20)).await;

    let from = 0u64.to_be_bytes();
    let to = 1u64.to_be_bytes();
    let app_id = hex::decode(APP_ID).unwrap();

    let executor_blobs = executor
        .get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
@@ -80,15 +85,18 @@ async fn disseminate_and_retrieve() {
async fn disseminate_retrieve_reconstruct() {
    let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;
    let executor = &topology.executors()[0];

    let data = [1u8; 31];
    let app_id = hex::decode(APP_ID).unwrap();
    let metadata =
        kzgrs_backend::dispersal::Metadata::new(app_id.clone().try_into().unwrap(), 0u64.into());

    tokio::time::sleep(Duration::from_secs(15)).await;
    disseminate(executor, &data).await;
    disseminate_with_metadata(executor, &data, metadata).await;
    tokio::time::sleep(Duration::from_secs(20)).await;

    let from = 0u64.to_be_bytes();
    let to = 1u64.to_be_bytes();
    let app_id = hex::decode(APP_ID).unwrap();

    let executor_blobs = executor
        .get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
@@ -106,3 +114,35 @@ async fn disseminate_retrieve_reconstruct() {
    let reconstructed = reconstruct_without_missing_data(&blobs);
    assert_eq!(reconstructed, data);
}

#[ignore = "for local debugging"]
#[tokio::test]
async fn local_testnet() {
    let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;
    let executor = &topology.executors()[0];
    let app_id = hex::decode(APP_ID).expect("Invalid APP_ID");

    let mut index = 0u64;
    loop {
        disseminate_with_metadata(
            executor,
            &generate_data(index),
            create_metadata(&app_id, index),
        )
        .await;

        index += 1;
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}

fn generate_data(index: u64) -> Vec<u8> {
    (index as u8..index as u8 + 31).collect()
}

fn create_metadata(app_id: &[u8], index: u64) -> kzgrs_backend::dispersal::Metadata {
    kzgrs_backend::dispersal::Metadata::new(
        app_id.try_into().expect("Failed to convert APP_ID"),
        index.into(),
    )
}

@@ -23,7 +23,8 @@ impl GeneralTracingConfig {
            }),
            tracing: TracingLayer::Otlp(OtlpTracingConfig {
                endpoint: "http://localhost:4317".try_into().unwrap(),
                sample_ratio: 0.1,
                sample_ratio: 0.5,
                service_name: host_identifier.clone(),
            }),
            filter: FilterLayer::EnvFilter(nomos_tracing::filter::envfilter::EnvFilterConfig {
                // Allow events only from modules that match the regex; if one matches, use