From 2e5db9a3f368575f98d91f93ab75045fecbeed1e Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:26:19 +0700 Subject: [PATCH] wip: add blend --- network-runner/Cargo.lock | 1 + network-runner/Cargo.toml | 1 + network-runner/src/node/mix/mod.rs | 97 ++++++++++++++++--- network-runner/src/node/mix/step_scheduler.rs | 6 +- 4 files changed, 89 insertions(+), 16 deletions(-) diff --git a/network-runner/Cargo.lock b/network-runner/Cargo.lock index e19d68d..e133e41 100644 --- a/network-runner/Cargo.lock +++ b/network-runner/Cargo.lock @@ -1909,6 +1909,7 @@ dependencies = [ "getrandom", "humantime", "humantime-serde", + "multiaddr", "nomos-mix", "nomos-mix-message", "once_cell", diff --git a/network-runner/Cargo.toml b/network-runner/Cargo.toml index bd709b5..e11de03 100644 --- a/network-runner/Cargo.toml +++ b/network-runner/Cargo.toml @@ -53,6 +53,7 @@ tracing-subscriber = { version = "0.3", features = [ nomos-mix = { git = "https://github.com/logos-co/nomos-node", rev = "9b29c17", package = "nomos-mix" } nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", rev = "9b29c17", package = "nomos-mix-message" } rand_chacha = "0.3.1" +multiaddr = "0.18.2" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs index 2cf232c..36446f2 100644 --- a/network-runner/src/node/mix/mod.rs +++ b/network-runner/src/node/mix/mod.rs @@ -6,8 +6,14 @@ use super::{Node, NodeId}; use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}; use crossbeam::channel; use futures::Stream; -use nomos_mix::persistent_transmission::{ - PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, +use multiaddr::Multiaddr; +use nomos_mix::{ + membership::Membership, + message_blend::{MessageBlendExt, MessageBlendSettings, MessageBlendStream}, + persistent_transmission::{ + PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, + }, + MixOutgoingMessage, }; use nomos_mix_message::mock::MockMixMessage; use rand::SeedableRng; @@ -19,7 +25,7 @@ use std::{ task::Poll, time::Duration, }; -use step_scheduler::Interval; +use step_scheduler::{Interval, TemporalRelease}; use stream_wrapper::CrossbeamReceiverStream; #[derive(Debug, Clone)] @@ -33,11 +39,13 @@ impl PayloadSize for MixMessage { } } -#[derive(Clone, Default, Deserialize)] +#[derive(Clone, Deserialize)] pub struct MixnodeSettings { pub connected_peers: Vec, pub seed: u64, pub persistent_transmission: PersistentTransmissionSettings, + pub message_blend: MessageBlendSettings, + pub membership: Vec<::PublicKey>, } /// This node implementation only used for testing different streaming implementation purposes. @@ -48,13 +56,21 @@ pub struct MixNode { network_interface: InMemoryNetworkInterface, persistent_sender: channel::Sender>, - update_time_sender: channel::Sender, + persistent_update_time_sender: channel::Sender, persistent_transmission_messages: PersistentTransmissionStream< CrossbeamReceiverStream>, ChaCha12Rng, MockMixMessage, Interval, >, + blend_sender: channel::Sender>, + blend_update_time_sender: channel::Sender, + blend_messages: MessageBlendStream< + CrossbeamReceiverStream>, + ChaCha12Rng, + MockMixMessage, + TemporalRelease, + >, } impl MixNode { @@ -69,28 +85,65 @@ impl MixNode { step_id: 0, }; + let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); + + // Init Tier-1: Persistent transmission let (persistent_sender, persistent_receiver) = channel::unbounded(); - let (update_time_sender, update_time_receiver) = channel::unbounded(); + let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) .persistent_transmission( settings.persistent_transmission, - ChaCha12Rng::seed_from_u64(settings.seed), + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), Interval::new( Duration::from_secs_f64( 1.0 / settings.persistent_transmission.max_emission_frequency, ), - update_time_receiver, + persistent_update_time_receiver, ), ); + // Init Tier-2: Temporal processor + let (blend_sender, blend_receiver) = channel::unbounded(); + let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded(); + let nodes: Vec< + nomos_mix::membership::Node< + ::PublicKey, + >, + > = settings + .membership + .iter() + .map(|&public_key| nomos_mix::membership::Node { + address: Multiaddr::empty(), + public_key, + }) + .collect(); + let membership = Membership::::new(nodes, id.into()); + let temporal_release = TemporalRelease::new( + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + blend_update_time_receiver, + ( + 1, + settings.message_blend.temporal_processor.max_delay_seconds, + ), + ); + let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend( + settings.message_blend.clone(), + membership, + temporal_release, + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + ); + Self { id, network_interface, settings, state, persistent_sender, - update_time_sender, + persistent_update_time_sender, persistent_transmission_messages, + blend_sender, + blend_update_time_sender, + blend_messages, } } @@ -100,6 +153,11 @@ impl MixNode { .send_message(*node_id, message.clone()) } } + + fn update_time(&mut self, elapsed: Duration) { + self.persistent_update_time_sender.send(elapsed).unwrap(); + self.blend_update_time_sender.send(elapsed).unwrap(); + } } impl Node for MixNode { @@ -116,9 +174,13 @@ impl Node for MixNode { } fn step(&mut self, elapsed: Duration) { + self.update_time(elapsed); + let Self { - update_time_sender, + persistent_sender, persistent_transmission_messages, + blend_sender, + blend_messages, .. } = self; @@ -126,16 +188,25 @@ impl Node for MixNode { for message in messages { println!(">>>>> Node {}, message: {message:?}", self.id); let MixMessage::Dummy(msg) = message.into_payload(); - self.persistent_sender.send(msg).unwrap(); + blend_sender.send(msg).unwrap(); } self.state.step_id += 1; self.state.mock_counter += 1; - update_time_sender.send(elapsed).unwrap(); - let waker = futures::task::noop_waker(); let mut cx = futures::task::Context::from_waker(&waker); + if let Poll::Ready(Some(msg)) = pin::pin!(blend_messages).poll_next(&mut cx) { + match msg { + MixOutgoingMessage::Outbound(msg) => { + persistent_sender.send(msg).unwrap(); + } + MixOutgoingMessage::FullyUnwrapped(_) => { + //TODO: increase counter and create a tracing event + } + } + } + if let Poll::Ready(Some(msg)) = pin::pin!(persistent_transmission_messages).poll_next(&mut cx) { diff --git a/network-runner/src/node/mix/step_scheduler.rs b/network-runner/src/node/mix/step_scheduler.rs index 886ee47..75027eb 100644 --- a/network-runner/src/node/mix/step_scheduler.rs +++ b/network-runner/src/node/mix/step_scheduler.rs @@ -44,15 +44,15 @@ impl Stream for Interval { } } -struct TemporalRelease { - random_sleeps: Box>, +pub struct TemporalRelease { + random_sleeps: Box + Send + Sync + 'static>, elapsed: Duration, current_sleep: Duration, update_time: channel::Receiver, } impl TemporalRelease { - pub fn new( + pub fn new( mut rng: Rng, update_time: channel::Receiver, (min_delay, max_delay): (u64, u64),