Targeted experiments for queuing mechanism

Targeted experiments for queuing mechanism

gather series into dataframe

put exp_id to the CSV path

revert iterations back to num_nodes/2

add missing print and decrease msg_interval_sec

change param sequence for readability

use struct instead of pickle for fixed-size & faster serde

include dtime series into dataframe

optimize: choose optimized connection type according to latency setting

add skip_sending_noise option

optimize filling up the queue with noises

move queue_type to the end of param set, and build CSV gradually row by row

fix: consider num_senders when waiting until all messages are disseminated

fix: sample senders without duplicates

fix: build param combinations correctly

add plot script

initialize MinSizeMixQueue with noises

define SessionParameterSet and add paramset for session2

improve topology connectivity check to avoid "maximum recursion depth exceeded" error

fix: the correct parameter set constructor

store individual series to separate CSV files

reorganize files and draw plot automatically

start series file id from 1 (not 0)

add queue_type CLI argument for parallelization

pretty format of elapsed time

pretty format of elapsed time

add merge CLI and draw multiple plots

split functions

do not draw plot for each session

use concurrent.futures to utilize multiprocessing

add from_paramset argument

fix: count num of finished iterations correctly

draw plots for num_sent_msgs and num_senders for specific experiments
This commit is contained in:
Youngjoon Lee 2024-07-24 15:35:41 +09:00
parent 39eabe1537
commit 3d14319588
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
23 changed files with 1106 additions and 53 deletions

View File

@ -26,5 +26,5 @@ jobs:
run: python -m unittest -v
- name: Run a short mixnet simulation
working-directory: mixnet
run: python -m cmd.main --config config.ci.yaml
run: python -m cmd.main --config sim/config.ci.yaml

1
mixnet/.gitignore vendored
View File

@ -1,2 +1,3 @@
.venv/
*.csv
*.png

View File

@ -43,7 +43,7 @@ pip install -r requirements.txt
## Getting Started
Copy the [`config.ci.yaml`](./config.ci.yaml) file and adjust the parameters to your needs.
Copy the [`sim/config.ci.yaml`](./sim/config.ci.yaml) file and adjust the parameters to your needs.
Each parameter is explained in the config file.
For more details, please refer to the [documentation](https://www.notion.so/NomMix-Sim-Getting-Started-ee0e2191f4e7437e93976aff2627d7ce?pvs=4).

39
mixnet/cmd/queuesim.py Normal file
View File

@ -0,0 +1,39 @@
import argparse
from protocol.temporalmix import TemporalMixType
from queuesim.paramset import ExperimentID, SessionID
from queuesim.queuesim import run_session
if __name__ == "__main__":
    # CLI entry point: run one simulation session for a single queue type,
    # so that different queue types can be simulated in parallel processes.
    parser = argparse.ArgumentParser(
        description="Measure the message dissemination time with various configurations.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--exp-id", type=int, required=True, help="Experiment ID (>=1)")
    parser.add_argument(
        "--session-id", type=int, required=True, help="Session ID (>=1)"
    )
    parser.add_argument(
        "--queue-type",
        type=str,
        required=True,
        help=f"Queue type: {' | '.join([t.value for t in TemporalMixType])}",
    )
    parser.add_argument("--outdir", type=str, required=True, help="output directory")
    parser.add_argument(
        "--from-paramset",
        type=int,
        required=False,
        default=1,
        help="A parameter set ID (>=1) to start from",
    )
    args = parser.parse_args()

    # Convert raw CLI values into the typed enums expected by run_session.
    # Invalid values raise ValueError from the enum constructors.
    run_session(
        ExperimentID(args.exp_id),
        SessionID(args.session_id),
        TemporalMixType(args.queue_type),
        args.outdir,
        args.from_paramset,
    )
    print("All simulations completed!")

View File

@ -0,0 +1,45 @@
import argparse
import glob
import os
from protocol.temporalmix import TemporalMixType
if __name__ == "__main__":
    # CLI entry point: merge the per-queue-type session result directories
    # produced by cmd.queuesim into a single CSV (one header, all rows).
    parser = argparse.ArgumentParser(
        description="Merge multiple `all_results.csv` files into a single CSV file.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--exp-id", type=int, required=True, help="Experiment ID (>=1)")
    parser.add_argument(
        "--session-id", type=int, required=True, help="Session ID (>=1)"
    )
    parser.add_argument("--indir", type=str, required=True, help="input directory")
    parser.add_argument(
        "--out-csv-path", type=str, required=True, help="output CSV file path"
    )
    args = parser.parse_args()

    # Exactly one result directory is expected per queue type.
    pattern = os.path.join(args.indir, f"queuesim_e{args.exp_id}s{args.session_id}_*")
    files = glob.glob(pattern)
    assert len(TemporalMixType) == len(files), f"{len(TemporalMixType)} != {len(files)}"

    # Map each directory to its queue type by looking for `_{name}_` in the
    # directory name. Try longer names first: a type whose name is a substring
    # of another type's name must not steal the longer type's directory.
    queue_types = list(TemporalMixType)
    paths = [""] * len(queue_types)
    for file in files:
        for queue_type in sorted(queue_types, key=lambda t: len(t.name), reverse=True):
            if f"_{queue_type.name}_" in os.path.basename(file):
                i = queue_types.index(queue_type)
                assert paths[i] == "", f"duplicate result dir for {queue_type.name}"
                paths[i] = file
                break
    assert all(path != "" for path in paths)

    # Concatenate all session.csv files, keeping only the first header.
    with open(args.out_csv_path, "w", newline="") as output:
        for i, path in enumerate(paths):
            with open(f"{path}/session.csv") as input:
                header = input.readline()
                if i == 0:
                    output.write(header)
                for line in input:
                    output.write(line)
    print(f"Saved to {args.out_csv_path}")

View File

@ -0,0 +1,27 @@
import argparse
import pandas as pd
from queuesim.paramset import ExperimentID, SessionID
from queuesim.plot import draw_plots
if __name__ == "__main__":
    # CLI entry point: read a merged session CSV and render the plots.
    parser = argparse.ArgumentParser(
        description="Draw plots from a merged CSV file.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--exp-id", type=int, required=True, help="Experiment ID (>=1)")
    parser.add_argument(
        "--session-id", type=int, required=True, help="Session ID (>=1)"
    )
    parser.add_argument(
        "--csv-path", type=str, required=True, help="input CSV file path"
    )
    parser.add_argument("--outdir", type=str, required=True, help="output directory")
    args = parser.parse_args()

    # Invalid IDs raise ValueError from the enum constructors.
    exp_id: ExperimentID = ExperimentID(args.exp_id)
    session_id: SessionID = SessionID(args.session_id)

    df = pd.read_csv(args.csv_path)
    draw_plots(df, exp_id, session_id, args.outdir)

View File

@ -1,6 +1,7 @@
from typing import Any, Awaitable, Coroutine, TypeVar
import usim
from usim._primitives.task import Task, TaskCancelled
from framework import framework
@ -19,6 +20,7 @@ class Framework(framework.Framework):
# Because of the way μSim works, the scope must be created using `async with` syntax
# and be passed to this constructor.
self._scope = scope
self._tasks: list[Task] = []
def queue(self) -> framework.Queue:
return Queue()
@ -33,7 +35,13 @@ class Framework(framework.Framework):
def spawn(
self, coroutine: Coroutine[Any, Any, framework.RT]
) -> Awaitable[framework.RT]:
return self._scope.do(coroutine)
task = self._scope.do(coroutine)
self._tasks.append(task)
return task
def stop_tasks(self) -> None:
for task in self._tasks:
task.cancel()
T = TypeVar("T")

View File

@ -66,6 +66,9 @@ class MixSimplexConnection(SimplexConnection):
transmission_rate_per_sec: int,
noise_msg: bytes,
temporal_mix_config: TemporalMixConfig,
# OPTIMIZATION ONLY FOR EXPERIMENTS WITHOUT BANDWIDTH MEASUREMENT
# If True, skip sending a noise even if it's time to send one.
skip_sending_noise: bool,
):
self.framework = framework
self.queue: Queue[bytes] = TemporalMix.queue(
@ -73,12 +76,16 @@ class MixSimplexConnection(SimplexConnection):
)
self.conn = conn
self.transmission_rate_per_sec = transmission_rate_per_sec
self.noise_msg = noise_msg
self.skip_sending_noise = skip_sending_noise
self.task = framework.spawn(self.__run())
async def __run(self):
while True:
await self.framework.sleep(1 / self.transmission_rate_per_sec)
msg = await self.queue.get()
if self.skip_sending_noise and msg == self.noise_msg:
continue
await self.conn.send(msg)
async def send(self, data: bytes) -> None:

