From 61de96a5d374a0b762cfcf37c20801ee145ef762 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Mon, 21 Nov 2022 12:03:47 +0100 Subject: [PATCH] Add Waku network backend (#3) * Add Waku network backend Add Waku as the first supported network backend and rework API. In particular, the network message now depends on the underlying network backend so that it can properly reflects specificities of the protocol. Another choice could have been to hardwire domain-specific actions in the network message type (e.g. send block, send message, ...) but was discarded in favor of a more general network service. * address review comments * add debug impl * add Serialize/Deserialize to network settings * add waku functionalities * add a little bit of documentation --- nomos-services/Cargo.toml | 5 +- nomos-services/src/network/backends/mod.rs | 13 +- nomos-services/src/network/backends/waku.rs | 175 ++++++++++++++++++++ nomos-services/src/network/mod.rs | 93 ++++++----- 4 files changed, 242 insertions(+), 44 deletions(-) create mode 100644 nomos-services/src/network/backends/waku.rs diff --git a/nomos-services/Cargo.toml b/nomos-services/Cargo.toml index 3c00d4e4..b1976139 100644 --- a/nomos-services/Cargo.toml +++ b/nomos-services/Cargo.toml @@ -13,7 +13,10 @@ serde = "1.0" tokio = { version = "1", features = ["sync"] } thiserror = "1.0" tracing = "0.1" +waku = { git = "https://github.com/waku-org/waku-rust-bindings" } +multiaddr = "0.15" + [features] default = [] -mock = [] \ No newline at end of file +mock = [] diff --git a/nomos-services/src/network/backends/mod.rs b/nomos-services/src/network/backends/mod.rs index aaaed677..d9c62f96 100644 --- a/nomos-services/src/network/backends/mod.rs +++ b/nomos-services/src/network/backends/mod.rs @@ -2,11 +2,18 @@ use super::*; use overwatch::services::state::ServiceState; use tokio::sync::broadcast::Receiver; +mod waku; +pub use self::waku::Waku; + +#[async_trait::async_trait] pub trait NetworkBackend { - type Config: Clone + Send + Sync + 'static; + type Config: Clone + Debug + Send + Sync + 'static; type State: ServiceState + Clone; + type Message: Debug + Send + Sync + 'static; + type EventKind: Debug + Send + Sync + 'static; + type NetworkEvent: Debug + Send + Sync + 'static; fn new(config: Self::Config) -> Self; - fn broadcast(&self, msg: NetworkData); - fn subscribe(&mut self, event: EventKind) -> Receiver; + async fn process(&self, msg: Self::Message); + async fn subscribe(&mut self, event: Self::EventKind) -> Receiver; } diff --git a/nomos-services/src/network/backends/waku.rs b/nomos-services/src/network/backends/waku.rs new file mode 100644 index 00000000..cdac8ed3 --- /dev/null +++ b/nomos-services/src/network/backends/waku.rs @@ -0,0 +1,175 @@ +use super::*; +use ::waku::*; +use overwatch::services::state::NoState; +use serde::{Deserialize, Serialize}; +use tokio::sync::{ + broadcast::{self, Receiver, Sender}, + oneshot, +}; +use tracing::{debug, error}; + +const BROADCAST_CHANNEL_BUF: usize = 16; + +pub struct Waku { + waku: WakuNodeHandle, + message_event: Sender, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct WakuConfig { + #[serde(flatten)] + inner: WakuNodeConfig, + initial_peers: Vec, +} + +/// Interaction with Waku node +#[derive(Debug)] +pub enum WakuBackendMessage { + /// Send a message to the network + Broadcast { + message: WakuMessage, + topic: Option, + }, + /// Subscribe to a particular Waku topic + RelaySubscribe { topic: WakuPubSubTopic }, + /// Unsubscribe from a particular Waku topic + RelayUnsubscribe { topic: WakuPubSubTopic }, + /// Retrieve old messages from another peer + StoreQuery { + query: StoreQuery, + peer_id: PeerId, + response: oneshot::Sender, + }, + /// Send a message using Waku Light Push + LightpushPublish { + message: WakuMessage, + topic: Option, + peer_id: PeerId, + }, +} + +#[derive(Debug)] +pub enum EventKind { + Message, +} + +#[derive(Debug, Clone)] +pub enum NetworkEvent { + RawMessage(WakuMessage), +} + +#[async_trait::async_trait] +impl NetworkBackend for Waku { + type Config = WakuConfig; + type State = NoState; + type Message = WakuBackendMessage; + type EventKind = EventKind; + type NetworkEvent = NetworkEvent; + + fn new(config: Self::Config) -> Self { + let waku = waku_new(Some(config.inner)).unwrap().start().unwrap(); + waku.relay_subscribe(None).unwrap(); + tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]); + for peer in &config.initial_peers { + if let Err(e) = waku.connect_peer_with_address(peer, None) { + tracing::warn!("Could not connect to {peer}: {e}"); + } + } + + let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0; + let tx = message_event.clone(); + waku_set_event_callback(move |sig| match sig.event() { + Event::WakuMessage(ref msg_event) => { + debug!("received message event"); + if tx + .send(NetworkEvent::RawMessage(msg_event.waku_message().clone())) + .is_err() + { + debug!("no active receiver"); + } + } + _ => tracing::warn!("unsupported event"), + }); + Self { + waku, + message_event, + } + } + + async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver { + match kind { + EventKind::Message => { + debug!("processed subscription to incoming messages"); + self.message_event.subscribe() + } + } + } + + async fn process(&self, msg: Self::Message) { + match msg { + WakuBackendMessage::Broadcast { message, topic } => { + match self.waku.relay_publish_message(&message, topic, None) { + Ok(id) => debug!( + "successfully broadcast message with id: {id}, raw contents: {:?}", + message.payload() + ), + Err(e) => tracing::error!( + "could not broadcast message due to {e}, raw contents {:?}", + message.payload() + ), + } + } + WakuBackendMessage::LightpushPublish { + message, + topic, + peer_id, + } => match self.waku.lightpush_publish(&message, topic, peer_id, None) { + Ok(id) => debug!( + "successfully published lighpush message with id: {id}, raw contents: {:?}", + message.payload() + ), + Err(e) => tracing::error!( + "could not publish lightpush message due to {e}, raw contents {:?}", + message.payload() + ), + }, + WakuBackendMessage::RelaySubscribe { topic } => { + match self.waku.relay_subscribe(Some(topic.clone())) { + Ok(_) => debug!("successfully subscribed to topic {:?}", topic), + Err(e) => { + tracing::error!("could not subscribe to topic {:?} due to {e}", topic) + } + } + } + WakuBackendMessage::RelayUnsubscribe { topic } => { + match self.waku.relay_unsubscribe(Some(topic.clone())) { + Ok(_) => debug!("successfully unsubscribed to topic {:?}", topic), + Err(e) => { + tracing::error!("could not unsubscribe to topic {:?} due to {e}", topic) + } + } + } + WakuBackendMessage::StoreQuery { + query, + peer_id, + response, + } => match self.waku.store_query(&query, &peer_id, None) { + Ok(res) => { + debug!( + "successfully retrieved stored messages with options {:?}", + query + ); + response + .send(res) + .unwrap_or_else(|_| error!("client hung up store query handle")); + } + Err(e) => { + error!( + "could not retrieve store messages due to {e}, options: {:?}", + query + ) + } + }, + }; + } +} diff --git a/nomos-services/src/network/mod.rs b/nomos-services/src/network/mod.rs index b4b56490..8919b595 100644 --- a/nomos-services/src/network/mod.rs +++ b/nomos-services/src/network/mod.rs @@ -8,59 +8,67 @@ use overwatch::services::{ state::{NoOperator, ServiceState}, ServiceCore, ServiceData, ServiceId, }; -use std::fmt::Debug; +use serde::{Deserialize, Serialize}; +use std::fmt::{self, Debug}; use tokio::sync::broadcast; use tokio::sync::oneshot; -pub type NetworkData = Box<[u8]>; - -#[derive(Debug)] -pub enum NetworkMsg { - Broadcast(NetworkData), +pub enum NetworkMsg { + Process(B::Message), Subscribe { - kind: EventKind, - sender: oneshot::Sender>, + kind: B::EventKind, + sender: oneshot::Sender>, }, } -impl RelayMessage for NetworkMsg {} - -#[derive(Debug)] -pub enum EventKind { - Message, +impl Debug for NetworkMsg { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Process(msg) => write!(fmt, "NetworkMsg::Process({:?})", msg), + Self::Subscribe { kind, sender } => write!( + fmt, + "NetworkMsg::Subscribe{{ kind: {:?}, sender: {:?}}}", + kind, sender + ), + } + } } -#[derive(Debug)] -pub enum NetworkEvent { - RawMessage(NetworkData), +impl RelayMessage for NetworkMsg {} + +#[derive(Serialize, Deserialize)] +pub struct NetworkConfig { + pub backend: B::Config, } -pub struct NetworkConfig { - pub backend: I::Config, +impl Debug for NetworkConfig { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "NetworkConfig {{ backend: {:?}}}", self.backend) + } } -pub struct NetworkService { - backend: I, +pub struct NetworkService { + backend: B, service_state: ServiceStateHandle, } -pub struct NetworkState { - _backend: I::State, +pub struct NetworkState { + _backend: B::State, } -impl ServiceData for NetworkService { +impl ServiceData for NetworkService { const SERVICE_ID: ServiceId = "Network"; - type Settings = NetworkConfig; - type State = NetworkState; + type Settings = NetworkConfig; + type State = NetworkState; type StateOperator = NoOperator; - type Message = NetworkMsg; + type Message = NetworkMsg; } #[async_trait] -impl ServiceCore for NetworkService { +impl ServiceCore for NetworkService { fn init(mut service_state: ServiceStateHandle) -> Self { Self { - backend: ::new( + backend: ::new( service_state.settings_reader.get_updated_settings().backend, ), service_state, @@ -77,20 +85,25 @@ impl ServiceCore for NetworkService { while let Some(msg) = inbound_relay.recv().await { match msg { - NetworkMsg::Broadcast(msg) => backend.broadcast(msg), - NetworkMsg::Subscribe { kind, sender } => { - sender.send(backend.subscribe(kind)).unwrap_or_else(|_| { - tracing::warn!( - "client hung up before as subscription handle could be established" - ) - }) + NetworkMsg::Process(msg) => { + // split sending in two steps to help the compiler understand we do not + // need to hold an instance of &I (which is not send) across an await point + let _send = backend.process(msg); + _send.await } + NetworkMsg::Subscribe { kind, sender } => sender + .send(backend.subscribe(kind).await) + .unwrap_or_else(|_| { + tracing::warn!( + "client hung up before a subscription handle could be established" + ) + }), } } } } -impl Clone for NetworkConfig { +impl Clone for NetworkConfig { fn clone(&self) -> Self { NetworkConfig { backend: self.backend.clone(), @@ -98,7 +111,7 @@ impl Clone for NetworkConfig { } } -impl Clone for NetworkState { +impl Clone for NetworkState { fn clone(&self) -> Self { NetworkState { _backend: self._backend.clone(), @@ -106,12 +119,12 @@ impl Clone for NetworkState { } } -impl ServiceState for NetworkState { - type Settings = NetworkConfig; +impl ServiceState for NetworkState { + type Settings = NetworkConfig; fn from_settings(settings: &Self::Settings) -> Self { Self { - _backend: I::State::from_settings(&settings.backend), + _backend: B::State::from_settings(&settings.backend), } } }