1
0
mirror of synced 2025-02-15 01:06:48 +00:00

Use public keys for the mock mix message encoding (#905)

* add node IDs in message

* integrate

* handle variable-lenght messages in nomos-mix-network

* add trait MixMessage

* make tests work

* clippy happy

* remove println!

* remove unnecssary dependency

* remove unnecessary trait bounds

* remove leftover
This commit is contained in:
Youngjoon Lee 2024-11-06 10:49:38 +09:00 committed by GitHub
parent d0eafb7635
commit c84a29db31
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 530 additions and 251 deletions

View File

@ -52,10 +52,21 @@ mix:
backend: backend:
listening_address: /ip4/0.0.0.0/udp/3001/quic-v1 listening_address: /ip4/0.0.0.0/udp/3001/quic-v1
node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3 node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3
membership:
- /ip4/0.0.0.0/udp/3001/quic-v1/p2p/12D3KooWMj7KgmbDJ7RSXeFhFimvqddSXzA5XWDTwvdEJYfuoPhM
peering_degree: 1 peering_degree: 1
num_mix_layers: 1 persistent_transmission:
max_emission_frequency: 1
drop_message_probability: 0.5
message_blend:
cryptographic_processor:
private_key: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
num_mix_layers: 1
temporal_processor:
max_delay_seconds: 5
membership:
- address: /ip4/127.0.0.1/udp/3001/quic-v1
public_key: [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
- address: /ip4/127.0.0.1/udp/3002/quic-v1
public_key: [2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2]
http: http:
backend_settings: backend_settings:

View File

@ -83,9 +83,6 @@ pub struct MixArgs {
#[clap(long = "mix-node-key", env = "MIX_NODE_KEY")] #[clap(long = "mix-node-key", env = "MIX_NODE_KEY")]
mix_node_key: Option<String>, mix_node_key: Option<String>,
#[clap(long = "mix-membership", env = "MIX_MEMBERSHIP", num_args = 1.., value_delimiter = ',')]
pub mix_membership: Option<Vec<Multiaddr>>,
#[clap(long = "mix-peering-degree", env = "MIX_PEERING_DEGREE")] #[clap(long = "mix-peering-degree", env = "MIX_PEERING_DEGREE")]
mix_peering_degree: Option<usize>, mix_peering_degree: Option<usize>,
@ -256,7 +253,6 @@ pub fn update_mix(
let MixArgs { let MixArgs {
mix_addr, mix_addr,
mix_node_key, mix_node_key,
mix_membership,
mix_peering_degree, mix_peering_degree,
mix_num_mix_layers, mix_num_mix_layers,
} = mix_args; } = mix_args;
@ -270,10 +266,6 @@ pub fn update_mix(
mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?; mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
} }
if let Some(membership) = mix_membership {
mix.backend.membership = membership;
}
if let Some(peering_degree) = mix_peering_degree { if let Some(peering_degree) = mix_peering_degree {
mix.backend.peering_degree = peering_degree; mix.backend.peering_degree = peering_degree;
} }

View File

@ -12,9 +12,10 @@ rand = "0.8"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
nomos-mix-message = { path = "../message" } nomos-mix-message = { path = "../message" }
futures = "0.3" futures = "0.3"
multiaddr = "0.18"
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] } tokio = { version = "1", features = ["rt-multi-thread"] }
rand_chacha = "0.3" rand_chacha = "0.3"

View File

@ -1,3 +1,4 @@
pub mod membership;
pub mod message_blend; pub mod message_blend;
pub mod persistent_transmission; pub mod persistent_transmission;

View File

@ -0,0 +1,39 @@
use multiaddr::Multiaddr;
use nomos_mix_message::MixMessage;
use rand::{seq::SliceRandom, Rng};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug)]
pub struct Membership<M>
where
M: MixMessage,
{
remote_nodes: Vec<Node<M::PublicKey>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Node<K> {
pub address: Multiaddr,
pub public_key: K,
}
impl<M> Membership<M>
where
M: MixMessage,
M::PublicKey: PartialEq,
{
pub fn new(mut nodes: Vec<Node<M::PublicKey>>, local_public_key: M::PublicKey) -> Self {
nodes.retain(|node| node.public_key != local_public_key);
Self {
remote_nodes: nodes,
}
}
pub fn choose_remote_nodes<R: Rng>(
&self,
rng: &mut R,
amount: usize,
) -> Vec<&Node<M::PublicKey>> {
self.remote_nodes.choose_multiple(rng, amount).collect()
}
}

View File

@ -1,33 +1,59 @@
use nomos_mix_message::{new_message, unwrap_message}; use crate::membership::Membership;
use nomos_mix_message::MixMessage;
use rand::RngCore;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages /// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages
/// for the message indistinguishability. /// for the message indistinguishability.
#[derive(Clone, Copy, Debug)] pub struct CryptographicProcessor<R, M>
pub struct CryptographicProcessor { where
settings: CryptographicProcessorSettings, M: MixMessage,
{
settings: CryptographicProcessorSettings<M::PrivateKey>,
membership: Membership<M>,
rng: R,
} }
#[derive(Clone, Copy, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CryptographicProcessorSettings { pub struct CryptographicProcessorSettings<K> {
pub private_key: K,
pub num_mix_layers: usize, pub num_mix_layers: usize,
} }
impl CryptographicProcessor { impl<R, M> CryptographicProcessor<R, M>
pub fn new(settings: CryptographicProcessorSettings) -> Self { where
Self { settings } R: RngCore,
M: MixMessage,
M::PublicKey: Clone + PartialEq,
{
pub fn new(
settings: CryptographicProcessorSettings<M::PrivateKey>,
membership: Membership<M>,
rng: R,
) -> Self {
Self {
settings,
membership,
rng,
}
} }
pub fn wrap_message(&self, message: &[u8]) -> Result<Vec<u8>, nomos_mix_message::Error> { pub fn wrap_message(&mut self, message: &[u8]) -> Result<Vec<u8>, nomos_mix_message::Error> {
// TODO: Use the actual Sphinx encoding instead of mock. // TODO: Use the actual Sphinx encoding instead of mock.
// TODO: Select `num_mix_layers` random nodes from the membership. let public_keys = self
new_message(message, self.settings.num_mix_layers.try_into().unwrap()) .membership
.choose_remote_nodes(&mut self.rng, self.settings.num_mix_layers)
.iter()
.map(|node| node.public_key.clone())
.collect::<Vec<_>>();
M::build_message(message, &public_keys)
} }
pub fn unwrap_message( pub fn unwrap_message(
&self, &self,
message: &[u8], message: &[u8],
) -> Result<(Vec<u8>, bool), nomos_mix_message::Error> { ) -> Result<(Vec<u8>, bool), nomos_mix_message::Error> {
unwrap_message(message) M::unwrap_message(message, &self.settings.private_key)
} }
} }

View File

