diff --git a/Cargo.toml b/Cargo.toml index 406e04d7..d28c174b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "nomos-core", + "nomos-libp2p", "nomos-services/log", "nomos-services/metrics", "nomos-services/network", diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index dcd49ed2..cc1bef0b 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -18,15 +18,23 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "ma tracing = "0.1" multiaddr = "0.17" nomos-core = { path = "../../nomos-core" } -nomos-network = { path = "../../nomos-services/network", features = ["waku"] } +nomos-network = { path = "../../nomos-services/network", features = [ + "waku", + "libp2p", +] } nomos-log = { path = "../../nomos-services/log" } -nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mock"] } +nomos-mempool = { path = "../../nomos-services/mempool", features = [ + "waku", + "mock", +] } nomos-http = { path = "../../nomos-services/http", features = ["http"] } -nomos-consensus = { path = "../../nomos-services/consensus", features = ["waku"] } +nomos-consensus = { path = "../../nomos-services/consensus", features = [ + "waku", +] } metrics = { path = "../../nomos-services/metrics", optional = true } tracing-subscriber = "0.3" consensus-engine = { path = "../../consensus-engine" } -tokio = {version = "1.24", features = ["sync"] } +tokio = { version = "1.24", features = ["sync"] } serde_json = "1.0" serde_yaml = "0.9" color-eyre = "0.6.0" diff --git a/nomos-libp2p/Cargo.toml b/nomos-libp2p/Cargo.toml new file mode 100644 index 00000000..2f962b16 --- /dev/null +++ b/nomos-libp2p/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "nomos-libp2p" +version = "0.1.0" +edition = "2021" + +[dependencies] +multiaddr = "0.18" +tokio = { version = "1", features = ["sync", "macros"] } +futures = "0.3" +libp2p = { version = "0.52.1", features = [ + "yamux", + "plaintext", + "macros", + "gossipsub", + "identify", + "tcp", + "tokio", + "secp256k1", +] } +serde = { version = "1.0.166", features = ["derive"] } +hex = "0.4.3" +log = "0.4.19" +thiserror = "1.0.40" +tracing = "0.1" + +[dev-dependencies] +env_logger = "0.10.0" +serde_json = "1.0.99" +tokio = { version = "1", features = ["time"] } diff --git a/nomos-libp2p/src/lib.rs b/nomos-libp2p/src/lib.rs new file mode 100644 index 00000000..5bde3124 --- /dev/null +++ b/nomos-libp2p/src/lib.rs @@ -0,0 +1,186 @@ +use std::error::Error; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use std::time::Duration; + +pub use libp2p; + +use libp2p::gossipsub::MessageId; +pub use libp2p::{ + core::upgrade, + gossipsub::{self, PublishError, SubscriptionError}, + identity::{self, secp256k1}, + plaintext::PlainText2Config, + swarm::{DialError, NetworkBehaviour, SwarmBuilder, SwarmEvent, THandlerErr}, + tcp, yamux, PeerId, Transport, +}; +pub use multiaddr::{multiaddr, Multiaddr, Protocol}; +use serde::{Deserialize, Serialize}; + +/// Wraps [`libp2p::Swarm`], and config it for use within Nomos. +pub struct Swarm { + // A core libp2p swarm + swarm: libp2p::Swarm, +} + +#[derive(NetworkBehaviour)] +pub struct Behaviour { + gossipsub: gossipsub::Behaviour, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SwarmConfig { + // Listening IPv4 address + pub host: std::net::Ipv4Addr, + // TCP listening port. Use 0 for random + pub port: u16, + // Secp256k1 private key in Hex format (`0x123...abc`). Default random + #[serde(with = "secret_key_serde")] + pub node_key: secp256k1::SecretKey, +} + +impl Default for SwarmConfig { + fn default() -> Self { + Self { + host: std::net::Ipv4Addr::new(0, 0, 0, 0), + port: 60000, + node_key: secp256k1::SecretKey::generate(), + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum SwarmError { + #[error("duplicate dialing")] + DuplicateDialing, +} + +/// A timeout for the setup and protocol upgrade process for all in/outbound connections +const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20); + +impl Swarm { + /// Builds a [`Swarm`] configured for use with Nomos on top of a tokio executor. + // + // TODO: define error types + pub fn build(config: &SwarmConfig) -> Result> { + let id_keys = identity::Keypair::from(secp256k1::Keypair::from(config.node_key.clone())); + let local_peer_id = PeerId::from(id_keys.public()); + log::info!("libp2p peer_id:{}", local_peer_id); + + // TODO: consider using noise authentication + let tcp_transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true)) + .upgrade(upgrade::Version::V1Lazy) + .authenticate(PlainText2Config { + local_public_key: id_keys.public(), + }) + .multiplex(yamux::Config::default()) + .timeout(TRANSPORT_TIMEOUT) + .boxed(); + + // TODO: consider using Signed or Anonymous. + // For Anonymous, a custom `message_id` function need to be set + // to prevent all messages from a peer being filtered as duplicates. + let gossipsub = gossipsub::Behaviour::new( + gossipsub::MessageAuthenticity::Author(local_peer_id), + gossipsub::ConfigBuilder::default() + .validation_mode(gossipsub::ValidationMode::None) + .build()?, + )?; + + let mut swarm = SwarmBuilder::with_tokio_executor( + tcp_transport, + Behaviour { gossipsub }, + local_peer_id, + ) + .build(); + + swarm.listen_on(multiaddr!(Ip4(config.host), Tcp(config.port)))?; + + Ok(Swarm { swarm }) + } + + /// Initiates a connection attempt to a peer + pub fn connect(&mut self, peer_id: PeerId, peer_addr: Multiaddr) -> Result<(), DialError> { + tracing::debug!("attempting to dial {peer_id}"); + self.swarm.dial(peer_addr.with(Protocol::P2p(peer_id)))?; + Ok(()) + } + + /// Subscribes to a topic + /// + /// Returns true if the topic is newly subscribed or false if already subscribed. + pub fn subscribe(&mut self, topic: &str) -> Result { + self.swarm + .behaviour_mut() + .gossipsub + .subscribe(&gossipsub::IdentTopic::new(topic)) + } + + pub fn broadcast(&mut self, topic: &str, message: Vec) -> Result { + self.swarm + .behaviour_mut() + .gossipsub + .publish(gossipsub::IdentTopic::new(topic), message) + } + + /// Unsubscribes from a topic + /// + /// Returns true if previously subscribed + pub fn unsubscribe(&mut self, topic: &str) -> Result { + self.swarm + .behaviour_mut() + .gossipsub + .unsubscribe(&gossipsub::IdentTopic::new(topic)) + } +} + +impl futures::Stream for Swarm { + type Item = SwarmEvent>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.swarm).poll_next(cx) + } +} + +mod secret_key_serde { + use libp2p::identity::secp256k1; + use serde::de::Error; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize(key: &secp256k1::SecretKey, serializer: S) -> Result + where + S: Serializer, + { + let hex_str = hex::encode(key.to_bytes()); + hex_str.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let hex_str = String::deserialize(deserializer)?; + let mut key_bytes = hex::decode(hex_str).map_err(|e| D::Error::custom(format!("{e}")))?; + secp256k1::SecretKey::try_from_bytes(key_bytes.as_mut_slice()) + .map_err(|e| D::Error::custom(format!("{e}"))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn config_serde() { + let config: SwarmConfig = Default::default(); + + let serialized = serde_json::to_string(&config).unwrap(); + println!("{serialized}"); + + let deserialized: SwarmConfig = serde_json::from_str(serialized.as_str()).unwrap(); + assert_eq!(deserialized.host, config.host); + assert_eq!(deserialized.port, config.port); + assert_eq!(deserialized.node_key.to_bytes(), config.node_key.to_bytes()); + } +} diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index 98a762cf..9c1b59b8 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -25,6 +25,7 @@ tracing-subscriber = { version = "0.3", features = ["json"] } tracing-gelf = "0.7" futures = "0.3" parking_lot = "0.12" +nomos-libp2p = { path = "../../nomos-libp2p", optional = true } [dev-dependencies] tokio = { version = "1", features = ["full"] } @@ -32,4 +33,5 @@ tokio = { version = "1", features = ["full"] } [features] default = [] waku = ["waku-bindings"] +libp2p = ["nomos-libp2p"] mock = ["rand", "chrono"] diff --git a/nomos-services/network/src/backends/libp2p.rs b/nomos-services/network/src/backends/libp2p.rs new file mode 100644 index 00000000..6eb938c3 --- /dev/null +++ b/nomos-services/network/src/backends/libp2p.rs @@ -0,0 +1,157 @@ +use nomos_libp2p::{ + libp2p::{ + gossipsub::{self, Message}, + Multiaddr, PeerId, + }, + BehaviourEvent, Swarm, SwarmConfig, SwarmEvent, +}; +use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; +use tokio::sync::{broadcast, mpsc}; + +use super::NetworkBackend; + +macro_rules! log_error { + ($e:expr) => { + if let Err(e) = $e { + tracing::error!("error while processing {}: {e:?}", stringify!($e)); + } + }; +} + +pub struct Libp2p { + events_tx: broadcast::Sender, + commands_tx: mpsc::Sender, +} + +#[derive(Debug)] +pub enum EventKind { + Message, +} +use std::error::Error; +use tokio::sync::oneshot; + +const BUFFER_SIZE: usize = 16; + +#[derive(Debug)] +pub enum Command { + Connect(PeerId, Multiaddr), + Broadcast { topic: Topic, message: Vec }, + Subscribe(Topic), + Unsubscribe(Topic), +} + +pub type Topic = String; +pub type CommandResultSender = oneshot::Sender>>; + +/// Events emitted from [`NomosLibp2p`], which users can subscribe +#[derive(Debug, Clone)] +pub enum Event { + Message(Message), +} + +#[async_trait::async_trait] +impl NetworkBackend for Libp2p { + type Settings = SwarmConfig; + type State = NoState; + type Message = Command; + type EventKind = EventKind; + type NetworkEvent = Event; + + fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { + let (commands_tx, mut commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE); + let (events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE); + let libp2p = Self { + events_tx: events_tx.clone(), + commands_tx, + }; + overwatch_handle.runtime().spawn(async move { + use tokio_stream::StreamExt; + let mut swarm = Swarm::build(&config).unwrap(); + loop { + tokio::select! { + Some(event) = swarm.next() => { + match event { + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Message { + propagation_source: peer_id, + message_id: id, + message, + })) => { + tracing::debug!("Got message with id: {id} from peer: {peer_id}"); + log_error!(events_tx.send(Event::Message(message))); + } + SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + .. + } => { + tracing::debug!("connected to peer: {peer_id} {connection_id:?}"); + } + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + cause, + .. + } => { + tracing::debug!("connection closed from peer: {peer_id} {connection_id:?} due to {cause:?}"); + } + SwarmEvent::OutgoingConnectionError { + peer_id, + connection_id, + error, + .. + } => { + tracing::debug!("failed to connect to peer: {peer_id:?} {connection_id:?} due to: {error}"); + } + _ => {} + } + } + Some(command) = commands_rx.recv() => { + match command { + Command::Connect(peer_id, peer_addr) => { + tracing::debug!("connecting to peer: {peer_id} {peer_addr}"); + log_error!(swarm.connect(peer_id, peer_addr)); + } + Command::Broadcast { topic, message } => { + match swarm.broadcast(&topic, message) { + Ok(id) => { + tracing::debug!("broadcasted message with id: {id} tp topic: {topic}"); + } + Err(e) => { + tracing::error!("failed to broadcast message to topic: {topic} {e:?}"); + } + } + } + Command::Subscribe(topic) => { + tracing::debug!("subscribing to topic: {topic}"); + log_error!(swarm.subscribe(&topic)); + } + Command::Unsubscribe(topic) => { + tracing::debug!("unsubscribing to topic: {topic}"); + log_error!(swarm.unsubscribe(&topic)); + } + }; + } + } + } + }); + libp2p + } + + async fn process(&self, msg: Self::Message) { + if let Err(e) = self.commands_tx.send(msg).await { + tracing::error!("failed to send command to nomos-libp2p: {e:?}"); + } + } + + async fn subscribe( + &mut self, + kind: Self::EventKind, + ) -> broadcast::Receiver { + match kind { + EventKind::Message => { + tracing::debug!("processed subscription to incoming messages"); + self.events_tx.subscribe() + } + } + } +} diff --git a/nomos-services/network/src/backends/mock.rs b/nomos-services/network/src/backends/mock.rs index 57bc200e..4189cb60 100644 --- a/nomos-services/network/src/backends/mock.rs +++ b/nomos-services/network/src/backends/mock.rs @@ -231,7 +231,7 @@ impl NetworkBackend for Mock { type EventKind = EventKind; type NetworkEvent = NetworkEvent; - fn new(config: Self::Settings) -> Self { + fn new(config: Self::Settings, _: OverwatchHandle) -> Self { let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0; Self { @@ -299,6 +299,8 @@ impl NetworkBackend for Mock { #[cfg(test)] mod tests { + use tokio::sync::mpsc; + use super::*; #[tokio::test] @@ -332,7 +334,10 @@ mod tests { weights: None, }; - let mock = Arc::new(Mock::new(config)); + let mock = Arc::new(Mock::new( + config, + OverwatchHandle::new(tokio::runtime::Handle::current(), mpsc::channel(1).0), + )); // run producer let task = mock.clone(); tokio::spawn(async move { diff --git a/nomos-services/network/src/backends/mod.rs b/nomos-services/network/src/backends/mod.rs index 746f280f..dd2a71fe 100644 --- a/nomos-services/network/src/backends/mod.rs +++ b/nomos-services/network/src/backends/mod.rs @@ -1,10 +1,13 @@ use super::*; -use overwatch_rs::services::state::ServiceState; +use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState}; use tokio::sync::broadcast::Receiver; #[cfg(feature = "waku")] pub mod waku; +#[cfg(feature = "libp2p")] +pub mod libp2p; + #[cfg(feature = "mock")] pub mod mock; @@ -16,7 +19,7 @@ pub trait NetworkBackend { type EventKind: Debug + Send + Sync + 'static; type NetworkEvent: Debug + Send + Sync + 'static; - fn new(config: Self::Settings) -> Self; + fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self; async fn process(&self, msg: Self::Message); async fn subscribe(&mut self, event: Self::EventKind) -> Receiver; } diff --git a/nomos-services/network/src/backends/waku.rs b/nomos-services/network/src/backends/waku.rs index aee9187b..e936670a 100644 --- a/nomos-services/network/src/backends/waku.rs +++ b/nomos-services/network/src/backends/waku.rs @@ -166,7 +166,7 @@ impl NetworkBackend for Waku { type EventKind = EventKind; type NetworkEvent = NetworkEvent; - fn new(mut config: Self::Settings) -> Self { + fn new(mut config: Self::Settings, _: OverwatchHandle) -> Self { // set store protocol to active at all times config.inner.store = Some(true); let waku = waku_new(Some(config.inner)).unwrap().start().unwrap(); diff --git a/nomos-services/network/src/lib.rs b/nomos-services/network/src/lib.rs index 01dc5a81..58b92d3e 100644 --- a/nomos-services/network/src/lib.rs +++ b/nomos-services/network/src/lib.rs @@ -76,6 +76,7 @@ where Ok(Self { backend: ::new( service_state.settings_reader.get_updated_settings().backend, + service_state.overwatch_handle.clone(), ), service_state, })