use generic msg for persistent transmission

This commit is contained in:
Youngjoon Lee 2025-02-05 19:17:55 +09:00
parent 2a8fd1b27d
commit 2ec3834fbb
No known key found for this signature in database
GPG Key ID: D94003D91DE12141
4 changed files with 87 additions and 113 deletions

View File

@ -17,9 +17,9 @@ serde_json = "1.0.132"
tracing = "0.1.40" tracing = "0.1.40"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] } tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] }
netrunner = { path = "../netrunner" } netrunner = { path = "../netrunner" }
nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" } nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git", branch = "blend-persistent-generic-msg" }
nomos-blend = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend" } nomos-blend = { git = "https://github.com/logos-co/nomos-node", branch = "blend-persistent-generic-msg", package = "nomos-blend" }
nomos-blend-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend-message" } nomos-blend-message = { git = "https://github.com/logos-co/nomos-node", branch = "blend-persistent-generic-msg", package = "nomos-blend-message" }
futures = "0.3.31" futures = "0.3.31"
rand_chacha = "0.3" rand_chacha = "0.3"
multiaddr = "0.18" multiaddr = "0.18"
@ -27,4 +27,3 @@ sha2 = "0.10"
uuid = { version = "1", features = ["fast-rng", "v4"] } uuid = { version = "1", features = ["fast-rng", "v4"] }
tracing-appender = "0.2" tracing-appender = "0.2"
cached = "0.54.0" cached = "0.54.0"
bincode = "1.3.3"

View File

@ -80,25 +80,25 @@
"node_settings": { "node_settings": {
"timeout": "1000ms" "timeout": "1000ms"
}, },
"step_time": "20ms", "step_time": "10ms",
"runner_settings": "Sync", "runner_settings": "Sync",
"stream_settings": { "stream_settings": {
"path": "test.json" "path": "test.json"
}, },
"node_count": 10, "node_count": 100,
"seed": 0, "seed": 0,
"record_settings": {}, "record_settings": {},
"wards": [ "wards": [
{ {
"sum": 10 "sum": 100
} }
], ],
"connected_peers_count": 4, "connected_peers_count": 4,
"data_message_lottery_interval": "20s", "data_message_lottery_interval": "20s",
"stake_proportion": 1.0, "stake_proportion": 1.0,
"epoch_duration": "20s", "epoch_duration": "200s",
"slot_duration": "1s", "slot_duration": "1s",
"slots_per_epoch": 20, "slots_per_epoch": 200,
"number_of_hops": 2, "number_of_hops": 2,
"persistent_transmission": { "persistent_transmission": {
"max_emission_frequency": 1.0, "max_emission_frequency": 1.0,

View File

@ -10,6 +10,7 @@ use crate::node::blend::{BlendMessage, BlendnodeSettings};
use anyhow::Ok; use anyhow::Ok;
use clap::Parser; use clap::Parser;
use crossbeam::channel; use crossbeam::channel;
use multiaddr::Multiaddr;
use netrunner::network::behaviour::create_behaviours; use netrunner::network::behaviour::create_behaviours;
use netrunner::network::regions::{create_regions, RegionsData}; use netrunner::network::regions::{create_regions, RegionsData};
use netrunner::network::{InMemoryNetworkInterface, Network, PayloadSize}; use netrunner::network::{InMemoryNetworkInterface, Network, PayloadSize};
@ -139,7 +140,14 @@ impl SimulationApp {
slots_per_epoch: settings.slots_per_epoch, slots_per_epoch: settings.slots_per_epoch,
network_size: node_ids.len(), network_size: node_ids.len(),
}, },
membership: node_ids.iter().map(|&id| id.into()).collect(), membership: node_ids
.iter()
.map(|&id| nomos_blend::membership::Node {
id,
address: Multiaddr::empty(),
public_key: id.into(),
})
.collect(),
}, },
) )
}) })

View File

