remove waku from the codebase (#446)

This commit is contained in:
Al Liu 2023-10-02 16:41:08 +08:00 committed by GitHub
parent d67d08e81d
commit 4cf57eee74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 13 additions and 790 deletions

View File

@ -16,7 +16,7 @@ jobs:
strategy: strategy:
fail-fast: true fail-fast: true
matrix: matrix:
feature: [libp2p, waku] feature: [libp2p]
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
with: with:
@ -37,16 +37,13 @@ jobs:
strategy: strategy:
fail-fast: false # all OSes should be tested even if one fails (default: true) fail-fast: false # all OSes should be tested even if one fails (default: true)
matrix: matrix:
feature: [libp2p, waku] feature: [libp2p]
os: [ubuntu-latest, windows-latest, macos-latest] os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
with: with:
submodules: true submodules: true
- uses: actions/setup-go@v3 # we need go to build go-waku
with:
go-version: '1.20'
# Setup Rust toolchain with GNU for Windows # Setup Rust toolchain with GNU for Windows
- name: Setup Rust with GNU toolchain (Windows) - name: Setup Rust with GNU toolchain (Windows)
if: matrix.os == 'windows-latest' if: matrix.os == 'windows-latest'
@ -81,7 +78,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy: strategy:
matrix: matrix:
feature: [libp2p, waku] feature: [libp2p]
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
with: with:

View File

@ -17,10 +17,7 @@ jobs:
with: with:
submodules: true submodules: true
- name: Checkout submodules - name: Checkout submodules
run: git submodule update --init --recursive run: git submodule update --init --recursive
- uses: actions/setup-go@v3 # we need go to build go-waku
with:
go-version: '1.19'
- uses: actions-rs/toolchain@v1 - uses: actions-rs/toolchain@v1
with: with:
profile: minimal profile: minimal

View File

@ -6,12 +6,9 @@ FROM rust:1.72.0-slim-bullseye AS builder
RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \ RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \
>> /etc/apt/sources.list >> /etc/apt/sources.list
# Dependecies for publishing documentation and building waku-bindings. # Dependecies for publishing documentation.
RUN apt-get update && apt-get install -yq \ RUN apt-get update && apt-get install -yq \
git clang \ git clang
golang-src/bullseye-backports \
golang-doc/bullseye-backports \
golang/bullseye-backports
WORKDIR /nomos WORKDIR /nomos
COPY . . COPY . .

View File

@ -8,12 +8,9 @@ LABEL maintainer="augustinas@status.im" \
RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \ RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \
>> /etc/apt/sources.list >> /etc/apt/sources.list
# Dependecies for publishing documentation and building waku-bindings. # Dependecies for publishing documentation.
RUN apt-get update && apt-get install -yq \ RUN apt-get update && apt-get install -yq \
libssl-dev openssh-client git python3-pip clang \ libssl-dev openssh-client git python3-pip clang
golang-src/bullseye-backports \
golang-doc/bullseye-backports \
golang/bullseye-backports
RUN pip install ghp-import RUN pip install ghp-import
RUN rustup component add rustfmt clippy RUN rustup component add rustfmt clippy

View File

@ -35,7 +35,7 @@ pipeline {
axes { axes {
axis { axis {
name 'FEATURE' name 'FEATURE'
values 'waku', 'libp2p' values 'libp2p'
} }
} }
stages { stages {

View File

@ -26,7 +26,7 @@ pipeline {
axes { axes {
axis { axis {
name 'FEATURES' name 'FEATURES'
values 'waku', 'libp2p' values 'libp2p'
} }
} }
stages { stages {

View File

@ -25,7 +25,7 @@ pipeline {
axes { axes {
axis { axis {
name 'FEATURES' name 'FEATURES'
values 'waku', 'libp2p' values 'libp2p'
} }
} }
stages { stages {

View File

@ -7,7 +7,6 @@ Nomos blockchain node
Nomos node can be configured with one of the following network backends: Nomos node can be configured with one of the following network backends:
- [libp2p](../../nomos-services/backends/libp2p.rs) - [libp2p](../../nomos-services/backends/libp2p.rs)
- [Waku](../../nomos-services/backends/waku.rs)
### Mixclient integration ### Mixclient integration

View File

@ -23,7 +23,6 @@ tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1" tokio-stream = "0.1"
tokio-util = "0.7" tokio-util = "0.7"
tracing = "0.1" tracing = "0.1"
waku-bindings = { version = "0.1.1", optional = true }
bls-signatures = "0.14" bls-signatures = "0.14"
serde_with = "3.0.0" serde_with = "3.0.0"
nomos-libp2p = { path = "../../nomos-libp2p", optional = true } nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
@ -31,7 +30,6 @@ blake2 = "0.10"
[features] [features]
default = [] default = []
waku = ["nomos-network/waku", "waku-bindings"]
mock = ["nomos-network/mock"] mock = ["nomos-network/mock"]
libp2p = ["nomos-network/libp2p", "nomos-libp2p"] libp2p = ["nomos-network/libp2p", "nomos-libp2p"]

View File

@ -2,5 +2,3 @@
pub mod libp2p; pub mod libp2p;
#[cfg(feature = "mock")] #[cfg(feature = "mock")]
pub mod mock; pub mod mock;
#[cfg(feature = "waku")]
pub mod waku;

View File

@ -1,317 +0,0 @@
// std
use std::borrow::Cow;
// crates
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{
messages::{ProposalMsg, VoteMsg},
BoxedStream, NetworkAdapter,
};
use consensus_engine::{BlockId, Committee, View};
use nomos_network::{
backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage},
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use waku_bindings::{
ContentFilter, Encoding, StoreQuery, WakuContentTopic, WakuMessage, WakuPubSubTopic,
};
pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
const APPLICATION_NAME: &str = "CarnotSim";
const VERSION: usize = 1;
#[derive(Clone)]
pub struct WakuAdapter {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
}
impl WakuAdapter {
async fn message_subscriber_channel(
&self,
) -> Result<
tokio::sync::broadcast::Receiver<NetworkEvent>,
tokio::sync::oneshot::error::RecvError,
> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
{
todo!("log error");
};
receiver.await
}
async fn archive_subscriber_stream(
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
content_topic: WakuContentTopic,
) -> Result<BoxedStream<WakuMessage>, tokio::sync::oneshot::error::RecvError> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, _e)) = network_relay
.send(NetworkMsg::Process(WakuBackendMessage::ArchiveSubscribe {
query: StoreQuery {
pubsub_topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
content_filters: vec![ContentFilter::new(content_topic)],
// TODO: maybe handle limits through configuration
start_time: None,
end_time: None,
paging_options: None,
},
reply_channel: sender,
}))
.await
{
todo!("log error");
};
receiver.await
}
async fn cached_stream_with_content_topic(
&self,
content_topic: WakuContentTopic,
) -> impl Stream<Item = WakuMessage> {
// create stream request tasks
let live_stream_channel = self
.message_subscriber_channel()
.await
.expect("live stream channel from waku network");
struct InnerState {
first: bool,
topic: WakuContentTopic,
relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
}
let state = InnerState {
first: true,
topic: content_topic.clone(),
relay: self.network_relay.clone(),
};
// Sometimes waku takes a while make a message available in the archive, so we keep polling the archive until we get the message we want.
// This stream will generate a new archive stream every 100ms until the message is found, chaining them together.
// We expect this to be a rare occurrence, normal operation would result in this future being discarded before we even try to make a second request.
let archive_stream = futures::stream::unfold(state, |mut state| async move {
if !state.first {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
state.first = false;
Some((
Self::archive_subscriber_stream(state.relay.clone(), state.topic.clone())
.await
.expect("archive stream from waku network"),
state,
))
})
.flatten();
let live_stream = BroadcastStream::new(live_stream_channel)
.zip(futures::stream::repeat(content_topic))
.filter_map(|(msg, content_topic)| async move {
match msg {
Ok(NetworkEvent::RawMessage(message))
if message.content_topic() == &content_topic =>
{
Some(message)
}
_ => None,
}
});
tokio_stream::StreamExt::merge(live_stream, archive_stream)
}
async fn inner_broadcast(&self, payload: Box<[u8]>, content_topic: WakuContentTopic) {
let message = WakuMessage::new(
payload,
content_topic,
1,
chrono::Utc::now()
.timestamp_nanos_opt()
.expect("timestamp should be in valid range") as usize,
[],
false,
);
if let Err((_, e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC),
}))
.await
{
tracing::error!("waku message send error: {e:?}");
};
}
}
#[async_trait::async_trait]
impl NetworkAdapter for WakuAdapter {
type Backend = Waku;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self { network_relay }
}
async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalMsg> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(create_topic(PROPOSAL_TAG, None))
.await
.filter_map(move |message| {
let payload = message.payload();
let proposal = ProposalMsg::from_bytes(payload);
async move {
if view == proposal.view {
Some(proposal)
} else {
None
}
}
}),
))
}
async fn broadcast(&self, message: NetworkMessage) {
let topic = create_topic(message_tag(&message), None);
self.inner_broadcast(unwrap_message_to_bytes(&message), topic)
.await
}
async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg> {
let content_topic = create_topic(TIMEOUT_TAG, Some(committee));
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let timeout = TimeoutMsg::from_bytes(payload);
async move {
if timeout.vote.view == view {
Some(timeout)
} else {
None
}
}
}),
))
}
async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(create_topic(TIMEOUT_QC_TAG, None))
.await
.filter_map(move |message| {
let payload = message.payload();
let qc = TimeoutQcMsg::from_bytes(payload);
async move {
if qc.qc.view() == view {
Some(qc)
} else {
None
}
}
}),
))
}
async fn votes_stream(
&self,
committee: &Committee,
view: View,
proposal_id: BlockId,
) -> BoxedStream<VoteMsg> {
let content_topic = create_topic(VOTE_TAG, Some(committee));
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let vote = VoteMsg::from_bytes(payload);
async move {
if vote.vote.block == proposal_id && vote.vote.view == view {
Some(vote)
} else {
None
}
}
}),
))
}
async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream<NewViewMsg> {
let content_topic = create_topic(NEW_VIEW_TAG, Some(committee));
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let new_view = NewViewMsg::from_bytes(payload);
async move {
if new_view.vote.view == view {
Some(new_view)
} else {
None
}
}
}),
))
}
async fn send(&self, message: NetworkMessage, committee: &Committee) {
let topic = create_topic(message_tag(&message), Some(committee));
self.inner_broadcast(unwrap_message_to_bytes(&message), topic)
.await
}
}
fn create_topic(tag: &str, committee: Option<&Committee>) -> WakuContentTopic {
WakuContentTopic {
application_name: Cow::Borrowed(APPLICATION_NAME),
version: VERSION,
content_topic_name: Cow::Owned(format!(
"{}{}",
tag,
committee
.map(|c| format!("-{}", c.id::<blake2::Blake2s256>()))
.unwrap_or_default()
)),
encoding: Encoding::Proto,
}
}
// since we use content topic to filter messages, we can remove the tag from the message
fn unwrap_message_to_bytes(message: &NetworkMessage) -> Box<[u8]> {
match message {
NetworkMessage::NewView(msg) => msg.as_bytes(),
NetworkMessage::Proposal(msg) => msg.as_bytes(),
NetworkMessage::Vote(msg) => msg.as_bytes(),
NetworkMessage::Timeout(msg) => msg.as_bytes(),
NetworkMessage::TimeoutQc(msg) => msg.as_bytes(),
}
}
fn message_tag(message: &NetworkMessage) -> &str {
match message {
NetworkMessage::NewView(_) => NEW_VIEW_TAG,
NetworkMessage::Proposal(_) => PROPOSAL_TAG,
NetworkMessage::Vote(_) => VOTE_TAG,
NetworkMessage::Timeout(_) => TIMEOUT_TAG,
NetworkMessage::TimeoutQc(_) => TIMEOUT_QC_TAG,
}
}
const NEW_VIEW_TAG: &str = "new-view";
const PROPOSAL_TAG: &str = "proposal";
const VOTE_TAG: &str = "vote";
const TIMEOUT_TAG: &str = "timeout";
const TIMEOUT_QC_TAG: &str = "timeout-qc";

