diff --git a/nomos-mix/core/src/lib.rs b/nomos-mix/core/src/lib.rs index baf08ef6..0326319c 100644 --- a/nomos-mix/core/src/lib.rs +++ b/nomos-mix/core/src/lib.rs @@ -1,2 +1,16 @@ pub mod message_blend; pub mod persistent_transmission; + +pub enum MixOutgoingMessage { + FullyUnwrapped(Vec), + Outbound(Vec), +} + +impl From for Vec { + fn from(value: MixOutgoingMessage) -> Self { + match value { + MixOutgoingMessage::FullyUnwrapped(v) => v, + MixOutgoingMessage::Outbound(v) => v, + } + } +} diff --git a/nomos-mix/core/src/message_blend/crypto.rs b/nomos-mix/core/src/message_blend/crypto.rs index 4f76b4b8..910b28e0 100644 --- a/nomos-mix/core/src/message_blend/crypto.rs +++ b/nomos-mix/core/src/message_blend/crypto.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; /// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages /// for the message indistinguishability. #[derive(Clone, Copy, Debug)] -pub(crate) struct CryptographicProcessor { +pub struct CryptographicProcessor { settings: CryptographicProcessorSettings, } @@ -14,17 +14,17 @@ pub struct CryptographicProcessorSettings { } impl CryptographicProcessor { - pub(crate) fn new(settings: CryptographicProcessorSettings) -> Self { + pub fn new(settings: CryptographicProcessorSettings) -> Self { Self { settings } } - pub(crate) fn wrap_message(&self, message: &[u8]) -> Result, nomos_mix_message::Error> { + pub fn wrap_message(&self, message: &[u8]) -> Result, nomos_mix_message::Error> { // TODO: Use the actual Sphinx encoding instead of mock. // TODO: Select `num_mix_layers` random nodes from the membership. new_message(message, self.settings.num_mix_layers.try_into().unwrap()) } - pub(crate) fn unwrap_message( + pub fn unwrap_message( &self, message: &[u8], ) -> Result<(Vec, bool), nomos_mix_message::Error> { diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-mix/core/src/message_blend/mod.rs index 5577b4e7..180a583f 100644 --- a/nomos-mix/core/src/message_blend/mod.rs +++ b/nomos-mix/core/src/message_blend/mod.rs @@ -1,5 +1,5 @@ -mod crypto; -mod temporal; +pub mod crypto; +pub mod temporal; pub use crypto::CryptographicProcessorSettings; use futures::stream::BoxStream; @@ -8,8 +8,9 @@ use std::pin::Pin; use std::task::{Context, Poll}; pub use temporal::TemporalProcessorSettings; +use crate::message_blend::crypto::CryptographicProcessor; use crate::message_blend::temporal::TemporalProcessorExt; -use crate::message_blend::{crypto::CryptographicProcessor, temporal::TemporalProcessor}; +use crate::MixOutgoingMessage; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; @@ -21,175 +22,41 @@ pub struct MessageBlendSettings { pub temporal_processor: TemporalProcessorSettings, } -/// [`MessageBlend`] handles the entire Tier-2 spec. -/// - Wraps new messages using [`CryptographicProcessor`] +/// [`MessageBlendStream`] handles the entire mixing tiers process /// - Unwraps incoming messages received from network using [`CryptographicProcessor`] /// - Pushes unwrapped messages to [`TemporalProcessor`] -/// - Releases messages returned by [`TemporalProcessor`] to the proper channel -pub struct MessageBlend { - /// To receive new messages originated from this node - new_message_receiver: mpsc::UnboundedReceiver>, - /// To receive incoming messages from the network - inbound_message_receiver: mpsc::UnboundedReceiver>, - /// To release messages that are successfully processed but still wrapped - outbound_message_sender: mpsc::UnboundedSender>, - /// To release fully unwrapped messages - fully_unwrapped_message_sender: mpsc::UnboundedSender>, - /// Processors - cryptographic_processor: CryptographicProcessor, - temporal_processor: TemporalProcessor, -} - -impl MessageBlend { - pub fn new( - settings: MessageBlendSettings, - new_message_receiver: mpsc::UnboundedReceiver>, - inbound_message_receiver: mpsc::UnboundedReceiver>, - outbound_message_sender: mpsc::UnboundedSender>, - fully_unwrapped_message_sender: mpsc::UnboundedSender>, - ) -> Self { - Self { - new_message_receiver, - inbound_message_receiver, - outbound_message_sender, - fully_unwrapped_message_sender, - cryptographic_processor: CryptographicProcessor::new(settings.cryptographic_processor), - temporal_processor: TemporalProcessor::<_>::new(settings.temporal_processor), - } - } - - pub async fn run(&mut self) { - loop { - tokio::select! { - Some(new_message) = self.new_message_receiver.recv() => { - self.handle_new_message(new_message); - } - Some(incoming_message) = self.inbound_message_receiver.recv() => { - self.handle_incoming_message(incoming_message); - } - Some(msg) = self.temporal_processor.next() => { - self.release_temporal_processed_message(msg); - } - } - } - } - - fn handle_new_message(&mut self, message: Vec) { - match self.cryptographic_processor.wrap_message(&message) { - Ok(wrapped_message) => { - // Bypass Temporal Processor, and send the message to the outbound channel directly - // because the message is originated from this node. - if let Err(e) = self.outbound_message_sender.send(wrapped_message) { - tracing::error!("Failed to send message to the outbound channel: {e:?}"); - } - } - Err(e) => { - tracing::error!("Failed to wrap message: {:?}", e); - } - } - } - - fn handle_incoming_message(&mut self, message: Vec) { - match self.cryptographic_processor.unwrap_message(&message) { - Ok((unwrapped_message, fully_unwrapped)) => { - self.temporal_processor - .push_message(TemporalProcessableMessage { - message: unwrapped_message, - fully_unwrapped, - }); - } - Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => { - tracing::debug!("Message cannot be unwrapped by this node"); - } - Err(e) => { - tracing::error!("Failed to unwrap message: {:?}", e); - } - } - } - - fn release_temporal_processed_message(&mut self, message: TemporalProcessableMessage) { - if message.fully_unwrapped { - if let Err(e) = self.fully_unwrapped_message_sender.send(message.message) { - tracing::error!( - "Failed to send fully unwrapped message to the fully unwrapped channel: {e:?}" - ); - } - } else if let Err(e) = self.outbound_message_sender.send(message.message) { - tracing::error!("Failed to send message to the outbound channel: {e:?}"); - } - } -} - -#[derive(Clone)] -struct TemporalProcessableMessage { - message: Vec, - fully_unwrapped: bool, -} - -pub enum MessageBlendStreamIncomingMessage { - Local(Vec), - Inbound(Vec), -} - -pub enum MessageBlendStreamOutgoingMessage { - FullyUnwrapped(Vec), - Outbound(Vec), -} - pub struct MessageBlendStream { input_stream: S, - output_stream: BoxStream<'static, MessageBlendStreamOutgoingMessage>, - bypass_sender: UnboundedSender, - temporal_sender: UnboundedSender, + output_stream: BoxStream<'static, MixOutgoingMessage>, + temporal_sender: UnboundedSender, cryptographic_processor: CryptographicProcessor, } impl MessageBlendStream where - S: Stream, + S: Stream>, { pub fn new(input_stream: S, settings: MessageBlendSettings) -> Self { let cryptographic_processor = CryptographicProcessor::new(settings.cryptographic_processor); - let (bypass_sender, bypass_receiver) = mpsc::unbounded_channel(); let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel(); - let output_stream = tokio_stream::StreamExt::merge( - UnboundedReceiverStream::new(bypass_receiver), - UnboundedReceiverStream::new(temporal_receiver) - .to_temporal_stream(settings.temporal_processor), - ) - .boxed(); + let output_stream = UnboundedReceiverStream::new(temporal_receiver) + .temporal_stream(settings.temporal_processor) + .boxed(); Self { input_stream, output_stream, - bypass_sender, temporal_sender, cryptographic_processor, } } - fn process_new_message(self: &mut Pin<&mut Self>, message: Vec) { - match self.cryptographic_processor.wrap_message(&message) { - Ok(wrapped_message) => { - if let Err(e) = self - .bypass_sender - .send(MessageBlendStreamOutgoingMessage::Outbound(wrapped_message)) - { - tracing::error!("Failed to send message to the outbound channel: {e:?}"); - } - } - Err(e) => { - tracing::error!("Failed to wrap message: {:?}", e); - } - } - } - fn process_incoming_message(self: &mut Pin<&mut Self>, message: Vec) { match self.cryptographic_processor.unwrap_message(&message) { Ok((unwrapped_message, fully_unwrapped)) => { let message = if fully_unwrapped { - MessageBlendStreamOutgoingMessage::FullyUnwrapped(unwrapped_message) + MixOutgoingMessage::FullyUnwrapped(unwrapped_message) } else { - MessageBlendStreamOutgoingMessage::Outbound(unwrapped_message) + MixOutgoingMessage::Outbound(unwrapped_message) }; if let Err(e) = self.temporal_sender.send(message) { tracing::error!("Failed to send message to the outbound channel: {e:?}"); @@ -207,31 +74,25 @@ where impl Stream for MessageBlendStream where - S: Stream + Unpin, + S: Stream> + Unpin, { - type Item = MessageBlendStreamOutgoingMessage; + type Item = MixOutgoingMessage; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.input_stream.poll_next_unpin(cx) { - Poll::Ready(Some(MessageBlendStreamIncomingMessage::Local(message))) => { - self.process_new_message(message); - } - Poll::Ready(Some(MessageBlendStreamIncomingMessage::Inbound(message))) => { - self.process_incoming_message(message); - } - _ => {} + if let Poll::Ready(Some(message)) = self.input_stream.poll_next_unpin(cx) { + self.process_incoming_message(message); } self.output_stream.poll_next_unpin(cx) } } -pub trait MessageBlendExt: Stream { +pub trait MessageBlendExt: Stream> { fn blend(self, message_blend_settings: MessageBlendSettings) -> MessageBlendStream where - Self: Sized, + Self: Sized + Unpin, { MessageBlendStream::new(self, message_blend_settings) } } -impl MessageBlendExt for T where T: Stream {} +impl MessageBlendExt for T where T: Stream> {} diff --git a/nomos-mix/core/src/message_blend/temporal.rs b/nomos-mix/core/src/message_blend/temporal.rs index 3b857f93..586254b7 100644 --- a/nomos-mix/core/src/message_blend/temporal.rs +++ b/nomos-mix/core/src/message_blend/temporal.rs @@ -120,9 +120,8 @@ where self.processor.poll_next_unpin(cx) } } -#[allow(dead_code)] // TODO: Remove when integrating into blend pub trait TemporalProcessorExt: Stream { - fn to_temporal_stream(self, settings: TemporalProcessorSettings) -> TemporalStream + fn temporal_stream(self, settings: TemporalProcessorSettings) -> TemporalStream where Self: Sized, { diff --git a/nomos-mix/core/src/persistent_transmission.rs b/nomos-mix/core/src/persistent_transmission.rs index 48946286..a0ef0520 100644 --- a/nomos-mix/core/src/persistent_transmission.rs +++ b/nomos-mix/core/src/persistent_transmission.rs @@ -6,13 +6,10 @@ use serde::{Deserialize, Serialize}; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use std::time::Duration; +use tokio::time; use tokio::time::Interval; -use tokio::{ - sync::mpsc::{self, error::TryRecvError}, - time, -}; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct PersistentTransmissionSettings { /// The maximum number of messages that can be emitted per second pub max_emission_frequency: f64, @@ -29,6 +26,7 @@ impl Default for PersistentTransmissionSettings { } } +/// Transmit scheduled messages with a persistent rate as a stream. pub struct PersistentTransmissionStream where S: Stream, @@ -66,7 +64,7 @@ impl Stream for PersistentTransmissionStream where S: Stream> + Unpin, { - type Item = S::Item; + type Item = Vec; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let Self { @@ -102,57 +100,6 @@ pub trait PersistentTransmissionExt: Stream { impl PersistentTransmissionExt for S where S: Stream {} -/// Transmit scheduled messages with a persistent rate to the transmission channel. -/// -/// # Arguments -/// -/// * `settings` - The settings for the persistent transmission -/// * `schedule_receiver` - The channel for messages scheduled (from Tier 2 currently) -/// * `emission_sender` - The channel to emit messages -pub async fn persistent_transmission( - settings: PersistentTransmissionSettings, - schedule_receiver: mpsc::UnboundedReceiver>, - emission_sender: mpsc::UnboundedSender>, -) { - let mut schedule_receiver = schedule_receiver; - let mut interval = time::interval(Duration::from_secs_f64( - 1.0 / settings.max_emission_frequency, - )); - let mut coin = Coin::<_>::new( - ChaCha12Rng::from_entropy(), - settings.drop_message_probability, - ) - .unwrap(); - - loop { - interval.tick().await; - - // Emit the first one of the scheduled messages. - // If there is no scheduled message, emit a drop message with probability. - match schedule_receiver.try_recv() { - Ok(msg) => { - if let Err(e) = emission_sender.send(msg) { - tracing::error!("Failed to send message to the transmission channel: {e:?}"); - } - } - Err(TryRecvError::Empty) => { - // If the coin is head, emit the drop message. - if coin.flip() { - if let Err(e) = emission_sender.send(DROP_MESSAGE.to_vec()) { - tracing::error!( - "Failed to send drop message to the transmission channel: {e:?}" - ); - } - } - } - Err(TryRecvError::Disconnected) => { - tracing::error!("The schedule channel has been closed"); - break; - } - } - } -} - struct Coin { rng: R, distribution: Uniform, @@ -186,6 +133,7 @@ enum CoinError { mod tests { use super::*; use futures::StreamExt; + use tokio::sync::mpsc; macro_rules! assert_interval { ($last_time:expr, $lower_bound:expr, $upper_bound:expr) => { @@ -209,63 +157,6 @@ mod tests { }; } - #[tokio::test] - async fn test_persistent_transmission() { - let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel(); - let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel(); - - let settings = PersistentTransmissionSettings { - max_emission_frequency: 1.0, - // Set to always emit drop messages if no scheduled messages for easy testing - drop_message_probability: 1.0, - }; - - // Prepare the expected emission interval with torelance - let expected_emission_interval = - Duration::from_secs_f64(1.0 / settings.max_emission_frequency); - let torelance = expected_emission_interval / 10; // 10% torelance - let lower_bound = expected_emission_interval - torelance; - let upper_bound = expected_emission_interval + torelance; - - // Start the persistent transmission and schedule messages - tokio::spawn(persistent_transmission( - settings, - schedule_receiver, - emission_sender, - )); - // Messages must be scheduled in non-blocking manner. - schedule_sender.send(vec![1]).unwrap(); - schedule_sender.send(vec![2]).unwrap(); - schedule_sender.send(vec![3]).unwrap(); - - // Check if expected messages are emitted with the expected interval - assert_eq!(emission_receiver.recv().await.unwrap(), vec![1]); - let mut last_time = time::Instant::now(); - - assert_eq!(emission_receiver.recv().await.unwrap(), vec![2]); - assert_interval!(&mut last_time, lower_bound, upper_bound); - - assert_eq!(emission_receiver.recv().await.unwrap(), vec![3]); - assert_interval!(&mut last_time, lower_bound, upper_bound); - - assert_eq!( - emission_receiver.recv().await.unwrap(), - DROP_MESSAGE.to_vec() - ); - assert_interval!(&mut last_time, lower_bound, upper_bound); - - assert_eq!( - emission_receiver.recv().await.unwrap(), - DROP_MESSAGE.to_vec() - ); - assert_interval!(&mut last_time, lower_bound, upper_bound); - - // Schedule a new message and check if it is emitted at the next interval - schedule_sender.send(vec![4]).unwrap(); - assert_eq!(emission_receiver.recv().await.unwrap(), vec![4]); - assert_interval!(&mut last_time, lower_bound, upper_bound); - } - #[tokio::test] async fn test_persistent_transmission_stream() { let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel(); diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs index ef104604..8a2e3777 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/mix/src/lib.rs @@ -1,17 +1,17 @@ pub mod backends; pub mod network; -use std::fmt::Debug; - use async_trait::async_trait; use backends::MixBackend; use futures::StreamExt; use network::NetworkAdapter; use nomos_core::wire; -use nomos_mix::{ - message_blend::{MessageBlend, MessageBlendSettings}, - persistent_transmission::{persistent_transmission, PersistentTransmissionSettings}, +use nomos_mix::message_blend::crypto::CryptographicProcessor; +use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings}; +use nomos_mix::persistent_transmission::{ + PersistentTransmissionExt, PersistentTransmissionSettings, }; +use nomos_mix::MixOutgoingMessage; use nomos_network::NetworkService; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -21,7 +21,9 @@ use overwatch_rs::services::{ ServiceCore, ServiceData, ServiceId, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::fmt::Debug; use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; /// A mix service that sends messages to the mix network /// and broadcasts fully unwrapped messages through the [`NetworkService`]. @@ -77,73 +79,73 @@ where async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let Self { - mut service_state, + service_state, mut backend, network_relay, } = self; let mix_config = service_state.settings_reader.get_updated_settings(); - + let cryptographic_processor = + CryptographicProcessor::new(mix_config.message_blend.cryptographic_processor); let network_relay = network_relay.connect().await?; let network_adapter = Network::new(network_relay); - // Spawn Persistent Transmission - let (transmission_schedule_sender, transmission_schedule_receiver) = - mpsc::unbounded_channel(); - let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel(); - tokio::spawn(async move { - persistent_transmission( - mix_config.persistent_transmission, - transmission_schedule_receiver, - emission_sender, - ) - .await; - }); + // tier 1 persistent transmission + let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel(); + let mut persistent_transmission_messages = + UnboundedReceiverStream::new(persistent_receiver) + .persistent_transmission(mix_config.persistent_transmission); - // Spawn Message Blend and connect it to Persistent Transmission - let (new_message_sender, new_message_receiver) = mpsc::unbounded_channel(); - let (processor_inbound_sender, processor_inbound_receiver) = mpsc::unbounded_channel(); - let (fully_unwrapped_message_sender, mut fully_unwrapped_message_receiver) = - mpsc::unbounded_channel(); - tokio::spawn(async move { - MessageBlend::new( - mix_config.message_blend, - new_message_receiver, - processor_inbound_receiver, - // Connect the outputs of Message Blend to Persistent Transmission - transmission_schedule_sender, - fully_unwrapped_message_sender, - ) - .run() - .await; - }); + // tier 2 blend + let mut blend_messages = backend + .listen_to_incoming_messages() + .blend(mix_config.message_blend); - // A channel to listen to messages received from the [`MixBackend`] - let mut incoming_message_stream = backend.listen_to_incoming_messages(); + // local messages, are bypassed and send immediately + let mut local_messages = service_state + .inbound_relay + .map(|ServiceMessage::Mix(message)| { + wire::serialize(&message) + .expect("Message from internal services should not fail to serialize") + }); let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); loop { tokio::select! { - Some(msg) = incoming_message_stream.next() => { - tracing::debug!("Received message from mix backend. Sending it to Processor"); - if let Err(e) = processor_inbound_sender.send(msg) { - tracing::error!("Failed to send incoming message to processor: {e:?}"); - } - } - Some(msg) = emission_receiver.recv() => { - tracing::debug!("Emitting message to mix network"); + Some(msg) = persistent_transmission_messages.next() => { backend.publish(msg).await; } - Some(msg) = fully_unwrapped_message_receiver.recv() => { - tracing::debug!("Broadcasting fully unwrapped message"); - match wire::deserialize::>(&msg) { - Ok(msg) => { - network_adapter.broadcast(msg.message, msg.broadcast_settings).await; - }, - _ => tracing::error!("unrecognized message from mix backend") + // Already processed blend messages + Some(msg) = blend_messages.next() => { + match msg { + MixOutgoingMessage::Outbound(msg) => { + if let Err(e) = persistent_sender.send(msg) { + tracing::error!("Error sending message to persistent stream: {e}"); + } + } + MixOutgoingMessage::FullyUnwrapped(msg) => { + tracing::debug!("Broadcasting fully unwrapped message"); + match wire::deserialize::>(&msg) { + Ok(msg) => { + network_adapter.broadcast(msg.message, msg.broadcast_settings).await; + }, + _ => { + tracing::error!("unrecognized message from mix backend"); + } + } + } } } - Some(msg) = service_state.inbound_relay.recv() => { - Self::handle_service_message(msg, &new_message_sender); + Some(msg) = local_messages.next() => { + match cryptographic_processor.wrap_message(&msg) { + Ok(wrapped_message) => { + if let Err(e) = persistent_sender.send(wrapped_message) { + tracing::error!("Error sending message to persistent stream: {e}"); + } + } + Err(e) => { + tracing::error!("Failed to wrap message: {:?}", e); + } + } } Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { @@ -164,20 +166,6 @@ where Network: NetworkAdapter, Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned, { - fn handle_service_message( - msg: ServiceMessage, - new_message_sender: &mpsc::UnboundedSender>, - ) { - match msg { - ServiceMessage::Mix(msg) => { - // Serialize the new message and send it to the Processor - if let Err(e) = new_message_sender.send(wire::serialize(&msg).unwrap()) { - tracing::error!("Failed to send a new message to processor: {e:?}"); - } - } - } - } - async fn should_stop_service(msg: LifecycleMessage) -> bool { match msg { LifecycleMessage::Kill => true, @@ -198,8 +186,8 @@ where #[derive(Serialize, Deserialize, Clone, Debug)] pub struct MixConfig { pub backend: BackendSettings, - pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, + pub persistent_transmission: PersistentTransmissionSettings, } /// A message that is handled by [`MixService`]. diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs index 9b6a2a69..de525c1c 100644 --- a/tests/src/nodes/executor.rs +++ b/tests/src/nodes/executor.rs @@ -3,6 +3,8 @@ use std::process::{Command, Stdio}; use std::time::Duration; use std::{net::SocketAddr, process::Child}; +use crate::adjust_timeout; +use crate::topology::configs::GeneralConfig; use cryptarchia_consensus::CryptarchiaSettings; use nomos_da_dispersal::backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings}; use nomos_da_dispersal::DispersalServiceSettings; @@ -28,9 +30,6 @@ use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE}; use nomos_node::RocksBackendSettings; use tempfile::NamedTempFile; -use crate::adjust_timeout; -use crate::topology::configs::GeneralConfig; - use super::{create_tempdir, persist_tempdir, GetRangeReq, CLIENT}; const BIN_PATH: &str = "../target/debug/nomos-executor"; diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs index 68fa5518..1a894f81 100644 --- a/tests/src/nodes/validator.rs +++ b/tests/src/nodes/validator.rs @@ -243,6 +243,7 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { mix: nomos_mix_service::MixConfig { backend: config.mix_config.backend, persistent_transmission: Default::default(), + message_blend: MessageBlendSettings { cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, temporal_processor: TemporalProcessorSettings {