add benchmark for finding the optimal number of workers

Youngjoon Lee 2024-08-08 13:54:31 +09:00
parent 8a16d63131
commit dc79540e15
GPG Key ID: 167546E2D1712F8C
3 changed files with 70 additions and 4 deletions

View File

@@ -0,0 +1,13 @@
import argparse

from queuesim import benchmark

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Queuesim Benchmark",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--num-workers", type=int, required=True, help="num workers")
    args = parser.parse_args()

    benchmark.benchmark(args.num_workers)
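Usage note (not part of the commit): assuming the script above were saved as, say, benchmark_cli.py (its actual path is not shown in this view), it would be invoked as python benchmark_cli.py --num-workers 8. The ArgumentDefaultsHelpFormatter makes --help display the default of each flag.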

View File

@@ -0,0 +1,53 @@
from __future__ import annotations

import concurrent.futures
import tempfile

from protocol.temporalmix import TemporalMixType
from queuesim.paramset import ParameterSet
from queuesim.queuesim import IterationInfo, _submit_iterations


def benchmark(num_workers: int):
    paramset = ParameterSet(
        num_nodes=100,
        peering_degree=4,
        min_queue_size=10,
        transmission_rate=10,
        num_sent_msgs=100,
        num_senders=10,
        queue_type=TemporalMixType.NONE,
        num_iterations=100,
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict()
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=num_workers
        ) as executor:
            future_map.update(
                _submit_iterations(
                    paramset_id=1, paramset=paramset, executor=executor, outdir=tmpdir
                )
            )

            # Wait until all iterations are done
            results: list[tuple[IterationInfo, bool]] = []
            for future in concurrent.futures.as_completed(future_map):
                iter = future_map[future]
                succeeded = future.result()
                if not succeeded:
                    print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
                    print("ITERATION FAILED: See the err file")
                    print(iter)
                    print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
                else:
                    print("------------------------------------------------")
                    print("ITERATION SUCCEEDED")
                    print(iter)
                    print("------------------------------------------------")
                results.append((iter, succeeded))

                # If all iterations of the paramset are done, print a log
                if len(results) == iter.paramset.num_iterations:
                    print("ALL ITERATIONS DONE")

View File

@@ -103,7 +103,7 @@ def run_session(
         os.makedirs(paramset_dir)
         __save_paramset_info(paramset_id, paramset, f"{paramset_dir}/paramset.csv")
         future_map.update(
-            __submit_iterations(paramset_id, paramset, executor, paramset_dir)
+            _submit_iterations(paramset_id, paramset, executor, paramset_dir)
         )

     # Wait until all parameter sets are done
@@ -163,7 +163,7 @@ def __save_paramset_info(paramset_id: int, paramset: ParameterSet, path: str):
     pd.DataFrame([info]).to_csv(path, mode="w", header=True, index=False)


-def __submit_iterations(
+def _submit_iterations(
     paramset_id: int,
     paramset: ParameterSet,
     executor: concurrent.futures.ProcessPoolExecutor,
@@ -195,7 +195,7 @@ def __submit_iterations(
         # Submit the iteration to the executor
         out_csv_path = f"{outdir}/iteration_{i}.csv"
         err_path = f"{outdir}/iteration_{i}.err"
-        future = executor.submit(__run_iteration, iter_cfg, out_csv_path, err_path)
+        future = executor.submit(_run_iteration, iter_cfg, out_csv_path, err_path)
         future_map[future] = IterationInfo(
             paramset_id, paramset, i, out_csv_path, err_path
         )
@@ -203,7 +203,7 @@ def __submit_iterations(
     return future_map


-def __run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> bool:
+def _run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> bool:
     """
     Run a single iteration of a certain parameter set.
     Each iteration uses an independent uSim instance.
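Presumably the helpers were renamed from double to single leading underscores so that the new benchmark module can import them from queuesim.queuesim: a single underscore conventionally marks a helper as package-internal but still usable by sibling modules, whereas the double underscore signaled that it was private to this file.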