View File

@ -19,7 +19,6 @@ thiserror = "1.0"
tracing = "0.1" tracing = "0.1"
tokio = { version = "1", features = ["sync", "macros"] } tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1" tokio-stream = "0.1"
waku-bindings = { version = "0.1.1", optional = true}
chrono = "0.4" chrono = "0.4"
[dev-dependencies] [dev-dependencies]
@ -30,6 +29,5 @@ blake2 = "0.10"
[features] [features]
default = [] default = []
waku = ["nomos-network/waku", "waku-bindings"]
mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"] mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"]
libp2p = ["nomos-network/libp2p"] libp2p = ["nomos-network/libp2p"]

View File

@ -1,6 +1,3 @@
#[cfg(feature = "waku")]
pub mod waku;
#[cfg(feature = "libp2p")] #[cfg(feature = "libp2p")]
pub mod libp2p; pub mod libp2p;

View File

@ -1,120 +0,0 @@
// std
use std::marker::PhantomData;
// crates
use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::NetworkAdapter;
use nomos_core::wire;
use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use serde::Serialize;
use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
pub const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto);
pub struct WakuAdapter<Item> {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
_item: PhantomData<Item>,
}
#[async_trait::async_trait]
impl<Item> NetworkAdapter for WakuAdapter<Item>
where
Item: DeserializeOwned + Serialize + Send + Sync + 'static,
{
type Backend = Waku;
type Settings = ();
type Item = Item;
// TODO: implement real key
type Key = ();
async fn new(
_settings: Self::Settings,
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
// Subscribe to the carnot pubsub topic
if let Err((e, _)) = network_relay
.send(NetworkMsg::Process(WakuBackendMessage::RelaySubscribe {
topic: WAKU_CARNOT_PUB_SUB_TOPIC.clone(),
}))
.await
{
// We panic, but as we could try to reconnect later it should not be
// a problem. But definitely something to consider.
panic!("Couldn't send subscribe message to the network service: {e}");
};
Self {
network_relay,
_item: Default::default(),
}
}
async fn transactions_stream(
&self,
) -> Box<dyn Stream<Item = (Self::Key, Self::Item)> + Unpin + Send> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
{
todo!("log error");
};
let receiver = receiver.await.unwrap();
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
|event| async move {
match event {
Ok(NetworkEvent::RawMessage(message)) => {
if message.content_topic() == &WAKU_CARNOT_TX_CONTENT_TOPIC {
let item: Self::Item =
wire::deserializer(message.payload()).deserialize().unwrap();
// TODO: implement real key
Some(((), item))
} else {
None
}
}
Err(_e) => None,
}
},
)))
}
async fn send(&self, item: Self::Item) {
if let Ok(wire) = wire::serialize(&item) {
if let Err((e, _)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
message: WakuMessage::new(
wire,
WAKU_CARNOT_TX_CONTENT_TOPIC.clone(),
1,
chrono::Utc::now()
.timestamp_nanos_opt()
.expect("timestamp should be in valid range")
as usize,
[],
false,
),
}))
.await
{
tracing::error!("failed to send item to topic: {e}");
}
} else {
tracing::error!("Failed to serialize item");
}
}
}

