analysis in Rust + update batch.py

This commit is contained in:
Youngjoon Lee 2025-02-06 17:26:12 +09:00
parent a42dc4b4bf
commit 965d6de967
No known key found for this signature in database
GPG Key ID: 708E758BC82F6092
22 changed files with 1235 additions and 414 deletions

View File

@ -27,3 +27,5 @@ sha2 = "0.10"
uuid = { version = "1", features = ["fast-rng", "v4"] }
tracing-appender = "0.2"
cached = "0.54.0"
polars = "0.46.0"
humantime-serde = "1.1.1"

View File

@ -1,49 +1,109 @@
{
"network_settings": {
"network_behaviors": {
"north america:north america": "50ms",
"north america:europe": "100ms",
"north america:asia": "120ms",
"north america west:north america west": "40ms",
"north america west:north america east": "70ms",
"north america west:north america central": "50ms",
"north america west:europe": "150ms",
"north america west:northern europe": "170ms",
"north america west:east asia": "180ms",
"north america west:southeast asia": "200ms",
"north america west:australia": "250ms",
"north america east:north america west": "70ms",
"north america east:north america east": "40ms",
"north america east:north america central": "50ms",
"north america east:europe": "130ms",
"north america east:northern europe": "140ms",
"north america east:east asia": "250ms",
"north america east:southeast asia": "300ms",
"north america east:australia": "230ms",
"north america central:north america west": "50ms",
"north america central:north america east": "50ms",
"north america central:north america central": "20ms",
"north america central:europe": "140ms",
"north america central:northern europe": "150ms",
"north america central:east asia": "200ms",
"north america central:southeast asia": "280ms",
"north america central:australia": "220ms",
"europe:north america west": "150ms",
"europe:north america east": "130ms",
"europe:north america central": "140ms",
"europe:europe": "50ms",
"europe:asia": "100ms",
"europe:north america": "120ms",
"asia:north america": "100ms",
"asia:europe": "120ms",
"asia:asia": "40ms"
"europe:northern europe": "60ms",
"europe:east asia": "300ms",
"europe:southeast asia": "300ms",
"europe:australia": "300ms",
"northern europe:north america west": "170ms",
"northern europe:north america east": "140ms",
"northern europe:north america central": "150ms",
"northern europe:europe": "60ms",
"northern europe:northern europe": "30ms",
"northern europe:east asia": "400ms",
"northern europe:southeast asia": "320ms",
"northern europe:australia": "300ms",
"east asia:north america west": "180ms",
"east asia:north america east": "250ms",
"east asia:north america central": "200ms",
"east asia:europe": "300ms",
"east asia:northern europe": "400ms",
"east asia:east asia": "50ms",
"east asia:southeast asia": "90ms",
"east asia:australia": "150ms",
"southeast asia:north america west": "200ms",
"southeast asia:north america east": "300ms",
"southeast asia:north america central": "280ms",
"southeast asia:europe": "300ms",
"southeast asia:northern europe": "320ms",
"southeast asia:east asia": "90ms",
"southeast asia:southeast asia": "40ms",
"southeast asia:australia": "150ms",
"australia:north america west": "250ms",
"australia:north america east": "230ms",
"australia:north america central": "220ms",
"australia:europe": "300ms",
"australia:northern europe": "300ms",
"australia:east asia": "150ms",
"australia:southeast asia": "150ms",
"australia:australia": "50ms"
},
"regions": {
"north america": 0.4,
"europe": 0.4,
"asia": 0.3
"north america west": 0.06,
"north america east": 0.15,
"north america central": 0.02,
"europe": 0.47,
"northern europe": 0.10,
"east asia": 0.10,
"southeast asia": 0.07,
"australia": 0.03
}
},
"node_settings": {
"timeout": "1000ms"
},
"step_time": "40ms",
"step_time": "10ms",
"runner_settings": "Sync",
"stream_settings": {
"path": "test.json"
},
"node_count": 10,
"node_count": 100,
"seed": 0,
"record_settings": {},
"wards": [
{
"sum": 10
"sum": 100
}
],
"connected_peers_count": 4,
"data_message_lottery_interval": "20s",
"stake_proportion": 1.0,
"epoch_duration": "432000s",
"slot_duration": "20s",
"slots_per_epoch": 21600,
"stake_proportion": 0.0,
"epoch_duration": "200s",
"slot_duration": "1s",
"slots_per_epoch": 200,
"number_of_hops": 2,
"persistent_transmission": {
"max_emission_frequency": 1.0,
"drop_message_probability": 0.0
},
"number_of_blend_layers": 2,
"max_delay_seconds": 10
"max_delay_seconds": 5
}

View File

