feature(tracing): Bandwidth on Mixnet (#933)
This commit is contained in:
parent
25d92390fa
commit
72797e633d
@ -13,6 +13,7 @@ nomos-mix = { path = "../core" }
|
|||||||
nomos-mix-message = { path = "../message" }
|
nomos-mix-message = { path = "../message" }
|
||||||
sha2 = "0.10"
|
sha2 = "0.10"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
opentelemetry = "0.27.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
|
||||||
|
@ -14,6 +14,12 @@ use libp2p::{
|
|||||||
Stream, StreamProtocol,
|
Stream, StreamProtocol,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Metrics
|
||||||
|
const VALUE_FULLY_NEGOTIATED_INBOUND: &str = "fully_negotiated_inbound";
|
||||||
|
const VALUE_FULLY_NEGOTIATED_OUTBOUND: &str = "fully_negotiated_outbound";
|
||||||
|
const VALUE_DIAL_UPGRADE_ERROR: &str = "dial_upgrade_error";
|
||||||
|
const VALUE_IGNORED: &str = "ignored";
|
||||||
|
|
||||||
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0");
|
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0");
|
||||||
|
|
||||||
// TODO: Consider replacing this struct with libp2p_stream ConnectionHandler
|
// TODO: Consider replacing this struct with libp2p_stream ConnectionHandler
|
||||||
@ -99,6 +105,11 @@ impl ConnectionHandler for MixConnectionHandler {
|
|||||||
) -> Poll<
|
) -> Poll<
|
||||||
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
|
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
|
||||||
> {
|
> {
|
||||||
|
tracing::info!(gauge.pending_outbound_messages = self.outbound_msgs.len() as u64,);
|
||||||
|
tracing::info!(
|
||||||
|
gauge.pending_events_to_behaviour = self.pending_events_to_behaviour.len() as u64,
|
||||||
|
);
|
||||||
|
|
||||||
// Process pending events to be sent to the behaviour
|
// Process pending events to be sent to the behaviour
|
||||||
if let Some(event) = self.pending_events_to_behaviour.pop_front() {
|
if let Some(event) = self.pending_events_to_behaviour.pop_front() {
|
||||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
|
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
|
||||||
@ -202,13 +213,14 @@ impl ConnectionHandler for MixConnectionHandler {
|
|||||||
Self::OutboundOpenInfo,
|
Self::OutboundOpenInfo,
|
||||||
>,
|
>,
|
||||||
) {
|
) {
|
||||||
match event {
|
let event_name = match event {
|
||||||
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
|
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
|
||||||
protocol: stream,
|
protocol: stream,
|
||||||
..
|
..
|
||||||
}) => {
|
}) => {
|
||||||
tracing::debug!("FullyNegotiatedInbound: Creating inbound substream");
|
tracing::debug!("FullyNegotiatedInbound: Creating inbound substream");
|
||||||
self.inbound_substream = Some(recv_msg(stream).boxed())
|
self.inbound_substream = Some(recv_msg(stream).boxed());
|
||||||
|
VALUE_FULLY_NEGOTIATED_INBOUND
|
||||||
}
|
}
|
||||||
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
|
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
|
||||||
protocol: stream,
|
protocol: stream,
|
||||||
@ -218,6 +230,7 @@ impl ConnectionHandler for MixConnectionHandler {
|
|||||||
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
|
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
|
||||||
self.pending_events_to_behaviour
|
self.pending_events_to_behaviour
|
||||||
.push_back(ToBehaviour::FullyNegotiatedOutbound);
|
.push_back(ToBehaviour::FullyNegotiatedOutbound);
|
||||||
|
VALUE_FULLY_NEGOTIATED_OUTBOUND
|
||||||
}
|
}
|
||||||
ConnectionEvent::DialUpgradeError(e) => {
|
ConnectionEvent::DialUpgradeError(e) => {
|
||||||
tracing::error!("DialUpgradeError: {:?}", e);
|
tracing::error!("DialUpgradeError: {:?}", e);
|
||||||
@ -238,13 +251,16 @@ impl ConnectionHandler for MixConnectionHandler {
|
|||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
StreamUpgradeError::Apply(_) => unreachable!(),
|
StreamUpgradeError::Apply(_) => unreachable!(),
|
||||||
}
|
};
|
||||||
|
VALUE_DIAL_UPGRADE_ERROR
|
||||||
}
|
}
|
||||||
event => {
|
event => {
|
||||||
tracing::debug!("Ignoring connection event: {:?}", event)
|
tracing::debug!("Ignoring connection event: {:?}", event);
|
||||||
|
VALUE_IGNORED
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
|
tracing::info!(counter.connection_event = 1, event = event_name);
|
||||||
self.try_wake();
|
self.try_wake();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -171,8 +171,13 @@ where
|
|||||||
async fn handle_swarm_message(&mut self, msg: MixSwarmMessage) {
|
async fn handle_swarm_message(&mut self, msg: MixSwarmMessage) {
|
||||||
match msg {
|
match msg {
|
||||||
MixSwarmMessage::Publish(msg) => {
|
MixSwarmMessage::Publish(msg) => {
|
||||||
|
let msg_size = msg.len();
|
||||||
if let Err(e) = self.swarm.behaviour_mut().publish(msg) {
|
if let Err(e) = self.swarm.behaviour_mut().publish(msg) {
|
||||||
tracing::error!("Failed to publish message to mix network: {e:?}");
|
tracing::error!("Failed to publish message to mix network: {e:?}");
|
||||||
|
tracing::info!(counter.failed_outbound_messages = 1);
|
||||||
|
} else {
|
||||||
|
tracing::info!(counter.successful_outbound_messages = 1);
|
||||||
|
tracing::info!(histogram.sent_data = msg_size as u64);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -182,15 +187,23 @@ where
|
|||||||
match event {
|
match event {
|
||||||
SwarmEvent::Behaviour(nomos_mix_network::Event::Message(msg)) => {
|
SwarmEvent::Behaviour(nomos_mix_network::Event::Message(msg)) => {
|
||||||
tracing::debug!("Received message from a peer: {msg:?}");
|
tracing::debug!("Received message from a peer: {msg:?}");
|
||||||
|
|
||||||
|
let msg_size = msg.len();
|
||||||
if let Err(e) = self.incoming_message_sender.send(msg) {
|
if let Err(e) = self.incoming_message_sender.send(msg) {
|
||||||
tracing::error!("Failed to send incoming message to channel: {e}");
|
tracing::error!("Failed to send incoming message to channel: {e}");
|
||||||
|
tracing::info!(counter.failed_inbound_messages = 1);
|
||||||
|
} else {
|
||||||
|
tracing::info!(counter.successful_inbound_messages = 1);
|
||||||
|
tracing::info!(histogram.received_data = msg_size as u64);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => {
|
SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => {
|
||||||
tracing::error!("Received error from mix network: {e:?}");
|
tracing::error!("Received error from mix network: {e:?}");
|
||||||
|
tracing::info!(counter.error = 1);
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
tracing::debug!("Received event from mix network: {event:?}");
|
tracing::debug!("Received event from mix network: {event:?}");
|
||||||
|
tracing::info!(counter.ignored_event = 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user