From 156bff6ddc4b3be40a6e79fb02932209f1b5b7b2 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:41:58 +0900 Subject: [PATCH] measure iteration duration --- mixnet/queuesim/benchmark.py | 35 +++++++++++++++++++++--------- mixnet/queuesim/queuesim.py | 41 +++++++++++++++++------------------- mixnet/queuesim/util.py | 8 +++++++ 3 files changed, 52 insertions(+), 32 deletions(-) create mode 100644 mixnet/queuesim/util.py diff --git a/mixnet/queuesim/benchmark.py b/mixnet/queuesim/benchmark.py index f903fb8..603a9f5 100644 --- a/mixnet/queuesim/benchmark.py +++ b/mixnet/queuesim/benchmark.py @@ -2,10 +2,14 @@ from __future__ import annotations import concurrent.futures import tempfile +import time + +import pandas as pd from protocol.temporalmix import TemporalMixType from queuesim.paramset import ParameterSet from queuesim.queuesim import IterationInfo, _submit_iterations +from queuesim.util import format_elapsed_time def benchmark(num_workers: int): @@ -21,7 +25,11 @@ def benchmark(num_workers: int): ) with tempfile.TemporaryDirectory() as tmpdir: - future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict() + start_time = time.time() + + future_map: dict[ + concurrent.futures.Future[tuple[bool, float]], IterationInfo + ] = dict() with concurrent.futures.ProcessPoolExecutor( max_workers=num_workers ) as executor: @@ -32,22 +40,29 @@ def benchmark(num_workers: int): ) # Wait until all iterations are done - results: list[tuple[IterationInfo, bool]] = [] + iter_durations: list[float] = [] for future in concurrent.futures.as_completed(future_map): iter = future_map[future] - succeeded = future.result() + succeeded, duration = future.result() if not succeeded: print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") print("ITERATION FAILED: See the err file") print(iter) print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") - else: - print("------------------------------------------------") - print("ITERATION SUCCEEDED") - print(iter) - print("------------------------------------------------") - results.append((iter, succeeded)) + iter_durations.append(duration) # If all iterations of the paramset are done, print a log - if len(results) == iter.paramset.num_iterations: + if len(iter_durations) == iter.paramset.num_iterations: + iter_durations_series = pd.Series(iter_durations) + print("================================================") print("ALL ITERATIONS DONE") + print(f"NUM_WORKERS: {num_workers}") + print(f"PARAMSET: {paramset}") + print( + f"TOTAL DURATION: {format_elapsed_time(time.time() - start_time)}" + ) + print("ITERATION DURATIONS:") + print(iter_durations_series.describe()) + print("================================================") + + break diff --git a/mixnet/queuesim/queuesim.py b/mixnet/queuesim/queuesim.py index abf9bcc..437b9c6 100644 --- a/mixnet/queuesim/queuesim.py +++ b/mixnet/queuesim/queuesim.py @@ -5,10 +5,10 @@ import os import random import time import traceback -from collections import defaultdict from copy import deepcopy from dataclasses import dataclass -from datetime import datetime, timedelta +from datetime import datetime +from typing import Counter import pandas as pd import usim @@ -24,6 +24,7 @@ from queuesim.paramset import ( build_parameter_sets, ) from queuesim.simulation import Simulation +from queuesim.util import format_elapsed_time from sim.config import LatencyConfig, TopologyConfig DEFAULT_CONFIG = Config( @@ -89,7 +90,9 @@ def run_session( # Run the simulations for each parameter set, using multi processes session_start_time = time.time() - future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict() + future_map: dict[concurrent.futures.Future[tuple[bool, float]], IterationInfo] = ( + dict() + ) total_cores = os.cpu_count() assert total_cores is not None max_workers = max(1, total_cores - 1) @@ -107,22 +110,20 @@ def run_session( ) # Wait until all parameter sets are done - paramset_results: dict[int, list[tuple[IterationInfo, bool]]] = defaultdict( - list - ) + iterations_done: Counter[int] = Counter() # per paramset_id paramsets_done: set[int] = set() for future in concurrent.futures.as_completed(future_map): iter = future_map[future] - succeeded = future.result() + succeeded, _ = future.result() if not succeeded: print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") print("ITERATION FAILED: See the err file") print(iter) print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") - paramset_results[iter.paramset_id].append((iter, succeeded)) + iterations_done.update([iter.paramset_id]) # If all iterations of the paramset are done, print a log - if len(paramset_results[iter.paramset_id]) == iter.paramset.num_iterations: + if iterations_done[iter.paramset_id] == iter.paramset.num_iterations: paramsets_done.add(iter.paramset_id) print("================================================") print( @@ -131,7 +132,7 @@ def run_session( print("================================================") session_elapsed_time = time.time() - session_start_time - session_elapsed_time_str = __format_elapsed_time(session_elapsed_time) + session_elapsed_time_str = format_elapsed_time(session_elapsed_time) # Rename the WIP directory to the final name new_subdir = subdir.replace("__WIP__", "").replace( @@ -168,7 +169,7 @@ def _submit_iterations( paramset: ParameterSet, executor: concurrent.futures.ProcessPoolExecutor, outdir: str, -) -> dict[concurrent.futures.Future[bool], IterationInfo]: +) -> dict[concurrent.futures.Future[tuple[bool, float]], IterationInfo]: """ Submit all iterations of the given parameter set to the executor, so that they can be ran by the ProcessPoolExecutor. @@ -183,7 +184,9 @@ def _submit_iterations( f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}" ) - future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict() + future_map: dict[concurrent.futures.Future[tuple[bool, float]], IterationInfo] = ( + dict() + ) for i in range(paramset.num_iterations): # Update seeds for the current iteration # Deepcopy the cfg to avoid the same cfg instance between iteration jobs. @@ -203,27 +206,21 @@ def _submit_iterations( return future_map -def _run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> bool: +def _run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> tuple[bool, float]: """ Run a single iteration of a certain parameter set. The iteration uses the independent uSim instance. Returns False if exception happened. """ + start_time = time.time() try: sim = Simulation(cfg) usim.run(sim.run(out_csv_path)) - return True + return True, time.time() - start_time except BaseException as e: with open(err_path, "w") as f: traceback.print_exc(file=f) - return False - - -def __format_elapsed_time(elapsed_time: float) -> str: - td = timedelta(seconds=elapsed_time) - hours, reminder = divmod(td.seconds, 3600) - minutes, seconds = divmod(reminder, 60) - return f"{td.days}d{hours:02}h{minutes:02}m{seconds:02}s" + return False, time.time() - start_time @dataclass diff --git a/mixnet/queuesim/util.py b/mixnet/queuesim/util.py new file mode 100644 index 0000000..61c2266 --- /dev/null +++ b/mixnet/queuesim/util.py @@ -0,0 +1,8 @@ +from datetime import timedelta + + +def format_elapsed_time(elapsed_time: float) -> str: + td = timedelta(seconds=elapsed_time) + hours, reminder = divmod(td.seconds, 3600) + minutes, seconds = divmod(reminder, 60) + return f"{td.days}d{hours:02}h{minutes:02}m{seconds:02}s"