@ -10,41 +10,64 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub use temporal::TemporalProcessorSettings; pub use temporal::TemporalProcessorSettings;
use crate::membership::Membership;
use crate::message_blend::crypto::CryptographicProcessor; use crate::message_blend::crypto::CryptographicProcessor;
use crate::message_blend::temporal::TemporalProcessorExt; use crate::message_blend::temporal::TemporalProcessorExt;
use crate::MixOutgoingMessage; use crate::MixOutgoingMessage;
use nomos_mix_message::MixMessage;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MessageBlendSettings { pub struct MessageBlendSettings<M>
pub cryptographic_processor: CryptographicProcessorSettings, where
M: MixMessage,
M::PrivateKey: Serialize + DeserializeOwned,
{
pub cryptographic_processor: CryptographicProcessorSettings<M::PrivateKey>,
pub temporal_processor: TemporalProcessorSettings, pub temporal_processor: TemporalProcessorSettings,
} }
/// [`MessageBlendStream`] handles the entire mixing tiers process /// [`MessageBlendStream`] handles the entire mixing tiers process
/// - Unwraps incoming messages received from network using [`CryptographicProcessor`] /// - Unwraps incoming messages received from network using [`CryptographicProcessor`]
/// - Pushes unwrapped messages to [`TemporalProcessor`] /// - Pushes unwrapped messages to [`TemporalProcessor`]
pub struct MessageBlendStream<S, Rng> { pub struct MessageBlendStream<S, Rng, M>
where
M: MixMessage,
{
input_stream: S, input_stream: S,
output_stream: BoxStream<'static, MixOutgoingMessage>, output_stream: BoxStream<'static, MixOutgoingMessage>,
temporal_sender: UnboundedSender<MixOutgoingMessage>, temporal_sender: UnboundedSender<MixOutgoingMessage>,
cryptographic_processor: CryptographicProcessor, cryptographic_processor: CryptographicProcessor<Rng, M>,
_rng: PhantomData<Rng>, _rng: PhantomData<Rng>,
} }
impl<S, Rng> MessageBlendStream<S, Rng> impl<S, Rng, M> MessageBlendStream<S, Rng, M>
where where
S: Stream<Item = Vec<u8>>, S: Stream<Item = Vec<u8>>,
Rng: RngCore + Unpin + Send + 'static, Rng: RngCore + Unpin + Send + 'static,
M: MixMessage,
M::PrivateKey: Serialize + DeserializeOwned,
M::PublicKey: Clone + PartialEq,
{ {
pub fn new(input_stream: S, settings: MessageBlendSettings, rng: Rng) -> Self { pub fn new(
let cryptographic_processor = CryptographicProcessor::new(settings.cryptographic_processor); input_stream: S,
settings: MessageBlendSettings<M>,
membership: Membership<M>,
cryptographic_processor_rng: Rng,
temporal_processor_rng: Rng,
) -> Self {
let cryptographic_processor = CryptographicProcessor::new(
settings.cryptographic_processor,
membership,
cryptographic_processor_rng,
);
let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel(); let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel();
let output_stream = UnboundedReceiverStream::new(temporal_receiver) let output_stream = UnboundedReceiverStream::new(temporal_receiver)
.temporal_stream(settings.temporal_processor, rng) .temporal_stream(settings.temporal_processor, temporal_processor_rng)
.boxed(); .boxed();
Self { Self {
input_stream, input_stream,
@ -77,10 +100,13 @@ where
} }
} }
impl<S, Rng> Stream for MessageBlendStream<S, Rng> impl<S, Rng, M> Stream for MessageBlendStream<S, Rng, M>
where where
S: Stream<Item = Vec<u8>> + Unpin, S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin + Send + 'static, Rng: RngCore + Unpin + Send + 'static,
M: MixMessage + Unpin,
M::PrivateKey: Serialize + DeserializeOwned + Unpin,
M::PublicKey: Clone + PartialEq + Unpin,
{ {
type Item = MixOutgoingMessage; type Item = MixOutgoingMessage;
@ -92,25 +118,39 @@ where
} }
} }
pub trait MessageBlendExt<Rng>: Stream<Item = Vec<u8>> pub trait MessageBlendExt<Rng, M>: Stream<Item = Vec<u8>>
where where
Rng: RngCore + Send + Unpin + 'static, Rng: RngCore + Send + Unpin + 'static,
M: MixMessage,
M::PrivateKey: Serialize + DeserializeOwned,
M::PublicKey: Clone + PartialEq,
{ {
fn blend( fn blend(
self, self,
message_blend_settings: MessageBlendSettings, message_blend_settings: MessageBlendSettings<M>,
rng: Rng, membership: Membership<M>,
) -> MessageBlendStream<Self, Rng> cryptographic_processor_rng: Rng,
temporal_processor_rng: Rng,
) -> MessageBlendStream<Self, Rng, M>
where where
Self: Sized + Unpin, Self: Sized + Unpin,
{ {
MessageBlendStream::new(self, message_blend_settings, rng) MessageBlendStream::new(
self,
message_blend_settings,
membership,
cryptographic_processor_rng,
temporal_processor_rng,
)
} }
} }
impl<T, Rng> MessageBlendExt<Rng> for T impl<T, Rng, M> MessageBlendExt<Rng, M> for T
where where
T: Stream<Item = Vec<u8>>, T: Stream<Item = Vec<u8>>,
Rng: RngCore + Unpin + Send + 'static, Rng: RngCore + Unpin + Send + 'static,
M: MixMessage,
M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq,
M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq,
{ {
} }

View File