@ -0,0 +1,260 @@
# !/usr/bin/env python
import argparse
import csv
import json
import os
import subprocess
from collections import OrderedDict
import mixlog
SIM_APP = "../../target/release/blendnet-sims"
def bandwidth_result(log_path: str, step_duration_ms: int) -> dict[str, float]:
max_step_id = 0
for topic, json_msg in mixlog.get_input_stream(log_path):
if topic == "MessageEvent":
max_step_id = max(max_step_id, json_msg["step_id"])
with open(log_path, "r") as file:
for line in file:
if "total_outbound_bandwidth" in line:
line = line[line.find("{") :]
line = line.replace("{ ", '{"')
line = line.replace(": ", '": ')
line = line.replace(", ", ', "')
record = json.loads(line)
elapsed = (max_step_id * step_duration_ms) / 1000.0
return {
"min": float(record["min_node_total_bandwidth"]) / elapsed,
"avg": float(record["avg_node_total_bandwidth"]) / elapsed,
"max": float(record["max_node_total_bandwidth"]) / elapsed,
}
raise Exception("No bandwidth data found in log file")
def topology_result(log_path: str) -> dict[str, int]:
for topic, json_msg in mixlog.get_input_stream(log_path):
if topic == "Topology":
return json_msg
raise Exception("No topology found in log file")
def build_argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.")
parser.add_argument(
"--step-duration",
type=int,
help="Duration (in ms) of each step in the simulation.",
)
parser.add_argument(
"--params-file",
type=str,
help="A CSV file that contains all parameter sets",
)
parser.add_argument(
"--orig-config-file",
type=str,
help="An original blendnet config JSON file that will be modified as specified in `params_file`",
)
parser.add_argument(
"--outdir",
type=str,
help="A output directory to be created",
)
parser.add_argument(
"--skip-run",
action="store_true",
help="Skip running the simulations and only analyze the logs",
)
return parser
if __name__ == "__main__":
argument_parser = build_argument_parser()
args = argument_parser.parse_args()
# Read the params CSV file
param_sets = []
param_set_header = []
with open(args.params_file, mode="r") as csvfile:
param_set_header = csvfile.readline().strip().split(",")
csvfile.seek(0) # Reset file pointer to the beginning after reading the header
reader = csv.DictReader(csvfile, delimiter=",")
param_sets = list(reader)
# Read the original blendnet json config file
with open(args.orig_config_file, "r") as jsonfile:
original_json = json.load(jsonfile)
# Directory to save modified JSON files
modified_configs_dir = os.path.join(args.outdir, "modified_configs")
os.makedirs(modified_configs_dir, exist_ok=True)
# Modify and save JSON files for each row in CSV
config_paths = []
for idx, param_set in enumerate(param_sets):
output_path = os.path.join(modified_configs_dir, f"{idx}.json")
config_paths.append(output_path)
if args.skip_run:
continue
modified_json = OrderedDict(original_json) # Preserve original field order
# Apply modifications
modified_json["network_settings"]["regions"]["north america west"] = 0.06
modified_json["network_settings"]["regions"]["north america east"] = 0.15
modified_json["network_settings"]["regions"]["north america central"] = 0.02
modified_json["network_settings"]["regions"]["europe"] = 0.47
modified_json["network_settings"]["regions"]["northern europe"] = 0.10
modified_json["network_settings"]["regions"]["east asia"] = 0.10
modified_json["network_settings"]["regions"]["southeast asia"] = 0.07
modified_json["network_settings"]["regions"]["australia"] = 0.03
modified_json["step_time"] = f"{args.step_duration}ms"
modified_json["node_count"] = int(param_set["network_size"])
modified_json["wards"][0]["sum"] = 1000
modified_json["connected_peers_count"] = int(param_set["peering_degree"])
modified_json["data_message_lottery_interval"] = "20s"
modified_json["stake_proportion"] = 0.0
modified_json["persistent_transmission"]["max_emission_frequency"] = 1.0
modified_json["persistent_transmission"]["drop_message_probability"] = 0.0
modified_json["epoch_duration"] = (
f"{int(param_set['cover_slots_per_epoch']) * int(param_set['cover_slot_duration'])}s"
)
modified_json["slots_per_epoch"] = int(param_set["cover_slots_per_epoch"])
modified_json["slot_duration"] = f"{param_set['cover_slot_duration']}s"
modified_json["max_delay_seconds"] = int(param_set["max_temporal_delay"])
modified_json["number_of_hops"] = int(param_set["blend_hops"])
modified_json["number_of_blend_layers"] = int(param_set["blend_hops"])
# Save modified JSON
with open(output_path, "w") as outfile:
json.dump(modified_json, outfile, indent=2)
print("Saved modified JSON to:", output_path)
# Directory to save logs
log_dir = os.path.join(args.outdir, "logs")
os.makedirs(log_dir, exist_ok=True)
log_paths = []
for idx, config_path in enumerate(config_paths):
log_path = f"{log_dir}/{idx}.log"
log_paths.append(log_path)
if args.skip_run:
continue
with open(log_path, "w") as log_file:
print(
f"Running simulation-{idx}: {log_file.name} with config: {config_path}"
)
subprocess.run(
[
SIM_APP,
"run",
"--input-settings",
config_path,
],
stdout=log_file,
)
print(f"Simulation-{idx} completed: {log_file.name}")
print("Analyzing logs...")
print("=================")
with open(os.path.join(args.outdir, "output.csv"), "w", newline="") as file:
print(f"Writing results to: {file.name}")
csv_writer = csv.writer(file)
csv_writer.writerow(
param_set_header
+ [
"network_diameter",
"msg_count",
"min_latency_sec",
"25th_latency_sec",
"avg_latency_sec",
"median_latency_sec",
"75th_latency_sec",
"max_latency_sec",
"min_latency_msg_id",
"max_latency_msg_id",
"conn_latency_count",
"min_conn_latency_sec",
"25th_conn_latency_sec",
"avg_conn_latency_sec",
"med_conn_latency_sec",
"75th_conn_latency_sec",
"max_conn_latency_sec",
"min_bandwidth_kbps",
"avg_bandwidth_kbps",
"max_bandwidth_kbps",
]
)
for idx, log_path in enumerate(log_paths):
csv_row = []
csv_row.extend([param_sets[idx][key] for key in param_set_header])
csv_row.append(topology_result(log_path)["diameter"])
result = subprocess.run(
[
SIM_APP,
"analyze",
"message-latency",
"--log-file",
log_path,
"--step-duration",
f"{args.step_duration}ms",
],
capture_output=True,
text=True,
)
assert result.returncode == 0
latency_analysis = json.loads(result.stdout)
csv_row.append(latency_analysis["count"])
csv_row.append(float(latency_analysis["min"]) / 1000.0)
csv_row.append(float(latency_analysis["q1"]) / 1000.0)
csv_row.append(float(latency_analysis["avg"]) / 1000.0)
csv_row.append(float(latency_analysis["med"]) / 1000.0)
csv_row.append(float(latency_analysis["q3"]) / 1000.0)
csv_row.append(float(latency_analysis["max"]) / 1000.0)
csv_row.append(latency_analysis["min_payload_id"])
csv_row.append(latency_analysis["max_payload_id"])
result = subprocess.run(
[
SIM_APP,
"analyze",
"connection-latency",
"--log-file",
log_path,
"--step-duration",
f"{args.step_duration}ms",
],
capture_output=True,
text=True,
)
assert result.returncode == 0
result = json.loads(result.stdout)
csv_row.append(result["count"])
csv_row.append(float(result["min"]) / 1000.0)
csv_row.append(float(result["q1"]) / 1000.0)
csv_row.append(float(result["avg"]) / 1000.0)
csv_row.append(float(result["med"]) / 1000.0)
csv_row.append(float(result["q3"]) / 1000.0)
csv_row.append(float(result["max"]) / 1000.0)
bandwidth_res = bandwidth_result(log_path, args.step_duration)
csv_row.append(bandwidth_res["min"] * 8 / 1000.0)
csv_row.append(bandwidth_res["avg"] * 8 / 1000.0)
csv_row.append(bandwidth_res["max"] * 8 / 1000.0)
csv_writer.writerow(csv_row)
print(f"The outputs have been successfully written to {file.name}")

View File

@ -1,16 +1,14 @@
import argparse
import json
from collections.abc import Iterable
from typing import Any
import matplotlib
import matplotlib.pyplot as plt
import mixlog
import pandas as pd
import mixlog
def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None:
def plot_emissions(input_stream: Iterable[tuple[str, dict]], plot_path: str) -> None:
df = pd.DataFrame(emission_records(input_stream))
plt.figure(figsize=(12, 6))
@ -24,19 +22,8 @@ def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None:
plt.show()
def emission_records(input_stream: Iterable[str]) -> list[Any]:
records = []
for line in input_stream:
try:
record = json.loads(line)
except json.JSONDecodeError:
continue
if "emission_type" in record:
records.append(record)
return records
def emission_records(input_stream: Iterable[tuple[str, dict]]) -> list[Any]:
return [record for _, record in filter(lambda x: x[0] == "Emission", input_stream)]
if __name__ == "__main__":

View File

