diff --git a/Cargo.toml b/Cargo.toml index 3a37ddfd..40ab82be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,9 +25,9 @@ members = [ "nomos-services/data-availability/dispersal", "nomos-services/data-availability/tests", "nomos-services/mix", + "nomos-mix/core", "nomos-mix/message", "nomos-mix/network", - "nomos-mix/queue", "nomos-tracing", "nomos-cli", "nomos-utils", diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 0fa8e053..5c2a65b1 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -271,8 +271,8 @@ pub fn update_mix( mix.backend.peering_degree = peering_degree; } - if let Some(_num_mix_layers) = mix_num_mix_layers { - // TODO: Set num_mix_layers to the proper module setting + if let Some(num_mix_layers) = mix_num_mix_layers { + mix.message_blend.cryptographic_processor.num_mix_layers = num_mix_layers; } Ok(()) diff --git a/nomos-mix/core/Cargo.toml b/nomos-mix/core/Cargo.toml new file mode 100644 index 00000000..6c8f90b7 --- /dev/null +++ b/nomos-mix/core/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "nomos-mix" +version = "0.1.0" +edition = "2021" + +[dependencies] +cached = "0.53" +tokio = { version = "1" } +tracing = "0.1" +rand = "0.8" +serde = { version = "1.0", features = ["derive"] } +nomos-mix-message = { path = "../message" } +futures = "0.3" +rand_chacha = "0.3" diff --git a/nomos-mix/core/src/lib.rs b/nomos-mix/core/src/lib.rs new file mode 100644 index 00000000..baf08ef6 --- /dev/null +++ b/nomos-mix/core/src/lib.rs @@ -0,0 +1,2 @@ +pub mod message_blend; +pub mod persistent_transmission; diff --git a/nomos-mix/core/src/message_blend/crypto.rs b/nomos-mix/core/src/message_blend/crypto.rs new file mode 100644 index 00000000..cfd7401a --- /dev/null +++ b/nomos-mix/core/src/message_blend/crypto.rs @@ -0,0 +1,32 @@ +use nomos_mix_message::{new_message, unwrap_message}; +use serde::{Deserialize, Serialize}; + +/// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages +/// for the message indistinguishability. +pub(crate) struct CryptographicProcessor { + settings: CryptographicProcessorSettings, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CryptographicProcessorSettings { + pub num_mix_layers: usize, +} + +impl CryptographicProcessor { + pub(crate) fn new(settings: CryptographicProcessorSettings) -> Self { + Self { settings } + } + + pub(crate) fn wrap_message(&self, message: &[u8]) -> Result, nomos_mix_message::Error> { + // TODO: Use the actual Sphinx encoding instead of mock. + // TODO: Select `num_mix_layers` random nodes from the membership. + new_message(message, self.settings.num_mix_layers.try_into().unwrap()) + } + + pub(crate) fn unwrap_message( + &self, + message: &[u8], + ) -> Result<(Vec, bool), nomos_mix_message::Error> { + unwrap_message(message) + } +} diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-mix/core/src/message_blend/mod.rs new file mode 100644 index 00000000..04f9da14 --- /dev/null +++ b/nomos-mix/core/src/message_blend/mod.rs @@ -0,0 +1,122 @@ +mod crypto; +mod temporal; + +pub use crypto::CryptographicProcessorSettings; +use futures::StreamExt; +pub use temporal::TemporalProcessorSettings; + +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; + +use crate::message_blend::{crypto::CryptographicProcessor, temporal::TemporalProcessor}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MessageBlendSettings { + pub cryptographic_processor: CryptographicProcessorSettings, + pub temporal_processor: TemporalProcessorSettings, +} + +/// [`MessageBlend`] handles the entire Tier-2 spec. +/// - Wraps new messages using [`CryptographicProcessor`] +/// - Unwraps incoming messages received from network using [`CryptographicProcessor`] +/// - Pushes unwrapped messages to [`TemporalProcessor`] +/// - Releases messages returned by [`TemporalProcessor`] to the proper channel +pub struct MessageBlend { + /// To receive new messages originated from this node + new_message_receiver: mpsc::UnboundedReceiver>, + /// To receive incoming messages from the network + inbound_message_receiver: mpsc::UnboundedReceiver>, + /// To release messages that are successfully processed but still wrapped + outbound_message_sender: mpsc::UnboundedSender>, + /// To release fully unwrapped messages + fully_unwrapped_message_sender: mpsc::UnboundedSender>, + /// Processors + cryptographic_processor: CryptographicProcessor, + temporal_processor: TemporalProcessor, +} + +impl MessageBlend { + pub fn new( + settings: MessageBlendSettings, + new_message_receiver: mpsc::UnboundedReceiver>, + inbound_message_receiver: mpsc::UnboundedReceiver>, + outbound_message_sender: mpsc::UnboundedSender>, + fully_unwrapped_message_sender: mpsc::UnboundedSender>, + ) -> Self { + Self { + new_message_receiver, + inbound_message_receiver, + outbound_message_sender, + fully_unwrapped_message_sender, + cryptographic_processor: CryptographicProcessor::new(settings.cryptographic_processor), + temporal_processor: TemporalProcessor::<_>::new(settings.temporal_processor), + } + } + + pub async fn run(&mut self) { + loop { + tokio::select! { + Some(new_message) = self.new_message_receiver.recv() => { + self.handle_new_message(new_message); + } + Some(incoming_message) = self.inbound_message_receiver.recv() => { + self.handle_incoming_message(incoming_message); + } + Some(msg) = self.temporal_processor.next() => { + self.release_temporal_processed_message(msg); + } + } + } + } + + fn handle_new_message(&mut self, message: Vec) { + match self.cryptographic_processor.wrap_message(&message) { + Ok(wrapped_message) => { + // Bypass Temporal Processor, and send the message to the outbound channel directly + // because the message is originated from this node. + if let Err(e) = self.outbound_message_sender.send(wrapped_message) { + tracing::error!("Failed to send message to the outbound channel: {e:?}"); + } + } + Err(e) => { + tracing::error!("Failed to wrap message: {:?}", e); + } + } + } + + fn handle_incoming_message(&mut self, message: Vec) { + match self.cryptographic_processor.unwrap_message(&message) { + Ok((unwrapped_message, fully_unwrapped)) => { + self.temporal_processor + .push_message(TemporalProcessableMessage { + message: unwrapped_message, + fully_unwrapped, + }); + } + Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => { + tracing::debug!("Message cannot be unwrapped by this node"); + } + Err(e) => { + tracing::error!("Failed to unwrap message: {:?}", e); + } + } + } + + fn release_temporal_processed_message(&mut self, message: TemporalProcessableMessage) { + if message.fully_unwrapped { + if let Err(e) = self.fully_unwrapped_message_sender.send(message.message) { + tracing::error!( + "Failed to send fully unwrapped message to the fully unwrapped channel: {e:?}" + ); + } + } else if let Err(e) = self.outbound_message_sender.send(message.message) { + tracing::error!("Failed to send message to the outbound channel: {e:?}"); + } + } +} + +#[derive(Clone)] +struct TemporalProcessableMessage { + message: Vec, + fully_unwrapped: bool, +} diff --git a/nomos-mix/core/src/message_blend/temporal.rs b/nomos-mix/core/src/message_blend/temporal.rs new file mode 100644 index 00000000..16c1753d --- /dev/null +++ b/nomos-mix/core/src/message_blend/temporal.rs @@ -0,0 +1,99 @@ +use std::{ + collections::VecDeque, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{Future, Stream}; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use tokio::time; + +/// [`TemporalProcessor`] delays messages randomly to hide timing correlation +/// between incoming and outgoing messages from a node. +/// +/// See the [`Stream`] implementation below for more details on how it works. +pub(crate) struct TemporalProcessor { + settings: TemporalProcessorSettings, + // All scheduled messages + queue: VecDeque, + /// Interval in seconds for running the lottery to release a message + lottery_interval: time::Interval, + /// To wait a few seconds after running the lottery before releasing the message. + /// The lottery returns how long to wait before releasing the message. + release_timer: Option>>, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TemporalProcessorSettings { + pub max_delay_seconds: u64, +} + +impl TemporalProcessor { + pub(crate) fn new(settings: TemporalProcessorSettings) -> Self { + let lottery_interval = Self::lottery_interval(settings.max_delay_seconds); + Self { + settings, + queue: VecDeque::new(), + lottery_interval, + release_timer: None, + } + } + + /// Create [`time::Interval`] for running the lottery to release a message. + fn lottery_interval(max_delay_seconds: u64) -> time::Interval { + time::interval(Duration::from_secs(Self::lottery_interval_seconds( + max_delay_seconds, + ))) + } + + /// Calculate the interval in seconds for running the lottery. + /// The lottery interval is half of the maximum delay, + /// in order to guarantee that the interval between two subsequent message emissions + /// is at most [`max_delay_seconds`]. + fn lottery_interval_seconds(max_delay_seconds: u64) -> u64 { + max_delay_seconds / 2 + } + + /// Run the lottery to determine the delay before releasing a message. + /// The delay is in [0, `lottery_interval_seconds`). + fn run_lottery(&self) -> u64 { + let interval = Self::lottery_interval_seconds(self.settings.max_delay_seconds); + rand::thread_rng().gen_range(0..interval) + } + + /// Schedule a message to be released later. + pub(crate) fn push_message(&mut self, message: M) { + self.queue.push_back(message); + } +} + +impl Stream for TemporalProcessor +where + M: Unpin + Clone + 'static, +{ + type Item = M; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Check whether it's time to run a new lottery to determine the delay. + if self.lottery_interval.poll_tick(cx).is_ready() { + let delay = self.run_lottery(); + // Set timer to release the message after the delay. + self.release_timer = Some(Box::pin(time::sleep(Duration::from_secs(delay)))); + } + + // Check whether the release timer is done if it exists. + if let Some(timer) = self.release_timer.as_mut() { + if timer.as_mut().poll(cx).is_ready() { + self.release_timer.take(); // Reset timer after it's done + if let Some(msg) = self.queue.pop_front() { + // Release the 1st message in the queue if it exists. + return Poll::Ready(Some(msg)); + } + } + } + + Poll::Pending + } +} diff --git a/nomos-mix/core/src/persistent_transmission.rs b/nomos-mix/core/src/persistent_transmission.rs new file mode 100644 index 00000000..7e1dbab7 --- /dev/null +++ b/nomos-mix/core/src/persistent_transmission.rs @@ -0,0 +1,191 @@ +use std::time::Duration; + +use nomos_mix_message::DROP_MESSAGE; +use rand::{distributions::Uniform, prelude::Distribution, Rng, SeedableRng}; +use rand_chacha::ChaCha12Rng; +use serde::{Deserialize, Serialize}; +use tokio::{ + sync::mpsc::{self, error::TryRecvError}, + time, +}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PersistentTransmissionSettings { + /// The maximum number of messages that can be emitted per second + pub max_emission_frequency: f64, + /// The probability of emitting a drop message by coin flipping + pub drop_message_probability: f64, +} + +impl Default for PersistentTransmissionSettings { + fn default() -> Self { + Self { + max_emission_frequency: 1.0, + drop_message_probability: 0.5, + } + } +} + +/// Transmit scheduled messages with a persistent rate to the transmission channel. +/// +/// # Arguments +/// +/// * `settings` - The settings for the persistent transmission +/// * `schedule_receiver` - The channel for messages scheduled (from Tier 2 currently) +/// * `emission_sender` - The channel to emit messages +pub async fn persistent_transmission( + settings: PersistentTransmissionSettings, + schedule_receiver: mpsc::UnboundedReceiver>, + emission_sender: mpsc::UnboundedSender>, +) { + let mut schedule_receiver = schedule_receiver; + let mut interval = time::interval(Duration::from_secs_f64( + 1.0 / settings.max_emission_frequency, + )); + let mut coin = Coin::<_>::new( + ChaCha12Rng::from_entropy(), + settings.drop_message_probability, + ) + .unwrap(); + + loop { + interval.tick().await; + + // Emit the first one of the scheduled messages. + // If there is no scheduled message, emit a drop message with probability. + match schedule_receiver.try_recv() { + Ok(msg) => { + if let Err(e) = emission_sender.send(msg) { + tracing::error!("Failed to send message to the transmission channel: {e:?}"); + } + } + Err(TryRecvError::Empty) => { + // If the coin is head, emit the drop message. + if coin.flip() { + if let Err(e) = emission_sender.send(DROP_MESSAGE.to_vec()) { + tracing::error!( + "Failed to send drop message to the transmission channel: {e:?}" + ); + } + } + } + Err(TryRecvError::Disconnected) => { + tracing::error!("The schedule channel has been closed"); + break; + } + } + } +} + +struct Coin { + rng: R, + distribution: Uniform, + probability: f64, +} + +impl Coin { + fn new(rng: R, probability: f64) -> Result { + if !(0.0..=1.0).contains(&probability) { + return Err(CoinError::InvalidProbability); + } + Ok(Self { + rng, + distribution: Uniform::from(0.0..1.0), + probability, + }) + } + + // Flip the coin based on the given probability. + fn flip(&mut self) -> bool { + self.distribution.sample(&mut self.rng) < self.probability + } +} + +#[derive(Debug)] +enum CoinError { + InvalidProbability, +} + +#[cfg(test)] +mod tests { + use super::*; + + macro_rules! assert_interval { + ($last_time:expr, $lower_bound:expr, $upper_bound:expr) => { + let now = time::Instant::now(); + let interval = now.duration_since(*$last_time); + + assert!( + interval >= $lower_bound, + "interval {:?} is too short. lower_bound: {:?}", + interval, + $lower_bound, + ); + assert!( + interval <= $upper_bound, + "interval {:?} is too long. upper_bound: {:?}", + interval, + $upper_bound, + ); + + *$last_time = now; + }; + } + + #[tokio::test] + async fn test_persistent_transmission() { + let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel(); + let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel(); + + let settings = PersistentTransmissionSettings { + max_emission_frequency: 1.0, + // Set to always emit drop messages if no scheduled messages for easy testing + drop_message_probability: 1.0, + }; + + // Prepare the expected emission interval with torelance + let expected_emission_interval = + Duration::from_secs_f64(1.0 / settings.max_emission_frequency); + let torelance = expected_emission_interval / 10; // 10% torelance + let lower_bound = expected_emission_interval - torelance; + let upper_bound = expected_emission_interval + torelance; + + // Start the persistent transmission and schedule messages + tokio::spawn(persistent_transmission( + settings, + schedule_receiver, + emission_sender, + )); + // Messages must be scheduled in non-blocking manner. + schedule_sender.send(vec![1]).unwrap(); + schedule_sender.send(vec![2]).unwrap(); + schedule_sender.send(vec![3]).unwrap(); + + // Check if expected messages are emitted with the expected interval + assert_eq!(emission_receiver.recv().await.unwrap(), vec![1]); + let mut last_time = time::Instant::now(); + + assert_eq!(emission_receiver.recv().await.unwrap(), vec![2]); + assert_interval!(&mut last_time, lower_bound, upper_bound); + + assert_eq!(emission_receiver.recv().await.unwrap(), vec![3]); + assert_interval!(&mut last_time, lower_bound, upper_bound); + + assert_eq!( + emission_receiver.recv().await.unwrap(), + DROP_MESSAGE.to_vec() + ); + assert_interval!(&mut last_time, lower_bound, upper_bound); + + assert_eq!( + emission_receiver.recv().await.unwrap(), + DROP_MESSAGE.to_vec() + ); + assert_interval!(&mut last_time, lower_bound, upper_bound); + + // Schedule a new message and check if it is emitted at the next interval + schedule_sender.send(vec![4]).unwrap(); + assert_eq!(emission_receiver.recv().await.unwrap(), vec![4]); + assert_interval!(&mut last_time, lower_bound, upper_bound); + } +} diff --git a/nomos-mix/network/Cargo.toml b/nomos-mix/network/Cargo.toml index 64bfcb21..9871cdc1 100644 --- a/nomos-mix/network/Cargo.toml +++ b/nomos-mix/network/Cargo.toml @@ -9,8 +9,8 @@ futures = "0.3.30" futures-timer = "3.0.3" libp2p = "0.53" tracing = "0.1" +nomos-mix = { path = "../core" } nomos-mix-message = { path = "../message" } -nomos-mix-queue = { path = "../queue" } [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } diff --git a/nomos-mix/queue/Cargo.toml b/nomos-mix/queue/Cargo.toml deleted file mode 100644 index 77c0e7ed..00000000 --- a/nomos-mix/queue/Cargo.toml +++ /dev/null @@ -1,7 +0,0 @@ -[package] -name = "nomos-mix-queue" -version = "0.1.0" -edition = "2021" - -[dependencies] -rand = "0.8.5" diff --git a/nomos-mix/queue/src/lib.rs b/nomos-mix/queue/src/lib.rs deleted file mode 100644 index 397dc4a2..00000000 --- a/nomos-mix/queue/src/lib.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::collections::VecDeque; - -/// A [`Queue`] controls the order of messages to be emitted to a single connection. -pub trait Queue { - /// Push a message to the queue. - fn push(&mut self, data: T); - - /// Pop a message from the queue. - /// - /// The returned message is either the real message pushed before or a noise message. - fn pop(&mut self) -> T; -} - -/// A regular queue that does not mix the order of messages. -/// -/// This queue returns a noise message if the queue is empty. -pub struct NonMixQueue { - queue: VecDeque, - noise: T, -} - -impl NonMixQueue { - pub fn new(noise: T) -> Self { - Self { - queue: VecDeque::new(), - noise, - } - } -} - -impl Queue for NonMixQueue { - fn push(&mut self, data: T) { - self.queue.push_back(data); - } - - fn pop(&mut self) -> T { - self.queue.pop_front().unwrap_or(self.noise.clone()) - } -} diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index e08bfd36..600cac8d 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -24,6 +24,7 @@ nomos-mempool = { path = "../../../nomos-services/mempool" } nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb-backend"] } nomos-network = { path = "../../network", features = ["mock"] } nomos-mix-service = { path = "../../mix" } +nomos-mix = { path = "../../../nomos-mix/core" } nomos-libp2p = { path = "../../../nomos-libp2p" } libp2p = { version = "0.53.2", features = ["ed25519"] } once_cell = "1.19" diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 8ab1b94e..cc2b5a4a 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -1,5 +1,8 @@ // std use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; +use nomos_mix::message_blend::{ + CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, +}; use std::path::PathBuf; use std::time::Duration; // crates @@ -207,6 +210,13 @@ pub fn new_node( }, mix: MixConfig { backend: mix_config.clone(), + persistent_transmission: Default::default(), + message_blend: MessageBlendSettings { + cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + temporal_processor: TemporalProcessorSettings { + max_delay_seconds: 2, + }, + }, }, da_network: DaNetworkConfig { backend: DaNetworkBackendSettings { diff --git a/nomos-services/mix/Cargo.toml b/nomos-services/mix/Cargo.toml index 38666224..79b475eb 100644 --- a/nomos-services/mix/Cargo.toml +++ b/nomos-services/mix/Cargo.toml @@ -8,6 +8,7 @@ async-trait = "0.1" futures = "0.3" libp2p = { version = "0.53", features = ["ed25519"] } nomos-libp2p = { path = "../../nomos-libp2p", optional = true } +nomos-mix = { path = "../../nomos-mix/core" } nomos-core = { path = "../../nomos-core/chain-defs" } nomos-mix-network = { path = "../../nomos-mix/network" } nomos-mix-message = { path = "../../nomos-mix/message" } diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs index d937b6c4..ef104604 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/mix/src/lib.rs @@ -8,7 +8,10 @@ use backends::MixBackend; use futures::StreamExt; use network::NetworkAdapter; use nomos_core::wire; -use nomos_mix_message::{new_message, unwrap_message}; +use nomos_mix::{ + message_blend::{MessageBlend, MessageBlendSettings}, + persistent_transmission::{persistent_transmission, PersistentTransmissionSettings}, +}; use nomos_network::NetworkService; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -18,6 +21,7 @@ use overwatch_rs::services::{ ServiceCore, ServiceData, ServiceId, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::sync::mpsc; /// A mix service that sends messages to the mix network /// and broadcasts fully unwrapped messages through the [`NetworkService`]. @@ -77,12 +81,41 @@ where mut backend, network_relay, } = self; + let mix_config = service_state.settings_reader.get_updated_settings(); let network_relay = network_relay.connect().await?; let network_adapter = Network::new(network_relay); - // TODO: Spawn PersistentTransmission (Tier 1) - // TODO: Spawn Processor (Tier 2) and connect it to PersistentTransmission + // Spawn Persistent Transmission + let (transmission_schedule_sender, transmission_schedule_receiver) = + mpsc::unbounded_channel(); + let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel(); + tokio::spawn(async move { + persistent_transmission( + mix_config.persistent_transmission, + transmission_schedule_receiver, + emission_sender, + ) + .await; + }); + + // Spawn Message Blend and connect it to Persistent Transmission + let (new_message_sender, new_message_receiver) = mpsc::unbounded_channel(); + let (processor_inbound_sender, processor_inbound_receiver) = mpsc::unbounded_channel(); + let (fully_unwrapped_message_sender, mut fully_unwrapped_message_receiver) = + mpsc::unbounded_channel(); + tokio::spawn(async move { + MessageBlend::new( + mix_config.message_blend, + new_message_receiver, + processor_inbound_receiver, + // Connect the outputs of Message Blend to Persistent Transmission + transmission_schedule_sender, + fully_unwrapped_message_sender, + ) + .run() + .await; + }); // A channel to listen to messages received from the [`MixBackend`] let mut incoming_message_stream = backend.listen_to_incoming_messages(); @@ -91,11 +124,17 @@ where loop { tokio::select! { Some(msg) = incoming_message_stream.next() => { - // TODO: The following logic is wrong and temporary. - // Here we're unwrapping the message and broadcasting it, - // but the message should be handled by Processor and PersistentTransmission. - let (msg, fully_unwrapped) = unwrap_message(&msg).unwrap(); - assert!(fully_unwrapped); + tracing::debug!("Received message from mix backend. Sending it to Processor"); + if let Err(e) = processor_inbound_sender.send(msg) { + tracing::error!("Failed to send incoming message to processor: {e:?}"); + } + } + Some(msg) = emission_receiver.recv() => { + tracing::debug!("Emitting message to mix network"); + backend.publish(msg).await; + } + Some(msg) = fully_unwrapped_message_receiver.recv() => { + tracing::debug!("Broadcasting fully unwrapped message"); match wire::deserialize::>(&msg) { Ok(msg) => { network_adapter.broadcast(msg.message, msg.broadcast_settings).await; @@ -104,7 +143,7 @@ where } } Some(msg) = service_state.inbound_relay.recv() => { - Self::handle_service_message(msg, &mut backend).await; + Self::handle_service_message(msg, &new_message_sender); } Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { @@ -125,20 +164,16 @@ where Network: NetworkAdapter, Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned, { - async fn handle_service_message( + fn handle_service_message( msg: ServiceMessage, - backend: &mut Backend, + new_message_sender: &mpsc::UnboundedSender>, ) { match msg { ServiceMessage::Mix(msg) => { - // split sending in two steps to help the compiler understand we do not - // need to hold an instance of &I (which is not send) across an await point - // TODO: The following logic is wrong and temporary. - // Here we're wrapping the message here and publishing the message immediately, - // but the message should be handled by Processor and PersistentTransmission. - let wrapped_msg = new_message(&wire::serialize(&msg).unwrap(), 1).unwrap(); - let _send = backend.publish(wrapped_msg); - _send.await + // Serialize the new message and send it to the Processor + if let Err(e) = new_message_sender.send(wire::serialize(&msg).unwrap()) { + tracing::error!("Failed to send a new message to processor: {e:?}"); + } } } } @@ -163,6 +198,8 @@ where #[derive(Serialize, Deserialize, Clone, Debug)] pub struct MixConfig { pub backend: BackendSettings, + pub persistent_transmission: PersistentTransmissionSettings, + pub message_blend: MessageBlendSettings, } /// A message that is handled by [`MixService`]. diff --git a/tests/Cargo.toml b/tests/Cargo.toml index f9c1f917..77297970 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -11,6 +11,7 @@ nomos-node = { path = "../nodes/nomos-node", default-features = false } nomos-executor = { path = "../nodes/nomos-executor", default-features = false } nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] } +nomos-mix = { path = "../nomos-mix/core" } cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" } nomos-tracing = { path = "../nomos-tracing" } nomos-tracing-service = { path = "../nomos-services/tracing" } diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs index dcdd5c43..49b4022a 100644 --- a/tests/src/nodes/executor.rs +++ b/tests/src/nodes/executor.rs @@ -20,6 +20,9 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as Verif use nomos_da_verifier::DaVerifierServiceSettings; use nomos_executor::api::backend::AxumBackendSettings; use nomos_executor::config::Config; +use nomos_mix::message_blend::{ + CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, +}; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE}; use nomos_node::RocksBackendSettings; @@ -154,6 +157,13 @@ pub fn create_executor_config(config: GeneralConfig) -> Config { }, mix: nomos_mix_service::MixConfig { backend: config.mix_config.backend, + persistent_transmission: Default::default(), + message_blend: MessageBlendSettings { + cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + temporal_processor: TemporalProcessorSettings { + max_delay_seconds: 2, + }, + }, }, cryptarchia: CryptarchiaSettings { notes: config.consensus_config.notes, diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs index e7816a7b..d09b2cfd 100644 --- a/tests/src/nodes/validator.rs +++ b/tests/src/nodes/validator.rs @@ -14,6 +14,9 @@ use nomos_da_sampling::{backend::kzgrs::KzgrsSamplingBackendSettings, DaSampling use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings; use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings}; use nomos_mempool::MempoolMetrics; +use nomos_mix::message_blend::{ + CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings, +}; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::api::paths::{ CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK, @@ -239,6 +242,13 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { }, mix: nomos_mix_service::MixConfig { backend: config.mix_config.backend, + persistent_transmission: Default::default(), + message_blend: MessageBlendSettings { + cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, + temporal_processor: TemporalProcessorSettings { + max_delay_seconds: 2, + }, + }, }, cryptarchia: CryptarchiaSettings { notes: config.consensus_config.notes,