From a6d12dba6486fce376ec8cae421a48af61c0aaed Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Mon, 10 Jun 2024 16:16:27 +0900 Subject: [PATCH] fix window updates --- mixnet/v2/sim/adversary.py | 32 ++++++++++++++++++-------------- mixnet/v2/sim/analysis.py | 17 +++++++++-------- mixnet/v2/sim/config.py | 4 ++-- mixnet/v2/sim/config.yaml | 2 +- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/mixnet/v2/sim/adversary.py b/mixnet/v2/sim/adversary.py index 14927a4..6284246 100644 --- a/mixnet/v2/sim/adversary.py +++ b/mixnet/v2/sim/adversary.py @@ -20,8 +20,10 @@ class Adversary: self.config = config self.message_sizes = [] self.senders_around_interval = Counter() - self.io_windows = [] # dict[receiver, (deque[time_received], set[sender]))] - self.io_windows.append(defaultdict(lambda: (deque(), set()))) + self.msg_pools_per_window = [] # list[dict[receiver, deque[time_received])]] + self.msg_pools_per_window.append(defaultdict(lambda: deque())) + self.msgs_received_per_window = [] # list[dict[receiver, set[sender])]] + self.msgs_received_per_window.append(defaultdict(set)) self.final_msgs_received = defaultdict(dict) # dict[receiver, dict[window, sender]] # self.node_states = defaultdict(dict) @@ -31,19 +33,18 @@ class Adversary: self.message_sizes.append(len(msg)) def observe_receiving_node(self, sender: "Node", receiver: "Node", msg: SphinxPacket | bytes): - msg_queue, senders = self.io_windows[-1][receiver] - msg_queue.append(self.env.now) - senders.add(sender) + self.msg_pools_per_window[-1][receiver].append(self.env.now) + self.msgs_received_per_window[-1][receiver].add(sender) if receiver.operated_by_adversary and not isinstance(msg, SphinxPacket): - self.final_msgs_received[receiver][len(self.io_windows) - 1] = sender + self.final_msgs_received[receiver][len(self.msg_pools_per_window) - 1] = sender # if node not in self.node_states[self.env.now]: # self.node_states[self.env.now][node] = NodeState.RECEIVING def observe_sending_node(self, sender: "Node"): - msg_queue, _ = self.io_windows[-1][sender] - if len(msg_queue) > 0: + msg_pool = self.msg_pools_per_window[-1][sender] + if len(msg_pool) > 0: # Adversary doesn't know which message in the pool is being emitted. So, pop the oldest one from the pool. - msg_queue.popleft() + msg_pool.popleft() if self.is_around_message_interval(self.env.now): self.senders_around_interval.update({sender}) # self.node_states[self.env.now][node] = NodeState.SENDING @@ -53,14 +54,17 @@ class Adversary: def update_observation_window(self): while True: - yield self.env.timeout(self.config.adversary.io_window_size) - new_window = defaultdict(lambda: (deque(), set())) - for receiver, (msg_queue, _) in self.io_windows[-1].items(): + yield self.env.timeout(self.config.adversary.window_size) + + self.msgs_received_per_window.append(defaultdict(set)) + + new_msg_pool = defaultdict(lambda: deque()) + for receiver, msg_queue in self.msg_pools_per_window[-1].items(): for time_received in msg_queue: # If the message is likely to be still pending and be emitted soon, pass it on to the next window. if self.env.now - time_received < self.config.mixnet.max_mix_delay: - new_window[receiver][0].append(time_received) - self.io_windows.append(new_window) + new_msg_pool[receiver][0].append(time_received) + self.msg_pools_per_window.append(new_msg_pool) class NodeState(Enum): diff --git a/mixnet/v2/sim/analysis.py b/mixnet/v2/sim/analysis.py index 438255e..57264bf 100644 --- a/mixnet/v2/sim/analysis.py +++ b/mixnet/v2/sim/analysis.py @@ -121,12 +121,13 @@ class Analysis: def messages_in_node_over_time(self): dataframes = [] - for i, io_window in enumerate(self.sim.p2p.adversary.io_windows): - time = i * self.config.adversary.io_window_size - df = pd.DataFrame( - [(time, receiver.id, len(msg_queue), len(senders)) for receiver, (msg_queue, senders) in - io_window.items()], - columns=[COL_TIME, COL_NODE_ID, COL_MSG_CNT, COL_SENDER_CNT]) + for window, msg_pools in enumerate(self.sim.p2p.adversary.msg_pools_per_window): + time = window * self.config.adversary.window_size + data = [] + for receiver, msg_pool in msg_pools.items(): + senders = self.sim.p2p.adversary.msgs_received_per_window[window][receiver] + data.append((time, receiver.id, len(msg_pool), len(senders))) + df = pd.DataFrame(data, columns=[COL_TIME, COL_NODE_ID, COL_MSG_CNT, COL_SENDER_CNT]) if not df.empty: dataframes.append(df) df = pd.concat(dataframes, ignore_index=True) @@ -260,7 +261,7 @@ class Analysis: if sender is not None: senders = {sender} else: - _, senders = self.sim.p2p.adversary.io_windows[window][receiver] + senders = self.sim.p2p.adversary.msgs_received_per_window[window][receiver] # If the remaining_hops is 1, return the senders as suspected senders if remaining_hops == 1: @@ -273,7 +274,7 @@ class Analysis: for sender in senders: # Track back to each window where that sender might have received any messages. time_range = self.config.mixnet.max_mix_delay + self.config.p2p.max_network_latency - window_range = int(time_range / self.config.adversary.io_window_size) + window_range = int(time_range / self.config.adversary.window_size) for prev_window in range(window - 1, window - 1 - window_range, -1): if prev_window < 0: break diff --git a/mixnet/v2/sim/config.py b/mixnet/v2/sim/config.py index 1125544..bf34b4c 100644 --- a/mixnet/v2/sim/config.py +++ b/mixnet/v2/sim/config.py @@ -144,7 +144,7 @@ class MeasurementConfig: @dataclass class AdversaryConfig: # A time window for the adversary to observe inputs and outputs of each node - io_window_size: float + window_size: float def validate(self): - assert self.io_window_size > 0 + assert self.window_size > 0 diff --git a/mixnet/v2/sim/config.yaml b/mixnet/v2/sim/config.yaml index 154b1fa..22558ce 100644 --- a/mixnet/v2/sim/config.yaml +++ b/mixnet/v2/sim/config.yaml @@ -42,4 +42,4 @@ measurement: adversary: # A time window for the adversary to observe inputs and outputs of each node # Recommendation: Same as `p2p.min_network_latency` - io_window_size: 0.10 \ No newline at end of file + window_size: 0.10 \ No newline at end of file