@ -1,112 +0,0 @@
# !/usr/bin/env python
import argparse
import json
import statistics
from collections.abc import Iterable
from typing import Dict, Optional
import mixlog
class Message:
def __init__(self, message_id: str, step_a: Optional[int]):
self.id = message_id
self.step_a = int(step_a) if step_a is not None else None
self.step_b = None
def __hash__(self):
return self.id
def __repr__(self):
return f"[{self.id}] {self.step_a} -> {self.step_b}"
@property
def latency(self) -> Optional[int]:
if self.step_a is not None and self.step_b is not None:
return abs(self.step_a - self.step_b)
MessageStorage = Dict[str, Message]
def compute_results(message_storage: MessageStorage, step_duration: int) -> str:
latencies = [message_record.latency for message_record in message_storage.values()]
valued_latencies = [latency for latency in latencies if latency is not None]
incomplete_latencies = sum((1 for latency in latencies if latency is None))
total_messages = len(latencies)
total_messages_full_latency = len(valued_latencies)
total_messages_incomplete_latency = incomplete_latencies
latency_average_steps = statistics.mean(valued_latencies)
latency_average_ms = "{:.2f}ms".format(latency_average_steps * step_duration)
latency_median_steps = statistics.median(valued_latencies)
latency_median_ms = "{:.2f}ms".format(latency_median_steps * step_duration)
max_latency_steps = max(valued_latencies)
max_latency_ms = "{:.2f}ms".format(max_latency_steps * step_duration)
min_latency_steps = min(valued_latencies)
min_latency_ms = "{:.2f}ms".format(min_latency_steps * step_duration)
return f"""[Results]
- Total messages: {total_messages}
- Full latencies: {total_messages_full_latency}
- Incomplete latencies: {total_messages_incomplete_latency}
- Averages
- Steps: {latency_average_steps}
- Duration: {latency_average_ms}
- Median
- Steps: {latency_median_steps}
- Duration: {latency_median_ms}
- Max
- Steps: {max_latency_steps}
- Duration: {max_latency_ms}
- Min
- Steps: {min_latency_steps}
- Duration: {min_latency_ms}"""
def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage:
storage: MessageStorage = {}
for record in record_stream:
try:
json_record = json.loads(record)
except json.decoder.JSONDecodeError:
continue
if (payload_id := json_record.get("payload_id")) is None:
continue
step_id = json_record["step_id"]
if (stored_message := storage.get(payload_id)) is None:
storage[payload_id] = Message(payload_id, step_id)
else:
stored_message.step_b = step_id
return storage
def build_argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.")
parser.add_argument(
"--step-duration",
type=int,
default=100,
help="Duration (in ms) of each step in the simulation.",
)
parser.add_argument(
"input_file",
nargs="?",
help="The file to parse. If not provided, input will be read from stdin.",
)
return parser
if __name__ == "__main__":
argument_parser = build_argument_parser()
arguments = argument_parser.parse_args()
input_stream = mixlog.get_input_stream(arguments.input_file)
messages = parse_record_stream(input_stream)
results = compute_results(messages, arguments.step_duration)
print(results)

View File

@ -1,12 +1,20 @@
import json
import sys
from collections.abc import Iterable
from typing import Optional
TOPIC_LOG_INDICATOR = '{"topic":'
def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]:
def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[tuple[str, dict]]:
for record in record_stream:
bracket_pos = record.rfind("{")
yield record[bracket_pos:]
topic_idx = record.find(TOPIC_LOG_INDICATOR)
if topic_idx == -1:
continue
# Split the line into 2 parts: topic and JSON message
log = json.loads(record[topic_idx:].strip())
yield (log["topic"], log["message"])
def get_pipe_stream() -> Iterable[str]:
@ -18,7 +26,7 @@ def get_file_stream(input_filename) -> Iterable[str]:
yield from file
def get_input_stream(input_filename: Optional[str]) -> Iterable[str]:
def get_input_stream(input_filename: Optional[str]) -> Iterable[tuple[str, dict]]:
stream = (
get_file_stream(input_filename)
if input_filename is not None

View File

@ -0,0 +1,89 @@
use std::{error::Error, ops::Mul, path::PathBuf, time::Duration};
use std::{
collections::HashMap,
fs::File,
io::{BufRead, BufReader},
};
use netrunner::node::NodeId;
use polars::prelude::NamedFrom;
use polars::series::Series;
use serde::{Deserialize, Serialize};
use crate::node::blend::log::TopicLog;
use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId};
use super::message_latency::quantile;
pub fn analyze_connection_latency(
log_file: PathBuf,
step_duration: Duration,
) -> Result<(), Box<dyn Error>> {
let file = File::open(log_file)?;
let reader = BufReader::new(file);
let mut sent_events: HashMap<(PayloadId, NodeId, NodeId), usize> = HashMap::new();
let mut latencies_ms: Vec<i64> = Vec::new();
for line in reader.lines() {
let line = line?;
if let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(&line) {
assert_eq!(topic_log.topic, "MessageEvent");
let event = topic_log.message;
match event.event_type {
MessageEventType::NetworkSent { to } => {
assert_eq!(
sent_events.insert((event.payload_id, event.node_id, to), event.step_id),
None
);
}
MessageEventType::NetworkReceived { from } => {
let sent_step_id = sent_events
.remove(&(event.payload_id, from, event.node_id))
.unwrap();
let latency = step_duration
.mul((event.step_id - sent_step_id).try_into().unwrap())
.as_millis()
.try_into()
.unwrap();
latencies_ms.push(latency);
}
_ => {
continue;
}
}
}
}
let series = Series::new("latencies".into(), latencies_ms);
let series = Output::new(&series);
println!("{}", serde_json::to_string(&series).unwrap());
Ok(())
}
#[derive(Serialize, Deserialize, Debug)]
struct Output {
count: usize,
min: i64,
q1: f64,
avg: f64,
med: f64,
q3: f64,
max: i64,
}
impl Output {
fn new(series: &Series) -> Self {
Self {
count: series.len(),
min: series.min::<i64>().unwrap().unwrap(),
q1: quantile(series, 0.25),
avg: series.mean().unwrap(),
med: series.median().unwrap(),
q3: quantile(series, 0.75),
max: series.max::<i64>().unwrap().unwrap(),
}
}
}

View File

