diff --git a/mixnet/cmd/queuesim_benchmark.py b/mixnet/cmd/queuesim_benchmark.py new file mode 100644 index 0000000..51b204e --- /dev/null +++ b/mixnet/cmd/queuesim_benchmark.py @@ -0,0 +1,13 @@ +import argparse + +from queuesim import benchmark + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Queuesim Benchmark", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("--num-workers", type=int, required=True, help="num workers") + args = parser.parse_args() + + benchmark.benchmark(args.num_workers) diff --git a/mixnet/queuesim/benchmark.py b/mixnet/queuesim/benchmark.py new file mode 100644 index 0000000..f903fb8 --- /dev/null +++ b/mixnet/queuesim/benchmark.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import concurrent.futures +import tempfile + +from protocol.temporalmix import TemporalMixType +from queuesim.paramset import ParameterSet +from queuesim.queuesim import IterationInfo, _submit_iterations + + +def benchmark(num_workers: int): + paramset = ParameterSet( + num_nodes=100, + peering_degree=4, + min_queue_size=10, + transmission_rate=10, + num_sent_msgs=100, + num_senders=10, + queue_type=TemporalMixType.NONE, + num_iterations=100, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + future_map: dict[concurrent.futures.Future[bool], IterationInfo] = dict() + with concurrent.futures.ProcessPoolExecutor( + max_workers=num_workers + ) as executor: + future_map.update( + _submit_iterations( + paramset_id=1, paramset=paramset, executor=executor, outdir=tmpdir + ) + ) + + # Wait until all iterations are done + results: list[tuple[IterationInfo, bool]] = [] + for future in concurrent.futures.as_completed(future_map): + iter = future_map[future] + succeeded = future.result() + if not succeeded: + print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") + print("ITERATION FAILED: See the err file") + print(iter) + print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") + else: + print("------------------------------------------------") + print("ITERATION SUCCEEDED") + print(iter) + print("------------------------------------------------") + + results.append((iter, succeeded)) + # If all iterations of the paramset are done, print a log + if len(results) == iter.paramset.num_iterations: + print("ALL ITERATIONS DONE") diff --git a/mixnet/queuesim/queuesim.py b/mixnet/queuesim/queuesim.py index d440f01..abf9bcc 100644 --- a/mixnet/queuesim/queuesim.py +++ b/mixnet/queuesim/queuesim.py @@ -103,7 +103,7 @@ def run_session( os.makedirs(paramset_dir) __save_paramset_info(paramset_id, paramset, f"{paramset_dir}/paramset.csv") future_map.update( - __submit_iterations(paramset_id, paramset, executor, paramset_dir) + _submit_iterations(paramset_id, paramset, executor, paramset_dir) ) # Wait until all parameter sets are done @@ -163,7 +163,7 @@ def __save_paramset_info(paramset_id: int, paramset: ParameterSet, path: str): pd.DataFrame([info]).to_csv(path, mode="w", header=True, index=False) -def __submit_iterations( +def _submit_iterations( paramset_id: int, paramset: ParameterSet, executor: concurrent.futures.ProcessPoolExecutor, @@ -195,7 +195,7 @@ def __submit_iterations( # Submit the iteration to the executor out_csv_path = f"{outdir}/iteration_{i}.csv" err_path = f"{outdir}/iteration_{i}.err" - future = executor.submit(__run_iteration, iter_cfg, out_csv_path, err_path) + future = executor.submit(_run_iteration, iter_cfg, out_csv_path, err_path) future_map[future] = IterationInfo( paramset_id, paramset, i, out_csv_path, err_path ) @@ -203,7 +203,7 @@ def __submit_iterations( return future_map -def __run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> bool: +def _run_iteration(cfg: Config, out_csv_path: str, err_path: str) -> bool: """ Run a single iteration of a certain parameter set. The iteration uses the independent uSim instance.