update batch.py

Youngjoon Lee 2025-02-06 15:04:39 +09:00
parent 6bf7ae49bd
commit 1a4657202b
No known key found for this signature in database
GPG Key ID: D94003D91DE12141


@@ -1,22 +1,20 @@
 # !/usr/bin/env python
+import argparse
 import csv
 import json
 import os
 import subprocess
-import sys
 from collections import OrderedDict
-from dataclasses import asdict
 
 import latency
 import mixlog
 
-STEP_DURATION_MS = 50
-
-def bandwidth_result(log_path: str) -> dict[str, float]:
+
+def bandwidth_result(log_path: str, step_duration_ms: int) -> dict[str, float]:
     max_step_id = 0
-    for _, json_msg in mixlog.get_input_stream(log_path):
-        if (step_id := json_msg.get("step_id")) is not None:
-            max_step_id = max(max_step_id, step_id)
+    for topic, json_msg in mixlog.get_input_stream(log_path):
+        if topic == "MessageFullyUnwrapped":
+            max_step_id = max(max_step_id, json_msg["message"]["step_id"])
 
     with open(log_path, "r") as file:
         for line in file:
@@ -27,7 +25,7 @@ def bandwidth_result(log_path: str) -> dict[str, float]:
             line = line.replace(", ", ', "')
             record = json.loads(line)
 