@ -0,0 +1,136 @@
use std::{
error::Error,
fs::File,
io::{BufRead, BufReader},
ops::{Add, Mul},
path::PathBuf,
time::Duration,
};
use netrunner::node::NodeId;
use serde::{Deserialize, Serialize};
use crate::node::blend::{
log::TopicLog,
message::{MessageEvent, MessageEventType, PayloadId},
};
pub fn analyze_message_history(
log_file: PathBuf,
step_duration: Duration,
payload_id: PayloadId,
) -> Result<(), Box<dyn Error>> {
let file = File::open(log_file)?;
let reader = BufReader::new(file);
let mut history = Vec::new();
let mut target_node_id: Option<NodeId> = None;
let mut target_event: Option<MessageEventType> = None;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
for line in lines.iter().rev() {
if let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(line) {
assert_eq!(topic_log.topic, "MessageEvent");
let event = topic_log.message;
if event.payload_id == payload_id
&& (target_node_id.is_none() || target_node_id.unwrap() == event.node_id)
&& (target_event.is_none() || target_event.as_ref().unwrap() == &event.event_type)
{
match event.event_type {
MessageEventType::FullyUnwrapped => {
assert!(history.is_empty());
assert!(target_node_id.is_none());
target_node_id = Some(event.node_id);
history.push(event);
}
MessageEventType::Created => {
assert!(!history.is_empty());
assert!(target_node_id.is_some());
history.push(event);
}
MessageEventType::PersistentTransmissionScheduled { .. } => {
assert!(target_node_id.is_some());
assert!(matches!(
history.last().unwrap().event_type,
MessageEventType::PersistentTransmissionReleased { .. }
));
history.push(event);
}
MessageEventType::PersistentTransmissionReleased => {
assert!(target_node_id.is_some());
history.push(event);
}
MessageEventType::TemporalProcessorScheduled { .. } => {
assert!(target_node_id.is_some());
assert!(matches!(
history.last().unwrap().event_type,
MessageEventType::TemporalProcessorReleased { .. }
));
history.push(event);
}
MessageEventType::TemporalProcessorReleased => {
assert!(target_node_id.is_some());
history.push(event);
}
MessageEventType::NetworkReceived { from } => {
assert!(!history.is_empty());
assert!(target_node_id.is_some());
assert_ne!(target_node_id.unwrap(), from);
target_node_id = Some(from);
target_event = Some(MessageEventType::NetworkSent { to: event.node_id });
history.push(event);
}
MessageEventType::NetworkSent { .. } => {
assert!(!history.is_empty());
assert!(target_node_id.is_some());
if target_event.is_none()
|| target_event.as_ref().unwrap() != &event.event_type
{
continue;
}
target_event = None;
history.push(event);
}
}
}
}
}
let mut history_with_durations: Vec<MessageEventWithDuration> = Vec::new();
let (_, total_duration) = history.iter().rev().fold(
(None, Duration::ZERO),
|(prev_step_id, total_duration): (Option<usize>, Duration), event| {
let duration = match prev_step_id {
Some(prev_step_id) => {
step_duration.mul((event.step_id - prev_step_id).try_into().unwrap())
}
None => Duration::ZERO,
};
history_with_durations.push(MessageEventWithDuration {
event: event.clone(),
duration,
});
(Some(event.step_id), total_duration.add(duration))
},
);
let output = Output {
history: history_with_durations,
total_duration,
};
println!("{}", serde_json::to_string(&output).unwrap());
Ok(())
}
#[derive(Serialize, Deserialize)]
struct Output {
history: Vec<MessageEventWithDuration>,
#[serde(with = "humantime_serde")]
total_duration: Duration,
}
#[derive(Serialize, Deserialize)]
struct MessageEventWithDuration {
event: MessageEvent,
#[serde(with = "humantime_serde")]
duration: Duration,
}

View File

@ -0,0 +1,114 @@
use core::panic;
use std::{error::Error, ops::Mul, path::PathBuf, time::Duration};
use std::{
collections::HashMap,
fs::File,
io::{BufRead, BufReader},
};
use polars::prelude::{AnyValue, NamedFrom, QuantileMethod, Scalar};
use polars::series::Series;
use serde::{Deserialize, Serialize};
use crate::node::blend::log::TopicLog;
use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId};
pub fn analyze_message_latency(
log_file: PathBuf,
step_duration: Duration,
) -> Result<(), Box<dyn Error>> {
let file = File::open(log_file)?;
let reader = BufReader::new(file);
let mut messages: HashMap<PayloadId, usize> = HashMap::new();
let mut latencies_ms: Vec<i64> = Vec::new();
let mut latency_to_message: HashMap<i64, PayloadId> = HashMap::new();
for line in reader.lines() {
let line = line?;
if let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(&line) {
assert_eq!(topic_log.topic, "MessageEvent");
let event = topic_log.message;
match event.event_type {
MessageEventType::Created => {
assert_eq!(messages.insert(event.payload_id, event.step_id), None);
}
MessageEventType::FullyUnwrapped => match messages.remove(&event.payload_id) {
Some(created_step_id) => {
let latency = step_duration
.mul((event.step_id - created_step_id).try_into().unwrap())
.as_millis()
.try_into()
.unwrap();
latencies_ms.push(latency);
latency_to_message.insert(latency, event.payload_id);
}
None => {
panic!(
"FullyUnwrapped event without Created event: {}",
event.payload_id
);
}
},
_ => {
continue;
}
}
}
}
let series = Series::new("latencies".into(), latencies_ms);
let series = Output::new(&series, &latency_to_message);
println!("{}", serde_json::to_string(&series).unwrap());
Ok(())
}
#[derive(Serialize, Deserialize, Debug)]
struct Output {
count: usize,
min: i64,
min_payload_id: PayloadId,
q1: f64,
avg: f64,
med: f64,
q3: f64,
max: i64,
max_payload_id: PayloadId,
}
impl Output {
fn new(series: &Series, latency_to_message: &HashMap<i64, PayloadId>) -> Self {
let min = series.min::<i64>().unwrap().unwrap();
let min_payload_id = latency_to_message.get(&min).unwrap().clone();
let max = series.max::<i64>().unwrap().unwrap();
let max_payload_id = latency_to_message.get(&max).unwrap().clone();
Self {
count: series.len(),
min,
min_payload_id,
q1: quantile(series, 0.25),
avg: series.mean().unwrap(),
med: series.median().unwrap(),
q3: quantile(series, 0.75),
max,
max_payload_id,
}
}
}
pub(crate) fn quantile(series: &Series, quantile: f64) -> f64 {
f64_from_scalar(
&series
.quantile_reduce(quantile, QuantileMethod::Linear)
.unwrap(),
)
}
fn f64_from_scalar(scalar: &Scalar) -> f64 {
match scalar.value() {
AnyValue::Float64(value) => *value,
_ => panic!("Expected f64"),
}
}

View File

@ -0,0 +1,3 @@
pub mod conn_latency;
pub mod message_history;
pub mod message_latency;

View File

