store iteration results into CSV gradually, and add a queuesim_stats CLI to merge all iterations of all paramsets

Youngjoon Lee 2024-08-08 12:33:41 +09:00
parent 00c058528d
commit 8a16d63131
4 changed files with 207 additions and 159 deletions
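At a high level, this change splits result handling into two phases: each simulation iteration now writes its dissemination times into its own per-iteration CSV file while it runs, and the new queuesim_stats CLI is invoked afterwards to merge those files into a session-level summary. A minimal sketch of the resulting layout and the merge step, using placeholder names (the real session directory name is generated by run_session and embeds the experiment/session IDs and elapsed time):

# Layout produced by run_session (directory names below are illustrative placeholders):
#   <session_dir>/
#       paramset_1/paramset.csv      # parameter-set info (PARAMSET_INFO_COLUMNS)
#       paramset_1/iteration_0.csv   # one dissemination time per row, written during the run
#       paramset_1/iteration_0.err   # only present if that iteration raised an exception
#       paramset_2/...
#
# The new CLI (or a direct call, as below) then merges everything into <session_dir>/session.csv:
from queuesim.statistics import calculate_session_stats

calculate_session_stats("<session_dir>")  # placeholder path; pass the real session directory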

View File

@@ -0,0 +1,13 @@
import argparse
from queuesim.statistics import calculate_session_stats
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Calculate statistics for a session.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("--dir", type=str, required=True, help="session directory")
args = parser.parse_args()
calculate_session_stats(args.dir)

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import concurrent.futures
import os
import random
@@ -5,6 +7,7 @@ import time
import traceback
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime, timedelta
import pandas as pd
@@ -50,8 +53,7 @@ DEFAULT_CONFIG = Config(
sender_generator=random.Random(0),
)
RESULT_COLUMNS = [
PARAMSET_INFO_COLUMNS = [
"paramset",
"num_nodes",
"peering_degree",
@@ -61,14 +63,6 @@ RESULT_COLUMNS = [
"num_senders",
"queue_type",
"num_iterations",
"dtime_count",
"dtime_mean",
"dtime_std",
"dtime_min",
"dtime_25%",
"dtime_50%",
"dtime_75%",
"dtime_max",
]
@@ -88,12 +82,6 @@ def run_session(
assert os.path.isdir(outdir)
subdir = f"__WIP__queuesim_e{exp_id.value}s{session_id.value}_{queue_type.name}_{datetime.now().isoformat()}___DUR__"
os.makedirs(f"{outdir}/{subdir}")
session_result_path = f"{outdir}/{subdir}/session.csv"
assert not os.path.exists(session_result_path)
pd.DataFrame(columns=pd.Series(RESULT_COLUMNS)).to_csv(
session_result_path, index=False
)
print(f"Initialized a CSV file: {session_result_path}")
# Prepare all parameter sets of the session
paramsets = build_parameter_sets(exp_id, session_id, queue_type)
@@ -101,9 +89,7 @@ def run_session(
# Run the simulations for each parameter set, using multi processes
session_start_time = time.time()
future_map: dict[
concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]
] = dict()
future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict()
total_cores = os.cpu_count()
assert total_cores is not None
max_workers = max(1, total_cores - 1)
@@ -113,60 +99,40 @@ def run_session(
paramset_id = paramset_idx + 1
if paramset_id < from_paramset:
continue
future_map.update(__submit_iterations(paramset_id, paramset, executor))
paramset_dir = f"{outdir}/{subdir}/paramset_{paramset_id}"
os.makedirs(paramset_dir)
__save_paramset_info(paramset_id, paramset, f"{paramset_dir}/paramset.csv")
future_map.update(
__submit_iterations(paramset_id, paramset, executor, paramset_dir)
)
# Collect results of each iteration
paramset_results: dict[int, tuple[set[int], list[float]]] = defaultdict(
lambda: (set(), [])
# Wait until all parameter sets are done
paramset_results: dict[int, list[tuple[IterationInfo, bool]]] = defaultdict(
list
)
paramsets_done: set[int] = set()
for future in concurrent.futures.as_completed(future_map):
paramset_id, paramset, iter_idx = future_map[future]
try:
dissemination_times = future.result()
except BaseException as e:
msg = (
f"Error occurred in ParamSet-{paramset_id}, IterIdx-{iter_idx}: {e}"
)
print(msg)
traceback.print_exc()
with open(
f"{outdir}/{subdir}/paramset_{paramset_id}_iteridx_{iter_idx}.err",
"a",
) as f:
f.write(f"{msg}\n\n")
traceback.print_exc(file=f)
continue # skip the failed iteration
iter = future_map[future]
succeeded = future.result()
if not succeeded:
print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
print("ITERATION FAILED: See the err file")
print(iter)
print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
paramset_results[paramset_id][0].add(iter_idx)
paramset_results[paramset_id][1].extend(dissemination_times)
# If all iterations of the paramset are done, process the results
if len(paramset_results[paramset_id][0]) == paramset.num_iterations:
paramsets_done.add(paramset_id)
paramset_results[iter.paramset_id].append((iter, succeeded))
# If all iterations of the paramset are done, print a log
if len(paramset_results[iter.paramset_id]) == iter.paramset.num_iterations:
paramsets_done.add(iter.paramset_id)
print("================================================")
print(f"ParamSet-{paramset_id} is done. Processing results...")
print(
f"Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far."
)
print("------------------------------------------------")
__process_paramset_result(
paramset_id,
paramset,
paramset_results[paramset_id][1],
session_result_path,
f"{outdir}/{subdir}/paramset_{paramset_id}.csv",
f"ParamSet-{iter.paramset_id} is done. Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far."
)
print("================================================")
session_elapsed_time = time.time() - session_start_time
session_elapsed_time_str = __format_elapsed_time(session_elapsed_time)
# Load the completed session CSV file, sort rows by paramset_id,
# and overwrite the file with the sorted rows.
pd.read_csv(session_result_path).sort_values(by="paramset").to_csv(
session_result_path, index=False
)
# Rename the WIP directory to the final name
new_subdir = subdir.replace("__WIP__", "").replace(
"__DUR__", session_elapsed_time_str
@@ -180,70 +146,9 @@ def run_session(
print("******************************************************************")
def __submit_iterations(
paramset_id: int,
paramset: ParameterSet,
executor: concurrent.futures.ProcessPoolExecutor,
) -> dict[concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]]:
"""
Submit all iterations of the given parameter set to the executor,
so that they can be run by the ProcessPoolExecutor.
"""
# Prepare the configuration for the parameter set
cfg = deepcopy(DEFAULT_CONFIG)
paramset.apply_to(cfg)
print(
f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}"
)
future_map: dict[
concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]
] = dict()
for i in range(paramset.num_iterations):
# Update seeds for the current iteration
# Deepcopy the cfg to avoid sharing the same cfg instance between iteration jobs.
iter_cfg = deepcopy(cfg)
iter_cfg.nomssip.temporal_mix.seed_generator = random.Random(i)
iter_cfg.topology.seed = random.Random(i)
iter_cfg.latency.seed = random.Random(i)
iter_cfg.sender_generator = random.Random(i)
# Submit the iteration to the executor
future = executor.submit(__run_iteration, iter_cfg)
future_map[future] = (paramset_id, paramset, i)
return future_map
def __run_iteration(cfg: Config) -> list[float]:
"""
Run a single iteration of a certain parameter set.
The iteration uses an independent uSim instance.
"""
try:
sim = Simulation(cfg)
usim.run(sim.run())
return sim.dissemination_times
except BaseException as e:
print(f"Error in running the iteration: {e}")
traceback.print_exc()
raise
def __process_paramset_result(
paramset_id: int,
paramset: ParameterSet,
dissemination_times: list[float],
session_result_path: str,
paramset_result_path: str,
):
"""
Convert the result into a pd.Series, store the Series into a CSV file,
and append the summary of the Series to the session CSV file.
"""
series = pd.Series(dissemination_times)
stats = series.describe()
result = {
def __save_paramset_info(paramset_id: int, paramset: ParameterSet, path: str):
assert not os.path.exists(path)
info = {
"paramset": paramset_id,
"num_nodes": paramset.num_nodes,
"peering_degree": paramset.peering_degree,
@@ -253,22 +158,65 @@ def __process_paramset_result(
"num_senders": paramset.num_senders,
"queue_type": paramset.queue_type.name,
"num_iterations": paramset.num_iterations,
"dtime_count": stats["count"],
"dtime_mean": stats["mean"],
"dtime_std": stats["std"],
"dtime_min": stats["min"],
"dtime_25%": stats["25%"],
"dtime_50%": stats["50%"],
"dtime_75%": stats["75%"],
"dtime_max": stats["max"],
}
assert result.keys() == set(RESULT_COLUMNS)
pd.DataFrame([result]).to_csv(
session_result_path, mode="a", header=False, index=False
assert info.keys() == set(PARAMSET_INFO_COLUMNS)
pd.DataFrame([info]).to_csv(path, mode="w", header=True, index=False)
def __submit_iterations(
paramset_id: int,
paramset: ParameterSet,
executor: concurrent.futures.ProcessPoolExecutor,
outdir: str,
) -> dict[concurrent.futures.Future[bool], IterationInfo]:
"""
Submit all iterations of the given parameter set to the executor,
so that they can be run by the ProcessPoolExecutor.
"""
assert os.path.exists(outdir)
# Prepare the configuration for the parameter set
cfg = deepcopy(DEFAULT_CONFIG)
paramset.apply_to(cfg)
print(
f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}"
)
print(f"Appended a row to {session_result_path}")
series.to_csv(paramset_result_path, header=False, index=False)
print(f"Stored the dissemination times to {paramset_result_path}")
future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict()
for i in range(paramset.num_iterations):
# Update seeds for the current iteration
# Deepcopy the cfg to avoid sharing the same cfg instance between iteration jobs.
iter_cfg = deepcopy(cfg)
iter_cfg.nomssip.temporal_mix.seed_generator = random.Random(i)
iter_cfg.topology.seed = random.Random(i)
iter_cfg.latency.seed = random.Random(i)
iter_cfg.sender_generator = random.Random(i)
# Submit the iteration to the executor
out_csv_path = f"{outdir}/iteration_{i}.csv"
err_path = f"{outdir}/iteration_{i}.err"
future = executor.submit(__run_iteration, iter_cfg, out_csv_path, err_path)
future_map[future] = IterationInfo(
paramset_id, paramset, i, out_csv_path, err_path
)
return future_map
def __run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> bool:
"""
Run a single iteration of a certain parameter set.
The iteration uses an independent uSim instance.
Returns False if an exception occurred.
"""
try:
sim = Simulation(cfg)
usim.run(sim.run(out_csv_path))
return True
except BaseException as e:
with open(err_path, "w") as f:
traceback.print_exc(file=f)
return False
def __format_elapsed_time(elapsed_time: float) -> str:
@@ -276,3 +224,12 @@ def __format_elapsed_time(elapsed_time: float) -> str:
hours, remainder = divmod(td.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{td.days}d{hours:02}h{minutes:02}m{seconds:02}s"
@dataclass
class IterationInfo:
paramset_id: int
paramset: ParameterSet
iteration_idx: int
out_csv_path: str
err_path: str

View File

@@ -1,3 +1,4 @@
import csv
import struct
from dataclasses import dataclass
from typing import Counter, Self
@@ -21,14 +22,14 @@ class Simulation:
def __init__(self, config: Config):
self.config = config
async def run(self):
async def run(self, out_csv_path: str):
async with usim.Scope() as scope:
self.framework = Framework(scope)
self.message_builder = MessageBuilder(self.framework)
self.dissemination_times = await self.__run()
await self.__run(out_csv_path)
self.framework.stop_tasks()
async def __run(self) -> list[float]:
async def __run(self, out_csv_path: str):
self.received_msg_queue: Queue[tuple[float, bytes]] = self.framework.queue()
# Run and connect nodes
@@ -40,24 +41,27 @@ class Simulation:
for sender in senders:
self.framework.spawn(self.__run_sender(sender))
# To count how many nodes have received each message
received_msg_counters: Counter[bytes] = Counter()
# To collect the dissemination times of each message.
dissemination_times: list[float] = []
# Wait until all messages are disseminated to the entire network.
while (
len(dissemination_times)
< self.config.num_sent_msgs * self.config.num_senders
):
# Wait until a node notifies that it has received a new message.
received_time, msg = await self.received_msg_queue.get()
# If the message has been received by all nodes, calculate the dissemination time.
received_msg_counters.update([msg])
if received_msg_counters[msg] == len(nodes):
dissemination_times.append(
received_time - Message.from_bytes(msg).sent_time
)
return dissemination_times
# Open the output CSV file
with open(out_csv_path, "w", newline="") as f:
# Use a CSV writer, which is less error-prone than manually writing rows to the file
writer = csv.writer(f)
# To count how many nodes have received each message
received_msg_counters: Counter[bytes] = Counter()
# To count how many results (dissemination times) have been collected so far
result_cnt = 0
# Wait until all messages are disseminated to the entire network.
while result_cnt < self.config.num_sent_msgs * self.config.num_senders:
# Wait until a node notifies that it has received a new message.
received_time, msg = await self.received_msg_queue.get()
# If the message has been received by all nodes, calculate the dissemination time.
received_msg_counters.update([msg])
if received_msg_counters[msg] == len(nodes):
dissemination_time = (
received_time - Message.from_bytes(msg).sent_time
)
# Use repr to convert a float to a string with as much precision as Python can provide
writer.writerow([repr(dissemination_time)])
result_cnt += 1
def __run_nodes(self) -> list[Node]:
return [

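A note on the repr() call above: in Python 3, repr() of a float produces the shortest decimal string that round-trips to exactly the same float, so no precision is lost when the per-iteration CSVs are re-read later by the stats step. A one-line sanity check:

dissemination_time = 1.2345678901234567  # illustrative value only
assert float(repr(dissemination_time)) == dissemination_time  # repr() round-trips floats exactly in Python 3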
View File

@@ -0,0 +1,74 @@
import glob
import os
import pandas as pd
from queuesim.queuesim import PARAMSET_INFO_COLUMNS
RESULT_COLUMNS = PARAMSET_INFO_COLUMNS + [
"dtime_count",
"dtime_mean",
"dtime_std",
"dtime_min",
"dtime_25%",
"dtime_50%",
"dtime_75%",
"dtime_max",
]
def calculate_session_stats(dir: str):
session_result_path = f"{dir}/session.csv"
assert not os.path.exists(session_result_path)
pd.DataFrame(columns=pd.Series(RESULT_COLUMNS)).to_csv(
session_result_path, index=False
)
print(f"Initialized a CSV file: {session_result_path}")
paramset_dirs = [
path for path in glob.glob(f"{dir}/paramset_*") if os.path.isdir(path)
]
for paramset_dir in paramset_dirs:
__calculate_paramset_stats(paramset_dir, session_result_path)
print(f"Appended a row to {session_result_path}")
# Load the completed session CSV file, sort rows by paramset_id,
# and overwrite the file with the sorted rows.
pd.read_csv(session_result_path).sort_values(by="paramset").to_csv(
session_result_path, index=False
)
def __calculate_paramset_stats(paramset_dir: str, session_result_path: str):
info = pd.read_csv(f"{paramset_dir}/paramset.csv")
series_list = []
for iter_csv in glob.glob(f"{paramset_dir}/iteration_*.csv"):
df = pd.read_csv(iter_csv, header=None)
series_list.append(pd.Series(df.squeeze()))
series = pd.concat(series_list, ignore_index=True)
stats = series.describe()
result = {
"paramset": info["paramset"].iloc[0],
"num_nodes": info["num_nodes"].iloc[0],
"peering_degree": info["peering_degree"].iloc[0],
"min_queue_size": info["min_queue_size"].iloc[0],
"transmission_rate": info["transmission_rate"].iloc[0],
"num_sent_msgs": info["num_sent_msgs"].iloc[0],
"num_senders": info["num_senders"].iloc[0],
"queue_type": info["queue_type"].iloc[0],
"num_iterations": info["num_iterations"].iloc[0],
"dtime_count": stats["count"],
"dtime_mean": stats["mean"],
"dtime_std": stats["std"],
"dtime_min": stats["min"],
"dtime_25%": stats["25%"],
"dtime_50%": stats["50%"],
"dtime_75%": stats["75%"],
"dtime_max": stats["max"],
}
assert result.keys() == set(RESULT_COLUMNS)
pd.DataFrame([result]).to_csv(
session_result_path, mode="a", header=False, index=False
)
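For reference, the dtime_* values above come directly from pandas' Series.describe(), which returns exactly these eight statistics for a numeric Series; the merge step only relabels them. A quick illustration with made-up numbers:

import pandas as pd

dtimes = pd.Series([0.5, 1.0, 1.5, 2.0])  # illustrative dissemination times, not real results
stats = dtimes.describe()
# stats.index: ['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']
print(stats["mean"], stats["75%"])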