From 05778437edeca72f3ffb505ae59b4713f8b3e1ad Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:32:22 +0700 Subject: [PATCH] integrate --- nodes/nomos-node/config.yaml | 17 +++++- nodes/nomos-node/src/config.rs | 8 --- nomos-mix/core/Cargo.toml | 2 + nomos-mix/core/src/lib.rs | 1 + nomos-mix/core/src/membership.rs | 27 ++++++++ nomos-mix/core/src/message_blend/crypto.rs | 38 +++++++++--- nomos-mix/core/src/message_blend/mod.rs | 32 +++++++--- nomos-mix/message/src/lib.rs | 8 +-- nomos-mix/network/src/handler.rs | 4 +- nomos-mix/network/src/lib.rs | 6 +- .../data-availability/tests/Cargo.toml | 1 + .../data-availability/tests/src/common.rs | 61 +++++++++++-------- nomos-services/mix/Cargo.toml | 2 + nomos-services/mix/src/backends/libp2p.rs | 44 +++++-------- nomos-services/mix/src/backends/mod.rs | 9 ++- nomos-services/mix/src/lib.rs | 36 +++++++++-- testnet/cfgsync/Cargo.toml | 1 + testnet/cfgsync/src/config.rs | 32 +++------- tests/Cargo.toml | 1 + tests/src/nodes/executor.rs | 6 +- tests/src/nodes/validator.rs | 7 ++- tests/src/topology/configs/mix.rs | 26 ++++---- 22 files changed, 234 insertions(+), 135 deletions(-) create mode 100644 nomos-mix/core/src/membership.rs diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index dfe3ce51..aeb64361 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -52,10 +52,21 @@ mix: backend: listening_address: /ip4/0.0.0.0/udp/3001/quic-v1 node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3 - membership: - - /ip4/0.0.0.0/udp/3001/quic-v1/p2p/12D3KooWMj7KgmbDJ7RSXeFhFimvqddSXzA5XWDTwvdEJYfuoPhM peering_degree: 1 - num_mix_layers: 1 + persistent_transmission: + max_emission_frequency: 1 + drop_message_probability: 0.5 + message_blend: + cryptographic_processor: + private_key: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + num_mix_layers: 1 + temporal_processor: + max_delay_seconds: 5 + membership: + - address: /ip4/127.0.0.1/udp/3001/quic-v1 + public_key: [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1] + - address: /ip4/127.0.0.1/udp/3002/quic-v1 + public_key: [2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2] http: backend_settings: diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 6ce275a5..a41927ed 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -83,9 +83,6 @@ pub struct MixArgs { #[clap(long = "mix-node-key", env = "MIX_NODE_KEY")] mix_node_key: Option, - #[clap(long = "mix-membership", env = "MIX_MEMBERSHIP", num_args = 1.., value_delimiter = ',')] - pub mix_membership: Option>, - #[clap(long = "mix-peering-degree", env = "MIX_PEERING_DEGREE")] mix_peering_degree: Option, @@ -256,7 +253,6 @@ pub fn update_mix( let MixArgs { mix_addr, mix_node_key, - mix_membership, mix_peering_degree, mix_num_mix_layers, } = mix_args; @@ -270,10 +266,6 @@ pub fn update_mix( mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?; } - if let Some(membership) = mix_membership { - mix.backend.membership = membership; - } - if let Some(peering_degree) = mix_peering_degree { mix.backend.peering_degree = peering_degree; } diff --git a/nomos-mix/core/Cargo.toml b/nomos-mix/core/Cargo.toml index fe6901cd..261a69a7 100644 --- a/nomos-mix/core/Cargo.toml +++ b/nomos-mix/core/Cargo.toml @@ -13,6 +13,8 @@ serde = { version = "1.0", features = ["derive"] } nomos-mix-message = { path = "../message" } futures = "0.3" rand_chacha = "0.3" +multiaddr = "0.18" +x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [dev-dependencies] diff --git a/nomos-mix/core/src/lib.rs b/nomos-mix/core/src/lib.rs index 0326319c..5484b495 100644 --- a/nomos-mix/core/src/lib.rs +++ b/nomos-mix/core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod membership; pub mod message_blend; pub mod persistent_transmission; diff --git a/nomos-mix/core/src/membership.rs b/nomos-mix/core/src/membership.rs new file mode 100644 index 00000000..904b568a --- /dev/null +++ b/nomos-mix/core/src/membership.rs @@ -0,0 +1,27 @@ +use multiaddr::Multiaddr; +use rand::{seq::SliceRandom, Rng}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug)] +pub struct Membership { + remote_nodes: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Node { + pub address: Multiaddr, + pub public_key: [u8; 32], +} + +impl Membership { + pub fn new(mut nodes: Vec, local_public_key: &[u8; 32]) -> Self { + nodes.retain(|node| node.public_key != *local_public_key); + Self { + remote_nodes: nodes, + } + } + + pub fn choose_remote_nodes(&self, rng: &mut R, amount: usize) -> Vec<&Node> { + self.remote_nodes.choose_multiple(rng, amount).collect() + } +} diff --git a/nomos-mix/core/src/message_blend/crypto.rs b/nomos-mix/core/src/message_blend/crypto.rs index 910b28e0..9d937e40 100644 --- a/nomos-mix/core/src/message_blend/crypto.rs +++ b/nomos-mix/core/src/message_blend/crypto.rs @@ -1,33 +1,53 @@ use nomos_mix_message::{new_message, unwrap_message}; +use rand::Rng; use serde::{Deserialize, Serialize}; +use crate::membership::Membership; + /// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages /// for the message indistinguishability. -#[derive(Clone, Copy, Debug)] -pub struct CryptographicProcessor { +#[derive(Clone, Debug)] +pub struct CryptographicProcessor { settings: CryptographicProcessorSettings, + membership: Membership, + rng: R, + public_key: [u8; 32], } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct CryptographicProcessorSettings { + pub private_key: [u8; 32], pub num_mix_layers: usize, } -impl CryptographicProcessor { - pub fn new(settings: CryptographicProcessorSettings) -> Self { - Self { settings } +impl CryptographicProcessor { + pub fn new(settings: CryptographicProcessorSettings, membership: Membership, rng: R) -> Self { + let public_key = + x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(settings.private_key)) + .to_bytes(); + Self { + settings, + membership, + rng, + public_key, + } } - pub fn wrap_message(&self, message: &[u8]) -> Result, nomos_mix_message::Error> { + pub fn wrap_message(&mut self, message: &[u8]) -> Result, nomos_mix_message::Error> { // TODO: Use the actual Sphinx encoding instead of mock. - // TODO: Select `num_mix_layers` random nodes from the membership. - new_message(message, self.settings.num_mix_layers.try_into().unwrap()) + let node_ids = self + .membership + .choose_remote_nodes(&mut self.rng, self.settings.num_mix_layers) + .iter() + .map(|node| node.public_key) + .collect::>(); + new_message(message, &node_ids) } pub fn unwrap_message( &self, message: &[u8], ) -> Result<(Vec, bool), nomos_mix_message::Error> { - unwrap_message(message) + unwrap_message(message, &self.public_key) } } diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-mix/core/src/message_blend/mod.rs index 180a583f..d82655aa 100644 --- a/nomos-mix/core/src/message_blend/mod.rs +++ b/nomos-mix/core/src/message_blend/mod.rs @@ -4,10 +4,12 @@ pub mod temporal; pub use crypto::CryptographicProcessorSettings; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; +use rand::Rng; use std::pin::Pin; use std::task::{Context, Poll}; pub use temporal::TemporalProcessorSettings; +use crate::membership::Membership; use crate::message_blend::crypto::CryptographicProcessor; use crate::message_blend::temporal::TemporalProcessorExt; use crate::MixOutgoingMessage; @@ -25,19 +27,26 @@ pub struct MessageBlendSettings { /// [`MessageBlendStream`] handles the entire mixing tiers process /// - Unwraps incoming messages received from network using [`CryptographicProcessor`] /// - Pushes unwrapped messages to [`TemporalProcessor`] -pub struct MessageBlendStream { +pub struct MessageBlendStream { input_stream: S, output_stream: BoxStream<'static, MixOutgoingMessage>, temporal_sender: UnboundedSender, - cryptographic_processor: CryptographicProcessor, + cryptographic_processor: CryptographicProcessor, } -impl MessageBlendStream +impl MessageBlendStream where S: Stream>, + R: Rng, { - pub fn new(input_stream: S, settings: MessageBlendSettings) -> Self { - let cryptographic_processor = CryptographicProcessor::new(settings.cryptographic_processor); + pub fn new( + input_stream: S, + settings: MessageBlendSettings, + membership: Membership, + rng: R, + ) -> Self { + let cryptographic_processor = + CryptographicProcessor::new(settings.cryptographic_processor, membership, rng); let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel(); let output_stream = UnboundedReceiverStream::new(temporal_receiver) .temporal_stream(settings.temporal_processor) @@ -72,9 +81,10 @@ where } } -impl Stream for MessageBlendStream +impl Stream for MessageBlendStream where S: Stream> + Unpin, + R: Rng + Unpin, { type Item = MixOutgoingMessage; @@ -87,11 +97,17 @@ where } pub trait MessageBlendExt: Stream> { - fn blend(self, message_blend_settings: MessageBlendSettings) -> MessageBlendStream + fn blend( + self, + message_blend_settings: MessageBlendSettings, + membership: Membership, + rng: R, + ) -> MessageBlendStream where Self: Sized + Unpin, + R: Rng, { - MessageBlendStream::new(self, message_blend_settings) + MessageBlendStream::new(self, message_blend_settings, membership, rng) } } diff --git a/nomos-mix/message/src/lib.rs b/nomos-mix/message/src/lib.rs index 4bb674e2..96250fd7 100644 --- a/nomos-mix/message/src/lib.rs +++ b/nomos-mix/message/src/lib.rs @@ -13,11 +13,11 @@ pub type NodeId = [u8; NODE_ID_SIZE]; const NODE_ID_SIZE: usize = 32; const DUMMY_NODE_ID: NodeId = [0; NODE_ID_SIZE]; -const MAX_PADDED_PAYLOAD_SIZE: usize = 2048; +const PADDED_PAYLOAD_SIZE: usize = 2048; const PAYLOAD_PADDING_SEPARATOR: u8 = 0x01; const PAYLOAD_PADDING_SEPARATOR_SIZE: usize = 1; const MAX_LAYERS: usize = 5; -const MESSAGE_SIZE: usize = NODE_ID_SIZE * MAX_LAYERS + MAX_PADDED_PAYLOAD_SIZE; +pub const MESSAGE_SIZE: usize = NODE_ID_SIZE * MAX_LAYERS + PADDED_PAYLOAD_SIZE; pub const DROP_MESSAGE: [u8; MESSAGE_SIZE] = [0; MESSAGE_SIZE]; /// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes. @@ -27,7 +27,7 @@ pub fn new_message(payload: &[u8], node_ids: &[NodeId]) -> Result, Error if node_ids.is_empty() || node_ids.len() > MAX_LAYERS { return Err(Error::InvalidNumberOfLayers); } - if payload.len() > MAX_PADDED_PAYLOAD_SIZE - PAYLOAD_PADDING_SEPARATOR_SIZE { + if payload.len() > PADDED_PAYLOAD_SIZE - PAYLOAD_PADDING_SEPARATOR_SIZE { return Err(Error::PayloadTooLarge); } @@ -44,7 +44,7 @@ pub fn new_message(payload: &[u8], node_ids: &[NodeId]) -> Result, Error message.push(PAYLOAD_PADDING_SEPARATOR); message.extend( std::iter::repeat(0) - .take(MAX_PADDED_PAYLOAD_SIZE - payload.len() - PAYLOAD_PADDING_SEPARATOR_SIZE), + .take(PADDED_PAYLOAD_SIZE - payload.len() - PAYLOAD_PADDING_SEPARATOR_SIZE), ); Ok(message) } diff --git a/nomos-mix/network/src/handler.rs b/nomos-mix/network/src/handler.rs index 9740947a..a6086e49 100644 --- a/nomos-mix/network/src/handler.rs +++ b/nomos-mix/network/src/handler.rs @@ -13,7 +13,7 @@ use libp2p::{ }, Stream, StreamProtocol, }; -use nomos_mix_message::MAX_PADDED_PAYLOAD_SIZE; +use nomos_mix_message::MESSAGE_SIZE; use crate::behaviour::Config; @@ -256,7 +256,7 @@ async fn send_msg(mut stream: Stream, msg: Vec) -> io::Result { /// Read a fixed-length message from the stream // TODO: Consider handling variable-length messages async fn recv_msg(mut stream: Stream) -> io::Result<(Stream, Vec)> { - let mut buf = vec![0; MAX_PADDED_PAYLOAD_SIZE]; + let mut buf = vec![0; MESSAGE_SIZE]; stream.read_exact(&mut buf).await?; Ok((stream, buf)) } diff --git a/nomos-mix/network/src/lib.rs b/nomos-mix/network/src/lib.rs index 16c2ed5b..e61330e1 100644 --- a/nomos-mix/network/src/lib.rs +++ b/nomos-mix/network/src/lib.rs @@ -14,7 +14,7 @@ mod test { swarm::{dummy, NetworkBehaviour, SwarmEvent}, Multiaddr, PeerId, Swarm, SwarmBuilder, }; - use nomos_mix_message::MAX_PADDED_PAYLOAD_SIZE; + use nomos_mix_message::MESSAGE_SIZE; use tokio::select; use crate::{behaviour::Config, error::Error, Behaviour, Event}; @@ -43,7 +43,7 @@ mod test { // Swamr2 publishes a message. let task = async { - let msg = vec![1; MAX_PADDED_PAYLOAD_SIZE]; + let msg = vec![1; MESSAGE_SIZE]; let mut msg_published = false; let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); loop { @@ -98,7 +98,7 @@ mod test { // Expect all publish attempts to fail with [`Error::NoPeers`] // because swarm2 doesn't have any peers that support the mix protocol. - let msg = vec![1; MAX_PADDED_PAYLOAD_SIZE]; + let msg = vec![1; MESSAGE_SIZE]; let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); let mut publish_try_count = 0; loop { diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index 600cac8d..aec0f82d 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -41,6 +41,7 @@ time = "0.3" [dev-dependencies] blake2 = { version = "0.10" } +x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [features] default = ["libp2p"] diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index edc406aa..3aaeb578 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -1,6 +1,7 @@ use cryptarchia_consensus::LeaderConfig; // std use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; +use nomos_mix::membership::Node; use nomos_mix::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, }; @@ -187,13 +188,19 @@ pub struct TestDaNetworkSettings { pub node_key: ed25519::SecretKey, } +pub struct TestMixSettings { + pub backend: Libp2pMixBackendSettings, + pub private_key: x25519_dalek::StaticSecret, + pub membership: Vec, +} + pub fn new_node( leader_config: &LeaderConfig, ledger_config: &nomos_ledger::Config, genesis_state: &LedgerState, time_config: &TimeConfig, swarm_config: &SwarmConfig, - mix_config: &Libp2pMixBackendSettings, + mix_config: &TestMixSettings, db_path: PathBuf, blobs_dir: &PathBuf, initial_peers: Vec, @@ -210,14 +217,18 @@ pub fn new_node( }, }, mix: MixConfig { - backend: mix_config.clone(), + backend: mix_config.backend.clone(), persistent_transmission: Default::default(), message_blend: MessageBlendSettings { - cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + cryptographic_processor: CryptographicProcessorSettings { + private_key: mix_config.private_key.to_bytes(), + num_mix_layers: 1, + }, temporal_processor: TemporalProcessorSettings { max_delay_seconds: 2, }, }, + membership: mix_config.membership.clone(), }, da_network: DaNetworkConfig { backend: DaNetworkBackendSettings { @@ -308,35 +319,37 @@ pub fn new_node( .unwrap() } -pub fn new_mix_configs(listening_addresses: Vec) -> Vec { - let mut configs = listening_addresses +pub fn new_mix_configs(listening_addresses: Vec) -> Vec { + let settings = listening_addresses .iter() - .map(|listening_address| Libp2pMixBackendSettings { - listening_address: listening_address.clone(), - node_key: ed25519::SecretKey::generate(), - membership: Vec::new(), - peering_degree: 1, + .map(|listening_address| { + ( + Libp2pMixBackendSettings { + listening_address: listening_address.clone(), + node_key: ed25519::SecretKey::generate(), + peering_degree: 1, + }, + x25519_dalek::StaticSecret::random(), + ) }) .collect::>(); - let membership = configs + let membership = settings .iter() - .map(|c| { - let peer_id = PeerId::from_public_key( - &ed25519::Keypair::from(c.node_key.clone()).public().into(), - ); - c.listening_address - .clone() - .with_p2p(peer_id) - .unwrap_or_else(|orig_addr| orig_addr) + .map(|(backend, private_key)| Node { + address: backend.listening_address.clone(), + public_key: x25519_dalek::PublicKey::from(private_key).to_bytes(), }) .collect::>(); - configs - .iter_mut() - .for_each(|c| c.membership = membership.clone()); - - configs + settings + .into_iter() + .map(|(backend, private_key)| TestMixSettings { + backend, + private_key, + membership: membership.clone(), + }) + .collect() } // Client node is only created for asyncroniously interact with nodes in the test. diff --git a/nomos-services/mix/Cargo.toml b/nomos-services/mix/Cargo.toml index 79b475eb..43371670 100644 --- a/nomos-services/mix/Cargo.toml +++ b/nomos-services/mix/Cargo.toml @@ -19,6 +19,8 @@ serde = { version = "1.0", features = ["derive"] } tokio = { version = "1", features = ["macros", "sync"] } tokio-stream = "0.1" tracing = "0.1" +rand_chacha = "0.3.1" +x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [features] default = [] diff --git a/nomos-services/mix/src/backends/libp2p.rs b/nomos-services/mix/src/backends/libp2p.rs index 7a891d0b..11e184e7 100644 --- a/nomos-services/mix/src/backends/libp2p.rs +++ b/nomos-services/mix/src/backends/libp2p.rs @@ -6,11 +6,12 @@ use libp2p::{ core::transport::ListenerId, identity::{ed25519, Keypair}, swarm::SwarmEvent, - Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, + Multiaddr, Swarm, SwarmBuilder, TransportError, }; -use nomos_libp2p::{secret_key_serde, DialError, DialOpts, Protocol}; +use nomos_libp2p::{secret_key_serde, DialError, DialOpts}; +use nomos_mix::membership::Membership; use overwatch_rs::overwatch::handle::OverwatchHandle; -use rand::seq::IteratorRandom; +use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::{ sync::{broadcast, mpsc}, @@ -34,7 +35,6 @@ pub struct Libp2pMixBackendSettings { // A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC) #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] pub node_key: ed25519::SecretKey, - pub membership: Vec, pub peering_degree: usize, } @@ -44,12 +44,16 @@ const CHANNEL_SIZE: usize = 64; impl MixBackend for Libp2pMixBackend { type Settings = Libp2pMixBackendSettings; - fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { + fn new( + config: Self::Settings, + overwatch_handle: OverwatchHandle, + membership: Membership, + mut rng: R, + ) -> Self { let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE); let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE); let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone())); - let local_peer_id = keypair.public().to_peer_id(); let mut swarm = MixSwarm::new( keypair, swarm_message_receiver, @@ -63,20 +67,12 @@ impl MixBackend for Libp2pMixBackend { }); // Randomly select peering_degree number of peers, and dial to them - // TODO: Consider moving the peer seelction to the nomos_mix_network::Behaviour - config - .membership + membership + .choose_remote_nodes(&mut rng, config.peering_degree) .iter() - .filter(|addr| match extract_peer_id(addr) { - Some(peer_id) => peer_id != local_peer_id, - None => false, - }) - .choose_multiple(&mut rand::thread_rng(), config.peering_degree) - .iter() - .cloned() - .for_each(|addr| { - if let Err(e) = swarm.dial(addr.clone()) { - tracing::error!("failed to dial to {:?}: {:?}", addr, e); + .for_each(|node| { + if let Err(e) = swarm.dial(node.address.clone()) { + tracing::error!("failed to dial to {:?}: {:?}", node.address, e); } }); @@ -193,13 +189,3 @@ impl MixSwarm { } } } - -fn extract_peer_id(multiaddr: &Multiaddr) -> Option { - multiaddr.iter().find_map(|protocol| { - if let Protocol::P2p(peer_id) = protocol { - Some(peer_id) - } else { - None - } - }) -} diff --git a/nomos-services/mix/src/backends/mod.rs b/nomos-services/mix/src/backends/mod.rs index f983b2f9..61494344 100644 --- a/nomos-services/mix/src/backends/mod.rs +++ b/nomos-services/mix/src/backends/mod.rs @@ -4,14 +4,21 @@ pub mod libp2p; use std::{fmt::Debug, pin::Pin}; use futures::Stream; +use nomos_mix::membership::Membership; use overwatch_rs::overwatch::handle::OverwatchHandle; +use rand::Rng; /// A trait for mix backends that send messages to the mix network. #[async_trait::async_trait] pub trait MixBackend { type Settings: Clone + Debug + Send + Sync + 'static; - fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self; + fn new( + config: Self::Settings, + overwatch_handle: OverwatchHandle, + membership: Membership, + rng: R, + ) -> Self; /// Publish a message to the mix network. async fn publish(&self, msg: Vec); /// Listen to messages received from the mix network. diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs index 8a2e3777..e5b151e3 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/mix/src/lib.rs @@ -6,6 +6,7 @@ use backends::MixBackend; use futures::StreamExt; use network::NetworkAdapter; use nomos_core::wire; +use nomos_mix::membership::{Membership, Node}; use nomos_mix::message_blend::crypto::CryptographicProcessor; use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings}; use nomos_mix::persistent_transmission::{ @@ -20,6 +21,8 @@ use overwatch_rs::services::{ state::{NoOperator, NoState}, ServiceCore, ServiceData, ServiceId, }; +use rand::SeedableRng; +use rand_chacha::ChaCha12Rng; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::fmt::Debug; use tokio::sync::mpsc; @@ -40,6 +43,7 @@ where backend: Backend, service_state: ServiceStateHandle, network_relay: Relay>, + membership: Membership, } impl ServiceData for MixService @@ -67,13 +71,18 @@ where { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); + let mix_config = service_state.settings_reader.get_updated_settings(); + let membership = mix_config.membership(); Ok(Self { backend: ::new( service_state.settings_reader.get_updated_settings().backend, service_state.overwatch_handle.clone(), + membership.clone(), + ChaCha12Rng::from_entropy(), ), service_state, network_relay, + membership, }) } @@ -82,10 +91,14 @@ where service_state, mut backend, network_relay, + membership, } = self; let mix_config = service_state.settings_reader.get_updated_settings(); - let cryptographic_processor = - CryptographicProcessor::new(mix_config.message_blend.cryptographic_processor); + let mut cryptographic_processor = CryptographicProcessor::new( + mix_config.message_blend.cryptographic_processor, + membership.clone(), + ChaCha12Rng::from_entropy(), + ); let network_relay = network_relay.connect().await?; let network_adapter = Network::new(network_relay); @@ -96,9 +109,11 @@ where .persistent_transmission(mix_config.persistent_transmission); // tier 2 blend - let mut blend_messages = backend - .listen_to_incoming_messages() - .blend(mix_config.message_blend); + let mut blend_messages = backend.listen_to_incoming_messages().blend( + mix_config.message_blend, + membership.clone(), + ChaCha12Rng::from_entropy(), + ); // local messages, are bypassed and send immediately let mut local_messages = service_state @@ -188,6 +203,17 @@ pub struct MixConfig { pub backend: BackendSettings, pub message_blend: MessageBlendSettings, pub persistent_transmission: PersistentTransmissionSettings, + pub membership: Vec, +} + +impl MixConfig { + fn membership(&self) -> Membership { + let public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from( + self.message_blend.cryptographic_processor.private_key, + )) + .to_bytes(); + Membership::new(self.membership.clone(), &public_key) + } } /// A message that is handled by [`MixService`]. diff --git a/testnet/cfgsync/Cargo.toml b/testnet/cfgsync/Cargo.toml index 4229e187..d63ee69c 100644 --- a/testnet/cfgsync/Cargo.toml +++ b/testnet/cfgsync/Cargo.toml @@ -9,6 +9,7 @@ clap = { version = "4", features = ["derive"] } nomos-executor = { path = "../../nodes/nomos-executor" } nomos-libp2p = { path = "../../nomos-libp2p" } nomos-node = { path = "../../nodes/nomos-node" } +nomos-mix = { path = "../../nomos-mix/core" } nomos-tracing = { path = "../../nomos-tracing" } nomos-tracing-service = { path = "../../nomos-services/tracing" } rand = "0.8" diff --git a/testnet/cfgsync/src/config.rs b/testnet/cfgsync/src/config.rs index 776b57a6..49a6e8f9 100644 --- a/testnet/cfgsync/src/config.rs +++ b/testnet/cfgsync/src/config.rs @@ -1,7 +1,8 @@ // std use std::{collections::HashMap, net::Ipv4Addr, str::FromStr}; // crates -use nomos_libp2p::{Multiaddr, PeerId, Protocol}; +use nomos_libp2p::{Multiaddr, PeerId}; +use nomos_mix::membership::Node; use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig}; use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingSettings}; use rand::{thread_rng, Rng}; @@ -91,7 +92,7 @@ pub fn create_node_configs( let host_network_init_peers = update_network_init_peers(hosts.clone()); let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses); let host_mix_membership = - update_mix_membership(hosts.clone(), mix_configs[0].backend.membership.clone()); + update_mix_membership(hosts.clone(), mix_configs[0].membership.clone()); let new_peer_addresses: HashMap = host_da_peer_addresses .clone() @@ -122,7 +123,7 @@ pub fn create_node_configs( let mut mix_config = mix_configs[i].to_owned(); mix_config.backend.listening_address = Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap(); - mix_config.backend.membership = host_mix_membership.clone(); + mix_config.membership = host_mix_membership.clone(); // Tracing config. let tracing_config = @@ -170,32 +171,19 @@ fn update_da_peer_addresses( .collect() } -fn update_mix_membership(hosts: Vec, membership: Vec) -> Vec { +fn update_mix_membership(hosts: Vec, membership: Vec) -> Vec { membership .into_iter() .zip(hosts) - .map(|(addr, host)| { - Multiaddr::from_str(&format!( - "/ip4/{}/udp/{}/quic-v1/p2p/{}", - host.ip, - host.mix_port, - extract_peer_id(&addr).unwrap(), - )) - .unwrap() + .map(|(mut node, host)| { + node.address = + Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.mix_port)) + .unwrap(); + node }) .collect() } -fn extract_peer_id(multiaddr: &Multiaddr) -> Option { - multiaddr.iter().find_map(|protocol| { - if let Protocol::P2p(peer_id) = protocol { - Some(peer_id) - } else { - None - } - }) -} - fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> GeneralTracingConfig { GeneralTracingConfig { tracing_settings: TracingSettings { diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 77297970..a324a6f1 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -50,6 +50,7 @@ criterion = { version = "0.5", features = ["async_tokio"] } nomos-cli = { path = "../nomos-cli" } time = "0.3" tracing = "0.1" +x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [[test]] name = "test_cryptarchia_happy_path" diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs index de525c1c..b6f1c66d 100644 --- a/tests/src/nodes/executor.rs +++ b/tests/src/nodes/executor.rs @@ -158,11 +158,15 @@ pub fn create_executor_config(config: GeneralConfig) -> Config { backend: config.mix_config.backend, persistent_transmission: Default::default(), message_blend: MessageBlendSettings { - cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + cryptographic_processor: CryptographicProcessorSettings { + private_key: config.mix_config.private_key.to_bytes(), + num_mix_layers: 1, + }, temporal_processor: TemporalProcessorSettings { max_delay_seconds: 2, }, }, + membership: config.mix_config.membership, }, cryptarchia: CryptarchiaSettings { leader_config: config.consensus_config.leader_config, diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs index 1a894f81..9e816196 100644 --- a/tests/src/nodes/validator.rs +++ b/tests/src/nodes/validator.rs @@ -243,13 +243,16 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { mix: nomos_mix_service::MixConfig { backend: config.mix_config.backend, persistent_transmission: Default::default(), - message_blend: MessageBlendSettings { - cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + cryptographic_processor: CryptographicProcessorSettings { + private_key: config.mix_config.private_key.to_bytes(), + num_mix_layers: 1, + }, temporal_processor: TemporalProcessorSettings { max_delay_seconds: 2, }, }, + membership: config.mix_config.membership, }, cryptarchia: CryptarchiaSettings { leader_config: config.consensus_config.leader_config, diff --git a/tests/src/topology/configs/mix.rs b/tests/src/topology/configs/mix.rs index 318f2e85..9fabed41 100644 --- a/tests/src/topology/configs/mix.rs +++ b/tests/src/topology/configs/mix.rs @@ -1,13 +1,16 @@ use std::str::FromStr; use nomos_libp2p::{ed25519, Multiaddr}; +use nomos_mix::membership::Node; use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings; -use crate::{get_available_port, secret_key_to_peer_id}; +use crate::get_available_port; #[derive(Clone)] pub struct GeneralMixConfig { pub backend: Libp2pMixBackendSettings, + pub private_key: x25519_dalek::StaticSecret, + pub membership: Vec, } pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { @@ -26,33 +29,28 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { )) .unwrap(), node_key, - membership: Vec::new(), peering_degree: 1, }, + private_key: x25519_dalek::StaticSecret::random(), + membership: Vec::new(), } }) .collect(); - let membership = mix_membership(&configs); - + let nodes = mix_nodes(&configs); configs.iter_mut().for_each(|config| { - config.backend.membership = membership.clone(); + config.membership = nodes.clone(); }); configs } -fn mix_membership(configs: &[GeneralMixConfig]) -> Vec { +fn mix_nodes(configs: &[GeneralMixConfig]) -> Vec { configs .iter() - .map(|config| { - let peer_id = secret_key_to_peer_id(config.backend.node_key.clone()); - config - .backend - .listening_address - .clone() - .with_p2p(peer_id) - .unwrap_or_else(|orig_addr| orig_addr) + .map(|config| Node { + address: config.backend.listening_address.clone(), + public_key: x25519_dalek::PublicKey::from(&config.private_key).to_bytes(), }) .collect() }