From 8cc37385b3cfd9bea583bad73230d788b0302d4f Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Wed, 15 Feb 2023 16:49:49 +0100 Subject: [PATCH] Waku cached streams consensus adapter (#70) * Added waku archive message to waku network backend * Use cached streams in consensus waku adapter * Fix mock test * Add missing import * Join requests tasks * Use waku-bindings beta4 * Get stream from archive query method * Set store protocol active for waku backend * Implement local query stream response * Add missing linking flags for new waku-bindings version * Cleanup unbounded sender fuse/unwrap * Clippy happy --- .cargo/config.toml | 2 +- nomos-services/consensus/Cargo.toml | 2 +- .../consensus/src/network/adapters/waku.rs | 123 +++++++++++------- nomos-services/mempool/Cargo.toml | 2 +- nomos-services/network/Cargo.toml | 3 +- nomos-services/network/src/backends/waku.rs | 115 +++++++++++++++- 6 files changed, 188 insertions(+), 59 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 5bc05eb0..4838dd95 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,4 +1,4 @@ [target.'cfg(target_os = "macos")'] # when using osx, we need to link against some golang libraries, it did just work with this missing flags # from: https://github.com/golang/go/issues/42459 -rustflags = ["-C", "link-args=-framework CoreFoundation -framework Security"] \ No newline at end of file +rustflags = ["-C", "link-args=-framework CoreFoundation -framework Security -framework CoreServices"] \ No newline at end of file diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index 73e7bfc5..e93ed8bc 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -18,7 +18,7 @@ nomos-core = { path = "../../nomos-core" } tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" futures = "0.3" -waku-bindings = { version = "0.1.0-beta2", optional = true} +waku-bindings = { version = "0.1.0-beta4", optional = true} tracing = "0.1" [features] diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index e0cfb2e0..aeeaefac 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -16,7 +16,9 @@ use nomos_network::{ NetworkMsg, NetworkService, }; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; -use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; +use waku_bindings::{ + ContentFilter, Encoding, StoreQuery, WakuContentTopic, WakuMessage, WakuPubSubTopic, +}; pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = WakuPubSubTopic::new("CarnotSim", Encoding::Proto); @@ -48,6 +50,63 @@ impl WakuAdapter { }; receiver.await } + + async fn archive_subscriber_stream( + &self, + content_topic: WakuContentTopic, + ) -> Result< + Box + Send + Sync + Unpin>, + tokio::sync::oneshot::error::RecvError, + > { + let (sender, receiver) = tokio::sync::oneshot::channel(); + if let Err((_, _e)) = self + .network_relay + .send(NetworkMsg::Process(WakuBackendMessage::ArchiveSubscribe { + query: StoreQuery { + pubsub_topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), + content_filters: vec![ContentFilter::new(content_topic)], + // TODO: maybe handle limits through configuration + start_time: None, + end_time: None, + paging_options: None, + }, + reply_channel: sender, + })) + .await + { + todo!("log error"); + }; + receiver.await + } + + async fn cached_stream_with_content_topic( + &self, + content_topic: WakuContentTopic, + ) -> impl Stream { + // create stream request tasks + let live_stream_channel_task = self.message_subscriber_channel(); + let archive_stream_task = self.archive_subscriber_stream(content_topic.clone()); + // wait for both tasks to complete + let (live_stream_channel_result, archive_stream_result) = + futures::join!(live_stream_channel_task, archive_stream_task); + // unwrap results + let live_stream_channel = + live_stream_channel_result.expect("live stream channel from waku network"); + let archive_stream = archive_stream_result.expect("archive stream from waku network"); + let live_stream = BroadcastStream::new(live_stream_channel) + .zip(futures::stream::repeat(content_topic)) + .filter_map(|(msg, content_topic)| async move { + match msg { + Ok(NetworkEvent::RawMessage(message)) + if message.content_topic() == &content_topic => + { + Some(message) + } + _ => None, + } + }); + tokio_stream::StreamExt::merge(archive_stream, live_stream) + } } #[async_trait::async_trait] @@ -65,32 +124,15 @@ impl NetworkAdapter for WakuAdapter { committee: Committee, view: &View, ) -> Box + Send + Sync + Unpin> { - let stream_channel = self - .message_subscriber_channel() - .await - .unwrap_or_else(|_e| todo!("handle error")); let content_topic = proposal_topic(committee, view); - Box::new( - BroadcastStream::new(stream_channel) - .zip(futures::stream::repeat(content_topic)) - .filter_map(|(msg, content_topic)| { - Box::pin(async move { - match msg { - Ok(event) => match event { - NetworkEvent::RawMessage(message) => { - if &content_topic == message.content_topic() { - let payload = message.payload(); - Some(ProposalChunkMsg::from_bytes(payload).chunk) - } else { - None - } - } - }, - Err(_e) => None, - } - }) + Box::new(Box::pin( + self.cached_stream_with_content_topic(content_topic) + .await + .map(|message| { + let payload = message.payload(); + ProposalChunkMsg::from_bytes(payload).chunk }), - ) + )) } async fn broadcast_block_chunk( @@ -124,30 +166,15 @@ impl NetworkAdapter for WakuAdapter { committee: Committee, view: &View, ) -> Box + Send> { - let content_topic = approval_topic(committee, view); - let stream_channel = self - .message_subscriber_channel() - .await - .unwrap_or_else(|_e| todo!("handle error")); - Box::new( - BroadcastStream::new(stream_channel) - .zip(futures::stream::repeat(content_topic)) - .filter_map(|(msg, content_topic)| async move { - match msg { - Ok(event) => match event { - NetworkEvent::RawMessage(message) => { - if &content_topic == message.content_topic() { - let payload = message.payload(); - Some(ApprovalMsg::from_bytes(payload).approval) - } else { - None - } - } - }, - Err(_e) => None, - } + let content_topic = proposal_topic(committee, view); + Box::new(Box::pin( + self.cached_stream_with_content_topic(content_topic) + .await + .map(|message| { + let payload = message.payload(); + ApprovalMsg::from_bytes(payload).approval }), - ) + )) } async fn forward_approval( diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 4b2bb6cb..77af3eb7 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -20,7 +20,7 @@ thiserror = "1.0" tracing = "0.1" tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" -waku-bindings = { version = "0.1.0-beta3", optional = true} +waku-bindings = { version = "0.1.0-beta4", optional = true} [dev-dependencies] nomos-log = { path = "../log" } diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index 61838c8a..687e0307 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -15,10 +15,11 @@ serde = "1.0" sscanf = { version = "0.4", optional = true } sled = { version = "0.34", optional = true } tokio = { version = "1", features = ["sync"] } +tokio-stream = "0.1" thiserror = "1.0" tracing = "0.1" rand = { version = "0.8", optional = true } -waku-bindings = { version = "0.1.0-beta3", optional = true } +waku-bindings = { version = "0.1.0-beta4", optional = true } tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["json"] } tracing-gelf = "0.7" diff --git a/nomos-services/network/src/backends/waku.rs b/nomos-services/network/src/backends/waku.rs index d1fa341e..61847c5c 100644 --- a/nomos-services/network/src/backends/waku.rs +++ b/nomos-services/network/src/backends/waku.rs @@ -1,11 +1,18 @@ -use super::*; -use overwatch_rs::services::state::NoState; +// std +use std::fmt::Formatter; +use std::future::Future; +// crates +use futures::Stream; use serde::{Deserialize, Serialize}; use tokio::sync::{ broadcast::{self, Receiver, Sender}, oneshot, }; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error}; +// internal +use super::*; +use overwatch_rs::services::state::NoState; use waku_bindings::*; const BROADCAST_CHANNEL_BUF: usize = 16; @@ -29,7 +36,6 @@ pub struct WakuConfig { } /// Interaction with Waku node -#[derive(Debug)] pub enum WakuBackendMessage { /// Send a message to the network Broadcast { @@ -42,11 +48,16 @@ pub enum WakuBackendMessage { RelaySubscribe { topic: WakuPubSubTopic }, /// Unsubscribe from a particular Waku topic RelayUnsubscribe { topic: WakuPubSubTopic }, + /// Get a local cached stream of messages for a particular content topic + ArchiveSubscribe { + query: StoreQuery, + reply_channel: oneshot::Sender + Send + Sync + Unpin>>, + }, /// Retrieve old messages from another peer StoreQuery { query: StoreQuery, peer_id: PeerId, - response: oneshot::Sender, + reply_channel: oneshot::Sender, }, /// Send a message using Waku Light Push LightpushPublish { @@ -59,6 +70,49 @@ pub enum WakuBackendMessage { }, } +impl Debug for WakuBackendMessage { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + WakuBackendMessage::Broadcast { message, .. } => f + .debug_struct("WakuBackendMessage::Broadcast") + .field("message", message) + .finish(), + WakuBackendMessage::ConnectPeer { addr } => f + .debug_struct("WakuBackendMessage::ConnectPeer") + .field("addr", addr) + .finish(), + WakuBackendMessage::RelaySubscribe { topic } => f + .debug_struct("WakuBackendMessage::RelaySubscribe") + .field("topic", topic) + .finish(), + WakuBackendMessage::RelayUnsubscribe { topic } => f + .debug_struct("WakuBackendMessage::RelayUnsubscribe") + .field("topic", topic) + .finish(), + WakuBackendMessage::ArchiveSubscribe { query, .. } => f + .debug_struct("WakuBackendMessage::ArchiveSubscribe") + .field("query", query) + .finish(), + WakuBackendMessage::StoreQuery { query, peer_id, .. } => f + .debug_struct("WakuBackendMessage::StoreQuery") + .field("query", query) + .field("peer_id", peer_id) + .finish(), + WakuBackendMessage::LightpushPublish { + message, + topic, + peer_id, + } => f + .debug_struct("WakuBackendMessage::LightpushPublish") + .field("message", message) + .field("topic", topic) + .field("peer_id", peer_id) + .finish(), + WakuBackendMessage::Info { .. } => f.debug_struct("WakuBackendMessage::Info").finish(), + } + } +} + #[derive(Debug)] pub enum EventKind { Message, @@ -69,6 +123,41 @@ pub enum NetworkEvent { RawMessage(WakuMessage), } +impl Waku { + pub fn waku_store_query_stream( + &self, + mut query: StoreQuery, + ) -> ( + impl Stream, + impl Future + '_, + ) { + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let task = async move { + while let Ok(StoreResponse { + messages, + paging_options, + }) = self.waku.local_store_query(&query) + { + // send messages + for message in messages { + // this could fail if the receiver is dropped + // break out of the loop in that case + if sender.send(message).is_err() { + break; + } + } + // stop queries if we do not have any more pages + if let Some(paging_options) = paging_options { + query.paging_options = Some(paging_options); + } else { + break; + } + } + }; + (UnboundedReceiverStream::new(receiver), task) + } +} + #[async_trait::async_trait] impl NetworkBackend for Waku { type Settings = WakuConfig; @@ -77,7 +166,9 @@ impl NetworkBackend for Waku { type EventKind = EventKind; type NetworkEvent = NetworkEvent; - fn new(config: Self::Settings) -> Self { + fn new(mut config: Self::Settings) -> Self { + // set store protocol to active at all times + config.inner.store = Some(true); let waku = waku_new(Some(config.inner)).unwrap().start().unwrap(); tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]); for peer in &config.initial_peers { @@ -161,14 +252,14 @@ impl NetworkBackend for Waku { WakuBackendMessage::StoreQuery { query, peer_id, - response, + reply_channel, } => match self.waku.store_query(&query, &peer_id, None) { Ok(res) => { debug!( "successfully retrieved stored messages with options {:?}", query ); - response + reply_channel .send(res) .unwrap_or_else(|_| error!("client hung up store query handle")); } @@ -192,6 +283,16 @@ impl NetworkBackend for Waku { error!("could not send waku info"); } } + WakuBackendMessage::ArchiveSubscribe { + reply_channel, + query, + } => { + let (stream, task) = self.waku_store_query_stream(query); + if reply_channel.send(Box::new(stream)).is_err() { + error!("could not send archive subscribe stream"); + } + task.await; + } }; }