diff --git a/nomos-services/Cargo.toml b/nomos-services/Cargo.toml index 3c00d4e4..b1976139 100644 --- a/nomos-services/Cargo.toml +++ b/nomos-services/Cargo.toml @@ -13,7 +13,10 @@ serde = "1.0" tokio = { version = "1", features = ["sync"] } thiserror = "1.0" tracing = "0.1" +waku = { git = "https://github.com/waku-org/waku-rust-bindings" } +multiaddr = "0.15" + [features] default = [] -mock = [] \ No newline at end of file +mock = [] diff --git a/nomos-services/src/network/backends/mod.rs b/nomos-services/src/network/backends/mod.rs index aaaed677..d9c62f96 100644 --- a/nomos-services/src/network/backends/mod.rs +++ b/nomos-services/src/network/backends/mod.rs @@ -2,11 +2,18 @@ use super::*; use overwatch::services::state::ServiceState; use tokio::sync::broadcast::Receiver; +mod waku; +pub use self::waku::Waku; + +#[async_trait::async_trait] pub trait NetworkBackend { - type Config: Clone + Send + Sync + 'static; + type Config: Clone + Debug + Send + Sync + 'static; type State: ServiceState + Clone; + type Message: Debug + Send + Sync + 'static; + type EventKind: Debug + Send + Sync + 'static; + type NetworkEvent: Debug + Send + Sync + 'static; fn new(config: Self::Config) -> Self; - fn broadcast(&self, msg: NetworkData); - fn subscribe(&mut self, event: EventKind) -> Receiver; + async fn process(&self, msg: Self::Message); + async fn subscribe(&mut self, event: Self::EventKind) -> Receiver; } diff --git a/nomos-services/src/network/backends/waku.rs b/nomos-services/src/network/backends/waku.rs new file mode 100644 index 00000000..cdac8ed3 --- /dev/null +++ b/nomos-services/src/network/backends/waku.rs @@ -0,0 +1,175 @@ +use super::*; +use ::waku::*; +use overwatch::services::state::NoState; +use serde::{Deserialize, Serialize}; +use tokio::sync::{ + broadcast::{self, Receiver, Sender}, + oneshot, +}; +use tracing::{debug, error}; + +const BROADCAST_CHANNEL_BUF: usize = 16; + +pub struct Waku { + waku: WakuNodeHandle, + message_event: Sender, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct WakuConfig { + #[serde(flatten)] + inner: WakuNodeConfig, + initial_peers: Vec, +} + +/// Interaction with Waku node +#[derive(Debug)] +pub enum WakuBackendMessage { + /// Send a message to the network + Broadcast { + message: WakuMessage, + topic: Option, + }, + /// Subscribe to a particular Waku topic + RelaySubscribe { topic: WakuPubSubTopic }, + /// Unsubscribe from a particular Waku topic + RelayUnsubscribe { topic: WakuPubSubTopic }, + /// Retrieve old messages from another peer + StoreQuery { + query: StoreQuery, + peer_id: PeerId, + response: oneshot::Sender, + }, + /// Send a message using Waku Light Push + LightpushPublish { + message: WakuMessage, + topic: Option, + peer_id: PeerId, + }, +} + +#[derive(Debug)] +pub enum EventKind { + Message, +} + +#[derive(Debug, Clone)] +pub enum NetworkEvent { + RawMessage(WakuMessage), +} + +#[async_trait::async_trait] +impl NetworkBackend for Waku { + type Config = WakuConfig; + type State = NoState; + type Message = WakuBackendMessage; + type EventKind = EventKind; + type NetworkEvent = NetworkEvent; + + fn new(config: Self::Config) -> Self { + let waku = waku_new(Some(config.inner)).unwrap().start().unwrap(); + waku.relay_subscribe(None).unwrap(); + tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]); + for peer in &config.initial_peers { + if let Err(e) = waku.connect_peer_with_address(peer, None) { + tracing::warn!("Could not connect to {peer}: {e}"); + } + } + + let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0; + let tx = message_event.clone(); + waku_set_event_callback(move |sig| match sig.event() { + Event::WakuMessage(ref msg_event) => { + debug!("received message event"); + if tx + .send(NetworkEvent::RawMessage(msg_event.waku_message().clone())) + .is_err() + { + debug!("no active receiver"); + } + } + _ => tracing::warn!("unsupported event"), + }); + Self { + waku, + message_event, + } + } + + async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver { + match kind { + EventKind::Message => { + debug!("processed subscription to incoming messages"); + self.message_event.subscribe() + } + } + } + + async fn process(&self, msg: Self::Message) { + match msg { + WakuBackendMessage::Broadcast { message, topic } => { + match self.waku.relay_publish_message(&message, topic, None) { + Ok(id) => debug!( + "successfully broadcast message with id: {id}, raw contents: {:?}", + message.payload() + ), + Err(e) => tracing::error!( + "could not broadcast message due to {e}, raw contents {:?}", + message.payload() + ), + } + } + WakuBackendMessage::LightpushPublish { + message, + topic, + peer_id, + } => match self.waku.lightpush_publish(&message, topic, peer_id, None) { + Ok(id) => debug!( + "successfully published lighpush message with id: {id}, raw contents: {:?}", + message.payload() + ), + Err(e) => tracing::error!( + "could not publish lightpush message due to {e}, raw contents {:?}", + message.payload() + ), + }, + WakuBackendMessage::RelaySubscribe { topic } => { + match self.waku.relay_subscribe(Some(topic.clone())) { + Ok(_) => debug!("successfully subscribed to topic {:?}", topic), + Err(e) => { + tracing::error!("could not subscribe to topic {:?} due to {e}", topic) + } + } + } + WakuBackendMessage::RelayUnsubscribe { topic } => { + match self.waku.relay_unsubscribe(Some(topic.clone())) { + Ok(_) => debug!("successfully unsubscribed to topic {:?}", topic), + Err(e) => { + tracing::error!("could not unsubscribe to topic {:?} due to {e}", topic) + } + } + } + WakuBackendMessage::StoreQuery { + query, + peer_id, + response, + } => match self.waku.store_query(&query, &peer_id, None) { + Ok(res) => { + debug!( + "successfully retrieved stored messages with options {:?}", + query + ); + response + .send(res) + .unwrap_or_else(|_| error!("client hung up store query handle")); + } + Err(e) => { + error!( + "could not retrieve store messages due to {e}, options: {:?}", + query + ) + } + }, + }; + } +} diff --git a/nomos-services/src/network/mod.rs b/nomos-services/src/network/mod.rs index b4b56490..8919b595 100644 --- a/nomos-services/src/network/mod.rs +++ b/nomos-services/src/network/mod.rs @@ -8,59 +8,67 @@ use overwatch::services::{ state::{NoOperator, ServiceState}, ServiceCore, ServiceData, ServiceId, }; -use std::fmt::Debug; +use serde::{Deserialize, Serialize}; +use std::fmt::{self, Debug}; use tokio::sync::broadcast; use tokio::sync::oneshot; -pub type NetworkData = Box<[u8]>; - -#[derive(Debug)] -pub enum NetworkMsg { - Broadcast(NetworkData), +pub enum NetworkMsg { + Process(B::Message), Subscribe { - kind: EventKind, - sender: oneshot::Sender>, + kind: B::EventKind, + sender: oneshot::Sender>, }, } -impl RelayMessage for NetworkMsg {} - -#[derive(Debug)] -pub enum EventKind { - Message, +impl Debug for NetworkMsg { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Process(msg) => write!(fmt, "NetworkMsg::Process({:?})", msg), + Self::Subscribe { kind, sender } => write!( + fmt, + "NetworkMsg::Subscribe{{ kind: {:?}, sender: {:?}}}", + kind, sender + ), + } + } } -#[derive(Debug)] -pub enum NetworkEvent { - RawMessage(NetworkData), +impl RelayMessage for NetworkMsg {} + +#[derive(Serialize, Deserialize)] +pub struct NetworkConfig { + pub backend: B::Config, } -pub struct NetworkConfig { - pub backend: I::Config, +impl Debug for NetworkConfig { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "NetworkConfig {{ backend: {:?}}}", self.backend) + } } -pub struct NetworkService { - backend: I, +pub struct NetworkService { + backend: B, service_state: ServiceStateHandle, } -pub struct NetworkState { - _backend: I::State, +pub struct NetworkState { + _backend: B::State, } -impl ServiceData for NetworkService { +impl ServiceData for NetworkService { const SERVICE_ID: ServiceId = "Network"; - type Settings = NetworkConfig; - type State = NetworkState; + type Settings = NetworkConfig; + type State = NetworkState; type StateOperator = NoOperator; - type Message = NetworkMsg; + type Message = NetworkMsg; } #[async_trait] -impl ServiceCore for NetworkService { +impl ServiceCore for NetworkService { fn init(mut service_state: ServiceStateHandle) -> Self { Self { - backend: ::new( + backend: ::new( service_state.settings_reader.get_updated_settings().backend, ), service_state, @@ -77,20 +85,25 @@ impl ServiceCore for NetworkService { while let Some(msg) = inbound_relay.recv().await { match msg { - NetworkMsg::Broadcast(msg) => backend.broadcast(msg), - NetworkMsg::Subscribe { kind, sender } => { - sender.send(backend.subscribe(kind)).unwrap_or_else(|_| { - tracing::warn!( - "client hung up before as subscription handle could be established" - ) - }) + NetworkMsg::Process(msg) => { + // split sending in two steps to help the compiler understand we do not + // need to hold an instance of &I (which is not send) across an await point + let _send = backend.process(msg); + _send.await } + NetworkMsg::Subscribe { kind, sender } => sender + .send(backend.subscribe(kind).await) + .unwrap_or_else(|_| { + tracing::warn!( + "client hung up before a subscription handle could be established" + ) + }), } } } } -impl Clone for NetworkConfig { +impl Clone for NetworkConfig { fn clone(&self) -> Self { NetworkConfig { backend: self.backend.clone(), @@ -98,7 +111,7 @@ impl Clone for NetworkConfig { } } -impl Clone for NetworkState { +impl Clone for NetworkState { fn clone(&self) -> Self { NetworkState { _backend: self._backend.clone(), @@ -106,12 +119,12 @@ impl Clone for NetworkState { } } -impl ServiceState for NetworkState { - type Settings = NetworkConfig; +impl ServiceState for NetworkState { + type Settings = NetworkConfig; fn from_settings(settings: &Self::Settings) -> Self { Self { - _backend: I::State::from_settings(&settings.backend), + _backend: B::State::from_settings(&settings.backend), } } }