@ -1,7 +1,9 @@
use futures::Stream; use futures::Stream;
use nomos_mix_message::DROP_MESSAGE; use nomos_mix_message::MixMessage;
use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore}; use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use std::pin::{pin, Pin}; use std::pin::{pin, Pin};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
@ -26,7 +28,7 @@ impl Default for PersistentTransmissionSettings {
} }
/// Transmit scheduled messages with a persistent rate as a stream. /// Transmit scheduled messages with a persistent rate as a stream.
pub struct PersistentTransmissionStream<S, Rng> pub struct PersistentTransmissionStream<S, Rng, M>
where where
S: Stream, S: Stream,
Rng: RngCore, Rng: RngCore,
@ -34,18 +36,20 @@ where
interval: Interval, interval: Interval,
coin: Coin<Rng>, coin: Coin<Rng>,
stream: S, stream: S,
_mix_message: PhantomData<M>,
} }
impl<S, Rng> PersistentTransmissionStream<S, Rng> impl<S, Rng, M> PersistentTransmissionStream<S, Rng, M>
where where
S: Stream, S: Stream,
Rng: RngCore, Rng: RngCore,
M: MixMessage,
{ {
pub fn new( pub fn new(
settings: PersistentTransmissionSettings, settings: PersistentTransmissionSettings,
stream: S, stream: S,
rng: Rng, rng: Rng,
) -> PersistentTransmissionStream<S, Rng> { ) -> PersistentTransmissionStream<S, Rng, M> {
let interval = time::interval(Duration::from_secs_f64( let interval = time::interval(Duration::from_secs_f64(
1.0 / settings.max_emission_frequency, 1.0 / settings.max_emission_frequency,
)); ));
@ -54,14 +58,16 @@ where
interval, interval,
coin, coin,
stream, stream,
_mix_message: Default::default(),
} }
} }
} }
impl<S, Rng> Stream for PersistentTransmissionStream<S, Rng> impl<S, Rng, M> Stream for PersistentTransmissionStream<S, Rng, M>
where where
S: Stream<Item = Vec<u8>> + Unpin, S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin, Rng: RngCore + Unpin,
M: MixMessage + Unpin,
{ {
type Item = Vec<u8>; type Item = Vec<u8>;
@ -78,22 +84,23 @@ where
if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) { if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) {
Poll::Ready(Some(item)) Poll::Ready(Some(item))
} else if coin.flip() { } else if coin.flip() {
Poll::Ready(Some(DROP_MESSAGE.to_vec())) Poll::Ready(Some(M::DROP_MESSAGE.to_vec()))
} else { } else {
Poll::Pending Poll::Pending
} }
} }
} }
pub trait PersistentTransmissionExt<Rng>: Stream pub trait PersistentTransmissionExt<Rng, M>: Stream
where where
Rng: RngCore, Rng: RngCore,
M: MixMessage,
{ {
fn persistent_transmission( fn persistent_transmission(
self, self,
settings: PersistentTransmissionSettings, settings: PersistentTransmissionSettings,
rng: Rng, rng: Rng,
) -> PersistentTransmissionStream<Self, Rng> ) -> PersistentTransmissionStream<Self, Rng, M>
where where
Self: Sized + Unpin, Self: Sized + Unpin,
{ {
@ -101,10 +108,12 @@ where
} }
} }
impl<S, Rng> PersistentTransmissionExt<Rng> for S impl<S, Rng, M> PersistentTransmissionExt<Rng, M> for S
where where
S: Stream, S: Stream,
Rng: RngCore, Rng: RngCore,
M: MixMessage,
M::PublicKey: Clone + Serialize + DeserializeOwned,
{ {
} }
@ -141,6 +150,7 @@ enum CoinError {
mod tests { mod tests {
use super::*; use super::*;
use futures::StreamExt; use futures::StreamExt;
use nomos_mix_message::mock::MockMixMessage;
use rand::SeedableRng; use rand::SeedableRng;
use rand_chacha::ChaCha8Rng; use rand_chacha::ChaCha8Rng;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -183,7 +193,7 @@ mod tests {
let lower_bound = expected_emission_interval - torelance; let lower_bound = expected_emission_interval - torelance;
let upper_bound = expected_emission_interval + torelance; let upper_bound = expected_emission_interval + torelance;
// prepare stream // prepare stream
let mut persistent_transmission_stream = let mut persistent_transmission_stream: PersistentTransmissionStream<_, _, MockMixMessage> =
stream.persistent_transmission(settings, ChaCha8Rng::from_entropy()); stream.persistent_transmission(settings, ChaCha8Rng::from_entropy());
// Messages must be scheduled in non-blocking manner. // Messages must be scheduled in non-blocking manner.
schedule_sender.send(vec![1]).unwrap(); schedule_sender.send(vec![1]).unwrap();
@ -209,16 +219,14 @@ mod tests {
); );
assert_interval!(&mut last_time, lower_bound, upper_bound); assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!( assert!(MockMixMessage::is_drop_message(
persistent_transmission_stream.next().await.unwrap(), &persistent_transmission_stream.next().await.unwrap()
DROP_MESSAGE.to_vec() ));
);
assert_interval!(&mut last_time, lower_bound, upper_bound); assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!( assert!(MockMixMessage::is_drop_message(
persistent_transmission_stream.next().await.unwrap(), &persistent_transmission_stream.next().await.unwrap()
DROP_MESSAGE.to_vec() ));
);
assert_interval!(&mut last_time, lower_bound, upper_bound); assert_interval!(&mut last_time, lower_bound, upper_bound);
// Schedule a new message and check if it is emitted at the next interval // Schedule a new message and check if it is emitted at the next interval

View File

@ -5,7 +5,6 @@ edition = "2021"
[dependencies] [dependencies]
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
sha2 = "0.10"
sphinx-packet = "0.2" sphinx-packet = "0.2"
thiserror = "1.0.65" thiserror = "1.0.65"
x25519-dalek = { version = "2.0.1", features = [ x25519-dalek = { version = "2.0.1", features = [

View File

@ -4,8 +4,8 @@ pub enum Error {
InvalidMixMessage, InvalidMixMessage,
#[error("Payload is too large")] #[error("Payload is too large")]
PayloadTooLarge, PayloadTooLarge,
#[error("Too many recipients")] #[error("Invalid number of layers")]
TooManyRecipients, InvalidNumberOfLayers,
#[error("Sphinx packet error: {0}")] #[error("Sphinx packet error: {0}")]
SphinxPacketError(#[from] sphinx_packet::Error), SphinxPacketError(#[from] sphinx_packet::Error),
#[error("Unwrapping a message is not allowed to this node")] #[error("Unwrapping a message is not allowed to this node")]

View File

@ -1,64 +1,27 @@
mod error; mod error;
pub mod mock;
pub mod packet; pub mod packet;
pub use error::Error; pub use error::Error;
use sha2::{Digest, Sha256}; pub trait MixMessage {
type PublicKey;
type PrivateKey;
const DROP_MESSAGE: &'static [u8];
pub const MSG_SIZE: usize = 2048; fn build_message(payload: &[u8], public_keys: &[Self::PublicKey]) -> Result<Vec<u8>, Error>;
pub const DROP_MESSAGE: [u8; MSG_SIZE] = [0; MSG_SIZE]; /// Unwrap the message one layer.
///
// TODO: Remove all the mock below once the actual implementation is integrated to the system. /// This function returns the unwrapped message and a boolean indicating whether the message was fully unwrapped.
// /// (False if the message still has layers to be unwrapped, true otherwise)
/// A mock implementation of the Sphinx encoding. ///
/// /// If the input message was already fully unwrapped, or if its format is invalid,
/// The length of the encoded message is fixed to [`MSG_SIZE`] bytes. /// this function returns `[Error::InvalidMixMessage]`.
/// The first byte of the encoded message is the number of remaining layers to be unwrapped. fn unwrap_message(
/// The remaining bytes are the payload that is zero-padded to the end. message: &[u8],
pub fn new_message(payload: &[u8], num_layers: u8) -> Result<Vec<u8>, Error> { private_key: &Self::PrivateKey,
if payload.len() > MSG_SIZE - 1 { ) -> Result<(Vec<u8>, bool), Error>;
return Err(Error::PayloadTooLarge); fn is_drop_message(message: &[u8]) -> bool {
} message == Self::DROP_MESSAGE
let mut message: Vec<u8> = Vec::with_capacity(MSG_SIZE);
message.push(num_layers);
message.extend(payload);
message.extend(std::iter::repeat(0).take(MSG_SIZE - message.len()));
Ok(message)
}
/// SHA-256 hash of the message
pub fn message_id(message: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(message);
hasher.finalize().to_vec()
}
/// Unwrap the message one layer.
///
/// This function returns the unwrapped message and a boolean indicating whether the message was fully unwrapped.
/// (False if the message still has layers to be unwrapped, true otherwise)
///
/// If the input message was already fully unwrapped, or if ititss format is invalid,
/// this function returns `[Error::InvalidMixMessage]`.
pub fn unwrap_message(message: &[u8]) -> Result<(Vec<u8>, bool), Error> {
if message.is_empty() {
return Err(Error::InvalidMixMessage);
}
match message[0] {
0 => Err(Error::InvalidMixMessage),
1 => Ok((message[1..].to_vec(), true)),
n => {
let mut unwrapped: Vec<u8> = Vec::with_capacity(message.len());
unwrapped.push(n - 1);
unwrapped.extend(&message[1..]);
Ok((unwrapped, false))
}
} }
} }
/// Check if the message is a drop message.
pub fn is_drop_message(message: &[u8]) -> bool {
message == DROP_MESSAGE
}

View File

@ -0,0 +1,125 @@
use crate::{Error, MixMessage};
// TODO: Remove all the mock below once the actual implementation is integrated to the system.
//
/// A mock implementation of the Sphinx encoding.
const PRIVATE_KEY_SIZE: usize = 32;
const PUBLIC_KEY_SIZE: usize = 32;
const PADDED_PAYLOAD_SIZE: usize = 2048;
const PAYLOAD_PADDING_SEPARATOR: u8 = 0x01;
const PAYLOAD_PADDING_SEPARATOR_SIZE: usize = 1;
const MAX_LAYERS: usize = 5;
pub const MESSAGE_SIZE: usize = PUBLIC_KEY_SIZE * MAX_LAYERS + PADDED_PAYLOAD_SIZE;
#[derive(Clone, Debug)]
pub struct MockMixMessage;
impl MixMessage for MockMixMessage {
type PublicKey = [u8; PUBLIC_KEY_SIZE];
type PrivateKey = [u8; PRIVATE_KEY_SIZE];
const DROP_MESSAGE: &'static [u8] = &[0; MESSAGE_SIZE];
/// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes.
/// The [`MAX_LAYERS`] number of [`NodeId`]s are concatenated in front of the payload.
/// The payload is zero-padded to the end.
///
fn build_message(payload: &[u8], public_keys: &[Self::PublicKey]) -> Result<Vec<u8>, Error> {
if public_keys.is_empty() || public_keys.len() > MAX_LAYERS {
return Err(Error::InvalidNumberOfLayers);
}
if payload.len() > PADDED_PAYLOAD_SIZE - PAYLOAD_PADDING_SEPARATOR_SIZE {
return Err(Error::PayloadTooLarge);
}
let mut message: Vec<u8> = Vec::with_capacity(MESSAGE_SIZE);
public_keys.iter().for_each(|public_key| {
message.extend(public_key);
});
// If there is any remaining layers, fill them with zeros.
(0..MAX_LAYERS - public_keys.len()).for_each(|_| message.extend(&[0; PUBLIC_KEY_SIZE]));
// Append payload with padding
message.extend(payload);
message.push(PAYLOAD_PADDING_SEPARATOR);
message.extend(
std::iter::repeat(0)
.take(PADDED_PAYLOAD_SIZE - payload.len() - PAYLOAD_PADDING_SEPARATOR_SIZE),
);
Ok(message)
}
fn unwrap_message(
message: &[u8],
private_key: &Self::PrivateKey,
) -> Result<(Vec<u8>, bool), Error> {
if message.len() != MESSAGE_SIZE {
return Err(Error::InvalidMixMessage);
}
let public_key =
x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(*private_key))
.to_bytes();
if message[0..PUBLIC_KEY_SIZE] != public_key {
return Err(Error::MsgUnwrapNotAllowed);
}
// If this is the last layer
if message[PUBLIC_KEY_SIZE..PUBLIC_KEY_SIZE * 2] == [0; PUBLIC_KEY_SIZE] {
let padded_payload = &message[PUBLIC_KEY_SIZE * MAX_LAYERS..];
// remove the payload padding
match padded_payload
.iter()
.rposition(|&x| x == PAYLOAD_PADDING_SEPARATOR)
{
Some(pos) => {
return Ok((padded_payload[0..pos].to_vec(), true));
}
_ => return Err(Error::InvalidMixMessage),
}
}
let mut new_message: Vec<u8> = Vec::with_capacity(MESSAGE_SIZE);
new_message.extend(&message[PUBLIC_KEY_SIZE..PUBLIC_KEY_SIZE * MAX_LAYERS]);
new_message.extend(&[0; PUBLIC_KEY_SIZE]);
new_message.extend(&message[PUBLIC_KEY_SIZE * MAX_LAYERS..]); // padded payload
Ok((new_message, false))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn message() {
let private_keys = [
x25519_dalek::StaticSecret::random(),
x25519_dalek::StaticSecret::random(),
x25519_dalek::StaticSecret::random(),
];
let public_keys = private_keys
.iter()
.map(|k| x25519_dalek::PublicKey::from(k).to_bytes())
.collect::<Vec<_>>();
let payload = [7; 10];
let message = MockMixMessage::build_message(&payload, &public_keys).unwrap();
assert_eq!(message.len(), MESSAGE_SIZE);
let (message, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &private_keys[0].to_bytes()).unwrap();
assert!(!is_fully_unwrapped);
assert_eq!(message.len(), MESSAGE_SIZE);
let (message, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &private_keys[1].to_bytes()).unwrap();
assert!(!is_fully_unwrapped);
assert_eq!(message.len(), MESSAGE_SIZE);
let (unwrapped_payload, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &private_keys[2].to_bytes()).unwrap();
assert!(is_fully_unwrapped);
assert_eq!(unwrapped_payload, payload);
}
}

View File

@ -61,7 +61,7 @@ impl Packet {
ephemeral_public_key: x25519_dalek::PublicKey::from(&ephemeral_privkey), ephemeral_public_key: x25519_dalek::PublicKey::from(&ephemeral_privkey),
routing_info: RoutingInfo { routing_info: RoutingInfo {
remaining_layers: u8::try_from(recipient_pubkeys.len()) remaining_layers: u8::try_from(recipient_pubkeys.len())
.map_err(|_| Error::TooManyRecipients)?, .map_err(|_| Error::InvalidNumberOfLayers)?,
}, },
}, },
payload: payload.into_bytes(), payload: payload.into_bytes(),

View File

@ -11,6 +11,7 @@ libp2p = "0.53"
tracing = "0.1" tracing = "0.1"
nomos-mix = { path = "../core" } nomos-mix = { path = "../core" }
nomos-mix-message = { path = "../message" } nomos-mix-message = { path = "../message" }
sha2 = "0.10"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }

View File

@ -1,8 +1,7 @@
use std::{ use crate::{
collections::{HashMap, HashSet, VecDeque}, error::Error,
task::{Context, Poll, Waker}, handler::{FromBehaviour, MixConnectionHandler, ToBehaviour},
}; };
use cached::{Cached, TimedCache}; use cached::{Cached, TimedCache};
use libp2p::{ use libp2p::{
core::Endpoint, core::Endpoint,
@ -12,17 +11,18 @@ use libp2p::{
}, },
Multiaddr, PeerId, Multiaddr, PeerId,
}; };
use nomos_mix_message::{is_drop_message, message_id}; use nomos_mix_message::MixMessage;
use sha2::{Digest, Sha256};
use crate::{ use std::marker::PhantomData;
error::Error, use std::{
handler::{FromBehaviour, MixConnectionHandler, ToBehaviour}, collections::{HashMap, HashSet, VecDeque},
task::{Context, Poll, Waker},
}; };
/// A [`NetworkBehaviour`]: /// A [`NetworkBehaviour`]:
/// - forwards messages to all connected peers with deduplication. /// - forwards messages to all connected peers with deduplication.
/// - receives messages from all connected peers. /// - receives messages from all connected peers.
pub struct Behaviour { pub struct Behaviour<M> {
config: Config, config: Config,
/// Peers that support the mix protocol, and their connection IDs /// Peers that support the mix protocol, and their connection IDs
negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>, negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>,
@ -33,6 +33,7 @@ pub struct Behaviour {
/// An LRU time cache for storing seen messages (based on their ID). This cache prevents /// An LRU time cache for storing seen messages (based on their ID). This cache prevents
/// duplicates from being propagated on the network. /// duplicates from being propagated on the network.
duplicate_cache: TimedCache<Vec<u8>, ()>, duplicate_cache: TimedCache<Vec<u8>, ()>,
_mix_message: PhantomData<M>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -47,7 +48,10 @@ pub enum Event {
Error(Error), Error(Error),
} }
impl Behaviour { impl<M> Behaviour<M>
where
M: MixMessage,
{
pub fn new(config: Config) -> Self { pub fn new(config: Config) -> Self {
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
Self { Self {
@ -56,17 +60,18 @@ impl Behaviour {
events: VecDeque::new(), events: VecDeque::new(),
waker: None, waker: None,
duplicate_cache, duplicate_cache,
_mix_message: Default::default(),
} }
} }
/// Publish a message (data or drop) to all connected peers /// Publish a message (data or drop) to all connected peers
pub fn publish(&mut self, message: Vec<u8>) -> Result<(), Error> { pub fn publish(&mut self, message: Vec<u8>) -> Result<(), Error> {
if is_drop_message(&message) { if M::is_drop_message(&message) {
// Bypass deduplication for the drop message // Bypass deduplication for the drop message
return self.forward_message(message, None); return self.forward_message(message, None);
} }
let msg_id = message_id(&message); let msg_id = Self::message_id(&message);
// If the message was already seen, don't forward it again // If the message was already seen, don't forward it again
if self.duplicate_cache.cache_get(&msg_id).is_some() { if self.duplicate_cache.cache_get(&msg_id).is_some() {
return Ok(()); return Ok(());
@ -136,6 +141,13 @@ impl Behaviour {
} }
} }
/// SHA-256 hash of the message
fn message_id(message: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(message);
hasher.finalize().to_vec()
}
fn try_wake(&mut self) { fn try_wake(&mut self) {
if let Some(waker) = self.waker.take() { if let Some(waker) = self.waker.take() {
waker.wake(); waker.wake();
@ -143,7 +155,10 @@ impl Behaviour {
} }
} }
impl NetworkBehaviour for Behaviour { impl<M> NetworkBehaviour for Behaviour<M>
where
M: MixMessage + 'static,
{
type ConnectionHandler = MixConnectionHandler; type ConnectionHandler = MixConnectionHandler;
type ToSwarm = Event; type ToSwarm = Event;
@ -191,14 +206,14 @@ impl NetworkBehaviour for Behaviour {
// A message was forwarded from the peer. // A message was forwarded from the peer.
ToBehaviour::Message(message) => { ToBehaviour::Message(message) => {
// Ignore drop message // Ignore drop message
if is_drop_message(&message) { if M::is_drop_message(&message) {
return; return;
} }
// Add the message to the cache. If it was already seen, ignore it. // Add the message to the cache. If it was already seen, ignore it.
if self if self
.duplicate_cache .duplicate_cache
.cache_set(message_id(&message), ()) .cache_set(Self::message_id(&message), ())
.is_some() .is_some()
{ {
return; return;

View File

@ -13,7 +13,6 @@ use libp2p::{
}, },
Stream, StreamProtocol, Stream, StreamProtocol,
}; };
use nomos_mix_message::MSG_SIZE;
use crate::behaviour::Config; use crate::behaviour::Config;
@ -248,15 +247,28 @@ impl ConnectionHandler for MixConnectionHandler {
/// Write a message to the stream /// Write a message to the stream
async fn send_msg(mut stream: Stream, msg: Vec<u8>) -> io::Result<Stream> { async fn send_msg(mut stream: Stream, msg: Vec<u8>) -> io::Result<Stream> {
let msg_len: u16 = msg.len().try_into().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!(
"Message length is too big. Got {}, expected {}",
msg.len(),
std::mem::size_of::<u16>()
),
)
})?;
stream.write_all(msg_len.to_be_bytes().as_ref()).await?;
stream.write_all(&msg).await?; stream.write_all(&msg).await?;
stream.flush().await?; stream.flush().await?;
Ok(stream) Ok(stream)
} }
/// Read a message from the stream
/// Read a fixed-length message from the stream
// TODO: Consider handling variable-length messages
async fn recv_msg(mut stream: Stream) -> io::Result<(Stream, Vec<u8>)> { async fn recv_msg(mut stream: Stream) -> io::Result<(Stream, Vec<u8>)> {
let mut buf = vec![0; MSG_SIZE]; let mut msg_len = [0; std::mem::size_of::<u16>()];
stream.read_exact(&mut msg_len).await?;
let msg_len = u16::from_be_bytes(msg_len) as usize;
let mut buf = vec![0; msg_len];
stream.read_exact(&mut buf).await?; stream.read_exact(&mut buf).await?;
Ok((stream, buf)) Ok((stream, buf))
} }

View File

@ -14,7 +14,7 @@ mod test {
swarm::{dummy, NetworkBehaviour, SwarmEvent}, swarm::{dummy, NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId, Swarm, SwarmBuilder, Multiaddr, PeerId, Swarm, SwarmBuilder,
}; };
use nomos_mix_message::MSG_SIZE; use nomos_mix_message::mock::MockMixMessage;
use tokio::select; use tokio::select;
use crate::{behaviour::Config, error::Error, Behaviour, Event}; use crate::{behaviour::Config, error::Error, Behaviour, Event};
@ -43,7 +43,7 @@ mod test {
// Swamr2 publishes a message. // Swamr2 publishes a message.
let task = async { let task = async {
let msg = vec![1; MSG_SIZE]; let msg = vec![1; 10];
let mut msg_published = false; let mut msg_published = false;
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
loop { loop {
@ -98,7 +98,7 @@ mod test {
// Expect all publish attempts to fail with [`Error::NoPeers`] // Expect all publish attempts to fail with [`Error::NoPeers`]
// because swarm2 doesn't have any peers that support the mix protocol. // because swarm2 doesn't have any peers that support the mix protocol.
let msg = vec![1; MSG_SIZE]; let msg = vec![1; 10];
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
let mut publish_try_count = 0; let mut publish_try_count = 0;
loop { loop {
@ -116,7 +116,7 @@ mod test {
} }
} }
fn new_swarm(key: Keypair) -> Swarm<Behaviour> { fn new_swarm(key: Keypair) -> Swarm<Behaviour<MockMixMessage>> {
new_swarm_with_behaviour( new_swarm_with_behaviour(
key, key,
Behaviour::new(Config { Behaviour::new(Config {

View File

@ -25,6 +25,7 @@ nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb
nomos-network = { path = "../../network", features = ["mock"] } nomos-network = { path = "../../network", features = ["mock"] }
nomos-mix-service = { path = "../../mix" } nomos-mix-service = { path = "../../mix" }
nomos-mix = { path = "../../../nomos-mix/core" } nomos-mix = { path = "../../../nomos-mix/core" }
nomos-mix-message = { path = "../../../nomos-mix/message" }
nomos-libp2p = { path = "../../../nomos-libp2p" } nomos-libp2p = { path = "../../../nomos-libp2p" }
libp2p = { version = "0.53.2", features = ["ed25519"] } libp2p = { version = "0.53.2", features = ["ed25519"] }
once_cell = "1.19" once_cell = "1.19"
@ -41,6 +42,7 @@ time = "0.3"
[dev-dependencies] [dev-dependencies]
blake2 = { version = "0.10" } blake2 = { version = "0.10" }
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
[features] [features]
default = ["libp2p"] default = ["libp2p"]

View File

@ -1,9 +1,12 @@
use cryptarchia_consensus::LeaderConfig; use cryptarchia_consensus::LeaderConfig;
// std // std
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_mix::membership::Node;
use nomos_mix::message_blend::{ use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
}; };
use nomos_mix_message::mock::MockMixMessage;
use nomos_mix_message::MixMessage;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
// crates // crates
@ -187,13 +190,19 @@ pub struct TestDaNetworkSettings {
pub node_key: ed25519::SecretKey, pub node_key: ed25519::SecretKey,
} }
pub struct TestMixSettings {
pub backend: Libp2pMixBackendSettings,
pub private_key: x25519_dalek::StaticSecret,
pub membership: Vec<Node<<MockMixMessage as MixMessage>::PublicKey>>,
}
pub fn new_node( pub fn new_node(
leader_config: &LeaderConfig, leader_config: &LeaderConfig,
ledger_config: &nomos_ledger::Config, ledger_config: &nomos_ledger::Config,
genesis_state: &LedgerState, genesis_state: &LedgerState,
time_config: &TimeConfig, time_config: &TimeConfig,
swarm_config: &SwarmConfig, swarm_config: &SwarmConfig,
mix_config: &Libp2pMixBackendSettings, mix_config: &TestMixSettings,
db_path: PathBuf, db_path: PathBuf,
blobs_dir: &PathBuf, blobs_dir: &PathBuf,
initial_peers: Vec<Multiaddr>, initial_peers: Vec<Multiaddr>,
@ -210,14 +219,18 @@ pub fn new_node(
}, },
}, },
mix: MixConfig { mix: MixConfig {
backend: mix_config.clone(), backend: mix_config.backend.clone(),
persistent_transmission: Default::default(), persistent_transmission: Default::default(),
message_blend: MessageBlendSettings { message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, cryptographic_processor: CryptographicProcessorSettings {
private_key: mix_config.private_key.to_bytes(),
num_mix_layers: 1,
},
temporal_processor: TemporalProcessorSettings { temporal_processor: TemporalProcessorSettings {
max_delay_seconds: 2, max_delay_seconds: 2,
}, },
}, },
membership: mix_config.membership.clone(),
}, },
da_network: DaNetworkConfig { da_network: DaNetworkConfig {
backend: DaNetworkBackendSettings { backend: DaNetworkBackendSettings {
@ -308,35 +321,37 @@ pub fn new_node(
.unwrap() .unwrap()
} }
pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<Libp2pMixBackendSettings> { pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestMixSettings> {
let mut configs = listening_addresses let settings = listening_addresses
.iter() .iter()
.map(|listening_address| Libp2pMixBackendSettings { .map(|listening_address| {
listening_address: listening_address.clone(), (
node_key: ed25519::SecretKey::generate(), Libp2pMixBackendSettings {
membership: Vec::new(), listening_address: listening_address.clone(),
peering_degree: 1, node_key: ed25519::SecretKey::generate(),
peering_degree: 1,
},
x25519_dalek::StaticSecret::random(),
)
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let membership = configs let membership = settings
.iter() .iter()
.map(|c| { .map(|(backend, private_key)| Node {
let peer_id = PeerId::from_public_key( address: backend.listening_address.clone(),
&ed25519::Keypair::from(c.node_key.clone()).public().into(), public_key: x25519_dalek::PublicKey::from(private_key).to_bytes(),
);
c.listening_address
.clone()
.with_p2p(peer_id)
.unwrap_or_else(|orig_addr| orig_addr)
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
configs settings
.iter_mut() .into_iter()
.for_each(|c| c.membership = membership.clone()); .map(|(backend, private_key)| TestMixSettings {
backend,
configs private_key,
membership: membership.clone(),
})
.collect()
} }
// Client node is only created for asyncroniously interact with nodes in the test. // Client node is only created for asyncroniously interact with nodes in the test.

View File

@ -20,6 +20,7 @@ serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["macros", "sync"] } tokio = { version = "1", features = ["macros", "sync"] }
tokio-stream = "0.1" tokio-stream = "0.1"
tracing = "0.1" tracing = "0.1"
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
[features] [features]
default = [] default = []

View File

@ -1,16 +1,19 @@
use std::{io, pin::Pin, time::Duration}; use std::{io, pin::Pin, time::Duration};
use super::MixBackend;
use async_trait::async_trait; use async_trait::async_trait;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use libp2p::{ use libp2p::{
core::transport::ListenerId, core::transport::ListenerId,
identity::{ed25519, Keypair}, identity::{ed25519, Keypair},
swarm::SwarmEvent, swarm::SwarmEvent,
Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, Multiaddr, Swarm, SwarmBuilder, TransportError,
}; };
use nomos_libp2p::{secret_key_serde, DialError, DialOpts, Protocol}; use nomos_libp2p::{secret_key_serde, DialError, DialOpts};
use nomos_mix::membership::Membership;
use nomos_mix_message::mock::MockMixMessage;
use overwatch_rs::overwatch::handle::OverwatchHandle; use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::seq::IteratorRandom; use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{ use tokio::{
sync::{broadcast, mpsc}, sync::{broadcast, mpsc},
@ -18,8 +21,6 @@ use tokio::{
}; };
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
use super::MixBackend;
/// A mix backend that uses the libp2p network stack. /// A mix backend that uses the libp2p network stack.
pub struct Libp2pMixBackend { pub struct Libp2pMixBackend {
#[allow(dead_code)] #[allow(dead_code)]
@ -34,7 +35,6 @@ pub struct Libp2pMixBackendSettings {
// A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC) // A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC)
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey, pub node_key: ed25519::SecretKey,
pub membership: Vec<Multiaddr>,
pub peering_degree: usize, pub peering_degree: usize,
} }
@ -44,12 +44,16 @@ const CHANNEL_SIZE: usize = 64;
impl MixBackend for Libp2pMixBackend { impl MixBackend for Libp2pMixBackend {
type Settings = Libp2pMixBackendSettings; type Settings = Libp2pMixBackendSettings;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { fn new<R: Rng>(
config: Self::Settings,
overwatch_handle: OverwatchHandle,
membership: Membership<MockMixMessage>,
mut rng: R,
) -> Self {
let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE); let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE);
let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE); let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE);
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone())); let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let local_peer_id = keypair.public().to_peer_id();
let mut swarm = MixSwarm::new( let mut swarm = MixSwarm::new(
keypair, keypair,
swarm_message_receiver, swarm_message_receiver,
@ -63,20 +67,12 @@ impl MixBackend for Libp2pMixBackend {
}); });
// Randomly select peering_degree number of peers, and dial to them // Randomly select peering_degree number of peers, and dial to them
// TODO: Consider moving the peer seelction to the nomos_mix_network::Behaviour membership
config .choose_remote_nodes(&mut rng, config.peering_degree)
.membership
.iter() .iter()
.filter(|addr| match extract_peer_id(addr) { .for_each(|node| {
Some(peer_id) => peer_id != local_peer_id, if let Err(e) = swarm.dial(node.address.clone()) {
None => false, tracing::error!("failed to dial to {:?}: {:?}", node.address, e);
})
.choose_multiple(&mut rand::thread_rng(), config.peering_degree)
.iter()
.cloned()
.for_each(|addr| {
if let Err(e) = swarm.dial(addr.clone()) {
tracing::error!("failed to dial to {:?}: {:?}", addr, e);
} }
}); });
@ -110,7 +106,7 @@ impl MixBackend for Libp2pMixBackend {
} }
struct MixSwarm { struct MixSwarm {
swarm: Swarm<nomos_mix_network::Behaviour>, swarm: Swarm<nomos_mix_network::Behaviour<MockMixMessage>>,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>, swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>, incoming_message_sender: broadcast::Sender<Vec<u8>>,
} }
@ -193,13 +189,3 @@ impl MixSwarm {
} }
} }
} }
fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
multiaddr.iter().find_map(|protocol| {
if let Protocol::P2p(peer_id) = protocol {
Some(peer_id)
} else {
None
}
})
}

View File

@ -4,14 +4,24 @@ pub mod libp2p;
use std::{fmt::Debug, pin::Pin}; use std::{fmt::Debug, pin::Pin};
use futures::Stream; use futures::Stream;
use nomos_mix::membership::Membership;
use nomos_mix_message::mock::MockMixMessage;
use overwatch_rs::overwatch::handle::OverwatchHandle; use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::Rng;
/// A trait for mix backends that send messages to the mix network. /// A trait for mix backends that send messages to the mix network.
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait MixBackend { pub trait MixBackend {
type Settings: Clone + Debug + Send + Sync + 'static; type Settings: Clone + Debug + Send + Sync + 'static;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self; fn new<R>(
config: Self::Settings,
overwatch_handle: OverwatchHandle,
membership: Membership<MockMixMessage>,
rng: R,
) -> Self
where
R: Rng;
/// Publish a message to the mix network. /// Publish a message to the mix network.
async fn publish(&self, msg: Vec<u8>); async fn publish(&self, msg: Vec<u8>);
/// Listen to messages received from the mix network. /// Listen to messages received from the mix network.

View File

@ -6,12 +6,14 @@ use backends::MixBackend;
use futures::StreamExt; use futures::StreamExt;
use network::NetworkAdapter; use network::NetworkAdapter;
use nomos_core::wire; use nomos_core::wire;
use nomos_mix::membership::{Membership, Node};
use nomos_mix::message_blend::crypto::CryptographicProcessor; use nomos_mix::message_blend::crypto::CryptographicProcessor;
use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings}; use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings};
use nomos_mix::persistent_transmission::{ use nomos_mix::persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
}; };
use nomos_mix::MixOutgoingMessage; use nomos_mix::MixOutgoingMessage;
use nomos_mix_message::mock::MockMixMessage;
use nomos_network::NetworkService; use nomos_network::NetworkService;
use overwatch_rs::services::{ use overwatch_rs::services::{
handle::ServiceStateHandle, handle::ServiceStateHandle,
@ -42,6 +44,7 @@ where
backend: Backend, backend: Backend,
service_state: ServiceStateHandle<Self>, service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<Network::Backend>>, network_relay: Relay<NetworkService<Network::Backend>>,
membership: Membership<MockMixMessage>,
} }
impl<Backend, Network> ServiceData for MixService<Backend, Network> impl<Backend, Network> ServiceData for MixService<Backend, Network>
@ -69,13 +72,17 @@ where
{ {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay(); let network_relay = service_state.overwatch_handle.relay();
let mix_config = service_state.settings_reader.get_updated_settings();
Ok(Self { Ok(Self {
backend: <Backend as MixBackend>::new( backend: <Backend as MixBackend>::new(
service_state.settings_reader.get_updated_settings().backend, service_state.settings_reader.get_updated_settings().backend,
service_state.overwatch_handle.clone(), service_state.overwatch_handle.clone(),
mix_config.membership(),
ChaCha12Rng::from_entropy(),
), ),
service_state, service_state,
network_relay, network_relay,
membership: mix_config.membership(),
}) })
} }
@ -84,25 +91,35 @@ where
service_state, service_state,
mut backend, mut backend,
network_relay, network_relay,
membership,
} = self; } = self;
let mix_config = service_state.settings_reader.get_updated_settings(); let mix_config = service_state.settings_reader.get_updated_settings();
let cryptographic_processor = let mut cryptographic_processor = CryptographicProcessor::new(
CryptographicProcessor::new(mix_config.message_blend.cryptographic_processor); mix_config.message_blend.cryptographic_processor.clone(),
membership.clone(),
ChaCha12Rng::from_entropy(),
);
let network_relay = network_relay.connect().await?; let network_relay = network_relay.connect().await?;
let network_adapter = Network::new(network_relay); let network_adapter = Network::new(network_relay);
// tier 1 persistent transmission // tier 1 persistent transmission
let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel(); let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel();
let mut persistent_transmission_messages = let mut persistent_transmission_messages: PersistentTransmissionStream<
UnboundedReceiverStream::new(persistent_receiver).persistent_transmission( _,
mix_config.persistent_transmission, _,
ChaCha12Rng::from_entropy(), MockMixMessage,
); > = UnboundedReceiverStream::new(persistent_receiver).persistent_transmission(
mix_config.persistent_transmission,
ChaCha12Rng::from_entropy(),
);
// tier 2 blend // tier 2 blend
let mut blend_messages = backend let mut blend_messages = backend.listen_to_incoming_messages().blend(
.listen_to_incoming_messages() mix_config.message_blend,
.blend(mix_config.message_blend, ChaCha12Rng::from_entropy()); membership.clone(),
ChaCha12Rng::from_entropy(),
ChaCha12Rng::from_entropy(),
);
// local messages, are bypassed and send immediately // local messages, are bypassed and send immediately
let mut local_messages = service_state let mut local_messages = service_state
@ -190,8 +207,21 @@ where
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixConfig<BackendSettings> { pub struct MixConfig<BackendSettings> {
pub backend: BackendSettings, pub backend: BackendSettings,
pub message_blend: MessageBlendSettings, pub message_blend: MessageBlendSettings<MockMixMessage>,
pub persistent_transmission: PersistentTransmissionSettings, pub persistent_transmission: PersistentTransmissionSettings,
pub membership: Vec<
Node<<nomos_mix_message::mock::MockMixMessage as nomos_mix_message::MixMessage>::PublicKey>,
>,
}
impl<BackendSettings> MixConfig<BackendSettings> {
fn membership(&self) -> Membership<MockMixMessage> {
let public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(
self.message_blend.cryptographic_processor.private_key,
))
.to_bytes();
Membership::new(self.membership.clone(), public_key)
}
} }
/// A message that is handled by [`MixService`]. /// A message that is handled by [`MixService`].

View File

@ -9,6 +9,8 @@ clap = { version = "4", features = ["derive"] }
nomos-executor = { path = "../../nodes/nomos-executor" } nomos-executor = { path = "../../nodes/nomos-executor" }
nomos-libp2p = { path = "../../nomos-libp2p" } nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-node = { path = "../../nodes/nomos-node" } nomos-node = { path = "../../nodes/nomos-node" }
nomos-mix = { path = "../../nomos-mix/core" }
nomos-mix-message = { path = "../../nomos-mix/message" }
nomos-tracing = { path = "../../nomos-tracing" } nomos-tracing = { path = "../../nomos-tracing" }
nomos-tracing-service = { path = "../../nomos-services/tracing" } nomos-tracing-service = { path = "../../nomos-services/tracing" }
rand = "0.8" rand = "0.8"

View File

@ -1,7 +1,9 @@
// std // std
use std::{collections::HashMap, net::Ipv4Addr, str::FromStr}; use std::{collections::HashMap, net::Ipv4Addr, str::FromStr};
// crates // crates
use nomos_libp2p::{Multiaddr, PeerId, Protocol}; use nomos_libp2p::{Multiaddr, PeerId};
use nomos_mix::membership::Node;
use nomos_mix_message::{mock::MockMixMessage, MixMessage};
use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig}; use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig};
use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingSettings}; use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingSettings};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
@ -91,7 +93,7 @@ pub fn create_node_configs(
let host_network_init_peers = update_network_init_peers(hosts.clone()); let host_network_init_peers = update_network_init_peers(hosts.clone());
let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses); let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses);
let host_mix_membership = let host_mix_membership =
update_mix_membership(hosts.clone(), mix_configs[0].backend.membership.clone()); update_mix_membership(hosts.clone(), mix_configs[0].membership.clone());
let new_peer_addresses: HashMap<PeerId, Multiaddr> = host_da_peer_addresses let new_peer_addresses: HashMap<PeerId, Multiaddr> = host_da_peer_addresses
.clone() .clone()
@ -122,7 +124,7 @@ pub fn create_node_configs(
let mut mix_config = mix_configs[i].to_owned(); let mut mix_config = mix_configs[i].to_owned();
mix_config.backend.listening_address = mix_config.backend.listening_address =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap(); Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap();
mix_config.backend.membership = host_mix_membership.clone(); mix_config.membership = host_mix_membership.clone();
// Tracing config. // Tracing config.
let tracing_config = let tracing_config =
@ -170,32 +172,22 @@ fn update_da_peer_addresses(
.collect() .collect()
} }
fn update_mix_membership(hosts: Vec<Host>, membership: Vec<Multiaddr>) -> Vec<Multiaddr> { fn update_mix_membership(
hosts: Vec<Host>,
membership: Vec<Node<<MockMixMessage as MixMessage>::PublicKey>>,
) -> Vec<Node<<MockMixMessage as MixMessage>::PublicKey>> {
membership membership
.into_iter() .into_iter()
.zip(hosts) .zip(hosts)
.map(|(addr, host)| { .map(|(mut node, host)| {
Multiaddr::from_str(&format!( node.address =
"/ip4/{}/udp/{}/quic-v1/p2p/{}", Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.mix_port))
host.ip, .unwrap();
host.mix_port, node
extract_peer_id(&addr).unwrap(),
))
.unwrap()
}) })
.collect() .collect()
} }
fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
multiaddr.iter().find_map(|protocol| {
if let Protocol::P2p(peer_id) = protocol {
Some(peer_id)
} else {
None
}
})
}
fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> GeneralTracingConfig { fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> GeneralTracingConfig {
GeneralTracingConfig { GeneralTracingConfig {
tracing_settings: TracingSettings { tracing_settings: TracingSettings {

View File

@ -12,6 +12,7 @@ nomos-executor = { path = "../nodes/nomos-executor", default-features = false }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] } nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] }
nomos-mix = { path = "../nomos-mix/core" } nomos-mix = { path = "../nomos-mix/core" }
nomos-mix-message = { path = "../nomos-mix/message" }
cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" } cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
nomos-tracing = { path = "../nomos-tracing" } nomos-tracing = { path = "../nomos-tracing" }
nomos-tracing-service = { path = "../nomos-services/tracing" } nomos-tracing-service = { path = "../nomos-services/tracing" }
@ -50,6 +51,7 @@ criterion = { version = "0.5", features = ["async_tokio"] }
nomos-cli = { path = "../nomos-cli" } nomos-cli = { path = "../nomos-cli" }
time = "0.3" time = "0.3"
tracing = "0.1" tracing = "0.1"
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
[[test]] [[test]]
name = "test_cryptarchia_happy_path" name = "test_cryptarchia_happy_path"

View File

@ -158,11 +158,15 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
backend: config.mix_config.backend, backend: config.mix_config.backend,
persistent_transmission: Default::default(), persistent_transmission: Default::default(),
message_blend: MessageBlendSettings { message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, cryptographic_processor: CryptographicProcessorSettings {
private_key: config.mix_config.private_key.to_bytes(),
num_mix_layers: 1,
},
temporal_processor: TemporalProcessorSettings { temporal_processor: TemporalProcessorSettings {
max_delay_seconds: 2, max_delay_seconds: 2,
}, },
}, },
membership: config.mix_config.membership,
}, },
cryptarchia: CryptarchiaSettings { cryptarchia: CryptarchiaSettings {
leader_config: config.consensus_config.leader_config, leader_config: config.consensus_config.leader_config,

View File

@ -243,13 +243,16 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
mix: nomos_mix_service::MixConfig { mix: nomos_mix_service::MixConfig {
backend: config.mix_config.backend, backend: config.mix_config.backend,
persistent_transmission: Default::default(), persistent_transmission: Default::default(),
message_blend: MessageBlendSettings { message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, cryptographic_processor: CryptographicProcessorSettings {
private_key: config.mix_config.private_key.to_bytes(),
num_mix_layers: 1,
},
temporal_processor: TemporalProcessorSettings { temporal_processor: TemporalProcessorSettings {
max_delay_seconds: 2, max_delay_seconds: 2,
}, },
}, },
membership: config.mix_config.membership,
}, },
cryptarchia: CryptarchiaSettings { cryptarchia: CryptarchiaSettings {
leader_config: config.consensus_config.leader_config, leader_config: config.consensus_config.leader_config,

View File

@ -1,13 +1,17 @@
use std::str::FromStr; use std::str::FromStr;
use nomos_libp2p::{ed25519, Multiaddr}; use nomos_libp2p::{ed25519, Multiaddr};
use nomos_mix::membership::Node;
use nomos_mix_message::{mock::MockMixMessage, MixMessage};
use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings; use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings;
use crate::{get_available_port, secret_key_to_peer_id}; use crate::get_available_port;
#[derive(Clone)] #[derive(Clone)]
pub struct GeneralMixConfig { pub struct GeneralMixConfig {
pub backend: Libp2pMixBackendSettings, pub backend: Libp2pMixBackendSettings,
pub private_key: x25519_dalek::StaticSecret,
pub membership: Vec<Node<<MockMixMessage as MixMessage>::PublicKey>>,
} }
pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> { pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
@ -26,33 +30,28 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
)) ))
.unwrap(), .unwrap(),
node_key, node_key,
membership: Vec::new(),
peering_degree: 1, peering_degree: 1,
}, },
private_key: x25519_dalek::StaticSecret::random(),
membership: Vec::new(),
} }
}) })
.collect(); .collect();
let membership = mix_membership(&configs); let nodes = mix_nodes(&configs);
configs.iter_mut().for_each(|config| { configs.iter_mut().for_each(|config| {
config.backend.membership = membership.clone(); config.membership = nodes.clone();
}); });
configs configs
} }
fn mix_membership(configs: &[GeneralMixConfig]) -> Vec<Multiaddr> { fn mix_nodes(configs: &[GeneralMixConfig]) -> Vec<Node<<MockMixMessage as MixMessage>::PublicKey>> {
configs configs
.iter() .iter()
.map(|config| { .map(|config| Node {
let peer_id = secret_key_to_peer_id(config.backend.node_key.clone()); address: config.backend.listening_address.clone(),
config public_key: x25519_dalek::PublicKey::from(&config.private_key).to_bytes(),
.backend
.listening_address
.clone()
.with_p2p(peer_id)
.unwrap_or_else(|orig_addr| orig_addr)
}) })
.collect() .collect()
} }