commands for latency and history analysis

This commit is contained in:
Youngjoon Lee 2025-02-08 10:20:53 +09:00
parent fef2b627ef
commit c9fe4994c7
No known key found for this signature in database
GPG Key ID: D94003D91DE12141
8 changed files with 505 additions and 240 deletions

View File

@ -27,3 +27,5 @@ sha2 = "0.10"
uuid = { version = "1", features = ["fast-rng", "v4"] }
tracing-appender = "0.2"
cached = "0.54.0"
polars = "0.46.0"
humantime-serde = "1.1.1"

View File

@ -0,0 +1,136 @@
use std::{
error::Error,
fs::File,
io::{BufRead, BufReader},
ops::{Add, Mul},
path::PathBuf,
time::Duration,
};
use netrunner::node::NodeId;
use serde::{Deserialize, Serialize};
use crate::node::blend::{
log::TopicLog,
message::{MessageEvent, MessageEventType, PayloadId},
};
pub fn analyze_message_history(
log_file: PathBuf,
step_duration: Duration,
payload_id: PayloadId,
) -> Result<(), Box<dyn Error>> {
let file = File::open(log_file)?;
let reader = BufReader::new(file);
let mut history = Vec::new();
let mut target_node_id: Option<NodeId> = None;
let mut target_event: Option<MessageEventType> = None;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
for line in lines.iter().rev() {
if let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(line) {
assert_eq!(topic_log.topic, "MessageEvent");
let event = topic_log.message;
if event.payload_id == payload_id
&& (target_node_id.is_none() || target_node_id.unwrap() == event.node_id)
&& (target_event.is_none() || target_event.as_ref().unwrap() == &event.event_type)
{
match event.event_type {
MessageEventType::FullyUnwrapped => {
assert!(history.is_empty());
assert!(target_node_id.is_none());
target_node_id = Some(event.node_id);
history.push(event);
}
MessageEventType::Created => {
assert!(!history.is_empty());
assert!(target_node_id.is_some());
history.push(event);
}
MessageEventType::PersistentTransmissionScheduled { .. } => {
assert!(target_node_id.is_some());
assert!(matches!(
history.last().unwrap().event_type,
MessageEventType::PersistentTransmissionReleased { .. }
));
history.push(event);
}
MessageEventType::PersistentTransmissionReleased => {
assert!(target_node_id.is_some());
history.push(event);
}
MessageEventType::TemporalProcessorScheduled { .. } => {
assert!(target_node_id.is_some());
assert!(matches!(
history.last().unwrap().event_type,
MessageEventType::TemporalProcessorReleased { .. }
));
history.push(event);
}
MessageEventType::TemporalProcessorReleased => {
assert!(target_node_id.is_some());
history.push(event);
}
MessageEventType::NetworkReceived { from } => {
assert!(!history.is_empty());
assert!(target_node_id.is_some());
assert_ne!(target_node_id.unwrap(), from);
target_node_id = Some(from);
target_event = Some(MessageEventType::NetworkSent { to: event.node_id });
history.push(event);
}
MessageEventType::NetworkSent { .. } => {
assert!(!history.is_empty());
assert!(target_node_id.is_some());
if target_event.is_none()
|| target_event.as_ref().unwrap() != &event.event_type
{
continue;
}
target_event = None;
history.push(event);
}
}
}
}
}
let mut history_with_durations: Vec<MessageEventWithDuration> = Vec::new();
let (_, total_duration) = history.iter().rev().fold(
(None, Duration::ZERO),
|(prev_step_id, total_duration): (Option<usize>, Duration), event| {
let duration = match prev_step_id {
Some(prev_step_id) => {
step_duration.mul((event.step_id - prev_step_id).try_into().unwrap())
}
None => Duration::ZERO,
};
history_with_durations.push(MessageEventWithDuration {
event: event.clone(),
duration,
});
(Some(event.step_id), total_duration.add(duration))
},
);
let output = Output {
history: history_with_durations,
total_duration,
};
println!("{}", serde_json::to_string(&output).unwrap());
Ok(())
}
#[derive(Serialize, Deserialize)]
struct Output {
history: Vec<MessageEventWithDuration>,
#[serde(with = "humantime_serde")]
total_duration: Duration,
}
#[derive(Serialize, Deserialize)]
struct MessageEventWithDuration {
event: MessageEvent,
#[serde(with = "humantime_serde")]
duration: Duration,
}

