diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c2628e9..c189215 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -26,5 +26,5 @@ jobs: run: python -m unittest -v - name: Run a short mixnet simulation working-directory: mixnet - run: python -m cmd.main --config config.ci.yaml + run: python -m cmd.main --config sim/config.ci.yaml diff --git a/mixnet/.gitignore b/mixnet/.gitignore index d05906f..c9455d7 100644 --- a/mixnet/.gitignore +++ b/mixnet/.gitignore @@ -1,2 +1,3 @@ .venv/ *.csv +*.png diff --git a/mixnet/README.md b/mixnet/README.md index bed1aa6..c43f8a9 100644 --- a/mixnet/README.md +++ b/mixnet/README.md @@ -43,7 +43,7 @@ pip install -r requirements.txt ## Getting Started -Copy the [`config.ci.yaml`](./config.ci.yaml) file and adjust the parameters to your needs. +Copy the [`sim/config.ci.yaml`](./sim/config.ci.yaml) file and adjust the parameters to your needs. Each parameter is explained in the config file. For more details, please refer to the [documentation](https://www.notion.so/NomMix-Sim-Getting-Started-ee0e2191f4e7437e93976aff2627d7ce?pvs=4). 
diff --git a/mixnet/cmd/queuesim.py b/mixnet/cmd/queuesim.py new file mode 100644 index 0000000..0afb80c --- /dev/null +++ b/mixnet/cmd/queuesim.py @@ -0,0 +1,39 @@ +import argparse + +from protocol.temporalmix import TemporalMixType +from queuesim.paramset import ExperimentID, SessionID +from queuesim.queuesim import run_session + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Measure the message dissemination time with various configurations.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("--exp-id", type=int, required=True, help="Experiment ID (>=1)") + parser.add_argument( + "--session-id", type=int, required=True, help="Session ID (>=1)" + ) + parser.add_argument( + "--queue-type", + type=str, + required=True, + help=f"Queue type: {' | '.join([t.value for t in TemporalMixType])}", + ) + parser.add_argument("--outdir", type=str, required=True, help="output directory") + parser.add_argument( + "--from-paramset", + type=int, + required=False, + default=1, + help="A parameter set ID (>=1) to start from", + ) + args = parser.parse_args() + + run_session( + ExperimentID(args.exp_id), + SessionID(args.session_id), + TemporalMixType(args.queue_type), + args.outdir, + args.from_paramset, + ) + print("All simulations completed!") diff --git a/mixnet/cmd/queuesim_merge.py b/mixnet/cmd/queuesim_merge.py new file mode 100644 index 0000000..6e21fc7 --- /dev/null +++ b/mixnet/cmd/queuesim_merge.py @@ -0,0 +1,45 @@ +import argparse +import glob +import os + +from protocol.temporalmix import TemporalMixType + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Merge multiple `all_results.csv` files into a single CSV file.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("--exp-id", type=int, required=True, help="Experiment ID (>=1)") + parser.add_argument( + "--session-id", type=int, required=True, help="Session ID (>=1)" + ) + 
parser.add_argument("--indir", type=str, required=True, help="input directory") + parser.add_argument( + "--out-csv-path", type=str, required=True, help="output CSV file path" + ) + args = parser.parse_args() + + pattern = os.path.join(args.indir, f"queuesim_e{args.exp_id}s{args.session_id}_*") + files = glob.glob(pattern) + assert len(TemporalMixType) == len(files), f"{len(TemporalMixType)} != {len(files)}" + + paths = [""] * len(TemporalMixType) + for file in files: + for i, queue_type in enumerate(TemporalMixType): + if f"_{queue_type.name}_" in file: + assert paths[i] == "" + paths[i] = file + break + assert all(path != "" for path in paths) + + with open(args.out_csv_path, "w", newline="") as output: + for i, path in enumerate(paths): + with open(f"{path}/session.csv") as input: + header = input.readline() + if i == 0: + output.write(header) + + for line in input: + output.write(line) + + print(f"Saved to {args.out_csv_path}") diff --git a/mixnet/cmd/queuesim_plot.py b/mixnet/cmd/queuesim_plot.py new file mode 100644 index 0000000..d727a04 --- /dev/null +++ b/mixnet/cmd/queuesim_plot.py @@ -0,0 +1,27 @@ +import argparse + +import pandas as pd + +from queuesim.paramset import ExperimentID, SessionID +from queuesim.plot import draw_plots + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Draw plots from a merged CSV file.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("--exp-id", type=int, required=True, help="Experiment ID (>=1)") + parser.add_argument( + "--session-id", type=int, required=True, help="Session ID (>=1)" + ) + parser.add_argument( + "--csv-path", type=str, required=True, help="input CSV file path" + ) + parser.add_argument("--outdir", type=str, required=True, help="output directory") + args = parser.parse_args() + + exp_id: ExperimentID = ExperimentID(args.exp_id) + session_id: SessionID = SessionID(args.session_id) + df = pd.read_csv(args.csv_path) + + draw_plots(df, exp_id, 
session_id, args.outdir) diff --git a/mixnet/framework/usim.py b/mixnet/framework/usim.py index 67ededa..749cbb0 100644 --- a/mixnet/framework/usim.py +++ b/mixnet/framework/usim.py @@ -1,6 +1,7 @@ from typing import Any, Awaitable, Coroutine, TypeVar import usim +from usim._primitives.task import Task, TaskCancelled from framework import framework @@ -19,6 +20,7 @@ class Framework(framework.Framework): # Because of the way μSim works, the scope must be created using `async with` syntax # and be passed to this constructor. self._scope = scope + self._tasks: list[Task] = [] def queue(self) -> framework.Queue: return Queue() @@ -33,7 +35,13 @@ class Framework(framework.Framework): def spawn( self, coroutine: Coroutine[Any, Any, framework.RT] ) -> Awaitable[framework.RT]: - return self._scope.do(coroutine) + task = self._scope.do(coroutine) + self._tasks.append(task) + return task + + def stop_tasks(self) -> None: + for task in self._tasks: + task.cancel() T = TypeVar("T") diff --git a/mixnet/protocol/connection.py b/mixnet/protocol/connection.py index 11a7c1f..e0301ee 100644 --- a/mixnet/protocol/connection.py +++ b/mixnet/protocol/connection.py @@ -66,6 +66,9 @@ class MixSimplexConnection(SimplexConnection): transmission_rate_per_sec: int, noise_msg: bytes, temporal_mix_config: TemporalMixConfig, + # OPTIMIZATION ONLY FOR EXPERIMENTS WITHOUT BANDWIDTH MEASUREMENT + # If True, skip sending a noise even if it's time to send one. 
+ skip_sending_noise: bool, ): self.framework = framework self.queue: Queue[bytes] = TemporalMix.queue( @@ -73,12 +76,16 @@ class MixSimplexConnection(SimplexConnection): ) self.conn = conn self.transmission_rate_per_sec = transmission_rate_per_sec + self.noise_msg = noise_msg + self.skip_sending_noise = skip_sending_noise self.task = framework.spawn(self.__run()) async def __run(self): while True: await self.framework.sleep(1 / self.transmission_rate_per_sec) msg = await self.queue.get() + if self.skip_sending_noise and msg == self.noise_msg: + continue await self.conn.send(msg) async def send(self, data: bytes) -> None: diff --git a/mixnet/protocol/node.py b/mixnet/protocol/node.py index 5818694..4d7f157 100644 --- a/mixnet/protocol/node.py +++ b/mixnet/protocol/node.py @@ -108,7 +108,7 @@ class Node: inbound_conn: SimplexConnection, outbound_conn: SimplexConnection, ): - Node.__connect(self.nomssip, peer.nomssip, inbound_conn, outbound_conn) + connect_nodes(self.nomssip, peer.nomssip, inbound_conn, outbound_conn) def connect_broadcast( self, @@ -116,25 +116,7 @@ class Node: inbound_conn: SimplexConnection, outbound_conn: SimplexConnection, ): - Node.__connect(self.broadcast, peer.broadcast, inbound_conn, outbound_conn) - - @staticmethod - def __connect( - self_channel: Gossip, - peer_channel: Gossip, - inbound_conn: SimplexConnection, - outbound_conn: SimplexConnection, - ): - """ - Establish a duplex connection with a peer node. 
- """ - if not self_channel.can_accept_conn() or not peer_channel.can_accept_conn(): - raise PeeringDegreeReached() - - # Register a duplex connection for its own use - self_channel.add_conn(inbound_conn, outbound_conn) - # Register a duplex connection for the peer - peer_channel.add_conn(outbound_conn, inbound_conn) + connect_nodes(self.broadcast, peer.broadcast, inbound_conn, outbound_conn) async def send_message(self, msg: bytes): """ @@ -148,3 +130,21 @@ class Node: self.config.mix_path_length, ) await self.nomssip.gossip(sphinx_packet.bytes()) + + +def connect_nodes( + self_channel: Gossip, + peer_channel: Gossip, + inbound_conn: SimplexConnection, + outbound_conn: SimplexConnection, +): + """ + Establish a duplex connection with a peer node. + """ + if not self_channel.can_accept_conn() or not peer_channel.can_accept_conn(): + raise PeeringDegreeReached() + + # Register a duplex connection for its own use + self_channel.add_conn(inbound_conn, outbound_conn) + # Register a duplex connection for the peer + peer_channel.add_conn(outbound_conn, inbound_conn) diff --git a/mixnet/protocol/nomssip.py b/mixnet/protocol/nomssip.py index b5e08f6..ed7ee87 100644 --- a/mixnet/protocol/nomssip.py +++ b/mixnet/protocol/nomssip.py @@ -22,6 +22,9 @@ class NomssipConfig(GossipConfig): transmission_rate_per_sec: int msg_size: int temporal_mix: TemporalMixConfig + # OPTIMIZATION ONLY FOR EXPERIMENTS WITHOUT BANDWIDTH MEASUREMENT + # If True, skip sending a noise even if it's time to send one. + skip_sending_noise: bool = False class Nomssip(Gossip): @@ -52,6 +55,7 @@ class Nomssip(Gossip): self.config.transmission_rate_per_sec, noise_packet, self.config.temporal_mix, + self.config.skip_sending_noise, ), ) @@ -72,7 +76,7 @@ class Nomssip(Gossip): Gossip a message to all connected peers with prepending a message flag """ # The message size must be fixed. 
- assert len(msg) == self.config.msg_size + assert len(msg) == self.config.msg_size, f"{len(msg)} != {self.config.msg_size}" packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg) await self.__gossip_flagged_packet(packet) diff --git a/mixnet/protocol/temporalmix.py b/mixnet/protocol/temporalmix.py index 6775528..4d5e75d 100644 --- a/mixnet/protocol/temporalmix.py +++ b/mixnet/protocol/temporalmix.py @@ -111,12 +111,17 @@ class MixQueue(Queue[T]): class MinSizeMixQueue(MixQueue[T]): def __init__(self, min_pool_size: int, rng: random.Random, noise_msg: T): super().__init__(rng, noise_msg) + # Initialize the queue with noise messages + # to ensure that the queue size is at least `min_pool_size`. + self._queue = [self._noise_msg] * min_pool_size self._mix_pool_size = min_pool_size @abstractmethod async def get(self) -> T: - while len(self._queue) < self._mix_pool_size: - self._queue.append(self._noise_msg) + if len(self._queue) < self._mix_pool_size: + self._queue.extend( + [self._noise_msg] * (self._mix_pool_size - len(self._queue)) + ) # Subclass must implement this method pass diff --git a/mixnet/queuesim/config.py b/mixnet/queuesim/config.py new file mode 100644 index 0000000..c1b41d5 --- /dev/null +++ b/mixnet/queuesim/config.py @@ -0,0 +1,17 @@ +import random +from dataclasses import dataclass + +from protocol.nomssip import NomssipConfig +from sim.config import LatencyConfig, TopologyConfig + + +@dataclass +class Config: + num_nodes: int + nomssip: NomssipConfig + topology: TopologyConfig + latency: LatencyConfig + num_sent_msgs: int + msg_interval_sec: float + num_senders: int + sender_generator: random.Random diff --git a/mixnet/queuesim/node.py b/mixnet/queuesim/node.py new file mode 100644 index 0000000..ee4495a --- /dev/null +++ b/mixnet/queuesim/node.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import Awaitable, Callable + +from framework.framework import Framework +from protocol.connection import SimplexConnection +from 
protocol.node import connect_nodes +from protocol.nomssip import Nomssip, NomssipConfig + + +class Node: + def __init__( + self, + framework: Framework, + nomssip_config: NomssipConfig, + msg_handler: Callable[[bytes], Awaitable[None]], + ): + self.nomssip = Nomssip(framework, nomssip_config, msg_handler) + + def connect( + self, + peer: Node, + inbound_conn: SimplexConnection, + outbound_conn: SimplexConnection, + ): + connect_nodes(self.nomssip, peer.nomssip, inbound_conn, outbound_conn) + + async def send_message(self, msg: bytes): + """ + Send the message via Nomos Gossip to all connected peers. + """ + await self.nomssip.gossip(msg) diff --git a/mixnet/queuesim/paramset.py b/mixnet/queuesim/paramset.py new file mode 100644 index 0000000..a591f93 --- /dev/null +++ b/mixnet/queuesim/paramset.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +import itertools +from dataclasses import dataclass +from enum import Enum + +from protocol.temporalmix import TemporalMixType +from queuesim.config import Config + + +class ExperimentID(Enum): + EXPERIMENT_1 = 1 + EXPERIMENT_2 = 2 + EXPERIMENT_3 = 3 + EXPERIMENT_4 = 4 + + +class SessionID(Enum): + SESSION_1 = 1 + SESSION_2 = 2 + + +EXPERIMENT_TITLES: dict[ExperimentID, str] = { + ExperimentID.EXPERIMENT_1: "Single Sender - Single Message", + ExperimentID.EXPERIMENT_2: "Single Sender - Multiple Messages", + ExperimentID.EXPERIMENT_3: "Multiple Senders - Single Message", + ExperimentID.EXPERIMENT_4: "Multiple Senders - Multiple Messages", +} + + +@dataclass +class ParameterSet: + num_nodes: int + peering_degree: int + min_queue_size: int + transmission_rate: int + num_sent_msgs: int + num_senders: int + queue_type: TemporalMixType + num_iterations: int + + def apply_to(self, cfg: Config) -> None: + cfg.num_nodes = self.num_nodes + cfg.nomssip.peering_degree = self.peering_degree + cfg.nomssip.temporal_mix.min_queue_size = self.min_queue_size + cfg.nomssip.transmission_rate_per_sec = self.transmission_rate + 
cfg.num_sent_msgs = self.num_sent_msgs + cfg.num_senders = self.num_senders + cfg.nomssip.temporal_mix.mix_type = self.queue_type + + +def build_parameter_sets( + exp_id: ExperimentID, session_id: SessionID, queue_type: TemporalMixType +) -> list[ParameterSet]: + match session_id: + case SessionID.SESSION_1: + return __build_session_1_parameter_sets(exp_id, queue_type) + case SessionID.SESSION_2: + return __build_session_2_parameter_sets(exp_id, queue_type) + case _: + raise ValueError(f"Unknown session ID: {session_id}") + + +def __build_session_1_parameter_sets( + exp_id: ExperimentID, queue_type: TemporalMixType +) -> list[ParameterSet]: + sets: list[ParameterSet] = [] + + for num_nodes in [20, 40, 80]: + peering_degree_list = [num_nodes // 5, num_nodes // 4, num_nodes // 2] + min_queue_size_list = [num_nodes // 2, num_nodes, num_nodes * 2] + transmission_rate_list = [num_nodes // 2, num_nodes, num_nodes * 2] + num_sent_msgs_list = [8, 16, 32] + num_senders_list = [num_nodes // 10, num_nodes // 5, num_nodes // 2] + num_iterations = num_nodes // 2 + + match exp_id: + case ExperimentID.EXPERIMENT_1: + for ( + peering_degree, + min_queue_size, + transmission_rate, + ) in itertools.product( + peering_degree_list, + min_queue_size_list, + transmission_rate_list, + ): + sets.append( + ParameterSet( + num_nodes=num_nodes, + peering_degree=peering_degree, + min_queue_size=min_queue_size, + transmission_rate=transmission_rate, + num_sent_msgs=1, + num_senders=1, + queue_type=queue_type, + num_iterations=num_iterations, + ) + ) + case ExperimentID.EXPERIMENT_2: + for ( + peering_degree, + min_queue_size, + transmission_rate, + num_sent_msgs, + ) in itertools.product( + peering_degree_list, + min_queue_size_list, + transmission_rate_list, + num_sent_msgs_list, + ): + sets.append( + ParameterSet( + num_nodes=num_nodes, + peering_degree=peering_degree, + min_queue_size=min_queue_size, + transmission_rate=transmission_rate, + num_sent_msgs=num_sent_msgs, + num_senders=1, + 
queue_type=queue_type, + num_iterations=num_iterations, + ) + ) + case ExperimentID.EXPERIMENT_3: + for ( + peering_degree, + min_queue_size, + transmission_rate, + num_senders, + ) in itertools.product( + peering_degree_list, + min_queue_size_list, + transmission_rate_list, + num_senders_list, + ): + sets.append( + ParameterSet( + num_nodes=num_nodes, + peering_degree=peering_degree, + min_queue_size=min_queue_size, + transmission_rate=transmission_rate, + num_sent_msgs=1, + num_senders=num_senders, + queue_type=queue_type, + num_iterations=num_iterations, + ) + ) + case ExperimentID.EXPERIMENT_4: + for ( + peering_degree, + min_queue_size, + transmission_rate, + num_sent_msgs, + num_senders, + ) in itertools.product( + peering_degree_list, + min_queue_size_list, + transmission_rate_list, + num_sent_msgs_list, + num_senders_list, + ): + sets.append( + ParameterSet( + num_nodes=num_nodes, + peering_degree=peering_degree, + min_queue_size=min_queue_size, + transmission_rate=transmission_rate, + num_sent_msgs=num_sent_msgs, + num_senders=num_senders, + queue_type=queue_type, + num_iterations=num_iterations, + ) + ) + case _: + raise ValueError(f"Unknown experiment ID: {exp_id}") + + return sets + + +def __build_session_2_parameter_sets( + exp_id: ExperimentID, queue_type: TemporalMixType +) -> list[ParameterSet]: + sets: list[ParameterSet] = [] + + for num_nodes in [100, 1000, 10000]: + peering_degree_list = [4, 8, 16] + min_queue_size_list = [10, 50, 100] + transmission_rate_list = [1, 10, 100] + num_iterations = 20 + + match exp_id: + case ExperimentID.EXPERIMENT_1: + for ( + peering_degree, + min_queue_size, + transmission_rate, + ) in itertools.product( + peering_degree_list, + min_queue_size_list, + transmission_rate_list, + ): + sets.append( + ParameterSet( + num_nodes=num_nodes, + peering_degree=peering_degree, + min_queue_size=min_queue_size, + transmission_rate=transmission_rate, + num_sent_msgs=1, + num_senders=1, + queue_type=queue_type, + 
num_iterations=num_iterations, + ) + ) + case _: + raise NotImplementedError( + f"Experiment {exp_id} not implemented for session 2" + ) + + return sets diff --git a/mixnet/queuesim/plot.py b/mixnet/queuesim/plot.py new file mode 100644 index 0000000..55ae965 --- /dev/null +++ b/mixnet/queuesim/plot.py @@ -0,0 +1,133 @@ +import os + +import matplotlib.pyplot as plt +import pandas as pd +import seaborn as sns + +from queuesim.paramset import ExperimentID, SessionID + +BOXPLOT_VALUE_VARS = [ + "dtime_min", + "dtime_25%", + "dtime_50%", + "dtime_mean", + "dtime_75%", + "dtime_max", +] + +PARAM_SET = [ + "num_nodes", + "peering_degree", + "min_queue_size", + "transmission_rate", + "num_sent_msgs", + "num_senders", +] + + +def draw_plots( + df: pd.DataFrame, exp_id: ExperimentID, session_id: SessionID, outdir: str +): + assert os.path.exists(outdir) + __overview_by_queue_type(df, exp_id, session_id, f"{outdir}/plot_overview.png") + num_nodes = int(df["num_nodes"].min()) + for param in ["peering_degree", "min_queue_size", "transmission_rate"]: + __impact_of_param_by_queue_type( + df, exp_id, session_id, num_nodes, param, f"{outdir}/plot_{param}.png" + ) + + if exp_id == ExperimentID(2) or exp_id == ExperimentID(4): + __impact_of_param_by_queue_type( + df, + exp_id, + session_id, + num_nodes, + "num_sent_msgs", + f"{outdir}/plot_num_sent_msgs.png", + ) + + if exp_id == ExperimentID(3) or exp_id == ExperimentID(4): + __impact_of_param_by_queue_type( + df, + exp_id, + session_id, + num_nodes, + "num_senders", + f"{outdir}/plot_num_senders.png", + ) + + +def __param_set_legend(row): + legend = "" + for i, param in enumerate(PARAM_SET): + if i > 0: + legend += ", " + legend += f"{param}:{row[param]}" + return legend + + +def __overview_by_queue_type( + df: pd.DataFrame, + exp_id: ExperimentID, + session_id: SessionID, + out_path: str, +): + df = df.drop_duplicates(subset=["num_nodes", "queue_type"]) + print(df) + __draw_plot_by_queue_type( + df, f"{exp_id.name}: 
{session_id.name}: Overview", out_path + ) + + +def __impact_of_param_by_queue_type( + df: pd.DataFrame, + exp_id: ExperimentID, + session_id: SessionID, + num_nodes: int, + param: str, + out_path: str, +): + df = pd.DataFrame(df[df["num_nodes"] == num_nodes]) + df = df.drop_duplicates(subset=[param, "queue_type"]) + print(df) + __draw_plot_by_queue_type( + df, + f"{exp_id.name}: {session_id.name}: Impact of {param} ({num_nodes} nodes)", + out_path, + ) + + +def __draw_plot_by_queue_type(df: pd.DataFrame, title: str, out_path: str): + # Add a column that will be used as a legend + df["parameter_set"] = df.apply(__param_set_legend, axis=1) + + # Prepare DataFrame in long format for seaborn boxplot + long_format_df = pd.melt( + df, + id_vars=["queue_type", "parameter_set"], + value_vars=BOXPLOT_VALUE_VARS, + var_name="dtime_metric", + value_name="dtime", + ) + + # Plotting + plt.figure(figsize=(15, 10)) + sns.boxplot(data=long_format_df, x="queue_type", y="dtime", hue="parameter_set") + plt.title(title) + plt.xlabel("Queue Type") + plt.ylabel("Dissemination Time") + plt.legend(loc="upper right", ncol=1) + + # Adding vertical grid lines between x elements + plt.grid(axis="y") + plt.gca().set_xticks( + [i - 0.5 for i in range(1, len(df["queue_type"].unique()))], minor=True + ) + plt.grid(which="minor", axis="x", linestyle="--") + + plt.tight_layout() + + # Save the plot as a PNG file + assert not os.path.exists(out_path) + plt.savefig(out_path) + plt.draw() diff --git a/mixnet/queuesim/queuesim.py b/mixnet/queuesim/queuesim.py new file mode 100644 index 0000000..b753a10 --- /dev/null +++ b/mixnet/queuesim/queuesim.py @@ -0,0 +1,254 @@ +import concurrent.futures +import itertools +import os +import random +import time +from collections import defaultdict +from copy import deepcopy +from datetime import datetime, timedelta + +import pandas as pd +import usim + +from protocol.nomssip import NomssipConfig +from protocol.temporalmix import TemporalMixConfig, TemporalMixType 
+from queuesim.config import Config +from queuesim.paramset import ( + EXPERIMENT_TITLES, + ExperimentID, + ParameterSet, + SessionID, + build_parameter_sets, +) +from queuesim.simulation import Simulation +from sim.config import LatencyConfig, TopologyConfig + +DEFAULT_CONFIG = Config( + num_nodes=10, + nomssip=NomssipConfig( + peering_degree=3, + transmission_rate_per_sec=10, + msg_size=8, + temporal_mix=TemporalMixConfig( + mix_type=TemporalMixType.NONE, + min_queue_size=10, + seed_generator=random.Random(0), + ), + skip_sending_noise=True, + ), + topology=TopologyConfig( + seed=random.Random(0), + ), + latency=LatencyConfig( + min_latency_sec=0, + max_latency_sec=0, + seed=random.Random(0), + ), + num_sent_msgs=1, + msg_interval_sec=0.1, + num_senders=1, + sender_generator=random.Random(0), +) + + +RESULT_COLUMNS = [ + "paramset", + "num_nodes", + "peering_degree", + "min_queue_size", + "transmission_rate", + "num_sent_msgs", + "num_senders", + "queue_type", + "num_iterations", + "dtime_count", + "dtime_mean", + "dtime_std", + "dtime_min", + "dtime_25%", + "dtime_50%", + "dtime_75%", + "dtime_max", +] + + +def run_session( + exp_id: ExperimentID, + session_id: SessionID, + queue_type: TemporalMixType, + outdir: str, + from_paramset: int = 1, +): + print("******************************************************************") + print(f"{exp_id.name}: {session_id.name}: {EXPERIMENT_TITLES[exp_id]}") + print(f"Queue type: {queue_type.name}") + print("******************************************************************") + + # Create a directory and initialize a CSV file only with a header + assert os.path.isdir(outdir) + subdir = f"__WIP__queuesim_e{exp_id.value}s{session_id.value}_{queue_type.name}_{datetime.now().isoformat()}___DUR__" + os.makedirs(f"{outdir}/{subdir}") + session_result_path = f"{outdir}/{subdir}/session.csv" + assert not os.path.exists(session_result_path) + pd.DataFrame(columns=pd.Series(RESULT_COLUMNS)).to_csv( + session_result_path, index=False + 
) + print(f"Initialized a CSV file: {session_result_path}") + + # Prepare all parameter sets of the session + paramsets = build_parameter_sets(exp_id, session_id, queue_type) + assert 1 <= from_paramset <= len(paramsets) + + # Run the simulations for each parameter set, using multi processes + session_start_time = time.time() + future_map: dict[ + concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int] + ] = dict() + with concurrent.futures.ProcessPoolExecutor() as executor: + # Submit all iterations of all parameter sets to the ProcessPoolExecutor + for paramset_idx, paramset in enumerate(paramsets): + paramset_id = paramset_idx + 1 + if paramset_id < from_paramset: + continue + future_map.update(__submit_iterations(paramset_id, paramset, executor)) + + # Collect results of each iteration + paramset_results: dict[int, tuple[set[int], list[float]]] = defaultdict( + lambda: (set(), []) + ) + paramsets_done: set[int] = set() + for future in concurrent.futures.as_completed(future_map): + paramset_id, paramset, iter_idx = future_map[future] + paramset_results[paramset_id][0].add(iter_idx) + paramset_results[paramset_id][1].extend(future.result()) + # If all iterations of the paramset are done, process the results + if len(paramset_results[paramset_id][0]) == paramset.num_iterations: + paramsets_done.add(paramset_id) + print("================================================") + print(f"ParamSet-{paramset_id} is done. Processing results...") + print( + f"Total {len(paramsets_done)+(from_paramset-1)}/{len(paramsets)} paramsets have been done so far." 
+ ) + print("------------------------------------------------") + __process_paramset_result( + paramset_id, + paramset, + paramset_results[paramset_id][1], + session_result_path, + f"{outdir}/{subdir}/paramset_{paramset_id}.csv", + ) + print("================================================") + + session_elapsed_time = time.time() - session_start_time + session_elapsed_time_str = __format_elapsed_time(session_elapsed_time) + + # Load the completed session CSV file, sort rows by paramset_id, + # and overrite the file with the sorted rows. + pd.read_csv(session_result_path).sort_values(by="paramset").to_csv( + session_result_path, index=False + ) + + # Rename the WIP directory to the final name + new_subdir = subdir.replace("__WIP__", "").replace( + "__DUR__", session_elapsed_time_str + ) + assert not os.path.exists(f"{outdir}/{new_subdir}") + os.rename(f"{outdir}/{subdir}", f"{outdir}/{new_subdir}") + + print("******************************************************************") + print(f"Session Elapsed Time: {session_elapsed_time_str}") + print(f"Renamed the WIP directory to {outdir}/{new_subdir}") + print("******************************************************************") + + +def __submit_iterations( + paramset_id: int, + paramset: ParameterSet, + executor: concurrent.futures.ProcessPoolExecutor, +) -> dict[concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int]]: + """ + Submit all iterations of the given parameter set to the executor, + so that they can be ran by the ProcessPoolExecutor. 
+ """ + # Prepare the configuration for the parameter set + cfg = deepcopy(DEFAULT_CONFIG) + paramset.apply_to(cfg) + + print( + f"Scheduling {paramset.num_iterations} iterations for the paramset:{paramset_id}" + ) + + future_map: dict[ + concurrent.futures.Future[list[float]], tuple[int, ParameterSet, int] + ] = dict() + for i in range(paramset.num_iterations): + # Update seeds for the current iteration + # Deepcopy the cfg to avoid the same cfg instance between iteration jobs. + iter_cfg = deepcopy(cfg) + iter_cfg.nomssip.temporal_mix.seed_generator = random.Random(i) + iter_cfg.topology.seed = random.Random(i) + iter_cfg.latency.seed = random.Random(i) + iter_cfg.sender_generator = random.Random(i) + # Submit the iteration to the executor + future = executor.submit(__run_iteration, iter_cfg) + future_map[future] = (paramset_id, paramset, i) + + return future_map + + +def __run_iteration(cfg: Config) -> list[float]: + """ + Run a single iteration of a certain parameter set. + The iteration uses the independent uSim instance. + """ + sim = Simulation(cfg) + usim.run(sim.run()) + return sim.dissemination_times + + +def __process_paramset_result( + paramset_id: int, + paramset: ParameterSet, + dissemination_times: list[float], + session_result_path: str, + paramset_result_path: str, +): + """ + Convert the result into a pd.Series, store the Series into a CSV file, + and append the summary of Series to the session CSV file. 
+ """ + series = pd.Series(dissemination_times) + stats = series.describe() + result = { + "paramset": paramset_id, + "num_nodes": paramset.num_nodes, + "peering_degree": paramset.peering_degree, + "min_queue_size": paramset.min_queue_size, + "transmission_rate": paramset.transmission_rate, + "num_sent_msgs": paramset.num_sent_msgs, + "num_senders": paramset.num_senders, + "queue_type": paramset.queue_type.name, + "num_iterations": paramset.num_iterations, + "dtime_count": stats["count"], + "dtime_mean": stats["mean"], + "dtime_std": stats["std"], + "dtime_min": stats["min"], + "dtime_25%": stats["25%"], + "dtime_50%": stats["50%"], + "dtime_75%": stats["75%"], + "dtime_max": stats["max"], + } + assert result.keys() == set(RESULT_COLUMNS) + pd.DataFrame([result]).to_csv( + session_result_path, mode="a", header=False, index=False + ) + print(f"Appended a row to {session_result_path}") + series.to_csv(paramset_result_path, header=False, index=False) + print(f"Stored the dissemination times to {paramset_result_path}") + + +def __format_elapsed_time(elapsed_time: float) -> str: + td = timedelta(seconds=elapsed_time) + hours, reminder = divmod(td.seconds, 3600) + minutes, seconds = divmod(reminder, 60) + return f"{td.days}d{hours:02}h{minutes:02}m{seconds:02}s" diff --git a/mixnet/queuesim/simulation.py b/mixnet/queuesim/simulation.py new file mode 100644 index 0000000..d40fd1d --- /dev/null +++ b/mixnet/queuesim/simulation.py @@ -0,0 +1,149 @@ +import random +import struct +from dataclasses import dataclass +from random import Random +from typing import Counter, Self + +import usim + +from framework.framework import Queue +from framework.usim import Framework +from protocol.connection import LocalSimplexConnection, SimplexConnection +from protocol.nomssip import NomssipConfig +from protocol.temporalmix import TemporalMixConfig, TemporalMixType +from queuesim.config import Config +from queuesim.node import Node +from sim.config import LatencyConfig +from sim.connection 
import RemoteSimplexConnection +from sim.topology import build_full_random_topology + + +class Simulation: + """ + Manages the entire cycle of simulation: initialization, running, and analysis. + """ + + def __init__(self, config: Config): + self.config = config + + async def run(self): + async with usim.Scope() as scope: + self.framework = Framework(scope) + self.message_builder = MessageBuilder(self.framework) + self.dissemination_times = await self.__run() + self.framework.stop_tasks() + + async def __run(self) -> list[float]: + self.received_msg_queue: Queue[tuple[float, bytes]] = self.framework.queue() + + # Run and connect nodes + nodes = self.__run_nodes() + self.__connect_nodes(nodes) + + # Choose and start senders + senders = self.config.sender_generator.sample(nodes, k=self.config.num_senders) + for sender in senders: + self.framework.spawn(self.__run_sender(sender)) + + # To count how many nodes have received each message + received_msg_counters: Counter[bytes] = Counter() + # To collect the dissemination times of each message. + dissemination_times: list[float] = [] + # Wait until all messages are disseminated to the entire network. + while ( + len(dissemination_times) + < self.config.num_sent_msgs * self.config.num_senders + ): + # Wait until a node notifies that it has received a new message. + received_time, msg = await self.received_msg_queue.get() + # If the message has been received by all nodes, calculate the dissemination time. 
+ received_msg_counters.update([msg]) + if received_msg_counters[msg] == len(nodes): + dissemination_times.append( + received_time - Message.from_bytes(msg).sent_time + ) + return dissemination_times + + def __run_nodes(self) -> list[Node]: + return [ + Node( + self.framework, + self.config.nomssip, + self.__process_msg, + ) + for _ in range(self.config.num_nodes) + ] + + async def __process_msg(self, msg: bytes) -> None: + """ + A handler to process messages received via Nomos Gossip channel + """ + # Notify that a new message has been received by the node. + # The received time is also included in the notification. + await self.received_msg_queue.put((self.framework.now(), msg)) + + def __connect_nodes(self, nodes: list[Node]): + topology = build_full_random_topology( + rng=self.config.topology.seed, + num_nodes=len(nodes), + peering_degree=self.config.nomssip.peering_degree, + ) + # Sort the topology by node index for the connection RULE defined below. + for node_idx, peer_indices in sorted(topology.items()): + for peer_idx in peer_indices: + # Since the topology is undirected, we only need to connect the two nodes once. + # RULE: the node with the smaller index establishes the connection. + assert node_idx != peer_idx + if node_idx > peer_idx: + continue + + # Connect the node and peer for Nomos Gossip + node = nodes[node_idx] + peer = nodes[peer_idx] + node.connect(peer, self.__create_conn(), self.__create_conn()) + + def __create_conn(self) -> SimplexConnection: + # If latency is always zero, use the local connection which is the lightest. 
+ if ( + self.config.latency.min_latency_sec + == self.config.latency.max_latency_sec + == 0 + ): + return LocalSimplexConnection(self.framework) + else: + return RemoteSimplexConnection( + self.config.latency, + self.framework, + ) + + async def __run_sender(self, sender: Node): + for i in range(self.config.num_sent_msgs): + if i > 0: + await self.framework.sleep(self.config.msg_interval_sec) + msg = bytes(self.message_builder.next()) + await sender.send_message(msg) + + +@dataclass +class Message: + id: int + sent_time: float + + def __bytes__(self) -> bytes: + return struct.pack("if", self.id, self.sent_time) + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + id, sent_from = struct.unpack("if", data) + return cls(id, sent_from) + + +class MessageBuilder: + def __init__(self, framework: Framework): + self.framework = framework + self.next_id = 0 + + def next(self) -> Message: + msg = Message(self.next_id, self.framework.now()) + self.next_id += 1 + return msg diff --git a/mixnet/queuesim/test_paramset.py b/mixnet/queuesim/test_paramset.py new file mode 100644 index 0000000..3931662 --- /dev/null +++ b/mixnet/queuesim/test_paramset.py @@ -0,0 +1,88 @@ +import random +from copy import deepcopy +from unittest import TestCase + +from protocol.nomssip import NomssipConfig +from protocol.temporalmix import TemporalMixConfig, TemporalMixType +from queuesim.config import Config +from queuesim.paramset import ( + ExperimentID, + ParameterSet, + SessionID, + build_parameter_sets, +) +from sim.config import LatencyConfig, TopologyConfig + + +class TestParameterSet(TestCase): + def test_apply_to_config(self): + paramset = ParameterSet( + num_nodes=10000, + peering_degree=20000, + min_queue_size=30000, + transmission_rate=40000, + num_sent_msgs=50000, + num_senders=60000, + queue_type=TemporalMixType.NOISY_COIN_FLIPPING, + num_iterations=70000, + ) + config = deepcopy(SAMPLE_CONFIG) + paramset.apply_to(config) + self.assertEqual(paramset.num_nodes, 
config.num_nodes) + self.assertEqual(paramset.peering_degree, config.nomssip.peering_degree) + self.assertEqual( + paramset.min_queue_size, config.nomssip.temporal_mix.min_queue_size + ) + self.assertEqual( + paramset.transmission_rate, config.nomssip.transmission_rate_per_sec + ) + self.assertEqual(paramset.num_sent_msgs, config.num_sent_msgs) + self.assertEqual(paramset.num_senders, config.num_senders) + self.assertEqual(paramset.queue_type, config.nomssip.temporal_mix.mix_type) + + def test_build_parameter_sets(self): + cases = { + (ExperimentID.EXPERIMENT_1, SessionID.SESSION_1): pow(3, 4), + (ExperimentID.EXPERIMENT_2, SessionID.SESSION_1): pow(3, 5), + (ExperimentID.EXPERIMENT_3, SessionID.SESSION_1): pow(3, 5), + (ExperimentID.EXPERIMENT_4, SessionID.SESSION_1): pow(3, 6), + (ExperimentID.EXPERIMENT_1, SessionID.SESSION_2): pow(3, 4), + } + for queue_type in TemporalMixType: + for (exp_id, session_id), expected_cnt in cases.items(): + sets = build_parameter_sets(exp_id, session_id, queue_type) + self.assertEqual(expected_cnt, len(sets), f"{exp_id}: {session_id}") + # Check if all parameter sets are unique + self.assertEqual( + len(sets), + len(set(list(map(str, sets)))), + f"{exp_id}: {session_id}", + ) + + +SAMPLE_CONFIG = Config( + num_nodes=10, + nomssip=NomssipConfig( + peering_degree=3, + transmission_rate_per_sec=10, + msg_size=8, + temporal_mix=TemporalMixConfig( + mix_type=TemporalMixType.NONE, + min_queue_size=10, + seed_generator=random.Random(0), + ), + skip_sending_noise=True, + ), + topology=TopologyConfig( + seed=random.Random(0), + ), + latency=LatencyConfig( + min_latency_sec=0, + max_latency_sec=0, + seed=random.Random(0), + ), + num_sent_msgs=1, + msg_interval_sec=0.1, + num_senders=1, + sender_generator=random.Random(0), +) diff --git a/mixnet/requirements.txt b/mixnet/requirements.txt index 25753ba..29a7243 100644 --- a/mixnet/requirements.txt +++ b/mixnet/requirements.txt @@ -3,4 +3,5 @@ pysphinx==0.0.5 dacite==1.8.1 pandas==2.2.2 
matplotlib==3.9.1 +seaborn==0.13.2 PyYAML==6.0.1 diff --git a/mixnet/config.ci.yaml b/mixnet/sim/config.ci.yaml similarity index 100% rename from mixnet/config.ci.yaml rename to mixnet/sim/config.ci.yaml diff --git a/mixnet/sim/connection.py b/mixnet/sim/connection.py index 6a8911c..1296c63 100644 --- a/mixnet/sim/connection.py +++ b/mixnet/sim/connection.py @@ -1,4 +1,5 @@ import math +from abc import abstractmethod from collections import Counter from typing import Awaitable @@ -11,17 +12,12 @@ from sim.config import LatencyConfig, NetworkConfig from sim.state import NodeState -class MeteredRemoteSimplexConnection(SimplexConnection): +class RemoteSimplexConnection(SimplexConnection): """ - A simplex connection implementation that simulates network latency and measures bandwidth usages. + A simplex connection implementation that simulates network latency. """ - def __init__( - self, - config: LatencyConfig, - framework: Framework, - meter_start_time: float, - ): + def __init__(self, config: LatencyConfig, framework: Framework): self.framework = framework # A connection has a random constant latency self.latency = config.random_latency() @@ -32,10 +28,6 @@ class MeteredRemoteSimplexConnection(SimplexConnection): self.relayer = framework.spawn(self.__run_relayer()) # A queue where a receiver gets messages self.recv_queue: Queue[bytes] = framework.queue() - # To measure bandwidth usages - self.meter_start_time = meter_start_time - self.send_meters: list[int] = [] - self.recv_meters: list[int] = [] async def send(self, data: bytes) -> None: await self.send_queue.put((self.framework.now(), data)) @@ -57,17 +49,45 @@ class MeteredRemoteSimplexConnection(SimplexConnection): await self.framework.sleep(delay) # Relay msg to the recv_queue. - # Update related statistics before msg is read from recv_queue by the receiver + # Call on_receiving (e.g. 
for updating stats) before msg is read from recv_queue by the receiver # because the time at which enters the node is important when viewed from the outside. self.on_receiving(data) await self.recv_queue.put(data) + def on_sending(self, data: bytes) -> None: + # Should be overridden by subclass + pass + + def on_receiving(self, data: bytes) -> None: + # Should be overridden by subclass + pass + + +class MeteredRemoteSimplexConnection(RemoteSimplexConnection): + """ + An extension of RemoteSimplexConnection that measures bandwidth usages. + """ + + def __init__( + self, + config: LatencyConfig, + framework: Framework, + meter_start_time: float, + ): + super().__init__(config, framework) + # To measure bandwidth usages + self.meter_start_time = meter_start_time + self.send_meters: list[int] = [] + self.recv_meters: list[int] = [] + + @override def on_sending(self, data: bytes) -> None: """ Update statistics when sending a message """ self.__update_meter(self.send_meters, len(data)) + @override def on_receiving(self, data: bytes) -> None: """ Update statistics when receiving a message diff --git a/mixnet/sim/test_topology.py b/mixnet/sim/test_topology.py index 0a4e3cd..4f8d9a1 100644 --- a/mixnet/sim/test_topology.py +++ b/mixnet/sim/test_topology.py @@ -6,8 +6,8 @@ from sim.topology import are_all_nodes_connected, build_full_random_topology class TestTopology(TestCase): def test_full_random(self): - num_nodes = 100 - peering_degree = 6 + num_nodes = 10000 + peering_degree = 16 topology = build_full_random_topology( random.Random(0), num_nodes, peering_degree ) diff --git a/mixnet/sim/topology.py b/mixnet/sim/topology.py index a1cbc13..b6167f2 100644 --- a/mixnet/sim/topology.py +++ b/mixnet/sim/topology.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import random from collections import defaultdict @@ -43,16 +45,18 @@ def build_full_random_topology( def are_all_nodes_connected(topology: Topology) -> bool: - visited = set() - - def dfs(topology: Topology, 
def are_all_nodes_connected(topology: Topology) -> bool:
    """Return True iff every node is reachable from an arbitrary start node.

    The topology is undirected, so reachability from any single node decides
    connectivity of the whole graph.
    """
    start = next(iter(topology))
    return len(dfs(topology, start)) == len(topology)


def dfs(topology: Topology, start_node: int) -> set[int]:
    """Depth-first search over `topology` starting at `start_node`.

    Implemented with an explicit stack rather than recursion so that large
    topologies cannot exhaust the interpreter's recursion limit.

    Returns:
        The set of nodes reachable from `start_node` (including itself).
    """
    visited: set[int] = set()
    pending = [start_node]

    while pending:
        current = pending.pop()
        if current in visited:
            continue
        visited.add(current)
        pending.extend(peer for peer in topology[current] if peer not in visited)

    return visited