diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index aeb64361..339f5334 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -62,6 +62,13 @@ mix: num_mix_layers: 1 temporal_processor: max_delay_seconds: 5 + cover_traffic: + epoch_duration: + secs: 432000 + nanos: 0 + slot_duration: + secs: 20 + nanos: 0 membership: - address: /ip4/127.0.0.1/udp/3001/quic-v1 public_key: [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1] diff --git a/nomos-mix/core/src/cover_traffic.rs b/nomos-mix/core/src/cover_traffic.rs index ababf121..8042ed2b 100644 --- a/nomos-mix/core/src/cover_traffic.rs +++ b/nomos-mix/core/src/cover_traffic.rs @@ -122,7 +122,7 @@ mod tests { #[test] fn test_ticket() { generate_ticket(10u32.to_be_bytes(), 1123, 0); - for i in (0..1u32) { + for i in 0..1u32 { let slots = select_slot(i.to_be_bytes(), 1234, 100, 21600, winning_probability(1)); println!("slots = {slots:?}"); } diff --git a/nomos-mix/core/src/membership.rs b/nomos-mix/core/src/membership.rs index 857dc450..f1bda2ca 100644 --- a/nomos-mix/core/src/membership.rs +++ b/nomos-mix/core/src/membership.rs @@ -9,6 +9,7 @@ where M: MixMessage, { remote_nodes: Vec>, + local_node: Node, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -22,10 +23,20 @@ where M: MixMessage, M::PublicKey: PartialEq, { - pub fn new(mut nodes: Vec>, local_public_key: M::PublicKey) -> Self { - nodes.retain(|node| node.public_key != local_public_key); + pub fn new(nodes: Vec>, local_public_key: M::PublicKey) -> Self { + let mut remote_nodes = Vec::with_capacity(nodes.len() - 1); + let mut local_node = None; + nodes.into_iter().for_each(|node| { + if node.public_key == local_public_key { + local_node = Some(node); + } else { + remote_nodes.push(node); + } + }); + Self { - remote_nodes: nodes, + remote_nodes, + local_node: local_node.expect("Local node not found"), } } @@ -36,4 +47,12 @@ where ) -> Vec<&Node> { self.remote_nodes.choose_multiple(rng, amount).collect() } + + pub fn local_node(&self) -> &Node { + &self.local_node + } + + pub fn size(&self) -> usize { + self.remote_nodes.len() + 1 + } } diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index d0d72dad..2ab024d4 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -229,6 +229,10 @@ pub fn new_node( max_delay_seconds: 2, }, }, + cover_traffic: nomos_mix_service::CoverTrafficExtSettings { + epoch_duration: Duration::from_secs(432000), + slot_duration: Duration::from_secs(20), + }, membership: mix_config.membership.clone(), }, da_network: DaNetworkConfig { diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs index 8e3bbc34..dfbc681e 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/mix/src/lib.rs @@ -6,15 +6,18 @@ use backends::MixBackend; use futures::StreamExt; use network::NetworkAdapter; use nomos_core::wire; -use nomos_mix::membership::{Membership, Node}; -use nomos_mix::message_blend::crypto::CryptographicProcessor; use nomos_mix::message_blend::temporal::TemporalScheduler; +use nomos_mix::message_blend::{crypto::CryptographicProcessor, CryptographicProcessorSettings}; use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings}; use nomos_mix::persistent_transmission::{ PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, }; use nomos_mix::MixOutgoingMessage; -use nomos_mix_message::mock::MockMixMessage; +use nomos_mix::{ + cover_traffic::{CoverTraffic, CoverTrafficSettings}, + membership::{Membership, Node}, +}; +use nomos_mix_message::{mock::MockMixMessage, MixMessage}; use nomos_network::NetworkService; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -127,12 +130,22 @@ where ChaCha12Rng::from_entropy(), ); let mut blend_messages = backend.listen_to_incoming_messages().blend( - mix_config.message_blend, + mix_config.message_blend.clone(), membership.clone(), temporal_scheduler, ChaCha12Rng::from_entropy(), ); + // tier 3 cover traffic + let mut cover_traffic: CoverTraffic<_, _, MockMixMessage> = CoverTraffic::new( + mix_config.cover_traffic.cover_traffic_settings( + &membership, + &mix_config.message_blend.cryptographic_processor, + ), + mix_config.cover_traffic.epoch_stream(), + mix_config.cover_traffic.slot_stream(), + ); + // local messages, are bypassed and send immediately let mut local_messages = service_state .inbound_relay @@ -162,23 +175,17 @@ where network_adapter.broadcast(msg.message, msg.broadcast_settings).await; }, _ => { - tracing::error!("unrecognized message from mix backend"); + tracing::debug!("unrecognized message from mix backend"); } } } } } + Some(msg) = cover_traffic.next() => { + Self::wrap_and_send_to_persistent_transmission(msg, &mut cryptographic_processor, &persistent_sender); + } Some(msg) = local_messages.next() => { - match cryptographic_processor.wrap_message(&msg) { - Ok(wrapped_message) => { - if let Err(e) = persistent_sender.send(wrapped_message) { - tracing::error!("Error sending message to persistent stream: {e}"); - } - } - Err(e) => { - tracing::error!("Failed to wrap message: {:?}", e); - } - } + Self::wrap_and_send_to_persistent_transmission(msg, &mut cryptographic_processor, &persistent_sender); } Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { @@ -214,6 +221,23 @@ where } } } + + fn wrap_and_send_to_persistent_transmission( + message: Vec, + cryptographic_processor: &mut CryptographicProcessor, + persistent_sender: &mpsc::UnboundedSender>, + ) { + match cryptographic_processor.wrap_message(&message) { + Ok(wrapped_message) => { + if let Err(e) = persistent_sender.send(wrapped_message) { + tracing::error!("Error sending message to persistent stream: {e}"); + } + } + Err(e) => { + tracing::error!("Failed to wrap message: {:?}", e); + } + } + } } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -221,11 +245,64 @@ pub struct MixConfig { pub backend: BackendSettings, pub message_blend: MessageBlendSettings, pub persistent_transmission: PersistentTransmissionSettings, + pub cover_traffic: CoverTrafficExtSettings, pub membership: Vec< Node<::PublicKey>, >, } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct CoverTrafficExtSettings { + pub epoch_duration: Duration, + pub slot_duration: Duration, +} + +impl CoverTrafficExtSettings { + fn cover_traffic_settings( + &self, + membership: &Membership, + cryptographic_processor_settings: &CryptographicProcessorSettings< + ::PrivateKey, + >, + ) -> CoverTrafficSettings { + CoverTrafficSettings { + node_id: membership.local_node().public_key, + number_of_hops: cryptographic_processor_settings.num_mix_layers, + slots_per_epoch: self.slots_per_epoch(), + network_size: membership.size(), + } + } + + fn slots_per_epoch(&self) -> usize { + (self.epoch_duration.as_secs() as usize) + .checked_div(self.slot_duration.as_secs() as usize) + .expect("Invalid epoch & slot duration") + } + + fn epoch_stream( + &self, + ) -> futures::stream::Map< + futures::stream::Enumerate, + impl FnMut((usize, time::Instant)) -> usize, + > { + IntervalStream::new(time::interval(self.epoch_duration)) + .enumerate() + .map(|(i, _)| i) + } + + fn slot_stream( + &self, + ) -> futures::stream::Map< + futures::stream::Enumerate, + impl FnMut((usize, time::Instant)) -> usize, + > { + let slots_per_epoch = self.slots_per_epoch(); + IntervalStream::new(time::interval(self.slot_duration)) + .enumerate() + .map(move |(i, _)| i % slots_per_epoch) + } +} + impl MixConfig { fn membership(&self) -> Membership { // We use private key as a public key because the `MockMixMessage` doesn't differentiate between them. diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs index 13ee4650..382b56eb 100644 --- a/tests/src/nodes/executor.rs +++ b/tests/src/nodes/executor.rs @@ -166,6 +166,10 @@ pub fn create_executor_config(config: GeneralConfig) -> Config { max_delay_seconds: 2, }, }, + cover_traffic: nomos_mix_service::CoverTrafficExtSettings { + epoch_duration: Duration::from_secs(432000), + slot_duration: Duration::from_secs(20), + }, membership: config.mix_config.membership, }, cryptarchia: CryptarchiaSettings { diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs index c11dd115..1ce45546 100644 --- a/tests/src/nodes/validator.rs +++ b/tests/src/nodes/validator.rs @@ -252,6 +252,10 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { max_delay_seconds: 2, }, }, + cover_traffic: nomos_mix_service::CoverTrafficExtSettings { + epoch_duration: Duration::from_secs(432000), + slot_duration: Duration::from_secs(20), + }, membership: config.mix_config.membership, }, cryptarchia: CryptarchiaSettings {