@ -6,10 +6,14 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates
use crate::node::blend::state::{BlendnodeRecord, BlendnodeState};
use crate::node::blend::{BlendMessage, BlendnodeSettings};
use crate::node::blend::{BlendnodeSettings, SimMessage};
use analysis::conn_latency::analyze_connection_latency;
use analysis::message_history::analyze_message_history;
use analysis::message_latency::analyze_message_latency;
use anyhow::Ok;
use clap::Parser;
use clap::{Parser, Subcommand};
use crossbeam::channel;
use multiaddr::Multiaddr;
use netrunner::network::behaviour::create_behaviours;
use netrunner::network::regions::{create_regions, RegionsData};
use netrunner::network::{InMemoryNetworkInterface, Network, PayloadSize};
@ -17,6 +21,7 @@ use netrunner::node::{NodeId, NodeIdExt};
use netrunner::output_processors::Record;
use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
use node::blend::message::PayloadId;
use node::blend::topology::Topology;
use nomos_blend::cover_traffic::CoverTrafficSettings;
use nomos_blend::message_blend::{
@ -33,10 +38,38 @@ use crate::node::blend::BlendNode;
use crate::settings::SimSettings;
use netrunner::{runner::SimulationRunner, settings::SimulationSettings};
pub mod analysis;
mod log;
mod node;
mod settings;
#[derive(Parser)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Run the simulation
Run(SimulationApp),
/// Analyze the simulation results
Analyze {
#[command(subcommand)]
command: AnalyzeCommands,
},
}
#[derive(Subcommand)]
enum AnalyzeCommands {
/// Analyze the latency of the messages fully unwrapped
MessageLatency(MessageLatencyApp),
/// Analyze the history of a message
MessageHistory(MessageHistoryApp),
/// Analyze connection latency
ConnectionLatency(ConnectionLatencyApp),
}
/// Main simulation wrapper
/// Pipes together the cli arguments with the execution
#[derive(Parser)]
@ -85,13 +118,25 @@ impl SimulationApp {
&mut rng,
&settings.simulation_settings.network_settings,
);
log!(
"Regions",
regions
.iter()
.map(|(region, node_ids)| (*region, node_ids.len()))
.collect::<HashMap<_, _>>()
);
let behaviours = create_behaviours(&settings.simulation_settings.network_settings);
let regions_data = RegionsData::new(regions, behaviours);
let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed)));
let network = Arc::new(Mutex::new(Network::<SimMessage>::new(
regions_data.clone(),
seed,
)));
let topology = Topology::new(&node_ids, settings.connected_peers_count, &mut rng);
log_topology(&topology);
log_conn_latency_distribution(&topology.conn_latency_distribution(&regions_data));
let nodes: Vec<_> = node_ids
.iter()
@ -126,7 +171,14 @@ impl SimulationApp {
slots_per_epoch: settings.slots_per_epoch,
network_size: node_ids.len(),
},
membership: node_ids.iter().map(|&id| id.into()).collect(),
membership: node_ids
.iter()
.map(|&id| nomos_blend::membership::Node {
id,
address: Multiaddr::empty(),
public_key: id.into(),
})
.collect(),
},
)
})
@ -141,7 +193,7 @@ impl SimulationApp {
fn create_boxed_blendnode(
node_id: NodeId,
network: &mut Network<BlendMessage>,
network: &mut Network<SimMessage>,
simulation_settings: SimulationSettings,
no_netcap: bool,
blendnode_settings: BlendnodeSettings,
@ -246,11 +298,13 @@ fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
}
fn log_topology(topology: &Topology) {
let log = TopologyLog {
topology: topology.to_node_indices(),
diameter: topology.diameter(),
};
tracing::info!("Topology: {}", serde_json::to_string(&log).unwrap());
log!(
"Topology",
TopologyLog {
topology: topology.to_node_indices(),
diameter: topology.diameter(),
}
);
}
#[derive(Debug, Serialize, Deserialize)]
@ -259,14 +313,88 @@ struct TopologyLog {
diameter: usize,
}
fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse();
let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics);
if let Err(e) = app.run() {
tracing::error!("error: {}", e);
drop(maybe_guard);
std::process::exit(1);
}
Ok(())
fn log_conn_latency_distribution(distribution: &HashMap<Duration, usize>) {
log!(
"ConnLatencyDistribution",
ConnLatencyDistributionLog {
num_links: distribution.values().sum(),
distribution: distribution
.iter()
.map(|(latency, count)| (latency.as_millis(), *count))
.collect::<HashMap<_, _>>(),
}
);
}
#[derive(Debug, Serialize, Deserialize)]
struct ConnLatencyDistributionLog {
num_links: usize,
distribution: HashMap<u128, usize>,
}
#[derive(Parser)]
struct MessageLatencyApp {
#[clap(long, short)]
log_file: PathBuf,
#[clap(long, short, value_parser = humantime::parse_duration)]
step_duration: Duration,
}
#[derive(Parser)]
struct MessageHistoryApp {
#[clap(long, short)]
log_file: PathBuf,
#[clap(long, short, value_parser = humantime::parse_duration)]
step_duration: Duration,
#[clap(long, short)]
payload_id: PayloadId,
}
#[derive(Parser)]
struct ConnectionLatencyApp {
#[clap(long, short)]
log_file: PathBuf,
#[clap(long, short, value_parser = humantime::parse_duration)]
step_duration: Duration,
}
fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
match cli.command {
Commands::Run(app) => {
let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics);
if let Err(e) = app.run() {
tracing::error!("error: {}", e);
drop(maybe_guard);
std::process::exit(1);
}
Ok(())
}
Commands::Analyze { command } => match command {
AnalyzeCommands::MessageLatency(app) => {
if let Err(e) = analyze_message_latency(app.log_file, app.step_duration) {
tracing::error!("error: {}", e);
std::process::exit(1);
}
Ok(())
}
AnalyzeCommands::MessageHistory(app) => {
if let Err(e) =
analyze_message_history(app.log_file, app.step_duration, app.payload_id)
{
tracing::error!("error: {}", e);
std::process::exit(1);
}
Ok(())
}
AnalyzeCommands::ConnectionLatency(app) => {
if let Err(e) = analyze_connection_latency(app.log_file, app.step_duration) {
tracing::error!("error: {}", e);
std::process::exit(1);
}
Ok(())
}
},
}
}

View File

@ -0,0 +1,24 @@
use serde::{Deserialize, Serialize};
#[macro_export]
macro_rules! log {
($topic:expr, $msg:expr) => {
println!(
"{}",
serde_json::to_string(&$crate::node::blend::log::TopicLog {
topic: $topic.to_string(),
message: $msg
})
.unwrap()
);
};
}
#[derive(Serialize, Deserialize)]
pub struct TopicLog<M>
where
M: 'static,
{
pub topic: String,
pub message: M,
}

View File

