From c84a29db317d8ec4b88fefb7f4bc89dcb5668b53 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:49:38 +0900 Subject: [PATCH] Use public keys for the mock mix message encoding (#905) * add node IDs in message * integrate * handle variable-lenght messages in nomos-mix-network * add trait MixMessage * make tests work * clippy happy * remove println! * remove unnecssary dependency * remove unnecessary trait bounds * remove leftover --- nodes/nomos-node/config.yaml | 17 ++- nodes/nomos-node/src/config.rs | 8 -- nomos-mix/core/Cargo.toml | 3 +- nomos-mix/core/src/lib.rs | 1 + nomos-mix/core/src/membership.rs | 39 ++++++ nomos-mix/core/src/message_blend/crypto.rs | 52 ++++++-- nomos-mix/core/src/message_blend/mod.rs | 70 +++++++--- nomos-mix/core/src/persistent_transmission.rs | 44 +++--- nomos-mix/message/Cargo.toml | 1 - nomos-mix/message/src/error.rs | 4 +- nomos-mix/message/src/lib.rs | 75 +++-------- nomos-mix/message/src/mock/mod.rs | 125 ++++++++++++++++++ nomos-mix/message/src/packet.rs | 2 +- nomos-mix/network/Cargo.toml | 1 + nomos-mix/network/src/behaviour.rs | 47 ++++--- nomos-mix/network/src/handler.rs | 22 ++- nomos-mix/network/src/lib.rs | 8 +- .../data-availability/tests/Cargo.toml | 2 + .../data-availability/tests/src/common.rs | 63 +++++---- nomos-services/mix/Cargo.toml | 1 + nomos-services/mix/src/backends/libp2p.rs | 50 +++---- nomos-services/mix/src/backends/mod.rs | 12 +- nomos-services/mix/src/lib.rs | 54 ++++++-- testnet/cfgsync/Cargo.toml | 2 + testnet/cfgsync/src/config.rs | 36 ++--- tests/Cargo.toml | 2 + tests/src/nodes/executor.rs | 6 +- tests/src/nodes/validator.rs | 7 +- tests/src/topology/configs/mix.rs | 27 ++-- 29 files changed, 530 insertions(+), 251 deletions(-) create mode 100644 nomos-mix/core/src/membership.rs create mode 100644 nomos-mix/message/src/mock/mod.rs diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index dfe3ce51..aeb64361 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -52,10 +52,21 @@ mix: backend: listening_address: /ip4/0.0.0.0/udp/3001/quic-v1 node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3 - membership: - - /ip4/0.0.0.0/udp/3001/quic-v1/p2p/12D3KooWMj7KgmbDJ7RSXeFhFimvqddSXzA5XWDTwvdEJYfuoPhM peering_degree: 1 - num_mix_layers: 1 + persistent_transmission: + max_emission_frequency: 1 + drop_message_probability: 0.5 + message_blend: + cryptographic_processor: + private_key: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + num_mix_layers: 1 + temporal_processor: + max_delay_seconds: 5 + membership: + - address: /ip4/127.0.0.1/udp/3001/quic-v1 + public_key: [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1] + - address: /ip4/127.0.0.1/udp/3002/quic-v1 + public_key: [2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2] http: backend_settings: diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 6ce275a5..a41927ed 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -83,9 +83,6 @@ pub struct MixArgs { #[clap(long = "mix-node-key", env = "MIX_NODE_KEY")] mix_node_key: Option, - #[clap(long = "mix-membership", env = "MIX_MEMBERSHIP", num_args = 1.., value_delimiter = ',')] - pub mix_membership: Option>, - #[clap(long = "mix-peering-degree", env = "MIX_PEERING_DEGREE")] mix_peering_degree: Option, @@ -256,7 +253,6 @@ pub fn update_mix( let MixArgs { mix_addr, mix_node_key, - mix_membership, mix_peering_degree, mix_num_mix_layers, } = mix_args; @@ -270,10 +266,6 @@ pub fn update_mix( mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?; } - if let Some(membership) = mix_membership { - mix.backend.membership = membership; - } - if let Some(peering_degree) = mix_peering_degree { mix.backend.peering_degree = peering_degree; } diff --git a/nomos-mix/core/Cargo.toml b/nomos-mix/core/Cargo.toml index 09282b2c..cc67d3ed 100644 --- a/nomos-mix/core/Cargo.toml +++ b/nomos-mix/core/Cargo.toml @@ -12,9 +12,10 @@ rand = "0.8" serde = { version = "1.0", features = ["derive"] } nomos-mix-message = { path = "../message" } futures = "0.3" +multiaddr = "0.18" +x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread"] } rand_chacha = "0.3" - diff --git a/nomos-mix/core/src/lib.rs b/nomos-mix/core/src/lib.rs index 0326319c..5484b495 100644 --- a/nomos-mix/core/src/lib.rs +++ b/nomos-mix/core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod membership; pub mod message_blend; pub mod persistent_transmission; diff --git a/nomos-mix/core/src/membership.rs b/nomos-mix/core/src/membership.rs new file mode 100644 index 00000000..857dc450 --- /dev/null +++ b/nomos-mix/core/src/membership.rs @@ -0,0 +1,39 @@ +use multiaddr::Multiaddr; +use nomos_mix_message::MixMessage; +use rand::{seq::SliceRandom, Rng}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug)] +pub struct Membership +where + M: MixMessage, +{ + remote_nodes: Vec>, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Node { + pub address: Multiaddr, + pub public_key: K, +} + +impl Membership +where + M: MixMessage, + M::PublicKey: PartialEq, +{ + pub fn new(mut nodes: Vec>, local_public_key: M::PublicKey) -> Self { + nodes.retain(|node| node.public_key != local_public_key); + Self { + remote_nodes: nodes, + } + } + + pub fn choose_remote_nodes( + &self, + rng: &mut R, + amount: usize, + ) -> Vec<&Node> { + self.remote_nodes.choose_multiple(rng, amount).collect() + } +} diff --git a/nomos-mix/core/src/message_blend/crypto.rs b/nomos-mix/core/src/message_blend/crypto.rs index 910b28e0..67d80afa 100644 --- a/nomos-mix/core/src/message_blend/crypto.rs +++ b/nomos-mix/core/src/message_blend/crypto.rs @@ -1,33 +1,59 @@ -use nomos_mix_message::{new_message, unwrap_message}; +use crate::membership::Membership; +use nomos_mix_message::MixMessage; +use rand::RngCore; use serde::{Deserialize, Serialize}; /// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages /// for the message indistinguishability. -#[derive(Clone, Copy, Debug)] -pub struct CryptographicProcessor { - settings: CryptographicProcessorSettings, +pub struct CryptographicProcessor +where + M: MixMessage, +{ + settings: CryptographicProcessorSettings, + membership: Membership, + rng: R, } -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -pub struct CryptographicProcessorSettings { +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CryptographicProcessorSettings { + pub private_key: K, pub num_mix_layers: usize, } -impl CryptographicProcessor { - pub fn new(settings: CryptographicProcessorSettings) -> Self { - Self { settings } +impl CryptographicProcessor +where + R: RngCore, + M: MixMessage, + M::PublicKey: Clone + PartialEq, +{ + pub fn new( + settings: CryptographicProcessorSettings, + membership: Membership, + rng: R, + ) -> Self { + Self { + settings, + membership, + rng, + } } - pub fn wrap_message(&self, message: &[u8]) -> Result, nomos_mix_message::Error> { + pub fn wrap_message(&mut self, message: &[u8]) -> Result, nomos_mix_message::Error> { // TODO: Use the actual Sphinx encoding instead of mock. - // TODO: Select `num_mix_layers` random nodes from the membership. - new_message(message, self.settings.num_mix_layers.try_into().unwrap()) + let public_keys = self + .membership + .choose_remote_nodes(&mut self.rng, self.settings.num_mix_layers) + .iter() + .map(|node| node.public_key.clone()) + .collect::>(); + + M::build_message(message, &public_keys) } pub fn unwrap_message( &self, message: &[u8], ) -> Result<(Vec, bool), nomos_mix_message::Error> { - unwrap_message(message) + M::unwrap_message(message, &self.settings.private_key) } } diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-mix/core/src/message_blend/mod.rs index 3678701e..568c9da8 100644 --- a/nomos-mix/core/src/message_blend/mod.rs +++ b/nomos-mix/core/src/message_blend/mod.rs @@ -10,41 +10,64 @@ use std::pin::Pin; use std::task::{Context, Poll}; pub use temporal::TemporalProcessorSettings; +use crate::membership::Membership; use crate::message_blend::crypto::CryptographicProcessor; use crate::message_blend::temporal::TemporalProcessorExt; use crate::MixOutgoingMessage; +use nomos_mix_message::MixMessage; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct MessageBlendSettings { - pub cryptographic_processor: CryptographicProcessorSettings, +pub struct MessageBlendSettings +where + M: MixMessage, + M::PrivateKey: Serialize + DeserializeOwned, +{ + pub cryptographic_processor: CryptographicProcessorSettings, pub temporal_processor: TemporalProcessorSettings, } /// [`MessageBlendStream`] handles the entire mixing tiers process /// - Unwraps incoming messages received from network using [`CryptographicProcessor`] /// - Pushes unwrapped messages to [`TemporalProcessor`] -pub struct MessageBlendStream { +pub struct MessageBlendStream +where + M: MixMessage, +{ input_stream: S, output_stream: BoxStream<'static, MixOutgoingMessage>, temporal_sender: UnboundedSender, - cryptographic_processor: CryptographicProcessor, + cryptographic_processor: CryptographicProcessor, _rng: PhantomData, } -impl MessageBlendStream +impl MessageBlendStream where S: Stream>, Rng: RngCore + Unpin + Send + 'static, + M: MixMessage, + M::PrivateKey: Serialize + DeserializeOwned, + M::PublicKey: Clone + PartialEq, { - pub fn new(input_stream: S, settings: MessageBlendSettings, rng: Rng) -> Self { - let cryptographic_processor = CryptographicProcessor::new(settings.cryptographic_processor); + pub fn new( + input_stream: S, + settings: MessageBlendSettings, + membership: Membership, + cryptographic_processor_rng: Rng, + temporal_processor_rng: Rng, + ) -> Self { + let cryptographic_processor = CryptographicProcessor::new( + settings.cryptographic_processor, + membership, + cryptographic_processor_rng, + ); let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel(); let output_stream = UnboundedReceiverStream::new(temporal_receiver) - .temporal_stream(settings.temporal_processor, rng) + .temporal_stream(settings.temporal_processor, temporal_processor_rng) .boxed(); Self { input_stream, @@ -77,10 +100,13 @@ where } } -impl Stream for MessageBlendStream +impl Stream for MessageBlendStream where S: Stream> + Unpin, Rng: RngCore + Unpin + Send + 'static, + M: MixMessage + Unpin, + M::PrivateKey: Serialize + DeserializeOwned + Unpin, + M::PublicKey: Clone + PartialEq + Unpin, { type Item = MixOutgoingMessage; @@ -92,25 +118,39 @@ where } } -pub trait MessageBlendExt: Stream> +pub trait MessageBlendExt: Stream> where Rng: RngCore + Send + Unpin + 'static, + M: MixMessage, + M::PrivateKey: Serialize + DeserializeOwned, + M::PublicKey: Clone + PartialEq, { fn blend( self, - message_blend_settings: MessageBlendSettings, - rng: Rng, - ) -> MessageBlendStream + message_blend_settings: MessageBlendSettings, + membership: Membership, + cryptographic_processor_rng: Rng, + temporal_processor_rng: Rng, + ) -> MessageBlendStream where Self: Sized + Unpin, { - MessageBlendStream::new(self, message_blend_settings, rng) + MessageBlendStream::new( + self, + message_blend_settings, + membership, + cryptographic_processor_rng, + temporal_processor_rng, + ) } } -impl MessageBlendExt for T +impl MessageBlendExt for T where T: Stream>, Rng: RngCore + Unpin + Send + 'static, + M: MixMessage, + M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq, + M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq, { } diff --git a/nomos-mix/core/src/persistent_transmission.rs b/nomos-mix/core/src/persistent_transmission.rs index 0a0df5a0..372bfa02 100644 --- a/nomos-mix/core/src/persistent_transmission.rs +++ b/nomos-mix/core/src/persistent_transmission.rs @@ -1,7 +1,9 @@ use futures::Stream; -use nomos_mix_message::DROP_MESSAGE; +use nomos_mix_message::MixMessage; use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use std::time::Duration; @@ -26,7 +28,7 @@ impl Default for PersistentTransmissionSettings { } /// Transmit scheduled messages with a persistent rate as a stream. -pub struct PersistentTransmissionStream +pub struct PersistentTransmissionStream where S: Stream, Rng: RngCore, @@ -34,18 +36,20 @@ where interval: Interval, coin: Coin, stream: S, + _mix_message: PhantomData, } -impl PersistentTransmissionStream +impl PersistentTransmissionStream where S: Stream, Rng: RngCore, + M: MixMessage, { pub fn new( settings: PersistentTransmissionSettings, stream: S, rng: Rng, - ) -> PersistentTransmissionStream { + ) -> PersistentTransmissionStream { let interval = time::interval(Duration::from_secs_f64( 1.0 / settings.max_emission_frequency, )); @@ -54,14 +58,16 @@ where interval, coin, stream, + _mix_message: Default::default(), } } } -impl Stream for PersistentTransmissionStream +impl Stream for PersistentTransmissionStream where S: Stream> + Unpin, Rng: RngCore + Unpin, + M: MixMessage + Unpin, { type Item = Vec; @@ -78,22 +84,23 @@ where if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) { Poll::Ready(Some(item)) } else if coin.flip() { - Poll::Ready(Some(DROP_MESSAGE.to_vec())) + Poll::Ready(Some(M::DROP_MESSAGE.to_vec())) } else { Poll::Pending } } } -pub trait PersistentTransmissionExt: Stream +pub trait PersistentTransmissionExt: Stream where Rng: RngCore, + M: MixMessage, { fn persistent_transmission( self, settings: PersistentTransmissionSettings, rng: Rng, - ) -> PersistentTransmissionStream + ) -> PersistentTransmissionStream where Self: Sized + Unpin, { @@ -101,10 +108,12 @@ where } } -impl PersistentTransmissionExt for S +impl PersistentTransmissionExt for S where S: Stream, Rng: RngCore, + M: MixMessage, + M::PublicKey: Clone + Serialize + DeserializeOwned, { } @@ -141,6 +150,7 @@ enum CoinError { mod tests { use super::*; use futures::StreamExt; + use nomos_mix_message::mock::MockMixMessage; use rand::SeedableRng; use rand_chacha::ChaCha8Rng; use tokio::sync::mpsc; @@ -183,7 +193,7 @@ mod tests { let lower_bound = expected_emission_interval - torelance; let upper_bound = expected_emission_interval + torelance; // prepare stream - let mut persistent_transmission_stream = + let mut persistent_transmission_stream: PersistentTransmissionStream<_, _, MockMixMessage> = stream.persistent_transmission(settings, ChaCha8Rng::from_entropy()); // Messages must be scheduled in non-blocking manner. schedule_sender.send(vec![1]).unwrap(); @@ -209,16 +219,14 @@ mod tests { ); assert_interval!(&mut last_time, lower_bound, upper_bound); - assert_eq!( - persistent_transmission_stream.next().await.unwrap(), - DROP_MESSAGE.to_vec() - ); + assert!(MockMixMessage::is_drop_message( + &persistent_transmission_stream.next().await.unwrap() + )); assert_interval!(&mut last_time, lower_bound, upper_bound); - assert_eq!( - persistent_transmission_stream.next().await.unwrap(), - DROP_MESSAGE.to_vec() - ); + assert!(MockMixMessage::is_drop_message( + &persistent_transmission_stream.next().await.unwrap() + )); assert_interval!(&mut last_time, lower_bound, upper_bound); // Schedule a new message and check if it is emitted at the next interval diff --git a/nomos-mix/message/Cargo.toml b/nomos-mix/message/Cargo.toml index ef339926..69488a8e 100644 --- a/nomos-mix/message/Cargo.toml +++ b/nomos-mix/message/Cargo.toml @@ -5,7 +5,6 @@ edition = "2021" [dependencies] serde = { version = "1.0", features = ["derive"] } -sha2 = "0.10" sphinx-packet = "0.2" thiserror = "1.0.65" x25519-dalek = { version = "2.0.1", features = [ diff --git a/nomos-mix/message/src/error.rs b/nomos-mix/message/src/error.rs index 92f3d345..38b4e93a 100644 --- a/nomos-mix/message/src/error.rs +++ b/nomos-mix/message/src/error.rs @@ -4,8 +4,8 @@ pub enum Error { InvalidMixMessage, #[error("Payload is too large")] PayloadTooLarge, - #[error("Too many recipients")] - TooManyRecipients, + #[error("Invalid number of layers")] + InvalidNumberOfLayers, #[error("Sphinx packet error: {0}")] SphinxPacketError(#[from] sphinx_packet::Error), #[error("Unwrapping a message is not allowed to this node")] diff --git a/nomos-mix/message/src/lib.rs b/nomos-mix/message/src/lib.rs index cd982f94..5debe180 100644 --- a/nomos-mix/message/src/lib.rs +++ b/nomos-mix/message/src/lib.rs @@ -1,64 +1,27 @@ mod error; +pub mod mock; pub mod packet; pub use error::Error; -use sha2::{Digest, Sha256}; +pub trait MixMessage { + type PublicKey; + type PrivateKey; + const DROP_MESSAGE: &'static [u8]; -pub const MSG_SIZE: usize = 2048; -pub const DROP_MESSAGE: [u8; MSG_SIZE] = [0; MSG_SIZE]; - -// TODO: Remove all the mock below once the actual implementation is integrated to the system. -// -/// A mock implementation of the Sphinx encoding. -/// -/// The length of the encoded message is fixed to [`MSG_SIZE`] bytes. -/// The first byte of the encoded message is the number of remaining layers to be unwrapped. -/// The remaining bytes are the payload that is zero-padded to the end. -pub fn new_message(payload: &[u8], num_layers: u8) -> Result, Error> { - if payload.len() > MSG_SIZE - 1 { - return Err(Error::PayloadTooLarge); - } - - let mut message: Vec = Vec::with_capacity(MSG_SIZE); - message.push(num_layers); - message.extend(payload); - message.extend(std::iter::repeat(0).take(MSG_SIZE - message.len())); - Ok(message) -} - -/// SHA-256 hash of the message -pub fn message_id(message: &[u8]) -> Vec { - let mut hasher = Sha256::new(); - hasher.update(message); - hasher.finalize().to_vec() -} - -/// Unwrap the message one layer. -/// -/// This function returns the unwrapped message and a boolean indicating whether the message was fully unwrapped. -/// (False if the message still has layers to be unwrapped, true otherwise) -/// -/// If the input message was already fully unwrapped, or if ititss format is invalid, -/// this function returns `[Error::InvalidMixMessage]`. -pub fn unwrap_message(message: &[u8]) -> Result<(Vec, bool), Error> { - if message.is_empty() { - return Err(Error::InvalidMixMessage); - } - - match message[0] { - 0 => Err(Error::InvalidMixMessage), - 1 => Ok((message[1..].to_vec(), true)), - n => { - let mut unwrapped: Vec = Vec::with_capacity(message.len()); - unwrapped.push(n - 1); - unwrapped.extend(&message[1..]); - Ok((unwrapped, false)) - } + fn build_message(payload: &[u8], public_keys: &[Self::PublicKey]) -> Result, Error>; + /// Unwrap the message one layer. + /// + /// This function returns the unwrapped message and a boolean indicating whether the message was fully unwrapped. + /// (False if the message still has layers to be unwrapped, true otherwise) + /// + /// If the input message was already fully unwrapped, or if its format is invalid, + /// this function returns `[Error::InvalidMixMessage]`. + fn unwrap_message( + message: &[u8], + private_key: &Self::PrivateKey, + ) -> Result<(Vec, bool), Error>; + fn is_drop_message(message: &[u8]) -> bool { + message == Self::DROP_MESSAGE } } - -/// Check if the message is a drop message. -pub fn is_drop_message(message: &[u8]) -> bool { - message == DROP_MESSAGE -} diff --git a/nomos-mix/message/src/mock/mod.rs b/nomos-mix/message/src/mock/mod.rs new file mode 100644 index 00000000..83c66f8b --- /dev/null +++ b/nomos-mix/message/src/mock/mod.rs @@ -0,0 +1,125 @@ +use crate::{Error, MixMessage}; +// TODO: Remove all the mock below once the actual implementation is integrated to the system. +// +/// A mock implementation of the Sphinx encoding. + +const PRIVATE_KEY_SIZE: usize = 32; +const PUBLIC_KEY_SIZE: usize = 32; + +const PADDED_PAYLOAD_SIZE: usize = 2048; +const PAYLOAD_PADDING_SEPARATOR: u8 = 0x01; +const PAYLOAD_PADDING_SEPARATOR_SIZE: usize = 1; +const MAX_LAYERS: usize = 5; +pub const MESSAGE_SIZE: usize = PUBLIC_KEY_SIZE * MAX_LAYERS + PADDED_PAYLOAD_SIZE; + +#[derive(Clone, Debug)] +pub struct MockMixMessage; + +impl MixMessage for MockMixMessage { + type PublicKey = [u8; PUBLIC_KEY_SIZE]; + type PrivateKey = [u8; PRIVATE_KEY_SIZE]; + const DROP_MESSAGE: &'static [u8] = &[0; MESSAGE_SIZE]; + + /// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes. + /// The [`MAX_LAYERS`] number of [`NodeId`]s are concatenated in front of the payload. + /// The payload is zero-padded to the end. + /// + fn build_message(payload: &[u8], public_keys: &[Self::PublicKey]) -> Result, Error> { + if public_keys.is_empty() || public_keys.len() > MAX_LAYERS { + return Err(Error::InvalidNumberOfLayers); + } + if payload.len() > PADDED_PAYLOAD_SIZE - PAYLOAD_PADDING_SEPARATOR_SIZE { + return Err(Error::PayloadTooLarge); + } + + let mut message: Vec = Vec::with_capacity(MESSAGE_SIZE); + + public_keys.iter().for_each(|public_key| { + message.extend(public_key); + }); + // If there is any remaining layers, fill them with zeros. + (0..MAX_LAYERS - public_keys.len()).for_each(|_| message.extend(&[0; PUBLIC_KEY_SIZE])); + + // Append payload with padding + message.extend(payload); + message.push(PAYLOAD_PADDING_SEPARATOR); + message.extend( + std::iter::repeat(0) + .take(PADDED_PAYLOAD_SIZE - payload.len() - PAYLOAD_PADDING_SEPARATOR_SIZE), + ); + Ok(message) + } + + fn unwrap_message( + message: &[u8], + private_key: &Self::PrivateKey, + ) -> Result<(Vec, bool), Error> { + if message.len() != MESSAGE_SIZE { + return Err(Error::InvalidMixMessage); + } + + let public_key = + x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(*private_key)) + .to_bytes(); + if message[0..PUBLIC_KEY_SIZE] != public_key { + return Err(Error::MsgUnwrapNotAllowed); + } + + // If this is the last layer + if message[PUBLIC_KEY_SIZE..PUBLIC_KEY_SIZE * 2] == [0; PUBLIC_KEY_SIZE] { + let padded_payload = &message[PUBLIC_KEY_SIZE * MAX_LAYERS..]; + // remove the payload padding + match padded_payload + .iter() + .rposition(|&x| x == PAYLOAD_PADDING_SEPARATOR) + { + Some(pos) => { + return Ok((padded_payload[0..pos].to_vec(), true)); + } + _ => return Err(Error::InvalidMixMessage), + } + } + + let mut new_message: Vec = Vec::with_capacity(MESSAGE_SIZE); + new_message.extend(&message[PUBLIC_KEY_SIZE..PUBLIC_KEY_SIZE * MAX_LAYERS]); + new_message.extend(&[0; PUBLIC_KEY_SIZE]); + new_message.extend(&message[PUBLIC_KEY_SIZE * MAX_LAYERS..]); // padded payload + Ok((new_message, false)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn message() { + let private_keys = [ + x25519_dalek::StaticSecret::random(), + x25519_dalek::StaticSecret::random(), + x25519_dalek::StaticSecret::random(), + ]; + let public_keys = private_keys + .iter() + .map(|k| x25519_dalek::PublicKey::from(k).to_bytes()) + .collect::>(); + let payload = [7; 10]; + let message = MockMixMessage::build_message(&payload, &public_keys).unwrap(); + assert_eq!(message.len(), MESSAGE_SIZE); + + let (message, is_fully_unwrapped) = + MockMixMessage::unwrap_message(&message, &private_keys[0].to_bytes()).unwrap(); + assert!(!is_fully_unwrapped); + assert_eq!(message.len(), MESSAGE_SIZE); + + let (message, is_fully_unwrapped) = + MockMixMessage::unwrap_message(&message, &private_keys[1].to_bytes()).unwrap(); + assert!(!is_fully_unwrapped); + assert_eq!(message.len(), MESSAGE_SIZE); + + let (unwrapped_payload, is_fully_unwrapped) = + MockMixMessage::unwrap_message(&message, &private_keys[2].to_bytes()).unwrap(); + assert!(is_fully_unwrapped); + assert_eq!(unwrapped_payload, payload); + } +} diff --git a/nomos-mix/message/src/packet.rs b/nomos-mix/message/src/packet.rs index ecf9fdee..8e9267c3 100644 --- a/nomos-mix/message/src/packet.rs +++ b/nomos-mix/message/src/packet.rs @@ -61,7 +61,7 @@ impl Packet { ephemeral_public_key: x25519_dalek::PublicKey::from(&ephemeral_privkey), routing_info: RoutingInfo { remaining_layers: u8::try_from(recipient_pubkeys.len()) - .map_err(|_| Error::TooManyRecipients)?, + .map_err(|_| Error::InvalidNumberOfLayers)?, }, }, payload: payload.into_bytes(), diff --git a/nomos-mix/network/Cargo.toml b/nomos-mix/network/Cargo.toml index 9871cdc1..c8dfdc24 100644 --- a/nomos-mix/network/Cargo.toml +++ b/nomos-mix/network/Cargo.toml @@ -11,6 +11,7 @@ libp2p = "0.53" tracing = "0.1" nomos-mix = { path = "../core" } nomos-mix-message = { path = "../message" } +sha2 = "0.10" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } diff --git a/nomos-mix/network/src/behaviour.rs b/nomos-mix/network/src/behaviour.rs index 93f55f42..d2e04d84 100644 --- a/nomos-mix/network/src/behaviour.rs +++ b/nomos-mix/network/src/behaviour.rs @@ -1,8 +1,7 @@ -use std::{ - collections::{HashMap, HashSet, VecDeque}, - task::{Context, Poll, Waker}, +use crate::{ + error::Error, + handler::{FromBehaviour, MixConnectionHandler, ToBehaviour}, }; - use cached::{Cached, TimedCache}; use libp2p::{ core::Endpoint, @@ -12,17 +11,18 @@ use libp2p::{ }, Multiaddr, PeerId, }; -use nomos_mix_message::{is_drop_message, message_id}; - -use crate::{ - error::Error, - handler::{FromBehaviour, MixConnectionHandler, ToBehaviour}, +use nomos_mix_message::MixMessage; +use sha2::{Digest, Sha256}; +use std::marker::PhantomData; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + task::{Context, Poll, Waker}, }; /// A [`NetworkBehaviour`]: /// - forwards messages to all connected peers with deduplication. /// - receives messages from all connected peers. -pub struct Behaviour { +pub struct Behaviour { config: Config, /// Peers that support the mix protocol, and their connection IDs negotiated_peers: HashMap>, @@ -33,6 +33,7 @@ pub struct Behaviour { /// An LRU time cache for storing seen messages (based on their ID). This cache prevents /// duplicates from being propagated on the network. duplicate_cache: TimedCache, ()>, + _mix_message: PhantomData, } #[derive(Debug)] @@ -47,7 +48,10 @@ pub enum Event { Error(Error), } -impl Behaviour { +impl Behaviour +where + M: MixMessage, +{ pub fn new(config: Config) -> Self { let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); Self { @@ -56,17 +60,18 @@ impl Behaviour { events: VecDeque::new(), waker: None, duplicate_cache, + _mix_message: Default::default(), } } /// Publish a message (data or drop) to all connected peers pub fn publish(&mut self, message: Vec) -> Result<(), Error> { - if is_drop_message(&message) { + if M::is_drop_message(&message) { // Bypass deduplication for the drop message return self.forward_message(message, None); } - let msg_id = message_id(&message); + let msg_id = Self::message_id(&message); // If the message was already seen, don't forward it again if self.duplicate_cache.cache_get(&msg_id).is_some() { return Ok(()); @@ -136,6 +141,13 @@ impl Behaviour { } } + /// SHA-256 hash of the message + fn message_id(message: &[u8]) -> Vec { + let mut hasher = Sha256::new(); + hasher.update(message); + hasher.finalize().to_vec() + } + fn try_wake(&mut self) { if let Some(waker) = self.waker.take() { waker.wake(); @@ -143,7 +155,10 @@ impl Behaviour { } } -impl NetworkBehaviour for Behaviour { +impl NetworkBehaviour for Behaviour +where + M: MixMessage + 'static, +{ type ConnectionHandler = MixConnectionHandler; type ToSwarm = Event; @@ -191,14 +206,14 @@ impl NetworkBehaviour for Behaviour { // A message was forwarded from the peer. ToBehaviour::Message(message) => { // Ignore drop message - if is_drop_message(&message) { + if M::is_drop_message(&message) { return; } // Add the message to the cache. If it was already seen, ignore it. if self .duplicate_cache - .cache_set(message_id(&message), ()) + .cache_set(Self::message_id(&message), ()) .is_some() { return; diff --git a/nomos-mix/network/src/handler.rs b/nomos-mix/network/src/handler.rs index 09f45415..e6bddc64 100644 --- a/nomos-mix/network/src/handler.rs +++ b/nomos-mix/network/src/handler.rs @@ -13,7 +13,6 @@ use libp2p::{ }, Stream, StreamProtocol, }; -use nomos_mix_message::MSG_SIZE; use crate::behaviour::Config; @@ -248,15 +247,28 @@ impl ConnectionHandler for MixConnectionHandler { /// Write a message to the stream async fn send_msg(mut stream: Stream, msg: Vec) -> io::Result { + let msg_len: u16 = msg.len().try_into().map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!( + "Message length is too big. Got {}, expected {}", + msg.len(), + std::mem::size_of::() + ), + ) + })?; + stream.write_all(msg_len.to_be_bytes().as_ref()).await?; stream.write_all(&msg).await?; stream.flush().await?; Ok(stream) } - -/// Read a fixed-length message from the stream -// TODO: Consider handling variable-length messages +/// Read a message from the stream async fn recv_msg(mut stream: Stream) -> io::Result<(Stream, Vec)> { - let mut buf = vec![0; MSG_SIZE]; + let mut msg_len = [0; std::mem::size_of::()]; + stream.read_exact(&mut msg_len).await?; + let msg_len = u16::from_be_bytes(msg_len) as usize; + + let mut buf = vec![0; msg_len]; stream.read_exact(&mut buf).await?; Ok((stream, buf)) } diff --git a/nomos-mix/network/src/lib.rs b/nomos-mix/network/src/lib.rs index 1de9838c..6db526e4 100644 --- a/nomos-mix/network/src/lib.rs +++ b/nomos-mix/network/src/lib.rs @@ -14,7 +14,7 @@ mod test { swarm::{dummy, NetworkBehaviour, SwarmEvent}, Multiaddr, PeerId, Swarm, SwarmBuilder, }; - use nomos_mix_message::MSG_SIZE; + use nomos_mix_message::mock::MockMixMessage; use tokio::select; use crate::{behaviour::Config, error::Error, Behaviour, Event}; @@ -43,7 +43,7 @@ mod test { // Swamr2 publishes a message. let task = async { - let msg = vec![1; MSG_SIZE]; + let msg = vec![1; 10]; let mut msg_published = false; let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); loop { @@ -98,7 +98,7 @@ mod test { // Expect all publish attempts to fail with [`Error::NoPeers`] // because swarm2 doesn't have any peers that support the mix protocol. - let msg = vec![1; MSG_SIZE]; + let msg = vec![1; 10]; let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); let mut publish_try_count = 0; loop { @@ -116,7 +116,7 @@ mod test { } } - fn new_swarm(key: Keypair) -> Swarm { + fn new_swarm(key: Keypair) -> Swarm> { new_swarm_with_behaviour( key, Behaviour::new(Config { diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index 600cac8d..53a87f63 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -25,6 +25,7 @@ nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb nomos-network = { path = "../../network", features = ["mock"] } nomos-mix-service = { path = "../../mix" } nomos-mix = { path = "../../../nomos-mix/core" } +nomos-mix-message = { path = "../../../nomos-mix/message" } nomos-libp2p = { path = "../../../nomos-libp2p" } libp2p = { version = "0.53.2", features = ["ed25519"] } once_cell = "1.19" @@ -41,6 +42,7 @@ time = "0.3" [dev-dependencies] blake2 = { version = "0.10" } +x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [features] default = ["libp2p"] diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index edc406aa..29052433 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -1,9 +1,12 @@ use cryptarchia_consensus::LeaderConfig; // std use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; +use nomos_mix::membership::Node; use nomos_mix::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, }; +use nomos_mix_message::mock::MockMixMessage; +use nomos_mix_message::MixMessage; use std::path::PathBuf; use std::time::Duration; // crates @@ -187,13 +190,19 @@ pub struct TestDaNetworkSettings { pub node_key: ed25519::SecretKey, } +pub struct TestMixSettings { + pub backend: Libp2pMixBackendSettings, + pub private_key: x25519_dalek::StaticSecret, + pub membership: Vec::PublicKey>>, +} + pub fn new_node( leader_config: &LeaderConfig, ledger_config: &nomos_ledger::Config, genesis_state: &LedgerState, time_config: &TimeConfig, swarm_config: &SwarmConfig, - mix_config: &Libp2pMixBackendSettings, + mix_config: &TestMixSettings, db_path: PathBuf, blobs_dir: &PathBuf, initial_peers: Vec, @@ -210,14 +219,18 @@ pub fn new_node( }, }, mix: MixConfig { - backend: mix_config.clone(), + backend: mix_config.backend.clone(), persistent_transmission: Default::default(), message_blend: MessageBlendSettings { - cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + cryptographic_processor: CryptographicProcessorSettings { + private_key: mix_config.private_key.to_bytes(), + num_mix_layers: 1, + }, temporal_processor: TemporalProcessorSettings { max_delay_seconds: 2, }, }, + membership: mix_config.membership.clone(), }, da_network: DaNetworkConfig { backend: DaNetworkBackendSettings { @@ -308,35 +321,37 @@ pub fn new_node( .unwrap() } -pub fn new_mix_configs(listening_addresses: Vec) -> Vec { - let mut configs = listening_addresses +pub fn new_mix_configs(listening_addresses: Vec) -> Vec { + let settings = listening_addresses .iter() - .map(|listening_address| Libp2pMixBackendSettings { - listening_address: listening_address.clone(), - node_key: ed25519::SecretKey::generate(), - membership: Vec::new(), - peering_degree: 1, + .map(|listening_address| { + ( + Libp2pMixBackendSettings { + listening_address: listening_address.clone(), + node_key: ed25519::SecretKey::generate(), + peering_degree: 1, + }, + x25519_dalek::StaticSecret::random(), + ) }) .collect::>(); - let membership = configs + let membership = settings .iter() - .map(|c| { - let peer_id = PeerId::from_public_key( - &ed25519::Keypair::from(c.node_key.clone()).public().into(), - ); - c.listening_address - .clone() - .with_p2p(peer_id) - .unwrap_or_else(|orig_addr| orig_addr) + .map(|(backend, private_key)| Node { + address: backend.listening_address.clone(), + public_key: x25519_dalek::PublicKey::from(private_key).to_bytes(), }) .collect::>(); - configs - .iter_mut() - .for_each(|c| c.membership = membership.clone()); - - configs + settings + .into_iter() + .map(|(backend, private_key)| TestMixSettings { + backend, + private_key, + membership: membership.clone(), + }) + .collect() } // Client node is only created for asyncroniously interact with nodes in the test. diff --git a/nomos-services/mix/Cargo.toml b/nomos-services/mix/Cargo.toml index 4a39cfcd..c5ad3c75 100644 --- a/nomos-services/mix/Cargo.toml +++ b/nomos-services/mix/Cargo.toml @@ -20,6 +20,7 @@ serde = { version = "1.0", features = ["derive"] } tokio = { version = "1", features = ["macros", "sync"] } tokio-stream = "0.1" tracing = "0.1" +x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [features] default = [] diff --git a/nomos-services/mix/src/backends/libp2p.rs b/nomos-services/mix/src/backends/libp2p.rs index 7a891d0b..c580f154 100644 --- a/nomos-services/mix/src/backends/libp2p.rs +++ b/nomos-services/mix/src/backends/libp2p.rs @@ -1,16 +1,19 @@ use std::{io, pin::Pin, time::Duration}; +use super::MixBackend; use async_trait::async_trait; use futures::{Stream, StreamExt}; use libp2p::{ core::transport::ListenerId, identity::{ed25519, Keypair}, swarm::SwarmEvent, - Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, + Multiaddr, Swarm, SwarmBuilder, TransportError, }; -use nomos_libp2p::{secret_key_serde, DialError, DialOpts, Protocol}; +use nomos_libp2p::{secret_key_serde, DialError, DialOpts}; +use nomos_mix::membership::Membership; +use nomos_mix_message::mock::MockMixMessage; use overwatch_rs::overwatch::handle::OverwatchHandle; -use rand::seq::IteratorRandom; +use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::{ sync::{broadcast, mpsc}, @@ -18,8 +21,6 @@ use tokio::{ }; use tokio_stream::wrappers::BroadcastStream; -use super::MixBackend; - /// A mix backend that uses the libp2p network stack. pub struct Libp2pMixBackend { #[allow(dead_code)] @@ -34,7 +35,6 @@ pub struct Libp2pMixBackendSettings { // A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC) #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] pub node_key: ed25519::SecretKey, - pub membership: Vec, pub peering_degree: usize, } @@ -44,12 +44,16 @@ const CHANNEL_SIZE: usize = 64; impl MixBackend for Libp2pMixBackend { type Settings = Libp2pMixBackendSettings; - fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { + fn new( + config: Self::Settings, + overwatch_handle: OverwatchHandle, + membership: Membership, + mut rng: R, + ) -> Self { let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE); let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE); let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone())); - let local_peer_id = keypair.public().to_peer_id(); let mut swarm = MixSwarm::new( keypair, swarm_message_receiver, @@ -63,20 +67,12 @@ impl MixBackend for Libp2pMixBackend { }); // Randomly select peering_degree number of peers, and dial to them - // TODO: Consider moving the peer seelction to the nomos_mix_network::Behaviour - config - .membership + membership + .choose_remote_nodes(&mut rng, config.peering_degree) .iter() - .filter(|addr| match extract_peer_id(addr) { - Some(peer_id) => peer_id != local_peer_id, - None => false, - }) - .choose_multiple(&mut rand::thread_rng(), config.peering_degree) - .iter() - .cloned() - .for_each(|addr| { - if let Err(e) = swarm.dial(addr.clone()) { - tracing::error!("failed to dial to {:?}: {:?}", addr, e); + .for_each(|node| { + if let Err(e) = swarm.dial(node.address.clone()) { + tracing::error!("failed to dial to {:?}: {:?}", node.address, e); } }); @@ -110,7 +106,7 @@ impl MixBackend for Libp2pMixBackend { } struct MixSwarm { - swarm: Swarm, + swarm: Swarm>, swarm_messages_receiver: mpsc::Receiver, incoming_message_sender: broadcast::Sender>, } @@ -193,13 +189,3 @@ impl MixSwarm { } } } - -fn extract_peer_id(multiaddr: &Multiaddr) -> Option { - multiaddr.iter().find_map(|protocol| { - if let Protocol::P2p(peer_id) = protocol { - Some(peer_id) - } else { - None - } - }) -} diff --git a/nomos-services/mix/src/backends/mod.rs b/nomos-services/mix/src/backends/mod.rs index f983b2f9..f6244f8a 100644 --- a/nomos-services/mix/src/backends/mod.rs +++ b/nomos-services/mix/src/backends/mod.rs @@ -4,14 +4,24 @@ pub mod libp2p; use std::{fmt::Debug, pin::Pin}; use futures::Stream; +use nomos_mix::membership::Membership; +use nomos_mix_message::mock::MockMixMessage; use overwatch_rs::overwatch::handle::OverwatchHandle; +use rand::Rng; /// A trait for mix backends that send messages to the mix network. #[async_trait::async_trait] pub trait MixBackend { type Settings: Clone + Debug + Send + Sync + 'static; - fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self; + fn new( + config: Self::Settings, + overwatch_handle: OverwatchHandle, + membership: Membership, + rng: R, + ) -> Self + where + R: Rng; /// Publish a message to the mix network. async fn publish(&self, msg: Vec); /// Listen to messages received from the mix network. diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs index be8a588b..ecb4b77f 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/mix/src/lib.rs @@ -6,12 +6,14 @@ use backends::MixBackend; use futures::StreamExt; use network::NetworkAdapter; use nomos_core::wire; +use nomos_mix::membership::{Membership, Node}; use nomos_mix::message_blend::crypto::CryptographicProcessor; use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings}; use nomos_mix::persistent_transmission::{ - PersistentTransmissionExt, PersistentTransmissionSettings, + PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, }; use nomos_mix::MixOutgoingMessage; +use nomos_mix_message::mock::MockMixMessage; use nomos_network::NetworkService; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -42,6 +44,7 @@ where backend: Backend, service_state: ServiceStateHandle, network_relay: Relay>, + membership: Membership, } impl ServiceData for MixService @@ -69,13 +72,17 @@ where { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); + let mix_config = service_state.settings_reader.get_updated_settings(); Ok(Self { backend: ::new( service_state.settings_reader.get_updated_settings().backend, service_state.overwatch_handle.clone(), + mix_config.membership(), + ChaCha12Rng::from_entropy(), ), service_state, network_relay, + membership: mix_config.membership(), }) } @@ -84,25 +91,35 @@ where service_state, mut backend, network_relay, + membership, } = self; let mix_config = service_state.settings_reader.get_updated_settings(); - let cryptographic_processor = - CryptographicProcessor::new(mix_config.message_blend.cryptographic_processor); + let mut cryptographic_processor = CryptographicProcessor::new( + mix_config.message_blend.cryptographic_processor.clone(), + membership.clone(), + ChaCha12Rng::from_entropy(), + ); let network_relay = network_relay.connect().await?; let network_adapter = Network::new(network_relay); // tier 1 persistent transmission let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel(); - let mut persistent_transmission_messages = - UnboundedReceiverStream::new(persistent_receiver).persistent_transmission( - mix_config.persistent_transmission, - ChaCha12Rng::from_entropy(), - ); + let mut persistent_transmission_messages: PersistentTransmissionStream< + _, + _, + MockMixMessage, + > = UnboundedReceiverStream::new(persistent_receiver).persistent_transmission( + mix_config.persistent_transmission, + ChaCha12Rng::from_entropy(), + ); // tier 2 blend - let mut blend_messages = backend - .listen_to_incoming_messages() - .blend(mix_config.message_blend, ChaCha12Rng::from_entropy()); + let mut blend_messages = backend.listen_to_incoming_messages().blend( + mix_config.message_blend, + membership.clone(), + ChaCha12Rng::from_entropy(), + ChaCha12Rng::from_entropy(), + ); // local messages, are bypassed and send immediately let mut local_messages = service_state @@ -190,8 +207,21 @@ where #[derive(Serialize, Deserialize, Clone, Debug)] pub struct MixConfig { pub backend: BackendSettings, - pub message_blend: MessageBlendSettings, + pub message_blend: MessageBlendSettings, pub persistent_transmission: PersistentTransmissionSettings, + pub membership: Vec< + Node<::PublicKey>, + >, +} + +impl MixConfig { + fn membership(&self) -> Membership { + let public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from( + self.message_blend.cryptographic_processor.private_key, + )) + .to_bytes(); + Membership::new(self.membership.clone(), public_key) + } } /// A message that is handled by [`MixService`]. diff --git a/testnet/cfgsync/Cargo.toml b/testnet/cfgsync/Cargo.toml index 4229e187..318760ab 100644 --- a/testnet/cfgsync/Cargo.toml +++ b/testnet/cfgsync/Cargo.toml @@ -9,6 +9,8 @@ clap = { version = "4", features = ["derive"] } nomos-executor = { path = "../../nodes/nomos-executor" } nomos-libp2p = { path = "../../nomos-libp2p" } nomos-node = { path = "../../nodes/nomos-node" } +nomos-mix = { path = "../../nomos-mix/core" } +nomos-mix-message = { path = "../../nomos-mix/message" } nomos-tracing = { path = "../../nomos-tracing" } nomos-tracing-service = { path = "../../nomos-services/tracing" } rand = "0.8" diff --git a/testnet/cfgsync/src/config.rs b/testnet/cfgsync/src/config.rs index 776b57a6..31a610af 100644 --- a/testnet/cfgsync/src/config.rs +++ b/testnet/cfgsync/src/config.rs @@ -1,7 +1,9 @@ // std use std::{collections::HashMap, net::Ipv4Addr, str::FromStr}; // crates -use nomos_libp2p::{Multiaddr, PeerId, Protocol}; +use nomos_libp2p::{Multiaddr, PeerId}; +use nomos_mix::membership::Node; +use nomos_mix_message::{mock::MockMixMessage, MixMessage}; use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig}; use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingSettings}; use rand::{thread_rng, Rng}; @@ -91,7 +93,7 @@ pub fn create_node_configs( let host_network_init_peers = update_network_init_peers(hosts.clone()); let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses); let host_mix_membership = - update_mix_membership(hosts.clone(), mix_configs[0].backend.membership.clone()); + update_mix_membership(hosts.clone(), mix_configs[0].membership.clone()); let new_peer_addresses: HashMap = host_da_peer_addresses .clone() @@ -122,7 +124,7 @@ pub fn create_node_configs( let mut mix_config = mix_configs[i].to_owned(); mix_config.backend.listening_address = Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap(); - mix_config.backend.membership = host_mix_membership.clone(); + mix_config.membership = host_mix_membership.clone(); // Tracing config. let tracing_config = @@ -170,32 +172,22 @@ fn update_da_peer_addresses( .collect() } -fn update_mix_membership(hosts: Vec, membership: Vec) -> Vec { +fn update_mix_membership( + hosts: Vec, + membership: Vec::PublicKey>>, +) -> Vec::PublicKey>> { membership .into_iter() .zip(hosts) - .map(|(addr, host)| { - Multiaddr::from_str(&format!( - "/ip4/{}/udp/{}/quic-v1/p2p/{}", - host.ip, - host.mix_port, - extract_peer_id(&addr).unwrap(), - )) - .unwrap() + .map(|(mut node, host)| { + node.address = + Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.mix_port)) + .unwrap(); + node }) .collect() } -fn extract_peer_id(multiaddr: &Multiaddr) -> Option { - multiaddr.iter().find_map(|protocol| { - if let Protocol::P2p(peer_id) = protocol { - Some(peer_id) - } else { - None - } - }) -} - fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> GeneralTracingConfig { GeneralTracingConfig { tracing_settings: TracingSettings { diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 77297970..8624f54e 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -12,6 +12,7 @@ nomos-executor = { path = "../nodes/nomos-executor", default-features = false } nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] } nomos-mix = { path = "../nomos-mix/core" } +nomos-mix-message = { path = "../nomos-mix/message" } cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" } nomos-tracing = { path = "../nomos-tracing" } nomos-tracing-service = { path = "../nomos-services/tracing" } @@ -50,6 +51,7 @@ criterion = { version = "0.5", features = ["async_tokio"] } nomos-cli = { path = "../nomos-cli" } time = "0.3" tracing = "0.1" +x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [[test]] name = "test_cryptarchia_happy_path" diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs index de525c1c..b6f1c66d 100644 --- a/tests/src/nodes/executor.rs +++ b/tests/src/nodes/executor.rs @@ -158,11 +158,15 @@ pub fn create_executor_config(config: GeneralConfig) -> Config { backend: config.mix_config.backend, persistent_transmission: Default::default(), message_blend: MessageBlendSettings { - cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + cryptographic_processor: CryptographicProcessorSettings { + private_key: config.mix_config.private_key.to_bytes(), + num_mix_layers: 1, + }, temporal_processor: TemporalProcessorSettings { max_delay_seconds: 2, }, }, + membership: config.mix_config.membership, }, cryptarchia: CryptarchiaSettings { leader_config: config.consensus_config.leader_config, diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs index 1a894f81..9e816196 100644 --- a/tests/src/nodes/validator.rs +++ b/tests/src/nodes/validator.rs @@ -243,13 +243,16 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { mix: nomos_mix_service::MixConfig { backend: config.mix_config.backend, persistent_transmission: Default::default(), - message_blend: MessageBlendSettings { - cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + cryptographic_processor: CryptographicProcessorSettings { + private_key: config.mix_config.private_key.to_bytes(), + num_mix_layers: 1, + }, temporal_processor: TemporalProcessorSettings { max_delay_seconds: 2, }, }, + membership: config.mix_config.membership, }, cryptarchia: CryptarchiaSettings { leader_config: config.consensus_config.leader_config, diff --git a/tests/src/topology/configs/mix.rs b/tests/src/topology/configs/mix.rs index 318f2e85..35676f94 100644 --- a/tests/src/topology/configs/mix.rs +++ b/tests/src/topology/configs/mix.rs @@ -1,13 +1,17 @@ use std::str::FromStr; use nomos_libp2p::{ed25519, Multiaddr}; +use nomos_mix::membership::Node; +use nomos_mix_message::{mock::MockMixMessage, MixMessage}; use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings; -use crate::{get_available_port, secret_key_to_peer_id}; +use crate::get_available_port; #[derive(Clone)] pub struct GeneralMixConfig { pub backend: Libp2pMixBackendSettings, + pub private_key: x25519_dalek::StaticSecret, + pub membership: Vec::PublicKey>>, } pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { @@ -26,33 +30,28 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { )) .unwrap(), node_key, - membership: Vec::new(), peering_degree: 1, }, + private_key: x25519_dalek::StaticSecret::random(), + membership: Vec::new(), } }) .collect(); - let membership = mix_membership(&configs); - + let nodes = mix_nodes(&configs); configs.iter_mut().for_each(|config| { - config.backend.membership = membership.clone(); + config.membership = nodes.clone(); }); configs } -fn mix_membership(configs: &[GeneralMixConfig]) -> Vec { +fn mix_nodes(configs: &[GeneralMixConfig]) -> Vec::PublicKey>> { configs .iter() - .map(|config| { - let peer_id = secret_key_to_peer_id(config.backend.node_key.clone()); - config - .backend - .listening_address - .clone() - .with_p2p(peer_id) - .unwrap_or_else(|orig_addr| orig_addr) + .map(|config| Node { + address: config.backend.listening_address.clone(), + public_key: x25519_dalek::PublicKey::from(&config.private_key).to_bytes(), }) .collect() }