View File

@ -0,0 +1,109 @@
use core::panic;
use std::{error::Error, ops::Mul, path::PathBuf, time::Duration};
use std::{
collections::HashMap,
fs::File,
io::{BufRead, BufReader},
};
use polars::prelude::{AnyValue, NamedFrom, QuantileMethod, Scalar};
use polars::series::Series;
use serde::{Deserialize, Serialize};
use crate::node::blend::log::TopicLog;
use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId};
pub fn analyze_latency(log_file: PathBuf, step_duration: Duration) -> Result<(), Box<dyn Error>> {
let file = File::open(log_file)?;
let reader = BufReader::new(file);
let mut messages: HashMap<PayloadId, usize> = HashMap::new();
let mut latencies_ms: Vec<i64> = Vec::new();
let mut latency_to_message: HashMap<i64, PayloadId> = HashMap::new();
for line in reader.lines() {
let line = line?;
if let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(&line) {
assert_eq!(topic_log.topic, "MessageEvent");
let event = topic_log.message;
match event.event_type {
MessageEventType::Created => {
assert_eq!(messages.insert(event.payload_id, event.step_id), None);
}
MessageEventType::FullyUnwrapped => match messages.remove(&event.payload_id) {
Some(created_step_id) => {
let latency = step_duration
.mul((event.step_id - created_step_id).try_into().unwrap())
.as_millis()
.try_into()
.unwrap();
latencies_ms.push(latency);
latency_to_message.insert(latency, event.payload_id);
}
None => {
panic!(
"FullyUnwrapped event without Created event: {}",
event.payload_id
);
}
},
_ => {
continue;
}
}
}
}
let series = Series::new("latencies".into(), latencies_ms);
let series = Output::new(&series, &latency_to_message);
println!("{}", serde_json::to_string(&series).unwrap());
Ok(())
}
#[derive(Serialize, Deserialize, Debug)]
struct Output {
min: i64,
min_payload_id: PayloadId,
q1: f64,
avg: f64,
med: f64,
q3: f64,
max: i64,
max_payload_id: PayloadId,
}
impl Output {
fn new(series: &Series, latency_to_message: &HashMap<i64, PayloadId>) -> Self {
let min = series.min::<i64>().unwrap().unwrap();
let min_payload_id = latency_to_message.get(&min).unwrap().clone();
let max = series.max::<i64>().unwrap().unwrap();
let max_payload_id = latency_to_message.get(&max).unwrap().clone();
Self {
min,
min_payload_id,
q1: quantile(series, 0.25),
avg: series.mean().unwrap(),
med: series.median().unwrap(),
q3: quantile(series, 0.75),
max,
max_payload_id,
}
}
}
fn quantile(series: &Series, quantile: f64) -> f64 {
f64_from_scalar(
&series
.quantile_reduce(quantile, QuantileMethod::Linear)
.unwrap(),
)
}
fn f64_from_scalar(scalar: &Scalar) -> f64 {
match scalar.value() {
AnyValue::Float64(value) => *value,
_ => panic!("Expected f64"),
}
}

View File

@ -0,0 +1,2 @@
pub mod history;
pub mod latency;

View File