-    elapsed = (max_step_id * STEP_DURATION_MS) / 1000.0
+    elapsed = (max_step_id * step_duration_ms) / 1000.0
     return {
         "min": float(record["min_node_total_bandwidth"]) / elapsed,
         "avg": float(record["avg_node_total_bandwidth"]) / elapsed,
@@ -44,196 +42,232 @@ def topology_result(log_path: str) -> dict[str, int]:
     raise Exception("No topology found in log file")
 
 
-# Read the CSV data
-csv_data = []
-with open("params.csv", mode="r") as csvfile:  # Replace with your CSV file name
-    reader = csv.DictReader(csvfile, delimiter=",")
-    csv_data = list(reader)
-
-# Read the original blendnet.json
-with open("../config/blendnet.json", "r") as jsonfile:
-    original_json = json.load(jsonfile)
-
-# Directory to save modified JSON files
-output_dir = "modified_configs"
-os.makedirs(output_dir, exist_ok=True)
-
-# Modify and save JSON files for each row in CSV
-config_paths = []
-for idx, row in enumerate(csv_data):
-    modified_json = OrderedDict(original_json)  # Preserve original field order
-
-    # Apply modifications
-    modified_json["network_settings"]["regions"]["north america"] = 0.0
-    modified_json["network_settings"]["regions"]["europe"] = 1.0
-    modified_json["network_settings"]["regions"]["asia"] = 0.0
-    modified_json["step_time"] = f"{STEP_DURATION_MS}ms"
-    modified_json["node_count"] = int(row["network_size"])
-    modified_json["wards"][0]["sum"] = 1000
-    modified_json["connected_peers_count"] = int(row["peering_degree"])
-    modified_json["data_message_lottery_interval"] = "20s"
-    modified_json["stake_proportion"] = 0.0
-    modified_json["persistent_transmission"]["max_emission_frequency"] = 1.0
-    modified_json["persistent_transmission"]["drop_message_probability"] = 0.0
-    modified_json["epoch_duration"] = (
-        f"{int(row['cover_slots_per_epoch']) * int(row['cover_slot_duration'])}s"
-    )
-    modified_json["slots_per_epoch"] = int(row["cover_slots_per_epoch"])
-    modified_json["slot_duration"] = f"{row['cover_slot_duration']}s"
-    modified_json["max_delay_seconds"] = int(row["max_temporal_delay"])
-    modified_json["number_of_hops"] = int(row["blend_hops"])
-    modified_json["number_of_blend_layers"] = int(row["blend_hops"])
-
-    # Save modified JSON
-    output_path = os.path.join(output_dir, f"{idx}.json")
-    with open(output_path, "w") as outfile:
-        json.dump(modified_json, outfile, indent=2)
-        print("Saved modified JSON to:", output_path)
-        config_paths.append(output_path)
-
-# Directory to save logs
-log_dir = "logs"
-os.makedirs(log_dir, exist_ok=True)
-
-log_paths = []
-for idx, config_path in enumerate(config_paths):
-    log_path = f"{log_dir}/{idx}.log"
-    with open(log_path, "w") as log_file:
-        print(f"Running simulation-{idx}: {log_file.name} with config: {config_path}")
-        subprocess.run(
-            ["../../target/release/blendnet-sims", "--input-settings", config_path],
-            stdout=log_file,
-        )
-        print(f"Simulation-{idx} completed: {log_file.name}")
-        log_paths.append(log_path)
-
-print("Analyzing logs...")
-print("=================")
-
-with open("output.csv", "w", newline="") as file:
-    print(f"Writing results to: {file.name}")
-    csv_writer = csv.writer(file)
-    csv_writer.writerow(
-        [
-            "network_diameter",
-            "msg_count",
-            "min_latency",
-            "avg_latency",
-            "median_latency",
-            "max_latency",
-            "min_latency_msg_id",
-            "min_latency_msg_persistent_latency_ms",
-            "min_latency_msg_persistent_queue_sizes",
-            "min_latency_msg_temporal_latency_ms",
-            "min_latency_msg_temporal_queue_sizes",
-            "max_latency_msg_id",
-            "max_latency_msg_persistent_latency_ms",
-            "max_latency_msg_persistent_queue_sizes",
-            "max_latency_msg_temporal_latency_ms",
-            "max_latency_msg_temporal_queue_sizes",
-            "min_bandwidth_kbps",
-            "avg_bandwidth_kbps",
-            "max_bandwidth_kbps",
-        ]
-    )
-
-    for idx, log_path in enumerate(log_paths):
-        csv_row = []
-        csv_row.append(topology_result(log_path)["diameter"])
-
-        message_storage, node_storage = latency.parse_record_stream(
-            mixlog.get_input_stream(log_path)
-        )
-        with open(f"{log_dir}/msgs-{idx}.json", "w") as file:
-            json.dump(
-                {msg_id: asdict(msg) for msg_id, msg in message_storage.items()},
-                file,
-                indent=2,
-            )
-        with open(f"{log_dir}/nodes-{idx}.json", "w") as file:
-            json.dump(node_storage.to_dict(), file, indent=2)
-
-        latency_analysis = latency.LatencyAnalysis.build(
-            message_storage, node_storage, STEP_DURATION_MS
-        )
-        csv_row.append(latency_analysis.total_complete_messages)
-        csv_row.append(float(latency_analysis.min_latency_ms) / 1000.0)
-        csv_row.append(float(latency_analysis.avg_latency_ms) / 1000.0)
-        csv_row.append(float(latency_analysis.median_latency_ms) / 1000.0)
-        csv_row.append(float(latency_analysis.max_latency_ms) / 1000.0)
-        csv_row.append(latency_analysis.min_latency_analysis.message_id)
-        csv_row.append(
-            ",".join(
-                map(
-                    str,
-                    [
-                        ms / 1000.0
-                        for ms in latency_analysis.min_latency_analysis.persistent_latencies_ms
-                    ],
-                )
-            )
-        )
-        csv_row.append(
-            ",".join(
-                map(str, latency_analysis.min_latency_analysis.persistent_queue_sizes)
-            )
-        )
-        csv_row.append(
-            ",".join(
-                map(
-                    str,
-                    [
-                        ms / 1000.0
-                        for ms in latency_analysis.min_latency_analysis.temporal_latencies_ms
-                    ],
-                )
-            )
-        )
-        csv_row.append(
-            ",".join(
-                map(str, latency_analysis.min_latency_analysis.temporal_queue_sizes)
-            )
-        )
-        csv_row.append(latency_analysis.max_latency_analysis.message_id)
-        csv_row.append(
-            ",".join(
-                map(
-                    str,
-                    [
-                        ms / 1000.0
-                        for ms in latency_analysis.max_latency_analysis.persistent_latencies_ms
-                    ],
-                )
-            )
-        )
-        csv_row.append(
-            ",".join(
-                map(str, latency_analysis.max_latency_analysis.persistent_queue_sizes)
-            )
-        )
-        csv_row.append(
-            ",".join(
-                map(
-                    str,
-                    [
-                        ms / 1000.0
-                        for ms in latency_analysis.max_latency_analysis.temporal_latencies_ms
-                    ],
-                )
-            )
-        )
-        csv_row.append(
-            ",".join(
-                map(str, latency_analysis.max_latency_analysis.temporal_queue_sizes)
-            )
-        )
-
-        bandwidth_res = bandwidth_result(log_path)
-        csv_row.append(bandwidth_res["min"] * 8 / 1000.0)
-        csv_row.append(bandwidth_res["avg"] * 8 / 1000.0)
-        csv_row.append(bandwidth_res["max"] * 8 / 1000.0)
-        csv_writer.writerow(csv_row)
-    print(f"The outputs have been successfully written to {file.name}")
+def build_argument_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.")
+    parser.add_argument(
+        "--step-duration",
+        type=int,
+        help="Duration (in ms) of each step in the simulation.",
+    )
+    parser.add_argument(
+        "--params-file",
+        type=str,
+        help="A CSV file that contains all parameter sets",
+    )
+    parser.add_argument(
+        "--orig-config-file",
+        type=str,
+        help="An original blendnet config JSON file that will be modified as specified in `params_file`",
+    )
+    parser.add_argument(
+        "--outdir",
+        type=str,
+        help="A output directory to be created",
+    )
+    parser.add_argument(
+        "--skip-run",
+        action="store_true",
+        help="Skip running the simulations and only analyze the logs",
+    )
+    return parser
+
+
+if __name__ == "__main__":
+    argument_parser = build_argument_parser()
+    args = argument_parser.parse_args()
+
+    # Read the params CSV file
+    csv_data = []
+    with open(args.params_file, mode="r") as csvfile:
+        reader = csv.DictReader(csvfile, delimiter=",")
+        csv_data = list(reader)
+
+    # Read the original blendnet json config file
+    with open(args.orig_config_file, "r") as jsonfile:
+        original_json = json.load(jsonfile)
+
+    # Directory to save modified JSON files
+    modified_configs_dir = os.path.join(args.outdir, "modified_configs")
+    os.makedirs(modified_configs_dir, exist_ok=True)
+
+    # Modify and save JSON files for each row in CSV
+    config_paths = []
+    for idx, row in enumerate(csv_data):
+        output_path = os.path.join(modified_configs_dir, f"{idx}.json")
+        config_paths.append(output_path)
+        if args.skip_run:
+            continue
+
+        modified_json = OrderedDict(original_json)  # Preserve original field order
+
+        # Apply modifications
+        modified_json["network_settings"]["regions"]["north america west"] = 0.06
+        modified_json["network_settings"]["regions"]["north america east"] = 0.15
+        modified_json["network_settings"]["regions"]["north america central"] = 0.02
+        modified_json["network_settings"]["regions"]["europe"] = 0.47
+        modified_json["network_settings"]["regions"]["northern europe"] = 0.10
+        modified_json["network_settings"]["regions"]["east asia"] = 0.10
+        modified_json["network_settings"]["regions"]["southeast asia"] = 0.07
+        modified_json["network_settings"]["regions"]["australia"] = 0.03
+        modified_json["step_time"] = f"{args.step_duration}ms"
+        modified_json["node_count"] = int(row["network_size"])
+        modified_json["wards"][0]["sum"] = 1000
+        modified_json["connected_peers_count"] = int(row["peering_degree"])
+        modified_json["data_message_lottery_interval"] = "20s"
+        modified_json["stake_proportion"] = 0.0
+        modified_json["persistent_transmission"]["max_emission_frequency"] = 1.0
+        modified_json["persistent_transmission"]["drop_message_probability"] = 0.0
+        modified_json["epoch_duration"] = (
+            f"{int(row['cover_slots_per_epoch']) * int(row['cover_slot_duration'])}s"
+        )
+        modified_json["slots_per_epoch"] = int(row["cover_slots_per_epoch"])
+        modified_json["slot_duration"] = f"{row['cover_slot_duration']}s"
+        modified_json["max_delay_seconds"] = int(row["max_temporal_delay"])
+        modified_json["number_of_hops"] = int(row["blend_hops"])
+        modified_json["number_of_blend_layers"] = int(row["blend_hops"])
+
+        # Save modified JSON
+        with open(output_path, "w") as outfile:
+            json.dump(modified_json, outfile, indent=2)
+            print("Saved modified JSON to:", output_path)
+
+    # Directory to save logs
+    log_dir = os.path.join(args.outdir, "logs")
+    os.makedirs(log_dir, exist_ok=True)
+
+    log_paths = []
+    for idx, config_path in enumerate(config_paths):
+        log_path = f"{log_dir}/{idx}.log"
+        log_paths.append(log_path)
+        if args.skip_run:
+            continue
+
+        with open(log_path, "w") as log_file:
+            print(
+                f"Running simulation-{idx}: {log_file.name} with config: {config_path}"
+            )
+            subprocess.run(
+                ["../../target/release/blendnet-sims", "--input-settings", config_path],
+                stdout=log_file,
+            )
+            print(f"Simulation-{idx} completed: {log_file.name}")
+
+    print("Analyzing logs...")
+    print("=================")
+
+    with open(os.path.join(args.outdir, "output.csv"), "w", newline="") as file:
+        print(f"Writing results to: {file.name}")
+        csv_writer = csv.writer(file)
+        csv_writer.writerow(
+            [
+                "network_diameter",
+                "msg_count",
+                "min_latency_sec",
+                "avg_latency_sec",
+                "median_latency_sec",
+                "max_latency_sec",
+                "min_latency_msg_id",
+                "min_latency_msg_persistent_latency_sec",
+                "min_latency_msg_persistent_index",
+                "min_latency_msg_temporal_latency_sec",
+                "min_latency_msg_temporal_index",
+                "max_latency_msg_id",
+                "max_latency_msg_persistent_latency_sec",
+                "max_latency_msg_persistent_index",
+                "max_latency_msg_temporal_latency_sec",
+                "max_latency_msg_temporal_index",
+                "min_bandwidth_kbps",
+                "avg_bandwidth_kbps",
+                "max_bandwidth_kbps",
+            ]
+        )
+
+        for idx, log_path in enumerate(log_paths):
+            csv_row = []
+            csv_row.append(topology_result(log_path)["diameter"])
+
+            latency_analysis = latency.LatencyAnalysis.build(
+                mixlog.get_input_stream(log_path)
+            )
+            csv_row.append(latency_analysis.total_messages)
+            csv_row.append(float(latency_analysis.min_latency_ms) / 1000.0)
+            csv_row.append(float(latency_analysis.avg_latency_ms) / 1000.0)
+            csv_row.append(float(latency_analysis.median_latency_ms) / 1000.0)
+            csv_row.append(float(latency_analysis.max_latency_ms) / 1000.0)
+            csv_row.append(latency_analysis.min_latency_analysis.message_id)
+            csv_row.append(
+                ",".join(
+                    map(
+                        str,
+                        [
+                            ms / 1000.0
+                            for ms in latency_analysis.min_latency_analysis.persistent_latencies_ms
+                        ],
+                    )
+                )
+            )
+            csv_row.append(
+                ",".join(
+                    map(str, latency_analysis.min_latency_analysis.persistent_indices)
+                )
+            )
+            csv_row.append(
+                ",".join(
+                    map(
+                        str,
+                        [
+                            ms / 1000.0
+                            for ms in latency_analysis.min_latency_analysis.temporal_latencies_ms
+                        ],
+                    )
+                )
+            )
+            csv_row.append(
+                ",".join(
+                    map(str, latency_analysis.min_latency_analysis.temporal_indices)
+                )
+            )
+            csv_row.append(latency_analysis.max_latency_analysis.message_id)
+            csv_row.append(
+                ",".join(
+                    map(
+                        str,
+                        [
+                            ms / 1000.0
+                            for ms in latency_analysis.max_latency_analysis.persistent_latencies_ms
+                        ],
+                    )
+                )
+            )
+            csv_row.append(
+                ",".join(
+                    map(str, latency_analysis.max_latency_analysis.persistent_indices)
+                )
+            )
+            csv_row.append(
+                ",".join(
+                    map(
+                        str,
+                        [
+                            ms / 1000.0
+                            for ms in latency_analysis.max_latency_analysis.temporal_latencies_ms
+                        ],
+                    )
+                )
+            )
+            csv_row.append(
+                ",".join(
+                    map(str, latency_analysis.max_latency_analysis.temporal_indices)
+                )
+            )
+
+            bandwidth_res = bandwidth_result(log_path, args.step_duration)
+            csv_row.append(bandwidth_res["min"] * 8 / 1000.0)
+            csv_row.append(bandwidth_res["avg"] * 8 / 1000.0)
+            csv_row.append(bandwidth_res["max"] * 8 / 1000.0)
+            csv_writer.writerow(csv_row)
+        print(f"The outputs have been successfully written to {file.name}")