diff --git a/nomos-mix/core/src/message_blend/crypto.rs b/nomos-mix/core/src/message_blend/crypto.rs index 62a8a865..b214a39c 100644 --- a/nomos-mix/core/src/message_blend/crypto.rs +++ b/nomos-mix/core/src/message_blend/crypto.rs @@ -12,6 +12,7 @@ where settings: CryptographicProcessorSettings, membership: Membership, rng: R, + mix_message: M, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -28,18 +29,20 @@ where { pub fn new( settings: CryptographicProcessorSettings, + mix_message_settings: M::Settings, membership: Membership, rng: R, ) -> Self { + let mix_message = M::new(mix_message_settings); Self { settings, membership, rng, + mix_message, } } pub fn wrap_message(&mut self, message: &[u8]) -> Result, M::Error> { - // TODO: Use the actual Sphinx encoding instead of mock. let public_keys = self .membership .choose_remote_nodes(&mut self.rng, self.settings.num_mix_layers) @@ -47,10 +50,11 @@ where .map(|node| node.public_key.clone()) .collect::>(); - M::build_message(message, &public_keys) + self.mix_message.build_message(message, &public_keys) } pub fn unwrap_message(&self, message: &[u8]) -> Result<(Vec, bool), M::Error> { - M::unwrap_message(message, &self.settings.private_key) + self.mix_message + .unwrap_message(message, &self.settings.private_key) } } diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-mix/core/src/message_blend/mod.rs index 2cc988b6..140f23ae 100644 --- a/nomos-mix/core/src/message_blend/mod.rs +++ b/nomos-mix/core/src/message_blend/mod.rs @@ -59,12 +59,14 @@ where pub fn new( input_stream: S, settings: MessageBlendSettings, + mix_message_settings: M::Settings, membership: Membership, scheduler: Scheduler, cryptographic_processor_rng: Rng, ) -> Self { let cryptographic_processor = CryptographicProcessor::new( settings.cryptographic_processor, + mix_message_settings, membership, cryptographic_processor_rng, ); @@ -132,6 +134,7 @@ where fn blend( self, message_blend_settings: MessageBlendSettings, + mix_message_settings: M::Settings, membership: Membership, scheduler: Scheduler, cryptographic_processor_rng: Rng, @@ -142,6 +145,7 @@ where MessageBlendStream::new( self, message_blend_settings, + mix_message_settings, membership, scheduler, cryptographic_processor_rng, diff --git a/nomos-mix/core/src/persistent_transmission.rs b/nomos-mix/core/src/persistent_transmission.rs index 72531a49..1e8e3292 100644 --- a/nomos-mix/core/src/persistent_transmission.rs +++ b/nomos-mix/core/src/persistent_transmission.rs @@ -1,9 +1,6 @@ use futures::{Stream, StreamExt}; -use nomos_mix_message::MixMessage; use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore}; -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use std::marker::PhantomData; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; @@ -25,7 +22,7 @@ impl Default for PersistentTransmissionSettings { } /// Transmit scheduled messages with a persistent rate as a stream. -pub struct PersistentTransmissionStream +pub struct PersistentTransmissionStream where S: Stream, Rng: RngCore, @@ -33,14 +30,13 @@ where coin: Coin, stream: S, scheduler: Scheduler, - _mix_message: PhantomData, + drop_message: Vec, } -impl PersistentTransmissionStream +impl PersistentTransmissionStream where S: Stream, Rng: RngCore, - M: MixMessage, Scheduler: Stream, { pub fn new( @@ -48,22 +44,22 @@ where stream: S, scheduler: Scheduler, rng: Rng, - ) -> PersistentTransmissionStream { + drop_message: Vec, + ) -> PersistentTransmissionStream { let coin = Coin::::new(rng, settings.drop_message_probability).unwrap(); Self { coin, stream, scheduler, - _mix_message: Default::default(), + drop_message, } } } -impl Stream for PersistentTransmissionStream +impl Stream for PersistentTransmissionStream where S: Stream> + Unpin, Rng: RngCore + Unpin, - M: MixMessage + Unpin, Scheduler: Stream + Unpin, { type Item = Vec; @@ -73,6 +69,7 @@ where ref mut scheduler, ref mut stream, ref mut coin, + ref drop_message, .. } = self.get_mut(); if pin!(scheduler).poll_next_unpin(cx).is_pending() { @@ -81,17 +78,16 @@ where if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) { Poll::Ready(Some(item)) } else if coin.flip() { - Poll::Ready(Some(M::DROP_MESSAGE.to_vec())) + Poll::Ready(Some(drop_message.clone())) } else { Poll::Pending } } } -pub trait PersistentTransmissionExt: Stream +pub trait PersistentTransmissionExt: Stream where Rng: RngCore, - M: MixMessage, Scheduler: Stream, { fn persistent_transmission( @@ -99,20 +95,19 @@ where settings: PersistentTransmissionSettings, rng: Rng, scheduler: Scheduler, - ) -> PersistentTransmissionStream + drop_message: Vec, + ) -> PersistentTransmissionStream where Self: Sized + Unpin, { - PersistentTransmissionStream::new(settings, self, scheduler, rng) + PersistentTransmissionStream::new(settings, self, scheduler, rng, drop_message) } } -impl PersistentTransmissionExt for S +impl PersistentTransmissionExt for S where S: Stream, Rng: RngCore, - M: MixMessage, - M::PublicKey: Clone + Serialize + DeserializeOwned, Scheduler: Stream, { } @@ -150,7 +145,6 @@ enum CoinError { mod tests { use super::*; use futures::StreamExt; - use nomos_mix_message::mock::MockMixMessage; use rand::SeedableRng; use rand_chacha::ChaCha8Rng; use std::time::Duration; @@ -196,16 +190,14 @@ mod tests { let lower_bound = expected_emission_interval - torelance; let upper_bound = expected_emission_interval + torelance; // prepare stream - let mut persistent_transmission_stream: PersistentTransmissionStream< - _, - _, - MockMixMessage, - _, - > = stream.persistent_transmission( - settings, - ChaCha8Rng::from_entropy(), - IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()), - ); + let drop_message = vec![0u8; 10]; + let mut persistent_transmission_stream: PersistentTransmissionStream<_, _, _> = stream + .persistent_transmission( + settings, + ChaCha8Rng::from_entropy(), + IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()), + drop_message.clone(), + ); // Messages must be scheduled in non-blocking manner. schedule_sender.send(vec![1]).unwrap(); schedule_sender.send(vec![2]).unwrap(); @@ -230,14 +222,16 @@ mod tests { ); assert_interval!(&mut last_time, lower_bound, upper_bound); - assert!(MockMixMessage::is_drop_message( - &persistent_transmission_stream.next().await.unwrap() - )); + assert_eq!( + &persistent_transmission_stream.next().await.unwrap(), + &drop_message + ); assert_interval!(&mut last_time, lower_bound, upper_bound); - assert!(MockMixMessage::is_drop_message( - &persistent_transmission_stream.next().await.unwrap() - )); + assert_eq!( + &persistent_transmission_stream.next().await.unwrap(), + &drop_message + ); assert_interval!(&mut last_time, lower_bound, upper_bound); // Schedule a new message and check if it is emitted at the next interval diff --git a/nomos-mix/message/Cargo.toml b/nomos-mix/message/Cargo.toml index 3a4c33c1..e6078081 100644 --- a/nomos-mix/message/Cargo.toml +++ b/nomos-mix/message/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] itertools = "0.13" rand_chacha = "0.3" +serde = { version = "1.0.215", features = ["derive"] } sha2 = "0.10" sphinx-packet = "0.2" thiserror = "1.0.65" diff --git a/nomos-mix/message/src/lib.rs b/nomos-mix/message/src/lib.rs index cc8e4c27..b6acd2c2 100644 --- a/nomos-mix/message/src/lib.rs +++ b/nomos-mix/message/src/lib.rs @@ -4,10 +4,13 @@ pub mod sphinx; pub trait MixMessage { type PublicKey; type PrivateKey; + type Settings; type Error; - const DROP_MESSAGE: &'static [u8]; + + fn new(settings: Self::Settings) -> Self; fn build_message( + &self, payload: &[u8], public_keys: &[Self::PublicKey], ) -> Result, Self::Error>; @@ -19,10 +22,14 @@ pub trait MixMessage { /// If the input message was already fully unwrapped, or if its format is invalid, /// this function returns `[Error::InvalidMixMessage]`. fn unwrap_message( + &self, message: &[u8], private_key: &Self::PrivateKey, ) -> Result<(Vec, bool), Self::Error>; - fn is_drop_message(message: &[u8]) -> bool { - message == Self::DROP_MESSAGE + + fn drop_message(&self) -> &[u8]; + + fn is_drop_message(&self, message: &[u8]) -> bool { + message == self.drop_message() } } diff --git a/nomos-mix/message/src/mock/mod.rs b/nomos-mix/message/src/mock/mod.rs index fcb8cf6f..8797ad36 100644 --- a/nomos-mix/message/src/mock/mod.rs +++ b/nomos-mix/message/src/mock/mod.rs @@ -1,68 +1,99 @@ pub mod error; use error::Error; +use serde::{Deserialize, Serialize}; use crate::MixMessage; -// TODO: Remove all the mock below once the actual implementation is integrated to the system. -// + /// A mock implementation of the Sphinx encoding. const NODE_ID_SIZE: usize = 32; - -// TODO: Move MAX_PAYLOAD_SIZE and MAX_LAYERS to the upper layer (service layer). -const MAX_PAYLOAD_SIZE: usize = 2048; const PAYLOAD_PADDING_SEPARATOR: u8 = 0x01; -const PAYLOAD_PADDING_SEPARATOR_SIZE: usize = 1; -const MAX_LAYERS: usize = 5; -pub const MESSAGE_SIZE: usize = - NODE_ID_SIZE * MAX_LAYERS + MAX_PAYLOAD_SIZE + PAYLOAD_PADDING_SEPARATOR_SIZE; +const PAYLOAD_PADDING_SEPARATOR_SIZE: usize = std::mem::size_of::(); #[derive(Clone, Debug)] -pub struct MockMixMessage; +pub struct MockMixMessage { + settings: MockMixMessageSettings, + drop_message: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MockMixMessageSettings { + max_layers: usize, + max_payload_size: usize, +} + +impl MockMixMessageSettings { + const fn message_size(&self) -> usize { + NODE_ID_SIZE * self.max_layers + self.max_payload_size + PAYLOAD_PADDING_SEPARATOR_SIZE + } +} + +impl MockMixMessage { + const fn size(&self) -> usize { + self.settings.message_size() + } +} impl MixMessage for MockMixMessage { type PublicKey = [u8; NODE_ID_SIZE]; type PrivateKey = [u8; NODE_ID_SIZE]; + type Settings = MockMixMessageSettings; type Error = Error; - const DROP_MESSAGE: &'static [u8] = &[0; MESSAGE_SIZE]; + + fn new(settings: Self::Settings) -> Self { + let drop_message = vec![0; settings.message_size()]; + Self { + settings, + drop_message, + } + } /// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes. /// The [`MAX_LAYERS`] number of [`NodeId`]s are concatenated in front of the payload. /// The payload is zero-padded to the end. - /// fn build_message( + &self, payload: &[u8], public_keys: &[Self::PublicKey], ) -> Result, Self::Error> { + let MockMixMessageSettings { + max_layers, + max_payload_size, + } = self.settings; + // In this mock, we don't encrypt anything. So, we use public key as just a node ID. let node_ids = public_keys; - if node_ids.is_empty() || node_ids.len() > MAX_LAYERS { + if node_ids.is_empty() || node_ids.len() > max_layers { return Err(Error::InvalidNumberOfLayers); } - if payload.len() > MAX_PAYLOAD_SIZE { + if payload.len() > max_payload_size { return Err(Error::PayloadTooLarge); } - let mut message: Vec = Vec::with_capacity(MESSAGE_SIZE); + let mut message: Vec = Vec::with_capacity(self.size()); node_ids.iter().for_each(|node_id| { message.extend(node_id); }); // If there is any remaining layers, fill them with zeros. - (0..MAX_LAYERS - node_ids.len()).for_each(|_| message.extend(&[0; NODE_ID_SIZE])); + (0..max_layers - node_ids.len()).for_each(|_| message.extend(&[0; NODE_ID_SIZE])); // Append payload with padding message.extend(payload); message.push(PAYLOAD_PADDING_SEPARATOR); - message.extend(std::iter::repeat(0).take(MAX_PAYLOAD_SIZE - payload.len())); + message.extend(std::iter::repeat(0).take(max_payload_size - payload.len())); Ok(message) } fn unwrap_message( + &self, message: &[u8], private_key: &Self::PrivateKey, ) -> Result<(Vec, bool), Self::Error> { - if message.len() != MESSAGE_SIZE { + let MockMixMessageSettings { max_layers, .. } = self.settings; + + if message.len() != self.size() { return Err(Error::InvalidMixMessage); } @@ -74,7 +105,7 @@ impl MixMessage for MockMixMessage { // If this is the last layer if message[NODE_ID_SIZE..NODE_ID_SIZE * 2] == [0; NODE_ID_SIZE] { - let padded_payload = &message[NODE_ID_SIZE * MAX_LAYERS..]; + let padded_payload = &message[NODE_ID_SIZE * max_layers..]; // remove the payload padding match padded_payload .iter() @@ -87,12 +118,16 @@ impl MixMessage for MockMixMessage { } } - let mut new_message: Vec = Vec::with_capacity(MESSAGE_SIZE); - new_message.extend(&message[NODE_ID_SIZE..NODE_ID_SIZE * MAX_LAYERS]); + let mut new_message: Vec = Vec::with_capacity(self.size()); + new_message.extend(&message[NODE_ID_SIZE..NODE_ID_SIZE * max_layers]); new_message.extend(&[0; NODE_ID_SIZE]); - new_message.extend(&message[NODE_ID_SIZE * MAX_LAYERS..]); // padded payload + new_message.extend(&message[NODE_ID_SIZE * max_layers..]); // padded payload Ok((new_message, false)) } + + fn drop_message(&self) -> &[u8] { + &self.drop_message + } } #[cfg(test)] @@ -101,23 +136,28 @@ mod tests { #[test] fn message() { + let mock_message = MockMixMessage::new(MockMixMessageSettings { + max_layers: 5, + max_payload_size: 100, + }); + let node_ids = (0..3).map(|i| [i; NODE_ID_SIZE]).collect::>(); let payload = [7; 10]; - let message = MockMixMessage::build_message(&payload, &node_ids).unwrap(); - assert_eq!(message.len(), MESSAGE_SIZE); + let message = mock_message.build_message(&payload, &node_ids).unwrap(); + assert_eq!(message.len(), mock_message.size()); let (message, is_fully_unwrapped) = - MockMixMessage::unwrap_message(&message, &node_ids[0]).unwrap(); + mock_message.unwrap_message(&message, &node_ids[0]).unwrap(); assert!(!is_fully_unwrapped); - assert_eq!(message.len(), MESSAGE_SIZE); + assert_eq!(message.len(), mock_message.size()); let (message, is_fully_unwrapped) = - MockMixMessage::unwrap_message(&message, &node_ids[1]).unwrap(); + mock_message.unwrap_message(&message, &node_ids[1]).unwrap(); assert!(!is_fully_unwrapped); - assert_eq!(message.len(), MESSAGE_SIZE); + assert_eq!(message.len(), mock_message.size()); let (unwrapped_payload, is_fully_unwrapped) = - MockMixMessage::unwrap_message(&message, &node_ids[2]).unwrap(); + mock_message.unwrap_message(&message, &node_ids[2]).unwrap(); assert!(is_fully_unwrapped); assert_eq!(unwrapped_payload, payload); } diff --git a/nomos-mix/message/src/sphinx/mod.rs b/nomos-mix/message/src/sphinx/mod.rs index c7d62fb5..82b97517 100644 --- a/nomos-mix/message/src/sphinx/mod.rs +++ b/nomos-mix/message/src/sphinx/mod.rs @@ -1,5 +1,6 @@ use error::Error; use packet::{Packet, UnpackedPacket}; +use serde::{Deserialize, Serialize}; use crate::MixMessage; @@ -8,21 +9,35 @@ mod layered_cipher; pub mod packet; mod routing; -pub struct SphinxMessage; +pub struct SphinxMessage { + settings: SphinxMessageSettings, + drop_message: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SphinxMessageSettings { + max_layers: usize, + max_payload_size: usize, +} const ASYM_KEY_SIZE: usize = 32; -// TODO: Move these constants to the upper layer (service layer). -const MAX_PAYLOAD_SIZE: usize = 2048; -const MAX_LAYERS: usize = 5; impl MixMessage for SphinxMessage { type PublicKey = [u8; ASYM_KEY_SIZE]; type PrivateKey = [u8; ASYM_KEY_SIZE]; + type Settings = SphinxMessageSettings; type Error = Error; - const DROP_MESSAGE: &'static [u8] = &[0; Packet::size(MAX_LAYERS, MAX_PAYLOAD_SIZE)]; + fn new(settings: Self::Settings) -> Self { + let drop_message = vec![0; Packet::size(settings.max_layers, settings.max_payload_size)]; + Self { + settings, + drop_message, + } + } fn build_message( + &self, payload: &[u8], public_keys: &[Self::PublicKey], ) -> Result, Self::Error> { @@ -31,25 +46,32 @@ impl MixMessage for SphinxMessage { .iter() .map(|k| x25519_dalek::PublicKey::from(*k)) .collect::>(), - MAX_LAYERS, + self.settings.max_layers, payload, - MAX_PAYLOAD_SIZE, + self.settings.max_payload_size, )?; Ok(packet.to_bytes()) } fn unwrap_message( + &self, message: &[u8], private_key: &Self::PrivateKey, ) -> Result<(Vec, bool), Self::Error> { - let packet = Packet::from_bytes(message, MAX_LAYERS)?; - let unpacked_packet = - packet.unpack(&x25519_dalek::StaticSecret::from(*private_key), MAX_LAYERS)?; + let packet = Packet::from_bytes(message, self.settings.max_layers)?; + let unpacked_packet = packet.unpack( + &x25519_dalek::StaticSecret::from(*private_key), + self.settings.max_layers, + )?; match unpacked_packet { UnpackedPacket::ToForward(packet) => Ok((packet.to_bytes(), false)), UnpackedPacket::FullyUnpacked(payload) => Ok((payload, true)), } } + + fn drop_message(&self) -> &[u8] { + &self.drop_message + } } fn parse_bytes<'a>(data: &'a [u8], sizes: &[usize]) -> Result, String> {