@ -6,9 +6,11 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates
use crate::node::blend::state::{BlendnodeRecord, BlendnodeState};
use crate::node::blend::{BlendMessage, BlendnodeSettings};
use crate::node::blend::{BlendnodeSettings, SimMessage};
use analysis::history::analyze_message_history;
use analysis::latency::analyze_latency;
use anyhow::Ok;
use clap::Parser;
use clap::{Parser, Subcommand};
use crossbeam::channel;
use multiaddr::Multiaddr;
use netrunner::network::behaviour::create_behaviours;
@ -18,6 +20,7 @@ use netrunner::node::{NodeId, NodeIdExt};
use netrunner::output_processors::Record;
use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
use node::blend::message::PayloadId;
use node::blend::topology::Topology;
use nomos_blend::cover_traffic::CoverTrafficSettings;
use nomos_blend::message_blend::{
@ -34,10 +37,36 @@ use crate::node::blend::BlendNode;
use crate::settings::SimSettings;
use netrunner::{runner::SimulationRunner, settings::SimulationSettings};
pub mod analysis;
mod log;
mod node;
mod settings;
#[derive(Parser)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Run the simulation
Run(SimulationApp),
/// Analyze the simulation results
Analyze {
#[command(subcommand)]
command: AnalyzeCommands,
},
}
#[derive(Subcommand)]
enum AnalyzeCommands {
/// Analyze the latency of the messages fully unwrapped
Latency(AnalyzeLatencyApp),
/// Analyze the history of a message
MessageHistory(MessageHistoryApp),
}
/// Main simulation wrapper
/// Pipes together the cli arguments with the execution
#[derive(Parser)]
@ -90,15 +119,14 @@ impl SimulationApp {
"Regions",
regions
.iter()
.map(|(region, node_ids)| (region, node_ids.len()))
.map(|(region, node_ids)| (*region, node_ids.len()))
.collect::<HashMap<_, _>>()
);
log!("NumRegions", regions.len());
let behaviours = create_behaviours(&settings.simulation_settings.network_settings);
let regions_data = RegionsData::new(regions, behaviours);
let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(
let network = Arc::new(Mutex::new(Network::<SimMessage>::new(
regions_data.clone(),
seed,
)));
@ -162,7 +190,7 @@ impl SimulationApp {
fn create_boxed_blendnode(
node_id: NodeId,
network: &mut Network<BlendMessage>,
network: &mut Network<SimMessage>,
simulation_settings: SimulationSettings,
no_netcap: bool,
blendnode_settings: BlendnodeSettings,
@ -197,7 +225,6 @@ fn create_boxed_blendnode(
Box::new(BlendNode::new(
node_id,
blendnode_settings,
simulation_settings.step_time,
network_interface,
))
}
@ -302,14 +329,54 @@ struct ConnLatencyDistributionLog {
distribution: HashMap<u128, usize>,
}
fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse();
let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics);
if let Err(e) = app.run() {
tracing::error!("error: {}", e);
drop(maybe_guard);
std::process::exit(1);
}
Ok(())
#[derive(Parser)]
struct AnalyzeLatencyApp {
#[clap(long, short)]
log_file: PathBuf,
#[clap(long, short, value_parser = humantime::parse_duration)]
step_duration: Duration,
}
#[derive(Parser)]
struct MessageHistoryApp {
#[clap(long, short)]
log_file: PathBuf,
#[clap(long, short, value_parser = humantime::parse_duration)]
step_duration: Duration,
#[clap(long, short)]
payload_id: PayloadId,
}
fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
match cli.command {
Commands::Run(app) => {
let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics);
if let Err(e) = app.run() {
tracing::error!("error: {}", e);
drop(maybe_guard);
std::process::exit(1);
}
Ok(())
}
Commands::Analyze { command } => match command {
AnalyzeCommands::Latency(app) => {
if let Err(e) = analyze_latency(app.log_file, app.step_duration) {
tracing::error!("error: {}", e);
std::process::exit(1);
}
Ok(())
}
AnalyzeCommands::MessageHistory(app) => {
if let Err(e) =
analyze_message_history(app.log_file, app.step_duration, app.payload_id)
{
tracing::error!("error: {}", e);
std::process::exit(1);
}
Ok(())
}
},
}
}

View File