@ -14,9 +14,8 @@ use crossbeam::channel;
use futures::Stream; use futures::Stream;
use lottery::StakeLottery; use lottery::StakeLottery;
use message::{Payload, PayloadId}; use message::{Payload, PayloadId};
use multiaddr::Multiaddr;
use netrunner::network::NetworkMessage; use netrunner::network::NetworkMessage;
use netrunner::node::{Node, NodeId, NodeIdExt}; use netrunner::node::{serialize_node_id_as_index, Node, NodeId, NodeIdExt};
use netrunner::{ use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition, warding::WardCondition,
@ -42,19 +41,30 @@ use std::ops::Mul;
use std::{pin::pin, task::Poll, time::Duration}; use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream; use stream_wrapper::CrossbeamReceiverStream;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize)]
pub struct BlendMessage { pub struct BlendMessage {
message: Vec<u8>, message: Vec<u8>,
history: Vec<MessageHistoryEvent>, history: Vec<MessageEvent>,
} }
impl BlendMessage { impl BlendMessage {
pub fn new(message: Vec<u8>, node_id: NodeId, step_id: usize) -> Self { fn new(message: Vec<u8>, node_id: NodeId, step_id: usize) -> Self {
Self { Self {
message, message,
history: vec![MessageHistoryEvent::Created { node_id, step_id }], history: vec![MessageEvent::Created { node_id, step_id }],
} }
} }
fn new_drop() -> Self {
Self {
message: Vec::new(),
history: vec![],
}
}
fn is_drop(&self) -> bool {
self.message.is_empty()
}
} }
impl PayloadSize for BlendMessage { impl PayloadSize for BlendMessage {
@ -63,60 +73,60 @@ impl PayloadSize for BlendMessage {
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize)]
enum MessageHistoryEvent { enum MessageEvent {
Created { Created {
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId, node_id: NodeId,
step_id: usize, step_id: usize,
}, },
PersistentTransmissionScheduled { PersistentTransmissionScheduled {
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId, node_id: NodeId,
step_id: usize, step_id: usize,
index: usize, index: usize,
}, },
PersistentTransmissionReleased { PersistentTransmissionReleased {
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId, node_id: NodeId,
step_id: usize, step_id: usize,
#[serde(with = "duration_ms_serde")] #[serde(serialize_with = "serialize_duration_as_millis")]
duration: Duration, duration: Duration,
}, },
TemporalProcessorScheduled { TemporalProcessorScheduled {
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId, node_id: NodeId,
step_id: usize, step_id: usize,
index: usize, index: usize,
}, },
TemporalProcessorReleased { TemporalProcessorReleased {
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId, node_id: NodeId,
step_id: usize, step_id: usize,
#[serde(with = "duration_ms_serde")] #[serde(serialize_with = "serialize_duration_as_millis")]
duration: Duration, duration: Duration,
}, },
NetworkSent { NetworkSent {
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
from_node_id: NodeId, from_node_id: NodeId,
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
to_node_id: NodeId, to_node_id: NodeId,
step_id: usize, step_id: usize,
}, },
NetworkReceived { NetworkReceived {
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
from_node_id: NodeId, from_node_id: NodeId,
#[serde(with = "node_id_serde")] #[serde(serialize_with = "serialize_node_id_as_index")]
to_node_id: NodeId, to_node_id: NodeId,
step_id: usize, step_id: usize,
#[serde(with = "duration_ms_serde")] #[serde(serialize_with = "serialize_duration_as_millis")]
latency: Duration, latency: Duration,
}, },
} }
struct BlendOutgoingMessageWithHistory { struct BlendOutgoingMessageWithHistory {
outgoing_message: BlendOutgoingMessage, outgoing_message: BlendOutgoingMessage,
history: Vec<MessageHistoryEvent>, history: Vec<MessageEvent>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -130,7 +140,12 @@ pub struct BlendnodeSettings {
pub persistent_transmission: PersistentTransmissionSettings, pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings<MockBlendMessage>, pub message_blend: MessageBlendSettings<MockBlendMessage>,
pub cover_traffic_settings: CoverTrafficSettings, pub cover_traffic_settings: CoverTrafficSettings,
pub membership: Vec<<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey>, pub membership: Vec<
nomos_blend::membership::Node<
NodeId,
<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
>,
>,
} }
type Sha256Hash = [u8; 32]; type Sha256Hash = [u8; 32];
@ -148,16 +163,12 @@ pub struct BlendNode {
data_msg_lottery_interval: Interval, data_msg_lottery_interval: Interval,
data_msg_lottery: StakeLottery<ChaCha12Rng>, data_msg_lottery: StakeLottery<ChaCha12Rng>,
persistent_sender: channel::Sender<Vec<u8>>, persistent_sender: channel::Sender<BlendMessage>,
persistent_update_time_sender: channel::Sender<Duration>, persistent_update_time_sender: channel::Sender<Duration>,
persistent_transmission_messages: PersistentTransmissionStream< persistent_transmission_messages:
CrossbeamReceiverStream<Vec<u8>>, PersistentTransmissionStream<CrossbeamReceiverStream<BlendMessage>, ChaCha12Rng, Interval>,
ChaCha12Rng,
MockBlendMessage,
Interval,
>,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockBlendMessage>, crypto_processor: CryptographicProcessor<NodeId, ChaCha12Rng, MockBlendMessage>,
temporal_sender: channel::Sender<BlendOutgoingMessageWithHistory>, temporal_sender: channel::Sender<BlendOutgoingMessageWithHistory>,
temporal_update_time_sender: channel::Sender<Duration>, temporal_update_time_sender: channel::Sender<Duration>,
temporal_processor_messages: temporal_processor_messages:
@ -190,7 +201,7 @@ impl BlendNode {
); );
// Init Tier-1: Persistent transmission // Init Tier-1: Persistent transmission
let (persistent_sender, persistent_receiver) = channel::unbounded(); let (persistent_sender, persistent_receiver) = channel::unbounded::<BlendMessage>();
let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded();
let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver)
.persistent_transmission( .persistent_transmission(
@ -202,22 +213,12 @@ impl BlendNode {
), ),
persistent_update_time_receiver, persistent_update_time_receiver,
), ),
BlendMessage::new_drop(),
); );
// Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor // Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor
let nodes: Vec< let membership =
nomos_blend::membership::Node< Membership::<NodeId, MockBlendMessage>::new(settings.membership.clone(), id.into());
<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
>,
> = settings
.membership
.iter()
.map(|&public_key| nomos_blend::membership::Node {
address: Multiaddr::empty(),
public_key,
})
.collect();
let membership = Membership::<MockBlendMessage>::new(nodes, id.into());
let crypto_processor = CryptographicProcessor::new( let crypto_processor = CryptographicProcessor::new(
settings.message_blend.cryptographic_processor.clone(), settings.message_blend.cryptographic_processor.clone(),
membership.clone(), membership.clone(),
@ -287,7 +288,7 @@ impl BlendNode {
.filter(|&id| Some(*id) != exclude_node) .filter(|&id| Some(*id) != exclude_node)
{ {
let mut message = message.clone(); let mut message = message.clone();
message.history.push(MessageHistoryEvent::NetworkSent { message.history.push(MessageEvent::NetworkSent {
from_node_id: self.id, from_node_id: self.id,
to_node_id: *node_id, to_node_id: *node_id,
step_id: self.state.step_id, step_id: self.state.step_id,
@ -320,14 +321,12 @@ impl BlendNode {
fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) { fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) {
message message
.history .history
.push(MessageHistoryEvent::PersistentTransmissionScheduled { .push(MessageEvent::PersistentTransmissionScheduled {
node_id: self.id, node_id: self.id,
step_id: self.state.step_id, step_id: self.state.step_id,
index: self.state.cur_num_persistent_transmission_scheduled, index: self.state.cur_num_persistent_transmission_scheduled,
}); });
self.persistent_sender self.persistent_sender.send(message).unwrap();
.send(bincode::serialize(&message).unwrap())
.unwrap();
self.state.cur_num_persistent_transmission_scheduled += 1; self.state.cur_num_persistent_transmission_scheduled += 1;
} }
@ -354,7 +353,7 @@ impl BlendNode {
fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) { fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) {
message message
.history .history
.push(MessageHistoryEvent::TemporalProcessorScheduled { .push(MessageEvent::TemporalProcessorScheduled {
node_id: self.id, node_id: self.id,
step_id: self.state.step_id, step_id: self.state.step_id,
index: self.state.cur_num_temporal_processor_scheduled, index: self.state.cur_num_temporal_processor_scheduled,
@ -373,7 +372,7 @@ impl BlendNode {
self.slot_update_sender.send(elapsed).unwrap(); self.slot_update_sender.send(elapsed).unwrap();
} }
fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec<MessageHistoryEvent>) { fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec<MessageEvent>) {
log!( log!(
"MessageFullyUnwrapped", "MessageFullyUnwrapped",
MessageWithHistoryLog { MessageWithHistoryLog {
@ -430,7 +429,7 @@ impl Node for BlendNode {
// Handle incoming messages // Handle incoming messages
for mut network_message in self.receive() { for mut network_message in self.receive() {
match network_message.payload.history.last().unwrap() { match network_message.payload.history.last().unwrap() {
MessageHistoryEvent::NetworkSent { MessageEvent::NetworkSent {
to_node_id, to_node_id,
step_id, step_id,
.. ..
@ -439,7 +438,7 @@ impl Node for BlendNode {
network_message network_message
.payload .payload
.history .history
.push(MessageHistoryEvent::NetworkReceived { .push(MessageEvent::NetworkReceived {
from_node_id: network_message.from, from_node_id: network_message.from,
to_node_id: self.id, to_node_id: self.id,
step_id: self.state.step_id, step_id: self.state.step_id,
@ -449,6 +448,10 @@ impl Node for BlendNode {
event => panic!("Unexpected message history event: {:?}", event), event => panic!("Unexpected message history event: {:?}", event),
} }
if network_message.payload().is_drop() {
continue;
}
self.forward( self.forward(
network_message.payload().clone(), network_message.payload().clone(),
Some(network_message.from), Some(network_message.from),
@ -462,17 +465,17 @@ impl Node for BlendNode {
{ {
// Add a TemporalProcessorReleased history event // Add a TemporalProcessorReleased history event
match outgoing_msg_with_route.history.last().unwrap() { match outgoing_msg_with_route.history.last().unwrap() {
MessageHistoryEvent::TemporalProcessorScheduled { MessageEvent::TemporalProcessorScheduled {
node_id, step_id, .. node_id, step_id, ..
} => { } => {
assert_eq!(*node_id, self.id); assert_eq!(*node_id, self.id);
outgoing_msg_with_route.history.push( outgoing_msg_with_route
MessageHistoryEvent::TemporalProcessorReleased { .history
.push(MessageEvent::TemporalProcessorReleased {
node_id: self.id, node_id: self.id,
step_id: self.state.step_id, step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id), duration: self.duration_between(*step_id, self.state.step_id),
}, });
);
self.state.cur_num_temporal_processor_scheduled -= 1; self.state.cur_num_temporal_processor_scheduled -= 1;
} }
event => panic!("Unexpected message history event: {:?}", event), event => panic!("Unexpected message history event: {:?}", event),
@ -509,18 +512,17 @@ impl Node for BlendNode {
} }
// Proceed persistent transmission // Proceed persistent transmission
if let Poll::Ready(Some(msg)) = if let Poll::Ready(Some(mut msg)) =
pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
{ {
let mut msg: BlendMessage = bincode::deserialize(&msg).unwrap();
// Add a PersistentTransmissionReleased history event // Add a PersistentTransmissionReleased history event
match msg.history.last().unwrap() { match msg.history.last().unwrap() {
MessageHistoryEvent::PersistentTransmissionScheduled { MessageEvent::PersistentTransmissionScheduled {
node_id, step_id, .. node_id, step_id, ..
} => { } => {
assert_eq!(*node_id, self.id); assert_eq!(*node_id, self.id);
msg.history msg.history
.push(MessageHistoryEvent::PersistentTransmissionReleased { .push(MessageEvent::PersistentTransmissionReleased {
node_id: self.id, node_id: self.id,
step_id: self.state.step_id, step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id), duration: self.duration_between(*step_id, self.state.step_id),
@ -556,47 +558,12 @@ struct MessageLog {
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
struct MessageWithHistoryLog { struct MessageWithHistoryLog {
message: MessageLog, message: MessageLog,
history: Vec<MessageHistoryEvent>, history: Vec<MessageEvent>,
} }
mod node_id_serde { pub fn serialize_duration_as_millis<S>(duration: &Duration, s: S) -> Result<S::Ok, S::Error>
use super::NodeId;
use netrunner::node::NodeIdExt;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(node_id: &NodeId, serializer: S) -> Result<S::Ok, S::Error>
where where
S: Serializer, S: serde::Serializer,
{ {
serializer.serialize_u64(node_id.index() as u64) s.serialize_u64(duration.as_millis().try_into().unwrap())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<NodeId, D::Error>
where
D: Deserializer<'de>,
{
let index = u64::deserialize(deserializer)?;
Ok(NodeId::from_index(index as usize))
}
}
mod duration_ms_serde {
use std::time::Duration;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_u64(duration.as_millis() as u64)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let millis = u64::deserialize(deserializer)?;
Ok(Duration::from_millis(millis))
}
} }