feat: add `nomos-libp2p` crate (for nomos-network backend) (#237)
* feat: add libp2p network backend skeleton * use tokio runtime managed by Overwatch * feat: add nomos-libp2p crate * remove gossipsub_message_id_fn * clippy * use next() instead of select_next_some() * rename send_command to execute_command * const timeout * disable authn / msg signing to start from a clean slate * rename CommandSender to CommandResultSender * add comments * move node machinery to networkbackend * fmt * logs more network events --------- Co-authored-by: Giacomo Pasini <g.pasini98@gmail.com>
This commit is contained in:
parent
a0cb738b9f
commit
2b9769b5b7
|
@ -1,6 +1,7 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [
|
members = [
|
||||||
"nomos-core",
|
"nomos-core",
|
||||||
|
"nomos-libp2p",
|
||||||
"nomos-services/log",
|
"nomos-services/log",
|
||||||
"nomos-services/metrics",
|
"nomos-services/metrics",
|
||||||
"nomos-services/network",
|
"nomos-services/network",
|
||||||
|
|
|
@ -18,15 +18,23 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "ma
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
multiaddr = "0.17"
|
multiaddr = "0.17"
|
||||||
nomos-core = { path = "../../nomos-core" }
|
nomos-core = { path = "../../nomos-core" }
|
||||||
nomos-network = { path = "../../nomos-services/network", features = ["waku"] }
|
nomos-network = { path = "../../nomos-services/network", features = [
|
||||||
|
"waku",
|
||||||
|
"libp2p",
|
||||||
|
] }
|
||||||
nomos-log = { path = "../../nomos-services/log" }
|
nomos-log = { path = "../../nomos-services/log" }
|
||||||
nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mock"] }
|
nomos-mempool = { path = "../../nomos-services/mempool", features = [
|
||||||
|
"waku",
|
||||||
|
"mock",
|
||||||
|
] }
|
||||||
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
|
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
|
||||||
nomos-consensus = { path = "../../nomos-services/consensus", features = ["waku"] }
|
nomos-consensus = { path = "../../nomos-services/consensus", features = [
|
||||||
|
"waku",
|
||||||
|
] }
|
||||||
metrics = { path = "../../nomos-services/metrics", optional = true }
|
metrics = { path = "../../nomos-services/metrics", optional = true }
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
consensus-engine = { path = "../../consensus-engine" }
|
consensus-engine = { path = "../../consensus-engine" }
|
||||||
tokio = {version = "1.24", features = ["sync"] }
|
tokio = { version = "1.24", features = ["sync"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
serde_yaml = "0.9"
|
serde_yaml = "0.9"
|
||||||
color-eyre = "0.6.0"
|
color-eyre = "0.6.0"
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
[package]
|
||||||
|
name = "nomos-libp2p"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
multiaddr = "0.18"
|
||||||
|
tokio = { version = "1", features = ["sync", "macros"] }
|
||||||
|
futures = "0.3"
|
||||||
|
libp2p = { version = "0.52.1", features = [
|
||||||
|
"yamux",
|
||||||
|
"plaintext",
|
||||||
|
"macros",
|
||||||
|
"gossipsub",
|
||||||
|
"identify",
|
||||||
|
"tcp",
|
||||||
|
"tokio",
|
||||||
|
"secp256k1",
|
||||||
|
] }
|
||||||
|
serde = { version = "1.0.166", features = ["derive"] }
|
||||||
|
hex = "0.4.3"
|
||||||
|
log = "0.4.19"
|
||||||
|
thiserror = "1.0.40"
|
||||||
|
tracing = "0.1"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
env_logger = "0.10.0"
|
||||||
|
serde_json = "1.0.99"
|
||||||
|
tokio = { version = "1", features = ["time"] }
|
|
@ -0,0 +1,186 @@
|
||||||
|
use std::error::Error;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
pub use libp2p;
|
||||||
|
|
||||||
|
use libp2p::gossipsub::MessageId;
|
||||||
|
pub use libp2p::{
|
||||||
|
core::upgrade,
|
||||||
|
gossipsub::{self, PublishError, SubscriptionError},
|
||||||
|
identity::{self, secp256k1},
|
||||||
|
plaintext::PlainText2Config,
|
||||||
|
swarm::{DialError, NetworkBehaviour, SwarmBuilder, SwarmEvent, THandlerErr},
|
||||||
|
tcp, yamux, PeerId, Transport,
|
||||||
|
};
|
||||||
|
pub use multiaddr::{multiaddr, Multiaddr, Protocol};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Wraps [`libp2p::Swarm`], and config it for use within Nomos.
|
||||||
|
pub struct Swarm {
|
||||||
|
// A core libp2p swarm
|
||||||
|
swarm: libp2p::Swarm<Behaviour>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(NetworkBehaviour)]
|
||||||
|
pub struct Behaviour {
|
||||||
|
gossipsub: gossipsub::Behaviour,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct SwarmConfig {
|
||||||
|
// Listening IPv4 address
|
||||||
|
pub host: std::net::Ipv4Addr,
|
||||||
|
// TCP listening port. Use 0 for random
|
||||||
|
pub port: u16,
|
||||||
|
// Secp256k1 private key in Hex format (`0x123...abc`). Default random
|
||||||
|
#[serde(with = "secret_key_serde")]
|
||||||
|
pub node_key: secp256k1::SecretKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for SwarmConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
host: std::net::Ipv4Addr::new(0, 0, 0, 0),
|
||||||
|
port: 60000,
|
||||||
|
node_key: secp256k1::SecretKey::generate(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(thiserror::Error, Debug)]
|
||||||
|
pub enum SwarmError {
|
||||||
|
#[error("duplicate dialing")]
|
||||||
|
DuplicateDialing,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A timeout for the setup and protocol upgrade process for all in/outbound connections
|
||||||
|
const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20);
|
||||||
|
|
||||||
|
impl Swarm {
|
||||||
|
/// Builds a [`Swarm`] configured for use with Nomos on top of a tokio executor.
|
||||||
|
//
|
||||||
|
// TODO: define error types
|
||||||
|
pub fn build(config: &SwarmConfig) -> Result<Self, Box<dyn Error>> {
|
||||||
|
let id_keys = identity::Keypair::from(secp256k1::Keypair::from(config.node_key.clone()));
|
||||||
|
let local_peer_id = PeerId::from(id_keys.public());
|
||||||
|
log::info!("libp2p peer_id:{}", local_peer_id);
|
||||||
|
|
||||||
|
// TODO: consider using noise authentication
|
||||||
|
let tcp_transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
|
||||||
|
.upgrade(upgrade::Version::V1Lazy)
|
||||||
|
.authenticate(PlainText2Config {
|
||||||
|
local_public_key: id_keys.public(),
|
||||||
|
})
|
||||||
|
.multiplex(yamux::Config::default())
|
||||||
|
.timeout(TRANSPORT_TIMEOUT)
|
||||||
|
.boxed();
|
||||||
|
|
||||||
|
// TODO: consider using Signed or Anonymous.
|
||||||
|
// For Anonymous, a custom `message_id` function need to be set
|
||||||
|
// to prevent all messages from a peer being filtered as duplicates.
|
||||||
|
let gossipsub = gossipsub::Behaviour::new(
|
||||||
|
gossipsub::MessageAuthenticity::Author(local_peer_id),
|
||||||
|
gossipsub::ConfigBuilder::default()
|
||||||
|
.validation_mode(gossipsub::ValidationMode::None)
|
||||||
|
.build()?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let mut swarm = SwarmBuilder::with_tokio_executor(
|
||||||
|
tcp_transport,
|
||||||
|
Behaviour { gossipsub },
|
||||||
|
local_peer_id,
|
||||||
|
)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
swarm.listen_on(multiaddr!(Ip4(config.host), Tcp(config.port)))?;
|
||||||
|
|
||||||
|
Ok(Swarm { swarm })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initiates a connection attempt to a peer
|
||||||
|
pub fn connect(&mut self, peer_id: PeerId, peer_addr: Multiaddr) -> Result<(), DialError> {
|
||||||
|
tracing::debug!("attempting to dial {peer_id}");
|
||||||
|
self.swarm.dial(peer_addr.with(Protocol::P2p(peer_id)))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribes to a topic
|
||||||
|
///
|
||||||
|
/// Returns true if the topic is newly subscribed or false if already subscribed.
|
||||||
|
pub fn subscribe(&mut self, topic: &str) -> Result<bool, SubscriptionError> {
|
||||||
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.gossipsub
|
||||||
|
.subscribe(&gossipsub::IdentTopic::new(topic))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn broadcast(&mut self, topic: &str, message: Vec<u8>) -> Result<MessageId, PublishError> {
|
||||||
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.gossipsub
|
||||||
|
.publish(gossipsub::IdentTopic::new(topic), message)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unsubscribes from a topic
|
||||||
|
///
|
||||||
|
/// Returns true if previously subscribed
|
||||||
|
pub fn unsubscribe(&mut self, topic: &str) -> Result<bool, PublishError> {
|
||||||
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.gossipsub
|
||||||
|
.unsubscribe(&gossipsub::IdentTopic::new(topic))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl futures::Stream for Swarm {
|
||||||
|
type Item = SwarmEvent<BehaviourEvent, THandlerErr<Behaviour>>;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
Pin::new(&mut self.swarm).poll_next(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod secret_key_serde {
|
||||||
|
use libp2p::identity::secp256k1;
|
||||||
|
use serde::de::Error;
|
||||||
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
|
|
||||||
|
pub fn serialize<S>(key: &secp256k1::SecretKey, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
let hex_str = hex::encode(key.to_bytes());
|
||||||
|
hex_str.serialize(serializer)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<secp256k1::SecretKey, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let hex_str = String::deserialize(deserializer)?;
|
||||||
|
let mut key_bytes = hex::decode(hex_str).map_err(|e| D::Error::custom(format!("{e}")))?;
|
||||||
|
secp256k1::SecretKey::try_from_bytes(key_bytes.as_mut_slice())
|
||||||
|
.map_err(|e| D::Error::custom(format!("{e}")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn config_serde() {
|
||||||
|
let config: SwarmConfig = Default::default();
|
||||||
|
|
||||||
|
let serialized = serde_json::to_string(&config).unwrap();
|
||||||
|
println!("{serialized}");
|
||||||
|
|
||||||
|
let deserialized: SwarmConfig = serde_json::from_str(serialized.as_str()).unwrap();
|
||||||
|
assert_eq!(deserialized.host, config.host);
|
||||||
|
assert_eq!(deserialized.port, config.port);
|
||||||
|
assert_eq!(deserialized.node_key.to_bytes(), config.node_key.to_bytes());
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,6 +25,7 @@ tracing-subscriber = { version = "0.3", features = ["json"] }
|
||||||
tracing-gelf = "0.7"
|
tracing-gelf = "0.7"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
parking_lot = "0.12"
|
parking_lot = "0.12"
|
||||||
|
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
@ -32,4 +33,5 @@ tokio = { version = "1", features = ["full"] }
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
waku = ["waku-bindings"]
|
waku = ["waku-bindings"]
|
||||||
|
libp2p = ["nomos-libp2p"]
|
||||||
mock = ["rand", "chrono"]
|
mock = ["rand", "chrono"]
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
use nomos_libp2p::{
|
||||||
|
libp2p::{
|
||||||
|
gossipsub::{self, Message},
|
||||||
|
Multiaddr, PeerId,
|
||||||
|
},
|
||||||
|
BehaviourEvent, Swarm, SwarmConfig, SwarmEvent,
|
||||||
|
};
|
||||||
|
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
|
||||||
|
use tokio::sync::{broadcast, mpsc};
|
||||||
|
|
||||||
|
use super::NetworkBackend;
|
||||||
|
|
||||||
|
macro_rules! log_error {
|
||||||
|
($e:expr) => {
|
||||||
|
if let Err(e) = $e {
|
||||||
|
tracing::error!("error while processing {}: {e:?}", stringify!($e));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Libp2p {
|
||||||
|
events_tx: broadcast::Sender<Event>,
|
||||||
|
commands_tx: mpsc::Sender<Command>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum EventKind {
|
||||||
|
Message,
|
||||||
|
}
|
||||||
|
use std::error::Error;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
|
const BUFFER_SIZE: usize = 16;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Command {
|
||||||
|
Connect(PeerId, Multiaddr),
|
||||||
|
Broadcast { topic: Topic, message: Vec<u8> },
|
||||||
|
Subscribe(Topic),
|
||||||
|
Unsubscribe(Topic),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Topic = String;
|
||||||
|
pub type CommandResultSender = oneshot::Sender<Result<(), Box<dyn Error + Send>>>;
|
||||||
|
|
||||||
|
/// Events emitted from [`NomosLibp2p`], which users can subscribe
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum Event {
|
||||||
|
Message(Message),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl NetworkBackend for Libp2p {
|
||||||
|
type Settings = SwarmConfig;
|
||||||
|
type State = NoState<SwarmConfig>;
|
||||||
|
type Message = Command;
|
||||||
|
type EventKind = EventKind;
|
||||||
|
type NetworkEvent = Event;
|
||||||
|
|
||||||
|
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
|
||||||
|
let (commands_tx, mut commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE);
|
||||||
|
let (events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE);
|
||||||
|
let libp2p = Self {
|
||||||
|
events_tx: events_tx.clone(),
|
||||||
|
commands_tx,
|
||||||
|
};
|
||||||
|
overwatch_handle.runtime().spawn(async move {
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
let mut swarm = Swarm::build(&config).unwrap();
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(event) = swarm.next() => {
|
||||||
|
match event {
|
||||||
|
SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Message {
|
||||||
|
propagation_source: peer_id,
|
||||||
|
message_id: id,
|
||||||
|
message,
|
||||||
|
})) => {
|
||||||
|
tracing::debug!("Got message with id: {id} from peer: {peer_id}");
|
||||||
|
log_error!(events_tx.send(Event::Message(message)));
|
||||||
|
}
|
||||||
|
SwarmEvent::ConnectionEstablished {
|
||||||
|
peer_id,
|
||||||
|
connection_id,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
tracing::debug!("connected to peer: {peer_id} {connection_id:?}");
|
||||||
|
}
|
||||||
|
SwarmEvent::ConnectionClosed {
|
||||||
|
peer_id,
|
||||||
|
connection_id,
|
||||||
|
cause,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
tracing::debug!("connection closed from peer: {peer_id} {connection_id:?} due to {cause:?}");
|
||||||
|
}
|
||||||
|
SwarmEvent::OutgoingConnectionError {
|
||||||
|
peer_id,
|
||||||
|
connection_id,
|
||||||
|
error,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
tracing::debug!("failed to connect to peer: {peer_id:?} {connection_id:?} due to: {error}");
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(command) = commands_rx.recv() => {
|
||||||
|
match command {
|
||||||
|
Command::Connect(peer_id, peer_addr) => {
|
||||||
|
tracing::debug!("connecting to peer: {peer_id} {peer_addr}");
|
||||||
|
log_error!(swarm.connect(peer_id, peer_addr));
|
||||||
|
}
|
||||||
|
Command::Broadcast { topic, message } => {
|
||||||
|
match swarm.broadcast(&topic, message) {
|
||||||
|
Ok(id) => {
|
||||||
|
tracing::debug!("broadcasted message with id: {id} tp topic: {topic}");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("failed to broadcast message to topic: {topic} {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Command::Subscribe(topic) => {
|
||||||
|
tracing::debug!("subscribing to topic: {topic}");
|
||||||
|
log_error!(swarm.subscribe(&topic));
|
||||||
|
}
|
||||||
|
Command::Unsubscribe(topic) => {
|
||||||
|
tracing::debug!("unsubscribing to topic: {topic}");
|
||||||
|
log_error!(swarm.unsubscribe(&topic));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
libp2p
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process(&self, msg: Self::Message) {
|
||||||
|
if let Err(e) = self.commands_tx.send(msg).await {
|
||||||
|
tracing::error!("failed to send command to nomos-libp2p: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn subscribe(
|
||||||
|
&mut self,
|
||||||
|
kind: Self::EventKind,
|
||||||
|
) -> broadcast::Receiver<Self::NetworkEvent> {
|
||||||
|
match kind {
|
||||||
|
EventKind::Message => {
|
||||||
|
tracing::debug!("processed subscription to incoming messages");
|
||||||
|
self.events_tx.subscribe()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -231,7 +231,7 @@ impl NetworkBackend for Mock {
|
||||||
type EventKind = EventKind;
|
type EventKind = EventKind;
|
||||||
type NetworkEvent = NetworkEvent;
|
type NetworkEvent = NetworkEvent;
|
||||||
|
|
||||||
fn new(config: Self::Settings) -> Self {
|
fn new(config: Self::Settings, _: OverwatchHandle) -> Self {
|
||||||
let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0;
|
let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0;
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
|
@ -299,6 +299,8 @@ impl NetworkBackend for Mock {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -332,7 +334,10 @@ mod tests {
|
||||||
weights: None,
|
weights: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mock = Arc::new(Mock::new(config));
|
let mock = Arc::new(Mock::new(
|
||||||
|
config,
|
||||||
|
OverwatchHandle::new(tokio::runtime::Handle::current(), mpsc::channel(1).0),
|
||||||
|
));
|
||||||
// run producer
|
// run producer
|
||||||
let task = mock.clone();
|
let task = mock.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
use super::*;
|
use super::*;
|
||||||
use overwatch_rs::services::state::ServiceState;
|
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState};
|
||||||
use tokio::sync::broadcast::Receiver;
|
use tokio::sync::broadcast::Receiver;
|
||||||
|
|
||||||
#[cfg(feature = "waku")]
|
#[cfg(feature = "waku")]
|
||||||
pub mod waku;
|
pub mod waku;
|
||||||
|
|
||||||
|
#[cfg(feature = "libp2p")]
|
||||||
|
pub mod libp2p;
|
||||||
|
|
||||||
#[cfg(feature = "mock")]
|
#[cfg(feature = "mock")]
|
||||||
pub mod mock;
|
pub mod mock;
|
||||||
|
|
||||||
|
@ -16,7 +19,7 @@ pub trait NetworkBackend {
|
||||||
type EventKind: Debug + Send + Sync + 'static;
|
type EventKind: Debug + Send + Sync + 'static;
|
||||||
type NetworkEvent: Debug + Send + Sync + 'static;
|
type NetworkEvent: Debug + Send + Sync + 'static;
|
||||||
|
|
||||||
fn new(config: Self::Settings) -> Self;
|
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self;
|
||||||
async fn process(&self, msg: Self::Message);
|
async fn process(&self, msg: Self::Message);
|
||||||
async fn subscribe(&mut self, event: Self::EventKind) -> Receiver<Self::NetworkEvent>;
|
async fn subscribe(&mut self, event: Self::EventKind) -> Receiver<Self::NetworkEvent>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,7 @@ impl NetworkBackend for Waku {
|
||||||
type EventKind = EventKind;
|
type EventKind = EventKind;
|
||||||
type NetworkEvent = NetworkEvent;
|
type NetworkEvent = NetworkEvent;
|
||||||
|
|
||||||
fn new(mut config: Self::Settings) -> Self {
|
fn new(mut config: Self::Settings, _: OverwatchHandle) -> Self {
|
||||||
// set store protocol to active at all times
|
// set store protocol to active at all times
|
||||||
config.inner.store = Some(true);
|
config.inner.store = Some(true);
|
||||||
let waku = waku_new(Some(config.inner)).unwrap().start().unwrap();
|
let waku = waku_new(Some(config.inner)).unwrap().start().unwrap();
|
||||||
|
|
|
@ -76,6 +76,7 @@ where
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
backend: <B as NetworkBackend>::new(
|
backend: <B as NetworkBackend>::new(
|
||||||
service_state.settings_reader.get_updated_settings().backend,
|
service_state.settings_reader.get_updated_settings().backend,
|
||||||
|
service_state.overwatch_handle.clone(),
|
||||||
),
|
),
|
||||||
service_state,
|
service_state,
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue