diff --git a/mixnet/cmd/queuesim_stats.py b/mixnet/cmd/queuesim_stats.py
new file mode 100644
index 0000000..45789ea
--- /dev/null
+++ b/mixnet/cmd/queuesim_stats.py
@@ -0,0 +1,13 @@
+import argparse
+
+from queuesim.statistics import calculate_session_stats
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Calculate statistics for a session.",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+    parser.add_argument("--dir", type=str, required=True, help="session directory")
+    args = parser.parse_args()
+
+    calculate_session_stats(args.dir)
diff --git a/mixnet/queuesim/queuesim.py b/mixnet/queuesim/queuesim.py
index 6d00fba..d440f01 100644
--- a/mixnet/queuesim/queuesim.py
+++ b/mixnet/queuesim/queuesim.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import concurrent.futures
 import os
 import random
@@ -5,6 +7,7 @@ import time
 import traceback
 from collections import defaultdict
 from copy import deepcopy
+from dataclasses import dataclass
 from datetime import datetime, timedelta
 
 import pandas as pd
@@ -50,8 +53,7 @@ DEFAULT_CONFIG = Config(
     sender_generator=random.Random(0),
 )
 
-
-RESULT_COLUMNS = [
+PARAMSET_INFO_COLUMNS = [
     "paramset",
     "num_nodes",
     "peering_degree",
@@ -61,14 +63,6 @@ RESULT_COLUMNS = [
     "num_senders",
     "queue_type",
     "num_iterations",
-    "dtime_count",
-    "dtime_mean",
-    "dtime_std",
-    "dtime_min",
-    "dtime_25%",
-    "dtime_50%",
-    "dtime_75%",
-    "dtime_max",
 ]
 
 
@@ -88,12 +82,6 @@ def run_session(
     assert os.path.isdir(outdir)
     subdir = f"__WIP__queuesim_e{exp_id.value}s{session_id.value}_{queue_type.name}_{datetime.now().isoformat()}___DUR__"
     os.makedirs(f"{outdir}/{subdir}")
-    session_result_path = f"{outdir}/{subdir}/session.csv"
-    assert not os.path.exists(session_result_path)
-    pd.DataFrame(columns=pd.Series(RESULT_COLUMNS)).to_csv(
-        session_result_path, index=False
-    )
-    print(f"Initialized a CSV file: {session_result_path}")
 
     # Prepare all parameter sets of the session
     paramsets = build_parameter_sets(exp_id, session_id, queue_type)
@@ -101,9 +89,7 @@
 
     # Run the simulations for each parameter set, using multi processes
     session_start_time = time.time()
-    future_map: dict[
-        concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]
-    ] = dict()
+    future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict()
    total_cores = os.cpu_count()
     assert total_cores is not None
     max_workers = max(1, total_cores - 1)
@@ -113,60 +99,40 @@
             paramset_id = paramset_idx + 1
             if paramset_id < from_paramset:
                 continue
-            future_map.update(__submit_iterations(paramset_id, paramset, executor))
+            paramset_dir = f"{outdir}/{subdir}/paramset_{paramset_id}"
+            os.makedirs(paramset_dir)
+            __save_paramset_info(paramset_id, paramset, f"{paramset_dir}/paramset.csv")
+            future_map.update(
+                __submit_iterations(paramset_id, paramset, executor, paramset_dir)
+            )
 
-        # Collect results of each iteration
-        paramset_results: dict[int, tuple[set[int], list[float]]] = defaultdict(
-            lambda: (set(), [])
+        # Wait until all parameter sets are done
+        paramset_results: dict[int, list[tuple[IterationInfo, bool]]] = defaultdict(
+            list
         )
         paramsets_done: set[int] = set()
         for future in concurrent.futures.as_completed(future_map):
-            paramset_id, paramset, iter_idx = future_map[future]
-            try:
-                dissemination_times = future.result()
-            except BaseException as e:
-                msg = (
-                    f"Error occurred in ParamSet-{paramset_id}, IterIdx-{iter_idx}: {e}"
-                )
-                print(msg)
-                traceback.print_exc()
-                with open(
f"{outdir}/{subdir}/paramset_{paramset_id}_iteridx_{iter_idx}.err", - "a", - ) as f: - f.write(f"{msg}\n\n") - traceback.print_exc(file=f) - continue # skip the iteration failed + iter = future_map[future] + succeeded = future.result() + if not succeeded: + print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") + print("ITERATION FAILED: See the err file") + print(iter) + print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") - paramset_results[paramset_id][0].add(iter_idx) - paramset_results[paramset_id][1].extend(dissemination_times) - # If all iterations of the paramset are done, process the results - if len(paramset_results[paramset_id][0]) == paramset.num_iterations: - paramsets_done.add(paramset_id) + paramset_results[iter.paramset_id].append((iter, succeeded)) + # If all iterations of the paramset are done, print a log + if len(paramset_results[iter.paramset_id]) == iter.paramset.num_iterations: + paramsets_done.add(iter.paramset_id) print("================================================") - print(f"ParamSet-{paramset_id} is done. Processing results...") print( - f"Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far." - ) - print("------------------------------------------------") - __process_paramset_result( - paramset_id, - paramset, - paramset_results[paramset_id][1], - session_result_path, - f"{outdir}/{subdir}/paramset_{paramset_id}.csv", + f"ParamSet-{iter.paramset_id} is done. Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far." ) print("================================================") session_elapsed_time = time.time() - session_start_time session_elapsed_time_str = __format_elapsed_time(session_elapsed_time) - # Load the completed session CSV file, sort rows by paramset_id, - # and overrite the file with the sorted rows. - pd.read_csv(session_result_path).sort_values(by="paramset").to_csv( - session_result_path, index=False - ) - # Rename the WIP directory to the final name new_subdir = subdir.replace("__WIP__", "").replace( "__DUR__", session_elapsed_time_str @@ -180,70 +146,9 @@ def run_session( print("******************************************************************") -def __submit_iterations( - paramset_id: int, - paramset: ParameterSet, - executor: concurrent.futures.ProcessPoolExecutor, -) -> dict[concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]]: - """ - Submit all iterations of the given parameter set to the executor, - so that they can be ran by the ProcessPoolExecutor. - """ - # Prepare the configuration for the parameter set - cfg = deepcopy(DEFAULT_CONFIG) - paramset.apply_to(cfg) - - print( - f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}" - ) - - future_map: dict[ - concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int] - ] = dict() - for i in range(paramset.num_iterations): - # Update seeds for the current iteration - # Deepcopy the cfg to avoid the same cfg instance between iteration jobs. - iter_cfg = deepcopy(cfg) - iter_cfg.nomssip.temporal_mix.seed_generator = random.Random(i) - iter_cfg.topology.seed = random.Random(i) - iter_cfg.latency.seed = random.Random(i) - iter_cfg.sender_generator = random.Random(i) - # Submit the iteration to the executor - future = executor.submit(__run_iteration, iter_cfg) - future_map[future] = (paramset_id, paramset, i) - - return future_map - - -def __run_iteration(cfg: Config) -> list[float]: - """ - Run a single iteration of a certain parameter set. 
-    The iteration uses the independent uSim instance.
-    """
-    try:
-        sim = Simulation(cfg)
-        usim.run(sim.run())
-        return sim.dissemination_times
-    except BaseException as e:
-        print(f"Error in running the iteration: {e}")
-        traceback.print_exc()
-        raise
-
-
-def __process_paramset_result(
-    paramset_id: int,
-    paramset: ParameterSet,
-    dissemination_times: list[float],
-    session_result_path: str,
-    paramset_result_path: str,
-):
-    """
-    Convert the result into a pd.Series, store the Series into a CSV file,
-    and append the summary of Series to the session CSV file.
-    """
-    series = pd.Series(dissemination_times)
-    stats = series.describe()
-    result = {
+def __save_paramset_info(paramset_id: int, paramset: ParameterSet, path: str):
+    assert not os.path.exists(path)
+    info = {
         "paramset": paramset_id,
         "num_nodes": paramset.num_nodes,
         "peering_degree": paramset.peering_degree,
@@ -253,22 +158,65 @@
         "num_senders": paramset.num_senders,
         "queue_type": paramset.queue_type.name,
         "num_iterations": paramset.num_iterations,
-        "dtime_count": stats["count"],
-        "dtime_mean": stats["mean"],
-        "dtime_std": stats["std"],
-        "dtime_min": stats["min"],
-        "dtime_25%": stats["25%"],
-        "dtime_50%": stats["50%"],
-        "dtime_75%": stats["75%"],
-        "dtime_max": stats["max"],
     }
-    assert result.keys() == set(RESULT_COLUMNS)
-    pd.DataFrame([result]).to_csv(
-        session_result_path, mode="a", header=False, index=False
+    assert info.keys() == set(PARAMSET_INFO_COLUMNS)
+    pd.DataFrame([info]).to_csv(path, mode="w", header=True, index=False)
+
+
+def __submit_iterations(
+    paramset_id: int,
+    paramset: ParameterSet,
+    executor: concurrent.futures.ProcessPoolExecutor,
+    outdir: str,
+) -> dict[concurrent.futures.Future[bool], IterationInfo]:
+    """
+    Submit all iterations of the given parameter set to the executor,
+    so that they can be run by the ProcessPoolExecutor.
+    """
+    assert os.path.exists(outdir)
+
+    # Prepare the configuration for the parameter set
+    cfg = deepcopy(DEFAULT_CONFIG)
+    paramset.apply_to(cfg)
+
+    print(
+        f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}"
     )
-    print(f"Appended a row to {session_result_path}")
-    series.to_csv(paramset_result_path, header=False, index=False)
-    print(f"Stored the dissemination times to {paramset_result_path}")
+
+    future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict()
+    for i in range(paramset.num_iterations):
+        # Update seeds for the current iteration
+        # Deepcopy the cfg so that iteration jobs do not share the same cfg instance.
+        iter_cfg = deepcopy(cfg)
+        iter_cfg.nomssip.temporal_mix.seed_generator = random.Random(i)
+        iter_cfg.topology.seed = random.Random(i)
+        iter_cfg.latency.seed = random.Random(i)
+        iter_cfg.sender_generator = random.Random(i)
+        # Submit the iteration to the executor
+        out_csv_path = f"{outdir}/iteration_{i}.csv"
+        err_path = f"{outdir}/iteration_{i}.err"
+        future = executor.submit(__run_iteration, iter_cfg, out_csv_path, err_path)
+        future_map[future] = IterationInfo(
+            paramset_id, paramset, i, out_csv_path, err_path
+        )
+
+    return future_map
+
+
+def __run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> bool:
+    """
+    Run a single iteration of a certain parameter set.
+    Each iteration uses an independent uSim instance.
+    Returns False if an exception occurred.
+ """ + try: + sim = Simulation(cfg) + usim.run(sim.run(out_csv_path)) + return True + except BaseException as e: + with open(err_path, "w") as f: + traceback.print_exc(file=f) + return False def __format_elapsed_time(elapsed_time: float) -> str: @@ -276,3 +224,12 @@ def __format_elapsed_time(elapsed_time: float) -> str: hours, reminder = divmod(td.seconds, 3600) minutes, seconds = divmod(reminder, 60) return f"{td.days}d{hours:02}h{minutes:02}m{seconds:02}s" + + +@dataclass +class IterationInfo: + paramset_id: int + paramset: ParameterSet + iteration_idx: int + out_csv_path: str + err_path: str diff --git a/mixnet/queuesim/simulation.py b/mixnet/queuesim/simulation.py index a2cc3a6..74abf02 100644 --- a/mixnet/queuesim/simulation.py +++ b/mixnet/queuesim/simulation.py @@ -1,3 +1,4 @@ +import csv import struct from dataclasses import dataclass from typing import Counter, Self @@ -21,14 +22,14 @@ class Simulation: def __init__(self, config: Config): self.config = config - async def run(self): + async def run(self, out_csv_path: str): async with usim.Scope() as scope: self.framework = Framework(scope) self.message_builder = MessageBuilder(self.framework) - self.dissemination_times = await self.__run() + await self.__run(out_csv_path) self.framework.stop_tasks() - async def __run(self) -> list[float]: + async def __run(self, out_csv_path: str): self.received_msg_queue: Queue[tuple[float, bytes]] = self.framework.queue() # Run and connect nodes @@ -40,24 +41,27 @@ class Simulation: for sender in senders: self.framework.spawn(self.__run_sender(sender)) - # To count how many nodes have received each message - received_msg_counters: Counter[bytes] = Counter() - # To collect the dissemination times of each message. - dissemination_times: list[float] = [] - # Wait until all messages are disseminated to the entire network. - while ( - len(dissemination_times) - < self.config.num_sent_msgs * self.config.num_senders - ): - # Wait until a node notifies that it has received a new message. - received_time, msg = await self.received_msg_queue.get() - # If the message has been received by all nodes, calculate the dissemination time. - received_msg_counters.update([msg]) - if received_msg_counters[msg] == len(nodes): - dissemination_times.append( - received_time - Message.from_bytes(msg).sent_time - ) - return dissemination_times + # Open the output CSV file + with open(out_csv_path, "w", newline="") as f: + # Use CSV writer which is less error-prone than manually writing rows to the file + writer = csv.writer(f) + # To count how many nodes have received each message + received_msg_counters: Counter[bytes] = Counter() + # To count how many results (dissemination time) have been collected so far + result_cnt = 0 + # Wait until all messages are disseminated to the entire network. + while result_cnt < self.config.num_sent_msgs * self.config.num_senders: + # Wait until a node notifies that it has received a new message. + received_time, msg = await self.received_msg_queue.get() + # If the message has been received by all nodes, calculate the dissemination time. 
+                received_msg_counters.update([msg])
+                if received_msg_counters[msg] == len(nodes):
+                    dissemination_time = (
+                        received_time - Message.from_bytes(msg).sent_time
+                    )
+                    # Use repr to convert a float to a string with as much precision as Python can provide
+                    writer.writerow([repr(dissemination_time)])
+                    result_cnt += 1
 
     def __run_nodes(self) -> list[Node]:
         return [
diff --git a/mixnet/queuesim/statistics.py b/mixnet/queuesim/statistics.py
new file mode 100644
index 0000000..3fef822
--- /dev/null
+++ b/mixnet/queuesim/statistics.py
@@ -0,0 +1,74 @@
+import glob
+import os
+
+import pandas as pd
+
+from queuesim.queuesim import PARAMSET_INFO_COLUMNS
+
+RESULT_COLUMNS = PARAMSET_INFO_COLUMNS + [
+    "dtime_count",
+    "dtime_mean",
+    "dtime_std",
+    "dtime_min",
+    "dtime_25%",
+    "dtime_50%",
+    "dtime_75%",
+    "dtime_max",
+]
+
+
+def calculate_session_stats(session_dir: str):
+    session_result_path = f"{session_dir}/session.csv"
+    assert not os.path.exists(session_result_path)
+    pd.DataFrame(columns=pd.Series(RESULT_COLUMNS)).to_csv(
+        session_result_path, index=False
+    )
+    print(f"Initialized a CSV file: {session_result_path}")
+
+    paramset_dirs = [
+        path for path in glob.glob(f"{session_dir}/paramset_*") if os.path.isdir(path)
+    ]
+    for paramset_dir in paramset_dirs:
+        __calculate_paramset_stats(paramset_dir, session_result_path)
+        print(f"Appended a row to {session_result_path}")
+
+    # Load the completed session CSV file, sort rows by the paramset column,
+    # and overwrite the file with the sorted rows.
+    pd.read_csv(session_result_path).sort_values(by="paramset").to_csv(
+        session_result_path, index=False
+    )
+
+
+def __calculate_paramset_stats(paramset_dir: str, session_result_path: str):
+    info = pd.read_csv(f"{paramset_dir}/paramset.csv")
+
+    series_list = []
+    for iter_csv in glob.glob(f"{paramset_dir}/iteration_*.csv"):
+        df = pd.read_csv(iter_csv, header=None)
+        series_list.append(pd.Series(df.squeeze()))
+
+    series = pd.concat(series_list, ignore_index=True)
+    stats = series.describe()
+    result = {
+        "paramset": info["paramset"].iloc[0],
+        "num_nodes": info["num_nodes"].iloc[0],
+        "peering_degree": info["peering_degree"].iloc[0],
+        "min_queue_size": info["min_queue_size"].iloc[0],
+        "transmission_rate": info["transmission_rate"].iloc[0],
+        "num_sent_msgs": info["num_sent_msgs"].iloc[0],
+        "num_senders": info["num_senders"].iloc[0],
+        "queue_type": info["queue_type"].iloc[0],
+        "num_iterations": info["num_iterations"].iloc[0],
+        "dtime_count": stats["count"],
+        "dtime_mean": stats["mean"],
+        "dtime_std": stats["std"],
+        "dtime_min": stats["min"],
+        "dtime_25%": stats["25%"],
+        "dtime_50%": stats["50%"],
+        "dtime_75%": stats["75%"],
+        "dtime_max": stats["max"],
+    }
+    assert result.keys() == set(RESULT_COLUMNS)
+    pd.DataFrame([result]).to_csv(
+        session_result_path, mode="a", header=False, index=False
+    )
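
Usage note: after this change, each session directory contains one paramset_<id>/ subdirectory holding paramset.csv, per-iteration iteration_<i>.csv result files, and an iteration_<i>.err file for any failed iteration; session.csv is now produced offline by the new CLI. A minimal sketch of the intended workflow, assuming the working directory is mixnet/ (<session_dir> below is a hypothetical placeholder, not a path from this patch):

    # Build session.csv (summary statistics per parameter set) from the per-iteration CSVs:
    #   python cmd/queuesim_stats.py --dir <session_dir>
    #
    # Or inspect a single iteration's raw dissemination times directly:
    import pandas as pd

    # Each iteration CSV holds one dissemination time per row, with no header
    times = pd.read_csv("<session_dir>/paramset_1/iteration_0.csv", header=None).squeeze()
    print(times.describe())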