View File

@ -108,7 +108,7 @@ class Node:
inbound_conn: SimplexConnection,
outbound_conn: SimplexConnection,
):
Node.__connect(self.nomssip, peer.nomssip, inbound_conn, outbound_conn)
connect_nodes(self.nomssip, peer.nomssip, inbound_conn, outbound_conn)
def connect_broadcast(
self,
@ -116,25 +116,7 @@ class Node:
inbound_conn: SimplexConnection,
outbound_conn: SimplexConnection,
):
Node.__connect(self.broadcast, peer.broadcast, inbound_conn, outbound_conn)
@staticmethod
def __connect(
self_channel: Gossip,
peer_channel: Gossip,
inbound_conn: SimplexConnection,
outbound_conn: SimplexConnection,
):
"""
Establish a duplex connection with a peer node.
"""
if not self_channel.can_accept_conn() or not peer_channel.can_accept_conn():
raise PeeringDegreeReached()
# Register a duplex connection for its own use
self_channel.add_conn(inbound_conn, outbound_conn)
# Register a duplex connection for the peer
peer_channel.add_conn(outbound_conn, inbound_conn)
connect_nodes(self.broadcast, peer.broadcast, inbound_conn, outbound_conn)
async def send_message(self, msg: bytes):
"""
@ -148,3 +130,21 @@ class Node:
self.config.mix_path_length,
)
await self.nomssip.gossip(sphinx_packet.bytes())
def connect_nodes(
    self_channel: Gossip,
    peer_channel: Gossip,
    inbound_conn: SimplexConnection,
    outbound_conn: SimplexConnection,
):
    """
    Establish a duplex connection between two gossip channels,
    registering the same connection pair on both sides (mirrored).

    Raises PeeringDegreeReached if either side cannot accept a new connection.
    """
    if not self_channel.can_accept_conn() or not peer_channel.can_accept_conn():
        raise PeeringDegreeReached()

    # Register a duplex connection for its own use
    self_channel.add_conn(inbound_conn, outbound_conn)
    # Register a duplex connection for the peer
    peer_channel.add_conn(outbound_conn, inbound_conn)

View File

@ -22,6 +22,9 @@ class NomssipConfig(GossipConfig):
transmission_rate_per_sec: int
msg_size: int
temporal_mix: TemporalMixConfig
# OPTIMIZATION ONLY FOR EXPERIMENTS WITHOUT BANDWIDTH MEASUREMENT
# If True, skip sending a noise even if it's time to send one.
skip_sending_noise: bool = False
class Nomssip(Gossip):
@ -52,6 +55,7 @@ class Nomssip(Gossip):
self.config.transmission_rate_per_sec,
noise_packet,
self.config.temporal_mix,
self.config.skip_sending_noise,
),
)
@ -72,7 +76,7 @@ class Nomssip(Gossip):
Gossip a message to all connected peers with prepending a message flag
"""
# The message size must be fixed.
assert len(msg) == self.config.msg_size
assert len(msg) == self.config.msg_size, f"{len(msg)} != {self.config.msg_size}"
packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg)
await self.__gossip_flagged_packet(packet)

View File

