From 965d6de967048cfc618870fe841163d625d5501c Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 6 Feb 2025 17:26:12 +0900 Subject: [PATCH] analysis in Rust + update batch.py --- simlib/blendnet-sims/Cargo.toml | 2 + simlib/blendnet-sims/config/blendnet.json | 98 ++++- simlib/blendnet-sims/scripts/batch.py | 260 +++++++++++++ simlib/blendnet-sims/scripts/emission.py | 21 +- simlib/blendnet-sims/scripts/latency.py | 112 ------ simlib/blendnet-sims/scripts/mixlog.py | 16 +- .../src/analysis/conn_latency.rs | 89 +++++ .../src/analysis/message_history.rs | 136 +++++++ .../src/analysis/message_latency.rs | 114 ++++++ simlib/blendnet-sims/src/analysis/mod.rs | 3 + simlib/blendnet-sims/src/main.rs | 168 ++++++++- simlib/blendnet-sims/src/node/blend/log.rs | 24 ++ .../blendnet-sims/src/node/blend/message.rs | 55 +++ simlib/blendnet-sims/src/node/blend/mod.rs | 343 +++++++++++------- .../blendnet-sims/src/node/blend/scheduler.rs | 10 +- simlib/blendnet-sims/src/node/blend/state.rs | 2 + .../blendnet-sims/src/node/blend/topology.rs | 32 +- simlib/netrunner/src/network/mod.rs | 6 +- simlib/netrunner/src/network/regions.rs | 48 ++- simlib/netrunner/src/streaming/io.rs | 48 +-- .../src/streaming/runtime_subscriber.rs | 31 +- .../src/streaming/settings_subscriber.rs | 31 +- 22 files changed, 1235 insertions(+), 414 deletions(-) create mode 100644 simlib/blendnet-sims/scripts/batch.py delete mode 100755 simlib/blendnet-sims/scripts/latency.py create mode 100644 simlib/blendnet-sims/src/analysis/conn_latency.rs create mode 100644 simlib/blendnet-sims/src/analysis/message_history.rs create mode 100644 simlib/blendnet-sims/src/analysis/message_latency.rs create mode 100644 simlib/blendnet-sims/src/analysis/mod.rs create mode 100644 simlib/blendnet-sims/src/node/blend/log.rs diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 9729a7a..6c3e8a8 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -27,3 +27,5 @@ sha2 = "0.10" uuid = { version = "1", features = ["fast-rng", "v4"] } tracing-appender = "0.2" cached = "0.54.0" +polars = "0.46.0" +humantime-serde = "1.1.1" diff --git a/simlib/blendnet-sims/config/blendnet.json b/simlib/blendnet-sims/config/blendnet.json index 9179c44..c03da06 100644 --- a/simlib/blendnet-sims/config/blendnet.json +++ b/simlib/blendnet-sims/config/blendnet.json @@ -1,49 +1,109 @@ { "network_settings": { "network_behaviors": { - "north america:north america": "50ms", - "north america:europe": "100ms", - "north america:asia": "120ms", + "north america west:north america west": "40ms", + "north america west:north america east": "70ms", + "north america west:north america central": "50ms", + "north america west:europe": "150ms", + "north america west:northern europe": "170ms", + "north america west:east asia": "180ms", + "north america west:southeast asia": "200ms", + "north america west:australia": "250ms", + "north america east:north america west": "70ms", + "north america east:north america east": "40ms", + "north america east:north america central": "50ms", + "north america east:europe": "130ms", + "north america east:northern europe": "140ms", + "north america east:east asia": "250ms", + "north america east:southeast asia": "300ms", + "north america east:australia": "230ms", + "north america central:north america west": "50ms", + "north america central:north america east": "50ms", + "north america central:north america central": "20ms", + "north america central:europe": "140ms", + "north america central:northern europe": "150ms", + "north america central:east asia": "200ms", + "north america central:southeast asia": "280ms", + "north america central:australia": "220ms", + "europe:north america west": "150ms", + "europe:north america east": "130ms", + "europe:north america central": "140ms", "europe:europe": "50ms", - "europe:asia": "100ms", - "europe:north america": "120ms", - "asia:north america": "100ms", - "asia:europe": "120ms", - "asia:asia": "40ms" + "europe:northern europe": "60ms", + "europe:east asia": "300ms", + "europe:southeast asia": "300ms", + "europe:australia": "300ms", + "northern europe:north america west": "170ms", + "northern europe:north america east": "140ms", + "northern europe:north america central": "150ms", + "northern europe:europe": "60ms", + "northern europe:northern europe": "30ms", + "northern europe:east asia": "400ms", + "northern europe:southeast asia": "320ms", + "northern europe:australia": "300ms", + "east asia:north america west": "180ms", + "east asia:north america east": "250ms", + "east asia:north america central": "200ms", + "east asia:europe": "300ms", + "east asia:northern europe": "400ms", + "east asia:east asia": "50ms", + "east asia:southeast asia": "90ms", + "east asia:australia": "150ms", + "southeast asia:north america west": "200ms", + "southeast asia:north america east": "300ms", + "southeast asia:north america central": "280ms", + "southeast asia:europe": "300ms", + "southeast asia:northern europe": "320ms", + "southeast asia:east asia": "90ms", + "southeast asia:southeast asia": "40ms", + "southeast asia:australia": "150ms", + "australia:north america west": "250ms", + "australia:north america east": "230ms", + "australia:north america central": "220ms", + "australia:europe": "300ms", + "australia:northern europe": "300ms", + "australia:east asia": "150ms", + "australia:southeast asia": "150ms", + "australia:australia": "50ms" }, "regions": { - "north america": 0.4, - "europe": 0.4, - "asia": 0.3 + "north america west": 0.06, + "north america east": 0.15, + "north america central": 0.02, + "europe": 0.47, + "northern europe": 0.10, + "east asia": 0.10, + "southeast asia": 0.07, + "australia": 0.03 } }, "node_settings": { "timeout": "1000ms" }, - "step_time": "40ms", + "step_time": "10ms", "runner_settings": "Sync", "stream_settings": { "path": "test.json" }, - "node_count": 10, + "node_count": 100, "seed": 0, "record_settings": {}, "wards": [ { - "sum": 10 + "sum": 100 } ], "connected_peers_count": 4, "data_message_lottery_interval": "20s", - "stake_proportion": 1.0, - "epoch_duration": "432000s", - "slot_duration": "20s", - "slots_per_epoch": 21600, + "stake_proportion": 0.0, + "epoch_duration": "200s", + "slot_duration": "1s", + "slots_per_epoch": 200, "number_of_hops": 2, "persistent_transmission": { "max_emission_frequency": 1.0, "drop_message_probability": 0.0 }, "number_of_blend_layers": 2, - "max_delay_seconds": 10 + "max_delay_seconds": 5 } diff --git a/simlib/blendnet-sims/scripts/batch.py b/simlib/blendnet-sims/scripts/batch.py new file mode 100644 index 0000000..ecd364c --- /dev/null +++ b/simlib/blendnet-sims/scripts/batch.py @@ -0,0 +1,260 @@ +# !/usr/bin/env python +import argparse +import csv +import json +import os +import subprocess +from collections import OrderedDict + +import mixlog + +SIM_APP = "../../target/release/blendnet-sims" + + +def bandwidth_result(log_path: str, step_duration_ms: int) -> dict[str, float]: + max_step_id = 0 + for topic, json_msg in mixlog.get_input_stream(log_path): + if topic == "MessageEvent": + max_step_id = max(max_step_id, json_msg["step_id"]) + + with open(log_path, "r") as file: + for line in file: + if "total_outbound_bandwidth" in line: + line = line[line.find("{") :] + line = line.replace("{ ", '{"') + line = line.replace(": ", '": ') + line = line.replace(", ", ', "') + record = json.loads(line) + + elapsed = (max_step_id * step_duration_ms) / 1000.0 + return { + "min": float(record["min_node_total_bandwidth"]) / elapsed, + "avg": float(record["avg_node_total_bandwidth"]) / elapsed, + "max": float(record["max_node_total_bandwidth"]) / elapsed, + } + + raise Exception("No bandwidth data found in log file") + + +def topology_result(log_path: str) -> dict[str, int]: + for topic, json_msg in mixlog.get_input_stream(log_path): + if topic == "Topology": + return json_msg + raise Exception("No topology found in log file") + + +def build_argument_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.") + parser.add_argument( + "--step-duration", + type=int, + help="Duration (in ms) of each step in the simulation.", + ) + parser.add_argument( + "--params-file", + type=str, + help="A CSV file that contains all parameter sets", + ) + parser.add_argument( + "--orig-config-file", + type=str, + help="An original blendnet config JSON file that will be modified as specified in `params_file`", + ) + parser.add_argument( + "--outdir", + type=str, + help="A output directory to be created", + ) + parser.add_argument( + "--skip-run", + action="store_true", + help="Skip running the simulations and only analyze the logs", + ) + return parser + + +if __name__ == "__main__": + argument_parser = build_argument_parser() + args = argument_parser.parse_args() + + # Read the params CSV file + param_sets = [] + param_set_header = [] + with open(args.params_file, mode="r") as csvfile: + param_set_header = csvfile.readline().strip().split(",") + csvfile.seek(0) # Reset file pointer to the beginning after reading the header + reader = csv.DictReader(csvfile, delimiter=",") + param_sets = list(reader) + + # Read the original blendnet json config file + with open(args.orig_config_file, "r") as jsonfile: + original_json = json.load(jsonfile) + + # Directory to save modified JSON files + modified_configs_dir = os.path.join(args.outdir, "modified_configs") + os.makedirs(modified_configs_dir, exist_ok=True) + + # Modify and save JSON files for each row in CSV + config_paths = [] + for idx, param_set in enumerate(param_sets): + output_path = os.path.join(modified_configs_dir, f"{idx}.json") + config_paths.append(output_path) + + if args.skip_run: + continue + + modified_json = OrderedDict(original_json) # Preserve original field order + + # Apply modifications + modified_json["network_settings"]["regions"]["north america west"] = 0.06 + modified_json["network_settings"]["regions"]["north america east"] = 0.15 + modified_json["network_settings"]["regions"]["north america central"] = 0.02 + modified_json["network_settings"]["regions"]["europe"] = 0.47 + modified_json["network_settings"]["regions"]["northern europe"] = 0.10 + modified_json["network_settings"]["regions"]["east asia"] = 0.10 + modified_json["network_settings"]["regions"]["southeast asia"] = 0.07 + modified_json["network_settings"]["regions"]["australia"] = 0.03 + modified_json["step_time"] = f"{args.step_duration}ms" + modified_json["node_count"] = int(param_set["network_size"]) + modified_json["wards"][0]["sum"] = 1000 + modified_json["connected_peers_count"] = int(param_set["peering_degree"]) + modified_json["data_message_lottery_interval"] = "20s" + modified_json["stake_proportion"] = 0.0 + modified_json["persistent_transmission"]["max_emission_frequency"] = 1.0 + modified_json["persistent_transmission"]["drop_message_probability"] = 0.0 + modified_json["epoch_duration"] = ( + f"{int(param_set['cover_slots_per_epoch']) * int(param_set['cover_slot_duration'])}s" + ) + modified_json["slots_per_epoch"] = int(param_set["cover_slots_per_epoch"]) + modified_json["slot_duration"] = f"{param_set['cover_slot_duration']}s" + modified_json["max_delay_seconds"] = int(param_set["max_temporal_delay"]) + modified_json["number_of_hops"] = int(param_set["blend_hops"]) + modified_json["number_of_blend_layers"] = int(param_set["blend_hops"]) + + # Save modified JSON + with open(output_path, "w") as outfile: + json.dump(modified_json, outfile, indent=2) + print("Saved modified JSON to:", output_path) + + # Directory to save logs + log_dir = os.path.join(args.outdir, "logs") + os.makedirs(log_dir, exist_ok=True) + + log_paths = [] + for idx, config_path in enumerate(config_paths): + log_path = f"{log_dir}/{idx}.log" + log_paths.append(log_path) + + if args.skip_run: + continue + + with open(log_path, "w") as log_file: + print( + f"Running simulation-{idx}: {log_file.name} with config: {config_path}" + ) + subprocess.run( + [ + SIM_APP, + "run", + "--input-settings", + config_path, + ], + stdout=log_file, + ) + print(f"Simulation-{idx} completed: {log_file.name}") + + print("Analyzing logs...") + print("=================") + + with open(os.path.join(args.outdir, "output.csv"), "w", newline="") as file: + print(f"Writing results to: {file.name}") + csv_writer = csv.writer(file) + csv_writer.writerow( + param_set_header + + [ + "network_diameter", + "msg_count", + "min_latency_sec", + "25th_latency_sec", + "avg_latency_sec", + "median_latency_sec", + "75th_latency_sec", + "max_latency_sec", + "min_latency_msg_id", + "max_latency_msg_id", + "conn_latency_count", + "min_conn_latency_sec", + "25th_conn_latency_sec", + "avg_conn_latency_sec", + "med_conn_latency_sec", + "75th_conn_latency_sec", + "max_conn_latency_sec", + "min_bandwidth_kbps", + "avg_bandwidth_kbps", + "max_bandwidth_kbps", + ] + ) + + for idx, log_path in enumerate(log_paths): + csv_row = [] + csv_row.extend([param_sets[idx][key] for key in param_set_header]) + + csv_row.append(topology_result(log_path)["diameter"]) + + result = subprocess.run( + [ + SIM_APP, + "analyze", + "message-latency", + "--log-file", + log_path, + "--step-duration", + f"{args.step_duration}ms", + ], + capture_output=True, + text=True, + ) + assert result.returncode == 0 + latency_analysis = json.loads(result.stdout) + + csv_row.append(latency_analysis["count"]) + csv_row.append(float(latency_analysis["min"]) / 1000.0) + csv_row.append(float(latency_analysis["q1"]) / 1000.0) + csv_row.append(float(latency_analysis["avg"]) / 1000.0) + csv_row.append(float(latency_analysis["med"]) / 1000.0) + csv_row.append(float(latency_analysis["q3"]) / 1000.0) + csv_row.append(float(latency_analysis["max"]) / 1000.0) + csv_row.append(latency_analysis["min_payload_id"]) + csv_row.append(latency_analysis["max_payload_id"]) + + result = subprocess.run( + [ + SIM_APP, + "analyze", + "connection-latency", + "--log-file", + log_path, + "--step-duration", + f"{args.step_duration}ms", + ], + capture_output=True, + text=True, + ) + assert result.returncode == 0 + result = json.loads(result.stdout) + csv_row.append(result["count"]) + csv_row.append(float(result["min"]) / 1000.0) + csv_row.append(float(result["q1"]) / 1000.0) + csv_row.append(float(result["avg"]) / 1000.0) + csv_row.append(float(result["med"]) / 1000.0) + csv_row.append(float(result["q3"]) / 1000.0) + csv_row.append(float(result["max"]) / 1000.0) + + bandwidth_res = bandwidth_result(log_path, args.step_duration) + csv_row.append(bandwidth_res["min"] * 8 / 1000.0) + csv_row.append(bandwidth_res["avg"] * 8 / 1000.0) + csv_row.append(bandwidth_res["max"] * 8 / 1000.0) + + csv_writer.writerow(csv_row) + + print(f"The outputs have been successfully written to {file.name}") diff --git a/simlib/blendnet-sims/scripts/emission.py b/simlib/blendnet-sims/scripts/emission.py index 6caa2e8..fa0be5e 100644 --- a/simlib/blendnet-sims/scripts/emission.py +++ b/simlib/blendnet-sims/scripts/emission.py @@ -1,16 +1,14 @@ import argparse -import json from collections.abc import Iterable from typing import Any import matplotlib import matplotlib.pyplot as plt +import mixlog import pandas as pd -import mixlog - -def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None: +def plot_emissions(input_stream: Iterable[tuple[str, dict]], plot_path: str) -> None: df = pd.DataFrame(emission_records(input_stream)) plt.figure(figsize=(12, 6)) @@ -24,19 +22,8 @@ def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None: plt.show() -def emission_records(input_stream: Iterable[str]) -> list[Any]: - records = [] - - for line in input_stream: - try: - record = json.loads(line) - except json.JSONDecodeError: - continue - - if "emission_type" in record: - records.append(record) - - return records +def emission_records(input_stream: Iterable[tuple[str, dict]]) -> list[Any]: + return [record for _, record in filter(lambda x: x[0] == "Emission", input_stream)] if __name__ == "__main__": diff --git a/simlib/blendnet-sims/scripts/latency.py b/simlib/blendnet-sims/scripts/latency.py deleted file mode 100755 index 35551f9..0000000 --- a/simlib/blendnet-sims/scripts/latency.py +++ /dev/null @@ -1,112 +0,0 @@ -# !/usr/bin/env python -import argparse -import json -import statistics -from collections.abc import Iterable -from typing import Dict, Optional - -import mixlog - - -class Message: - def __init__(self, message_id: str, step_a: Optional[int]): - self.id = message_id - self.step_a = int(step_a) if step_a is not None else None - self.step_b = None - - def __hash__(self): - return self.id - - def __repr__(self): - return f"[{self.id}] {self.step_a} -> {self.step_b}" - - @property - def latency(self) -> Optional[int]: - if self.step_a is not None and self.step_b is not None: - return abs(self.step_a - self.step_b) - - -MessageStorage = Dict[str, Message] - - -def compute_results(message_storage: MessageStorage, step_duration: int) -> str: - latencies = [message_record.latency for message_record in message_storage.values()] - valued_latencies = [latency for latency in latencies if latency is not None] - incomplete_latencies = sum((1 for latency in latencies if latency is None)) - - total_messages = len(latencies) - total_messages_full_latency = len(valued_latencies) - total_messages_incomplete_latency = incomplete_latencies - latency_average_steps = statistics.mean(valued_latencies) - latency_average_ms = "{:.2f}ms".format(latency_average_steps * step_duration) - latency_median_steps = statistics.median(valued_latencies) - latency_median_ms = "{:.2f}ms".format(latency_median_steps * step_duration) - max_latency_steps = max(valued_latencies) - max_latency_ms = "{:.2f}ms".format(max_latency_steps * step_duration) - min_latency_steps = min(valued_latencies) - min_latency_ms = "{:.2f}ms".format(min_latency_steps * step_duration) - - return f"""[Results] -- Total messages: {total_messages} - - Full latencies: {total_messages_full_latency} - - Incomplete latencies: {total_messages_incomplete_latency} -- Averages - - Steps: {latency_average_steps} - - Duration: {latency_average_ms} -- Median - - Steps: {latency_median_steps} - - Duration: {latency_median_ms} -- Max - - Steps: {max_latency_steps} - - Duration: {max_latency_ms} -- Min - - Steps: {min_latency_steps} - - Duration: {min_latency_ms}""" - - -def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage: - storage: MessageStorage = {} - - for record in record_stream: - try: - json_record = json.loads(record) - except json.decoder.JSONDecodeError: - continue - - if (payload_id := json_record.get("payload_id")) is None: - continue - step_id = json_record["step_id"] - - if (stored_message := storage.get(payload_id)) is None: - storage[payload_id] = Message(payload_id, step_id) - else: - stored_message.step_b = step_id - - return storage - - -def build_argument_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.") - parser.add_argument( - "--step-duration", - type=int, - default=100, - help="Duration (in ms) of each step in the simulation.", - ) - parser.add_argument( - "input_file", - nargs="?", - help="The file to parse. If not provided, input will be read from stdin.", - ) - return parser - - -if __name__ == "__main__": - argument_parser = build_argument_parser() - arguments = argument_parser.parse_args() - - input_stream = mixlog.get_input_stream(arguments.input_file) - messages = parse_record_stream(input_stream) - - results = compute_results(messages, arguments.step_duration) - print(results) diff --git a/simlib/blendnet-sims/scripts/mixlog.py b/simlib/blendnet-sims/scripts/mixlog.py index c76a613..a21bc31 100644 --- a/simlib/blendnet-sims/scripts/mixlog.py +++ b/simlib/blendnet-sims/scripts/mixlog.py @@ -1,12 +1,20 @@ +import json import sys from collections.abc import Iterable from typing import Optional +TOPIC_LOG_INDICATOR = '{"topic":' -def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]: + +def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[tuple[str, dict]]: for record in record_stream: - bracket_pos = record.rfind("{") - yield record[bracket_pos:] + topic_idx = record.find(TOPIC_LOG_INDICATOR) + if topic_idx == -1: + continue + + # Split the line into 2 parts: topic and JSON message + log = json.loads(record[topic_idx:].strip()) + yield (log["topic"], log["message"]) def get_pipe_stream() -> Iterable[str]: @@ -18,7 +26,7 @@ def get_file_stream(input_filename) -> Iterable[str]: yield from file -def get_input_stream(input_filename: Optional[str]) -> Iterable[str]: +def get_input_stream(input_filename: Optional[str]) -> Iterable[tuple[str, dict]]: stream = ( get_file_stream(input_filename) if input_filename is not None diff --git a/simlib/blendnet-sims/src/analysis/conn_latency.rs b/simlib/blendnet-sims/src/analysis/conn_latency.rs new file mode 100644 index 0000000..7d364c2 --- /dev/null +++ b/simlib/blendnet-sims/src/analysis/conn_latency.rs @@ -0,0 +1,89 @@ +use std::{error::Error, ops::Mul, path::PathBuf, time::Duration}; + +use std::{ + collections::HashMap, + fs::File, + io::{BufRead, BufReader}, +}; + +use netrunner::node::NodeId; +use polars::prelude::NamedFrom; +use polars::series::Series; +use serde::{Deserialize, Serialize}; + +use crate::node::blend::log::TopicLog; +use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId}; + +use super::message_latency::quantile; + +pub fn analyze_connection_latency( + log_file: PathBuf, + step_duration: Duration, +) -> Result<(), Box> { + let file = File::open(log_file)?; + let reader = BufReader::new(file); + + let mut sent_events: HashMap<(PayloadId, NodeId, NodeId), usize> = HashMap::new(); + let mut latencies_ms: Vec = Vec::new(); + + for line in reader.lines() { + let line = line?; + if let Ok(topic_log) = serde_json::from_str::>(&line) { + assert_eq!(topic_log.topic, "MessageEvent"); + let event = topic_log.message; + match event.event_type { + MessageEventType::NetworkSent { to } => { + assert_eq!( + sent_events.insert((event.payload_id, event.node_id, to), event.step_id), + None + ); + } + MessageEventType::NetworkReceived { from } => { + let sent_step_id = sent_events + .remove(&(event.payload_id, from, event.node_id)) + .unwrap(); + let latency = step_duration + .mul((event.step_id - sent_step_id).try_into().unwrap()) + .as_millis() + .try_into() + .unwrap(); + latencies_ms.push(latency); + } + _ => { + continue; + } + } + } + } + + let series = Series::new("latencies".into(), latencies_ms); + let series = Output::new(&series); + println!("{}", serde_json::to_string(&series).unwrap()); + + Ok(()) +} + +#[derive(Serialize, Deserialize, Debug)] +struct Output { + count: usize, + min: i64, + q1: f64, + avg: f64, + med: f64, + q3: f64, + max: i64, +} + +impl Output { + fn new(series: &Series) -> Self { + Self { + count: series.len(), + min: series.min::().unwrap().unwrap(), + q1: quantile(series, 0.25), + avg: series.mean().unwrap(), + med: series.median().unwrap(), + q3: quantile(series, 0.75), + max: series.max::().unwrap().unwrap(), + } + } +} diff --git a/simlib/blendnet-sims/src/analysis/message_history.rs b/simlib/blendnet-sims/src/analysis/message_history.rs new file mode 100644 index 0000000..bf47964 --- /dev/null +++ b/simlib/blendnet-sims/src/analysis/message_history.rs @@ -0,0 +1,136 @@ +use std::{ + error::Error, + fs::File, + io::{BufRead, BufReader}, + ops::{Add, Mul}, + path::PathBuf, + time::Duration, +}; + +use netrunner::node::NodeId; +use serde::{Deserialize, Serialize}; + +use crate::node::blend::{ + log::TopicLog, + message::{MessageEvent, MessageEventType, PayloadId}, +}; + +pub fn analyze_message_history( + log_file: PathBuf, + step_duration: Duration, + payload_id: PayloadId, +) -> Result<(), Box> { + let file = File::open(log_file)?; + let reader = BufReader::new(file); + + let mut history = Vec::new(); + let mut target_node_id: Option = None; + let mut target_event: Option = None; + + let lines: Vec = reader.lines().collect::>()?; + for line in lines.iter().rev() { + if let Ok(topic_log) = serde_json::from_str::>(line) { + assert_eq!(topic_log.topic, "MessageEvent"); + let event = topic_log.message; + if event.payload_id == payload_id + && (target_node_id.is_none() || target_node_id.unwrap() == event.node_id) + && (target_event.is_none() || target_event.as_ref().unwrap() == &event.event_type) + { + match event.event_type { + MessageEventType::FullyUnwrapped => { + assert!(history.is_empty()); + assert!(target_node_id.is_none()); + target_node_id = Some(event.node_id); + history.push(event); + } + MessageEventType::Created => { + assert!(!history.is_empty()); + assert!(target_node_id.is_some()); + history.push(event); + } + MessageEventType::PersistentTransmissionScheduled { .. } => { + assert!(target_node_id.is_some()); + assert!(matches!( + history.last().unwrap().event_type, + MessageEventType::PersistentTransmissionReleased { .. } + )); + history.push(event); + } + MessageEventType::PersistentTransmissionReleased => { + assert!(target_node_id.is_some()); + history.push(event); + } + MessageEventType::TemporalProcessorScheduled { .. } => { + assert!(target_node_id.is_some()); + assert!(matches!( + history.last().unwrap().event_type, + MessageEventType::TemporalProcessorReleased { .. } + )); + history.push(event); + } + MessageEventType::TemporalProcessorReleased => { + assert!(target_node_id.is_some()); + history.push(event); + } + MessageEventType::NetworkReceived { from } => { + assert!(!history.is_empty()); + assert!(target_node_id.is_some()); + assert_ne!(target_node_id.unwrap(), from); + target_node_id = Some(from); + target_event = Some(MessageEventType::NetworkSent { to: event.node_id }); + history.push(event); + } + MessageEventType::NetworkSent { .. } => { + assert!(!history.is_empty()); + assert!(target_node_id.is_some()); + if target_event.is_none() + || target_event.as_ref().unwrap() != &event.event_type + { + continue; + } + target_event = None; + history.push(event); + } + } + } + } + } + + let mut history_with_durations: Vec = Vec::new(); + let (_, total_duration) = history.iter().rev().fold( + (None, Duration::ZERO), + |(prev_step_id, total_duration): (Option, Duration), event| { + let duration = match prev_step_id { + Some(prev_step_id) => { + step_duration.mul((event.step_id - prev_step_id).try_into().unwrap()) + } + None => Duration::ZERO, + }; + history_with_durations.push(MessageEventWithDuration { + event: event.clone(), + duration, + }); + (Some(event.step_id), total_duration.add(duration)) + }, + ); + let output = Output { + history: history_with_durations, + total_duration, + }; + println!("{}", serde_json::to_string(&output).unwrap()); + Ok(()) +} + +#[derive(Serialize, Deserialize)] +struct Output { + history: Vec, + #[serde(with = "humantime_serde")] + total_duration: Duration, +} + +#[derive(Serialize, Deserialize)] +struct MessageEventWithDuration { + event: MessageEvent, + #[serde(with = "humantime_serde")] + duration: Duration, +} diff --git a/simlib/blendnet-sims/src/analysis/message_latency.rs b/simlib/blendnet-sims/src/analysis/message_latency.rs new file mode 100644 index 0000000..74a3c3d --- /dev/null +++ b/simlib/blendnet-sims/src/analysis/message_latency.rs @@ -0,0 +1,114 @@ +use core::panic; +use std::{error::Error, ops::Mul, path::PathBuf, time::Duration}; + +use std::{ + collections::HashMap, + fs::File, + io::{BufRead, BufReader}, +}; + +use polars::prelude::{AnyValue, NamedFrom, QuantileMethod, Scalar}; +use polars::series::Series; +use serde::{Deserialize, Serialize}; + +use crate::node::blend::log::TopicLog; +use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId}; + +pub fn analyze_message_latency( + log_file: PathBuf, + step_duration: Duration, +) -> Result<(), Box> { + let file = File::open(log_file)?; + let reader = BufReader::new(file); + + let mut messages: HashMap = HashMap::new(); + let mut latencies_ms: Vec = Vec::new(); + let mut latency_to_message: HashMap = HashMap::new(); + + for line in reader.lines() { + let line = line?; + if let Ok(topic_log) = serde_json::from_str::>(&line) { + assert_eq!(topic_log.topic, "MessageEvent"); + let event = topic_log.message; + match event.event_type { + MessageEventType::Created => { + assert_eq!(messages.insert(event.payload_id, event.step_id), None); + } + MessageEventType::FullyUnwrapped => match messages.remove(&event.payload_id) { + Some(created_step_id) => { + let latency = step_duration + .mul((event.step_id - created_step_id).try_into().unwrap()) + .as_millis() + .try_into() + .unwrap(); + latencies_ms.push(latency); + latency_to_message.insert(latency, event.payload_id); + } + None => { + panic!( + "FullyUnwrapped event without Created event: {}", + event.payload_id + ); + } + }, + _ => { + continue; + } + } + } + } + + let series = Series::new("latencies".into(), latencies_ms); + let series = Output::new(&series, &latency_to_message); + println!("{}", serde_json::to_string(&series).unwrap()); + + Ok(()) +} + +#[derive(Serialize, Deserialize, Debug)] +struct Output { + count: usize, + min: i64, + min_payload_id: PayloadId, + q1: f64, + avg: f64, + med: f64, + q3: f64, + max: i64, + max_payload_id: PayloadId, +} + +impl Output { + fn new(series: &Series, latency_to_message: &HashMap) -> Self { + let min = series.min::().unwrap().unwrap(); + let min_payload_id = latency_to_message.get(&min).unwrap().clone(); + let max = series.max::().unwrap().unwrap(); + let max_payload_id = latency_to_message.get(&max).unwrap().clone(); + Self { + count: series.len(), + min, + min_payload_id, + q1: quantile(series, 0.25), + avg: series.mean().unwrap(), + med: series.median().unwrap(), + q3: quantile(series, 0.75), + max, + max_payload_id, + } + } +} + +pub(crate) fn quantile(series: &Series, quantile: f64) -> f64 { + f64_from_scalar( + &series + .quantile_reduce(quantile, QuantileMethod::Linear) + .unwrap(), + ) +} + +fn f64_from_scalar(scalar: &Scalar) -> f64 { + match scalar.value() { + AnyValue::Float64(value) => *value, + _ => panic!("Expected f64"), + } +} diff --git a/simlib/blendnet-sims/src/analysis/mod.rs b/simlib/blendnet-sims/src/analysis/mod.rs new file mode 100644 index 0000000..4d909d2 --- /dev/null +++ b/simlib/blendnet-sims/src/analysis/mod.rs @@ -0,0 +1,3 @@ +pub mod conn_latency; +pub mod message_history; +pub mod message_latency; diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index f825a5e..646b928 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -6,10 +6,14 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; // crates use crate::node::blend::state::{BlendnodeRecord, BlendnodeState}; -use crate::node::blend::{BlendMessage, BlendnodeSettings}; +use crate::node::blend::{BlendnodeSettings, SimMessage}; +use analysis::conn_latency::analyze_connection_latency; +use analysis::message_history::analyze_message_history; +use analysis::message_latency::analyze_message_latency; use anyhow::Ok; -use clap::Parser; +use clap::{Parser, Subcommand}; use crossbeam::channel; +use multiaddr::Multiaddr; use netrunner::network::behaviour::create_behaviours; use netrunner::network::regions::{create_regions, RegionsData}; use netrunner::network::{InMemoryNetworkInterface, Network, PayloadSize}; @@ -17,6 +21,7 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; +use node::blend::message::PayloadId; use node::blend::topology::Topology; use nomos_blend::cover_traffic::CoverTrafficSettings; use nomos_blend::message_blend::{ @@ -33,10 +38,38 @@ use crate::node::blend::BlendNode; use crate::settings::SimSettings; use netrunner::{runner::SimulationRunner, settings::SimulationSettings}; +pub mod analysis; mod log; mod node; mod settings; +#[derive(Parser)] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Run the simulation + Run(SimulationApp), + /// Analyze the simulation results + Analyze { + #[command(subcommand)] + command: AnalyzeCommands, + }, +} + +#[derive(Subcommand)] +enum AnalyzeCommands { + /// Analyze the latency of the messages fully unwrapped + MessageLatency(MessageLatencyApp), + /// Analyze the history of a message + MessageHistory(MessageHistoryApp), + /// Analyze connection latency + ConnectionLatency(ConnectionLatencyApp), +} + /// Main simulation wrapper /// Pipes together the cli arguments with the execution #[derive(Parser)] @@ -85,13 +118,25 @@ impl SimulationApp { &mut rng, &settings.simulation_settings.network_settings, ); + log!( + "Regions", + regions + .iter() + .map(|(region, node_ids)| (*region, node_ids.len())) + .collect::>() + ); + let behaviours = create_behaviours(&settings.simulation_settings.network_settings); let regions_data = RegionsData::new(regions, behaviours); - let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); + let network = Arc::new(Mutex::new(Network::::new( + regions_data.clone(), + seed, + ))); let topology = Topology::new(&node_ids, settings.connected_peers_count, &mut rng); log_topology(&topology); + log_conn_latency_distribution(&topology.conn_latency_distribution(®ions_data)); let nodes: Vec<_> = node_ids .iter() @@ -126,7 +171,14 @@ impl SimulationApp { slots_per_epoch: settings.slots_per_epoch, network_size: node_ids.len(), }, - membership: node_ids.iter().map(|&id| id.into()).collect(), + membership: node_ids + .iter() + .map(|&id| nomos_blend::membership::Node { + id, + address: Multiaddr::empty(), + public_key: id.into(), + }) + .collect(), }, ) }) @@ -141,7 +193,7 @@ impl SimulationApp { fn create_boxed_blendnode( node_id: NodeId, - network: &mut Network, + network: &mut Network, simulation_settings: SimulationSettings, no_netcap: bool, blendnode_settings: BlendnodeSettings, @@ -246,11 +298,13 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { } fn log_topology(topology: &Topology) { - let log = TopologyLog { - topology: topology.to_node_indices(), - diameter: topology.diameter(), - }; - tracing::info!("Topology: {}", serde_json::to_string(&log).unwrap()); + log!( + "Topology", + TopologyLog { + topology: topology.to_node_indices(), + diameter: topology.diameter(), + } + ); } #[derive(Debug, Serialize, Deserialize)] @@ -259,14 +313,88 @@ struct TopologyLog { diameter: usize, } -fn main() -> anyhow::Result<()> { - let app: SimulationApp = SimulationApp::parse(); - let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); - - if let Err(e) = app.run() { - tracing::error!("error: {}", e); - drop(maybe_guard); - std::process::exit(1); - } - Ok(()) +fn log_conn_latency_distribution(distribution: &HashMap) { + log!( + "ConnLatencyDistribution", + ConnLatencyDistributionLog { + num_links: distribution.values().sum(), + distribution: distribution + .iter() + .map(|(latency, count)| (latency.as_millis(), *count)) + .collect::>(), + } + ); +} + +#[derive(Debug, Serialize, Deserialize)] +struct ConnLatencyDistributionLog { + num_links: usize, + distribution: HashMap, +} + +#[derive(Parser)] +struct MessageLatencyApp { + #[clap(long, short)] + log_file: PathBuf, + #[clap(long, short, value_parser = humantime::parse_duration)] + step_duration: Duration, +} + +#[derive(Parser)] +struct MessageHistoryApp { + #[clap(long, short)] + log_file: PathBuf, + #[clap(long, short, value_parser = humantime::parse_duration)] + step_duration: Duration, + #[clap(long, short)] + payload_id: PayloadId, +} + +#[derive(Parser)] +struct ConnectionLatencyApp { + #[clap(long, short)] + log_file: PathBuf, + #[clap(long, short, value_parser = humantime::parse_duration)] + step_duration: Duration, +} + +fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + match cli.command { + Commands::Run(app) => { + let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); + + if let Err(e) = app.run() { + tracing::error!("error: {}", e); + drop(maybe_guard); + std::process::exit(1); + } + Ok(()) + } + Commands::Analyze { command } => match command { + AnalyzeCommands::MessageLatency(app) => { + if let Err(e) = analyze_message_latency(app.log_file, app.step_duration) { + tracing::error!("error: {}", e); + std::process::exit(1); + } + Ok(()) + } + AnalyzeCommands::MessageHistory(app) => { + if let Err(e) = + analyze_message_history(app.log_file, app.step_duration, app.payload_id) + { + tracing::error!("error: {}", e); + std::process::exit(1); + } + Ok(()) + } + AnalyzeCommands::ConnectionLatency(app) => { + if let Err(e) = analyze_connection_latency(app.log_file, app.step_duration) { + tracing::error!("error: {}", e); + std::process::exit(1); + } + Ok(()) + } + }, + } } diff --git a/simlib/blendnet-sims/src/node/blend/log.rs b/simlib/blendnet-sims/src/node/blend/log.rs new file mode 100644 index 0000000..b1f5bed --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/log.rs @@ -0,0 +1,24 @@ +use serde::{Deserialize, Serialize}; + +#[macro_export] +macro_rules! log { + ($topic:expr, $msg:expr) => { + println!( + "{}", + serde_json::to_string(&$crate::node::blend::log::TopicLog { + topic: $topic.to_string(), + message: $msg + }) + .unwrap() + ); + }; +} + +#[derive(Serialize, Deserialize)] +pub struct TopicLog +where + M: 'static, +{ + pub topic: String, + pub message: M, +} diff --git a/simlib/blendnet-sims/src/node/blend/message.rs b/simlib/blendnet-sims/src/node/blend/message.rs index 4a30a7d..fb760f3 100644 --- a/simlib/blendnet-sims/src/node/blend/message.rs +++ b/simlib/blendnet-sims/src/node/blend/message.rs @@ -1,3 +1,6 @@ +use netrunner::node::NodeId; +use serde::Deserialize; +use serde::Serialize; use uuid::Uuid; pub type PayloadId = String; @@ -23,6 +26,58 @@ impl Payload { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MessageEvent { + pub payload_id: PayloadId, + pub step_id: usize, + #[serde(with = "node_id_serde")] + pub node_id: NodeId, + pub event_type: MessageEventType, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum MessageEventType { + Created, + PersistentTransmissionScheduled { + index: usize, + }, + PersistentTransmissionReleased, + TemporalProcessorScheduled { + index: usize, + }, + TemporalProcessorReleased, + NetworkSent { + #[serde(with = "node_id_serde")] + to: NodeId, + }, + NetworkReceived { + #[serde(with = "node_id_serde")] + from: NodeId, + }, + FullyUnwrapped, +} + +mod node_id_serde { + use netrunner::node::{NodeId, NodeIdExt}; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(node_id: &NodeId, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_u64(node_id.index().try_into().unwrap()) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + Ok(NodeId::from_index( + u64::deserialize(deserializer)?.try_into().unwrap(), + )) + } +} + #[cfg(test)] mod tests { use super::Payload; diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 6dd5c9e..d6c204b 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -1,6 +1,8 @@ pub mod consensus_streams; +#[macro_use] +pub mod log; pub mod lottery; -mod message; +pub mod message; pub mod scheduler; pub mod state; pub mod stream_wrapper; @@ -11,41 +13,44 @@ use cached::{Cached, TimedCache}; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; -use message::{Payload, PayloadId}; -use multiaddr::Multiaddr; +use message::{MessageEvent, MessageEventType, Payload}; use netrunner::network::NetworkMessage; -use netrunner::node::{Node, NodeId, NodeIdExt}; +use netrunner::node::{Node, NodeId}; use netrunner::{ network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, warding::WardCondition, }; +use nomos_blend::message_blend::temporal::{TemporalProcessorExt, TemporalStream}; use nomos_blend::{ cover_traffic::{CoverTraffic, CoverTrafficSettings}, membership::Membership, - message_blend::{ - crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream, - }, + message_blend::{crypto::CryptographicProcessor, MessageBlendSettings}, persistent_transmission::{ PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, }, BlendOutgoingMessage, }; use nomos_blend_message::mock::MockBlendMessage; +use nomos_blend_message::BlendMessage as _; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; -use scheduler::{Interval, TemporalRelease}; -use serde::{Deserialize, Serialize}; +use scheduler::{Interval, TemporalScheduler}; +use serde::Deserialize; use sha2::{Digest, Sha256}; use state::BlendnodeState; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; #[derive(Debug, Clone)] -pub struct BlendMessage(Vec); +pub struct SimMessage(Vec); -impl PayloadSize for BlendMessage { +impl PayloadSize for SimMessage { fn size_bytes(&self) -> u32 { - 2208 + // payload: 32 KiB + // header encryption overhead: 133 bytes = 48 + 17 * max_blend_hops(=5) + // payload encryption overhaed: 16 bytes + // economic data overhead: 8043 bytes + 40960 } } @@ -60,7 +65,12 @@ pub struct BlendnodeSettings { pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, pub cover_traffic_settings: CoverTrafficSettings, - pub membership: Vec<::PublicKey>, + pub membership: Vec< + nomos_blend::membership::Node< + NodeId, + ::PublicKey, + >, + >, } type Sha256Hash = [u8; 32]; @@ -70,30 +80,24 @@ pub struct BlendNode { id: NodeId, state: BlendnodeState, settings: BlendnodeSettings, - network_interface: InMemoryNetworkInterface, + network_interface: InMemoryNetworkInterface, message_cache: TimedCache, data_msg_lottery_update_time_sender: channel::Sender, data_msg_lottery_interval: Interval, data_msg_lottery: StakeLottery, - persistent_sender: channel::Sender>, + persistent_sender: channel::Sender, persistent_update_time_sender: channel::Sender, - persistent_transmission_messages: PersistentTransmissionStream< - CrossbeamReceiverStream>, - ChaCha12Rng, - MockBlendMessage, - Interval, - >, - crypto_processor: CryptographicProcessor, - blend_sender: channel::Sender>, - blend_update_time_sender: channel::Sender, - blend_messages: MessageBlendStream< - CrossbeamReceiverStream>, - ChaCha12Rng, - MockBlendMessage, - TemporalRelease, - >, + persistent_transmission_messages: + PersistentTransmissionStream, ChaCha12Rng, Interval>, + + crypto_processor: CryptographicProcessor, + temporal_sender: channel::Sender, + temporal_update_time_sender: channel::Sender, + temporal_processor_messages: + TemporalStream, TemporalScheduler>, + epoch_update_sender: channel::Sender, slot_update_sender: channel::Sender, cover_traffic: CoverTraffic, @@ -103,7 +107,7 @@ impl BlendNode { pub fn new( id: NodeId, settings: BlendnodeSettings, - network_interface: InMemoryNetworkInterface, + network_interface: InMemoryNetworkInterface, ) -> Self { let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); @@ -120,7 +124,7 @@ impl BlendNode { ); // Init Tier-1: Persistent transmission - let (persistent_sender, persistent_receiver) = channel::unbounded(); + let (persistent_sender, persistent_receiver) = channel::unbounded::(); let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) .persistent_transmission( @@ -132,43 +136,28 @@ impl BlendNode { ), persistent_update_time_receiver, ), + SimMessage(MockBlendMessage::DROP_MESSAGE.to_vec()), ); - // Init Tier-2: message blend - let (blend_sender, blend_receiver) = channel::unbounded(); - let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded(); - let nodes: Vec< - nomos_blend::membership::Node< - ::PublicKey, - >, - > = settings - .membership - .iter() - .map(|&public_key| nomos_blend::membership::Node { - address: Multiaddr::empty(), - public_key, - }) - .collect(); - let membership = Membership::::new(nodes, id.into()); + // Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor + let membership = + Membership::::new(settings.membership.clone(), id.into()); let crypto_processor = CryptographicProcessor::new( settings.message_blend.cryptographic_processor.clone(), membership.clone(), ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), ); - let temporal_release = TemporalRelease::new( - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - blend_update_time_receiver, - ( - 1, - settings.message_blend.temporal_processor.max_delay_seconds, - ), - ); - let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend( - settings.message_blend.clone(), - membership, - temporal_release, - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - ); + let (temporal_sender, temporal_receiver) = channel::unbounded(); + let (temporal_update_time_sender, temporal_update_time_receiver) = channel::unbounded(); + let temporal_processor_messages = CrossbeamReceiverStream::new(temporal_receiver) + .temporal_stream(TemporalScheduler::new( + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + temporal_update_time_receiver, + ( + 0, + settings.message_blend.temporal_processor.max_delay_seconds, + ), + )); // tier 3 cover traffic let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); @@ -194,6 +183,8 @@ impl BlendNode { node_id: id, step_id: 0, num_messages_fully_unwrapped: 0, + cur_num_persistent_scheduled: 0, + cur_num_temporal_scheduled: 0, }, data_msg_lottery_update_time_sender, data_msg_lottery_interval, @@ -202,43 +193,57 @@ impl BlendNode { persistent_update_time_sender, persistent_transmission_messages, crypto_processor, - blend_sender, - blend_update_time_sender, - blend_messages, + temporal_sender, + temporal_update_time_sender, + temporal_processor_messages, epoch_update_sender, slot_update_sender, cover_traffic, } } - fn forward( - &mut self, - message: BlendMessage, - exclude_node: Option, - log: Option, - ) { - for (i, node_id) in self + fn parse_payload(message: &[u8]) -> Payload { + Payload::load(MockBlendMessage::payload(message).unwrap()) + } + + fn forward(&mut self, message: SimMessage, exclude_node: Option) { + let payload_id = Self::parse_payload(&message.0).id(); + for node_id in self .settings .connected_peers .iter() .filter(|&id| Some(*id) != exclude_node) - .enumerate() { - if i == 0 { - if let Some(log) = &log { - Self::log_emission(log); + log!( + "MessageEvent", + MessageEvent { + payload_id: payload_id.clone(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::NetworkSent { to: *node_id } } - } + ); self.network_interface .send_message(*node_id, message.clone()) } self.message_cache.cache_set(Self::sha256(&message.0), ()); } - fn receive(&mut self) -> Vec> { + fn receive(&mut self) -> Vec> { self.network_interface .receive_messages() .into_iter() + .inspect(|msg| { + log!( + "MessageEvent", + MessageEvent { + payload_id: Self::parse_payload(&msg.payload().0).id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::NetworkReceived { from: msg.from } + } + ); + }) // Retain only messages that have not been seen before .filter(|msg| { self.message_cache @@ -254,44 +259,68 @@ impl BlendNode { hasher.finalize().into() } + fn schedule_persistent_transmission(&mut self, message: SimMessage) { + log!( + "MessageEvent", + MessageEvent { + payload_id: Self::parse_payload(&message.0).id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::PersistentTransmissionScheduled { + index: self.state.cur_num_persistent_scheduled + } + } + ); + self.persistent_sender.send(message).unwrap(); + self.state.cur_num_persistent_scheduled += 1; + } + + fn handle_incoming_message(&mut self, message: SimMessage) { + match self.crypto_processor.unwrap_message(&message.0) { + Ok((unwrapped_message, fully_unwrapped)) => { + let temporal_message = if fully_unwrapped { + BlendOutgoingMessage::FullyUnwrapped(unwrapped_message) + } else { + BlendOutgoingMessage::Outbound(unwrapped_message) + }; + + self.schedule_temporal_processor(temporal_message); + } + Err(e) => { + tracing::debug!("Failed to unwrap message: {:?}", e); + } + } + } + + fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessage) { + log!( + "MessageEvent", + MessageEvent { + payload_id: match &message { + BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg).id(), + BlendOutgoingMessage::FullyUnwrapped(payload) => + Payload::load(payload.clone()).id(), + }, + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::TemporalProcessorScheduled { + index: self.state.cur_num_temporal_scheduled + } + } + ); + self.temporal_sender.send(message).unwrap(); + self.state.cur_num_temporal_scheduled += 1; + } + fn update_time(&mut self, elapsed: Duration) { self.data_msg_lottery_update_time_sender .send(elapsed) .unwrap(); self.persistent_update_time_sender.send(elapsed).unwrap(); - self.blend_update_time_sender.send(elapsed).unwrap(); + self.temporal_update_time_sender.send(elapsed).unwrap(); self.epoch_update_sender.send(elapsed).unwrap(); self.slot_update_sender.send(elapsed).unwrap(); } - - fn log_message_generated(&self, msg_type: &str, payload: &Payload) { - self.log_message(format!("{}MessageGenerated", msg_type).as_str(), payload); - } - - fn log_message_fully_unwrapped(&self, payload: &Payload) { - self.log_message("MessageFullyUnwrapped", payload); - } - - fn log_message(&self, tag: &str, payload: &Payload) { - let log = MessageLog { - payload_id: payload.id(), - step_id: self.state.step_id, - node_id: self.id.index(), - }; - tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap()); - } - - fn log_emission(log: &EmissionLog) { - tracing::info!("Emission: {}", serde_json::to_string(log).unwrap()); - } - - fn new_emission_log(&self, emission_type: &str) -> EmissionLog { - EmissionLog { - emission_type: emission_type.to_string(), - step_id: self.state.step_id, - node_id: self.id.index(), - } - } } impl Node for BlendNode { @@ -316,38 +345,72 @@ impl Node for BlendNode { if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) { if self.data_msg_lottery.run() { let payload = Payload::new(); - self.log_message_generated("Data", &payload); let message = self .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.persistent_sender.send(message).unwrap(); + log!( + "MessageEvent", + MessageEvent { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::Created + } + ); + self.schedule_persistent_transmission(SimMessage(message)); } } // Handle incoming messages for network_message in self.receive() { + if MockBlendMessage::is_drop_message(&network_message.payload().0) { + continue; + } + self.forward( network_message.payload().clone(), Some(network_message.from), - None, ); - self.blend_sender - .send(network_message.into_payload().0) - .unwrap(); + self.handle_incoming_message(network_message.into_payload()); } - // Proceed message blend - if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) { + // Proceed temporal processor + if let Poll::Ready(Some(msg)) = + pin!(&mut self.temporal_processor_messages).poll_next(&mut cx) + { + log!( + "MessageEvent", + MessageEvent { + payload_id: match &msg { + BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg).id(), + BlendOutgoingMessage::FullyUnwrapped(payload) => + Payload::load(payload.clone()).id(), + }, + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::TemporalProcessorReleased + } + ); + self.state.cur_num_temporal_scheduled -= 1; + + // Proceed the message match msg { - BlendOutgoingMessage::Outbound(msg) => { - self.persistent_sender.send(msg).unwrap(); + BlendOutgoingMessage::Outbound(message) => { + self.schedule_persistent_transmission(SimMessage(message)); } BlendOutgoingMessage::FullyUnwrapped(payload) => { let payload = Payload::load(payload); - self.log_message_fully_unwrapped(&payload); + log!( + "MessageEvent", + MessageEvent { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::FullyUnwrapped + } + ); self.state.num_messages_fully_unwrapped += 1; - //TODO: create a tracing event } } } @@ -355,23 +418,37 @@ impl Node for BlendNode { // Generate a cover message probabilistically if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) { let payload = Payload::new(); - self.log_message_generated("Cover", &payload); let message = self .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.persistent_sender.send(message).unwrap(); + log!( + "MessageEvent", + MessageEvent { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::Created + } + ); + self.schedule_persistent_transmission(SimMessage(message)); } // Proceed persistent transmission if let Poll::Ready(Some(msg)) = pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) { - self.forward( - BlendMessage(msg), - None, - Some(self.new_emission_log("FromPersistent")), + log!( + "MessageEvent", + MessageEvent { + payload_id: Self::parse_payload(&msg.0).id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::PersistentTransmissionReleased + } ); + self.state.cur_num_persistent_scheduled -= 1; + self.forward(msg, None); } self.state.step_id += 1; @@ -387,17 +464,3 @@ impl Node for BlendNode { } } } - -#[derive(Debug, Serialize, Deserialize)] -struct MessageLog { - payload_id: PayloadId, - step_id: usize, - node_id: usize, -} - -#[derive(Debug, Serialize, Deserialize)] -struct EmissionLog { - emission_type: String, - step_id: usize, - node_id: usize, -} diff --git a/simlib/blendnet-sims/src/node/blend/scheduler.rs b/simlib/blendnet-sims/src/node/blend/scheduler.rs index ac403c8..b6a760d 100644 --- a/simlib/blendnet-sims/src/node/blend/scheduler.rs +++ b/simlib/blendnet-sims/src/node/blend/scheduler.rs @@ -44,14 +44,14 @@ impl Stream for Interval { } } -pub struct TemporalRelease { +pub struct TemporalScheduler { random_sleeps: Box + Send + Sync + 'static>, elapsed: Duration, current_sleep: Duration, update_time: channel::Receiver, } -impl TemporalRelease { +impl TemporalScheduler { pub fn new( mut rng: Rng, update_time: channel::Receiver, @@ -80,7 +80,7 @@ impl TemporalRelease { } } -impl Stream for TemporalRelease { +impl Stream for TemporalScheduler { type Item = (); fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -134,7 +134,7 @@ mod tests { fn temporal_release_update() { let (_tx, rx) = channel::unbounded(); let mut temporal_release = - TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); + TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); assert!(!temporal_release.update(Duration::from_secs(0))); assert!(!temporal_release.update(Duration::from_millis(999))); @@ -149,7 +149,7 @@ mod tests { let (tx, rx) = channel::unbounded(); let mut temporal_release = - TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); + TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); tx.send(Duration::from_secs(0)).unwrap(); assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending); diff --git a/simlib/blendnet-sims/src/node/blend/state.rs b/simlib/blendnet-sims/src/node/blend/state.rs index 7fb4417..a0b99a7 100644 --- a/simlib/blendnet-sims/src/node/blend/state.rs +++ b/simlib/blendnet-sims/src/node/blend/state.rs @@ -15,6 +15,8 @@ pub struct BlendnodeState { pub node_id: NodeId, pub step_id: usize, pub num_messages_fully_unwrapped: usize, + pub cur_num_persistent_scheduled: usize, + pub cur_num_temporal_scheduled: usize, } #[derive(Serialize)] diff --git a/simlib/blendnet-sims/src/node/blend/topology.rs b/simlib/blendnet-sims/src/node/blend/topology.rs index d86d40e..4dd3574 100644 --- a/simlib/blendnet-sims/src/node/blend/topology.rs +++ b/simlib/blendnet-sims/src/node/blend/topology.rs @@ -1,6 +1,12 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; -use netrunner::node::{NodeId, NodeIdExt}; +use netrunner::{ + network::regions::RegionsData, + node::{NodeId, NodeIdExt}, +}; use rand::{seq::SliceRandom, RngCore}; #[derive(Clone)] @@ -121,6 +127,28 @@ impl Topology { hop_count } + pub fn conn_latency_distribution( + &self, + regions_data: &RegionsData, + ) -> HashMap { + // Initialize a distribution + let distribution = regions_data + .region_network_behaviour + .values() + .map(|behaviour| (behaviour.delay(), 0)) + .collect(); + // Populate the distribution + self.0.iter().fold(distribution, |mut acc, (node, peers)| { + let region_a = regions_data.node_region(*node); + peers.iter().for_each(|peer| { + let region_b = regions_data.node_region(*peer); + let behaviour = regions_data.network_behaviour_between_regions(region_a, region_b); + acc.entry(behaviour.delay()).and_modify(|count| *count += 1); + }); + acc + }) + } + pub fn get(&self, node: &NodeId) -> Option<&HashSet> { self.0.get(node) } diff --git a/simlib/netrunner/src/network/mod.rs b/simlib/netrunner/src/network/mod.rs index e88d821..aed8c2d 100644 --- a/simlib/netrunner/src/network/mod.rs +++ b/simlib/netrunner/src/network/mod.rs @@ -618,16 +618,16 @@ mod tests { let node_c = NodeId::from_index(2); let regions = HashMap::from([ - (Region::Asia, vec![node_a, node_b]), + (Region::EastAsia, vec![node_a, node_b]), (Region::Europe, vec![node_c]), ]); let behaviour = HashMap::from([ ( - NetworkBehaviourKey::new(Region::Asia, Region::Asia), + NetworkBehaviourKey::new(Region::EastAsia, Region::EastAsia), NetworkBehaviour::new(Duration::from_millis(100), 0.0), ), ( - NetworkBehaviourKey::new(Region::Asia, Region::Europe), + NetworkBehaviourKey::new(Region::EastAsia, Region::Europe), NetworkBehaviour::new(Duration::from_millis(500), 0.0), ), ( diff --git a/simlib/netrunner/src/network/regions.rs b/simlib/netrunner/src/network/regions.rs index 185375b..55852d8 100644 --- a/simlib/netrunner/src/network/regions.rs +++ b/simlib/netrunner/src/network/regions.rs @@ -10,9 +10,13 @@ use super::{NetworkBehaviourKey, NetworkSettings}; #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub enum Region { - NorthAmerica, + NorthAmericaWest, + NorthAmericaCentral, + NorthAmericaEast, Europe, - Asia, + NorthernEurope, + EastAsia, + SoutheastAsia, Africa, SouthAmerica, Australia, @@ -21,9 +25,13 @@ pub enum Region { impl core::fmt::Display for Region { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { let s = match self { - Self::NorthAmerica => "NorthAmerica", + Self::NorthAmericaWest => "NorthAmericaWest", + Self::NorthAmericaCentral => "NorthAmericaCentral", + Self::NorthAmericaEast => "NorthAmericaEast", Self::Europe => "Europe", - Self::Asia => "Asia", + Self::NorthernEurope => "NorthernEurope", + Self::EastAsia => "EastAsia", + Self::SoutheastAsia => "SoutheastAsia", Self::Africa => "Africa", Self::SouthAmerica => "SouthAmerica", Self::Australia => "Australia", @@ -42,9 +50,13 @@ impl FromStr for Region { .replace(['-', '_', ' '], "") .as_str() { - "northamerica" | "na" => Ok(Self::NorthAmerica), + "northamericawest" | "naw" => Ok(Self::NorthAmericaWest), + "northamericacentral" | "nac" => Ok(Self::NorthAmericaCentral), + "northamericaeast" | "nae" => Ok(Self::NorthAmericaEast), "europe" | "eu" => Ok(Self::Europe), - "asia" | "as" => Ok(Self::Asia), + "northerneurope" | "neu" => Ok(Self::NorthernEurope), + "eastasia" | "eas" => Ok(Self::EastAsia), + "southeastasia" | "seas" => Ok(Self::SoutheastAsia), "africa" | "af" => Ok(Self::Africa), "southamerica" | "sa" => Ok(Self::SouthAmerica), "australia" | "au" => Ok(Self::Australia), @@ -56,9 +68,13 @@ impl FromStr for Region { impl Serialize for Region { fn serialize(&self, serializer: S) -> Result { let s = match self { - Self::NorthAmerica => "North America", + Self::NorthAmericaWest => "North America West", + Self::NorthAmericaCentral => "North America Central", + Self::NorthAmericaEast => "North America East", Self::Europe => "Europe", - Self::Asia => "Asia", + Self::NorthernEurope => "Northern Europe", + Self::EastAsia => "EastAsia", + Self::SoutheastAsia => "Southeast Asia", Self::Africa => "Africa", Self::SouthAmerica => "South America", Self::Australia => "Australia", @@ -105,6 +121,14 @@ impl RegionsData { pub fn network_behaviour(&self, node_a: NodeId, node_b: NodeId) -> &NetworkBehaviour { let region_a = self.node_region[&node_a]; let region_b = self.node_region[&node_b]; + self.network_behaviour_between_regions(region_a, region_b) + } + + pub fn network_behaviour_between_regions( + &self, + region_a: Region, + region_b: Region, + ) -> &NetworkBehaviour { let k = NetworkBehaviourKey::new(region_a, region_b); let k_rev = NetworkBehaviourKey::new(region_b, region_a); self.region_network_behaviour @@ -207,9 +231,13 @@ mod tests { .collect::>(); let available_regions = [ - Region::NorthAmerica, + Region::NorthAmericaWest, + Region::NorthAmericaCentral, + Region::NorthAmericaEast, Region::Europe, - Region::Asia, + Region::NorthernEurope, + Region::EastAsia, + Region::SoutheastAsia, Region::Africa, Region::SouthAmerica, Region::Australia, diff --git a/simlib/netrunner/src/streaming/io.rs b/simlib/netrunner/src/streaming/io.rs index b79d2f6..d0b0ec6 100644 --- a/simlib/netrunner/src/streaming/io.rs +++ b/simlib/netrunner/src/streaming/io.rs @@ -114,7 +114,7 @@ where } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::{collections::HashMap, time::Duration}; use crate::{ @@ -174,43 +174,19 @@ mod tests { RegionsData { regions: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (region, vec![NodeId::from_index(idx)]) }) .collect(), node_region: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (NodeId::from_index(idx), region) }) .collect(), region_network_behaviour: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); ( NetworkBehaviourKey::new(region, region), NetworkBehaviour { @@ -231,4 +207,20 @@ mod tests { .stop_after(Duration::from_millis(100)) .unwrap(); } + + pub(crate) fn region_from_index(idx: usize) -> Region { + match idx % 10 { + 0 => Region::Europe, + 1 => Region::NorthernEurope, + 2 => Region::NorthAmericaWest, + 3 => Region::NorthAmericaCentral, + 4 => Region::NorthAmericaEast, + 5 => Region::SouthAmerica, + 6 => Region::EastAsia, + 7 => Region::SoutheastAsia, + 8 => Region::Africa, + 9 => Region::Australia, + _ => unreachable!(), + } + } } diff --git a/simlib/netrunner/src/streaming/runtime_subscriber.rs b/simlib/netrunner/src/streaming/runtime_subscriber.rs index 2b54fc2..59d8809 100644 --- a/simlib/netrunner/src/streaming/runtime_subscriber.rs +++ b/simlib/netrunner/src/streaming/runtime_subscriber.rs @@ -115,6 +115,7 @@ mod tests { }, output_processors::OutData, runner::SimulationRunner, + streaming::io::tests::region_from_index, warding::SimulationState, }; @@ -160,43 +161,19 @@ mod tests { RegionsData { regions: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (region, vec![NodeId::from_index(idx)]) }) .collect(), node_region: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (NodeId::from_index(idx), region) }) .collect(), region_network_behaviour: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); ( NetworkBehaviourKey::new(region, region), NetworkBehaviour { diff --git a/simlib/netrunner/src/streaming/settings_subscriber.rs b/simlib/netrunner/src/streaming/settings_subscriber.rs index 1dd4ec9..e2f6f3d 100644 --- a/simlib/netrunner/src/streaming/settings_subscriber.rs +++ b/simlib/netrunner/src/streaming/settings_subscriber.rs @@ -116,6 +116,7 @@ mod tests { }, output_processors::OutData, runner::SimulationRunner, + streaming::io::tests::region_from_index, warding::SimulationState, }; @@ -156,43 +157,19 @@ mod tests { RegionsData { regions: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (region, vec![NodeId::from_index(idx)]) }) .collect(), node_region: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (NodeId::from_index(idx), region) }) .collect(), region_network_behaviour: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); ( NetworkBehaviourKey::new(region, region), NetworkBehaviour {