Waku cached streams consensus adapter (#70)

* Added waku archive message to waku network backend

* Use cached streams in consensus waku adapter

* Fix mock test

* Add missing import

* Join request tasks

* Use waku-bindings beta4

* Get stream from archive query method

* Set store protocol active for waku backend

* Implement local query stream response

* Add missing linking flags for new waku-bindings version

* Cleanup unbounded sender fuse/unwrap

* Clippy happy
Daniel Sanchez 2023-02-15 16:49:49 +01:00 committed by GitHub
parent 1d195959d8
commit 8cc37385b3
6 changed files with 188 additions and 59 deletions
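
The core of this change is cached_stream_with_content_topic in the consensus waku adapter: before listening to live relay traffic it asks the backend for the locally archived messages (ArchiveSubscribe) and merges both sources into a single stream, so a node that subscribes late still receives earlier proposals and approvals. Below is a minimal, self-contained sketch of that merge pattern only, using plain strings as stand-ins for WakuMessage and an unbounded channel as a stand-in for the relay subscription; it is not the adapter code itself.

use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    // "archive": messages that were already stored before we subscribed
    let archive = tokio_stream::iter(vec!["old-1", "old-2"]);

    // "live": messages arriving on the relay subscription from now on
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    tx.send("new-1").unwrap();
    drop(tx); // close the live side so this example terminates
    let live = UnboundedReceiverStream::new(rx);

    // merge both sources into one stream, mirroring
    // tokio_stream::StreamExt::merge(archive_stream, live_stream) in the adapter
    let mut merged = archive.merge(live);
    while let Some(msg) = merged.next().await {
        println!("{msg}");
    }
}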

@@ -1,4 +1,4 @@
[target.'cfg(target_os = "macos")']
# when using macOS, we need to link against some Go libraries; the build did not work with these flags missing
# from: https://github.com/golang/go/issues/42459
rustflags = ["-C", "link-args=-framework CoreFoundation -framework Security"]
rustflags = ["-C", "link-args=-framework CoreFoundation -framework Security -framework CoreServices"]

@@ -18,7 +18,7 @@ nomos-core = { path = "../../nomos-core" }
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
futures = "0.3"
waku-bindings = { version = "0.1.0-beta2", optional = true}
waku-bindings = { version = "0.1.0-beta4", optional = true}
tracing = "0.1"
[features]

@@ -16,7 +16,9 @@ use nomos_network::{
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
use waku_bindings::{
ContentFilter, Encoding, StoreQuery, WakuContentTopic, WakuMessage, WakuPubSubTopic,
};
pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
@@ -48,6 +50,63 @@ impl WakuAdapter {
};
receiver.await
}
async fn archive_subscriber_stream(
&self,
content_topic: WakuContentTopic,
) -> Result<
Box<dyn Stream<Item = WakuMessage> + Send + Sync + Unpin>,
tokio::sync::oneshot::error::RecvError,
> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::ArchiveSubscribe {
query: StoreQuery {
pubsub_topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
content_filters: vec![ContentFilter::new(content_topic)],
// TODO: maybe handle limits through configuration
start_time: None,
end_time: None,
paging_options: None,
},
reply_channel: sender,
}))
.await
{
todo!("log error");
};
receiver.await
}
async fn cached_stream_with_content_topic(
&self,
content_topic: WakuContentTopic,
) -> impl Stream<Item = WakuMessage> {
// create stream request tasks
let live_stream_channel_task = self.message_subscriber_channel();
let archive_stream_task = self.archive_subscriber_stream(content_topic.clone());
// wait for both tasks to complete
let (live_stream_channel_result, archive_stream_result) =
futures::join!(live_stream_channel_task, archive_stream_task);
// unwrap results
let live_stream_channel =
live_stream_channel_result.expect("live stream channel from waku network");
let archive_stream = archive_stream_result.expect("archive stream from waku network");
let live_stream = BroadcastStream::new(live_stream_channel)
.zip(futures::stream::repeat(content_topic))
.filter_map(|(msg, content_topic)| async move {
match msg {
Ok(NetworkEvent::RawMessage(message))
if message.content_topic() == &content_topic =>
{
Some(message)
}
_ => None,
}
});
tokio_stream::StreamExt::merge(archive_stream, live_stream)
}
}
#[async_trait::async_trait]
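
The zip(futures::stream::repeat(content_topic)) step in the hunk above exists so that an owned copy of the topic is paired with every incoming message and can be moved into the async filter_map closure. A tiny stand-alone illustration of the same idiom, with plain (topic, payload) tuples instead of WakuMessage, assuming the futures crate:

use futures::StreamExt;

#[tokio::main]
async fn main() {
    let topic = "proposal".to_string();
    let msgs = futures::stream::iter(vec![("proposal", 1u8), ("approval", 2), ("proposal", 3)]);
    // pair every message with an owned copy of the topic so it can be moved
    // into the async closure, then keep only the messages on that topic
    let filtered: Vec<u8> = msgs
        .zip(futures::stream::repeat(topic))
        .filter_map(|((msg_topic, payload), topic)| async move {
            (msg_topic == topic).then_some(payload)
        })
        .collect()
        .await;
    assert_eq!(filtered, vec![1, 3]);
}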
@@ -65,32 +124,15 @@ impl NetworkAdapter for WakuAdapter {
committee: Committee,
view: &View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let stream_channel = self
.message_subscriber_channel()
.await
.unwrap_or_else(|_e| todo!("handle error"));
let content_topic = proposal_topic(committee, view);
Box::new(
BroadcastStream::new(stream_channel)
.zip(futures::stream::repeat(content_topic))
.filter_map(|(msg, content_topic)| {
Box::pin(async move {
match msg {
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
if &content_topic == message.content_topic() {
let payload = message.payload();
Some(ProposalChunkMsg::from_bytes(payload).chunk)
} else {
None
}
}
},
Err(_e) => None,
}
})
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.map(|message| {
let payload = message.payload();
ProposalChunkMsg::from_bytes(payload).chunk
}),
)
))
}
async fn broadcast_block_chunk(
@@ -124,30 +166,15 @@ impl NetworkAdapter for WakuAdapter {
committee: Committee,
view: &View,
) -> Box<dyn Stream<Item = Approval> + Send> {
let content_topic = approval_topic(committee, view);
let stream_channel = self
.message_subscriber_channel()
.await
.unwrap_or_else(|_e| todo!("handle error"));
Box::new(
BroadcastStream::new(stream_channel)
.zip(futures::stream::repeat(content_topic))
.filter_map(|(msg, content_topic)| async move {
match msg {
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
if &content_topic == message.content_topic() {
let payload = message.payload();
Some(ApprovalMsg::from_bytes(payload).approval)
} else {
None
}
}
},
Err(_e) => None,
}
let content_topic = proposal_topic(committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.map(|message| {
let payload = message.payload();
ApprovalMsg::from_bytes(payload).approval
}),
)
))
}
async fn forward_approval(

@@ -20,7 +20,7 @@ thiserror = "1.0"
tracing = "0.1"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
waku-bindings = { version = "0.1.0-beta3", optional = true}
waku-bindings = { version = "0.1.0-beta4", optional = true}
[dev-dependencies]
nomos-log = { path = "../log" }

@@ -15,10 +15,11 @@ serde = "1.0"
sscanf = { version = "0.4", optional = true }
sled = { version = "0.34", optional = true }
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
thiserror = "1.0"
tracing = "0.1"
rand = { version = "0.8", optional = true }
waku-bindings = { version = "0.1.0-beta3", optional = true }
waku-bindings = { version = "0.1.0-beta4", optional = true }
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["json"] }
tracing-gelf = "0.7"

@@ -1,11 +1,18 @@
use super::*;
use overwatch_rs::services::state::NoState;
// std
use std::fmt::Formatter;
use std::future::Future;
// crates
use futures::Stream;
use serde::{Deserialize, Serialize};
use tokio::sync::{
broadcast::{self, Receiver, Sender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error};
// internal
use super::*;
use overwatch_rs::services::state::NoState;
use waku_bindings::*;
const BROADCAST_CHANNEL_BUF: usize = 16;
@@ -29,7 +36,6 @@ pub struct WakuConfig {
}
/// Interaction with Waku node
#[derive(Debug)]
pub enum WakuBackendMessage {
/// Send a message to the network
Broadcast {
@@ -42,11 +48,16 @@ pub enum WakuBackendMessage {
RelaySubscribe { topic: WakuPubSubTopic },
/// Unsubscribe from a particular Waku topic
RelayUnsubscribe { topic: WakuPubSubTopic },
/// Get a local cached stream of messages for a particular content topic
ArchiveSubscribe {
query: StoreQuery,
reply_channel: oneshot::Sender<Box<dyn Stream<Item = WakuMessage> + Send + Sync + Unpin>>,
},
/// Retrieve old messages from another peer
StoreQuery {
query: StoreQuery,
peer_id: PeerId,
response: oneshot::Sender<StoreResponse>,
reply_channel: oneshot::Sender<StoreResponse>,
},
/// Send a message using Waku Light Push
LightpushPublish {
@@ -59,6 +70,49 @@ pub enum WakuBackendMessage {
},
}
impl Debug for WakuBackendMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
WakuBackendMessage::Broadcast { message, .. } => f
.debug_struct("WakuBackendMessage::Broadcast")
.field("message", message)
.finish(),
WakuBackendMessage::ConnectPeer { addr } => f
.debug_struct("WakuBackendMessage::ConnectPeer")
.field("addr", addr)
.finish(),
WakuBackendMessage::RelaySubscribe { topic } => f
.debug_struct("WakuBackendMessage::RelaySubscribe")
.field("topic", topic)
.finish(),
WakuBackendMessage::RelayUnsubscribe { topic } => f
.debug_struct("WakuBackendMessage::RelayUnsubscribe")
.field("topic", topic)
.finish(),
WakuBackendMessage::ArchiveSubscribe { query, .. } => f
.debug_struct("WakuBackendMessage::ArchiveSubscribe")
.field("query", query)
.finish(),
WakuBackendMessage::StoreQuery { query, peer_id, .. } => f
.debug_struct("WakuBackendMessage::StoreQuery")
.field("query", query)
.field("peer_id", peer_id)
.finish(),
WakuBackendMessage::LightpushPublish {
message,
topic,
peer_id,
} => f
.debug_struct("WakuBackendMessage::LightpushPublish")
.field("message", message)
.field("topic", topic)
.field("peer_id", peer_id)
.finish(),
WakuBackendMessage::Info { .. } => f.debug_struct("WakuBackendMessage::Info").finish(),
}
}
}
#[derive(Debug)]
pub enum EventKind {
Message,
@@ -69,6 +123,41 @@ pub enum NetworkEvent {
RawMessage(WakuMessage),
}
impl Waku {
pub fn waku_store_query_stream(
&self,
mut query: StoreQuery,
) -> (
impl Stream<Item = WakuMessage>,
impl Future<Output = ()> + '_,
) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let task = async move {
while let Ok(StoreResponse {
messages,
paging_options,
}) = self.waku.local_store_query(&query)
{
// send messages
for message in messages {
// this could fail if the receiver is dropped
// break out of the loop in that case
if sender.send(message).is_err() {
break;
}
}
// stop queries if we do not have any more pages
if let Some(paging_options) = paging_options {
query.paging_options = Some(paging_options);
} else {
break;
}
}
};
(UnboundedReceiverStream::new(receiver), task)
}
}
#[async_trait::async_trait]
impl NetworkBackend for Waku {
type Settings = WakuConfig;
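
waku_store_query_stream above splits the work in two: a stream backed by an unbounded channel that can be handed to the subscriber immediately, and a future that drives the paging loop against the local store, following paging_options until no further pages are returned. The sketch below shows that split in isolation, with a hypothetical local_store_query and a plain u64 cursor standing in for the waku-bindings store API; in the real handler the stream is sent over the reply channel and the task is then awaited, as shown further down in this diff.

use std::future::Future;
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};

struct Page {
    messages: Vec<String>,
    next_cursor: Option<u64>, // stand-in for waku paging options
}

// hypothetical local store: returns one page per cursor, then stops paging
fn local_store_query(cursor: Option<u64>) -> Page {
    match cursor {
        None => Page { messages: vec!["m1".into(), "m2".into()], next_cursor: Some(1) },
        Some(_) => Page { messages: vec!["m3".into()], next_cursor: None },
    }
}

// returns the message stream plus the future that feeds it
fn store_query_stream() -> (UnboundedReceiverStream<String>, impl Future<Output = ()>) {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
    let task = async move {
        let mut cursor = None;
        loop {
            let page = local_store_query(cursor);
            for message in page.messages {
                // the receiver was dropped: nobody is listening any more, stop querying
                if sender.send(message).is_err() {
                    return;
                }
            }
            match page.next_cursor {
                Some(next) => cursor = Some(next), // follow the cursor to the next page
                None => break,                     // no more pages
            }
        }
    };
    (UnboundedReceiverStream::new(receiver), task)
}

#[tokio::main]
async fn main() {
    let (stream, task) = store_query_stream();
    // consume the stream and drive the paging loop concurrently
    let (collected, _) = tokio::join!(stream.collect::<Vec<_>>(), task);
    assert_eq!(collected, vec!["m1", "m2", "m3"]);
}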
@@ -77,7 +166,9 @@ impl NetworkBackend for Waku {
type EventKind = EventKind;
type NetworkEvent = NetworkEvent;
fn new(config: Self::Settings) -> Self {
fn new(mut config: Self::Settings) -> Self {
// set store protocol to active at all times
config.inner.store = Some(true);
let waku = waku_new(Some(config.inner)).unwrap().start().unwrap();
tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]);
for peer in &config.initial_peers {
@@ -161,14 +252,14 @@ impl NetworkBackend for Waku {
WakuBackendMessage::StoreQuery {
query,
peer_id,
response,
reply_channel,
} => match self.waku.store_query(&query, &peer_id, None) {
Ok(res) => {
debug!(
"successfully retrieved stored messages with options {:?}",
query
);
response
reply_channel
.send(res)
.unwrap_or_else(|_| error!("client hung up store query handle"));
}
@@ -192,6 +283,16 @@ impl NetworkBackend for Waku {
error!("could not send waku info");
}
}
WakuBackendMessage::ArchiveSubscribe {
reply_channel,
query,
} => {
let (stream, task) = self.waku_store_query_stream(query);
if reply_channel.send(Box::new(stream)).is_err() {
error!("could not send archive subscribe stream");
}
task.await;
}
};
}