Fix waku backend (#231)
* Fix order of network streams When fetching a message from the network, we need to first listen for incoming messages and then look at the storage. If we do this in the opposite order, there's a brief moment where we've frozen our view of stored messages and are not yet listening for incoming ones, thus risking losing messages. * Wait for waku db spurious delays Sometimes waku takes some time (e.g. a few seconds) before making a received message available through the archive query. Let's account for this by making repeated calls if the first one is not successful. * Add initial network wait We've observed in testing that even if waku reports that some peers are connected, it can't really deliver a message. To overcome this limitation, we add a wait at the network service startup. We know this is not ideal, but Waku will eventually be replaced and we're looking for a quick fix. * fmt
This commit is contained in:
parent
55648e3151
commit
b884e1ceca
|
@ -54,15 +54,14 @@ impl WakuAdapter {
|
|||
}
|
||||
|
||||
async fn archive_subscriber_stream(
|
||||
&self,
|
||||
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
|
||||
content_topic: WakuContentTopic,
|
||||
) -> Result<
|
||||
Box<dyn Stream<Item = WakuMessage> + Send + Sync + Unpin>,
|
||||
tokio::sync::oneshot::error::RecvError,
|
||||
> {
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||
if let Err((_, _e)) = self
|
||||
.network_relay
|
||||
if let Err((_, _e)) = network_relay
|
||||
.send(NetworkMsg::Process(WakuBackendMessage::ArchiveSubscribe {
|
||||
query: StoreQuery {
|
||||
pubsub_topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
|
||||
|
@ -86,15 +85,40 @@ impl WakuAdapter {
|
|||
content_topic: WakuContentTopic,
|
||||
) -> impl Stream<Item = WakuMessage> {
|
||||
// create stream request tasks
|
||||
let live_stream_channel_task = self.message_subscriber_channel();
|
||||
let archive_stream_task = self.archive_subscriber_stream(content_topic.clone());
|
||||
// wait for both tasks to complete
|
||||
let (live_stream_channel_result, archive_stream_result) =
|
||||
futures::join!(live_stream_channel_task, archive_stream_task);
|
||||
// unwrap results
|
||||
let live_stream_channel =
|
||||
live_stream_channel_result.expect("live stream channel from waku network");
|
||||
let archive_stream = archive_stream_result.expect("archive stream from waku network");
|
||||
let live_stream_channel = self
|
||||
.message_subscriber_channel()
|
||||
.await
|
||||
.expect("live stream channel from waku network");
|
||||
|
||||
struct InnerState {
|
||||
first: bool,
|
||||
topic: WakuContentTopic,
|
||||
relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
|
||||
}
|
||||
|
||||
let state = InnerState {
|
||||
first: true,
|
||||
topic: content_topic.clone(),
|
||||
relay: self.network_relay.clone(),
|
||||
};
|
||||
|
||||
// Sometimes waku takes a while to make a message available in the archive, so we keep polling the archive until we get the message we want.
|
||||
// This stream will generate a new archive stream every 200ms until the message is found, chaining them together.
|
||||
// We expect this to be a rare occurrence, normal operation would result in this future being discarded before we even try to make a second request.
|
||||
let archive_stream = futures::stream::unfold(state, |mut state| async move {
|
||||
if !state.first {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
}
|
||||
state.first = false;
|
||||
Some((
|
||||
Self::archive_subscriber_stream(state.relay.clone(), state.topic.clone())
|
||||
.await
|
||||
.expect("archive stream from waku network"),
|
||||
state,
|
||||
))
|
||||
})
|
||||
.flatten();
|
||||
|
||||
let live_stream = BroadcastStream::new(live_stream_channel)
|
||||
.zip(futures::stream::repeat(content_topic))
|
||||
.filter_map(|(msg, content_topic)| async move {
|
||||
|
@ -107,7 +131,7 @@ impl WakuAdapter {
|
|||
_ => None,
|
||||
}
|
||||
});
|
||||
tokio_stream::StreamExt::merge(archive_stream, live_stream)
|
||||
tokio_stream::StreamExt::merge(live_stream, archive_stream)
|
||||
}
|
||||
|
||||
async fn broadcast(&self, bytes: Box<[u8]>, topic: WakuContentTopic) {
|
||||
|
|
|
@ -82,6 +82,12 @@ where
|
|||
}
|
||||
|
||||
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
|
||||
tracing::debug!("Starting up...");
|
||||
// this wait seems to be helpful in some cases for waku, where it reports
|
||||
// to be connected to peers but does not seem to be able to send messages
|
||||
// to them
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
let Self {
|
||||
service_state: ServiceStateHandle {
|
||||
mut inbound_relay, ..
|
||||
|
|
Loading…
Reference in New Issue