1
0
mirror of synced 2025-01-11 08:15:48 +00:00

Add Waku network backend (#3)

* Add Waku network backend

Add Waku as the first supported network backend and rework API.
In particular, the network message now depends on the underlying
network backend so that it can properly reflects specificities
of the protocol.
Another choice could have been to hardwire domain-specific actions
in the network message type (e.g. send block, send message, ...)
but was discarded in favor of a more general network service.

* address review comments

* add debug impl

* add Serialize/Deserialize to network settings

* add waku functionalities

* add a little bit of documentation
This commit is contained in:
Giacomo Pasini 2022-11-21 12:03:47 +01:00 committed by GitHub
parent a343249d92
commit 61de96a5d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 242 additions and 44 deletions

View File

@ -13,6 +13,9 @@ serde = "1.0"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
thiserror = "1.0" thiserror = "1.0"
tracing = "0.1" tracing = "0.1"
waku = { git = "https://github.com/waku-org/waku-rust-bindings" }
multiaddr = "0.15"
[features] [features]
default = [] default = []

View File

@ -2,11 +2,18 @@ use super::*;
use overwatch::services::state::ServiceState; use overwatch::services::state::ServiceState;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
mod waku;
pub use self::waku::Waku;
#[async_trait::async_trait]
pub trait NetworkBackend { pub trait NetworkBackend {
type Config: Clone + Send + Sync + 'static; type Config: Clone + Debug + Send + Sync + 'static;
type State: ServiceState<Settings = Self::Config> + Clone; type State: ServiceState<Settings = Self::Config> + Clone;
type Message: Debug + Send + Sync + 'static;
type EventKind: Debug + Send + Sync + 'static;
type NetworkEvent: Debug + Send + Sync + 'static;
fn new(config: Self::Config) -> Self; fn new(config: Self::Config) -> Self;
fn broadcast(&self, msg: NetworkData); async fn process(&self, msg: Self::Message);
fn subscribe(&mut self, event: EventKind) -> Receiver<NetworkEvent>; async fn subscribe(&mut self, event: Self::EventKind) -> Receiver<Self::NetworkEvent>;
} }

View File

@ -0,0 +1,175 @@
use super::*;
use ::waku::*;
use overwatch::services::state::NoState;
use serde::{Deserialize, Serialize};
use tokio::sync::{
broadcast::{self, Receiver, Sender},
oneshot,
};
use tracing::{debug, error};
const BROADCAST_CHANNEL_BUF: usize = 16;
pub struct Waku {
waku: WakuNodeHandle<Running>,
message_event: Sender<NetworkEvent>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WakuConfig {
#[serde(flatten)]
inner: WakuNodeConfig,
initial_peers: Vec<Multiaddr>,
}
/// Interaction with Waku node
#[derive(Debug)]
pub enum WakuBackendMessage {
/// Send a message to the network
Broadcast {
message: WakuMessage,
topic: Option<WakuPubSubTopic>,
},
/// Subscribe to a particular Waku topic
RelaySubscribe { topic: WakuPubSubTopic },
/// Unsubscribe from a particular Waku topic
RelayUnsubscribe { topic: WakuPubSubTopic },
/// Retrieve old messages from another peer
StoreQuery {
query: StoreQuery,
peer_id: PeerId,
response: oneshot::Sender<StoreResponse>,
},
/// Send a message using Waku Light Push
LightpushPublish {
message: WakuMessage,
topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
},
}
#[derive(Debug)]
pub enum EventKind {
Message,
}
#[derive(Debug, Clone)]
pub enum NetworkEvent {
RawMessage(WakuMessage),
}
#[async_trait::async_trait]
impl NetworkBackend for Waku {
type Config = WakuConfig;
type State = NoState<WakuConfig>;
type Message = WakuBackendMessage;
type EventKind = EventKind;
type NetworkEvent = NetworkEvent;
fn new(config: Self::Config) -> Self {
let waku = waku_new(Some(config.inner)).unwrap().start().unwrap();
waku.relay_subscribe(None).unwrap();
tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]);
for peer in &config.initial_peers {
if let Err(e) = waku.connect_peer_with_address(peer, None) {
tracing::warn!("Could not connect to {peer}: {e}");
}
}
let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0;
let tx = message_event.clone();
waku_set_event_callback(move |sig| match sig.event() {
Event::WakuMessage(ref msg_event) => {
debug!("received message event");
if tx
.send(NetworkEvent::RawMessage(msg_event.waku_message().clone()))
.is_err()
{
debug!("no active receiver");
}
}
_ => tracing::warn!("unsupported event"),
});
Self {
waku,
message_event,
}
}
async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver<Self::NetworkEvent> {
match kind {
EventKind::Message => {
debug!("processed subscription to incoming messages");
self.message_event.subscribe()
}
}
}
async fn process(&self, msg: Self::Message) {
match msg {
WakuBackendMessage::Broadcast { message, topic } => {
match self.waku.relay_publish_message(&message, topic, None) {
Ok(id) => debug!(
"successfully broadcast message with id: {id}, raw contents: {:?}",
message.payload()
),
Err(e) => tracing::error!(
"could not broadcast message due to {e}, raw contents {:?}",
message.payload()
),
}
}
WakuBackendMessage::LightpushPublish {
message,
topic,
peer_id,
} => match self.waku.lightpush_publish(&message, topic, peer_id, None) {
Ok(id) => debug!(
"successfully published lighpush message with id: {id}, raw contents: {:?}",
message.payload()
),
Err(e) => tracing::error!(
"could not publish lightpush message due to {e}, raw contents {:?}",
message.payload()
),
},
WakuBackendMessage::RelaySubscribe { topic } => {
match self.waku.relay_subscribe(Some(topic.clone())) {
Ok(_) => debug!("successfully subscribed to topic {:?}", topic),
Err(e) => {
tracing::error!("could not subscribe to topic {:?} due to {e}", topic)
}
}
}
WakuBackendMessage::RelayUnsubscribe { topic } => {
match self.waku.relay_unsubscribe(Some(topic.clone())) {
Ok(_) => debug!("successfully unsubscribed to topic {:?}", topic),
Err(e) => {
tracing::error!("could not unsubscribe to topic {:?} due to {e}", topic)
}
}
}
WakuBackendMessage::StoreQuery {
query,
peer_id,
response,
} => match self.waku.store_query(&query, &peer_id, None) {
Ok(res) => {
debug!(
"successfully retrieved stored messages with options {:?}",
query
);
response
.send(res)
.unwrap_or_else(|_| error!("client hung up store query handle"));
}
Err(e) => {
error!(
"could not retrieve store messages due to {e}, options: {:?}",
query
)
}
},
};
}
}