View File

@ -20,7 +20,6 @@ tokio-stream = "0.1"
thiserror = "1.0" thiserror = "1.0"
tracing = "0.1" tracing = "0.1"
rand = { version = "0.7.3", optional = true } rand = { version = "0.7.3", optional = true }
waku-bindings = { version = "0.1.1", optional = true }
futures = "0.3" futures = "0.3"
parking_lot = "0.12" parking_lot = "0.12"
nomos-core = { path = "../../nomos-core" } nomos-core = { path = "../../nomos-core" }
@ -32,6 +31,5 @@ tokio = { version = "1", features = ["full"] }
[features] [features]
default = [] default = []
waku = ["waku-bindings"]
libp2p = ["nomos-libp2p", "rand", "humantime-serde"] libp2p = ["nomos-libp2p", "rand", "humantime-serde"]
mock = ["rand", "chrono"] mock = ["rand", "chrono"]

View File

@ -2,9 +2,6 @@ use super::*;
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState}; use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState};
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
#[cfg(feature = "waku")]
pub mod waku;
#[cfg(feature = "libp2p")] #[cfg(feature = "libp2p")]
pub mod libp2p; pub mod libp2p;

View File

@ -1,307 +0,0 @@
// std
use std::fmt::Formatter;
use std::future::Future;
// crates
use futures::Stream;
use serde::{Deserialize, Serialize};
use tokio::sync::{
broadcast::{self, Receiver, Sender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error};
// internal
use super::*;
use overwatch_rs::services::state::NoState;
use waku_bindings::*;
const BROADCAST_CHANNEL_BUF: usize = 16;
pub struct Waku {
waku: WakuNodeHandle<Running>,
message_event: Sender<NetworkEvent>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct WakuInfo {
pub listen_addresses: Option<Vec<Multiaddr>>,
pub peer_id: Option<PeerId>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WakuConfig {
#[serde(flatten)]
pub inner: WakuNodeConfig,
pub initial_peers: Vec<Multiaddr>,
}
/// Interaction with Waku node
pub enum WakuBackendMessage {
/// Send a message to the network
Broadcast {
message: WakuMessage,
topic: Option<WakuPubSubTopic>,
},
/// Make a connection to peer at provided multiaddress
ConnectPeer { addr: Multiaddr },
/// Subscribe to a particular Waku topic
RelaySubscribe { topic: WakuPubSubTopic },
/// Unsubscribe from a particular Waku topic
RelayUnsubscribe { topic: WakuPubSubTopic },
/// Get a local cached stream of messages for a particular content topic
ArchiveSubscribe {
query: StoreQuery,
reply_channel: oneshot::Sender<Box<dyn Stream<Item = WakuMessage> + Send + Sync + Unpin>>,
},
/// Retrieve old messages from another peer
StoreQuery {
query: StoreQuery,
peer_id: PeerId,
reply_channel: oneshot::Sender<StoreResponse>,
},
/// Send a message using Waku Light Push
LightpushPublish {
message: WakuMessage,
topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
},
Info {
reply_channel: oneshot::Sender<WakuInfo>,
},
}
impl Debug for WakuBackendMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
WakuBackendMessage::Broadcast { message, .. } => f
.debug_struct("WakuBackendMessage::Broadcast")
.field("message", message)
.finish(),
WakuBackendMessage::ConnectPeer { addr } => f
.debug_struct("WakuBackendMessage::ConnectPeer")
.field("addr", addr)
.finish(),
WakuBackendMessage::RelaySubscribe { topic } => f
.debug_struct("WakuBackendMessage::RelaySubscribe")
.field("topic", topic)
.finish(),
WakuBackendMessage::RelayUnsubscribe { topic } => f
.debug_struct("WakuBackendMessage::RelayUnsubscribe")
.field("topic", topic)
.finish(),
WakuBackendMessage::ArchiveSubscribe { query, .. } => f
.debug_struct("WakuBackendMessage::ArchiveSubscribe")
.field("query", query)
.finish(),
WakuBackendMessage::StoreQuery { query, peer_id, .. } => f
.debug_struct("WakuBackendMessage::StoreQuery")
.field("query", query)
.field("peer_id", peer_id)
.finish(),
WakuBackendMessage::LightpushPublish {
message,
topic,
peer_id,
} => f
.debug_struct("WakuBackendMessage::LightpushPublish")
.field("message", message)
.field("topic", topic)
.field("peer_id", peer_id)
.finish(),
WakuBackendMessage::Info { .. } => f.debug_struct("WakuBackendMessage::Info").finish(),
}
}
}
#[derive(Debug)]
pub enum EventKind {
Message,
}
#[derive(Debug, Clone)]
pub enum NetworkEvent {
RawMessage(WakuMessage),
}
impl Waku {
pub fn waku_store_query_stream(
&self,
mut query: StoreQuery,
) -> (
impl Stream<Item = WakuMessage>,
impl Future<Output = ()> + '_,
) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let task = async move {
while let Ok(StoreResponse {
messages,
paging_options,
}) = self.waku.local_store_query(&query)
{
// send messages
for message in messages {
// this could fail if the receiver is dropped
// break out of the loop in that case
if sender.send(message).is_err() {
break;
}
}
// stop queries if we do not have any more pages
if let Some(paging_options) = paging_options {
query.paging_options = Some(paging_options);
} else {
break;
}
}
};
(UnboundedReceiverStream::new(receiver), task)
}
}
#[async_trait::async_trait]
impl NetworkBackend for Waku {
type Settings = WakuConfig;
type State = NoState<WakuConfig>;
type Message = WakuBackendMessage;
type EventKind = EventKind;
type NetworkEvent = NetworkEvent;
fn new(mut config: Self::Settings, _: OverwatchHandle) -> Self {
// set store protocol to active at all times
config.inner.store = Some(true);
let waku = waku_new(Some(config.inner)).unwrap().start().unwrap();
tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]);
for peer in &config.initial_peers {
if let Err(e) = waku.connect_peer_with_address(peer, None) {
tracing::warn!("Could not connect to {peer}: {e}");
}
}
let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0;
let tx = message_event.clone();
waku_set_event_callback(move |sig| match sig.event() {
Event::WakuMessage(ref msg_event) => {
debug!("received message event");
if tx
.send(NetworkEvent::RawMessage(msg_event.waku_message().clone()))
.is_err()
{
debug!("no active receiver");
}
}
_ => tracing::warn!("unsupported event"),
});
Self {
waku,
message_event,
}
}
async fn process(&self, msg: Self::Message) {
match msg {
WakuBackendMessage::Broadcast { message, topic } => {
match self.waku.relay_publish_message(&message, topic, None) {
Ok(id) => debug!(
"successfully broadcast message with id: {id}, raw contents: {:?}",
message.payload()
),
Err(e) => tracing::error!(
"could not broadcast message due to {e}, raw contents {:?}",
message.payload()
),
}
}
WakuBackendMessage::ConnectPeer { addr } => {
match self.waku.connect_peer_with_address(&addr, None) {
Ok(_) => debug!("successfully connected to {addr}"),
Err(e) => {
tracing::warn!("Could not connect to {addr}: {e}");
}
}
}
WakuBackendMessage::LightpushPublish {
message,
topic,
peer_id,
} => match self.waku.lightpush_publish(&message, topic, peer_id, None) {
Ok(id) => debug!(
"successfully published lighpush message with id: {id}, raw contents: {:?}",
message.payload()
),
Err(e) => tracing::error!(
"could not publish lightpush message due to {e}, raw contents {:?}",
message.payload()
),
},
WakuBackendMessage::RelaySubscribe { topic } => {
match self.waku.relay_subscribe(Some(topic.clone())) {
Ok(_) => debug!("successfully subscribed to topic {:?}", topic),
Err(e) => {
tracing::error!("could not subscribe to topic {:?} due to {e}", topic)
}
}
}
WakuBackendMessage::RelayUnsubscribe { topic } => {
match self.waku.relay_unsubscribe(Some(topic.clone())) {
Ok(_) => debug!("successfully unsubscribed to topic {:?}", topic),
Err(e) => {
tracing::error!("could not unsubscribe to topic {:?} due to {e}", topic)
}
}
}
WakuBackendMessage::StoreQuery {
query,
peer_id,
reply_channel,
} => match self.waku.store_query(&query, &peer_id, None) {
Ok(res) => {
debug!(
"successfully retrieved stored messages with options {:?}",
query
);
reply_channel
.send(res)
.unwrap_or_else(|_| error!("client hung up store query handle"));
}
Err(e) => {
error!(
"could not retrieve store messages due to {e}, options: {:?}",
query
)
}
},
WakuBackendMessage::Info { reply_channel } => {
let listen_addresses = self.waku.listen_addresses().ok();
let peer_id = self.waku.peer_id().ok();
if reply_channel
.send(WakuInfo {
listen_addresses,
peer_id,
})
.is_err()
{
error!("could not send waku info");
}
}
WakuBackendMessage::ArchiveSubscribe {
reply_channel,
query,
} => {
let (stream, task) = self.waku_store_query_stream(query);
if reply_channel.send(Box::new(stream)).is_err() {
error!("could not send archive subscribe stream");
}
task.await;
}
};
}
async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver<Self::NetworkEvent> {
match kind {
EventKind::Message => {
debug!("processed subscription to incoming messages");
self.message_event.subscribe()
}
}
}
}

