refactor(blendnet): rename mix to blend (#63)

* rename mix to blend

* update ci

* add missing dir
This commit is contained in:
Youngjoon Lee 2024-12-13 13:31:28 +09:00 committed by GitHub
parent 8ef0adc996
commit 45a2152bba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 73 additions and 73 deletions

View File

@ -11,7 +11,7 @@ env:
CARGO_TERM_COLOR: always CARGO_TERM_COLOR: always
jobs: jobs:
mixnet: simlib:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4

View File

@ -18,8 +18,8 @@ tracing = "0.1.40"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] } tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] }
netrunner = { path = "../netrunner" } netrunner = { path = "../netrunner" }
nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" } nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" }
nomos-mix = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix" } nomos-blend = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend" }
nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix-message" } nomos-blend-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend-message" }
futures = "0.3.31" futures = "0.3.31"
rand_chacha = "0.3" rand_chacha = "0.3"
multiaddr = "0.18" multiaddr = "0.18"

View File

@ -44,6 +44,6 @@
"max_emission_frequency": 1.0, "max_emission_frequency": 1.0,
"drop_message_probability": 0.0 "drop_message_probability": 0.0
}, },
"number_of_mix_layers": 2, "number_of_blend_layers": 2,
"max_delay_seconds": 10 "max_delay_seconds": 10
} }

View File

