From 72797e633df22c49082ef169cae248a6142daac8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex?= Date: Wed, 11 Dec 2024 17:44:22 +0100 Subject: [PATCH] feature(tracing): Bandwidth on Mixnet (#933) --- nomos-mix/network/Cargo.toml | 1 + nomos-mix/network/src/handler.rs | 26 ++++++++++++++++++----- nomos-services/mix/src/backends/libp2p.rs | 13 ++++++++++++ 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/nomos-mix/network/Cargo.toml b/nomos-mix/network/Cargo.toml index d22baaf1..1648e26d 100644 --- a/nomos-mix/network/Cargo.toml +++ b/nomos-mix/network/Cargo.toml @@ -13,6 +13,7 @@ nomos-mix = { path = "../core" } nomos-mix-message = { path = "../message" } sha2 = "0.10" rand = "0.8" +opentelemetry = "0.27.1" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } diff --git a/nomos-mix/network/src/handler.rs b/nomos-mix/network/src/handler.rs index a01cc2a6..4714c8f6 100644 --- a/nomos-mix/network/src/handler.rs +++ b/nomos-mix/network/src/handler.rs @@ -14,6 +14,12 @@ use libp2p::{ Stream, StreamProtocol, }; +// Metrics +const VALUE_FULLY_NEGOTIATED_INBOUND: &str = "fully_negotiated_inbound"; +const VALUE_FULLY_NEGOTIATED_OUTBOUND: &str = "fully_negotiated_outbound"; +const VALUE_DIAL_UPGRADE_ERROR: &str = "dial_upgrade_error"; +const VALUE_IGNORED: &str = "ignored"; + const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0"); // TODO: Consider replacing this struct with libp2p_stream ConnectionHandler @@ -99,6 +105,11 @@ impl ConnectionHandler for MixConnectionHandler { ) -> Poll< ConnectionHandlerEvent, > { + tracing::info!(gauge.pending_outbound_messages = self.outbound_msgs.len() as u64,); + tracing::info!( + gauge.pending_events_to_behaviour = self.pending_events_to_behaviour.len() as u64, + ); + // Process pending events to be sent to the behaviour if let Some(event) = self.pending_events_to_behaviour.pop_front() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); @@ -202,13 +213,14 @@ impl ConnectionHandler for MixConnectionHandler { Self::OutboundOpenInfo, >, ) { - match event { + let event_name = match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol: stream, .. }) => { tracing::debug!("FullyNegotiatedInbound: Creating inbound substream"); - self.inbound_substream = Some(recv_msg(stream).boxed()) + self.inbound_substream = Some(recv_msg(stream).boxed()); + VALUE_FULLY_NEGOTIATED_INBOUND } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol: stream, @@ -218,6 +230,7 @@ impl ConnectionHandler for MixConnectionHandler { self.outbound_substream = Some(OutboundSubstreamState::Idle(stream)); self.pending_events_to_behaviour .push_back(ToBehaviour::FullyNegotiatedOutbound); + VALUE_FULLY_NEGOTIATED_OUTBOUND } ConnectionEvent::DialUpgradeError(e) => { tracing::error!("DialUpgradeError: {:?}", e); @@ -238,13 +251,16 @@ impl ConnectionHandler for MixConnectionHandler { ))); } StreamUpgradeError::Apply(_) => unreachable!(), - } + }; + VALUE_DIAL_UPGRADE_ERROR } event => { - tracing::debug!("Ignoring connection event: {:?}", event) + tracing::debug!("Ignoring connection event: {:?}", event); + VALUE_IGNORED } - } + }; + tracing::info!(counter.connection_event = 1, event = event_name); self.try_wake(); } } diff --git a/nomos-services/mix/src/backends/libp2p.rs b/nomos-services/mix/src/backends/libp2p.rs index 962b7f68..ef079485 100644 --- a/nomos-services/mix/src/backends/libp2p.rs +++ b/nomos-services/mix/src/backends/libp2p.rs @@ -171,8 +171,13 @@ where async fn handle_swarm_message(&mut self, msg: MixSwarmMessage) { match msg { MixSwarmMessage::Publish(msg) => { + let msg_size = msg.len(); if let Err(e) = self.swarm.behaviour_mut().publish(msg) { tracing::error!("Failed to publish message to mix network: {e:?}"); + tracing::info!(counter.failed_outbound_messages = 1); + } else { + tracing::info!(counter.successful_outbound_messages = 1); + tracing::info!(histogram.sent_data = msg_size as u64); } } } @@ -182,15 +187,23 @@ where match event { SwarmEvent::Behaviour(nomos_mix_network::Event::Message(msg)) => { tracing::debug!("Received message from a peer: {msg:?}"); + + let msg_size = msg.len(); if let Err(e) = self.incoming_message_sender.send(msg) { tracing::error!("Failed to send incoming message to channel: {e}"); + tracing::info!(counter.failed_inbound_messages = 1); + } else { + tracing::info!(counter.successful_inbound_messages = 1); + tracing::info!(histogram.received_data = msg_size as u64); } } SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => { tracing::error!("Received error from mix network: {e:?}"); + tracing::info!(counter.error = 1); } _ => { tracing::debug!("Received event from mix network: {event:?}"); + tracing::info!(counter.ignored_event = 1); } } }