From b884e1ceca1a1148f23babb6e062c5487b70548b Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Tue, 27 Jun 2023 16:29:01 +0200 Subject: [PATCH] Fix waku backend (#231) * Fix order of network streams When fetching a message from the network, we need to first listen for incoming messages and then look at the storage. If we do this in the opposite order, there's a brief moment where we've freezed our view of stored messages and are not yet listening for incoming ones, thus risking loosing messages. * Wait for waku db spurious delays Sometimes waku takes some time (e.g. a few seconds) before making a received message available through the archive query. Let's account for this by making repeated calls if the first one is not successful. * Add initial network wait We've observed in testing that even if waku reports that some peers are connected it can't really deliver a message. To overcome this limitation, we add a wait at the network service startup. We know this is not ideal, but Waku will eventually be replaced and we're looking for a quick fix. * fmt --- .../consensus/src/network/adapters/waku.rs | 50 ++++++++++++++----- nomos-services/network/src/lib.rs | 6 +++ 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index c70c677c..79ee45af 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -54,15 +54,14 @@ impl WakuAdapter { } async fn archive_subscriber_stream( - &self, + network_relay: OutboundRelay< as ServiceData>::Message>, content_topic: WakuContentTopic, ) -> Result< Box + Send + Sync + Unpin>, tokio::sync::oneshot::error::RecvError, > { let (sender, receiver) = tokio::sync::oneshot::channel(); - if let Err((_, _e)) = self - .network_relay + if let Err((_, _e)) = network_relay .send(NetworkMsg::Process(WakuBackendMessage::ArchiveSubscribe { query: StoreQuery { pubsub_topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), @@ -86,15 +85,40 @@ impl WakuAdapter { content_topic: WakuContentTopic, ) -> impl Stream { // create stream request tasks - let live_stream_channel_task = self.message_subscriber_channel(); - let archive_stream_task = self.archive_subscriber_stream(content_topic.clone()); - // wait for both tasks to complete - let (live_stream_channel_result, archive_stream_result) = - futures::join!(live_stream_channel_task, archive_stream_task); - // unwrap results - let live_stream_channel = - live_stream_channel_result.expect("live stream channel from waku network"); - let archive_stream = archive_stream_result.expect("archive stream from waku network"); + let live_stream_channel = self + .message_subscriber_channel() + .await + .expect("live stream channel from waku network"); + + struct InnerState { + first: bool, + topic: WakuContentTopic, + relay: OutboundRelay< as ServiceData>::Message>, + } + + let state = InnerState { + first: true, + topic: content_topic.clone(), + relay: self.network_relay.clone(), + }; + + // Sometimes waku takes a while make a message available in the archive, so we keep polling the archive until we get the message we want. + // This stream will generate a new archive stream every 100ms until the message is found, chaining them together. + // We expect this to be a rare occurrence, normal operation would result in this future being discarded before we even try to make a second request. + let archive_stream = futures::stream::unfold(state, |mut state| async move { + if !state.first { + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + } + state.first = false; + Some(( + Self::archive_subscriber_stream(state.relay.clone(), state.topic.clone()) + .await + .expect("archive stream from waku network"), + state, + )) + }) + .flatten(); + let live_stream = BroadcastStream::new(live_stream_channel) .zip(futures::stream::repeat(content_topic)) .filter_map(|(msg, content_topic)| async move { @@ -107,7 +131,7 @@ impl WakuAdapter { _ => None, } }); - tokio_stream::StreamExt::merge(archive_stream, live_stream) + tokio_stream::StreamExt::merge(live_stream, archive_stream) } async fn broadcast(&self, bytes: Box<[u8]>, topic: WakuContentTopic) { diff --git a/nomos-services/network/src/lib.rs b/nomos-services/network/src/lib.rs index c2670cad..01dc5a81 100644 --- a/nomos-services/network/src/lib.rs +++ b/nomos-services/network/src/lib.rs @@ -82,6 +82,12 @@ where } async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + tracing::debug!("Starting up..."); + // this wait seems to be helpful in some cases for waku, where it reports + // to be connected to peers but does not seem to be able to send messages + // to them + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let Self { service_state: ServiceStateHandle { mut inbound_relay, ..