@ -4,8 +4,8 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates // crates
use crate::node::mix::state::{MixnodeRecord, MixnodeState}; use crate::node::blend::state::{BlendnodeRecord, BlendnodeState};
use crate::node::mix::{MixMessage, MixnodeSettings}; use crate::node::blend::{BlendMessage, BlendnodeSettings};
use anyhow::Ok; use anyhow::Ok;
use clap::Parser; use clap::Parser;
use crossbeam::channel; use crossbeam::channel;
@ -16,8 +16,8 @@ use netrunner::node::{NodeId, NodeIdExt};
use netrunner::output_processors::Record; use netrunner::output_processors::Record;
use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
use nomos_mix::cover_traffic::CoverTrafficSettings; use nomos_blend::cover_traffic::CoverTrafficSettings;
use nomos_mix::message_blend::{ use nomos_blend::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
@ -28,7 +28,7 @@ use rand_chacha::ChaCha12Rng;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
// internal // internal
use crate::node::mix::MixNode; use crate::node::blend::BlendNode;
use crate::settings::SimSettings; use crate::settings::SimSettings;
use netrunner::{runner::SimulationRunner, settings::SimulationSettings}; use netrunner::{runner::SimulationRunner, settings::SimulationSettings};
@ -88,19 +88,19 @@ impl SimulationApp {
let regions_data = RegionsData::new(regions, behaviours); let regions_data = RegionsData::new(regions, behaviours);
let ids = node_ids.clone(); let ids = node_ids.clone();
let network = Arc::new(Mutex::new(Network::<MixMessage>::new(regions_data, seed))); let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed)));
let nodes: Vec<_> = node_ids let nodes: Vec<_> = node_ids
.iter() .iter()
.copied() .copied()
.map(|node_id| { .map(|node_id| {
let mut network = network.lock(); let mut network = network.lock();
create_boxed_mixnode( create_boxed_blendnode(
node_id, node_id,
&mut network, &mut network,
settings.simulation_settings.clone(), settings.simulation_settings.clone(),
no_netcap, no_netcap,
MixnodeSettings { BlendnodeSettings {
connected_peers: ids connected_peers: ids
.iter() .iter()
.filter(|&id| id != &node_id) .filter(|&id| id != &node_id)
@ -115,7 +115,7 @@ impl SimulationApp {
message_blend: MessageBlendSettings { message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings { cryptographic_processor: CryptographicProcessorSettings {
private_key: node_id.into(), private_key: node_id.into(),
num_mix_layers: settings.number_of_mix_layers, num_blend_layers: settings.number_of_blend_layers,
}, },
temporal_processor: TemporalSchedulerSettings { temporal_processor: TemporalSchedulerSettings {
max_delay_seconds: settings.max_delay_seconds, max_delay_seconds: settings.max_delay_seconds,
@ -140,13 +140,13 @@ impl SimulationApp {
} }
} }
fn create_boxed_mixnode( fn create_boxed_blendnode(
node_id: NodeId, node_id: NodeId,
network: &mut Network<MixMessage>, network: &mut Network<BlendMessage>,
simulation_settings: SimulationSettings, simulation_settings: SimulationSettings,
no_netcap: bool, no_netcap: bool,
mixnode_settings: MixnodeSettings, blendnode_settings: BlendnodeSettings,
) -> BoxedNode<MixnodeSettings, MixnodeState> { ) -> BoxedNode<BlendnodeSettings, BlendnodeState> {
let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded(); let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded();
let (node_message_sender, node_message_receiver) = channel::unbounded(); let (node_message_sender, node_message_receiver) = channel::unbounded();
// Dividing milliseconds in second by milliseconds in the step. // Dividing milliseconds in second by milliseconds in the step.
@ -174,7 +174,7 @@ fn create_boxed_mixnode(
node_message_sender, node_message_sender,
network_message_receiver, network_message_receiver,
); );
Box::new(MixNode::new(node_id, mixnode_settings, network_interface)) Box::new(BlendNode::new(node_id, blendnode_settings, network_interface))
} }
fn run<M, S, T>( fn run<M, S, T>(
@ -189,7 +189,7 @@ where
T: Serialize + Clone + 'static, T: Serialize + Clone + 'static,
{ {
let stream_settings = settings.stream_settings.clone(); let stream_settings = settings.stream_settings.clone();
let runner = SimulationRunner::<_, MixnodeRecord, S, T>::new( let runner = SimulationRunner::<_, BlendnodeRecord, S, T>::new(
network, network,
nodes, nodes,
Default::default(), Default::default(),
@ -199,11 +199,11 @@ where
let handle = match stream_type { let handle = match stream_type {
Some(StreamType::Naive) => { Some(StreamType::Naive) => {
let settings = stream_settings.unwrap_naive(); let settings = stream_settings.unwrap_naive();
runner.simulate_and_subscribe::<NaiveSubscriber<MixnodeRecord>>(settings)? runner.simulate_and_subscribe::<NaiveSubscriber<BlendnodeRecord>>(settings)?
} }
Some(StreamType::IO) => { Some(StreamType::IO) => {
let settings = stream_settings.unwrap_io(); let settings = stream_settings.unwrap_io();
runner.simulate_and_subscribe::<IOSubscriber<MixnodeRecord>>(settings)? runner.simulate_and_subscribe::<IOSubscriber<BlendnodeRecord>>(settings)?
} }
None => runner.simulate()?, None => runner.simulate()?,
}; };

View File

@ -1,4 +1,4 @@
use crate::node::mix::scheduler::Interval; use crate::node::blend::scheduler::Interval;
use crossbeam::channel; use crossbeam::channel;
use futures::stream::iter; use futures::stream::iter;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};

View File

@ -5,7 +5,7 @@ pub mod scheduler;
pub mod state; pub mod state;
pub mod stream_wrapper; pub mod stream_wrapper;
use crate::node::mix::consensus_streams::{Epoch, Slot}; use crate::node::blend::consensus_streams::{Epoch, Slot};
use cached::{Cached, TimedCache}; use cached::{Cached, TimedCache};
use crossbeam::channel; use crossbeam::channel;
use futures::Stream; use futures::Stream;
@ -18,7 +18,7 @@ use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition, warding::WardCondition,
}; };
use nomos_mix::{ use nomos_blend::{
cover_traffic::{CoverTraffic, CoverTrafficSettings}, cover_traffic::{CoverTraffic, CoverTrafficSettings},
membership::Membership, membership::Membership,
message_blend::{ message_blend::{
@ -27,29 +27,29 @@ use nomos_mix::{
persistent_transmission::{ persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
}, },
MixOutgoingMessage, BlendOutgoingMessage,
}; };
use nomos_mix_message::mock::MockMixMessage; use nomos_blend_message::mock::MockBlendMessage;
use rand::SeedableRng; use rand::SeedableRng;
use rand_chacha::ChaCha12Rng; use rand_chacha::ChaCha12Rng;
use scheduler::{Interval, TemporalRelease}; use scheduler::{Interval, TemporalRelease};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use state::MixnodeState; use state::BlendnodeState;
use std::{pin::pin, task::Poll, time::Duration}; use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream; use stream_wrapper::CrossbeamReceiverStream;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct MixMessage(Vec<u8>); pub struct BlendMessage(Vec<u8>);
impl PayloadSize for MixMessage { impl PayloadSize for BlendMessage {
fn size_bytes(&self) -> u32 { fn size_bytes(&self) -> u32 {
2208 2208
} }
} }
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct MixnodeSettings { pub struct BlendnodeSettings {
pub connected_peers: Vec<NodeId>, pub connected_peers: Vec<NodeId>,
pub data_message_lottery_interval: Duration, pub data_message_lottery_interval: Duration,
pub stake_proportion: f64, pub stake_proportion: f64,
@ -57,19 +57,19 @@ pub struct MixnodeSettings {
pub epoch_duration: Duration, pub epoch_duration: Duration,
pub slot_duration: Duration, pub slot_duration: Duration,
pub persistent_transmission: PersistentTransmissionSettings, pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings<MockMixMessage>, pub message_blend: MessageBlendSettings<MockBlendMessage>,
pub cover_traffic_settings: CoverTrafficSettings, pub cover_traffic_settings: CoverTrafficSettings,
pub membership: Vec<<MockMixMessage as nomos_mix_message::MixMessage>::PublicKey>, pub membership: Vec<<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey>,
} }
type Sha256Hash = [u8; 32]; type Sha256Hash = [u8; 32];
/// This node implementation only used for testing different streaming implementation purposes. /// This node implementation only used for testing different streaming implementation purposes.
pub struct MixNode { pub struct BlendNode {
id: NodeId, id: NodeId,
state: MixnodeState, state: BlendnodeState,
settings: MixnodeSettings, settings: BlendnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>, network_interface: InMemoryNetworkInterface<BlendMessage>,
message_cache: TimedCache<Sha256Hash, ()>, message_cache: TimedCache<Sha256Hash, ()>,
data_msg_lottery_update_time_sender: channel::Sender<Duration>, data_msg_lottery_update_time_sender: channel::Sender<Duration>,
@ -81,28 +81,28 @@ pub struct MixNode {
persistent_transmission_messages: PersistentTransmissionStream< persistent_transmission_messages: PersistentTransmissionStream<
CrossbeamReceiverStream<Vec<u8>>, CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng, ChaCha12Rng,
MockMixMessage, MockBlendMessage,
Interval, Interval,
>, >,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockMixMessage>, crypto_processor: CryptographicProcessor<ChaCha12Rng, MockBlendMessage>,
blend_sender: channel::Sender<Vec<u8>>, blend_sender: channel::Sender<Vec<u8>>,
blend_update_time_sender: channel::Sender<Duration>, blend_update_time_sender: channel::Sender<Duration>,
blend_messages: MessageBlendStream< blend_messages: MessageBlendStream<
CrossbeamReceiverStream<Vec<u8>>, CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng, ChaCha12Rng,
MockMixMessage, MockBlendMessage,
TemporalRelease, TemporalRelease,
>, >,
epoch_update_sender: channel::Sender<Duration>, epoch_update_sender: channel::Sender<Duration>,
slot_update_sender: channel::Sender<Duration>, slot_update_sender: channel::Sender<Duration>,
cover_traffic: CoverTraffic<Epoch, Slot, MockMixMessage>, cover_traffic: CoverTraffic<Epoch, Slot, MockBlendMessage>,
} }
impl MixNode { impl BlendNode {
pub fn new( pub fn new(
id: NodeId, id: NodeId,
settings: MixnodeSettings, settings: BlendnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>, network_interface: InMemoryNetworkInterface<BlendMessage>,
) -> Self { ) -> Self {
let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed);
@ -137,18 +137,18 @@ impl MixNode {
let (blend_sender, blend_receiver) = channel::unbounded(); let (blend_sender, blend_receiver) = channel::unbounded();
let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded(); let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded();
let nodes: Vec< let nodes: Vec<
nomos_mix::membership::Node< nomos_blend::membership::Node<
<MockMixMessage as nomos_mix_message::MixMessage>::PublicKey, <MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
>, >,
> = settings > = settings
.membership .membership
.iter() .iter()
.map(|&public_key| nomos_mix::membership::Node { .map(|&public_key| nomos_blend::membership::Node {
address: Multiaddr::empty(), address: Multiaddr::empty(),
public_key, public_key,
}) })
.collect(); .collect();
let membership = Membership::<MockMixMessage>::new(nodes, id.into()); let membership = Membership::<MockBlendMessage>::new(nodes, id.into());
let crypto_processor = CryptographicProcessor::new( let crypto_processor = CryptographicProcessor::new(
settings.message_blend.cryptographic_processor.clone(), settings.message_blend.cryptographic_processor.clone(),
membership.clone(), membership.clone(),
@ -172,7 +172,7 @@ impl MixNode {
// tier 3 cover traffic // tier 3 cover traffic
let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded();
let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded(); let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded();
let cover_traffic: CoverTraffic<Epoch, Slot, MockMixMessage> = CoverTraffic::new( let cover_traffic: CoverTraffic<Epoch, Slot, MockBlendMessage> = CoverTraffic::new(
settings.cover_traffic_settings, settings.cover_traffic_settings,
Epoch::new(settings.epoch_duration, epoch_updater_update_receiver), Epoch::new(settings.epoch_duration, epoch_updater_update_receiver),
Slot::new( Slot::new(
@ -189,7 +189,7 @@ impl MixNode {
// We expected that a message will be delivered to most of nodes within 60s. // We expected that a message will be delivered to most of nodes within 60s.
message_cache: TimedCache::with_lifespan(60), message_cache: TimedCache::with_lifespan(60),
settings, settings,
state: MixnodeState { state: BlendnodeState {
node_id: id, node_id: id,
step_id: 0, step_id: 0,
num_messages_fully_unwrapped: 0, num_messages_fully_unwrapped: 0,
@ -212,7 +212,7 @@ impl MixNode {
fn forward( fn forward(
&mut self, &mut self,
message: MixMessage, message: BlendMessage,
exclude_node: Option<NodeId>, exclude_node: Option<NodeId>,
log: Option<EmissionLog>, log: Option<EmissionLog>,
) { ) {
@ -234,7 +234,7 @@ impl MixNode {
self.message_cache.cache_set(Self::sha256(&message.0), ()); self.message_cache.cache_set(Self::sha256(&message.0), ());
} }
fn receive(&mut self) -> Vec<NetworkMessage<MixMessage>> { fn receive(&mut self) -> Vec<NetworkMessage<BlendMessage>> {
self.network_interface self.network_interface
.receive_messages() .receive_messages()
.into_iter() .into_iter()
@ -293,10 +293,10 @@ impl MixNode {
} }
} }
impl Node for MixNode { impl Node for BlendNode {
type Settings = MixnodeSettings; type Settings = BlendnodeSettings;
type State = MixnodeState; type State = BlendnodeState;
fn id(&self) -> NodeId { fn id(&self) -> NodeId {
self.id self.id
@ -339,10 +339,10 @@ impl Node for MixNode {
// Proceed message blend // Proceed message blend
if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) { if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) {
match msg { match msg {
MixOutgoingMessage::Outbound(msg) => { BlendOutgoingMessage::Outbound(msg) => {
self.persistent_sender.send(msg).unwrap(); self.persistent_sender.send(msg).unwrap();
} }
MixOutgoingMessage::FullyUnwrapped(payload) => { BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload); let payload = Payload::load(payload);
self.log_message_fully_unwrapped(&payload); self.log_message_fully_unwrapped(&payload);
self.state.num_messages_fully_unwrapped += 1; self.state.num_messages_fully_unwrapped += 1;
@ -367,7 +367,7 @@ impl Node for MixNode {
pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
{ {
self.forward( self.forward(
MixMessage(msg), BlendMessage(msg),
None, None,
Some(self.new_emission_log("FromPersistent")), Some(self.new_emission_log("FromPersistent")),
); );

View File

@ -10,7 +10,7 @@ use netrunner::{
}; };
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct MixnodeState { pub struct BlendnodeState {
#[serde(serialize_with = "serialize_node_id_as_index")] #[serde(serialize_with = "serialize_node_id_as_index")]
pub node_id: NodeId, pub node_id: NodeId,
pub step_id: usize, pub step_id: usize,
@ -19,45 +19,45 @@ pub struct MixnodeState {
#[derive(Serialize)] #[derive(Serialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum MixnodeRecord { pub enum BlendnodeRecord {
Runtime(Runtime), Runtime(Runtime),
Settings(Box<SimulationSettings>), Settings(Box<SimulationSettings>),
#[allow(clippy::vec_box)] // we downcast stuff and we need the extra boxing #[allow(clippy::vec_box)] // we downcast stuff and we need the extra boxing
Data(Vec<Box<MixnodeState>>), Data(Vec<Box<BlendnodeState>>),
} }
impl From<Runtime> for MixnodeRecord { impl From<Runtime> for BlendnodeRecord {
fn from(value: Runtime) -> Self { fn from(value: Runtime) -> Self {
Self::Runtime(value) Self::Runtime(value)
} }
} }
impl From<SimulationSettings> for MixnodeRecord { impl From<SimulationSettings> for BlendnodeRecord {
fn from(value: SimulationSettings) -> Self { fn from(value: SimulationSettings) -> Self {
Self::Settings(Box::new(value)) Self::Settings(Box::new(value))
} }
} }
impl Record for MixnodeRecord { impl Record for BlendnodeRecord {
type Data = MixnodeState; type Data = BlendnodeState;
fn record_type(&self) -> RecordType { fn record_type(&self) -> RecordType {
match self { match self {
MixnodeRecord::Runtime(_) => RecordType::Meta, BlendnodeRecord::Runtime(_) => RecordType::Meta,
MixnodeRecord::Settings(_) => RecordType::Settings, BlendnodeRecord::Settings(_) => RecordType::Settings,
MixnodeRecord::Data(_) => RecordType::Data, BlendnodeRecord::Data(_) => RecordType::Data,
} }
} }
fn data(&self) -> Vec<&MixnodeState> { fn data(&self) -> Vec<&BlendnodeState> {
match self { match self {
MixnodeRecord::Data(d) => d.iter().map(AsRef::as_ref).collect(), BlendnodeRecord::Data(d) => d.iter().map(AsRef::as_ref).collect(),
_ => vec![], _ => vec![],
} }
} }
} }
impl<S, T: Clone + Serialize + 'static> TryFrom<&SimulationState<S, T>> for MixnodeRecord { impl<S, T: Clone + Serialize + 'static> TryFrom<&SimulationState<S, T>> for BlendnodeRecord {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(state: &SimulationState<S, T>) -> Result<Self, Self::Error> { fn try_from(state: &SimulationState<S, T>) -> Result<Self, Self::Error> {

View File

@ -1 +1 @@
pub mod mix; pub mod blend;

View File

@ -1,5 +1,5 @@
use netrunner::settings::SimulationSettings; use netrunner::settings::SimulationSettings;
use nomos_mix::persistent_transmission::PersistentTransmissionSettings; use nomos_blend::persistent_transmission::PersistentTransmissionSettings;
use serde::{Deserialize, Deserializer}; use serde::{Deserialize, Deserializer};
use std::time::Duration; use std::time::Duration;
@ -21,7 +21,7 @@ pub struct SimSettings {
// For tier 1 // For tier 1
pub persistent_transmission: PersistentTransmissionSettings, pub persistent_transmission: PersistentTransmissionSettings,
// For tier 2 // For tier 2
pub number_of_mix_layers: usize, pub number_of_blend_layers: usize,
pub max_delay_seconds: u64, pub max_delay_seconds: u64,
} }