write csv to file instead of stdout

Youngjoon Lee 2024-12-21 21:46:00 +09:00
parent 50eb3cad7b
commit 0a688f59fa
1 changed file with 110 additions and 101 deletions


@@ -111,117 +111,126 @@ for idx, config_path in enumerate(config_paths):
 print("Analyzing logs...")
 print("=================")
-csv_writer = csv.writer(sys.stdout)
-csv_writer.writerow(
-    [
-        "network_diameter",
-        "msg_count",
-        "min_latency",
-        "avg_latency",
-        "median_latency",
-        "max_latency",
-        "min_latency_msg_id",
-        "min_latency_msg_persistent_latency_ms",
-        "min_latency_msg_persistent_queue_sizes",
-        "min_latency_msg_temporal_latency_ms",
-        "min_latency_msg_temporal_queue_sizes",
-        "max_latency_msg_id",
-        "max_latency_msg_persistent_latency_ms",
-        "max_latency_msg_persistent_queue_sizes",
-        "max_latency_msg_temporal_latency_ms",
-        "max_latency_msg_temporal_queue_sizes",
-        "min_bandwidth_kbps",
-        "avg_bandwidth_kbps",
-        "max_bandwidth_kbps",
-    ]
-)
-for idx, log_path in enumerate(log_paths):
-    csv_row = []
-    csv_row.append(topology_result(log_path)["diameter"])
-    message_storage, node_storage = latency.parse_record_stream(
-        mixlog.get_input_stream(log_path)
-    )
-    with open(f"{log_dir}/msgs-{idx}.json", "w") as file:
-        json.dump(
-            {msg_id: asdict(msg) for msg_id, msg in message_storage.items()},
-            file,
-            indent=2,
-        )
-    with open(f"{log_dir}/nodes-{idx}.json", "w") as file:
-        json.dump(node_storage.to_dict(), file, indent=2)
-    latency_analysis = latency.LatencyAnalysis.build(
-        message_storage, node_storage, STEP_DURATION_MS
-    )
-    csv_row.append(latency_analysis.total_complete_messages)
-    csv_row.append(float(latency_analysis.min_latency_ms) / 1000.0)
-    csv_row.append(float(latency_analysis.avg_latency_ms) / 1000.0)
-    csv_row.append(float(latency_analysis.median_latency_ms) / 1000.0)
-    csv_row.append(float(latency_analysis.max_latency_ms) / 1000.0)
-    csv_row.append(latency_analysis.min_latency_analysis.message_id)
-    csv_row.append(
-        ",".join(
-            map(
-                str,
-                [
-                    ms / 1000.0
-                    for ms in latency_analysis.min_latency_analysis.persistent_latencies_ms
-                ],
-            )
-        )
-    )
-    csv_row.append(
-        ",".join(map(str, latency_analysis.min_latency_analysis.persistent_queue_sizes))
-    )
-    csv_row.append(
-        ",".join(
-            map(
-                str,
-                [
-                    ms / 1000.0
-                    for ms in latency_analysis.min_latency_analysis.temporal_latencies_ms
-                ],
-            )
-        )
-    )
-    csv_row.append(
-        ",".join(map(str, latency_analysis.min_latency_analysis.temporal_queue_sizes))
-    )
-    csv_row.append(latency_analysis.max_latency_analysis.message_id)
-    csv_row.append(
-        ",".join(
-            map(
-                str,
-                [
-                    ms / 1000.0
-                    for ms in latency_analysis.max_latency_analysis.persistent_latencies_ms
-                ],
-            )
-        )
-    )
-    csv_row.append(
-        ",".join(map(str, latency_analysis.max_latency_analysis.persistent_queue_sizes))
-    )
-    csv_row.append(
-        ",".join(
-            map(
-                str,
-                [
-                    ms / 1000.0
-                    for ms in latency_analysis.max_latency_analysis.temporal_latencies_ms
-                ],
-            )
-        )
-    )
-    csv_row.append(
-        ",".join(map(str, latency_analysis.max_latency_analysis.temporal_queue_sizes))
-    )
-    bandwidth_res = bandwidth_result(log_path)
-    csv_row.append(bandwidth_res["min"] * 8 / 1000.0)
-    csv_row.append(bandwidth_res["avg"] * 8 / 1000.0)
-    csv_row.append(bandwidth_res["max"] * 8 / 1000.0)
-    csv_writer.writerow(csv_row)
+with open("output.csv", "w", newline="") as file:
+    csv_writer = csv.writer(file)
+    csv_writer.writerow(
+        [
+            "network_diameter",
+            "msg_count",
+            "min_latency",
+            "avg_latency",
+            "median_latency",
+            "max_latency",
+            "min_latency_msg_id",
+            "min_latency_msg_persistent_latency_ms",
+            "min_latency_msg_persistent_queue_sizes",
+            "min_latency_msg_temporal_latency_ms",
+            "min_latency_msg_temporal_queue_sizes",
+            "max_latency_msg_id",
+            "max_latency_msg_persistent_latency_ms",
+            "max_latency_msg_persistent_queue_sizes",
+            "max_latency_msg_temporal_latency_ms",
+            "max_latency_msg_temporal_queue_sizes",
+            "min_bandwidth_kbps",
+            "avg_bandwidth_kbps",
+            "max_bandwidth_kbps",
+        ]
+    )
+    for idx, log_path in enumerate(log_paths):
+        csv_row = []
+        csv_row.append(topology_result(log_path)["diameter"])
+        message_storage, node_storage = latency.parse_record_stream(
+            mixlog.get_input_stream(log_path)
+        )
+        with open(f"{log_dir}/msgs-{idx}.json", "w") as file:
+            json.dump(
+                {msg_id: asdict(msg) for msg_id, msg in message_storage.items()},
+                file,
+                indent=2,
+            )
+        with open(f"{log_dir}/nodes-{idx}.json", "w") as file:
+            json.dump(node_storage.to_dict(), file, indent=2)
+        latency_analysis = latency.LatencyAnalysis.build(
+            message_storage, node_storage, STEP_DURATION_MS
+        )
+        csv_row.append(latency_analysis.total_complete_messages)
+        csv_row.append(float(latency_analysis.min_latency_ms) / 1000.0)
+        csv_row.append(float(latency_analysis.avg_latency_ms) / 1000.0)
+        csv_row.append(float(latency_analysis.median_latency_ms) / 1000.0)
+        csv_row.append(float(latency_analysis.max_latency_ms) / 1000.0)
+        csv_row.append(latency_analysis.min_latency_analysis.message_id)
+        csv_row.append(
+            ",".join(
+                map(
+                    str,
+                    [
+                        ms / 1000.0
+                        for ms in latency_analysis.min_latency_analysis.persistent_latencies_ms
+                    ],
+                )
+            )
+        )
+        csv_row.append(
+            ",".join(
+                map(str, latency_analysis.min_latency_analysis.persistent_queue_sizes)
+            )
+        )
+        csv_row.append(
+            ",".join(
+                map(
+                    str,
+                    [
+                        ms / 1000.0
+                        for ms in latency_analysis.min_latency_analysis.temporal_latencies_ms
+                    ],
+                )
+            )
+        )
+        csv_row.append(
+            ",".join(
+                map(str, latency_analysis.min_latency_analysis.temporal_queue_sizes)
+            )
+        )
+        csv_row.append(latency_analysis.max_latency_analysis.message_id)
+        csv_row.append(
+            ",".join(
+                map(
+                    str,
+                    [
+                        ms / 1000.0
+                        for ms in latency_analysis.max_latency_analysis.persistent_latencies_ms
+                    ],
+                )
+            )
+        )
+        csv_row.append(
+            ",".join(
+                map(str, latency_analysis.max_latency_analysis.persistent_queue_sizes)
+            )
+        )
+        csv_row.append(
+            ",".join(
+                map(
+                    str,
+                    [
+                        ms / 1000.0
+                        for ms in latency_analysis.max_latency_analysis.temporal_latencies_ms
+                    ],
+                )
+            )
+        )
+        csv_row.append(
+            ",".join(
+                map(str, latency_analysis.max_latency_analysis.temporal_queue_sizes)
+            )
+        )
+        bandwidth_res = bandwidth_result(log_path)
+        csv_row.append(bandwidth_res["min"] * 8 / 1000.0)
+        csv_row.append(bandwidth_res["avg"] * 8 / 1000.0)
+        csv_row.append(bandwidth_res["max"] * 8 / 1000.0)
+        csv_writer.writerow(csv_row)
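
The core of the change is redirecting the csv.writer target: instead of csv.writer(sys.stdout), the script opens output.csv with newline="" (so the csv module controls line endings) and keeps the header row and every per-log writerow() inside the with block, so the file is closed once all logs are processed. A minimal, self-contained sketch of that pattern follows; the shortened header list and placeholder rows are illustrative only and stand in for the per-log analysis values the real script computes.

import csv

# Illustrative subset of the real header; placeholder rows stand in for the
# per-log topology, latency, and bandwidth results built in the script above.
headers = ["network_diameter", "msg_count", "avg_latency"]
rows = [
    [3, 120, 0.42],
    [4, 118, 0.57],
]

# Open the output file with newline="" so the csv module manages line endings,
# then keep the writer and all writerow() calls inside the block so the file
# is flushed and closed when processing finishes.
with open("output.csv", "w", newline="") as file:
    csv_writer = csv.writer(file)
    csv_writer.writerow(headers)
    for row in rows:
        csv_writer.writerow(row)

With the writer scoped to the with block, the CSV output no longer depends on redirecting stdout, while the per-log msgs-{idx}.json and nodes-{idx}.json dumps continue to go to their own files as before.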