feature(tracing): bandwith on DA (#931)
This commit is contained in:
parent
8c3c4c1bfb
commit
72535e83b5
@ -24,6 +24,17 @@ pub enum DispersalEvent {
|
||||
/// Received a n
|
||||
IncomingMessage { message: DispersalReq },
|
||||
}
|
||||
|
||||
impl DispersalEvent {
|
||||
pub fn blob_size(&self) -> Option<usize> {
|
||||
match self {
|
||||
DispersalEvent::IncomingMessage { message } => {
|
||||
message.blob.as_ref().map(|blob| blob.data.len())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DispersalValidatorBehaviour<Membership> {
|
||||
stream_behaviour: libp2p_stream::Behaviour,
|
||||
incoming_streams: IncomingStreams,
|
||||
|
@ -12,7 +12,6 @@ use libp2p::swarm::{
|
||||
};
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use log::{error, trace};
|
||||
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
|
||||
use crate::SubnetworkId;
|
||||
@ -31,6 +30,16 @@ pub enum ReplicationEvent {
|
||||
IncomingMessage { peer_id: PeerId, message: DaMessage },
|
||||
}
|
||||
|
||||
impl ReplicationEvent {
|
||||
pub fn blob_size(&self) -> Option<usize> {
|
||||
match self {
|
||||
ReplicationEvent::IncomingMessage { message, .. } => {
|
||||
message.blob.as_ref().map(|blob| blob.data.len())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Nomos DA broadcas network behaviour
|
||||
/// This item handles the logic of the nomos da subnetworks broadcasting
|
||||
/// DA subnetworks are a logical distribution of subsets.
|
||||
|
@ -29,6 +29,12 @@ use crate::swarm::validator::ValidatorEventsStream;
|
||||
use crate::SubnetworkId;
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
|
||||
// Metrics
|
||||
const EVENT_SAMPLING: &str = "sampling";
|
||||
const EVENT_DISPERSAL_EXECUTOR_DISPERSAL: &str = "dispersal_executor_event";
|
||||
const EVENT_VALIDATOR_DISPERSAL: &str = "validator_dispersal";
|
||||
const EVENT_REPLICATION: &str = "replication";
|
||||
|
||||
pub struct ExecutorEventsStream {
|
||||
pub validator_events_stream: ValidatorEventsStream,
|
||||
pub dispersal_events_receiver: UnboundedReceiverStream<DispersalExecutorEvent>,
|
||||
@ -58,6 +64,7 @@ where
|
||||
let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver);
|
||||
let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver);
|
||||
let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver);
|
||||
|
||||
(
|
||||
Self {
|
||||
swarm: Self::build_swarm(key, membership, addresses),
|
||||
@ -161,15 +168,33 @@ where
|
||||
async fn handle_behaviour_event(&mut self, event: ExecutorBehaviourEvent<Membership>) {
|
||||
match event {
|
||||
ExecutorBehaviourEvent::Sampling(event) => {
|
||||
tracing::info!(
|
||||
counter.behaviour_events_received = 1,
|
||||
event = EVENT_SAMPLING
|
||||
);
|
||||
self.handle_sampling_event(event).await;
|
||||
}
|
||||
ExecutorBehaviourEvent::ExecutorDispersal(event) => {
|
||||
tracing::info!(
|
||||
counter.behaviour_events_received = 1,
|
||||
event = EVENT_DISPERSAL_EXECUTOR_DISPERSAL
|
||||
);
|
||||
self.handle_executor_dispersal_event(event).await;
|
||||
}
|
||||
ExecutorBehaviourEvent::ValidatorDispersal(event) => {
|
||||
tracing::info!(
|
||||
counter.behaviour_events_received = 1,
|
||||
event = EVENT_VALIDATOR_DISPERSAL,
|
||||
blob_size = event.blob_size()
|
||||
);
|
||||
self.handle_validator_dispersal_event(event).await;
|
||||
}
|
||||
ExecutorBehaviourEvent::Replication(event) => {
|
||||
tracing::info!(
|
||||
counter.behaviour_events_received = 1,
|
||||
event = EVENT_REPLICATION,
|
||||
blob_size = event.blob_size()
|
||||
);
|
||||
self.handle_replication_event(event).await;
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,11 @@ use crate::swarm::common::{
|
||||
use crate::SubnetworkId;
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
|
||||
// Metrics
|
||||
const EVENT_SAMPLING: &str = "sampling";
|
||||
const EVENT_VALIDATOR_DISPERSAL: &str = "validator_dispersal";
|
||||
const EVENT_REPLICATION: &str = "replication";
|
||||
|
||||
pub struct ValidatorEventsStream {
|
||||
pub sampling_events_receiver: UnboundedReceiverStream<SamplingEvent>,
|
||||
pub validation_events_receiver: UnboundedReceiverStream<DaBlob>,
|
||||
@ -52,6 +57,7 @@ where
|
||||
|
||||
let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver);
|
||||
let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver);
|
||||
|
||||
(
|
||||
Self {
|
||||
swarm: Self::build_swarm(key, membership, addresses),
|
||||
@ -131,12 +137,26 @@ where
|
||||
async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent<Membership>) {
|
||||
match event {
|
||||
ValidatorBehaviourEvent::Sampling(event) => {
|
||||
tracing::info!(
|
||||
counter.behaviour_events_received = 1,
|
||||
event = EVENT_SAMPLING
|
||||
);
|
||||
self.handle_sampling_event(event).await;
|
||||
}
|
||||
ValidatorBehaviourEvent::Dispersal(event) => {
|
||||
tracing::info!(
|
||||
counter.behaviour_events_received = 1,
|
||||
event = EVENT_VALIDATOR_DISPERSAL,
|
||||
blob_size = event.blob_size()
|
||||
);
|
||||
self.handle_dispersal_event(event).await;
|
||||
}
|
||||
ValidatorBehaviourEvent::Replication(event) => {
|
||||
tracing::info!(
|
||||
counter.behaviour_events_received = 1,
|
||||
event = EVENT_REPLICATION,
|
||||
blob_size = event.blob_size()
|
||||
);
|
||||
self.handle_replication_event(event).await;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user