diff --git a/mixnet/queuesim/benchmark.py b/mixnet/queuesim/benchmark.py index 603a9f5..1516226 100644 --- a/mixnet/queuesim/benchmark.py +++ b/mixnet/queuesim/benchmark.py @@ -14,6 +14,7 @@ from queuesim.util import format_elapsed_time def benchmark(num_workers: int): paramset = ParameterSet( + id=1, num_nodes=100, peering_degree=4, min_queue_size=10, @@ -34,9 +35,7 @@ def benchmark(num_workers: int): max_workers=num_workers ) as executor: future_map.update( - _submit_iterations( - paramset_id=1, paramset=paramset, executor=executor, outdir=tmpdir - ) + _submit_iterations(paramset=paramset, executor=executor, outdir=tmpdir) ) # Wait until all iterations are done diff --git a/mixnet/queuesim/paramset.py b/mixnet/queuesim/paramset.py index 00a9925..7357a9f 100644 --- a/mixnet/queuesim/paramset.py +++ b/mixnet/queuesim/paramset.py @@ -31,6 +31,7 @@ EXPERIMENT_TITLES: dict[ExperimentID, str] = { @dataclass class ParameterSet: + id: int num_nodes: int peering_degree: int min_queue_size: int @@ -69,6 +70,7 @@ def __build_session_1_parameter_sets( ) -> list[ParameterSet]: sets: list[ParameterSet] = [] + id = 0 # must start from 1 for num_nodes in [20, 40, 80]: peering_degree_list = [num_nodes // 5, num_nodes // 4, num_nodes // 2] min_queue_size_list = [num_nodes // 2, num_nodes, num_nodes * 2] @@ -88,8 +90,16 @@ def __build_session_1_parameter_sets( min_queue_size_list, transmission_rate_list, ): + id += 1 + if ( + not _is_min_queue_size_applicable(queue_type) + and min_queue_size != min_queue_size_list[0] + ): + continue + sets.append( ParameterSet( + id=id, num_nodes=num_nodes, peering_degree=peering_degree, min_queue_size=min_queue_size, @@ -112,8 +122,16 @@ def __build_session_1_parameter_sets( transmission_rate_list, num_sent_msgs_list, ): + id += 1 + if ( + not _is_min_queue_size_applicable(queue_type) + and min_queue_size != min_queue_size_list[0] + ): + continue + sets.append( ParameterSet( + id=id, num_nodes=num_nodes, peering_degree=peering_degree, min_queue_size=min_queue_size, @@ -136,8 +154,16 @@ def __build_session_1_parameter_sets( transmission_rate_list, num_senders_list, ): + id += 1 + if ( + not _is_min_queue_size_applicable(queue_type) + and min_queue_size != min_queue_size_list[0] + ): + continue + sets.append( ParameterSet( + id=id, num_nodes=num_nodes, peering_degree=peering_degree, min_queue_size=min_queue_size, @@ -162,8 +188,16 @@ def __build_session_1_parameter_sets( num_sent_msgs_list, num_senders_list, ): + id += 1 + if ( + not _is_min_queue_size_applicable(queue_type) + and min_queue_size != min_queue_size_list[0] + ): + continue + sets.append( ParameterSet( + id=id, num_nodes=num_nodes, peering_degree=peering_degree, min_queue_size=min_queue_size, @@ -185,6 +219,7 @@ def __build_session_2_parameter_sets( ) -> list[ParameterSet]: sets: list[ParameterSet] = [] + id = 0 # must start from 1 for num_nodes in [100, 1000, 10000]: peering_degree_list = [4, 8, 16] min_queue_size_list = [10, 50, 100] @@ -203,8 +238,16 @@ def __build_session_2_parameter_sets( min_queue_size_list, transmission_rate_list, ): + id += 1 + if ( + not _is_min_queue_size_applicable(queue_type) + and min_queue_size != min_queue_size_list[0] + ): + continue + sets.append( ParameterSet( + id=id, num_nodes=num_nodes, peering_degree=peering_degree, min_queue_size=min_queue_size, @@ -232,8 +275,16 @@ def __build_session_2_parameter_sets( min_queue_size, min_queue_size * 2, ]: + id += 1 + if ( + not _is_min_queue_size_applicable(queue_type) + and min_queue_size != min_queue_size_list[0] + ): + continue + sets.append( ParameterSet( + id=id, num_nodes=num_nodes, peering_degree=peering_degree, min_queue_size=min_queue_size, @@ -257,10 +308,11 @@ def __build_session_2_1_parameter_sets( ) -> list[ParameterSet]: sets: list[ParameterSet] = [] + id = 0 # must start from 1 for num_nodes in [20, 200, 2000]: peering_degree_list = [4, 6, 8] min_queue_size_list = [10, 50, 100] - transmission_rate_list = [1, 10, 100] + transmission_rate = 1 num_sent_msgs = 1000 num_senders_list = [num_nodes // 10, num_nodes // 5, num_nodes // 2] num_iterations = 20 @@ -270,14 +322,20 @@ def __build_session_2_1_parameter_sets( for ( peering_degree, min_queue_size, - transmission_rate, ) in itertools.product( peering_degree_list, min_queue_size_list, - transmission_rate_list, ): + id += 1 + if ( + not _is_min_queue_size_applicable(queue_type) + and min_queue_size != min_queue_size_list[0] + ): + continue + sets.append( ParameterSet( + id=id, num_nodes=num_nodes, peering_degree=peering_degree, min_queue_size=min_queue_size, @@ -292,16 +350,22 @@ def __build_session_2_1_parameter_sets( for ( peering_degree, min_queue_size, - transmission_rate, num_senders, ) in itertools.product( peering_degree_list, min_queue_size_list, - transmission_rate_list, num_senders_list, ): + id += 1 + if ( + not _is_min_queue_size_applicable(queue_type) + and min_queue_size != min_queue_size_list[0] + ): + continue + sets.append( ParameterSet( + id=id, num_nodes=num_nodes, peering_degree=peering_degree, min_queue_size=min_queue_size, @@ -318,3 +382,11 @@ def __build_session_2_1_parameter_sets( ) return sets + + +def _is_min_queue_size_applicable(queue_type: TemporalMixType) -> bool: + return queue_type in [ + TemporalMixType.PURE_COIN_FLIPPING, + TemporalMixType.PURE_RANDOM_SAMPLING, + TemporalMixType.PERMUTED_COIN_FLIPPING, + ] diff --git a/mixnet/queuesim/queuesim.py b/mixnet/queuesim/queuesim.py index 945ab28..be01aa3 100644 --- a/mixnet/queuesim/queuesim.py +++ b/mixnet/queuesim/queuesim.py @@ -87,7 +87,6 @@ def run_session( # Prepare all parameter sets of the session paramsets = build_parameter_sets(exp_id, session_id, queue_type) - assert 1 <= from_paramset <= len(paramsets) # Run the simulations for each parameter set, using multi processes session_start_time = time.time() @@ -96,16 +95,13 @@ def run_session( ) with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor: # Submit all iterations of all parameter sets to the ProcessPoolExecutor - for paramset_idx, paramset in enumerate(paramsets): - paramset_id = paramset_idx + 1 - if paramset_id < from_paramset: + for paramset in paramsets: + if paramset.id < from_paramset: continue - paramset_dir = f"{outdir}/{subdir}/__WIP__paramset_{paramset_id}" + paramset_dir = f"{outdir}/{subdir}/__WIP__paramset_{paramset.id}" os.makedirs(paramset_dir) - __save_paramset_info(paramset_id, paramset, f"{paramset_dir}/paramset.csv") - future_map.update( - _submit_iterations(paramset_id, paramset, executor, paramset_dir) - ) + __save_paramset_info(paramset, f"{paramset_dir}/paramset.csv") + future_map.update(_submit_iterations(paramset, executor, paramset_dir)) # Wait until all parameter sets are done iterations_done: Counter[int] = Counter() # per paramset_id @@ -126,18 +122,18 @@ def run_session( assert not os.path.exists(new_iteration_csv_path) os.rename(iter.out_csv_path, new_iteration_csv_path) - iterations_done.update([iter.paramset_id]) + iterations_done.update([iter.paramset.id]) # If all iterations of the paramset are done, print a log - if iterations_done[iter.paramset_id] == iter.paramset.num_iterations: - paramsets_done.add(iter.paramset_id) - paramset_dir = f"{outdir}/{subdir}/__WIP__paramset_{iter.paramset_id}" - new_paramset_dir = f"{outdir}/{subdir}/paramset_{iter.paramset_id}" + if iterations_done[iter.paramset.id] == iter.paramset.num_iterations: + paramsets_done.add(iter.paramset.id) + paramset_dir = f"{outdir}/{subdir}/__WIP__paramset_{iter.paramset.id}" + new_paramset_dir = f"{outdir}/{subdir}/paramset_{iter.paramset.id}" assert not os.path.exists(new_paramset_dir) os.rename(paramset_dir, new_paramset_dir) print("================================================") print( - f"ParamSet-{iter.paramset_id} is done. Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far." + f"ParamSet-{iter.paramset.id} is done. Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far." ) print(f"Renamed the WIP directory to {new_paramset_dir}") print("================================================") @@ -158,10 +154,10 @@ def run_session( print("******************************************************************") -def __save_paramset_info(paramset_id: int, paramset: ParameterSet, path: str): +def __save_paramset_info(paramset: ParameterSet, path: str): assert not os.path.exists(path) info = { - "paramset": paramset_id, + "paramset": paramset.id, "num_nodes": paramset.num_nodes, "peering_degree": paramset.peering_degree, "min_queue_size": paramset.min_queue_size, @@ -176,7 +172,6 @@ def __save_paramset_info(paramset_id: int, paramset: ParameterSet, path: str): def _submit_iterations( - paramset_id: int, paramset: ParameterSet, executor: concurrent.futures.ProcessPoolExecutor, outdir: str, @@ -192,7 +187,7 @@ def _submit_iterations( paramset.apply_to(cfg) print( - f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}" + f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset.id}" ) future_map: dict[concurrent.futures.Future[tuple[bool, float]], IterationInfo] = ( @@ -214,7 +209,7 @@ def _submit_iterations( _run_iteration, iter_cfg, out_csv_path, err_path, topology_path ) future_map[future] = IterationInfo( - paramset_id, paramset, i, out_csv_path, err_path, topology_path + paramset, i, out_csv_path, err_path, topology_path ) return future_map @@ -241,7 +236,6 @@ def _run_iteration( @dataclass class IterationInfo: - paramset_id: int paramset: ParameterSet iteration_idx: int out_csv_path: str diff --git a/mixnet/queuesim/test_paramset.py b/mixnet/queuesim/test_paramset.py index a655525..57cbe88 100644 --- a/mixnet/queuesim/test_paramset.py +++ b/mixnet/queuesim/test_paramset.py @@ -9,6 +9,7 @@ from queuesim.paramset import ( ExperimentID, ParameterSet, SessionID, + _is_min_queue_size_applicable, build_parameter_sets, ) from sim.config import LatencyConfig, TopologyConfig @@ -17,6 +18,7 @@ from sim.config import LatencyConfig, TopologyConfig class TestParameterSet(TestCase): def test_apply_to_config(self): paramset = ParameterSet( + id=1, num_nodes=10000, peering_degree=20000, min_queue_size=30000, @@ -48,13 +50,19 @@ class TestParameterSet(TestCase): (ExperimentID.EXPERIMENT_4, SessionID.SESSION_1): pow(3, 6), (ExperimentID.EXPERIMENT_1, SessionID.SESSION_2): pow(3, 4), (ExperimentID.EXPERIMENT_4, SessionID.SESSION_2): pow(3, 6), - (ExperimentID.EXPERIMENT_1, SessionID.SESSION_2_1): pow(3, 4), - (ExperimentID.EXPERIMENT_4, SessionID.SESSION_2_1): pow(3, 5), + (ExperimentID.EXPERIMENT_1, SessionID.SESSION_2_1): pow(3, 3), + (ExperimentID.EXPERIMENT_4, SessionID.SESSION_2_1): pow(3, 4), } + for queue_type in TemporalMixType: for (exp_id, session_id), expected_cnt in cases.items(): sets = build_parameter_sets(exp_id, session_id, queue_type) + + # Check if the number of parameter sets is correct + if not _is_min_queue_size_applicable(queue_type): + expected_cnt //= 3 self.assertEqual(expected_cnt, len(sets), f"{exp_id}: {session_id}") + # Check if all parameter sets are unique self.assertEqual( len(sets), @@ -62,6 +70,45 @@ class TestParameterSet(TestCase): f"{exp_id}: {session_id}", ) + # Check if paramset IDs are correct. + # ID starts from 1. + if _is_min_queue_size_applicable(queue_type): + for i, paramset in enumerate(sets): + self.assertEqual(i + 1, paramset.id, f"{exp_id}: {session_id}") + + def test_parameter_set_id_consistency(self): + cases = [ + (ExperimentID.EXPERIMENT_1, SessionID.SESSION_1), + (ExperimentID.EXPERIMENT_2, SessionID.SESSION_1), + (ExperimentID.EXPERIMENT_3, SessionID.SESSION_1), + (ExperimentID.EXPERIMENT_4, SessionID.SESSION_1), + (ExperimentID.EXPERIMENT_1, SessionID.SESSION_2), + (ExperimentID.EXPERIMENT_4, SessionID.SESSION_2), + (ExperimentID.EXPERIMENT_1, SessionID.SESSION_2_1), + (ExperimentID.EXPERIMENT_4, SessionID.SESSION_2_1), + ] + + for exp_id, session_id in cases: + sets_with_min_queue_size = build_parameter_sets( + exp_id, session_id, TemporalMixType.PURE_COIN_FLIPPING + ) + sets_without_min_queue_size = build_parameter_sets( + exp_id, session_id, TemporalMixType.NONE + ) + + for i, paramset in enumerate(sets_with_min_queue_size): + self.assertEqual(i + 1, paramset.id, f"{exp_id}: {session_id}") + + for set in sets_without_min_queue_size: + # To compare ParameterSet instances, use the same queue type. + modified_set = deepcopy(set) + modified_set.queue_type = TemporalMixType.PURE_COIN_FLIPPING + self.assertEqual( + sets_with_min_queue_size[set.id - 1], + modified_set, + f"{exp_id}: {session_id}", + ) + SAMPLE_CONFIG = Config( num_nodes=10,