View File

@ -6,12 +6,9 @@ FROM rust:1.72.0-slim-bullseye AS builder
RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \ RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \
>> /etc/apt/sources.list >> /etc/apt/sources.list
# Dependecies for publishing documentation and building waku-bindings. # Dependecies for publishing documentation.
RUN apt-get update && apt-get install -yq \ RUN apt-get update && apt-get install -yq \
git clang etcd-client \ git clang etcd-client
golang-src/bullseye-backports \
golang-doc/bullseye-backports \
golang/bullseye-backports
WORKDIR /nomos WORKDIR /nomos
COPY . . COPY . .

View File

@ -24,7 +24,6 @@ mixnet-topology = { path = "../mixnet/topology" }
rand = "0.7.3" rand = "0.7.3"
once_cell = "1" once_cell = "1"
secp256k1 = { version = "0.26", features = ["rand"] } secp256k1 = { version = "0.26", features = ["rand"] }
waku-bindings = { version = "0.1.1", optional = true }
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
nomos-libp2p = { path = "../nomos-libp2p" } nomos-libp2p = { path = "../nomos-libp2p" }
tempfile = "3.6" tempfile = "3.6"

View File

@ -14,8 +14,6 @@ use nomos_http::backends::axum::AxumBackendSettings;
use nomos_libp2p::Multiaddr; use nomos_libp2p::Multiaddr;
use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo}; use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo};
#[cfg(feature = "waku")]
use nomos_network::backends::waku::{WakuConfig, WakuInfo};
use nomos_network::NetworkConfig; use nomos_network::NetworkConfig;
use nomos_node::Config; use nomos_node::Config;
// crates // crates