From d73f3c2040ff304794dcb4cdbafb5dbe60cd162d Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Thu, 7 Nov 2024 04:22:26 +0100 Subject: [PATCH] Mix: Implemented cover traffic (#910) * Implemented cover traffic * Update as per last specs --- nomos-mix/core/Cargo.toml | 1 + nomos-mix/core/src/cover_traffic.rs | 110 ++++++++++++++++++++++++++++ nomos-mix/core/src/lib.rs | 1 + 3 files changed, 112 insertions(+) create mode 100644 nomos-mix/core/src/cover_traffic.rs diff --git a/nomos-mix/core/Cargo.toml b/nomos-mix/core/Cargo.toml index cc67d3ed..d55a8c19 100644 --- a/nomos-mix/core/Cargo.toml +++ b/nomos-mix/core/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +blake2 = "0.10" cached = "0.53" tokio = { version = "1", features = ["time", "sync", "macros"] } tokio-stream = "0.1" diff --git a/nomos-mix/core/src/cover_traffic.rs b/nomos-mix/core/src/cover_traffic.rs new file mode 100644 index 00000000..fd41337d --- /dev/null +++ b/nomos-mix/core/src/cover_traffic.rs @@ -0,0 +1,110 @@ +use blake2::Digest; +use futures::{Stream, StreamExt}; +use nomos_mix_message::MixMessage; +use std::collections::HashSet; +use std::hash::Hash; +use std::marker::PhantomData; +use std::ops::{DerefMut, Div}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub struct CoverTrafficSettings { + node_id: [u8; 32], + number_of_hops: usize, + slots_per_epoch: usize, + network_size: usize, +} +pub struct CoverTraffic { + winning_probability: f64, + settings: CoverTrafficSettings, + epoch_stream: EpochStream, + slot_stream: SlotStream, + selected_slots: HashSet, + _message: PhantomData, +} + +impl CoverTraffic +where + EpochStream: Stream, + SlotStream: Stream, +{ + pub fn new( + settings: CoverTrafficSettings, + epoch_stream: EpochStream, + slot_stream: SlotStream, + ) -> Self { + let winning_probability = winning_probability(settings.number_of_hops); + CoverTraffic { + winning_probability, + settings, + epoch_stream, + slot_stream, + selected_slots: Default::default(), + _message: Default::default(), + } + } +} + +impl Stream for CoverTraffic +where + EpochStream: Stream + Unpin, + SlotStream: Stream + Unpin, + Message: MixMessage + Unpin, +{ + type Item = Vec; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + winning_probability, + settings, + epoch_stream, + slot_stream, + selected_slots, + .. + } = self.deref_mut(); + if let Poll::Ready(Some(epoch)) = epoch_stream.poll_next_unpin(cx) { + *selected_slots = select_slot( + settings.node_id, + epoch, + settings.network_size, + settings.slots_per_epoch, + *winning_probability, + ); + } + if let Poll::Ready(Some(slot)) = slot_stream.poll_next_unpin(cx) { + if selected_slots.contains(&(slot as u32)) { + return Poll::Ready(Some(vec![])); + } + } + Poll::Pending + } +} + +fn generate_ticket>(node_id: Id, r: usize, slot: usize) -> u32 { + let mut hasher = blake2::Blake2b512::new(); + hasher.update(node_id); + hasher.update(r.to_be_bytes()); + hasher.update(slot.to_be_bytes()); + let hash = &hasher.finalize()[..]; + u32::from_be_bytes(hash.try_into().unwrap()) +} + +fn select_slot + Copy>( + node_id: Id, + r: usize, + network_size: usize, + slots_per_epoch: usize, + winning_probability: f64, +) -> HashSet { + let i = (slots_per_epoch as f64).div(network_size as f64) * winning_probability; + let i = i.ceil() as usize; + let mut w = HashSet::new(); + while w.len() != i { + w.insert(generate_ticket(node_id, r, slots_per_epoch)); + } + w +} + +fn winning_probability(number_of_hops: usize) -> f64 { + 1.0 / number_of_hops as f64 +} diff --git a/nomos-mix/core/src/lib.rs b/nomos-mix/core/src/lib.rs index 5484b495..53f33bf1 100644 --- a/nomos-mix/core/src/lib.rs +++ b/nomos-mix/core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod cover_traffic; pub mod membership; pub mod message_blend; pub mod persistent_transmission;