@ -111,12 +111,17 @@ class MixQueue(Queue[T]):
class MinSizeMixQueue(MixQueue[T]):
def __init__(self, min_pool_size: int, rng: random.Random, noise_msg: T):
super().__init__(rng, noise_msg)
# Initialize the queue with noise messages
# to ensure that the queue size is at least `min_pool_size`.
self._queue = [self._noise_msg] * min_pool_size
self._mix_pool_size = min_pool_size
@abstractmethod
async def get(self) -> T:
while len(self._queue) < self._mix_pool_size:
self._queue.append(self._noise_msg)
if len(self._queue) < self._mix_pool_size:
self._queue.extend(
[self._noise_msg] * (self._mix_pool_size - len(self._queue))
)
# Subclass must implement this method
pass

17
mixnet/queuesim/config.py Normal file
View File

@ -0,0 +1,17 @@
import random
from dataclasses import dataclass
from protocol.nomssip import NomssipConfig
from sim.config import LatencyConfig, TopologyConfig
@dataclass
class Config:
    """Top-level configuration for a single queuing-simulation iteration."""

    # Total number of nodes in the network.
    num_nodes: int
    # Nomos Gossip (Nomssip) settings shared by all nodes.
    nomssip: NomssipConfig
    # Random-topology construction settings.
    topology: TopologyConfig
    # Link-latency settings for inter-node connections.
    latency: LatencyConfig
    # Number of messages each sender sends.
    num_sent_msgs: int
    # Delay between two consecutive messages from the same sender.
    msg_interval_sec: float
    # Number of nodes chosen as senders.
    num_senders: int
    # RNG used to sample the sender nodes.
    sender_generator: random.Random

32
mixnet/queuesim/node.py Normal file
View File

@ -0,0 +1,32 @@
from __future__ import annotations
from typing import Awaitable, Callable
from framework.framework import Framework
from protocol.connection import SimplexConnection
from protocol.node import connect_nodes
from protocol.nomssip import Nomssip, NomssipConfig
class Node:
    """A minimal node for queuing simulations: gossips messages via Nomssip."""

    def __init__(
        self,
        framework: Framework,
        nomssip_config: NomssipConfig,
        msg_handler: Callable[[bytes], Awaitable[None]],
    ):
        # msg_handler is invoked for every message received via gossip.
        self.nomssip = Nomssip(framework, nomssip_config, msg_handler)

    def connect(
        self,
        peer: Node,
        inbound_conn: SimplexConnection,
        outbound_conn: SimplexConnection,
    ):
        # Establish a duplex gossip connection between this node and the peer.
        connect_nodes(self.nomssip, peer.nomssip, inbound_conn, outbound_conn)

    async def send_message(self, msg: bytes):
        """
        Send the message via Nomos Gossip to all connected peers.
        """
        await self.nomssip.gossip(msg)

219
mixnet/queuesim/paramset.py Normal file
View File

@ -0,0 +1,219 @@
from __future__ import annotations
import itertools
from dataclasses import dataclass
from enum import Enum
from protocol.temporalmix import TemporalMixType
from queuesim.config import Config
class ExperimentID(Enum):
    """Identifier of an experiment (which sender/message dimensions are varied)."""

    EXPERIMENT_1 = 1  # Single Sender - Single Message
    EXPERIMENT_2 = 2  # Single Sender - Multiple Messages
    EXPERIMENT_3 = 3  # Multiple Senders - Single Message
    EXPERIMENT_4 = 4  # Multiple Senders - Multiple Messages
class SessionID(Enum):
    """Identifier of a session (which parameter ranges are explored)."""

    SESSION_1 = 1
    SESSION_2 = 2
# Human-readable title of each experiment, used in logs and plot titles.
EXPERIMENT_TITLES: dict[ExperimentID, str] = {
    ExperimentID.EXPERIMENT_1: "Single Sender - Single Message",
    ExperimentID.EXPERIMENT_2: "Single Sender - Multiple Messages",
    ExperimentID.EXPERIMENT_3: "Multiple Senders - Single Message",
    ExperimentID.EXPERIMENT_4: "Multiple Senders - Multiple Messages",
}
@dataclass
class ParameterSet:
    """One combination of simulation parameters, to be run `num_iterations` times."""

    num_nodes: int
    peering_degree: int
    min_queue_size: int
    transmission_rate: int
    num_sent_msgs: int
    num_senders: int
    queue_type: TemporalMixType
    num_iterations: int

    def apply_to(self, cfg: Config) -> None:
        """Overwrite the corresponding fields of `cfg` with this parameter set."""
        cfg.num_nodes = self.num_nodes
        cfg.nomssip.peering_degree = self.peering_degree
        cfg.nomssip.temporal_mix.min_queue_size = self.min_queue_size
        cfg.nomssip.transmission_rate_per_sec = self.transmission_rate
        cfg.num_sent_msgs = self.num_sent_msgs
        cfg.num_senders = self.num_senders
        cfg.nomssip.temporal_mix.mix_type = self.queue_type
def build_parameter_sets(
    exp_id: ExperimentID, session_id: SessionID, queue_type: TemporalMixType
) -> list[ParameterSet]:
    """Build all parameter sets of the given session and experiment for one queue type."""
    if session_id is SessionID.SESSION_1:
        return __build_session_1_parameter_sets(exp_id, queue_type)
    if session_id is SessionID.SESSION_2:
        return __build_session_2_parameter_sets(exp_id, queue_type)
    raise ValueError(f"Unknown session ID: {session_id}")
def __build_session_1_parameter_sets(
exp_id: ExperimentID, queue_type: TemporalMixType
) -> list[ParameterSet]:
sets: list[ParameterSet] = []
for num_nodes in [20, 40, 80]:
peering_degree_list = [num_nodes // 5, num_nodes // 4, num_nodes // 2]
min_queue_size_list = [num_nodes // 2, num_nodes, num_nodes * 2]
transmission_rate_list = [num_nodes // 2, num_nodes, num_nodes * 2]
num_sent_msgs_list = [8, 16, 32]
num_senders_list = [num_nodes // 10, num_nodes // 5, num_nodes // 2]
num_iterations = num_nodes // 2
match exp_id:
case ExperimentID.EXPERIMENT_1:
for (
peering_degree,
min_queue_size,
transmission_rate,
) in itertools.product(
peering_degree_list,
min_queue_size_list,
transmission_rate_list,
):
sets.append(
ParameterSet(
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
transmission_rate=transmission_rate,
num_sent_msgs=1,
num_senders=1,
queue_type=queue_type,
num_iterations=num_iterations,
)
)
case ExperimentID.EXPERIMENT_2:
for (
peering_degree,
min_queue_size,
transmission_rate,
num_sent_msgs,
) in itertools.product(
peering_degree_list,
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
):
sets.append(
ParameterSet(
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
transmission_rate=transmission_rate,
num_sent_msgs=num_sent_msgs,
num_senders=1,
queue_type=queue_type,
num_iterations=num_iterations,
)
)
case ExperimentID.EXPERIMENT_3:
for (
peering_degree,
min_queue_size,
transmission_rate,
num_senders,
) in itertools.product(
peering_degree_list,
min_queue_size_list,
transmission_rate_list,
num_senders_list,
):
sets.append(
ParameterSet(
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
transmission_rate=transmission_rate,
num_sent_msgs=1,
num_senders=num_senders,
queue_type=queue_type,
num_iterations=num_iterations,
)
)
case ExperimentID.EXPERIMENT_4:
for (
peering_degree,
min_queue_size,
transmission_rate,
num_sent_msgs,
num_senders,
) in itertools.product(
peering_degree_list,
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
num_senders_list,
):
sets.append(
ParameterSet(
num_nodes=num_nodes,
peering_degree=peering_degree,
min_queue_size=min_queue_size,
transmission_rate=transmission_rate,
num_sent_msgs=num_sent_msgs,
num_senders=num_senders,
queue_type=queue_type,
num_iterations=num_iterations,
)
)
case _:
raise ValueError(f"Unknown experiment ID: {exp_id}")
return sets
def __build_session_2_parameter_sets(
    exp_id: ExperimentID, queue_type: TemporalMixType
) -> list[ParameterSet]:
    """
    Build the session-2 parameter sets: larger networks with absolute
    (not network-size-relative) parameter ranges. Only experiment 1 is supported.
    """
    if exp_id is not ExperimentID.EXPERIMENT_1:
        raise NotImplementedError(
            f"Experiment {exp_id} not implemented for session 2"
        )

    sets: list[ParameterSet] = []
    for num_nodes in [100, 1000, 10000]:
        for peering_degree in [4, 8, 16]:
            for min_queue_size in [10, 50, 100]:
                for transmission_rate in [1, 10, 100]:
                    sets.append(
                        ParameterSet(
                            num_nodes=num_nodes,
                            peering_degree=peering_degree,
                            min_queue_size=min_queue_size,
                            transmission_rate=transmission_rate,
                            num_sent_msgs=1,
                            num_senders=1,
                            queue_type=queue_type,
                            num_iterations=20,
                        )
                    )
    return sets

133
mixnet/queuesim/plot.py Normal file
View File

@ -0,0 +1,133 @@
import os
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from queuesim.paramset import ExperimentID, SessionID
# Dissemination-time statistics rendered as boxes in the boxplots.
BOXPLOT_VALUE_VARS = [
    "dtime_min",
    "dtime_25%",
    "dtime_50%",
    "dtime_mean",
    "dtime_75%",
    "dtime_max",
]

# Parameter columns shown in each plot legend entry.
PARAM_SET = [
    "num_nodes",
    "peering_degree",
    "min_queue_size",
    "transmission_rate",
    "num_sent_msgs",
    "num_senders",
]
def draw_plots(
    df: pd.DataFrame, exp_id: ExperimentID, session_id: SessionID, outdir: str
):
    """
    Draw all plots for the given experiment/session results.

    :param df: merged session results (one row per parameter set & queue type)
    :param outdir: existing directory where the PNG files are written
    """
    assert os.path.exists(outdir)

    # Overview across all parameter sets
    __overview_by_queue_type(df, exp_id, session_id, f"{outdir}/plot_overview.png")

    # Per-parameter impact plots are drawn for the smallest network size only.
    num_nodes = int(df["num_nodes"].min())
    for param in ["peering_degree", "min_queue_size", "transmission_rate"]:
        __impact_of_param_by_queue_type(
            df, exp_id, session_id, num_nodes, param, f"{outdir}/plot_{param}.png"
        )
    # num_sent_msgs varies only in experiments 2 and 4.
    if exp_id == ExperimentID(2) or exp_id == ExperimentID(4):
        __impact_of_param_by_queue_type(
            df,
            exp_id,
            session_id,
            num_nodes,
            "num_sent_msgs",
            f"{outdir}/plot_num_sent_msgs.png",
        )
    # num_senders varies only in experiments 3 and 4.
    if exp_id == ExperimentID(3) or exp_id == ExperimentID(4):
        __impact_of_param_by_queue_type(
            df,
            exp_id,
            session_id,
            num_nodes,
            "num_senders",
            f"{outdir}/plot_num_senders.png",
        )
def __param_set_legend(row) -> str:
    """
    Build the legend label of one result row: each parameter in PARAM_SET
    rendered as "name:value", joined with ", ".
    """
    # str.join over a generator instead of string concatenation in a loop.
    return ", ".join(f"{param}:{row[param]}" for param in PARAM_SET)
def __overview_by_queue_type(
    df: pd.DataFrame,
    exp_id: ExperimentID,
    session_id: SessionID,
    out_path: str,
):
    """Plot one representative parameter set per (num_nodes, queue_type) pair."""
    deduped = df.drop_duplicates(subset=["num_nodes", "queue_type"])
    print(deduped)
    title = f"{exp_id.name}: {session_id.name}: Overview"
    __draw_plot_by_queue_type(deduped, title, out_path)
def __impact_of_param_by_queue_type(
    df: pd.DataFrame,
    exp_id: ExperimentID,
    session_id: SessionID,
    num_nodes: int,
    param: str,
    out_path: str,
):
    """Plot the impact of one parameter, restricted to the given network size."""
    filtered = pd.DataFrame(df[df["num_nodes"] == num_nodes])
    filtered = filtered.drop_duplicates(subset=[param, "queue_type"])
    print(filtered)
    title = f"{exp_id.name}: {session_id.name}: Impact of {param} ({num_nodes} nodes)"
    __draw_plot_by_queue_type(filtered, title, out_path)
def __draw_plot_by_queue_type(df: pd.DataFrame, title: str, out_path: str):
    """
    Draw a boxplot of dissemination-time statistics grouped by queue type
    and save it as a PNG at `out_path` (which must not exist yet).
    """
    # Work on a copy so the caller's DataFrame is not mutated when the legend
    # column is added (also avoids pandas' SettingWithCopyWarning if a slice
    # is ever passed in).
    df = df.copy()

    # Add a column that will be used as a legend
    df["parameter_set"] = df.apply(__param_set_legend, axis=1)

    # Prepare DataFrame in long format for seaborn boxplot
    long_format_df = pd.melt(
        df,
        id_vars=["queue_type", "parameter_set"],
        value_vars=BOXPLOT_VALUE_VARS,
        var_name="dtime_metric",
        value_name="dtime",
    )

    # Plotting
    plt.figure(figsize=(15, 10))
    sns.boxplot(data=long_format_df, x="queue_type", y="dtime", hue="parameter_set")
    plt.title(title)
    plt.xlabel("Queue Type")
    plt.ylabel("Dissemination Time")
    plt.legend(loc="upper right", ncol=1)

    # Adding vertical grid lines between x elements
    plt.grid(axis="y")
    plt.gca().set_xticks(
        [i - 0.5 for i in range(1, len(df["queue_type"].unique()))], minor=True
    )
    plt.grid(which="minor", axis="x", linestyle="--")
    plt.tight_layout()

    # Save the plot as a PNG file
    assert not os.path.exists(out_path)
    plt.savefig(out_path)
    plt.draw()

254
mixnet/queuesim/queuesim.py Normal file
View File

@ -0,0 +1,254 @@
import concurrent.futures
import itertools
import os
import random
import time
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timedelta
import pandas as pd
import usim
from protocol.nomssip import NomssipConfig
from protocol.temporalmix import TemporalMixConfig, TemporalMixType
from queuesim.config import Config
from queuesim.paramset import (
EXPERIMENT_TITLES,
ExperimentID,
ParameterSet,
SessionID,
build_parameter_sets,
)
from queuesim.simulation import Simulation
from sim.config import LatencyConfig, TopologyConfig
# Base configuration shared by all iterations. Each ParameterSet overwrites
# its relevant fields (via ParameterSet.apply_to), and each iteration replaces
# the RNG seeds before the simulation runs.
DEFAULT_CONFIG = Config(
    num_nodes=10,
    nomssip=NomssipConfig(
        peering_degree=3,
        transmission_rate_per_sec=10,
        msg_size=8,
        temporal_mix=TemporalMixConfig(
            mix_type=TemporalMixType.NONE,
            min_queue_size=10,
            seed_generator=random.Random(0),
        ),
        # Bandwidth is not measured in these experiments, so noise
        # transmission is skipped to speed up the simulation.
        skip_sending_noise=True,
    ),
    topology=TopologyConfig(
        seed=random.Random(0),
    ),
    latency=LatencyConfig(
        # Zero latency lets the simulation use the lighter local connection.
        min_latency_sec=0,
        max_latency_sec=0,
        seed=random.Random(0),
    ),
    num_sent_msgs=1,
    msg_interval_sec=0.1,
    num_senders=1,
    sender_generator=random.Random(0),
)
# Column order of the session result CSV: the parameter-set ID, the parameters
# themselves, and the pandas `describe()` statistics of dissemination times.
RESULT_COLUMNS = [
    "paramset",
    "num_nodes",
    "peering_degree",
    "min_queue_size",
    "transmission_rate",
    "num_sent_msgs",
    "num_senders",
    "queue_type",
    "num_iterations",
    "dtime_count",
    "dtime_mean",
    "dtime_std",
    "dtime_min",
    "dtime_25%",
    "dtime_50%",
    "dtime_75%",
    "dtime_max",
]
def run_session(
    exp_id: ExperimentID,
    session_id: SessionID,
    queue_type: TemporalMixType,
    outdir: str,
    from_paramset: int = 1,
):
    """
    Run all parameter sets of a session for a single queue type.

    Each parameter set is simulated `num_iterations` times across a process
    pool. A summary row is appended to the session CSV as soon as all
    iterations of a parameter set complete, and the raw dissemination times
    of each parameter set are stored in a separate CSV.

    :param from_paramset: a parameter set ID (>=1) to resume from
    """
    print("******************************************************************")
    print(f"{exp_id.name}: {session_id.name}: {EXPERIMENT_TITLES[exp_id]}")
    print(f"Queue type: {queue_type.name}")
    print("******************************************************************")

    # Create a directory and initialize a CSV file only with a header.
    # The directory is renamed once the session is done: the __WIP__ prefix is
    # removed and __DUR__ is replaced with the elapsed time.
    assert os.path.isdir(outdir)
    subdir = f"__WIP__queuesim_e{exp_id.value}s{session_id.value}_{queue_type.name}_{datetime.now().isoformat()}___DUR__"
    os.makedirs(f"{outdir}/{subdir}")
    session_result_path = f"{outdir}/{subdir}/session.csv"
    assert not os.path.exists(session_result_path)
    pd.DataFrame(columns=pd.Series(RESULT_COLUMNS)).to_csv(
        session_result_path, index=False
    )
    print(f"Initialized a CSV file: {session_result_path}")

    # Prepare all parameter sets of the session
    paramsets = build_parameter_sets(exp_id, session_id, queue_type)
    assert 1 <= from_paramset <= len(paramsets)

    # Run the simulations for each parameter set, using multi processes
    session_start_time = time.time()
    future_map: dict[
        concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]
    ] = dict()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Submit all iterations of all parameter sets to the ProcessPoolExecutor
        for paramset_idx, paramset in enumerate(paramsets):
            # Parameter set IDs are 1-based.
            paramset_id = paramset_idx + 1
            if paramset_id < from_paramset:
                continue
            future_map.update(__submit_iterations(paramset_id, paramset, executor))

        # Collect results of each iteration:
        # paramset_id -> (finished iteration indices, dissemination times)
        paramset_results: dict[int, tuple[set[int], list[float]]] = defaultdict(
            lambda: (set(), [])
        )
        paramsets_done: set[int] = set()
        for future in concurrent.futures.as_completed(future_map):
            paramset_id, paramset, iter_idx = future_map[future]
            paramset_results[paramset_id][0].add(iter_idx)
            paramset_results[paramset_id][1].extend(future.result())

            # If all iterations of the paramset are done, process the results
            if len(paramset_results[paramset_id][0]) == paramset.num_iterations:
                paramsets_done.add(paramset_id)
                print("================================================")
                print(f"ParamSet-{paramset_id} is done. Processing results...")
                print(
                    f"Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far."
                )
                print("------------------------------------------------")
                __process_paramset_result(
                    paramset_id,
                    paramset,
                    paramset_results[paramset_id][1],
                    session_result_path,
                    f"{outdir}/{subdir}/paramset_{paramset_id}.csv",
                )
                print("================================================")

    session_elapsed_time = time.time() - session_start_time
    session_elapsed_time_str = __format_elapsed_time(session_elapsed_time)

    # Load the completed session CSV file, sort rows by paramset_id,
    # and overwrite the file with the sorted rows.
    # (Rows were appended in completion order, which may differ from ID order.)
    pd.read_csv(session_result_path).sort_values(by="paramset").to_csv(
        session_result_path, index=False
    )

    # Rename the WIP directory to the final name
    new_subdir = subdir.replace("__WIP__", "").replace(
        "__DUR__", session_elapsed_time_str
    )
    assert not os.path.exists(f"{outdir}/{new_subdir}")
    os.rename(f"{outdir}/{subdir}", f"{outdir}/{new_subdir}")
    print("******************************************************************")
    print(f"Session Elapsed Time: {session_elapsed_time_str}")
    print(f"Renamed the WIP directory to {outdir}/{new_subdir}")
    print("******************************************************************")
def __submit_iterations(
    paramset_id: int,
    paramset: ParameterSet,
    executor: concurrent.futures.ProcessPoolExecutor,
) -> dict[concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]]:
    """
    Submit all iterations of the given parameter set to the executor,
    so that they can be run by the ProcessPoolExecutor.

    :return: a map from each submitted future to
             (paramset_id, paramset, iteration index)
    """
    # Prepare the configuration for the parameter set
    cfg = deepcopy(DEFAULT_CONFIG)
    paramset.apply_to(cfg)

    print(
        f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}"
    )
    future_map: dict[
        concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]
    ] = dict()
    for i in range(paramset.num_iterations):
        # Update seeds for the current iteration (seeded by the iteration index).
        # Deepcopy the cfg to avoid the same cfg instance between iteration jobs.
        iter_cfg = deepcopy(cfg)
        iter_cfg.nomssip.temporal_mix.seed_generator = random.Random(i)
        iter_cfg.topology.seed = random.Random(i)
        iter_cfg.latency.seed = random.Random(i)
        iter_cfg.sender_generator = random.Random(i)
        # Submit the iteration to the executor
        future = executor.submit(__run_iteration, iter_cfg)
        future_map[future] = (paramset_id, paramset, i)
    return future_map
def __run_iteration(cfg: Config) -> list[float]:
    """
    Run a single iteration of a certain parameter set.
    The iteration uses an independent uSim instance.

    :return: the dissemination times collected by the simulation
    """
    sim = Simulation(cfg)
    usim.run(sim.run())
    return sim.dissemination_times
def __process_paramset_result(
    paramset_id: int,
    paramset: ParameterSet,
    dissemination_times: list[float],
    session_result_path: str,
    paramset_result_path: str,
):
    """
    Convert the result into a pd.Series, store the Series into a CSV file,
    and append the summary of the Series to the session CSV file.
    """
    series = pd.Series(dissemination_times)
    # describe() yields count/mean/std/min/25%/50%/75%/max.
    stats = series.describe()
    result = {
        "paramset": paramset_id,
        "num_nodes": paramset.num_nodes,
        "peering_degree": paramset.peering_degree,
        "min_queue_size": paramset.min_queue_size,
        "transmission_rate": paramset.transmission_rate,
        "num_sent_msgs": paramset.num_sent_msgs,
        "num_senders": paramset.num_senders,
        "queue_type": paramset.queue_type.name,
        "num_iterations": paramset.num_iterations,
        "dtime_count": stats["count"],
        "dtime_mean": stats["mean"],
        "dtime_std": stats["std"],
        "dtime_min": stats["min"],
        "dtime_25%": stats["25%"],
        "dtime_50%": stats["50%"],
        "dtime_75%": stats["75%"],
        "dtime_max": stats["max"],
    }
    # Keep the row aligned with the header written by run_session.
    assert result.keys() == set(RESULT_COLUMNS)
    pd.DataFrame([result]).to_csv(
        session_result_path, mode="a", header=False, index=False
    )
    print(f"Appended a row to {session_result_path}")

    # Store the raw per-message dissemination times for later analysis.
    series.to_csv(paramset_result_path, header=False, index=False)
    print(f"Stored the dissemination times to {paramset_result_path}")
