diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 9e90789..1f96bf4 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -17,9 +17,9 @@ serde_json = "1.0.132" tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] } netrunner = { path = "../netrunner" } -nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" } -nomos-blend = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend" } -nomos-blend-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend-message" } +nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git", branch = "blend-persistent-generic-msg" } +nomos-blend = { git = "https://github.com/logos-co/nomos-node", branch = "blend-persistent-generic-msg", package = "nomos-blend" } +nomos-blend-message = { git = "https://github.com/logos-co/nomos-node", branch = "blend-persistent-generic-msg", package = "nomos-blend-message" } futures = "0.3.31" rand_chacha = "0.3" multiaddr = "0.18" @@ -27,4 +27,3 @@ sha2 = "0.10" uuid = { version = "1", features = ["fast-rng", "v4"] } tracing-appender = "0.2" cached = "0.54.0" -bincode = "1.3.3" diff --git a/simlib/blendnet-sims/config/blendnet.json b/simlib/blendnet-sims/config/blendnet.json index 21a4835..90331c7 100644 --- a/simlib/blendnet-sims/config/blendnet.json +++ b/simlib/blendnet-sims/config/blendnet.json @@ -80,25 +80,25 @@ "node_settings": { "timeout": "1000ms" }, - "step_time": "20ms", + "step_time": "10ms", "runner_settings": "Sync", "stream_settings": { "path": "test.json" }, - "node_count": 10, + "node_count": 100, "seed": 0, "record_settings": {}, "wards": [ { - "sum": 10 + "sum": 100 } ], "connected_peers_count": 4, "data_message_lottery_interval": "20s", "stake_proportion": 1.0, - "epoch_duration": "20s", + "epoch_duration": "200s", "slot_duration": "1s", - "slots_per_epoch": 20, + "slots_per_epoch": 200, "number_of_hops": 2, "persistent_transmission": { "max_emission_frequency": 1.0, diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 3ebbb9d..ce8a149 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -10,6 +10,7 @@ use crate::node::blend::{BlendMessage, BlendnodeSettings}; use anyhow::Ok; use clap::Parser; use crossbeam::channel; +use multiaddr::Multiaddr; use netrunner::network::behaviour::create_behaviours; use netrunner::network::regions::{create_regions, RegionsData}; use netrunner::network::{InMemoryNetworkInterface, Network, PayloadSize}; @@ -139,7 +140,14 @@ impl SimulationApp { slots_per_epoch: settings.slots_per_epoch, network_size: node_ids.len(), }, - membership: node_ids.iter().map(|&id| id.into()).collect(), + membership: node_ids + .iter() + .map(|&id| nomos_blend::membership::Node { + id, + address: Multiaddr::empty(), + public_key: id.into(), + }) + .collect(), }, ) }) diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index c43685d..8e98097 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -14,9 +14,8 @@ use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; use message::{Payload, PayloadId}; -use multiaddr::Multiaddr; use netrunner::network::NetworkMessage; -use netrunner::node::{Node, NodeId, NodeIdExt}; +use netrunner::node::{serialize_node_id_as_index, Node, NodeId, NodeIdExt}; use netrunner::{ network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, warding::WardCondition, @@ -42,19 +41,30 @@ use std::ops::Mul; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct BlendMessage { message: Vec, - history: Vec, + history: Vec, } impl BlendMessage { - pub fn new(message: Vec, node_id: NodeId, step_id: usize) -> Self { + fn new(message: Vec, node_id: NodeId, step_id: usize) -> Self { Self { message, - history: vec![MessageHistoryEvent::Created { node_id, step_id }], + history: vec![MessageEvent::Created { node_id, step_id }], } } + + fn new_drop() -> Self { + Self { + message: Vec::new(), + history: vec![], + } + } + + fn is_drop(&self) -> bool { + self.message.is_empty() + } } impl PayloadSize for BlendMessage { @@ -63,60 +73,60 @@ impl PayloadSize for BlendMessage { } } -#[derive(Debug, Clone, Serialize, Deserialize)] -enum MessageHistoryEvent { +#[derive(Debug, Clone, Serialize)] +enum MessageEvent { Created { - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] node_id: NodeId, step_id: usize, }, PersistentTransmissionScheduled { - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] node_id: NodeId, step_id: usize, index: usize, }, PersistentTransmissionReleased { - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] node_id: NodeId, step_id: usize, - #[serde(with = "duration_ms_serde")] + #[serde(serialize_with = "serialize_duration_as_millis")] duration: Duration, }, TemporalProcessorScheduled { - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] node_id: NodeId, step_id: usize, index: usize, }, TemporalProcessorReleased { - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] node_id: NodeId, step_id: usize, - #[serde(with = "duration_ms_serde")] + #[serde(serialize_with = "serialize_duration_as_millis")] duration: Duration, }, NetworkSent { - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] from_node_id: NodeId, - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] to_node_id: NodeId, step_id: usize, }, NetworkReceived { - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] from_node_id: NodeId, - #[serde(with = "node_id_serde")] + #[serde(serialize_with = "serialize_node_id_as_index")] to_node_id: NodeId, step_id: usize, - #[serde(with = "duration_ms_serde")] + #[serde(serialize_with = "serialize_duration_as_millis")] latency: Duration, }, } struct BlendOutgoingMessageWithHistory { outgoing_message: BlendOutgoingMessage, - history: Vec, + history: Vec, } #[derive(Deserialize)] @@ -130,7 +140,12 @@ pub struct BlendnodeSettings { pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, pub cover_traffic_settings: CoverTrafficSettings, - pub membership: Vec<::PublicKey>, + pub membership: Vec< + nomos_blend::membership::Node< + NodeId, + ::PublicKey, + >, + >, } type Sha256Hash = [u8; 32]; @@ -148,16 +163,12 @@ pub struct BlendNode { data_msg_lottery_interval: Interval, data_msg_lottery: StakeLottery, - persistent_sender: channel::Sender>, + persistent_sender: channel::Sender, persistent_update_time_sender: channel::Sender, - persistent_transmission_messages: PersistentTransmissionStream< - CrossbeamReceiverStream>, - ChaCha12Rng, - MockBlendMessage, - Interval, - >, + persistent_transmission_messages: + PersistentTransmissionStream, ChaCha12Rng, Interval>, - crypto_processor: CryptographicProcessor, + crypto_processor: CryptographicProcessor, temporal_sender: channel::Sender, temporal_update_time_sender: channel::Sender, temporal_processor_messages: @@ -190,7 +201,7 @@ impl BlendNode { ); // Init Tier-1: Persistent transmission - let (persistent_sender, persistent_receiver) = channel::unbounded(); + let (persistent_sender, persistent_receiver) = channel::unbounded::(); let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) .persistent_transmission( @@ -202,22 +213,12 @@ impl BlendNode { ), persistent_update_time_receiver, ), + BlendMessage::new_drop(), ); // Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor - let nodes: Vec< - nomos_blend::membership::Node< - ::PublicKey, - >, - > = settings - .membership - .iter() - .map(|&public_key| nomos_blend::membership::Node { - address: Multiaddr::empty(), - public_key, - }) - .collect(); - let membership = Membership::::new(nodes, id.into()); + let membership = + Membership::::new(settings.membership.clone(), id.into()); let crypto_processor = CryptographicProcessor::new( settings.message_blend.cryptographic_processor.clone(), membership.clone(), @@ -287,7 +288,7 @@ impl BlendNode { .filter(|&id| Some(*id) != exclude_node) { let mut message = message.clone(); - message.history.push(MessageHistoryEvent::NetworkSent { + message.history.push(MessageEvent::NetworkSent { from_node_id: self.id, to_node_id: *node_id, step_id: self.state.step_id, @@ -320,14 +321,12 @@ impl BlendNode { fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) { message .history - .push(MessageHistoryEvent::PersistentTransmissionScheduled { + .push(MessageEvent::PersistentTransmissionScheduled { node_id: self.id, step_id: self.state.step_id, index: self.state.cur_num_persistent_transmission_scheduled, }); - self.persistent_sender - .send(bincode::serialize(&message).unwrap()) - .unwrap(); + self.persistent_sender.send(message).unwrap(); self.state.cur_num_persistent_transmission_scheduled += 1; } @@ -354,7 +353,7 @@ impl BlendNode { fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) { message .history - .push(MessageHistoryEvent::TemporalProcessorScheduled { + .push(MessageEvent::TemporalProcessorScheduled { node_id: self.id, step_id: self.state.step_id, index: self.state.cur_num_temporal_processor_scheduled, @@ -373,7 +372,7 @@ impl BlendNode { self.slot_update_sender.send(elapsed).unwrap(); } - fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec) { + fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec) { log!( "MessageFullyUnwrapped", MessageWithHistoryLog { @@ -430,7 +429,7 @@ impl Node for BlendNode { // Handle incoming messages for mut network_message in self.receive() { match network_message.payload.history.last().unwrap() { - MessageHistoryEvent::NetworkSent { + MessageEvent::NetworkSent { to_node_id, step_id, .. @@ -439,7 +438,7 @@ impl Node for BlendNode { network_message .payload .history - .push(MessageHistoryEvent::NetworkReceived { + .push(MessageEvent::NetworkReceived { from_node_id: network_message.from, to_node_id: self.id, step_id: self.state.step_id, @@ -449,6 +448,10 @@ impl Node for BlendNode { event => panic!("Unexpected message history event: {:?}", event), } + if network_message.payload().is_drop() { + continue; + } + self.forward( network_message.payload().clone(), Some(network_message.from), @@ -462,17 +465,17 @@ impl Node for BlendNode { { // Add a TemporalProcessorReleased history event match outgoing_msg_with_route.history.last().unwrap() { - MessageHistoryEvent::TemporalProcessorScheduled { + MessageEvent::TemporalProcessorScheduled { node_id, step_id, .. } => { assert_eq!(*node_id, self.id); - outgoing_msg_with_route.history.push( - MessageHistoryEvent::TemporalProcessorReleased { + outgoing_msg_with_route + .history + .push(MessageEvent::TemporalProcessorReleased { node_id: self.id, step_id: self.state.step_id, duration: self.duration_between(*step_id, self.state.step_id), - }, - ); + }); self.state.cur_num_temporal_processor_scheduled -= 1; } event => panic!("Unexpected message history event: {:?}", event), @@ -509,18 +512,17 @@ impl Node for BlendNode { } // Proceed persistent transmission - if let Poll::Ready(Some(msg)) = + if let Poll::Ready(Some(mut msg)) = pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) { - let mut msg: BlendMessage = bincode::deserialize(&msg).unwrap(); // Add a PersistentTransmissionReleased history event match msg.history.last().unwrap() { - MessageHistoryEvent::PersistentTransmissionScheduled { + MessageEvent::PersistentTransmissionScheduled { node_id, step_id, .. } => { assert_eq!(*node_id, self.id); msg.history - .push(MessageHistoryEvent::PersistentTransmissionReleased { + .push(MessageEvent::PersistentTransmissionReleased { node_id: self.id, step_id: self.state.step_id, duration: self.duration_between(*step_id, self.state.step_id), @@ -556,47 +558,12 @@ struct MessageLog { #[derive(Debug, Serialize)] struct MessageWithHistoryLog { message: MessageLog, - history: Vec, + history: Vec, } -mod node_id_serde { - use super::NodeId; - use netrunner::node::NodeIdExt; - use serde::{Deserialize, Deserializer, Serializer}; - - pub fn serialize(node_id: &NodeId, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_u64(node_id.index() as u64) - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let index = u64::deserialize(deserializer)?; - Ok(NodeId::from_index(index as usize)) - } -} - -mod duration_ms_serde { - use std::time::Duration; - - use serde::{Deserialize, Deserializer, Serializer}; - - pub fn serialize(duration: &Duration, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_u64(duration.as_millis() as u64) - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let millis = u64::deserialize(deserializer)?; - Ok(Duration::from_millis(millis)) - } +pub fn serialize_duration_as_millis(duration: &Duration, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_u64(duration.as_millis().try_into().unwrap()) }