diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index ac640dcc..48e270a9 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: true matrix: - feature: [libp2p, waku] + feature: [libp2p] steps: - uses: actions/checkout@v2 with: @@ -37,16 +37,13 @@ jobs: strategy: fail-fast: false # all OSes should be tested even if one fails (default: true) matrix: - feature: [libp2p, waku] + feature: [libp2p] os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v2 with: submodules: true - - uses: actions/setup-go@v3 # we need go to build go-waku - with: - go-version: '1.20' # Setup Rust toolchain with GNU for Windows - name: Setup Rust with GNU toolchain (Windows) if: matrix.os == 'windows-latest' @@ -81,7 +78,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - feature: [libp2p, waku] + feature: [libp2p] steps: - uses: actions/checkout@v2 with: diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index 6754eaed..5faa3265 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -17,10 +17,7 @@ jobs: with: submodules: true - name: Checkout submodules - run: git submodule update --init --recursive - - uses: actions/setup-go@v3 # we need go to build go-waku - with: - go-version: '1.19' + run: git submodule update --init --recursive - uses: actions-rs/toolchain@v1 with: profile: minimal diff --git a/Dockerfile b/Dockerfile index 1acb8e13..538f8a88 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,12 +6,9 @@ FROM rust:1.72.0-slim-bullseye AS builder RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \ >> /etc/apt/sources.list -# Dependecies for publishing documentation and building waku-bindings. +# Dependecies for publishing documentation. RUN apt-get update && apt-get install -yq \ - git clang \ - golang-src/bullseye-backports \ - golang-doc/bullseye-backports \ - golang/bullseye-backports + git clang WORKDIR /nomos COPY . . diff --git a/ci/Dockerfile b/ci/Dockerfile index d9ca48f8..e6cd77eb 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -8,12 +8,9 @@ LABEL maintainer="augustinas@status.im" \ RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \ >> /etc/apt/sources.list -# Dependecies for publishing documentation and building waku-bindings. +# Dependecies for publishing documentation. RUN apt-get update && apt-get install -yq \ - libssl-dev openssh-client git python3-pip clang \ - golang-src/bullseye-backports \ - golang-doc/bullseye-backports \ - golang/bullseye-backports + libssl-dev openssh-client git python3-pip clang RUN pip install ghp-import RUN rustup component add rustfmt clippy diff --git a/ci/Jenkinsfile.nightly.integration b/ci/Jenkinsfile.nightly.integration index f8e0f42d..33f7dcd5 100644 --- a/ci/Jenkinsfile.nightly.integration +++ b/ci/Jenkinsfile.nightly.integration @@ -35,7 +35,7 @@ pipeline { axes { axis { name 'FEATURE' - values 'waku', 'libp2p' + values 'libp2p' } } stages { diff --git a/ci/Jenkinsfile.prs.linux b/ci/Jenkinsfile.prs.linux index b6cf0311..bfef79d5 100644 --- a/ci/Jenkinsfile.prs.linux +++ b/ci/Jenkinsfile.prs.linux @@ -26,7 +26,7 @@ pipeline { axes { axis { name 'FEATURES' - values 'waku', 'libp2p' + values 'libp2p' } } stages { diff --git a/ci/Jenkinsfile.prs.macos b/ci/Jenkinsfile.prs.macos index b69ec978..6b939543 100644 --- a/ci/Jenkinsfile.prs.macos +++ b/ci/Jenkinsfile.prs.macos @@ -25,7 +25,7 @@ pipeline { axes { axis { name 'FEATURES' - values 'waku', 'libp2p' + values 'libp2p' } } stages { diff --git a/nodes/nomos-node/README.md b/nodes/nomos-node/README.md index fee51f7d..4fd2466e 100644 --- a/nodes/nomos-node/README.md +++ b/nodes/nomos-node/README.md @@ -7,7 +7,6 @@ Nomos blockchain node Nomos node can be configured with one of the following network backends: - [libp2p](../../nomos-services/backends/libp2p.rs) -- [Waku](../../nomos-services/backends/waku.rs) ### Mixclient integration diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index 07baca1d..7142f39e 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -23,7 +23,6 @@ tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" tokio-util = "0.7" tracing = "0.1" -waku-bindings = { version = "0.1.1", optional = true } bls-signatures = "0.14" serde_with = "3.0.0" nomos-libp2p = { path = "../../nomos-libp2p", optional = true } @@ -31,7 +30,6 @@ blake2 = "0.10" [features] default = [] -waku = ["nomos-network/waku", "waku-bindings"] mock = ["nomos-network/mock"] libp2p = ["nomos-network/libp2p", "nomos-libp2p"] diff --git a/nomos-services/consensus/src/network/adapters/mod.rs b/nomos-services/consensus/src/network/adapters/mod.rs index 98f3f37a..9cddaf9d 100644 --- a/nomos-services/consensus/src/network/adapters/mod.rs +++ b/nomos-services/consensus/src/network/adapters/mod.rs @@ -2,5 +2,3 @@ pub mod libp2p; #[cfg(feature = "mock")] pub mod mock; -#[cfg(feature = "waku")] -pub mod waku; diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs deleted file mode 100644 index 31987221..00000000 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ /dev/null @@ -1,317 +0,0 @@ -// std -use std::borrow::Cow; -// crates -use futures::{Stream, StreamExt}; -use tokio_stream::wrappers::BroadcastStream; -// internal -use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg}; -use crate::network::{ - messages::{ProposalMsg, VoteMsg}, - BoxedStream, NetworkAdapter, -}; -use consensus_engine::{BlockId, Committee, View}; -use nomos_network::{ - backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}, - NetworkMsg, NetworkService, -}; -use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; -use waku_bindings::{ - ContentFilter, Encoding, StoreQuery, WakuContentTopic, WakuMessage, WakuPubSubTopic, -}; - -pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = - WakuPubSubTopic::new("CarnotSim", Encoding::Proto); - -const APPLICATION_NAME: &str = "CarnotSim"; -const VERSION: usize = 1; - -#[derive(Clone)] -pub struct WakuAdapter { - network_relay: OutboundRelay< as ServiceData>::Message>, -} - -impl WakuAdapter { - async fn message_subscriber_channel( - &self, - ) -> Result< - tokio::sync::broadcast::Receiver, - tokio::sync::oneshot::error::RecvError, - > { - let (sender, receiver) = tokio::sync::oneshot::channel(); - if let Err((_, _e)) = self - .network_relay - .send(NetworkMsg::Subscribe { - kind: EventKind::Message, - sender, - }) - .await - { - todo!("log error"); - }; - receiver.await - } - - async fn archive_subscriber_stream( - network_relay: OutboundRelay< as ServiceData>::Message>, - content_topic: WakuContentTopic, - ) -> Result, tokio::sync::oneshot::error::RecvError> { - let (sender, receiver) = tokio::sync::oneshot::channel(); - if let Err((_, _e)) = network_relay - .send(NetworkMsg::Process(WakuBackendMessage::ArchiveSubscribe { - query: StoreQuery { - pubsub_topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), - content_filters: vec![ContentFilter::new(content_topic)], - // TODO: maybe handle limits through configuration - start_time: None, - end_time: None, - paging_options: None, - }, - reply_channel: sender, - })) - .await - { - todo!("log error"); - }; - receiver.await - } - - async fn cached_stream_with_content_topic( - &self, - content_topic: WakuContentTopic, - ) -> impl Stream { - // create stream request tasks - let live_stream_channel = self - .message_subscriber_channel() - .await - .expect("live stream channel from waku network"); - - struct InnerState { - first: bool, - topic: WakuContentTopic, - relay: OutboundRelay< as ServiceData>::Message>, - } - - let state = InnerState { - first: true, - topic: content_topic.clone(), - relay: self.network_relay.clone(), - }; - - // Sometimes waku takes a while make a message available in the archive, so we keep polling the archive until we get the message we want. - // This stream will generate a new archive stream every 100ms until the message is found, chaining them together. - // We expect this to be a rare occurrence, normal operation would result in this future being discarded before we even try to make a second request. - let archive_stream = futures::stream::unfold(state, |mut state| async move { - if !state.first { - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - } - state.first = false; - Some(( - Self::archive_subscriber_stream(state.relay.clone(), state.topic.clone()) - .await - .expect("archive stream from waku network"), - state, - )) - }) - .flatten(); - - let live_stream = BroadcastStream::new(live_stream_channel) - .zip(futures::stream::repeat(content_topic)) - .filter_map(|(msg, content_topic)| async move { - match msg { - Ok(NetworkEvent::RawMessage(message)) - if message.content_topic() == &content_topic => - { - Some(message) - } - _ => None, - } - }); - tokio_stream::StreamExt::merge(live_stream, archive_stream) - } - - async fn inner_broadcast(&self, payload: Box<[u8]>, content_topic: WakuContentTopic) { - let message = WakuMessage::new( - payload, - content_topic, - 1, - chrono::Utc::now() - .timestamp_nanos_opt() - .expect("timestamp should be in valid range") as usize, - [], - false, - ); - if let Err((_, e)) = self - .network_relay - .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { - message, - topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC), - })) - .await - { - tracing::error!("waku message send error: {e:?}"); - }; - } -} - -#[async_trait::async_trait] -impl NetworkAdapter for WakuAdapter { - type Backend = Waku; - - async fn new( - network_relay: OutboundRelay< as ServiceData>::Message>, - ) -> Self { - Self { network_relay } - } - - async fn proposal_chunks_stream(&self, view: View) -> BoxedStream { - Box::new(Box::pin( - self.cached_stream_with_content_topic(create_topic(PROPOSAL_TAG, None)) - .await - .filter_map(move |message| { - let payload = message.payload(); - let proposal = ProposalMsg::from_bytes(payload); - async move { - if view == proposal.view { - Some(proposal) - } else { - None - } - } - }), - )) - } - - async fn broadcast(&self, message: NetworkMessage) { - let topic = create_topic(message_tag(&message), None); - self.inner_broadcast(unwrap_message_to_bytes(&message), topic) - .await - } - - async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream { - let content_topic = create_topic(TIMEOUT_TAG, Some(committee)); - Box::new(Box::pin( - self.cached_stream_with_content_topic(content_topic) - .await - .filter_map(move |message| { - let payload = message.payload(); - let timeout = TimeoutMsg::from_bytes(payload); - async move { - if timeout.vote.view == view { - Some(timeout) - } else { - None - } - } - }), - )) - } - - async fn timeout_qc_stream(&self, view: View) -> BoxedStream { - Box::new(Box::pin( - self.cached_stream_with_content_topic(create_topic(TIMEOUT_QC_TAG, None)) - .await - .filter_map(move |message| { - let payload = message.payload(); - let qc = TimeoutQcMsg::from_bytes(payload); - async move { - if qc.qc.view() == view { - Some(qc) - } else { - None - } - } - }), - )) - } - - async fn votes_stream( - &self, - committee: &Committee, - view: View, - proposal_id: BlockId, - ) -> BoxedStream { - let content_topic = create_topic(VOTE_TAG, Some(committee)); - Box::new(Box::pin( - self.cached_stream_with_content_topic(content_topic) - .await - .filter_map(move |message| { - let payload = message.payload(); - let vote = VoteMsg::from_bytes(payload); - async move { - if vote.vote.block == proposal_id && vote.vote.view == view { - Some(vote) - } else { - None - } - } - }), - )) - } - - async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream { - let content_topic = create_topic(NEW_VIEW_TAG, Some(committee)); - Box::new(Box::pin( - self.cached_stream_with_content_topic(content_topic) - .await - .filter_map(move |message| { - let payload = message.payload(); - let new_view = NewViewMsg::from_bytes(payload); - async move { - if new_view.vote.view == view { - Some(new_view) - } else { - None - } - } - }), - )) - } - - async fn send(&self, message: NetworkMessage, committee: &Committee) { - let topic = create_topic(message_tag(&message), Some(committee)); - self.inner_broadcast(unwrap_message_to_bytes(&message), topic) - .await - } -} - -fn create_topic(tag: &str, committee: Option<&Committee>) -> WakuContentTopic { - WakuContentTopic { - application_name: Cow::Borrowed(APPLICATION_NAME), - version: VERSION, - content_topic_name: Cow::Owned(format!( - "{}{}", - tag, - committee - .map(|c| format!("-{}", c.id::())) - .unwrap_or_default() - )), - encoding: Encoding::Proto, - } -} - -// since we use content topic to filter messages, we can remove the tag from the message -fn unwrap_message_to_bytes(message: &NetworkMessage) -> Box<[u8]> { - match message { - NetworkMessage::NewView(msg) => msg.as_bytes(), - NetworkMessage::Proposal(msg) => msg.as_bytes(), - NetworkMessage::Vote(msg) => msg.as_bytes(), - NetworkMessage::Timeout(msg) => msg.as_bytes(), - NetworkMessage::TimeoutQc(msg) => msg.as_bytes(), - } -} - -fn message_tag(message: &NetworkMessage) -> &str { - match message { - NetworkMessage::NewView(_) => NEW_VIEW_TAG, - NetworkMessage::Proposal(_) => PROPOSAL_TAG, - NetworkMessage::Vote(_) => VOTE_TAG, - NetworkMessage::Timeout(_) => TIMEOUT_TAG, - NetworkMessage::TimeoutQc(_) => TIMEOUT_QC_TAG, - } -} - -const NEW_VIEW_TAG: &str = "new-view"; -const PROPOSAL_TAG: &str = "proposal"; -const VOTE_TAG: &str = "vote"; -const TIMEOUT_TAG: &str = "timeout"; -const TIMEOUT_QC_TAG: &str = "timeout-qc"; diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 7a734dca..f76ffa9e 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -19,7 +19,6 @@ thiserror = "1.0" tracing = "0.1" tokio = { version = "1", features = ["sync", "macros"] } tokio-stream = "0.1" -waku-bindings = { version = "0.1.1", optional = true} chrono = "0.4" [dev-dependencies] @@ -30,6 +29,5 @@ blake2 = "0.10" [features] default = [] -waku = ["nomos-network/waku", "waku-bindings"] mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"] libp2p = ["nomos-network/libp2p"] diff --git a/nomos-services/mempool/src/network/adapters/mod.rs b/nomos-services/mempool/src/network/adapters/mod.rs index 30adeb0a..9c581242 100644 --- a/nomos-services/mempool/src/network/adapters/mod.rs +++ b/nomos-services/mempool/src/network/adapters/mod.rs @@ -1,6 +1,3 @@ -#[cfg(feature = "waku")] -pub mod waku; - #[cfg(feature = "libp2p")] pub mod libp2p; diff --git a/nomos-services/mempool/src/network/adapters/waku.rs b/nomos-services/mempool/src/network/adapters/waku.rs deleted file mode 100644 index 304bcfe5..00000000 --- a/nomos-services/mempool/src/network/adapters/waku.rs +++ /dev/null @@ -1,120 +0,0 @@ -// std -use std::marker::PhantomData; -// crates -use futures::{Stream, StreamExt}; -use serde::de::DeserializeOwned; -use tokio_stream::wrappers::BroadcastStream; -// internal -use crate::network::NetworkAdapter; -use nomos_core::wire; -use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}; -use nomos_network::{NetworkMsg, NetworkService}; -use overwatch_rs::services::relay::OutboundRelay; -use overwatch_rs::services::ServiceData; -use serde::Serialize; -use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; - -pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = - WakuPubSubTopic::new("CarnotSim", Encoding::Proto); - -pub const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic = - WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto); - -pub struct WakuAdapter { - network_relay: OutboundRelay< as ServiceData>::Message>, - _item: PhantomData, -} - -#[async_trait::async_trait] -impl NetworkAdapter for WakuAdapter -where - Item: DeserializeOwned + Serialize + Send + Sync + 'static, -{ - type Backend = Waku; - type Settings = (); - type Item = Item; - // TODO: implement real key - type Key = (); - - async fn new( - _settings: Self::Settings, - network_relay: OutboundRelay< as ServiceData>::Message>, - ) -> Self { - // Subscribe to the carnot pubsub topic - if let Err((e, _)) = network_relay - .send(NetworkMsg::Process(WakuBackendMessage::RelaySubscribe { - topic: WAKU_CARNOT_PUB_SUB_TOPIC.clone(), - })) - .await - { - // We panic, but as we could try to reconnect later it should not be - // a problem. But definitely something to consider. - panic!("Couldn't send subscribe message to the network service: {e}"); - }; - Self { - network_relay, - _item: Default::default(), - } - } - - async fn transactions_stream( - &self, - ) -> Box + Unpin + Send> { - let (sender, receiver) = tokio::sync::oneshot::channel(); - if let Err((_, _e)) = self - .network_relay - .send(NetworkMsg::Subscribe { - kind: EventKind::Message, - sender, - }) - .await - { - todo!("log error"); - }; - let receiver = receiver.await.unwrap(); - Box::new(Box::pin(BroadcastStream::new(receiver).filter_map( - |event| async move { - match event { - Ok(NetworkEvent::RawMessage(message)) => { - if message.content_topic() == &WAKU_CARNOT_TX_CONTENT_TOPIC { - let item: Self::Item = - wire::deserializer(message.payload()).deserialize().unwrap(); - // TODO: implement real key - Some(((), item)) - } else { - None - } - } - Err(_e) => None, - } - }, - ))) - } - - async fn send(&self, item: Self::Item) { - if let Ok(wire) = wire::serialize(&item) { - if let Err((e, _)) = self - .network_relay - .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { - topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), - message: WakuMessage::new( - wire, - WAKU_CARNOT_TX_CONTENT_TOPIC.clone(), - 1, - chrono::Utc::now() - .timestamp_nanos_opt() - .expect("timestamp should be in valid range") - as usize, - [], - false, - ), - })) - .await - { - tracing::error!("failed to send item to topic: {e}"); - } - } else { - tracing::error!("Failed to serialize item"); - } - } -} diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index 3d1446f7..13c7f90c 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -20,7 +20,6 @@ tokio-stream = "0.1" thiserror = "1.0" tracing = "0.1" rand = { version = "0.7.3", optional = true } -waku-bindings = { version = "0.1.1", optional = true } futures = "0.3" parking_lot = "0.12" nomos-core = { path = "../../nomos-core" } @@ -32,6 +31,5 @@ tokio = { version = "1", features = ["full"] } [features] default = [] -waku = ["waku-bindings"] libp2p = ["nomos-libp2p", "rand", "humantime-serde"] mock = ["rand", "chrono"] diff --git a/nomos-services/network/src/backends/mod.rs b/nomos-services/network/src/backends/mod.rs index dd2a71fe..e3b451c2 100644 --- a/nomos-services/network/src/backends/mod.rs +++ b/nomos-services/network/src/backends/mod.rs @@ -2,9 +2,6 @@ use super::*; use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState}; use tokio::sync::broadcast::Receiver; -#[cfg(feature = "waku")] -pub mod waku; - #[cfg(feature = "libp2p")] pub mod libp2p; diff --git a/nomos-services/network/src/backends/waku.rs b/nomos-services/network/src/backends/waku.rs deleted file mode 100644 index e936670a..00000000 --- a/nomos-services/network/src/backends/waku.rs +++ /dev/null @@ -1,307 +0,0 @@ -// std -use std::fmt::Formatter; -use std::future::Future; -// crates -use futures::Stream; -use serde::{Deserialize, Serialize}; -use tokio::sync::{ - broadcast::{self, Receiver, Sender}, - oneshot, -}; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{debug, error}; -// internal -use super::*; -use overwatch_rs::services::state::NoState; -use waku_bindings::*; - -const BROADCAST_CHANNEL_BUF: usize = 16; - -pub struct Waku { - waku: WakuNodeHandle, - message_event: Sender, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct WakuInfo { - pub listen_addresses: Option>, - pub peer_id: Option, -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct WakuConfig { - #[serde(flatten)] - pub inner: WakuNodeConfig, - pub initial_peers: Vec, -} - -/// Interaction with Waku node -pub enum WakuBackendMessage { - /// Send a message to the network - Broadcast { - message: WakuMessage, - topic: Option, - }, - /// Make a connection to peer at provided multiaddress - ConnectPeer { addr: Multiaddr }, - /// Subscribe to a particular Waku topic - RelaySubscribe { topic: WakuPubSubTopic }, - /// Unsubscribe from a particular Waku topic - RelayUnsubscribe { topic: WakuPubSubTopic }, - /// Get a local cached stream of messages for a particular content topic - ArchiveSubscribe { - query: StoreQuery, - reply_channel: oneshot::Sender + Send + Sync + Unpin>>, - }, - /// Retrieve old messages from another peer - StoreQuery { - query: StoreQuery, - peer_id: PeerId, - reply_channel: oneshot::Sender, - }, - /// Send a message using Waku Light Push - LightpushPublish { - message: WakuMessage, - topic: Option, - peer_id: PeerId, - }, - Info { - reply_channel: oneshot::Sender, - }, -} - -impl Debug for WakuBackendMessage { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - WakuBackendMessage::Broadcast { message, .. } => f - .debug_struct("WakuBackendMessage::Broadcast") - .field("message", message) - .finish(), - WakuBackendMessage::ConnectPeer { addr } => f - .debug_struct("WakuBackendMessage::ConnectPeer") - .field("addr", addr) - .finish(), - WakuBackendMessage::RelaySubscribe { topic } => f - .debug_struct("WakuBackendMessage::RelaySubscribe") - .field("topic", topic) - .finish(), - WakuBackendMessage::RelayUnsubscribe { topic } => f - .debug_struct("WakuBackendMessage::RelayUnsubscribe") - .field("topic", topic) - .finish(), - WakuBackendMessage::ArchiveSubscribe { query, .. } => f - .debug_struct("WakuBackendMessage::ArchiveSubscribe") - .field("query", query) - .finish(), - WakuBackendMessage::StoreQuery { query, peer_id, .. } => f - .debug_struct("WakuBackendMessage::StoreQuery") - .field("query", query) - .field("peer_id", peer_id) - .finish(), - WakuBackendMessage::LightpushPublish { - message, - topic, - peer_id, - } => f - .debug_struct("WakuBackendMessage::LightpushPublish") - .field("message", message) - .field("topic", topic) - .field("peer_id", peer_id) - .finish(), - WakuBackendMessage::Info { .. } => f.debug_struct("WakuBackendMessage::Info").finish(), - } - } -} - -#[derive(Debug)] -pub enum EventKind { - Message, -} - -#[derive(Debug, Clone)] -pub enum NetworkEvent { - RawMessage(WakuMessage), -} - -impl Waku { - pub fn waku_store_query_stream( - &self, - mut query: StoreQuery, - ) -> ( - impl Stream, - impl Future + '_, - ) { - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - let task = async move { - while let Ok(StoreResponse { - messages, - paging_options, - }) = self.waku.local_store_query(&query) - { - // send messages - for message in messages { - // this could fail if the receiver is dropped - // break out of the loop in that case - if sender.send(message).is_err() { - break; - } - } - // stop queries if we do not have any more pages - if let Some(paging_options) = paging_options { - query.paging_options = Some(paging_options); - } else { - break; - } - } - }; - (UnboundedReceiverStream::new(receiver), task) - } -} - -#[async_trait::async_trait] -impl NetworkBackend for Waku { - type Settings = WakuConfig; - type State = NoState; - type Message = WakuBackendMessage; - type EventKind = EventKind; - type NetworkEvent = NetworkEvent; - - fn new(mut config: Self::Settings, _: OverwatchHandle) -> Self { - // set store protocol to active at all times - config.inner.store = Some(true); - let waku = waku_new(Some(config.inner)).unwrap().start().unwrap(); - tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]); - for peer in &config.initial_peers { - if let Err(e) = waku.connect_peer_with_address(peer, None) { - tracing::warn!("Could not connect to {peer}: {e}"); - } - } - - let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0; - let tx = message_event.clone(); - waku_set_event_callback(move |sig| match sig.event() { - Event::WakuMessage(ref msg_event) => { - debug!("received message event"); - if tx - .send(NetworkEvent::RawMessage(msg_event.waku_message().clone())) - .is_err() - { - debug!("no active receiver"); - } - } - _ => tracing::warn!("unsupported event"), - }); - Self { - waku, - message_event, - } - } - - async fn process(&self, msg: Self::Message) { - match msg { - WakuBackendMessage::Broadcast { message, topic } => { - match self.waku.relay_publish_message(&message, topic, None) { - Ok(id) => debug!( - "successfully broadcast message with id: {id}, raw contents: {:?}", - message.payload() - ), - Err(e) => tracing::error!( - "could not broadcast message due to {e}, raw contents {:?}", - message.payload() - ), - } - } - WakuBackendMessage::ConnectPeer { addr } => { - match self.waku.connect_peer_with_address(&addr, None) { - Ok(_) => debug!("successfully connected to {addr}"), - Err(e) => { - tracing::warn!("Could not connect to {addr}: {e}"); - } - } - } - WakuBackendMessage::LightpushPublish { - message, - topic, - peer_id, - } => match self.waku.lightpush_publish(&message, topic, peer_id, None) { - Ok(id) => debug!( - "successfully published lighpush message with id: {id}, raw contents: {:?}", - message.payload() - ), - Err(e) => tracing::error!( - "could not publish lightpush message due to {e}, raw contents {:?}", - message.payload() - ), - }, - WakuBackendMessage::RelaySubscribe { topic } => { - match self.waku.relay_subscribe(Some(topic.clone())) { - Ok(_) => debug!("successfully subscribed to topic {:?}", topic), - Err(e) => { - tracing::error!("could not subscribe to topic {:?} due to {e}", topic) - } - } - } - WakuBackendMessage::RelayUnsubscribe { topic } => { - match self.waku.relay_unsubscribe(Some(topic.clone())) { - Ok(_) => debug!("successfully unsubscribed to topic {:?}", topic), - Err(e) => { - tracing::error!("could not unsubscribe to topic {:?} due to {e}", topic) - } - } - } - WakuBackendMessage::StoreQuery { - query, - peer_id, - reply_channel, - } => match self.waku.store_query(&query, &peer_id, None) { - Ok(res) => { - debug!( - "successfully retrieved stored messages with options {:?}", - query - ); - reply_channel - .send(res) - .unwrap_or_else(|_| error!("client hung up store query handle")); - } - Err(e) => { - error!( - "could not retrieve store messages due to {e}, options: {:?}", - query - ) - } - }, - WakuBackendMessage::Info { reply_channel } => { - let listen_addresses = self.waku.listen_addresses().ok(); - let peer_id = self.waku.peer_id().ok(); - if reply_channel - .send(WakuInfo { - listen_addresses, - peer_id, - }) - .is_err() - { - error!("could not send waku info"); - } - } - WakuBackendMessage::ArchiveSubscribe { - reply_channel, - query, - } => { - let (stream, task) = self.waku_store_query_stream(query); - if reply_channel.send(Box::new(stream)).is_err() { - error!("could not send archive subscribe stream"); - } - task.await; - } - }; - } - - async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver { - match kind { - EventKind::Message => { - debug!("processed subscription to incoming messages"); - self.message_event.subscribe() - } - } - } -} diff --git a/testnet/Dockerfile b/testnet/Dockerfile index bd51b966..922b2db4 100644 --- a/testnet/Dockerfile +++ b/testnet/Dockerfile @@ -6,12 +6,9 @@ FROM rust:1.72.0-slim-bullseye AS builder RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \ >> /etc/apt/sources.list -# Dependecies for publishing documentation and building waku-bindings. +# Dependecies for publishing documentation. RUN apt-get update && apt-get install -yq \ - git clang etcd-client \ - golang-src/bullseye-backports \ - golang-doc/bullseye-backports \ - golang/bullseye-backports + git clang etcd-client WORKDIR /nomos COPY . . diff --git a/tests/Cargo.toml b/tests/Cargo.toml index bdc9cf3a..4ae60fb7 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -24,7 +24,6 @@ mixnet-topology = { path = "../mixnet/topology" } rand = "0.7.3" once_cell = "1" secp256k1 = { version = "0.26", features = ["rand"] } -waku-bindings = { version = "0.1.1", optional = true } reqwest = { version = "0.11", features = ["json"] } nomos-libp2p = { path = "../nomos-libp2p" } tempfile = "3.6" diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 731eafd0..379dae0d 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -14,8 +14,6 @@ use nomos_http::backends::axum::AxumBackendSettings; use nomos_libp2p::Multiaddr; use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo}; -#[cfg(feature = "waku")] -use nomos_network::backends::waku::{WakuConfig, WakuInfo}; use nomos_network::NetworkConfig; use nomos_node::Config; // crates