def __format_elapsed_time(elapsed_time: float) -> str:
    """Render an elapsed duration in seconds as 'DdHHhMMmSSs' (e.g. '0d01h02m03s')."""
    total = int(timedelta(seconds=elapsed_time).total_seconds())
    days, remainder = divmod(total, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{days}d{hours:02}h{minutes:02}m{seconds:02}s"

View File

@ -0,0 +1,149 @@
import random
import struct
from dataclasses import dataclass
from random import Random
from typing import Counter, Self
import usim
from framework.framework import Queue
from framework.usim import Framework
from protocol.connection import LocalSimplexConnection, SimplexConnection
from protocol.nomssip import NomssipConfig
from protocol.temporalmix import TemporalMixConfig, TemporalMixType
from queuesim.config import Config
from queuesim.node import Node
from sim.config import LatencyConfig
from sim.connection import RemoteSimplexConnection
from sim.topology import build_full_random_topology
class Simulation:
    """
    Manages the entire cycle of simulation: initialization, running, and analysis.
    """

    def __init__(self, config: Config):
        self.config = config

    async def run(self):
        # The uSim scope must be created with `async with` and handed to the
        # Framework (which uses it to spawn tasks).
        async with usim.Scope() as scope:
            self.framework = Framework(scope)
            self.message_builder = MessageBuilder(self.framework)
            self.dissemination_times = await self.__run()
            # Cancel all spawned tasks so the scope can exit.
            self.framework.stop_tasks()

    async def __run(self) -> list[float]:
        # Nodes push (received_time, msg) here whenever they receive a message.
        self.received_msg_queue: Queue[tuple[float, bytes]] = self.framework.queue()

        # Run and connect nodes
        nodes = self.__run_nodes()
        self.__connect_nodes(nodes)

        # Choose and start senders (random.sample: without duplicates)
        senders = self.config.sender_generator.sample(nodes, k=self.config.num_senders)
        for sender in senders:
            self.framework.spawn(self.__run_sender(sender))

        # To count how many nodes have received each message
        received_msg_counters: Counter[bytes] = Counter()
        # To collect the dissemination times of each message.
        dissemination_times: list[float] = []

        # Wait until all messages are disseminated to the entire network.
        while (
            len(dissemination_times)
            < self.config.num_sent_msgs * self.config.num_senders
        ):
            # Wait until a node notifies that it has received a new message.
            received_time, msg = await self.received_msg_queue.get()

            # If the message has been received by all nodes, calculate the dissemination time.
            received_msg_counters.update([msg])
            if received_msg_counters[msg] == len(nodes):
                dissemination_times.append(
                    received_time - Message.from_bytes(msg).sent_time
                )

        return dissemination_times

    def __run_nodes(self) -> list[Node]:
        # All nodes share the same Nomssip config and message handler.
        return [
            Node(
                self.framework,
                self.config.nomssip,
                self.__process_msg,
            )
            for _ in range(self.config.num_nodes)
        ]

    async def __process_msg(self, msg: bytes) -> None:
        """
        A handler to process messages received via Nomos Gossip channel
        """
        # Notify that a new message has been received by the node.
        # The received time is also included in the notification.
        await self.received_msg_queue.put((self.framework.now(), msg))

    def __connect_nodes(self, nodes: list[Node]):
        # Build an undirected random topology with the configured peering degree.
        topology = build_full_random_topology(
            rng=self.config.topology.seed,
            num_nodes=len(nodes),
            peering_degree=self.config.nomssip.peering_degree,
        )
        # Sort the topology by node index for the connection RULE defined below.
        for node_idx, peer_indices in sorted(topology.items()):
            for peer_idx in peer_indices:
                # Since the topology is undirected, we only need to connect the two nodes once.
                # RULE: the node with the smaller index establishes the connection.
                assert node_idx != peer_idx
                if node_idx > peer_idx:
                    continue

                # Connect the node and peer for Nomos Gossip
                node = nodes[node_idx]
                peer = nodes[peer_idx]
                node.connect(peer, self.__create_conn(), self.__create_conn())

    def __create_conn(self) -> SimplexConnection:
        # If latency is always zero, use the local connection which is the lightest.
        if (
            self.config.latency.min_latency_sec
            == self.config.latency.max_latency_sec
            == 0
        ):
            return LocalSimplexConnection(self.framework)
        else:
            return RemoteSimplexConnection(
                self.config.latency,
                self.framework,
            )

    async def __run_sender(self, sender: Node):
        # Send num_sent_msgs messages, sleeping msg_interval_sec between them.
        for i in range(self.config.num_sent_msgs):
            if i > 0:
                await self.framework.sleep(self.config.msg_interval_sec)
            msg = bytes(self.message_builder.next())
            await sender.send_message(msg)
msg = bytes(self.message_builder.next())
await sender.send_message(msg)
@dataclass
class Message:
id: int
sent_time: float
def __bytes__(self) -> bytes:
return struct.pack("if", self.id, self.sent_time)
@classmethod
def from_bytes(cls, data: bytes) -> Self:
id, sent_from = struct.unpack("if", data)
return cls(id, sent_from)
class MessageBuilder:
    """Builds `Message` instances with monotonically increasing ids."""

    def __init__(self, framework: Framework):
        self.framework = framework
        # id assigned to the next message created
        self.next_id = 0

    def next(self) -> Message:
        """Create the next message, stamped with the current simulation time."""
        msg_id, self.next_id = self.next_id, self.next_id + 1
        return Message(msg_id, self.framework.now())

View File

@ -0,0 +1,88 @@
import random
from copy import deepcopy
from unittest import TestCase
from protocol.nomssip import NomssipConfig
from protocol.temporalmix import TemporalMixConfig, TemporalMixType
from queuesim.config import Config
from queuesim.paramset import (
ExperimentID,
ParameterSet,
SessionID,
build_parameter_sets,
)
from sim.config import LatencyConfig, TopologyConfig
class TestParameterSet(TestCase):
    """Unit tests for ParameterSet and build_parameter_sets."""

    def test_apply_to_config(self):
        """Applying a ParameterSet must propagate every field into the Config."""
        paramset = ParameterSet(
            num_nodes=10000,
            peering_degree=20000,
            min_queue_size=30000,
            transmission_rate=40000,
            num_sent_msgs=50000,
            num_senders=60000,
            queue_type=TemporalMixType.NOISY_COIN_FLIPPING,
            num_iterations=70000,
        )
        config = deepcopy(SAMPLE_CONFIG)
        paramset.apply_to(config)

        # (expected value from the paramset, actual value in the config)
        field_pairs = [
            (paramset.num_nodes, config.num_nodes),
            (paramset.peering_degree, config.nomssip.peering_degree),
            (paramset.min_queue_size, config.nomssip.temporal_mix.min_queue_size),
            (paramset.transmission_rate, config.nomssip.transmission_rate_per_sec),
            (paramset.num_sent_msgs, config.num_sent_msgs),
            (paramset.num_senders, config.num_senders),
            (paramset.queue_type, config.nomssip.temporal_mix.mix_type),
        ]
        for expected, actual in field_pairs:
            self.assertEqual(expected, actual)

    def test_build_parameter_sets(self):
        """Each (experiment, session) pair yields the expected number of unique sets."""
        cases = {
            (ExperimentID.EXPERIMENT_1, SessionID.SESSION_1): pow(3, 4),
            (ExperimentID.EXPERIMENT_2, SessionID.SESSION_1): pow(3, 5),
            (ExperimentID.EXPERIMENT_3, SessionID.SESSION_1): pow(3, 5),
            (ExperimentID.EXPERIMENT_4, SessionID.SESSION_1): pow(3, 6),
            (ExperimentID.EXPERIMENT_1, SessionID.SESSION_2): pow(3, 4),
        }
        for queue_type in TemporalMixType:
            for (exp_id, session_id), expected_cnt in cases.items():
                label = f"{exp_id}: {session_id}"
                sets = build_parameter_sets(exp_id, session_id, queue_type)
                self.assertEqual(expected_cnt, len(sets), label)
                # Stringified parameter sets must all be distinct.
                self.assertEqual(len(sets), len({str(s) for s in sets}), label)
# A minimal, deterministic Config fixture for the tests in this module.
# All randomness is seeded with 0 and sizes are kept small for speed.
SAMPLE_CONFIG = Config(
    num_nodes=10,
    nomssip=NomssipConfig(
        peering_degree=3,
        transmission_rate_per_sec=10,
        msg_size=8,
        temporal_mix=TemporalMixConfig(
            mix_type=TemporalMixType.NONE,
            min_queue_size=10,
            seed_generator=random.Random(0),
        ),
        skip_sending_noise=True,
    ),
    topology=TopologyConfig(
        seed=random.Random(0),
    ),
    latency=LatencyConfig(
        # Zero min/max latency selects the lightweight local connection.
        min_latency_sec=0,
        max_latency_sec=0,
        seed=random.Random(0),
    ),
    num_sent_msgs=1,
    msg_interval_sec=0.1,
    num_senders=1,
    sender_generator=random.Random(0),
)

View File

@ -3,4 +3,5 @@ pysphinx==0.0.5
dacite==1.8.1
pandas==2.2.2
matplotlib==3.9.1
seaborn==0.13.2
PyYAML==6.0.1

View File

@ -1,4 +1,5 @@
import math
from abc import abstractmethod
from collections import Counter
from typing import Awaitable
@ -11,17 +12,12 @@ from sim.config import LatencyConfig, NetworkConfig
from sim.state import NodeState
class MeteredRemoteSimplexConnection(SimplexConnection):
class RemoteSimplexConnection(SimplexConnection):
"""
A simplex connection implementation that simulates network latency and measures bandwidth usages.
A simplex connection implementation that simulates network latency.
"""
def __init__(
self,
config: LatencyConfig,
framework: Framework,
meter_start_time: float,
):
def __init__(self, config: LatencyConfig, framework: Framework):
self.framework = framework
# A connection has a random constant latency
self.latency = config.random_latency()
@ -32,10 +28,6 @@ class MeteredRemoteSimplexConnection(SimplexConnection):
self.relayer = framework.spawn(self.__run_relayer())
# A queue where a receiver gets messages
self.recv_queue: Queue[bytes] = framework.queue()
# To measure bandwidth usages
self.meter_start_time = meter_start_time
self.send_meters: list[int] = []
self.recv_meters: list[int] = []
async def send(self, data: bytes) -> None:
await self.send_queue.put((self.framework.now(), data))
@ -57,17 +49,45 @@ class MeteredRemoteSimplexConnection(SimplexConnection):
await self.framework.sleep(delay)
# Relay msg to the recv_queue.
# Update related statistics before msg is read from recv_queue by the receiver
# Call on_receiving (e.g. for updating stats) before msg is read from recv_queue by the receiver
# because the time at which the message enters the node is what matters when viewed from the outside.
self.on_receiving(data)
await self.recv_queue.put(data)
def on_sending(self, data: bytes) -> None:
# Should be overridden by subclass
pass
def on_receiving(self, data: bytes) -> None:
# Should be overridden by subclass
pass
class MeteredRemoteSimplexConnection(RemoteSimplexConnection):
"""
An extension of RemoteSimplexConnection that measures bandwidth usages.
"""
def __init__(
self,
config: LatencyConfig,
framework: Framework,
meter_start_time: float,
):
super().__init__(config, framework)
# To measure bandwidth usages
self.meter_start_time = meter_start_time
self.send_meters: list[int] = []
self.recv_meters: list[int] = []
@override
def on_sending(self, data: bytes) -> None:
"""
Update statistics when sending a message
"""
self.__update_meter(self.send_meters, len(data))
@override
def on_receiving(self, data: bytes) -> None:
"""
Update statistics when receiving a message

View File

@ -6,8 +6,8 @@ from sim.topology import are_all_nodes_connected, build_full_random_topology
class TestTopology(TestCase):
def test_full_random(self):
num_nodes = 100
peering_degree = 6
num_nodes = 10000
peering_degree = 16
topology = build_full_random_topology(
random.Random(0), num_nodes, peering_degree
)

View File

@ -1,3 +1,5 @@
from __future__ import annotations
import random
from collections import defaultdict
@ -43,16 +45,18 @@ def build_full_random_topology(
def are_all_nodes_connected(topology: Topology) -> bool:
visited = set()
def dfs(topology: Topology, node: int) -> None:
if node in visited:
return
visited.add(node)
for peer in topology[node]:
dfs(topology, peer)
# Start DFS from the first node
dfs(topology, next(iter(topology)))
visited = dfs(topology, next(iter(topology)))
return len(visited) == len(topology)
def dfs(topology: Topology, start_node: int) -> set[int]:
    """
    Iterative depth-first traversal.

    Returns the set of nodes reachable from `start_node` (including
    itself). Uses an explicit stack instead of recursion to avoid
    exceeding Python's recursion limit on large topologies.
    """
    visited: set[int] = set()
    pending = [start_node]
    while pending:
        current = pending.pop()
        if current in visited:
            continue
        visited.add(current)
        for peer in topology[current]:
            if peer not in visited:
                pending.append(peer)
    return visited