@ -1,9 +1,9 @@
use serde::Serialize;
use serde::{Deserialize, Serialize};
#[macro_export]
macro_rules! log {
($topic:expr, $msg:expr) => {
tracing::info!(
println!(
"{}",
serde_json::to_string(&$crate::node::blend::log::TopicLog {
topic: $topic.to_string(),
@ -14,8 +14,11 @@ macro_rules! log {
};
}
#[derive(Serialize)]
pub struct TopicLog<M: Serialize> {
#[derive(Serialize, Deserialize)]
pub struct TopicLog<M>
where
M: 'static,
{
pub topic: String,
pub message: M,
}

View File

@ -1,7 +1,5 @@
use std::{ops::Mul, time::Duration};
use netrunner::node::serialize_node_id_as_index;
use netrunner::node::NodeId;
use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;
@ -28,52 +26,16 @@ impl Payload {
}
}
#[derive(Debug, Clone, Serialize)]
pub struct MessageHistory(Vec<MessageEvent>);
impl MessageHistory {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn add(
&mut self,
node_id: NodeId,
step_id: usize,
step_time: Duration,
event_type: MessageEventType,
) {
let duration_from_prev = self.0.last().map_or(Duration::ZERO, |prev_event| {
step_time.mul((step_id - prev_event.step_id).try_into().unwrap())
});
self.0.push(MessageEvent {
node_id,
step_id,
duration_from_prev,
event_type,
});
}
pub fn last_event_type(&self) -> Option<&MessageEventType> {
self.0.last().map(|event| &event.event_type)
}
pub fn total_duration(&self) -> Duration {
self.0.iter().map(|event| event.duration_from_prev).sum()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageEvent {
pub payload_id: PayloadId,
pub step_id: usize,
#[serde(with = "node_id_serde")]
pub node_id: NodeId,
pub event_type: MessageEventType,
}
#[derive(Debug, Clone, Serialize)]
struct MessageEvent {
#[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId,
step_id: usize,
#[serde(serialize_with = "duration_as_millis")]
duration_from_prev: Duration,
event_type: MessageEventType,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageEventType {
Created,
PersistentTransmissionScheduled {
@ -85,20 +47,35 @@ pub enum MessageEventType {
},
TemporalProcessorReleased,
NetworkSent {
#[serde(serialize_with = "serialize_node_id_as_index")]
#[serde(with = "node_id_serde")]
to: NodeId,
},
NetworkReceived {
#[serde(serialize_with = "serialize_node_id_as_index")]
#[serde(with = "node_id_serde")]
from: NodeId,
},
FullyUnwrapped,
}
pub fn duration_as_millis<S>(duration: &Duration, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_u64(duration.as_millis().try_into().unwrap())
mod node_id_serde {
use netrunner::node::{NodeId, NodeIdExt};
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(node_id: &NodeId, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_u64(node_id.index().try_into().unwrap())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<NodeId, D::Error>
where
D: Deserializer<'de>,
{
Ok(NodeId::from_index(
u64::deserialize(deserializer)?.try_into().unwrap(),
))
}
}
#[cfg(test)]

View File

@ -2,7 +2,7 @@ pub mod consensus_streams;
#[macro_use]
pub mod log;
pub mod lottery;
mod message;
pub mod message;
pub mod scheduler;
pub mod state;
pub mod stream_wrapper;
@ -13,9 +13,9 @@ use cached::{Cached, TimedCache};
use crossbeam::channel;
use futures::Stream;
use lottery::StakeLottery;
use message::{duration_as_millis, MessageEventType, MessageHistory, Payload, PayloadId};
use message::{MessageEvent, MessageEventType, Payload};
use netrunner::network::NetworkMessage;
use netrunner::node::{Node, NodeId, NodeIdExt};
use netrunner::node::{Node, NodeId};
use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition,
@ -31,41 +31,20 @@ use nomos_blend::{
BlendOutgoingMessage,
};
use nomos_blend_message::mock::MockBlendMessage;
use nomos_blend_message::BlendMessage as _;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use scheduler::{Interval, TemporalScheduler};
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use sha2::{Digest, Sha256};
use state::BlendnodeState;
use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream;
#[derive(Debug, Clone, Serialize)]
pub struct BlendMessage {
message: Vec<u8>,
history: MessageHistory,
}
#[derive(Debug, Clone)]
pub struct SimMessage(Vec<u8>);
impl BlendMessage {
fn new(message: Vec<u8>, node_id: NodeId, step_id: usize, step_time: Duration) -> Self {
let mut history = MessageHistory::new();
history.add(node_id, step_id, step_time, MessageEventType::Created);
Self { message, history }
}
fn new_drop() -> Self {
Self {
message: Vec::new(),
history: MessageHistory::new(),
}
}
fn is_drop(&self) -> bool {
self.message.is_empty()
}
}
impl PayloadSize for BlendMessage {
impl PayloadSize for SimMessage {
fn size_bytes(&self) -> u32 {
// payload: 32 KiB
// header encryption overhead: 133 bytes = 48 + 17 * max_blend_hops(=5)
@ -75,11 +54,6 @@ impl PayloadSize for BlendMessage {
}
}
struct BlendOutgoingMessageWithHistory {
outgoing_message: BlendOutgoingMessage,
history: MessageHistory,
}
#[derive(Deserialize)]
pub struct BlendnodeSettings {
pub connected_peers: Vec<NodeId>,
@ -106,24 +80,23 @@ pub struct BlendNode {
id: NodeId,
state: BlendnodeState,
settings: BlendnodeSettings,
step_time: Duration,
network_interface: InMemoryNetworkInterface<BlendMessage>,
network_interface: InMemoryNetworkInterface<SimMessage>,
message_cache: TimedCache<Sha256Hash, ()>,
data_msg_lottery_update_time_sender: channel::Sender<Duration>,
data_msg_lottery_interval: Interval,
data_msg_lottery: StakeLottery<ChaCha12Rng>,
persistent_sender: channel::Sender<BlendMessage>,
persistent_sender: channel::Sender<SimMessage>,
persistent_update_time_sender: channel::Sender<Duration>,
persistent_transmission_messages:
PersistentTransmissionStream<CrossbeamReceiverStream<BlendMessage>, ChaCha12Rng, Interval>,
PersistentTransmissionStream<CrossbeamReceiverStream<SimMessage>, ChaCha12Rng, Interval>,
crypto_processor: CryptographicProcessor<NodeId, ChaCha12Rng, MockBlendMessage>,
temporal_sender: channel::Sender<BlendOutgoingMessageWithHistory>,
temporal_sender: channel::Sender<BlendOutgoingMessage>,
temporal_update_time_sender: channel::Sender<Duration>,
temporal_processor_messages:
TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessageWithHistory>, TemporalScheduler>,
TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessage>, TemporalScheduler>,
epoch_update_sender: channel::Sender<Duration>,
slot_update_sender: channel::Sender<Duration>,
@ -134,8 +107,7 @@ impl BlendNode {
pub fn new(
id: NodeId,
settings: BlendnodeSettings,
step_time: Duration,
network_interface: InMemoryNetworkInterface<BlendMessage>,
network_interface: InMemoryNetworkInterface<SimMessage>,
) -> Self {
let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed);
@ -152,7 +124,7 @@ impl BlendNode {
);
// Init Tier-1: Persistent transmission
let (persistent_sender, persistent_receiver) = channel::unbounded::<BlendMessage>();
let (persistent_sender, persistent_receiver) = channel::unbounded::<SimMessage>();
let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded();
let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver)
.persistent_transmission(
@ -164,7 +136,7 @@ impl BlendNode {
),
persistent_update_time_receiver,
),
BlendMessage::new_drop(),
SimMessage(MockBlendMessage::DROP_MESSAGE.to_vec()),
);
// Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor
@ -202,7 +174,6 @@ impl BlendNode {
Self {
id,
step_time,
network_interface,
// We're not coupling this lifespan with the steps now, but it's okay
// We expected that a message will be delivered to most of nodes within 60s.
@ -231,29 +202,41 @@ impl BlendNode {
}
}
fn forward(&mut self, message: BlendMessage, exclude_node: Option<NodeId>) {
fn parse_payload(message: &[u8]) -> Payload {
Payload::load(MockBlendMessage::payload(message).unwrap())
}
fn forward(&mut self, message: SimMessage, exclude_node: Option<NodeId>) {
let payload_id = Self::parse_payload(&message.0).id();
for node_id in self
.settings
.connected_peers
.iter()
.filter(|&id| Some(*id) != exclude_node)
{
let mut message = message.clone();
self.record_network_sent_event(&mut message.history, *node_id);
self.network_interface.send_message(*node_id, message)
log!(
"MessageEvent",
MessageEvent {
payload_id: payload_id.clone(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::NetworkSent { to: *node_id }
}
);
self.network_interface
.send_message(*node_id, message.clone())
}
self.message_cache
.cache_set(Self::sha256(&message.message), ());
self.message_cache.cache_set(Self::sha256(&message.0), ());
}
fn receive(&mut self) -> Vec<NetworkMessage<BlendMessage>> {
fn receive(&mut self) -> Vec<NetworkMessage<SimMessage>> {
self.network_interface
.receive_messages()
.into_iter()
// Retain only messages that have not been seen before
.filter(|msg| {
self.message_cache
.cache_set(Self::sha256(&msg.payload().message), ())
.cache_set(Self::sha256(&msg.payload().0), ())
.is_none()
})
.collect()
@ -265,14 +248,24 @@ impl BlendNode {
hasher.finalize().into()
}
fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) {
self.record_persistent_scheduled_event(&mut message.history);
fn schedule_persistent_transmission(&mut self, message: SimMessage) {
log!(
"MessageEvent",
MessageEvent {
payload_id: Self::parse_payload(&message.0).id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::PersistentTransmissionScheduled {
index: self.state.cur_num_persistent_scheduled
}
}
);
self.persistent_sender.send(message).unwrap();
self.state.cur_num_persistent_scheduled += 1;
}
fn handle_incoming_message(&mut self, message: BlendMessage) {
match self.crypto_processor.unwrap_message(&message.message) {
fn handle_incoming_message(&mut self, message: SimMessage) {
match self.crypto_processor.unwrap_message(&message.0) {
Ok((unwrapped_message, fully_unwrapped)) => {
let temporal_message = if fully_unwrapped {
BlendOutgoingMessage::FullyUnwrapped(unwrapped_message)
@ -280,10 +273,7 @@ impl BlendNode {
BlendOutgoingMessage::Outbound(unwrapped_message)
};
self.schedule_temporal_processor(BlendOutgoingMessageWithHistory {
outgoing_message: temporal_message,
history: message.history,
});
self.schedule_temporal_processor(temporal_message);
}
Err(e) => {
tracing::debug!("Failed to unwrap message: {:?}", e);
@ -291,8 +281,22 @@ impl BlendNode {
}
}
fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) {
self.record_temporal_scheduled_event(&mut message.history);
fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessage) {
log!(
"MessageEvent",
MessageEvent {
payload_id: match &message {
BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg).id(),
BlendOutgoingMessage::FullyUnwrapped(payload) =>
Payload::load(payload.clone()).id(),
},
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::TemporalProcessorScheduled {
index: self.state.cur_num_temporal_scheduled
}
}
);
self.temporal_sender.send(message).unwrap();
self.state.cur_num_temporal_scheduled += 1;
}
@ -306,76 +310,6 @@ impl BlendNode {
self.epoch_update_sender.send(elapsed).unwrap();
self.slot_update_sender.send(elapsed).unwrap();
}
fn log_message_fully_unwrapped(&self, payload: &Payload, history: MessageHistory) {
let total_duration = history.total_duration();
log!(
"MessageFullyUnwrapped",
MessageWithHistoryLog {
message: MessageLog {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id.index(),
},
history,
total_duration,
}
);
}
fn new_blend_message(&self, message: Vec<u8>) -> BlendMessage {
BlendMessage::new(message, self.id, self.state.step_id, self.step_time)
}
fn record_network_sent_event(&self, history: &mut MessageHistory, to: NodeId) {
self.record_message_event(history, MessageEventType::NetworkSent { to });
}
fn record_network_received_event(&self, history: &mut MessageHistory, from: NodeId) {
assert_eq!(
history.last_event_type(),
Some(&MessageEventType::NetworkSent { to: self.id })
);
self.record_message_event(history, MessageEventType::NetworkReceived { from });
}
fn record_persistent_scheduled_event(&self, history: &mut MessageHistory) {
self.record_message_event(
history,
MessageEventType::PersistentTransmissionScheduled {
index: self.state.cur_num_persistent_scheduled,
},
);
}
fn record_persistent_released_event(&self, history: &mut MessageHistory) {
assert!(matches!(
history.last_event_type(),
Some(MessageEventType::PersistentTransmissionScheduled { .. })
));
self.record_message_event(history, MessageEventType::PersistentTransmissionReleased);
}
fn record_temporal_scheduled_event(&self, history: &mut MessageHistory) {
self.record_message_event(
history,
MessageEventType::TemporalProcessorScheduled {
index: self.state.cur_num_temporal_scheduled,
},
);
}
fn record_temporal_released_event(&self, history: &mut MessageHistory) {
assert!(matches!(
history.last_event_type(),
Some(MessageEventType::TemporalProcessorScheduled { .. })
));
self.record_message_event(history, MessageEventType::TemporalProcessorReleased);
}
fn record_message_event(&self, history: &mut MessageHistory, event_type: MessageEventType) {
history.add(self.id, self.state.step_id, self.step_time, event_type);
}
}
impl Node for BlendNode {
@ -404,18 +338,34 @@ impl Node for BlendNode {
.crypto_processor
.wrap_message(payload.as_bytes())
.unwrap();
self.schedule_persistent_transmission(self.new_blend_message(message));
log!(
"MessageEvent",
MessageEvent {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::Created
}
);
self.schedule_persistent_transmission(SimMessage(message));
}
}
// Handle incoming messages
for mut network_message in self.receive() {
self.record_network_received_event(
&mut network_message.payload.history,
network_message.from,
for network_message in self.receive() {
log!(
"MessageEvent",
MessageEvent {
payload_id: Self::parse_payload(&network_message.payload().0).id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::NetworkReceived {
from: network_message.from
}
}
);
if network_message.payload().is_drop() {
if MockBlendMessage::is_drop_message(&network_message.payload().0) {
continue;
}
@ -427,23 +377,40 @@ impl Node for BlendNode {
}
// Proceed temporal processor
if let Poll::Ready(Some(mut outgoing_msg_with_history)) =
if let Poll::Ready(Some(msg)) =
pin!(&mut self.temporal_processor_messages).poll_next(&mut cx)
{
self.record_temporal_released_event(&mut outgoing_msg_with_history.history);
log!(
"MessageEvent",
MessageEvent {
payload_id: match &msg {
BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg).id(),
BlendOutgoingMessage::FullyUnwrapped(payload) =>
Payload::load(payload.clone()).id(),
},
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::TemporalProcessorReleased
}
);
self.state.cur_num_temporal_scheduled -= 1;
// Proceed the message
match outgoing_msg_with_history.outgoing_message {
match msg {
BlendOutgoingMessage::Outbound(message) => {
self.schedule_persistent_transmission(BlendMessage {
message,
history: outgoing_msg_with_history.history,
});
self.schedule_persistent_transmission(SimMessage(message));
}
BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload);
self.log_message_fully_unwrapped(&payload, outgoing_msg_with_history.history);
log!(
"MessageEvent",
MessageEvent {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::FullyUnwrapped
}
);
self.state.num_messages_fully_unwrapped += 1;
}
}
@ -456,14 +423,31 @@ impl Node for BlendNode {
.crypto_processor
.wrap_message(payload.as_bytes())
.unwrap();
self.schedule_persistent_transmission(self.new_blend_message(message));
log!(
"MessageEvent",
MessageEvent {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::Created
}
);
self.schedule_persistent_transmission(SimMessage(message));
}
// Proceed persistent transmission
if let Poll::Ready(Some(mut msg)) =
if let Poll::Ready(Some(msg)) =
pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
{
self.record_persistent_released_event(&mut msg.history);
log!(
"MessageEvent",
MessageEvent {
payload_id: Self::parse_payload(&msg.0).id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::PersistentTransmissionReleased
}
);
self.state.cur_num_persistent_scheduled -= 1;
self.forward(msg, None);
}
@ -481,18 +465,3 @@ impl Node for BlendNode {
}
}
}
#[derive(Debug, Serialize)]
struct MessageLog {
payload_id: PayloadId,
step_id: usize,
node_id: usize,
}
#[derive(Debug, Serialize)]
struct MessageWithHistoryLog {
message: MessageLog,
history: MessageHistory,
#[serde(serialize_with = "duration_as_millis")]
total_duration: Duration,
}