diff --git a/nomos-services/network/src/backends/libp2p/mixnet.rs b/nomos-services/network/src/backends/libp2p/mixnet.rs index d817a0c4..2ff72b28 100644 --- a/nomos-services/network/src/backends/libp2p/mixnet.rs +++ b/nomos-services/network/src/backends/libp2p/mixnet.rs @@ -1,6 +1,6 @@ use std::{ops::Range, time::Duration}; -use mixnet_client::{MessageStream, MixnetClient}; +use mixnet_client::MixnetClient; use nomos_core::wire; use rand::{rngs::OsRng, thread_rng, Rng}; use serde::{Deserialize, Serialize}; @@ -47,26 +47,11 @@ impl MixnetHandler { } pub async fn run(&mut self) { - const BASE_DELAY: Duration = Duration::from_secs(5); - // we need this loop to help us reestablish the connection in case - // the mixnet client fails for whatever reason - let mut backoff = 0; - loop { - match self.client.run().await { - Ok(stream) => { - backoff = 0; - Self::handle_stream(self.commands_tx.clone(), stream).await; - } - Err(e) => { - tracing::error!("mixnet client error: {e}"); - backoff += 1; - tokio::time::sleep(BASE_DELAY * backoff).await; - } - } - } - } + let Ok(mut stream) = self.client.run().await else { + tracing::error!("Could not quickstart mixnet stream"); + return; + }; - async fn handle_stream(tx: mpsc::Sender, mut stream: MessageStream) { while let Some(result) = stream.next().await { match result { Ok(msg) => { @@ -77,17 +62,17 @@ impl MixnetHandler { continue; }; - tx.send(Command::DirectBroadcastAndRetry { - topic, - message, - retry_count: 0, - }) - .await - .unwrap_or_else(|_| tracing::error!("could not schedule broadcast")); + self.commands_tx + .send(Command::DirectBroadcastAndRetry { + topic, + message, + retry_count: 0, + }) + .await + .unwrap_or_else(|_| tracing::error!("could not schedule broadcast")); } Err(e) => { - tracing::error!("mixnet client stream error: {e}"); - // TODO: handle mixnet client stream error + todo!("Handle mixclient error: {e}"); } } } diff --git a/nomos-services/network/src/backends/libp2p/mod.rs b/nomos-services/network/src/backends/libp2p/mod.rs index 82a1e908..63bd0a38 100644 --- a/nomos-services/network/src/backends/libp2p/mod.rs +++ b/nomos-services/network/src/backends/libp2p/mod.rs @@ -45,6 +45,7 @@ impl NetworkBackend for Libp2p { fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE); let (events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE); + let mut mixnet_handler = MixnetHandler::new(&config, commands_tx.clone()); overwatch_handle.runtime().spawn(async move { mixnet_handler.run().await;