From e2d1c373998816927f21cb31c6a4f8cd356abc38 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 6 Feb 2025 17:26:12 +0900 Subject: [PATCH] message history + batch --- simlib/blendnet-sims/config/blendnet.json | 96 ++++- simlib/blendnet-sims/scripts/batch.py | 289 +++++++++++++++ simlib/blendnet-sims/scripts/emission.py | 21 +- simlib/blendnet-sims/scripts/latency.py | 172 +++++---- simlib/blendnet-sims/scripts/mixlog.py | 16 +- simlib/blendnet-sims/src/main.rs | 57 ++- simlib/blendnet-sims/src/node/blend/log.rs | 21 ++ .../blendnet-sims/src/node/blend/message.rs | 78 ++++ simlib/blendnet-sims/src/node/blend/mod.rs | 345 +++++++++++------- .../blendnet-sims/src/node/blend/scheduler.rs | 10 +- simlib/blendnet-sims/src/node/blend/state.rs | 2 + .../blendnet-sims/src/node/blend/topology.rs | 32 +- simlib/netrunner/src/network/mod.rs | 6 +- simlib/netrunner/src/network/regions.rs | 48 ++- simlib/netrunner/src/streaming/io.rs | 48 +-- .../src/streaming/runtime_subscriber.rs | 31 +- .../src/streaming/settings_subscriber.rs | 31 +- 17 files changed, 955 insertions(+), 348 deletions(-) create mode 100644 simlib/blendnet-sims/scripts/batch.py create mode 100644 simlib/blendnet-sims/src/node/blend/log.rs diff --git a/simlib/blendnet-sims/config/blendnet.json b/simlib/blendnet-sims/config/blendnet.json index 9179c44..90331c7 100644 --- a/simlib/blendnet-sims/config/blendnet.json +++ b/simlib/blendnet-sims/config/blendnet.json @@ -1,49 +1,109 @@ { "network_settings": { "network_behaviors": { - "north america:north america": "50ms", - "north america:europe": "100ms", - "north america:asia": "120ms", + "north america west:north america west": "40ms", + "north america west:north america east": "70ms", + "north america west:north america central": "50ms", + "north america west:europe": "150ms", + "north america west:northern europe": "170ms", + "north america west:east asia": "180ms", + "north america west:southeast asia": "200ms", + "north america west:australia": "250ms", + "north america east:north america west": "70ms", + "north america east:north america east": "40ms", + "north america east:north america central": "50ms", + "north america east:europe": "130ms", + "north america east:northern europe": "140ms", + "north america east:east asia": "250ms", + "north america east:southeast asia": "300ms", + "north america east:australia": "230ms", + "north america central:north america west": "50ms", + "north america central:north america east": "50ms", + "north america central:north america central": "20ms", + "north america central:europe": "140ms", + "north america central:northern europe": "150ms", + "north america central:east asia": "200ms", + "north america central:southeast asia": "280ms", + "north america central:australia": "220ms", + "europe:north america west": "150ms", + "europe:north america east": "130ms", + "europe:north america central": "140ms", "europe:europe": "50ms", - "europe:asia": "100ms", - "europe:north america": "120ms", - "asia:north america": "100ms", - "asia:europe": "120ms", - "asia:asia": "40ms" + "europe:northern europe": "60ms", + "europe:east asia": "300ms", + "europe:southeast asia": "300ms", + "europe:australia": "300ms", + "northern europe:north america west": "170ms", + "northern europe:north america east": "140ms", + "northern europe:north america central": "150ms", + "northern europe:europe": "60ms", + "northern europe:northern europe": "30ms", + "northern europe:east asia": "400ms", + "northern europe:southeast asia": "320ms", + "northern europe:australia": "300ms", + "east asia:north america west": "180ms", + "east asia:north america east": "250ms", + "east asia:north america central": "200ms", + "east asia:europe": "300ms", + "east asia:northern europe": "400ms", + "east asia:east asia": "50ms", + "east asia:southeast asia": "90ms", + "east asia:australia": "150ms", + "southeast asia:north america west": "200ms", + "southeast asia:north america east": "300ms", + "southeast asia:north america central": "280ms", + "southeast asia:europe": "300ms", + "southeast asia:northern europe": "320ms", + "southeast asia:east asia": "90ms", + "southeast asia:southeast asia": "40ms", + "southeast asia:australia": "150ms", + "australia:north america west": "250ms", + "australia:north america east": "230ms", + "australia:north america central": "220ms", + "australia:europe": "300ms", + "australia:northern europe": "300ms", + "australia:east asia": "150ms", + "australia:southeast asia": "150ms", + "australia:australia": "50ms" }, "regions": { - "north america": 0.4, - "europe": 0.4, - "asia": 0.3 + "north america west": 0.06, + "north america east": 0.15, + "north america central": 0.02, + "europe": 0.47, + "northern europe": 0.10, + "east asia": 0.10, + "southeast asia": 0.07, + "australia": 0.03 } }, "node_settings": { "timeout": "1000ms" }, - "step_time": "40ms", + "step_time": "10ms", "runner_settings": "Sync", "stream_settings": { "path": "test.json" }, - "node_count": 10, + "node_count": 100, "seed": 0, "record_settings": {}, "wards": [ { - "sum": 10 + "sum": 100 } ], "connected_peers_count": 4, "data_message_lottery_interval": "20s", "stake_proportion": 1.0, - "epoch_duration": "432000s", - "slot_duration": "20s", - "slots_per_epoch": 21600, + "epoch_duration": "200s", + "slot_duration": "1s", + "slots_per_epoch": 200, "number_of_hops": 2, "persistent_transmission": { "max_emission_frequency": 1.0, "drop_message_probability": 0.0 }, "number_of_blend_layers": 2, - "max_delay_seconds": 10 + "max_delay_seconds": 5 } diff --git a/simlib/blendnet-sims/scripts/batch.py b/simlib/blendnet-sims/scripts/batch.py new file mode 100644 index 0000000..e2d4022 --- /dev/null +++ b/simlib/blendnet-sims/scripts/batch.py @@ -0,0 +1,289 @@ +# !/usr/bin/env python +import argparse +import csv +import json +import os +import subprocess +from collections import OrderedDict + +import latency +import mixlog + + +def bandwidth_result(log_path: str, step_duration_ms: int) -> dict[str, float]: + max_step_id = 0 + for topic, json_msg in mixlog.get_input_stream(log_path): + if topic == "MessageFullyUnwrapped": + max_step_id = max(max_step_id, json_msg["message"]["step_id"]) + + with open(log_path, "r") as file: + for line in file: + if "total_outbound_bandwidth" in line: + line = line[line.find("{") :] + line = line.replace("{ ", '{"') + line = line.replace(": ", '": ') + line = line.replace(", ", ', "') + record = json.loads(line) + + elapsed = (max_step_id * step_duration_ms) / 1000.0 + return { + "min": float(record["min_node_total_bandwidth"]) / elapsed, + "avg": float(record["avg_node_total_bandwidth"]) / elapsed, + "max": float(record["max_node_total_bandwidth"]) / elapsed, + } + + raise Exception("No bandwidth data found in log file") + + +def topology_result(log_path: str) -> dict[str, int]: + for topic, json_msg in mixlog.get_input_stream(log_path): + if topic == "Topology": + return json_msg + raise Exception("No topology found in log file") + + +def build_argument_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.") + parser.add_argument( + "--step-duration", + type=int, + help="Duration (in ms) of each step in the simulation.", + ) + parser.add_argument( + "--params-file", + type=str, + help="A CSV file that contains all parameter sets", + ) + parser.add_argument( + "--orig-config-file", + type=str, + help="An original blendnet config JSON file that will be modified as specified in `params_file`", + ) + parser.add_argument( + "--outdir", + type=str, + help="A output directory to be created", + ) + parser.add_argument( + "--skip-run", + action="store_true", + help="Skip running the simulations and only analyze the logs", + ) + return parser + + +if __name__ == "__main__": + argument_parser = build_argument_parser() + args = argument_parser.parse_args() + + # Read the params CSV file + csv_data = [] + with open(args.params_file, mode="r") as csvfile: + reader = csv.DictReader(csvfile, delimiter=",") + csv_data = list(reader) + + # Read the original blendnet json config file + with open(args.orig_config_file, "r") as jsonfile: + original_json = json.load(jsonfile) + + # Directory to save modified JSON files + modified_configs_dir = os.path.join(args.outdir, "modified_configs") + os.makedirs(modified_configs_dir, exist_ok=True) + + # Modify and save JSON files for each row in CSV + config_paths = [] + for idx, row in enumerate(csv_data): + output_path = os.path.join(modified_configs_dir, f"{idx}.json") + config_paths.append(output_path) + + if args.skip_run: + continue + + modified_json = OrderedDict(original_json) # Preserve original field order + + # Apply modifications + modified_json["network_settings"]["regions"]["north america west"] = 0.06 + modified_json["network_settings"]["regions"]["north america east"] = 0.15 + modified_json["network_settings"]["regions"]["north america central"] = 0.02 + modified_json["network_settings"]["regions"]["europe"] = 0.47 + modified_json["network_settings"]["regions"]["northern europe"] = 0.10 + modified_json["network_settings"]["regions"]["east asia"] = 0.10 + modified_json["network_settings"]["regions"]["southeast asia"] = 0.07 + modified_json["network_settings"]["regions"]["australia"] = 0.03 + modified_json["step_time"] = f"{args.step_duration}ms" + modified_json["node_count"] = int(row["network_size"]) + modified_json["wards"][0]["sum"] = 1000 + modified_json["connected_peers_count"] = int(row["peering_degree"]) + modified_json["data_message_lottery_interval"] = "20s" + modified_json["stake_proportion"] = 0.0 + modified_json["persistent_transmission"]["max_emission_frequency"] = 1.0 + modified_json["persistent_transmission"]["drop_message_probability"] = 0.0 + modified_json["epoch_duration"] = ( + f"{int(row['cover_slots_per_epoch']) * int(row['cover_slot_duration'])}s" + ) + modified_json["slots_per_epoch"] = int(row["cover_slots_per_epoch"]) + modified_json["slot_duration"] = f"{row['cover_slot_duration']}s" + modified_json["max_delay_seconds"] = int(row["max_temporal_delay"]) + modified_json["number_of_hops"] = int(row["blend_hops"]) + modified_json["number_of_blend_layers"] = int(row["blend_hops"]) + + # Save modified JSON + with open(output_path, "w") as outfile: + json.dump(modified_json, outfile, indent=2) + print("Saved modified JSON to:", output_path) + + # Directory to save logs + log_dir = os.path.join(args.outdir, "logs") + os.makedirs(log_dir, exist_ok=True) + + log_paths = [] + for idx, config_path in enumerate(config_paths): + log_path = f"{log_dir}/{idx}.log" + log_paths.append(log_path) + + if args.skip_run: + continue + + with open(log_path, "w") as log_file: + print( + f"Running simulation-{idx}: {log_file.name} with config: {config_path}" + ) + subprocess.run( + ["../../target/release/blendnet-sims", "--input-settings", config_path], + stdout=log_file, + ) + print(f"Simulation-{idx} completed: {log_file.name}") + + print("Analyzing logs...") + print("=================") + + with open(os.path.join(args.outdir, "output.csv"), "w", newline="") as file: + print(f"Writing results to: {file.name}") + csv_writer = csv.writer(file) + csv_writer.writerow( + [ + "network_diameter", + "msg_count", + "min_latency_sec", + "avg_latency_sec", + "median_latency_sec", + "max_latency_sec", + "min_latency_msg_id", + "min_latency_msg_persistent_latency_sec", + "min_latency_msg_persistent_index", + "min_latency_msg_temporal_latency_sec", + "min_latency_msg_temporal_index", + "max_latency_msg_id", + "max_latency_msg_persistent_latency_sec", + "max_latency_msg_persistent_index", + "max_latency_msg_temporal_latency_sec", + "max_latency_msg_temporal_index", + "min_conn_latency_sec", + "avg_conn_latency_sec", + "med_conn_latency_sec", + "max_conn_latency_sec", + "min_bandwidth_kbps", + "avg_bandwidth_kbps", + "max_bandwidth_kbps", + ] + ) + + for idx, log_path in enumerate(log_paths): + csv_row = [] + csv_row.append(topology_result(log_path)["diameter"]) + + latency_analysis = latency.LatencyAnalysis.build( + mixlog.get_input_stream(log_path) + ) + csv_row.append(latency_analysis.total_messages) + csv_row.append(float(latency_analysis.min_latency_ms) / 1000.0) + csv_row.append(float(latency_analysis.avg_latency_ms) / 1000.0) + csv_row.append(float(latency_analysis.median_latency_ms) / 1000.0) + csv_row.append(float(latency_analysis.max_latency_ms) / 1000.0) + csv_row.append(latency_analysis.min_latency_analysis.message_id) + csv_row.append( + ",".join( + map( + str, + [ + ms / 1000.0 + for ms in latency_analysis.min_latency_analysis.persistent_latencies_ms + ], + ) + ) + ) + csv_row.append( + ",".join( + map(str, latency_analysis.min_latency_analysis.persistent_indices) + ) + ) + csv_row.append( + ",".join( + map( + str, + [ + ms / 1000.0 + for ms in latency_analysis.min_latency_analysis.temporal_latencies_ms + ], + ) + ) + ) + csv_row.append( + ",".join( + map(str, latency_analysis.min_latency_analysis.temporal_indices) + ) + ) + csv_row.append(latency_analysis.max_latency_analysis.message_id) + csv_row.append( + ",".join( + map( + str, + [ + ms / 1000.0 + for ms in latency_analysis.max_latency_analysis.persistent_latencies_ms + ], + ) + ) + ) + csv_row.append( + ",".join( + map(str, latency_analysis.max_latency_analysis.persistent_indices) + ) + ) + csv_row.append( + ",".join( + map( + str, + [ + ms / 1000.0 + for ms in latency_analysis.max_latency_analysis.temporal_latencies_ms + ], + ) + ) + ) + csv_row.append( + ",".join( + map(str, latency_analysis.max_latency_analysis.temporal_indices) + ) + ) + csv_row.append( + float(latency_analysis.conn_latency_analysis.min_ms) / 1000.0 + ) + csv_row.append( + float(latency_analysis.conn_latency_analysis.avg_ms) / 1000.0 + ) + csv_row.append( + float(latency_analysis.conn_latency_analysis.med_ms) / 1000.0 + ) + csv_row.append( + float(latency_analysis.conn_latency_analysis.max_ms) / 1000.0 + ) + + bandwidth_res = bandwidth_result(log_path, args.step_duration) + csv_row.append(bandwidth_res["min"] * 8 / 1000.0) + csv_row.append(bandwidth_res["avg"] * 8 / 1000.0) + csv_row.append(bandwidth_res["max"] * 8 / 1000.0) + + csv_writer.writerow(csv_row) + + print(f"The outputs have been successfully written to {file.name}") diff --git a/simlib/blendnet-sims/scripts/emission.py b/simlib/blendnet-sims/scripts/emission.py index 6caa2e8..fa0be5e 100644 --- a/simlib/blendnet-sims/scripts/emission.py +++ b/simlib/blendnet-sims/scripts/emission.py @@ -1,16 +1,14 @@ import argparse -import json from collections.abc import Iterable from typing import Any import matplotlib import matplotlib.pyplot as plt +import mixlog import pandas as pd -import mixlog - -def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None: +def plot_emissions(input_stream: Iterable[tuple[str, dict]], plot_path: str) -> None: df = pd.DataFrame(emission_records(input_stream)) plt.figure(figsize=(12, 6)) @@ -24,19 +22,8 @@ def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None: plt.show() -def emission_records(input_stream: Iterable[str]) -> list[Any]: - records = [] - - for line in input_stream: - try: - record = json.loads(line) - except json.JSONDecodeError: - continue - - if "emission_type" in record: - records.append(record) - - return records +def emission_records(input_stream: Iterable[tuple[str, dict]]) -> list[Any]: + return [record for _, record in filter(lambda x: x[0] == "Emission", input_stream)] if __name__ == "__main__": diff --git a/simlib/blendnet-sims/scripts/latency.py b/simlib/blendnet-sims/scripts/latency.py index 35551f9..412e238 100755 --- a/simlib/blendnet-sims/scripts/latency.py +++ b/simlib/blendnet-sims/scripts/latency.py @@ -1,88 +1,113 @@ # !/usr/bin/env python import argparse import json -import statistics -from collections.abc import Iterable -from typing import Dict, Optional +from dataclasses import asdict, dataclass +from typing import Iterable import mixlog +import pandas as pd -class Message: - def __init__(self, message_id: str, step_a: Optional[int]): - self.id = message_id - self.step_a = int(step_a) if step_a is not None else None - self.step_b = None +@dataclass +class LatencyAnalysis: + total_messages: int + min_latency_ms: int + min_latency_analysis: "MessageLatencyAnalysis" + max_latency_ms: int + max_latency_analysis: "MessageLatencyAnalysis" + avg_latency_ms: int + median_latency_ms: int + conn_latency_analysis: "ConnectionLatencyAnalysis" - def __hash__(self): - return self.id + @classmethod + def build(cls, input_stream: Iterable[tuple[str, dict]]) -> "LatencyAnalysis": + latencies = [] + message_ids = [] + messages: dict[str, dict] = {} + for topic, record in input_stream: + if topic != "MessageFullyUnwrapped": + continue + latencies.append(record["total_duration"]) + message_id = record["message"]["payload_id"] + message_ids.append(message_id) + messages[message_id] = record - def __repr__(self): - return f"[{self.id}] {self.step_a} -> {self.step_b}" + latencies = pd.Series(latencies) + message_ids = pd.Series(message_ids) - @property - def latency(self) -> Optional[int]: - if self.step_a is not None and self.step_b is not None: - return abs(self.step_a - self.step_b) + return cls( + total_messages=int(latencies.count()), + min_latency_ms=int(latencies.min()), + min_latency_analysis=MessageLatencyAnalysis.build( + messages[str(message_ids[latencies.idxmin()])] + ), + max_latency_ms=int(latencies.max()), + max_latency_analysis=MessageLatencyAnalysis.build( + messages[str(message_ids[latencies.idxmax()])] + ), + avg_latency_ms=int(latencies.mean()), + median_latency_ms=int(latencies.median()), + conn_latency_analysis=ConnectionLatencyAnalysis.build(messages), + ) -MessageStorage = Dict[str, Message] +@dataclass +class MessageLatencyAnalysis: + message_id: str + persistent_latencies_ms: list[int] + persistent_indices: list[int] + temporal_latencies_ms: list[int] + temporal_indices: list[int] + + @classmethod + def build( + cls, + message: dict, + ) -> "MessageLatencyAnalysis": + analysis = cls(message["message"]["payload_id"], [], [], [], []) + + for event in message["history"]: + event_type = event["event_type"] + if "PersistentTransmissionScheduled" in event_type: + analysis.persistent_indices.append( + int(event_type["PersistentTransmissionScheduled"]["index"]) + ) + elif event_type == "PersistentTransmissionReleased": + analysis.persistent_latencies_ms.append(event["duration_from_prev"]) + elif "TemporalProcessorScheduled" in event_type: + analysis.temporal_indices.append( + int(event_type["TemporalProcessorScheduled"]["index"]) + ) + elif event_type == "TemporalProcessorReleased": + analysis.temporal_latencies_ms.append(event["duration_from_prev"]) + + return analysis -def compute_results(message_storage: MessageStorage, step_duration: int) -> str: - latencies = [message_record.latency for message_record in message_storage.values()] - valued_latencies = [latency for latency in latencies if latency is not None] - incomplete_latencies = sum((1 for latency in latencies if latency is None)) +@dataclass +class ConnectionLatencyAnalysis: + min_ms: int + avg_ms: int + med_ms: int + max_ms: int - total_messages = len(latencies) - total_messages_full_latency = len(valued_latencies) - total_messages_incomplete_latency = incomplete_latencies - latency_average_steps = statistics.mean(valued_latencies) - latency_average_ms = "{:.2f}ms".format(latency_average_steps * step_duration) - latency_median_steps = statistics.median(valued_latencies) - latency_median_ms = "{:.2f}ms".format(latency_median_steps * step_duration) - max_latency_steps = max(valued_latencies) - max_latency_ms = "{:.2f}ms".format(max_latency_steps * step_duration) - min_latency_steps = min(valued_latencies) - min_latency_ms = "{:.2f}ms".format(min_latency_steps * step_duration) - - return f"""[Results] -- Total messages: {total_messages} - - Full latencies: {total_messages_full_latency} - - Incomplete latencies: {total_messages_incomplete_latency} -- Averages - - Steps: {latency_average_steps} - - Duration: {latency_average_ms} -- Median - - Steps: {latency_median_steps} - - Duration: {latency_median_ms} -- Max - - Steps: {max_latency_steps} - - Duration: {max_latency_ms} -- Min - - Steps: {min_latency_steps} - - Duration: {min_latency_ms}""" - - -def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage: - storage: MessageStorage = {} - - for record in record_stream: - try: - json_record = json.loads(record) - except json.decoder.JSONDecodeError: - continue - - if (payload_id := json_record.get("payload_id")) is None: - continue - step_id = json_record["step_id"] - - if (stored_message := storage.get(payload_id)) is None: - storage[payload_id] = Message(payload_id, step_id) - else: - stored_message.step_b = step_id - - return storage + @classmethod + def build( + cls, + messages: dict[str, dict], + ) -> "ConnectionLatencyAnalysis": + latencies = [] + for message in messages.values(): + for event in message["history"]: + if "NetworkReceived" in event["event_type"]: + latencies.append(event["duration_from_prev"]) + latencies = pd.Series(latencies) + return cls( + int(latencies.min()), + int(latencies.mean()), + int(latencies.median()), + int(latencies.max()), + ) def build_argument_parser() -> argparse.ArgumentParser: @@ -105,8 +130,5 @@ if __name__ == "__main__": argument_parser = build_argument_parser() arguments = argument_parser.parse_args() - input_stream = mixlog.get_input_stream(arguments.input_file) - messages = parse_record_stream(input_stream) - - results = compute_results(messages, arguments.step_duration) - print(results) + analysis = LatencyAnalysis.build(mixlog.get_input_stream(arguments.input_file)) + print(json.dumps(asdict(analysis), indent=2)) diff --git a/simlib/blendnet-sims/scripts/mixlog.py b/simlib/blendnet-sims/scripts/mixlog.py index c76a613..a21bc31 100644 --- a/simlib/blendnet-sims/scripts/mixlog.py +++ b/simlib/blendnet-sims/scripts/mixlog.py @@ -1,12 +1,20 @@ +import json import sys from collections.abc import Iterable from typing import Optional +TOPIC_LOG_INDICATOR = '{"topic":' -def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]: + +def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[tuple[str, dict]]: for record in record_stream: - bracket_pos = record.rfind("{") - yield record[bracket_pos:] + topic_idx = record.find(TOPIC_LOG_INDICATOR) + if topic_idx == -1: + continue + + # Split the line into 2 parts: topic and JSON message + log = json.loads(record[topic_idx:].strip()) + yield (log["topic"], log["message"]) def get_pipe_stream() -> Iterable[str]: @@ -18,7 +26,7 @@ def get_file_stream(input_filename) -> Iterable[str]: yield from file -def get_input_stream(input_filename: Optional[str]) -> Iterable[str]: +def get_input_stream(input_filename: Optional[str]) -> Iterable[tuple[str, dict]]: stream = ( get_file_stream(input_filename) if input_filename is not None diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index f825a5e..ce8a149 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -10,6 +10,7 @@ use crate::node::blend::{BlendMessage, BlendnodeSettings}; use anyhow::Ok; use clap::Parser; use crossbeam::channel; +use multiaddr::Multiaddr; use netrunner::network::behaviour::create_behaviours; use netrunner::network::regions::{create_regions, RegionsData}; use netrunner::network::{InMemoryNetworkInterface, Network, PayloadSize}; @@ -85,13 +86,26 @@ impl SimulationApp { &mut rng, &settings.simulation_settings.network_settings, ); + log!( + "Regions", + regions + .iter() + .map(|(region, node_ids)| (region, node_ids.len())) + .collect::>() + ); + log!("NumRegions", regions.len()); + let behaviours = create_behaviours(&settings.simulation_settings.network_settings); let regions_data = RegionsData::new(regions, behaviours); - let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); + let network = Arc::new(Mutex::new(Network::::new( + regions_data.clone(), + seed, + ))); let topology = Topology::new(&node_ids, settings.connected_peers_count, &mut rng); log_topology(&topology); + log_conn_latency_distribution(&topology.conn_latency_distribution(®ions_data)); let nodes: Vec<_> = node_ids .iter() @@ -126,7 +140,14 @@ impl SimulationApp { slots_per_epoch: settings.slots_per_epoch, network_size: node_ids.len(), }, - membership: node_ids.iter().map(|&id| id.into()).collect(), + membership: node_ids + .iter() + .map(|&id| nomos_blend::membership::Node { + id, + address: Multiaddr::empty(), + public_key: id.into(), + }) + .collect(), }, ) }) @@ -176,6 +197,7 @@ fn create_boxed_blendnode( Box::new(BlendNode::new( node_id, blendnode_settings, + simulation_settings.step_time, network_interface, )) } @@ -246,11 +268,13 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { } fn log_topology(topology: &Topology) { - let log = TopologyLog { - topology: topology.to_node_indices(), - diameter: topology.diameter(), - }; - tracing::info!("Topology: {}", serde_json::to_string(&log).unwrap()); + log!( + "Topology", + TopologyLog { + topology: topology.to_node_indices(), + diameter: topology.diameter(), + } + ); } #[derive(Debug, Serialize, Deserialize)] @@ -259,6 +283,25 @@ struct TopologyLog { diameter: usize, } +fn log_conn_latency_distribution(distribution: &HashMap) { + log!( + "ConnLatencyDistribution", + ConnLatencyDistributionLog { + num_links: distribution.values().sum(), + distribution: distribution + .iter() + .map(|(latency, count)| (latency.as_millis(), *count)) + .collect::>(), + } + ); +} + +#[derive(Debug, Serialize, Deserialize)] +struct ConnLatencyDistributionLog { + num_links: usize, + distribution: HashMap, +} + fn main() -> anyhow::Result<()> { let app: SimulationApp = SimulationApp::parse(); let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); diff --git a/simlib/blendnet-sims/src/node/blend/log.rs b/simlib/blendnet-sims/src/node/blend/log.rs new file mode 100644 index 0000000..1a92805 --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/log.rs @@ -0,0 +1,21 @@ +use serde::Serialize; + +#[macro_export] +macro_rules! log { + ($topic:expr, $msg:expr) => { + tracing::info!( + "{}", + serde_json::to_string(&$crate::node::blend::log::TopicLog { + topic: $topic.to_string(), + message: $msg + }) + .unwrap() + ); + }; +} + +#[derive(Serialize)] +pub struct TopicLog { + pub topic: String, + pub message: M, +} diff --git a/simlib/blendnet-sims/src/node/blend/message.rs b/simlib/blendnet-sims/src/node/blend/message.rs index 4a30a7d..e314181 100644 --- a/simlib/blendnet-sims/src/node/blend/message.rs +++ b/simlib/blendnet-sims/src/node/blend/message.rs @@ -1,3 +1,8 @@ +use std::{ops::Mul, time::Duration}; + +use netrunner::node::serialize_node_id_as_index; +use netrunner::node::NodeId; +use serde::Serialize; use uuid::Uuid; pub type PayloadId = String; @@ -23,6 +28,79 @@ impl Payload { } } +#[derive(Debug, Clone, Serialize)] +pub struct MessageHistory(Vec); + +impl MessageHistory { + pub fn new() -> Self { + Self(Vec::new()) + } + + pub fn add( + &mut self, + node_id: NodeId, + step_id: usize, + step_time: Duration, + event_type: MessageEventType, + ) { + let duration_from_prev = self.0.last().map_or(Duration::ZERO, |prev_event| { + step_time.mul((step_id - prev_event.step_id).try_into().unwrap()) + }); + self.0.push(MessageEvent { + node_id, + step_id, + duration_from_prev, + event_type, + }); + } + + pub fn last_event_type(&self) -> Option<&MessageEventType> { + self.0.last().map(|event| &event.event_type) + } + + pub fn total_duration(&self) -> Duration { + self.0.iter().map(|event| event.duration_from_prev).sum() + } +} + +#[derive(Debug, Clone, Serialize)] +struct MessageEvent { + #[serde(serialize_with = "serialize_node_id_as_index")] + node_id: NodeId, + step_id: usize, + #[serde(serialize_with = "duration_as_millis")] + duration_from_prev: Duration, + event_type: MessageEventType, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub enum MessageEventType { + Created, + PersistentTransmissionScheduled { + index: usize, + }, + PersistentTransmissionReleased, + TemporalProcessorScheduled { + index: usize, + }, + TemporalProcessorReleased, + NetworkSent { + #[serde(serialize_with = "serialize_node_id_as_index")] + to: NodeId, + }, + NetworkReceived { + #[serde(serialize_with = "serialize_node_id_as_index")] + from: NodeId, + }, +} + +pub fn duration_as_millis(duration: &Duration, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_u64(duration.as_millis().try_into().unwrap()) +} + #[cfg(test)] mod tests { use super::Payload; diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 6dd5c9e..5dd538c 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -1,4 +1,6 @@ pub mod consensus_streams; +#[macro_use] +pub mod log; pub mod lottery; mod message; pub mod scheduler; @@ -11,20 +13,18 @@ use cached::{Cached, TimedCache}; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; -use message::{Payload, PayloadId}; -use multiaddr::Multiaddr; +use message::{duration_as_millis, MessageEventType, MessageHistory, Payload, PayloadId}; use netrunner::network::NetworkMessage; use netrunner::node::{Node, NodeId, NodeIdExt}; use netrunner::{ network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, warding::WardCondition, }; +use nomos_blend::message_blend::temporal::{TemporalProcessorExt, TemporalStream}; use nomos_blend::{ cover_traffic::{CoverTraffic, CoverTrafficSettings}, membership::Membership, - message_blend::{ - crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream, - }, + message_blend::{crypto::CryptographicProcessor, MessageBlendSettings}, persistent_transmission::{ PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, }, @@ -33,22 +33,53 @@ use nomos_blend::{ use nomos_blend_message::mock::MockBlendMessage; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; -use scheduler::{Interval, TemporalRelease}; +use scheduler::{Interval, TemporalScheduler}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use state::BlendnodeState; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; -#[derive(Debug, Clone)] -pub struct BlendMessage(Vec); +#[derive(Debug, Clone, Serialize)] +pub struct BlendMessage { + message: Vec, + history: MessageHistory, +} + +impl BlendMessage { + fn new(message: Vec, node_id: NodeId, step_id: usize, step_time: Duration) -> Self { + let mut history = MessageHistory::new(); + history.add(node_id, step_id, step_time, MessageEventType::Created); + Self { message, history } + } + + fn new_drop() -> Self { + Self { + message: Vec::new(), + history: MessageHistory::new(), + } + } + + fn is_drop(&self) -> bool { + self.message.is_empty() + } +} impl PayloadSize for BlendMessage { fn size_bytes(&self) -> u32 { - 2208 + // payload: 32 KiB + // header encryption overhead: 133 bytes = 48 + 17 * max_blend_hops(=5) + // payload encryption overhaed: 16 bytes + // economic data overhead: 8043 bytes + 40960 } } +struct BlendOutgoingMessageWithHistory { + outgoing_message: BlendOutgoingMessage, + history: MessageHistory, +} + #[derive(Deserialize)] pub struct BlendnodeSettings { pub connected_peers: Vec, @@ -60,7 +91,12 @@ pub struct BlendnodeSettings { pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, pub cover_traffic_settings: CoverTrafficSettings, - pub membership: Vec<::PublicKey>, + pub membership: Vec< + nomos_blend::membership::Node< + NodeId, + ::PublicKey, + >, + >, } type Sha256Hash = [u8; 32]; @@ -70,6 +106,7 @@ pub struct BlendNode { id: NodeId, state: BlendnodeState, settings: BlendnodeSettings, + step_time: Duration, network_interface: InMemoryNetworkInterface, message_cache: TimedCache, @@ -77,23 +114,17 @@ pub struct BlendNode { data_msg_lottery_interval: Interval, data_msg_lottery: StakeLottery, - persistent_sender: channel::Sender>, + persistent_sender: channel::Sender, persistent_update_time_sender: channel::Sender, - persistent_transmission_messages: PersistentTransmissionStream< - CrossbeamReceiverStream>, - ChaCha12Rng, - MockBlendMessage, - Interval, - >, - crypto_processor: CryptographicProcessor, - blend_sender: channel::Sender>, - blend_update_time_sender: channel::Sender, - blend_messages: MessageBlendStream< - CrossbeamReceiverStream>, - ChaCha12Rng, - MockBlendMessage, - TemporalRelease, - >, + persistent_transmission_messages: + PersistentTransmissionStream, ChaCha12Rng, Interval>, + + crypto_processor: CryptographicProcessor, + temporal_sender: channel::Sender, + temporal_update_time_sender: channel::Sender, + temporal_processor_messages: + TemporalStream, TemporalScheduler>, + epoch_update_sender: channel::Sender, slot_update_sender: channel::Sender, cover_traffic: CoverTraffic, @@ -103,6 +134,7 @@ impl BlendNode { pub fn new( id: NodeId, settings: BlendnodeSettings, + step_time: Duration, network_interface: InMemoryNetworkInterface, ) -> Self { let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); @@ -120,7 +152,7 @@ impl BlendNode { ); // Init Tier-1: Persistent transmission - let (persistent_sender, persistent_receiver) = channel::unbounded(); + let (persistent_sender, persistent_receiver) = channel::unbounded::(); let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) .persistent_transmission( @@ -132,43 +164,28 @@ impl BlendNode { ), persistent_update_time_receiver, ), + BlendMessage::new_drop(), ); - // Init Tier-2: message blend - let (blend_sender, blend_receiver) = channel::unbounded(); - let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded(); - let nodes: Vec< - nomos_blend::membership::Node< - ::PublicKey, - >, - > = settings - .membership - .iter() - .map(|&public_key| nomos_blend::membership::Node { - address: Multiaddr::empty(), - public_key, - }) - .collect(); - let membership = Membership::::new(nodes, id.into()); + // Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor + let membership = + Membership::::new(settings.membership.clone(), id.into()); let crypto_processor = CryptographicProcessor::new( settings.message_blend.cryptographic_processor.clone(), membership.clone(), ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), ); - let temporal_release = TemporalRelease::new( - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - blend_update_time_receiver, - ( - 1, - settings.message_blend.temporal_processor.max_delay_seconds, - ), - ); - let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend( - settings.message_blend.clone(), - membership, - temporal_release, - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - ); + let (temporal_sender, temporal_receiver) = channel::unbounded(); + let (temporal_update_time_sender, temporal_update_time_receiver) = channel::unbounded(); + let temporal_processor_messages = CrossbeamReceiverStream::new(temporal_receiver) + .temporal_stream(TemporalScheduler::new( + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + temporal_update_time_receiver, + ( + 0, + settings.message_blend.temporal_processor.max_delay_seconds, + ), + )); // tier 3 cover traffic let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); @@ -185,6 +202,7 @@ impl BlendNode { Self { id, + step_time, network_interface, // We're not coupling this lifespan with the steps now, but it's okay // We expected that a message will be delivered to most of nodes within 60s. @@ -194,6 +212,8 @@ impl BlendNode { node_id: id, step_id: 0, num_messages_fully_unwrapped: 0, + cur_num_persistent_scheduled: 0, + cur_num_temporal_scheduled: 0, }, data_msg_lottery_update_time_sender, data_msg_lottery_interval, @@ -202,37 +222,28 @@ impl BlendNode { persistent_update_time_sender, persistent_transmission_messages, crypto_processor, - blend_sender, - blend_update_time_sender, - blend_messages, + temporal_sender, + temporal_update_time_sender, + temporal_processor_messages, epoch_update_sender, slot_update_sender, cover_traffic, } } - fn forward( - &mut self, - message: BlendMessage, - exclude_node: Option, - log: Option, - ) { - for (i, node_id) in self + fn forward(&mut self, message: BlendMessage, exclude_node: Option) { + for node_id in self .settings .connected_peers .iter() .filter(|&id| Some(*id) != exclude_node) - .enumerate() { - if i == 0 { - if let Some(log) = &log { - Self::log_emission(log); - } - } - self.network_interface - .send_message(*node_id, message.clone()) + let mut message = message.clone(); + self.record_network_sent_event(&mut message.history, *node_id); + self.network_interface.send_message(*node_id, message) } - self.message_cache.cache_set(Self::sha256(&message.0), ()); + self.message_cache + .cache_set(Self::sha256(&message.message), ()); } fn receive(&mut self) -> Vec> { @@ -242,7 +253,7 @@ impl BlendNode { // Retain only messages that have not been seen before .filter(|msg| { self.message_cache - .cache_set(Self::sha256(&msg.payload().0), ()) + .cache_set(Self::sha256(&msg.payload().message), ()) .is_none() }) .collect() @@ -254,43 +265,116 @@ impl BlendNode { hasher.finalize().into() } + fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) { + self.record_persistent_scheduled_event(&mut message.history); + self.persistent_sender.send(message).unwrap(); + self.state.cur_num_persistent_scheduled += 1; + } + + fn handle_incoming_message(&mut self, message: BlendMessage) { + match self.crypto_processor.unwrap_message(&message.message) { + Ok((unwrapped_message, fully_unwrapped)) => { + let temporal_message = if fully_unwrapped { + BlendOutgoingMessage::FullyUnwrapped(unwrapped_message) + } else { + BlendOutgoingMessage::Outbound(unwrapped_message) + }; + + self.schedule_temporal_processor(BlendOutgoingMessageWithHistory { + outgoing_message: temporal_message, + history: message.history, + }); + } + Err(e) => { + tracing::debug!("Failed to unwrap message: {:?}", e); + } + } + } + + fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) { + self.record_temporal_scheduled_event(&mut message.history); + self.temporal_sender.send(message).unwrap(); + self.state.cur_num_temporal_scheduled += 1; + } + fn update_time(&mut self, elapsed: Duration) { self.data_msg_lottery_update_time_sender .send(elapsed) .unwrap(); self.persistent_update_time_sender.send(elapsed).unwrap(); - self.blend_update_time_sender.send(elapsed).unwrap(); + self.temporal_update_time_sender.send(elapsed).unwrap(); self.epoch_update_sender.send(elapsed).unwrap(); self.slot_update_sender.send(elapsed).unwrap(); } - fn log_message_generated(&self, msg_type: &str, payload: &Payload) { - self.log_message(format!("{}MessageGenerated", msg_type).as_str(), payload); + fn log_message_fully_unwrapped(&self, payload: &Payload, history: MessageHistory) { + let total_duration = history.total_duration(); + log!( + "MessageFullyUnwrapped", + MessageWithHistoryLog { + message: MessageLog { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id.index(), + }, + history, + total_duration, + } + ); } - fn log_message_fully_unwrapped(&self, payload: &Payload) { - self.log_message("MessageFullyUnwrapped", payload); + fn new_blend_message(&self, message: Vec) -> BlendMessage { + BlendMessage::new(message, self.id, self.state.step_id, self.step_time) } - fn log_message(&self, tag: &str, payload: &Payload) { - let log = MessageLog { - payload_id: payload.id(), - step_id: self.state.step_id, - node_id: self.id.index(), - }; - tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap()); + fn record_network_sent_event(&self, history: &mut MessageHistory, to: NodeId) { + self.record_message_event(history, MessageEventType::NetworkSent { to }); } - fn log_emission(log: &EmissionLog) { - tracing::info!("Emission: {}", serde_json::to_string(log).unwrap()); + fn record_network_received_event(&self, history: &mut MessageHistory, from: NodeId) { + assert_eq!( + history.last_event_type(), + Some(&MessageEventType::NetworkSent { to: self.id }) + ); + self.record_message_event(history, MessageEventType::NetworkReceived { from }); } - fn new_emission_log(&self, emission_type: &str) -> EmissionLog { - EmissionLog { - emission_type: emission_type.to_string(), - step_id: self.state.step_id, - node_id: self.id.index(), - } + fn record_persistent_scheduled_event(&self, history: &mut MessageHistory) { + self.record_message_event( + history, + MessageEventType::PersistentTransmissionScheduled { + index: self.state.cur_num_persistent_scheduled, + }, + ); + } + + fn record_persistent_released_event(&self, history: &mut MessageHistory) { + assert!(matches!( + history.last_event_type(), + Some(MessageEventType::PersistentTransmissionScheduled { .. }) + )); + self.record_message_event(history, MessageEventType::PersistentTransmissionReleased); + } + + fn record_temporal_scheduled_event(&self, history: &mut MessageHistory) { + self.record_message_event( + history, + MessageEventType::TemporalProcessorScheduled { + index: self.state.cur_num_temporal_scheduled, + }, + ); + } + + fn record_temporal_released_event(&self, history: &mut MessageHistory) { + assert!(matches!( + history.last_event_type(), + Some(MessageEventType::TemporalProcessorScheduled { .. }) + )); + self.record_message_event(history, MessageEventType::TemporalProcessorReleased); + } + + fn record_message_event(&self, history: &mut MessageHistory, event_type: MessageEventType) { + history.add(self.id, self.state.step_id, self.step_time, event_type); } } @@ -316,38 +400,51 @@ impl Node for BlendNode { if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) { if self.data_msg_lottery.run() { let payload = Payload::new(); - self.log_message_generated("Data", &payload); let message = self .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.persistent_sender.send(message).unwrap(); + self.schedule_persistent_transmission(self.new_blend_message(message)); } } // Handle incoming messages - for network_message in self.receive() { + for mut network_message in self.receive() { + self.record_network_received_event( + &mut network_message.payload.history, + network_message.from, + ); + + if network_message.payload().is_drop() { + continue; + } + self.forward( network_message.payload().clone(), Some(network_message.from), - None, ); - self.blend_sender - .send(network_message.into_payload().0) - .unwrap(); + self.handle_incoming_message(network_message.into_payload()); } - // Proceed message blend - if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) { - match msg { - BlendOutgoingMessage::Outbound(msg) => { - self.persistent_sender.send(msg).unwrap(); + // Proceed temporal processor + if let Poll::Ready(Some(mut outgoing_msg_with_history)) = + pin!(&mut self.temporal_processor_messages).poll_next(&mut cx) + { + self.record_temporal_released_event(&mut outgoing_msg_with_history.history); + self.state.cur_num_temporal_scheduled -= 1; + + // Proceed the message + match outgoing_msg_with_history.outgoing_message { + BlendOutgoingMessage::Outbound(message) => { + self.schedule_persistent_transmission(BlendMessage { + message, + history: outgoing_msg_with_history.history, + }); } BlendOutgoingMessage::FullyUnwrapped(payload) => { let payload = Payload::load(payload); - self.log_message_fully_unwrapped(&payload); + self.log_message_fully_unwrapped(&payload, outgoing_msg_with_history.history); self.state.num_messages_fully_unwrapped += 1; - //TODO: create a tracing event } } } @@ -355,23 +452,20 @@ impl Node for BlendNode { // Generate a cover message probabilistically if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) { let payload = Payload::new(); - self.log_message_generated("Cover", &payload); let message = self .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.persistent_sender.send(message).unwrap(); + self.schedule_persistent_transmission(self.new_blend_message(message)); } // Proceed persistent transmission - if let Poll::Ready(Some(msg)) = + if let Poll::Ready(Some(mut msg)) = pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) { - self.forward( - BlendMessage(msg), - None, - Some(self.new_emission_log("FromPersistent")), - ); + self.record_persistent_released_event(&mut msg.history); + self.state.cur_num_persistent_scheduled -= 1; + self.forward(msg, None); } self.state.step_id += 1; @@ -388,16 +482,17 @@ impl Node for BlendNode { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize)] struct MessageLog { payload_id: PayloadId, step_id: usize, node_id: usize, } -#[derive(Debug, Serialize, Deserialize)] -struct EmissionLog { - emission_type: String, - step_id: usize, - node_id: usize, +#[derive(Debug, Serialize)] +struct MessageWithHistoryLog { + message: MessageLog, + history: MessageHistory, + #[serde(serialize_with = "duration_as_millis")] + total_duration: Duration, } diff --git a/simlib/blendnet-sims/src/node/blend/scheduler.rs b/simlib/blendnet-sims/src/node/blend/scheduler.rs index ac403c8..b6a760d 100644 --- a/simlib/blendnet-sims/src/node/blend/scheduler.rs +++ b/simlib/blendnet-sims/src/node/blend/scheduler.rs @@ -44,14 +44,14 @@ impl Stream for Interval { } } -pub struct TemporalRelease { +pub struct TemporalScheduler { random_sleeps: Box + Send + Sync + 'static>, elapsed: Duration, current_sleep: Duration, update_time: channel::Receiver, } -impl TemporalRelease { +impl TemporalScheduler { pub fn new( mut rng: Rng, update_time: channel::Receiver, @@ -80,7 +80,7 @@ impl TemporalRelease { } } -impl Stream for TemporalRelease { +impl Stream for TemporalScheduler { type Item = (); fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -134,7 +134,7 @@ mod tests { fn temporal_release_update() { let (_tx, rx) = channel::unbounded(); let mut temporal_release = - TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); + TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); assert!(!temporal_release.update(Duration::from_secs(0))); assert!(!temporal_release.update(Duration::from_millis(999))); @@ -149,7 +149,7 @@ mod tests { let (tx, rx) = channel::unbounded(); let mut temporal_release = - TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); + TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); tx.send(Duration::from_secs(0)).unwrap(); assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending); diff --git a/simlib/blendnet-sims/src/node/blend/state.rs b/simlib/blendnet-sims/src/node/blend/state.rs index 7fb4417..a0b99a7 100644 --- a/simlib/blendnet-sims/src/node/blend/state.rs +++ b/simlib/blendnet-sims/src/node/blend/state.rs @@ -15,6 +15,8 @@ pub struct BlendnodeState { pub node_id: NodeId, pub step_id: usize, pub num_messages_fully_unwrapped: usize, + pub cur_num_persistent_scheduled: usize, + pub cur_num_temporal_scheduled: usize, } #[derive(Serialize)] diff --git a/simlib/blendnet-sims/src/node/blend/topology.rs b/simlib/blendnet-sims/src/node/blend/topology.rs index d86d40e..4dd3574 100644 --- a/simlib/blendnet-sims/src/node/blend/topology.rs +++ b/simlib/blendnet-sims/src/node/blend/topology.rs @@ -1,6 +1,12 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; -use netrunner::node::{NodeId, NodeIdExt}; +use netrunner::{ + network::regions::RegionsData, + node::{NodeId, NodeIdExt}, +}; use rand::{seq::SliceRandom, RngCore}; #[derive(Clone)] @@ -121,6 +127,28 @@ impl Topology { hop_count } + pub fn conn_latency_distribution( + &self, + regions_data: &RegionsData, + ) -> HashMap { + // Initialize a distribution + let distribution = regions_data + .region_network_behaviour + .values() + .map(|behaviour| (behaviour.delay(), 0)) + .collect(); + // Populate the distribution + self.0.iter().fold(distribution, |mut acc, (node, peers)| { + let region_a = regions_data.node_region(*node); + peers.iter().for_each(|peer| { + let region_b = regions_data.node_region(*peer); + let behaviour = regions_data.network_behaviour_between_regions(region_a, region_b); + acc.entry(behaviour.delay()).and_modify(|count| *count += 1); + }); + acc + }) + } + pub fn get(&self, node: &NodeId) -> Option<&HashSet> { self.0.get(node) } diff --git a/simlib/netrunner/src/network/mod.rs b/simlib/netrunner/src/network/mod.rs index e88d821..aed8c2d 100644 --- a/simlib/netrunner/src/network/mod.rs +++ b/simlib/netrunner/src/network/mod.rs @@ -618,16 +618,16 @@ mod tests { let node_c = NodeId::from_index(2); let regions = HashMap::from([ - (Region::Asia, vec![node_a, node_b]), + (Region::EastAsia, vec![node_a, node_b]), (Region::Europe, vec![node_c]), ]); let behaviour = HashMap::from([ ( - NetworkBehaviourKey::new(Region::Asia, Region::Asia), + NetworkBehaviourKey::new(Region::EastAsia, Region::EastAsia), NetworkBehaviour::new(Duration::from_millis(100), 0.0), ), ( - NetworkBehaviourKey::new(Region::Asia, Region::Europe), + NetworkBehaviourKey::new(Region::EastAsia, Region::Europe), NetworkBehaviour::new(Duration::from_millis(500), 0.0), ), ( diff --git a/simlib/netrunner/src/network/regions.rs b/simlib/netrunner/src/network/regions.rs index 185375b..55852d8 100644 --- a/simlib/netrunner/src/network/regions.rs +++ b/simlib/netrunner/src/network/regions.rs @@ -10,9 +10,13 @@ use super::{NetworkBehaviourKey, NetworkSettings}; #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub enum Region { - NorthAmerica, + NorthAmericaWest, + NorthAmericaCentral, + NorthAmericaEast, Europe, - Asia, + NorthernEurope, + EastAsia, + SoutheastAsia, Africa, SouthAmerica, Australia, @@ -21,9 +25,13 @@ pub enum Region { impl core::fmt::Display for Region { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { let s = match self { - Self::NorthAmerica => "NorthAmerica", + Self::NorthAmericaWest => "NorthAmericaWest", + Self::NorthAmericaCentral => "NorthAmericaCentral", + Self::NorthAmericaEast => "NorthAmericaEast", Self::Europe => "Europe", - Self::Asia => "Asia", + Self::NorthernEurope => "NorthernEurope", + Self::EastAsia => "EastAsia", + Self::SoutheastAsia => "SoutheastAsia", Self::Africa => "Africa", Self::SouthAmerica => "SouthAmerica", Self::Australia => "Australia", @@ -42,9 +50,13 @@ impl FromStr for Region { .replace(['-', '_', ' '], "") .as_str() { - "northamerica" | "na" => Ok(Self::NorthAmerica), + "northamericawest" | "naw" => Ok(Self::NorthAmericaWest), + "northamericacentral" | "nac" => Ok(Self::NorthAmericaCentral), + "northamericaeast" | "nae" => Ok(Self::NorthAmericaEast), "europe" | "eu" => Ok(Self::Europe), - "asia" | "as" => Ok(Self::Asia), + "northerneurope" | "neu" => Ok(Self::NorthernEurope), + "eastasia" | "eas" => Ok(Self::EastAsia), + "southeastasia" | "seas" => Ok(Self::SoutheastAsia), "africa" | "af" => Ok(Self::Africa), "southamerica" | "sa" => Ok(Self::SouthAmerica), "australia" | "au" => Ok(Self::Australia), @@ -56,9 +68,13 @@ impl FromStr for Region { impl Serialize for Region { fn serialize(&self, serializer: S) -> Result { let s = match self { - Self::NorthAmerica => "North America", + Self::NorthAmericaWest => "North America West", + Self::NorthAmericaCentral => "North America Central", + Self::NorthAmericaEast => "North America East", Self::Europe => "Europe", - Self::Asia => "Asia", + Self::NorthernEurope => "Northern Europe", + Self::EastAsia => "EastAsia", + Self::SoutheastAsia => "Southeast Asia", Self::Africa => "Africa", Self::SouthAmerica => "South America", Self::Australia => "Australia", @@ -105,6 +121,14 @@ impl RegionsData { pub fn network_behaviour(&self, node_a: NodeId, node_b: NodeId) -> &NetworkBehaviour { let region_a = self.node_region[&node_a]; let region_b = self.node_region[&node_b]; + self.network_behaviour_between_regions(region_a, region_b) + } + + pub fn network_behaviour_between_regions( + &self, + region_a: Region, + region_b: Region, + ) -> &NetworkBehaviour { let k = NetworkBehaviourKey::new(region_a, region_b); let k_rev = NetworkBehaviourKey::new(region_b, region_a); self.region_network_behaviour @@ -207,9 +231,13 @@ mod tests { .collect::>(); let available_regions = [ - Region::NorthAmerica, + Region::NorthAmericaWest, + Region::NorthAmericaCentral, + Region::NorthAmericaEast, Region::Europe, - Region::Asia, + Region::NorthernEurope, + Region::EastAsia, + Region::SoutheastAsia, Region::Africa, Region::SouthAmerica, Region::Australia, diff --git a/simlib/netrunner/src/streaming/io.rs b/simlib/netrunner/src/streaming/io.rs index b79d2f6..d0b0ec6 100644 --- a/simlib/netrunner/src/streaming/io.rs +++ b/simlib/netrunner/src/streaming/io.rs @@ -114,7 +114,7 @@ where } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::{collections::HashMap, time::Duration}; use crate::{ @@ -174,43 +174,19 @@ mod tests { RegionsData { regions: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (region, vec![NodeId::from_index(idx)]) }) .collect(), node_region: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (NodeId::from_index(idx), region) }) .collect(), region_network_behaviour: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); ( NetworkBehaviourKey::new(region, region), NetworkBehaviour { @@ -231,4 +207,20 @@ mod tests { .stop_after(Duration::from_millis(100)) .unwrap(); } + + pub(crate) fn region_from_index(idx: usize) -> Region { + match idx % 10 { + 0 => Region::Europe, + 1 => Region::NorthernEurope, + 2 => Region::NorthAmericaWest, + 3 => Region::NorthAmericaCentral, + 4 => Region::NorthAmericaEast, + 5 => Region::SouthAmerica, + 6 => Region::EastAsia, + 7 => Region::SoutheastAsia, + 8 => Region::Africa, + 9 => Region::Australia, + _ => unreachable!(), + } + } } diff --git a/simlib/netrunner/src/streaming/runtime_subscriber.rs b/simlib/netrunner/src/streaming/runtime_subscriber.rs index 2b54fc2..59d8809 100644 --- a/simlib/netrunner/src/streaming/runtime_subscriber.rs +++ b/simlib/netrunner/src/streaming/runtime_subscriber.rs @@ -115,6 +115,7 @@ mod tests { }, output_processors::OutData, runner::SimulationRunner, + streaming::io::tests::region_from_index, warding::SimulationState, }; @@ -160,43 +161,19 @@ mod tests { RegionsData { regions: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (region, vec![NodeId::from_index(idx)]) }) .collect(), node_region: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (NodeId::from_index(idx), region) }) .collect(), region_network_behaviour: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); ( NetworkBehaviourKey::new(region, region), NetworkBehaviour { diff --git a/simlib/netrunner/src/streaming/settings_subscriber.rs b/simlib/netrunner/src/streaming/settings_subscriber.rs index 1dd4ec9..e2f6f3d 100644 --- a/simlib/netrunner/src/streaming/settings_subscriber.rs +++ b/simlib/netrunner/src/streaming/settings_subscriber.rs @@ -116,6 +116,7 @@ mod tests { }, output_processors::OutData, runner::SimulationRunner, + streaming::io::tests::region_from_index, warding::SimulationState, }; @@ -156,43 +157,19 @@ mod tests { RegionsData { regions: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (region, vec![NodeId::from_index(idx)]) }) .collect(), node_region: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); (NodeId::from_index(idx), region) }) .collect(), region_network_behaviour: (0..6) .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; + let region = region_from_index(idx); ( NetworkBehaviourKey::new(region, region), NetworkBehaviour {