diff --git a/nomos-services/network/src/backends/libp2p/mixnet.rs b/nomos-services/network/src/backends/libp2p/mixnet.rs index 2ff72b28..d817a0c4 100644 --- a/nomos-services/network/src/backends/libp2p/mixnet.rs +++ b/nomos-services/network/src/backends/libp2p/mixnet.rs @@ -1,6 +1,6 @@ use std::{ops::Range, time::Duration}; -use mixnet_client::MixnetClient; +use mixnet_client::{MessageStream, MixnetClient}; use nomos_core::wire; use rand::{rngs::OsRng, thread_rng, Rng}; use serde::{Deserialize, Serialize}; @@ -47,11 +47,26 @@ impl MixnetHandler { } pub async fn run(&mut self) { - let Ok(mut stream) = self.client.run().await else { - tracing::error!("Could not quickstart mixnet stream"); - return; - }; + const BASE_DELAY: Duration = Duration::from_secs(5); + // we need this loop to help us reestablish the connection in case + // the mixnet client fails for whatever reason + let mut backoff = 0; + loop { + match self.client.run().await { + Ok(stream) => { + backoff = 0; + Self::handle_stream(self.commands_tx.clone(), stream).await; + } + Err(e) => { + tracing::error!("mixnet client error: {e}"); + backoff += 1; + tokio::time::sleep(BASE_DELAY * backoff).await; + } + } + } + } + async fn handle_stream(tx: mpsc::Sender, mut stream: MessageStream) { while let Some(result) = stream.next().await { match result { Ok(msg) => { @@ -62,17 +77,17 @@ impl MixnetHandler { continue; }; - self.commands_tx - .send(Command::DirectBroadcastAndRetry { - topic, - message, - retry_count: 0, - }) - .await - .unwrap_or_else(|_| tracing::error!("could not schedule broadcast")); + tx.send(Command::DirectBroadcastAndRetry { + topic, + message, + retry_count: 0, + }) + .await + .unwrap_or_else(|_| tracing::error!("could not schedule broadcast")); } Err(e) => { - todo!("Handle mixclient error: {e}"); + tracing::error!("mixnet client stream error: {e}"); + // TODO: handle mixnet client stream error } } } diff --git a/nomos-services/network/src/backends/libp2p/mod.rs b/nomos-services/network/src/backends/libp2p/mod.rs index 63bd0a38..82a1e908 100644 --- a/nomos-services/network/src/backends/libp2p/mod.rs +++ b/nomos-services/network/src/backends/libp2p/mod.rs @@ -45,7 +45,6 @@ impl NetworkBackend for Libp2p { fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE); let (events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE); - let mut mixnet_handler = MixnetHandler::new(&config, commands_tx.clone()); overwatch_handle.runtime().spawn(async move { mixnet_handler.run().await;