use only transmission_rate=1 in session2.1 and ignore min_queue_size for some queue types when building paramsets

Youngjoon Lee 2024-08-14 11:40:17 +09:00
parent 4bd1f5f159
commit 618705e252
4 changed files with 143 additions and 31 deletions
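
In short, this change does two things: session 2.1 now pins transmission_rate to 1 instead of sweeping [1, 10, 100], and every parameter-set builder skips min_queue_size values beyond the first one for queue types that do not use a minimum queue size, while still advancing the ID counter so that a given parameter combination keeps the same ID for every queue type. The standalone sketch below illustrates that rule in isolation; QueueType, ParamSet, and build_session_2_1 are simplified stand-ins for the repo's TemporalMixType, ParameterSet, and __build_session_2_1_parameter_sets, not the actual code.

import itertools
from dataclasses import dataclass
from enum import Enum, auto

class QueueType(Enum):
    NONE = auto()
    PURE_COIN_FLIPPING = auto()
    PURE_RANDOM_SAMPLING = auto()
    PERMUTED_COIN_FLIPPING = auto()

def is_min_queue_size_applicable(queue_type: QueueType) -> bool:
    # Only these queue types actually use min_queue_size.
    return queue_type in (
        QueueType.PURE_COIN_FLIPPING,
        QueueType.PURE_RANDOM_SAMPLING,
        QueueType.PERMUTED_COIN_FLIPPING,
    )

@dataclass
class ParamSet:
    id: int
    peering_degree: int
    min_queue_size: int
    transmission_rate: int
    queue_type: QueueType

def build_session_2_1(queue_type: QueueType) -> list[ParamSet]:
    peering_degrees = [4, 6, 8]
    min_queue_sizes = [10, 50, 100]
    transmission_rate = 1  # session 2.1 no longer sweeps this; it is always 1

    sets: list[ParamSet] = []
    id = 0  # incremented before use, so the first ID is 1
    for peering_degree, min_queue_size in itertools.product(
        peering_degrees, min_queue_sizes
    ):
        # Always advance the ID so a given combination has the same ID for
        # every queue type, even when some combinations are skipped below.
        id += 1
        if (
            not is_min_queue_size_applicable(queue_type)
            and min_queue_size != min_queue_sizes[0]
        ):
            # min_queue_size is irrelevant here, so extra values would only
            # produce duplicate parameter sets.
            continue
        sets.append(
            ParamSet(id, peering_degree, min_queue_size, transmission_rate, queue_type)
        )
    return sets

if __name__ == "__main__":
    print(len(build_session_2_1(QueueType.PURE_COIN_FLIPPING)))  # 9 sets
    print([p.id for p in build_session_2_1(QueueType.NONE)])     # [1, 4, 7]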


@@ -14,6 +14,7 @@ from queuesim.util import format_elapsed_time
def benchmark(num_workers: int):
paramset = ParameterSet(
id=1,
num_nodes=100,
peering_degree=4,
min_queue_size=10,
@@ -34,9 +35,7 @@ def benchmark(num_workers: int):
max_workers=num_workers
) as executor:
future_map.update(
_submit_iterations(
paramset_id=1, paramset=paramset, executor=executor, outdir=tmpdir
)
_submit_iterations(paramset=paramset, executor=executor, outdir=tmpdir)
)
# Wait until all iterations are done


@@ -31,6 +31,7 @@ EXPERIMENT_TITLES: dict[ExperimentID, str] = {
@dataclass
class ParameterSet:
id: int
num_nodes: int
peering_degree: int
min_queue_size: int
@@ -69,6 +70,7 @@ def __build_session_1_parameter_sets(
) -> list[ParameterSet]:
sets: list[ParameterSet] = []
id = 0 # must start from 1
for num_nodes in [20, 40, 80]:
peering_degree_list = [num_nodes // 5, num_nodes // 4, num_nodes // 2]
min_queue_size_list = [num_nodes // 2, num_nodes, num_nodes * 2]
@@ -88,8 +90,16 @@ def __build_session_1_parameter_sets(
min_queue_size_list,
transmission_rate_list,
):
id += 1
if (
not _is_min_queue_size_applicable(queue_type)
and min_queue_size != min_queue_size_list[0]
):
continue
sets.append(
ParameterSet(
id=id,
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
@@ -112,8 +122,16 @@ def __build_session_1_parameter_sets(
transmission_rate_list,
num_sent_msgs_list,
):
id += 1
if (
not _is_min_queue_size_applicable(queue_type)
and min_queue_size != min_queue_size_list[0]
):
continue
sets.append(
ParameterSet(
id=id,
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
@@ -136,8 +154,16 @@ def __build_session_1_parameter_sets(
transmission_rate_list,
num_senders_list,
):
id += 1
if (
not _is_min_queue_size_applicable(queue_type)
and min_queue_size != min_queue_size_list[0]
):
continue
sets.append(
ParameterSet(
id=id,
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
@@ -162,8 +188,16 @@ def __build_session_1_parameter_sets(
num_sent_msgs_list,
num_senders_list,
):
id += 1
if (
not _is_min_queue_size_applicable(queue_type)
and min_queue_size != min_queue_size_list[0]
):
continue
sets.append(
ParameterSet(
id=id,
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
@@ -185,6 +219,7 @@ def __build_session_2_parameter_sets(
) -> list[ParameterSet]:
sets: list[ParameterSet] = []
id = 0 # must start from 1
for num_nodes in [100, 1000, 10000]:
peering_degree_list = [4, 8, 16]
min_queue_size_list = [10, 50, 100]
@@ -203,8 +238,16 @@ def __build_session_2_parameter_sets(
min_queue_size_list,
transmission_rate_list,
):
id += 1
if (
not _is_min_queue_size_applicable(queue_type)
and min_queue_size != min_queue_size_list[0]
):
continue
sets.append(
ParameterSet(
id=id,
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
@@ -232,8 +275,16 @@ def __build_session_2_parameter_sets(
min_queue_size,
min_queue_size * 2,
]:
id += 1
if (
not _is_min_queue_size_applicable(queue_type)
and min_queue_size != min_queue_size_list[0]
):
continue
sets.append(
ParameterSet(
id=id,
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
@@ -257,10 +308,11 @@ def __build_session_2_1_parameter_sets(
) -> list[ParameterSet]:
sets: list[ParameterSet] = []
id = 0 # must start from 1
for num_nodes in [20, 200, 2000]:
peering_degree_list = [4, 6, 8]
min_queue_size_list = [10, 50, 100]
transmission_rate_list = [1, 10, 100]
transmission_rate = 1
num_sent_msgs = 1000
num_senders_list = [num_nodes // 10, num_nodes // 5, num_nodes // 2]
num_iterations = 20
@@ -270,14 +322,20 @@ def __build_session_2_1_parameter_sets(
for (
peering_degree,
min_queue_size,
transmission_rate,
) in itertools.product(
peering_degree_list,
min_queue_size_list,
transmission_rate_list,
):
id += 1
if (
not _is_min_queue_size_applicable(queue_type)
and min_queue_size != min_queue_size_list[0]
):
continue
sets.append(
ParameterSet(
id=id,
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
@@ -292,16 +350,22 @@ def __build_session_2_1_parameter_sets(
for (
peering_degree,
min_queue_size,
transmission_rate,
num_senders,
) in itertools.product(
peering_degree_list,
min_queue_size_list,
transmission_rate_list,
num_senders_list,
):
id += 1
if (
not _is_min_queue_size_applicable(queue_type)
and min_queue_size != min_queue_size_list[0]
):
continue
sets.append(
ParameterSet(
id=id,
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
@@ -318,3 +382,11 @@ def __build_session_2_1_parameter_sets(
)
return sets
def _is_min_queue_size_applicable(queue_type: TemporalMixType) -> bool:
return queue_type in [
TemporalMixType.PURE_COIN_FLIPPING,
TemporalMixType.PURE_RANDOM_SAMPLING,
TemporalMixType.PERMUTED_COIN_FLIPPING,
]


@@ -87,7 +87,6 @@ def run_session(
# Prepare all parameter sets of the session
paramsets = build_parameter_sets(exp_id, session_id, queue_type)
assert 1 <= from_paramset <= len(paramsets)
# Run the simulations for each parameter set, using multi processes
session_start_time = time.time()
@@ -96,16 +95,13 @@ def run_session(
)
with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
# Submit all iterations of all parameter sets to the ProcessPoolExecutor
for paramset_idx, paramset in enumerate(paramsets):
paramset_id = paramset_idx + 1
if paramset_id < from_paramset:
for paramset in paramsets:
if paramset.id < from_paramset:
continue
paramset_dir = f"{outdir}/{subdir}/__WIP__paramset_{paramset_id}"
paramset_dir = f"{outdir}/{subdir}/__WIP__paramset_{paramset.id}"
os.makedirs(paramset_dir)
__save_paramset_info(paramset_id, paramset, f"{paramset_dir}/paramset.csv")
future_map.update(
_submit_iterations(paramset_id, paramset, executor, paramset_dir)
)
__save_paramset_info(paramset, f"{paramset_dir}/paramset.csv")
future_map.update(_submit_iterations(paramset, executor, paramset_dir))
# Wait until all parameter sets are done
iterations_done: Counter[int] = Counter() # per paramset_id
@@ -126,18 +122,18 @@ def run_session(
assert not os.path.exists(new_iteration_csv_path)
os.rename(iter.out_csv_path, new_iteration_csv_path)
iterations_done.update([iter.paramset_id])
iterations_done.update([iter.paramset.id])
# If all iterations of the paramset are done, print a log
if iterations_done[iter.paramset_id] == iter.paramset.num_iterations:
paramsets_done.add(iter.paramset_id)
paramset_dir = f"{outdir}/{subdir}/__WIP__paramset_{iter.paramset_id}"
new_paramset_dir = f"{outdir}/{subdir}/paramset_{iter.paramset_id}"
if iterations_done[iter.paramset.id] == iter.paramset.num_iterations:
paramsets_done.add(iter.paramset.id)
paramset_dir = f"{outdir}/{subdir}/__WIP__paramset_{iter.paramset.id}"
new_paramset_dir = f"{outdir}/{subdir}/paramset_{iter.paramset.id}"
assert not os.path.exists(new_paramset_dir)
os.rename(paramset_dir, new_paramset_dir)
print("================================================")
print(
f"ParamSet-{iter.paramset_id} is done. Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far."
f"ParamSet-{iter.paramset.id} is done. Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far."
)
print(f"Renamed the WIP directory to {new_paramset_dir}")
print("================================================")
@@ -158,10 +154,10 @@ def run_session(
print("******************************************************************")
def __save_paramset_info(paramset_id: int, paramset: ParameterSet, path: str):
def __save_paramset_info(paramset: ParameterSet, path: str):
assert not os.path.exists(path)
info = {
"paramset": paramset_id,
"paramset": paramset.id,
"num_nodes": paramset.num_nodes,
"peering_degree": paramset.peering_degree,
"min_queue_size": paramset.min_queue_size,
@@ -176,7 +172,6 @@ def __save_paramset_info(paramset_id: int, paramset: ParameterSet, path: str):
def _submit_iterations(
paramset_id: int,
paramset: ParameterSet,
executor: concurrent.futures.ProcessPoolExecutor,
outdir: str,
@@ -192,7 +187,7 @@ def _submit_iterations(
paramset.apply_to(cfg)
print(
f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}"
f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset.id}"
)
future_map: dict[concurrent.futures.Future[tuple[bool, float]], IterationInfo] = (
@@ -214,7 +209,7 @@ def _submit_iterations(
_run_iteration, iter_cfg, out_csv_path, err_path, topology_path
)
future_map[future] = IterationInfo(
paramset_id, paramset, i, out_csv_path, err_path, topology_path
paramset, i, out_csv_path, err_path, topology_path
)
return future_map
@@ -241,7 +236,6 @@ def _run_iteration(
@dataclass
class IterationInfo:
paramset_id: int
paramset: ParameterSet
iteration_idx: int
out_csv_path: str


@@ -9,6 +9,7 @@ from queuesim.paramset import (
ExperimentID,
ParameterSet,
SessionID,
_is_min_queue_size_applicable,
build_parameter_sets,
)
from sim.config import LatencyConfig, TopologyConfig
@@ -17,6 +18,7 @@ from sim.config import LatencyConfig, TopologyConfig
class TestParameterSet(TestCase):
def test_apply_to_config(self):
paramset = ParameterSet(
id=1,
num_nodes=10000,
peering_degree=20000,
min_queue_size=30000,
@@ -48,13 +50,19 @@ class TestParameterSet(TestCase):
(ExperimentID.EXPERIMENT_4, SessionID.SESSION_1): pow(3, 6),
(ExperimentID.EXPERIMENT_1, SessionID.SESSION_2): pow(3, 4),
(ExperimentID.EXPERIMENT_4, SessionID.SESSION_2): pow(3, 6),
(ExperimentID.EXPERIMENT_1, SessionID.SESSION_2_1): pow(3, 4),
(ExperimentID.EXPERIMENT_4, SessionID.SESSION_2_1): pow(3, 5),
(ExperimentID.EXPERIMENT_1, SessionID.SESSION_2_1): pow(3, 3),
(ExperimentID.EXPERIMENT_4, SessionID.SESSION_2_1): pow(3, 4),
}
for queue_type in TemporalMixType:
for (exp_id, session_id), expected_cnt in cases.items():
sets = build_parameter_sets(exp_id, session_id, queue_type)
# Check if the number of parameter sets is correct
if not _is_min_queue_size_applicable(queue_type):
expected_cnt //= 3
self.assertEqual(expected_cnt, len(sets), f"{exp_id}: {session_id}")
# Check if all parameter sets are unique
self.assertEqual(
len(sets),
@@ -62,6 +70,45 @@ class TestParameterSet(TestCase):
f"{exp_id}: {session_id}",
)
# Check if paramset IDs are correct.
# ID starts from 1.
if _is_min_queue_size_applicable(queue_type):
for i, paramset in enumerate(sets):
self.assertEqual(i + 1, paramset.id, f"{exp_id}: {session_id}")
def test_parameter_set_id_consistency(self):
cases = [
(ExperimentID.EXPERIMENT_1, SessionID.SESSION_1),
(ExperimentID.EXPERIMENT_2, SessionID.SESSION_1),
(ExperimentID.EXPERIMENT_3, SessionID.SESSION_1),
(ExperimentID.EXPERIMENT_4, SessionID.SESSION_1),
(ExperimentID.EXPERIMENT_1, SessionID.SESSION_2),
(ExperimentID.EXPERIMENT_4, SessionID.SESSION_2),
(ExperimentID.EXPERIMENT_1, SessionID.SESSION_2_1),
(ExperimentID.EXPERIMENT_4, SessionID.SESSION_2_1),
]
for exp_id, session_id in cases:
sets_with_min_queue_size = build_parameter_sets(
exp_id, session_id, TemporalMixType.PURE_COIN_FLIPPING
)
sets_without_min_queue_size = build_parameter_sets(
exp_id, session_id, TemporalMixType.NONE
)
for i, paramset in enumerate(sets_with_min_queue_size):
self.assertEqual(i + 1, paramset.id, f"{exp_id}: {session_id}")
for set in sets_without_min_queue_size:
# To compare ParameterSet instances, use the same queue type.
modified_set = deepcopy(set)
modified_set.queue_type = TemporalMixType.PURE_COIN_FLIPPING
self.assertEqual(
sets_with_min_queue_size[set.id - 1],
modified_set,
f"{exp_id}: {session_id}",
)
SAMPLE_CONFIG = Config(
num_nodes=10,