From f21f1ea10a3d3639f6cba2f3513b1575dc593167 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Tue, 1 Aug 2023 03:26:42 +0200 Subject: [PATCH] Small additions to libp2p (#278) * Add initial peer config to nomos-libp2p * Use custom message id to avoid duplicates * Expose reference to the inner swarm * move closure into function --- nomos-libp2p/Cargo.toml | 1 + nomos-libp2p/src/lib.rs | 29 +++++++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/nomos-libp2p/Cargo.toml b/nomos-libp2p/Cargo.toml index 2f962b16..f56cb46e 100644 --- a/nomos-libp2p/Cargo.toml +++ b/nomos-libp2p/Cargo.toml @@ -17,6 +17,7 @@ libp2p = { version = "0.52.1", features = [ "tokio", "secp256k1", ] } +blake2 = { version = "0.10" } serde = { version = "1.0.166", features = ["derive"] } hex = "0.4.3" log = "0.4.19" diff --git a/nomos-libp2p/src/lib.rs b/nomos-libp2p/src/lib.rs index 5bde3124..30956635 100644 --- a/nomos-libp2p/src/lib.rs +++ b/nomos-libp2p/src/lib.rs @@ -6,7 +6,9 @@ use std::time::Duration; pub use libp2p; -use libp2p::gossipsub::MessageId; +use blake2::digest::{consts::U32, Digest}; +use blake2::Blake2b; +use libp2p::gossipsub::{Message, MessageId}; pub use libp2p::{ core::upgrade, gossipsub::{self, PublishError, SubscriptionError}, @@ -38,6 +40,8 @@ pub struct SwarmConfig { // Secp256k1 private key in Hex format (`0x123...abc`). Default random #[serde(with = "secret_key_serde")] pub node_key: secp256k1::SecretKey, + // Initial peers to connect to + pub initial_peers: Vec, } impl Default for SwarmConfig { @@ -46,6 +50,7 @@ impl Default for SwarmConfig { host: std::net::Ipv4Addr::new(0, 0, 0, 0), port: 60000, node_key: secp256k1::SecretKey::generate(), + initial_peers: Vec::new(), } } } @@ -85,6 +90,7 @@ impl Swarm { gossipsub::MessageAuthenticity::Author(local_peer_id), gossipsub::ConfigBuilder::default() .validation_mode(gossipsub::ValidationMode::None) + .message_id_fn(compute_message_id) .build()?, )?; @@ -97,6 +103,10 @@ impl Swarm { swarm.listen_on(multiaddr!(Ip4(config.host), Tcp(config.port)))?; + for peer in &config.initial_peers { + swarm.dial(peer.clone())?; + } + Ok(Swarm { swarm }) } @@ -117,7 +127,11 @@ impl Swarm { .subscribe(&gossipsub::IdentTopic::new(topic)) } - pub fn broadcast(&mut self, topic: &str, message: Vec) -> Result { + pub fn broadcast( + &mut self, + topic: &str, + message: impl Into>, + ) -> Result { self.swarm .behaviour_mut() .gossipsub @@ -133,6 +147,11 @@ impl Swarm { .gossipsub .unsubscribe(&gossipsub::IdentTopic::new(topic)) } + + /// Returns a reference to the underlying [`libp2p::Swarm`] + pub fn swarm(&self) -> &libp2p::Swarm { + &self.swarm + } } impl futures::Stream for Swarm { @@ -143,6 +162,12 @@ impl futures::Stream for Swarm { } } +fn compute_message_id(message: &Message) -> MessageId { + let mut hasher = Blake2b::::new(); + hasher.update(&message.data); + MessageId::from(hasher.finalize().to_vec()) +} + mod secret_key_serde { use libp2p::identity::secp256k1; use serde::de::Error;