diff --git a/nomos-services/network/src/backends/libp2p/mixnet.rs b/nomos-services/network/src/backends/libp2p/mixnet.rs index 2ff72b28..382f7059 100644 --- a/nomos-services/network/src/backends/libp2p/mixnet.rs +++ b/nomos-services/network/src/backends/libp2p/mixnet.rs @@ -1,6 +1,6 @@ use std::{ops::Range, time::Duration}; -use mixnet_client::MixnetClient; +use mixnet_client::{MessageStream, MixnetClient}; use nomos_core::wire; use rand::{rngs::OsRng, thread_rng, Rng}; use serde::{Deserialize, Serialize}; @@ -47,11 +47,26 @@ impl MixnetHandler { } pub async fn run(&mut self) { - let Ok(mut stream) = self.client.run().await else { - tracing::error!("Could not quickstart mixnet stream"); - return; - }; + const BASE_DELAY: Duration = Duration::from_secs(5); + // we need this loop to help us reestablish the connection in case + // the mixnet client fails for whatever reason + let mut backoff = 0; + loop { + match self.client.run().await { + Ok(stream) => { + backoff = 0; + self.handle_stream(stream).await; + } + Err(e) => { + tracing::error!("mixnet client error: {e}"); + backoff += 1; + tokio::time::sleep(BASE_DELAY * backoff).await; + } + } + } + } + async fn handle_stream(&mut self, mut stream: MessageStream) { while let Some(result) = stream.next().await { match result { Ok(msg) => { @@ -72,7 +87,8 @@ impl MixnetHandler { .unwrap_or_else(|_| tracing::error!("could not schedule broadcast")); } Err(e) => { - todo!("Handle mixclient error: {e}"); + tracing::error!("mixnet client stream error: {e}"); + return; } } }