diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-mix/core/src/message_blend/mod.rs index 568c9da8..18d488bd 100644 --- a/nomos-mix/core/src/message_blend/mod.rs +++ b/nomos-mix/core/src/message_blend/mod.rs @@ -8,7 +8,7 @@ use rand::RngCore; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -pub use temporal::TemporalProcessorSettings; +pub use temporal::TemporalSchedulerSettings; use crate::membership::Membership; use crate::message_blend::crypto::CryptographicProcessor; @@ -28,13 +28,13 @@ where M::PrivateKey: Serialize + DeserializeOwned, { pub cryptographic_processor: CryptographicProcessorSettings, - pub temporal_processor: TemporalProcessorSettings, + pub temporal_processor: TemporalSchedulerSettings, } /// [`MessageBlendStream`] handles the entire mixing tiers process /// - Unwraps incoming messages received from network using [`CryptographicProcessor`] /// - Pushes unwrapped messages to [`TemporalProcessor`] -pub struct MessageBlendStream +pub struct MessageBlendStream where M: MixMessage, { @@ -43,22 +43,24 @@ where temporal_sender: UnboundedSender, cryptographic_processor: CryptographicProcessor, _rng: PhantomData, + _scheduler: PhantomData, } -impl MessageBlendStream +impl MessageBlendStream where S: Stream>, Rng: RngCore + Unpin + Send + 'static, M: MixMessage, M::PrivateKey: Serialize + DeserializeOwned, M::PublicKey: Clone + PartialEq, + Scheduler: Stream + Unpin + Send + 'static, { pub fn new( input_stream: S, settings: MessageBlendSettings, membership: Membership, + scheduler: Scheduler, cryptographic_processor_rng: Rng, - temporal_processor_rng: Rng, ) -> Self { let cryptographic_processor = CryptographicProcessor::new( settings.cryptographic_processor, @@ -67,7 +69,7 @@ where ); let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel(); let output_stream = UnboundedReceiverStream::new(temporal_receiver) - .temporal_stream(settings.temporal_processor, temporal_processor_rng) + .temporal_stream(scheduler) .boxed(); Self { input_stream, @@ -75,6 +77,7 @@ where temporal_sender, cryptographic_processor, _rng: Default::default(), + _scheduler: Default::default(), } } @@ -100,13 +103,14 @@ where } } -impl Stream for MessageBlendStream +impl Stream for MessageBlendStream where S: Stream> + Unpin, Rng: RngCore + Unpin + Send + 'static, M: MixMessage + Unpin, M::PrivateKey: Serialize + DeserializeOwned + Unpin, M::PublicKey: Clone + PartialEq + Unpin, + Scheduler: Stream + Unpin + Send + 'static, { type Item = MixOutgoingMessage; @@ -118,20 +122,21 @@ where } } -pub trait MessageBlendExt: Stream> +pub trait MessageBlendExt: Stream> where Rng: RngCore + Send + Unpin + 'static, M: MixMessage, M::PrivateKey: Serialize + DeserializeOwned, M::PublicKey: Clone + PartialEq, + Scheduler: Stream + Unpin + Send + 'static, { fn blend( self, message_blend_settings: MessageBlendSettings, membership: Membership, + scheduler: Scheduler, cryptographic_processor_rng: Rng, - temporal_processor_rng: Rng, - ) -> MessageBlendStream + ) -> MessageBlendStream where Self: Sized + Unpin, { @@ -139,18 +144,19 @@ where self, message_blend_settings, membership, + scheduler, cryptographic_processor_rng, - temporal_processor_rng, ) } } -impl MessageBlendExt for T +impl MessageBlendExt for T where T: Stream>, Rng: RngCore + Unpin + Send + 'static, M: MixMessage, M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq, M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq, + S: Stream + Unpin + Send + 'static, { } diff --git a/nomos-mix/core/src/message_blend/temporal.rs b/nomos-mix/core/src/message_blend/temporal.rs index ed6d4529..f42bc97a 100644 --- a/nomos-mix/core/src/message_blend/temporal.rs +++ b/nomos-mix/core/src/message_blend/temporal.rs @@ -10,14 +10,8 @@ use rand::{Rng, RngCore}; use serde::{Deserialize, Serialize}; use tokio::time; -/// [`TemporalProcessor`] delays messages randomly to hide timing correlation -/// between incoming and outgoing messages from a node. -/// -/// See the [`Stream`] implementation below for more details on how it works. -pub(crate) struct TemporalProcessor { - settings: TemporalProcessorSettings, - // All scheduled messages - queue: VecDeque, +pub struct TemporalScheduler { + settings: TemporalSchedulerSettings, /// Interval in seconds for running the lottery to release a message lottery_interval: time::Interval, /// To wait a few seconds after running the lottery before releasing the message. @@ -27,17 +21,11 @@ pub(crate) struct TemporalProcessor { rng: Rng, } -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -pub struct TemporalProcessorSettings { - pub max_delay_seconds: u64, -} - -impl TemporalProcessor { - pub(crate) fn new(settings: TemporalProcessorSettings, rng: Rng) -> Self { +impl TemporalScheduler { + pub fn new(settings: TemporalSchedulerSettings, rng: Rng) -> Self { let lottery_interval = Self::lottery_interval(settings.max_delay_seconds); Self { settings, - queue: VecDeque::new(), lottery_interval, release_timer: None, rng, @@ -58,12 +46,9 @@ impl TemporalProcessor { fn lottery_interval_seconds(max_delay_seconds: u64) -> u64 { max_delay_seconds / 2 } - /// Schedule a message to be released later. - pub(crate) fn push_message(&mut self, message: M) { - self.queue.push_back(message); - } } -impl TemporalProcessor + +impl TemporalScheduler where Rng: RngCore, { @@ -75,12 +60,11 @@ where } } -impl Stream for TemporalProcessor +impl Stream for TemporalScheduler where - M: Unpin, Rng: RngCore + Unpin, { - type Item = M; + type Item = (); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Check whether it's time to run a new lottery to determine the delay. @@ -94,33 +78,74 @@ where if let Some(timer) = self.release_timer.as_mut() { if timer.as_mut().poll(cx).is_ready() { self.release_timer.take(); // Reset timer after it's done - if let Some(msg) = self.queue.pop_front() { - // Release the 1st message in the queue if it exists. - return Poll::Ready(Some(msg)); - } + return Poll::Ready(Some(())); } } - Poll::Pending } } -pub struct TemporalStream -where - S: Stream, - Rng: RngCore, -{ - processor: TemporalProcessor, - wrapped_stream: S, +/// [`TemporalProcessor`] delays messages randomly to hide timing correlation +/// between incoming and outgoing messages from a node. +/// +/// See the [`Stream`] implementation below for more details on how it works. +pub struct TemporalProcessor { + // All scheduled messages + queue: VecDeque, + scheduler: S, } -impl Stream for TemporalStream +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct TemporalSchedulerSettings { + pub max_delay_seconds: u64, +} + +impl TemporalProcessor { + pub(crate) fn new(scheduler: S) -> Self { + Self { + queue: VecDeque::new(), + scheduler, + } + } + /// Schedule a message to be released later. + pub(crate) fn push_message(&mut self, message: M) { + self.queue.push_back(message); + } +} + +impl Stream for TemporalProcessor where - S: Stream + Unpin, - S::Item: Unpin, - Rng: RngCore + Unpin, + M: Unpin, + S: Stream + Unpin, { - type Item = S::Item; + type Item = M; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.scheduler.poll_next_unpin(cx).is_ready() { + if let Some(msg) = self.queue.pop_front() { + return Poll::Ready(Some(msg)); + } + }; + Poll::Pending + } +} + +pub struct TemporalStream +where + WrappedStream: Stream, + Scheduler: Stream, +{ + processor: TemporalProcessor, + wrapped_stream: WrappedStream, +} + +impl Stream for TemporalStream +where + WrappedStream: Stream + Unpin, + WrappedStream::Item: Unpin, + Scheduler: Stream + Unpin, +{ + type Item = WrappedStream::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Poll::Ready(Some(item)) = self.wrapped_stream.poll_next_unpin(cx) { @@ -129,28 +154,24 @@ where self.processor.poll_next_unpin(cx) } } -pub trait TemporalProcessorExt: Stream +pub trait TemporalProcessorExt: Stream where - Rng: RngCore, + Scheduler: Stream, { - fn temporal_stream( - self, - settings: TemporalProcessorSettings, - rng: Rng, - ) -> TemporalStream + fn temporal_stream(self, scheduler: Scheduler) -> TemporalStream where Self: Sized, { TemporalStream { - processor: TemporalProcessor::new(settings, rng), + processor: TemporalProcessor::::new(scheduler), wrapped_stream: self, } } } -impl TemporalProcessorExt for T +impl TemporalProcessorExt for T where T: Stream, - Rng: RngCore, + S: Stream, { } diff --git a/nomos-mix/core/src/persistent_transmission.rs b/nomos-mix/core/src/persistent_transmission.rs index 372bfa02..72531a49 100644 --- a/nomos-mix/core/src/persistent_transmission.rs +++ b/nomos-mix/core/src/persistent_transmission.rs @@ -1,4 +1,4 @@ -use futures::Stream; +use futures::{Stream, StreamExt}; use nomos_mix_message::MixMessage; use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore}; use serde::de::DeserializeOwned; @@ -6,9 +6,6 @@ use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; -use std::time::Duration; -use tokio::time; -use tokio::time::Interval; #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct PersistentTransmissionSettings { @@ -28,57 +25,57 @@ impl Default for PersistentTransmissionSettings { } /// Transmit scheduled messages with a persistent rate as a stream. -pub struct PersistentTransmissionStream +pub struct PersistentTransmissionStream where S: Stream, Rng: RngCore, { - interval: Interval, coin: Coin, stream: S, + scheduler: Scheduler, _mix_message: PhantomData, } -impl PersistentTransmissionStream +impl PersistentTransmissionStream where S: Stream, Rng: RngCore, M: MixMessage, + Scheduler: Stream, { pub fn new( settings: PersistentTransmissionSettings, stream: S, + scheduler: Scheduler, rng: Rng, - ) -> PersistentTransmissionStream { - let interval = time::interval(Duration::from_secs_f64( - 1.0 / settings.max_emission_frequency, - )); + ) -> PersistentTransmissionStream { let coin = Coin::::new(rng, settings.drop_message_probability).unwrap(); Self { - interval, coin, stream, + scheduler, _mix_message: Default::default(), } } } -impl Stream for PersistentTransmissionStream +impl Stream for PersistentTransmissionStream where S: Stream> + Unpin, Rng: RngCore + Unpin, M: MixMessage + Unpin, + Scheduler: Stream + Unpin, { type Item = Vec; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let Self { - ref mut interval, + ref mut scheduler, ref mut stream, ref mut coin, .. } = self.get_mut(); - if pin!(interval).poll_tick(cx).is_pending() { + if pin!(scheduler).poll_next_unpin(cx).is_pending() { return Poll::Pending; } if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) { @@ -91,29 +88,32 @@ where } } -pub trait PersistentTransmissionExt: Stream +pub trait PersistentTransmissionExt: Stream where Rng: RngCore, M: MixMessage, + Scheduler: Stream, { fn persistent_transmission( self, settings: PersistentTransmissionSettings, rng: Rng, - ) -> PersistentTransmissionStream + scheduler: Scheduler, + ) -> PersistentTransmissionStream where Self: Sized + Unpin, { - PersistentTransmissionStream::new(settings, self, rng) + PersistentTransmissionStream::new(settings, self, scheduler, rng) } } -impl PersistentTransmissionExt for S +impl PersistentTransmissionExt for S where S: Stream, Rng: RngCore, M: MixMessage, M::PublicKey: Clone + Serialize + DeserializeOwned, + Scheduler: Stream, { } @@ -153,7 +153,10 @@ mod tests { use nomos_mix_message::mock::MockMixMessage; use rand::SeedableRng; use rand_chacha::ChaCha8Rng; + use std::time::Duration; use tokio::sync::mpsc; + use tokio::time; + use tokio_stream::wrappers::IntervalStream; macro_rules! assert_interval { ($last_time:expr, $lower_bound:expr, $upper_bound:expr) => { @@ -193,8 +196,16 @@ mod tests { let lower_bound = expected_emission_interval - torelance; let upper_bound = expected_emission_interval + torelance; // prepare stream - let mut persistent_transmission_stream: PersistentTransmissionStream<_, _, MockMixMessage> = - stream.persistent_transmission(settings, ChaCha8Rng::from_entropy()); + let mut persistent_transmission_stream: PersistentTransmissionStream< + _, + _, + MockMixMessage, + _, + > = stream.persistent_transmission( + settings, + ChaCha8Rng::from_entropy(), + IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()), + ); // Messages must be scheduled in non-blocking manner. schedule_sender.send(vec![1]).unwrap(); schedule_sender.send(vec![2]).unwrap(); diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 29052433..90bd74c8 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -3,7 +3,7 @@ use cryptarchia_consensus::LeaderConfig; use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; use nomos_mix::membership::Node; use nomos_mix::message_blend::{ - CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, + CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; use nomos_mix_message::mock::MockMixMessage; use nomos_mix_message::MixMessage; @@ -226,7 +226,7 @@ pub fn new_node( private_key: mix_config.private_key.to_bytes(), num_mix_layers: 1, }, - temporal_processor: TemporalProcessorSettings { + temporal_processor: TemporalSchedulerSettings { max_delay_seconds: 2, }, }, diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs index ecb4b77f..4d4753f6 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/mix/src/lib.rs @@ -8,6 +8,7 @@ use network::NetworkAdapter; use nomos_core::wire; use nomos_mix::membership::{Membership, Node}; use nomos_mix::message_blend::crypto::CryptographicProcessor; +use nomos_mix::message_blend::temporal::TemporalScheduler; use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings}; use nomos_mix::persistent_transmission::{ PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, @@ -26,8 +27,10 @@ use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::fmt::Debug; +use std::time::Duration; use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio::time; +use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream}; /// A mix service that sends messages to the mix network /// and broadcasts fully unwrapped messages through the [`NetworkService`]. @@ -108,16 +111,25 @@ where _, _, MockMixMessage, + _, > = UnboundedReceiverStream::new(persistent_receiver).persistent_transmission( mix_config.persistent_transmission, ChaCha12Rng::from_entropy(), + IntervalStream::new(time::interval(Duration::from_secs_f64( + 1.0 / mix_config.persistent_transmission.max_emission_frequency, + ))) + .map(|_| ()), ); // tier 2 blend + let temporal_scheduler = TemporalScheduler::new( + mix_config.message_blend.temporal_processor, + ChaCha12Rng::from_entropy(), + ); let mut blend_messages = backend.listen_to_incoming_messages().blend( mix_config.message_blend, membership.clone(), - ChaCha12Rng::from_entropy(), + temporal_scheduler, ChaCha12Rng::from_entropy(), ); diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs index b6f1c66d..13ee4650 100644 --- a/tests/src/nodes/executor.rs +++ b/tests/src/nodes/executor.rs @@ -23,7 +23,7 @@ use nomos_da_verifier::DaVerifierServiceSettings; use nomos_executor::api::backend::AxumBackendSettings; use nomos_executor::config::Config; use nomos_mix::message_blend::{ - CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, + CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE}; @@ -162,7 +162,7 @@ pub fn create_executor_config(config: GeneralConfig) -> Config { private_key: config.mix_config.private_key.to_bytes(), num_mix_layers: 1, }, - temporal_processor: TemporalProcessorSettings { + temporal_processor: TemporalSchedulerSettings { max_delay_seconds: 2, }, }, diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs index 9e816196..c11dd115 100644 --- a/tests/src/nodes/validator.rs +++ b/tests/src/nodes/validator.rs @@ -15,7 +15,7 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as Verif use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings}; use nomos_mempool::MempoolMetrics; use nomos_mix::message_blend::{ - CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, + CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::api::paths::{ @@ -248,7 +248,7 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { private_key: config.mix_config.private_key.to_bytes(), num_mix_layers: 1, }, - temporal_processor: TemporalProcessorSettings { + temporal_processor: TemporalSchedulerSettings { max_delay_seconds: 2, }, },