measure iteration duration

This commit is contained in:
Youngjoon Lee 2024-08-08 14:41:58 +09:00
parent dc79540e15
commit 156bff6ddc
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
3 changed files with 52 additions and 32 deletions

View File

@ -2,10 +2,14 @@ from __future__ import annotations
import concurrent.futures
import tempfile
import time
import pandas as pd
from protocol.temporalmix import TemporalMixType
from queuesim.paramset import ParameterSet
from queuesim.queuesim import IterationInfo, _submit_iterations
from queuesim.util import format_elapsed_time
def benchmark(num_workers: int):
@ -21,7 +25,11 @@ def benchmark(num_workers: int):
)
with tempfile.TemporaryDirectory() as tmpdir:
future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict()
start_time = time.time()
future_map: dict[
concurrent.futures.Future[tuple[bool, float]], IterationInfo
] = dict()
with concurrent.futures.ProcessPoolExecutor(
max_workers=num_workers
) as executor:
@ -32,22 +40,29 @@ def benchmark(num_workers: int):
)
# Wait until all iterations are done
results: list[tuple[IterationInfo, bool]] = []
iter_durations: list[float] = []
for future in concurrent.futures.as_completed(future_map):
iter = future_map[future]
succeeded = future.result()
succeeded, duration = future.result()
if not succeeded:
print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
print("ITERATION FAILED: See the err file")
print(iter)
print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
else:
print("------------------------------------------------")
print("ITERATION SUCCEEDED")
print(iter)
print("------------------------------------------------")
results.append((iter, succeeded))
iter_durations.append(duration)
# If all iterations of the paramset are done, print a log
if len(results) == iter.paramset.num_iterations:
if len(iter_durations) == iter.paramset.num_iterations:
iter_durations_series = pd.Series(iter_durations)
print("================================================")
print("ALL ITERATIONS DONE")
print(f"NUM_WORKERS: {num_workers}")
print(f"PARAMSET: {paramset}")
print(
f"TOTAL DURATION: {format_elapsed_time(time.time() - start_time)}"
)
print("ITERATION DURATIONS:")
print(iter_durations_series.describe())
print("================================================")
break

View File

@ -5,10 +5,10 @@ import os
import random
import time
import traceback
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import datetime
from typing import Counter
import pandas as pd
import usim
@ -24,6 +24,7 @@ from queuesim.paramset import (
build_parameter_sets,
)
from queuesim.simulation import Simulation
from queuesim.util import format_elapsed_time
from sim.config import LatencyConfig, TopologyConfig
DEFAULT_CONFIG = Config(
@ -89,7 +90,9 @@ def run_session(
# Run the simulations for each parameter set, using multi processes
session_start_time = time.time()
future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict()
future_map: dict[concurrent.futures.Future[tuple[bool, float]], IterationInfo] = (
dict()
)
total_cores = os.cpu_count()
assert total_cores is not None
max_workers = max(1, total_cores - 1)
@ -107,22 +110,20 @@ def run_session(
)
# Wait until all parameter sets are done
paramset_results: dict[int, list[tuple[IterationInfo, bool]]] = defaultdict(
list
)
iterations_done: Counter[int] = Counter() # per paramset_id
paramsets_done: set[int] = set()
for future in concurrent.futures.as_completed(future_map):
iter = future_map[future]
succeeded = future.result()
succeeded, _ = future.result()
if not succeeded:
print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
print("ITERATION FAILED: See the err file")
print(iter)
print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
paramset_results[iter.paramset_id].append((iter, succeeded))
iterations_done.update([iter.paramset_id])
# If all iterations of the paramset are done, print a log
if len(paramset_results[iter.paramset_id]) == iter.paramset.num_iterations:
if iterations_done[iter.paramset_id] == iter.paramset.num_iterations:
paramsets_done.add(iter.paramset_id)
print("================================================")
print(
@ -131,7 +132,7 @@ def run_session(
print("================================================")
session_elapsed_time = time.time() - session_start_time
session_elapsed_time_str = __format_elapsed_time(session_elapsed_time)
session_elapsed_time_str = format_elapsed_time(session_elapsed_time)
# Rename the WIP directory to the final name
new_subdir = subdir.replace("__WIP__", "").replace(
@ -168,7 +169,7 @@ def _submit_iterations(
paramset: ParameterSet,
executor: concurrent.futures.ProcessPoolExecutor,
outdir: str,
) -> dict[concurrent.futures.Future[bool], IterationInfo]:
) -> dict[concurrent.futures.Future[tuple[bool, float]], IterationInfo]:
"""
Submit all iterations of the given parameter set to the executor,
so that they can be ran by the ProcessPoolExecutor.
@ -183,7 +184,9 @@ def _submit_iterations(
f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}"
)
future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict()
future_map: dict[concurrent.futures.Future[tuple[bool, float]], IterationInfo] = (
dict()
)
for i in range(paramset.num_iterations):
# Update seeds for the current iteration
# Deepcopy the cfg to avoid the same cfg instance between iteration jobs.
@ -203,27 +206,21 @@ def _submit_iterations(
return future_map
def _run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> bool:
def _run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> tuple[bool, float]:
"""
Run a single iteration of a certain parameter set.
The iteration uses the independent uSim instance.
Returns False if exception happened.
"""
start_time = time.time()
try:
sim = Simulation(cfg)
usim.run(sim.run(out_csv_path))
return True
return True, time.time() - start_time
except BaseException as e:
with open(err_path, "w") as f:
traceback.print_exc(file=f)
return False
def __format_elapsed_time(elapsed_time: float) -> str:
td = timedelta(seconds=elapsed_time)
hours, reminder = divmod(td.seconds, 3600)
minutes, seconds = divmod(reminder, 60)
return f"{td.days}d{hours:02}h{minutes:02}m{seconds:02}s"
return False, time.time() - start_time
@dataclass

8
mixnet/queuesim/util.py Normal file
View File

@ -0,0 +1,8 @@
from datetime import timedelta
def format_elapsed_time(elapsed_time: float) -> str:
td = timedelta(seconds=elapsed_time)
hours, reminder = divmod(td.seconds, 3600)
minutes, seconds = divmod(reminder, 60)
return f"{td.days}d{hours:02}h{minutes:02}m{seconds:02}s"