View File

@ -8,59 +8,67 @@ use overwatch::services::{
state::{NoOperator, ServiceState}, state::{NoOperator, ServiceState},
ServiceCore, ServiceData, ServiceId, ServiceCore, ServiceData, ServiceId,
}; };
use std::fmt::Debug; use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::oneshot; use tokio::sync::oneshot;
pub type NetworkData = Box<[u8]>; pub enum NetworkMsg<B: NetworkBackend> {
Process(B::Message),
#[derive(Debug)]
pub enum NetworkMsg {
Broadcast(NetworkData),
Subscribe { Subscribe {
kind: EventKind, kind: B::EventKind,
sender: oneshot::Sender<broadcast::Receiver<NetworkEvent>>, sender: oneshot::Sender<broadcast::Receiver<B::NetworkEvent>>,
}, },
} }
impl RelayMessage for NetworkMsg {} impl<B: NetworkBackend> Debug for NetworkMsg<B> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
#[derive(Debug)] match self {
pub enum EventKind { Self::Process(msg) => write!(fmt, "NetworkMsg::Process({:?})", msg),
Message, Self::Subscribe { kind, sender } => write!(
fmt,
"NetworkMsg::Subscribe{{ kind: {:?}, sender: {:?}}}",
kind, sender
),
}
}
} }
#[derive(Debug)] impl<T: NetworkBackend + 'static> RelayMessage for NetworkMsg<T> {}
pub enum NetworkEvent {
RawMessage(NetworkData), #[derive(Serialize, Deserialize)]
pub struct NetworkConfig<B: NetworkBackend> {
pub backend: B::Config,
} }
pub struct NetworkConfig<I: NetworkBackend> { impl<B: NetworkBackend> Debug for NetworkConfig<B> {
pub backend: I::Config, fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "NetworkConfig {{ backend: {:?}}}", self.backend)
}
} }
pub struct NetworkService<I: NetworkBackend + Send + 'static> { pub struct NetworkService<B: NetworkBackend + Send + 'static> {
backend: I, backend: B,
service_state: ServiceStateHandle<Self>, service_state: ServiceStateHandle<Self>,
} }
pub struct NetworkState<I: NetworkBackend> { pub struct NetworkState<B: NetworkBackend> {
_backend: I::State, _backend: B::State,
} }
impl<I: NetworkBackend + Send + 'static> ServiceData for NetworkService<I> { impl<B: NetworkBackend + Send + 'static> ServiceData for NetworkService<B> {
const SERVICE_ID: ServiceId = "Network"; const SERVICE_ID: ServiceId = "Network";
type Settings = NetworkConfig<I>; type Settings = NetworkConfig<B>;
type State = NetworkState<I>; type State = NetworkState<B>;
type StateOperator = NoOperator<Self::State>; type StateOperator = NoOperator<Self::State>;
type Message = NetworkMsg; type Message = NetworkMsg<B>;
} }
#[async_trait] #[async_trait]
impl<I: NetworkBackend + Send + 'static> ServiceCore for NetworkService<I> { impl<B: NetworkBackend + Send + 'static> ServiceCore for NetworkService<B> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Self { fn init(mut service_state: ServiceStateHandle<Self>) -> Self {
Self { Self {
backend: <I as NetworkBackend>::new( backend: <B as NetworkBackend>::new(
service_state.settings_reader.get_updated_settings().backend, service_state.settings_reader.get_updated_settings().backend,
), ),
service_state, service_state,
@ -77,20 +85,25 @@ impl<I: NetworkBackend + Send + 'static> ServiceCore for NetworkService<I> {
while let Some(msg) = inbound_relay.recv().await { while let Some(msg) = inbound_relay.recv().await {
match msg { match msg {
NetworkMsg::Broadcast(msg) => backend.broadcast(msg), NetworkMsg::Process(msg) => {
NetworkMsg::Subscribe { kind, sender } => { // split sending in two steps to help the compiler understand we do not
sender.send(backend.subscribe(kind)).unwrap_or_else(|_| { // need to hold an instance of &I (which is not send) across an await point
tracing::warn!( let _send = backend.process(msg);
"client hung up before as subscription handle could be established" _send.await
)
})
} }
NetworkMsg::Subscribe { kind, sender } => sender
.send(backend.subscribe(kind).await)
.unwrap_or_else(|_| {
tracing::warn!(
"client hung up before a subscription handle could be established"
)
}),
} }
} }
} }
} }
impl<I: NetworkBackend> Clone for NetworkConfig<I> { impl<B: NetworkBackend> Clone for NetworkConfig<B> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
NetworkConfig { NetworkConfig {
backend: self.backend.clone(), backend: self.backend.clone(),
@ -98,7 +111,7 @@ impl<I: NetworkBackend> Clone for NetworkConfig<I> {
} }
} }
impl<I: NetworkBackend> Clone for NetworkState<I> { impl<B: NetworkBackend> Clone for NetworkState<B> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
NetworkState { NetworkState {
_backend: self._backend.clone(), _backend: self._backend.clone(),
@ -106,12 +119,12 @@ impl<I: NetworkBackend> Clone for NetworkState<I> {
} }
} }
impl<I: NetworkBackend + Send + 'static> ServiceState for NetworkState<I> { impl<B: NetworkBackend + Send + 'static> ServiceState for NetworkState<B> {
type Settings = NetworkConfig<I>; type Settings = NetworkConfig<B>;
fn from_settings(settings: &Self::Settings) -> Self { fn from_settings(settings: &Self::Settings) -> Self {
Self { Self {
_backend: I::State::from_settings(&settings.backend), _backend: B::State::from_settings(&settings.backend),
} }
} }
} }