@ -1,3 +1,6 @@
use netrunner::node::NodeId;
use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;
pub type PayloadId = String;
@ -23,6 +26,58 @@ impl Payload {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageEvent {
pub payload_id: PayloadId,
pub step_id: usize,
#[serde(with = "node_id_serde")]
pub node_id: NodeId,
pub event_type: MessageEventType,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageEventType {
Created,
PersistentTransmissionScheduled {
index: usize,
},
PersistentTransmissionReleased,
TemporalProcessorScheduled {
index: usize,
},
TemporalProcessorReleased,
NetworkSent {
#[serde(with = "node_id_serde")]
to: NodeId,
},
NetworkReceived {
#[serde(with = "node_id_serde")]
from: NodeId,
},
FullyUnwrapped,
}
mod node_id_serde {
use netrunner::node::{NodeId, NodeIdExt};
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(node_id: &NodeId, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_u64(node_id.index().try_into().unwrap())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<NodeId, D::Error>
where
D: Deserializer<'de>,
{
Ok(NodeId::from_index(
u64::deserialize(deserializer)?.try_into().unwrap(),
))
}
}
#[cfg(test)]
mod tests {
use super::Payload;

View File

@ -1,6 +1,8 @@
pub mod consensus_streams;
#[macro_use]
pub mod log;
pub mod lottery;
mod message;
pub mod message;
pub mod scheduler;
pub mod state;
pub mod stream_wrapper;
@ -11,41 +13,44 @@ use cached::{Cached, TimedCache};
use crossbeam::channel;
use futures::Stream;
use lottery::StakeLottery;
use message::{Payload, PayloadId};
use multiaddr::Multiaddr;
use message::{MessageEvent, MessageEventType, Payload};
use netrunner::network::NetworkMessage;
use netrunner::node::{Node, NodeId, NodeIdExt};
use netrunner::node::{Node, NodeId};
use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition,
};
use nomos_blend::message_blend::temporal::{TemporalProcessorExt, TemporalStream};
use nomos_blend::{
cover_traffic::{CoverTraffic, CoverTrafficSettings},
membership::Membership,
message_blend::{
crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream,
},
message_blend::{crypto::CryptographicProcessor, MessageBlendSettings},
persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
},
BlendOutgoingMessage,
};
use nomos_blend_message::mock::MockBlendMessage;
use nomos_blend_message::BlendMessage as _;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use scheduler::{Interval, TemporalRelease};
use serde::{Deserialize, Serialize};
use scheduler::{Interval, TemporalScheduler};
use serde::Deserialize;
use sha2::{Digest, Sha256};
use state::BlendnodeState;
use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream;
#[derive(Debug, Clone)]
pub struct BlendMessage(Vec<u8>);
pub struct SimMessage(Vec<u8>);
impl PayloadSize for BlendMessage {
impl PayloadSize for SimMessage {
fn size_bytes(&self) -> u32 {
2208
// payload: 32 KiB
// header encryption overhead: 133 bytes = 48 + 17 * max_blend_hops(=5)
// payload encryption overhaed: 16 bytes
// economic data overhead: 8043 bytes
40960
}
}
@ -60,7 +65,12 @@ pub struct BlendnodeSettings {
pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings<MockBlendMessage>,
pub cover_traffic_settings: CoverTrafficSettings,
pub membership: Vec<<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey>,
pub membership: Vec<
nomos_blend::membership::Node<
NodeId,
<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
>,
>,
}
type Sha256Hash = [u8; 32];
@ -70,30 +80,24 @@ pub struct BlendNode {
id: NodeId,
state: BlendnodeState,
settings: BlendnodeSettings,
network_interface: InMemoryNetworkInterface<BlendMessage>,
network_interface: InMemoryNetworkInterface<SimMessage>,
message_cache: TimedCache<Sha256Hash, ()>,
data_msg_lottery_update_time_sender: channel::Sender<Duration>,
data_msg_lottery_interval: Interval,
data_msg_lottery: StakeLottery<ChaCha12Rng>,
persistent_sender: channel::Sender<Vec<u8>>,
persistent_sender: channel::Sender<SimMessage>,
persistent_update_time_sender: channel::Sender<Duration>,
persistent_transmission_messages: PersistentTransmissionStream<
CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng,
MockBlendMessage,
Interval,
>,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockBlendMessage>,
blend_sender: channel::Sender<Vec<u8>>,
blend_update_time_sender: channel::Sender<Duration>,
blend_messages: MessageBlendStream<
CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng,
MockBlendMessage,
TemporalRelease,
>,
persistent_transmission_messages:
PersistentTransmissionStream<CrossbeamReceiverStream<SimMessage>, ChaCha12Rng, Interval>,
crypto_processor: CryptographicProcessor<NodeId, ChaCha12Rng, MockBlendMessage>,
temporal_sender: channel::Sender<BlendOutgoingMessage>,
temporal_update_time_sender: channel::Sender<Duration>,
temporal_processor_messages:
TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessage>, TemporalScheduler>,
epoch_update_sender: channel::Sender<Duration>,
slot_update_sender: channel::Sender<Duration>,
cover_traffic: CoverTraffic<Epoch, Slot, MockBlendMessage>,
@ -103,7 +107,7 @@ impl BlendNode {
pub fn new(
id: NodeId,
settings: BlendnodeSettings,
network_interface: InMemoryNetworkInterface<BlendMessage>,
network_interface: InMemoryNetworkInterface<SimMessage>,
) -> Self {
let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed);
@ -120,7 +124,7 @@ impl BlendNode {
);
// Init Tier-1: Persistent transmission
let (persistent_sender, persistent_receiver) = channel::unbounded();
let (persistent_sender, persistent_receiver) = channel::unbounded::<SimMessage>();
let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded();
let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver)
.persistent_transmission(
@ -132,43 +136,28 @@ impl BlendNode {
),
persistent_update_time_receiver,
),
SimMessage(MockBlendMessage::DROP_MESSAGE.to_vec()),
);
// Init Tier-2: message blend
let (blend_sender, blend_receiver) = channel::unbounded();
let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded();
let nodes: Vec<
nomos_blend::membership::Node<
<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
>,
> = settings
.membership
.iter()
.map(|&public_key| nomos_blend::membership::Node {
address: Multiaddr::empty(),
public_key,
})
.collect();
let membership = Membership::<MockBlendMessage>::new(nodes, id.into());
// Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor
let membership =
Membership::<NodeId, MockBlendMessage>::new(settings.membership.clone(), id.into());
let crypto_processor = CryptographicProcessor::new(
settings.message_blend.cryptographic_processor.clone(),
membership.clone(),
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
);
let temporal_release = TemporalRelease::new(
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
blend_update_time_receiver,
(
1,
settings.message_blend.temporal_processor.max_delay_seconds,
),
);
let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend(
settings.message_blend.clone(),
membership,
temporal_release,
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
);
let (temporal_sender, temporal_receiver) = channel::unbounded();
let (temporal_update_time_sender, temporal_update_time_receiver) = channel::unbounded();
let temporal_processor_messages = CrossbeamReceiverStream::new(temporal_receiver)
.temporal_stream(TemporalScheduler::new(
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
temporal_update_time_receiver,
(
0,
settings.message_blend.temporal_processor.max_delay_seconds,
),
));
// tier 3 cover traffic
let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded();
@ -194,6 +183,8 @@ impl BlendNode {
node_id: id,
step_id: 0,
num_messages_fully_unwrapped: 0,
cur_num_persistent_scheduled: 0,
cur_num_temporal_scheduled: 0,
},
data_msg_lottery_update_time_sender,
data_msg_lottery_interval,
@ -202,43 +193,57 @@ impl BlendNode {
persistent_update_time_sender,
persistent_transmission_messages,
crypto_processor,
blend_sender,
blend_update_time_sender,
blend_messages,
temporal_sender,
temporal_update_time_sender,
temporal_processor_messages,
epoch_update_sender,
slot_update_sender,
cover_traffic,
}
}
fn forward(
&mut self,
message: BlendMessage,
exclude_node: Option<NodeId>,
log: Option<EmissionLog>,
) {
for (i, node_id) in self
fn parse_payload(message: &[u8]) -> Payload {
Payload::load(MockBlendMessage::payload(message).unwrap())
}
fn forward(&mut self, message: SimMessage, exclude_node: Option<NodeId>) {
let payload_id = Self::parse_payload(&message.0).id();
for node_id in self
.settings
.connected_peers
.iter()
.filter(|&id| Some(*id) != exclude_node)
.enumerate()
{
if i == 0 {
if let Some(log) = &log {
Self::log_emission(log);
log!(
"MessageEvent",
MessageEvent {
payload_id: payload_id.clone(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::NetworkSent { to: *node_id }
}
}
);
self.network_interface
.send_message(*node_id, message.clone())
}
self.message_cache.cache_set(Self::sha256(&message.0), ());
}
fn receive(&mut self) -> Vec<NetworkMessage<BlendMessage>> {
fn receive(&mut self) -> Vec<NetworkMessage<SimMessage>> {
self.network_interface
.receive_messages()
.into_iter()
.inspect(|msg| {
log!(
"MessageEvent",
MessageEvent {
payload_id: Self::parse_payload(&msg.payload().0).id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::NetworkReceived { from: msg.from }
}
);
})
// Retain only messages that have not been seen before
.filter(|msg| {
self.message_cache
@ -254,44 +259,68 @@ impl BlendNode {
hasher.finalize().into()
}
fn schedule_persistent_transmission(&mut self, message: SimMessage) {
log!(
"MessageEvent",
MessageEvent {
payload_id: Self::parse_payload(&message.0).id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::PersistentTransmissionScheduled {
index: self.state.cur_num_persistent_scheduled
}
}
);
self.persistent_sender.send(message).unwrap();
self.state.cur_num_persistent_scheduled += 1;
}
fn handle_incoming_message(&mut self, message: SimMessage) {
match self.crypto_processor.unwrap_message(&message.0) {
Ok((unwrapped_message, fully_unwrapped)) => {
let temporal_message = if fully_unwrapped {
BlendOutgoingMessage::FullyUnwrapped(unwrapped_message)
} else {
BlendOutgoingMessage::Outbound(unwrapped_message)
};
self.schedule_temporal_processor(temporal_message);
}
Err(e) => {
tracing::debug!("Failed to unwrap message: {:?}", e);
}
}
}
fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessage) {
log!(
"MessageEvent",
MessageEvent {
payload_id: match &message {
BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg).id(),
BlendOutgoingMessage::FullyUnwrapped(payload) =>
Payload::load(payload.clone()).id(),
},
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::TemporalProcessorScheduled {
index: self.state.cur_num_temporal_scheduled
}
}
);
self.temporal_sender.send(message).unwrap();
self.state.cur_num_temporal_scheduled += 1;
}
fn update_time(&mut self, elapsed: Duration) {
self.data_msg_lottery_update_time_sender
.send(elapsed)
.unwrap();
self.persistent_update_time_sender.send(elapsed).unwrap();
self.blend_update_time_sender.send(elapsed).unwrap();
self.temporal_update_time_sender.send(elapsed).unwrap();
self.epoch_update_sender.send(elapsed).unwrap();
self.slot_update_sender.send(elapsed).unwrap();
}
fn log_message_generated(&self, msg_type: &str, payload: &Payload) {
self.log_message(format!("{}MessageGenerated", msg_type).as_str(), payload);
}
fn log_message_fully_unwrapped(&self, payload: &Payload) {
self.log_message("MessageFullyUnwrapped", payload);
}
fn log_message(&self, tag: &str, payload: &Payload) {
let log = MessageLog {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id.index(),
};
tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap());
}
fn log_emission(log: &EmissionLog) {
tracing::info!("Emission: {}", serde_json::to_string(log).unwrap());
}
fn new_emission_log(&self, emission_type: &str) -> EmissionLog {
EmissionLog {
emission_type: emission_type.to_string(),
step_id: self.state.step_id,
node_id: self.id.index(),
}
}
}
impl Node for BlendNode {
@ -316,38 +345,72 @@ impl Node for BlendNode {
if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) {
if self.data_msg_lottery.run() {
let payload = Payload::new();
self.log_message_generated("Data", &payload);
let message = self
.crypto_processor
.wrap_message(payload.as_bytes())
.unwrap();
self.persistent_sender.send(message).unwrap();
log!(
"MessageEvent",
MessageEvent {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::Created
}
);
self.schedule_persistent_transmission(SimMessage(message));
}
}
// Handle incoming messages
for network_message in self.receive() {
if MockBlendMessage::is_drop_message(&network_message.payload().0) {
continue;
}
self.forward(
network_message.payload().clone(),
Some(network_message.from),
None,
);
self.blend_sender
.send(network_message.into_payload().0)
.unwrap();
self.handle_incoming_message(network_message.into_payload());
}
// Proceed message blend
if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) {
// Proceed temporal processor
if let Poll::Ready(Some(msg)) =
pin!(&mut self.temporal_processor_messages).poll_next(&mut cx)
{
log!(
"MessageEvent",
MessageEvent {
payload_id: match &msg {
BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg).id(),
BlendOutgoingMessage::FullyUnwrapped(payload) =>
Payload::load(payload.clone()).id(),
},
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::TemporalProcessorReleased
}
);
self.state.cur_num_temporal_scheduled -= 1;
// Proceed the message
match msg {
BlendOutgoingMessage::Outbound(msg) => {
self.persistent_sender.send(msg).unwrap();
BlendOutgoingMessage::Outbound(message) => {
self.schedule_persistent_transmission(SimMessage(message));
}
BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload);
self.log_message_fully_unwrapped(&payload);
log!(
"MessageEvent",
MessageEvent {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::FullyUnwrapped
}
);
self.state.num_messages_fully_unwrapped += 1;
//TODO: create a tracing event
}
}
}
@ -355,23 +418,37 @@ impl Node for BlendNode {
// Generate a cover message probabilistically
if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) {
let payload = Payload::new();
self.log_message_generated("Cover", &payload);
let message = self
.crypto_processor
.wrap_message(payload.as_bytes())
.unwrap();
self.persistent_sender.send(message).unwrap();
log!(
"MessageEvent",
MessageEvent {
payload_id: payload.id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::Created
}
);
self.schedule_persistent_transmission(SimMessage(message));
}
// Proceed persistent transmission
if let Poll::Ready(Some(msg)) =
pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
{
self.forward(
BlendMessage(msg),
None,
Some(self.new_emission_log("FromPersistent")),
log!(
"MessageEvent",
MessageEvent {
payload_id: Self::parse_payload(&msg.0).id(),
step_id: self.state.step_id,
node_id: self.id,
event_type: MessageEventType::PersistentTransmissionReleased
}
);
self.state.cur_num_persistent_scheduled -= 1;
self.forward(msg, None);
}
self.state.step_id += 1;
@ -387,17 +464,3 @@ impl Node for BlendNode {
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct MessageLog {
payload_id: PayloadId,
step_id: usize,
node_id: usize,
}
#[derive(Debug, Serialize, Deserialize)]
struct EmissionLog {
emission_type: String,
step_id: usize,
node_id: usize,
}

View File

@ -44,14 +44,14 @@ impl Stream for Interval {
}
}
pub struct TemporalRelease {
pub struct TemporalScheduler {
random_sleeps: Box<dyn Iterator<Item = Duration> + Send + Sync + 'static>,
elapsed: Duration,
current_sleep: Duration,
update_time: channel::Receiver<Duration>,
}
impl TemporalRelease {
impl TemporalScheduler {
pub fn new<Rng: RngCore + Send + Sync + 'static>(
mut rng: Rng,
update_time: channel::Receiver<Duration>,
@ -80,7 +80,7 @@ impl TemporalRelease {
}
}
impl Stream for TemporalRelease {
impl Stream for TemporalScheduler {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -134,7 +134,7 @@ mod tests {
fn temporal_release_update() {
let (_tx, rx) = channel::unbounded();
let mut temporal_release =
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
assert!(!temporal_release.update(Duration::from_secs(0)));
assert!(!temporal_release.update(Duration::from_millis(999)));
@ -149,7 +149,7 @@ mod tests {
let (tx, rx) = channel::unbounded();
let mut temporal_release =
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
tx.send(Duration::from_secs(0)).unwrap();
assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending);

View File

@ -15,6 +15,8 @@ pub struct BlendnodeState {
pub node_id: NodeId,
pub step_id: usize,
pub num_messages_fully_unwrapped: usize,
pub cur_num_persistent_scheduled: usize,
pub cur_num_temporal_scheduled: usize,
}
#[derive(Serialize)]

View File

@ -1,6 +1,12 @@
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use netrunner::node::{NodeId, NodeIdExt};
use netrunner::{
network::regions::RegionsData,
node::{NodeId, NodeIdExt},
};
use rand::{seq::SliceRandom, RngCore};
#[derive(Clone)]
@ -121,6 +127,28 @@ impl Topology {
hop_count
}
pub fn conn_latency_distribution(
&self,
regions_data: &RegionsData,
) -> HashMap<Duration, usize> {
// Initialize a distribution
let distribution = regions_data
.region_network_behaviour
.values()
.map(|behaviour| (behaviour.delay(), 0))
.collect();
// Populate the distribution
self.0.iter().fold(distribution, |mut acc, (node, peers)| {
let region_a = regions_data.node_region(*node);
peers.iter().for_each(|peer| {
let region_b = regions_data.node_region(*peer);
let behaviour = regions_data.network_behaviour_between_regions(region_a, region_b);
acc.entry(behaviour.delay()).and_modify(|count| *count += 1);
});
acc
})
}
pub fn get(&self, node: &NodeId) -> Option<&HashSet<NodeId>> {
self.0.get(node)
}

View File

@ -618,16 +618,16 @@ mod tests {
let node_c = NodeId::from_index(2);
let regions = HashMap::from([
(Region::Asia, vec![node_a, node_b]),
(Region::EastAsia, vec![node_a, node_b]),
(Region::Europe, vec![node_c]),
]);
let behaviour = HashMap::from([
(
NetworkBehaviourKey::new(Region::Asia, Region::Asia),
NetworkBehaviourKey::new(Region::EastAsia, Region::EastAsia),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
(
NetworkBehaviourKey::new(Region::Asia, Region::Europe),
NetworkBehaviourKey::new(Region::EastAsia, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(500), 0.0),
),
(

View File

@ -10,9 +10,13 @@ use super::{NetworkBehaviourKey, NetworkSettings};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Region {
NorthAmerica,
NorthAmericaWest,
NorthAmericaCentral,
NorthAmericaEast,
Europe,
Asia,
NorthernEurope,
EastAsia,
SoutheastAsia,
Africa,
SouthAmerica,
Australia,
@ -21,9 +25,13 @@ pub enum Region {
impl core::fmt::Display for Region {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let s = match self {
Self::NorthAmerica => "NorthAmerica",
Self::NorthAmericaWest => "NorthAmericaWest",
Self::NorthAmericaCentral => "NorthAmericaCentral",
Self::NorthAmericaEast => "NorthAmericaEast",
Self::Europe => "Europe",
Self::Asia => "Asia",
Self::NorthernEurope => "NorthernEurope",
Self::EastAsia => "EastAsia",
Self::SoutheastAsia => "SoutheastAsia",
Self::Africa => "Africa",
Self::SouthAmerica => "SouthAmerica",
Self::Australia => "Australia",
@ -42,9 +50,13 @@ impl FromStr for Region {
.replace(['-', '_', ' '], "")
.as_str()
{
"northamerica" | "na" => Ok(Self::NorthAmerica),
"northamericawest" | "naw" => Ok(Self::NorthAmericaWest),
"northamericacentral" | "nac" => Ok(Self::NorthAmericaCentral),
"northamericaeast" | "nae" => Ok(Self::NorthAmericaEast),
"europe" | "eu" => Ok(Self::Europe),
"asia" | "as" => Ok(Self::Asia),
"northerneurope" | "neu" => Ok(Self::NorthernEurope),
"eastasia" | "eas" => Ok(Self::EastAsia),
"southeastasia" | "seas" => Ok(Self::SoutheastAsia),
"africa" | "af" => Ok(Self::Africa),
"southamerica" | "sa" => Ok(Self::SouthAmerica),
"australia" | "au" => Ok(Self::Australia),
@ -56,9 +68,13 @@ impl FromStr for Region {
impl Serialize for Region {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let s = match self {
Self::NorthAmerica => "North America",
Self::NorthAmericaWest => "North America West",
Self::NorthAmericaCentral => "North America Central",
Self::NorthAmericaEast => "North America East",
Self::Europe => "Europe",
Self::Asia => "Asia",
Self::NorthernEurope => "Northern Europe",
Self::EastAsia => "EastAsia",
Self::SoutheastAsia => "Southeast Asia",
Self::Africa => "Africa",
Self::SouthAmerica => "South America",
Self::Australia => "Australia",
@ -105,6 +121,14 @@ impl RegionsData {
pub fn network_behaviour(&self, node_a: NodeId, node_b: NodeId) -> &NetworkBehaviour {
let region_a = self.node_region[&node_a];
let region_b = self.node_region[&node_b];
self.network_behaviour_between_regions(region_a, region_b)
}
pub fn network_behaviour_between_regions(
&self,
region_a: Region,
region_b: Region,
) -> &NetworkBehaviour {
let k = NetworkBehaviourKey::new(region_a, region_b);
let k_rev = NetworkBehaviourKey::new(region_b, region_a);
self.region_network_behaviour
@ -207,9 +231,13 @@ mod tests {
.collect::<Vec<NodeId>>();
let available_regions = [
Region::NorthAmerica,
Region::NorthAmericaWest,
Region::NorthAmericaCentral,
Region::NorthAmericaEast,
Region::Europe,
Region::Asia,
Region::NorthernEurope,
Region::EastAsia,
Region::SoutheastAsia,
Region::Africa,
Region::SouthAmerica,
Region::Australia,

View File

@ -114,7 +114,7 @@ where
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::{collections::HashMap, time::Duration};
use crate::{
@ -174,43 +174,19 @@ mod tests {
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {
@ -231,4 +207,20 @@ mod tests {
.stop_after(Duration::from_millis(100))
.unwrap();
}
pub(crate) fn region_from_index(idx: usize) -> Region {
match idx % 10 {
0 => Region::Europe,
1 => Region::NorthernEurope,
2 => Region::NorthAmericaWest,
3 => Region::NorthAmericaCentral,
4 => Region::NorthAmericaEast,
5 => Region::SouthAmerica,
6 => Region::EastAsia,
7 => Region::SoutheastAsia,
8 => Region::Africa,
9 => Region::Australia,
_ => unreachable!(),
}
}
}

View File

@ -115,6 +115,7 @@ mod tests {
},
output_processors::OutData,
runner::SimulationRunner,
streaming::io::tests::region_from_index,
warding::SimulationState,
};
@ -160,43 +161,19 @@ mod tests {
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {

View File

@ -116,6 +116,7 @@ mod tests {
},
output_processors::OutData,
runner::SimulationRunner,
streaming::io::tests::region_from_index,
warding::SimulationState,
};
@ -156,43 +157,19 @@ mod tests {
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
let region = region_from_index(idx);
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {