From e303186ac66259dcc3a7677d06a5d3115d339b87 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 30 May 2024 18:36:56 +0900 Subject: [PATCH] add hops measurement --- mixnet/v2/sim/analysis.py | 9 +++++++++ mixnet/v2/sim/measurement.py | 4 ++++ mixnet/v2/sim/p2p.py | 27 +++++++++++++++------------ 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/mixnet/v2/sim/analysis.py b/mixnet/v2/sim/analysis.py index aea7301..24f697c 100644 --- a/mixnet/v2/sim/analysis.py +++ b/mixnet/v2/sim/analysis.py @@ -18,6 +18,7 @@ class Analysis: self.messages_emitted_around_interval() self.messages_in_node_over_time() # self.node_states() + self.message_hops() def bandwidth(self, message_size_df: pd.DataFrame): dataframes = [] @@ -148,3 +149,11 @@ class Analysis: plt.ylabel("Node ID") plt.legend(title="state") plt.show() + + def message_hops(self): + df = pd.DataFrame(self.sim.p2p.measurement.message_hops.values(), columns=["hops"]) + print(df.describe()) + plt.figure(figsize=(6, 6)) + seaborn.boxplot(data=df, y="hops", medianprops={"color": "red", "linewidth": 2.5}) + plt.title("Message hops distribution") + plt.show() diff --git a/mixnet/v2/sim/measurement.py b/mixnet/v2/sim/measurement.py index fa9a641..b5cf642 100644 --- a/mixnet/v2/sim/measurement.py +++ b/mixnet/v2/sim/measurement.py @@ -17,6 +17,7 @@ class Measurement: self.config = config self.egress_bandwidth_per_time = [] self.ingress_bandwidth_per_time = [] + self.message_hops = defaultdict(int) # dict[msg_hash, hops] self.env.process(self._update_bandwidth_window()) @@ -26,6 +27,9 @@ class Measurement: def measure_ingress(self, node: "Node", msg: SphinxPacket | bytes): self.ingress_bandwidth_per_time[-1][node] += len(msg) + def update_message_hops(self, msg_hash: bytes, hops: int): + self.message_hops[msg_hash] = max(hops, self.message_hops[msg_hash]) + def _update_bandwidth_window(self): while True: self.ingress_bandwidth_per_time.append(defaultdict(int)) diff --git a/mixnet/v2/sim/p2p.py b/mixnet/v2/sim/p2p.py index 4b4e59a..79ddddf 100644 --- a/mixnet/v2/sim/p2p.py +++ b/mixnet/v2/sim/p2p.py @@ -34,12 +34,12 @@ class P2P(ABC): # This should accept only bytes in practice, # but we accept SphinxPacket as well because we don't implement Sphinx deserialization. @abstractmethod - def broadcast(self, sender: "Node", msg: SphinxPacket | bytes): + def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0): # Yield 0 to ensure that the broadcast is done in the same time step. # Without any yield, SimPy complains that the broadcast func is not a generator. yield self.env.timeout(0) - def send(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node", is_first_of_msg: bool): + def send(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node", is_first_of_msg: bool): if is_first_of_msg: self.adversary.inspect_message_size(msg) self.adversary.observe_sending_node(sender, receiver) @@ -50,10 +50,10 @@ class P2P(ABC): self.measurement.measure_ingress(receiver, msg) self.adversary.observe_receiving_node(sender, receiver) - self.receive(msg, sender, receiver) + self.receive(msg, hops, sender, receiver) @abstractmethod - def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): + def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"): pass def log(self, msg): @@ -67,14 +67,14 @@ class NaiveBroadcastP2P(P2P): # This should accept only bytes in practice, # but we accept SphinxPacket as well because we don't implement Sphinx deserialization. - def broadcast(self, sender: "Node", msg: SphinxPacket | bytes): + def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0): yield from super().broadcast(sender, msg) self.log(f"Node:{sender.id}: Broadcasting a msg: {len(msg)} bytes") for i, receiver in enumerate(self.nodes): - self.env.process(self.send(msg, sender, receiver, i == 0)) + self.env.process(self.send(msg, hops, sender, receiver, i == 0)) - def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): + def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"): self.env.process(receiver.receive_message(msg)) @@ -82,7 +82,7 @@ class GossipP2P(P2P): def __init__(self, env: simpy.Environment, config: Config): super().__init__(env, config) self.topology = defaultdict(set) - self.message_cache = defaultdict(dict) + self.message_cache = defaultdict(dict) # dict[receiver, dict[msg_hash, sender]] def set_nodes(self, nodes: list["Node"]): super().set_nodes(nodes) @@ -105,7 +105,7 @@ class GossipP2P(P2P): conns.add(neighbor) self.topology[node] = conns - def broadcast(self, sender: "Node", msg: SphinxPacket | bytes): + def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0): yield from super().broadcast(sender, msg) self.log(f"Node:{sender.id}: Gossiping a msg: {len(msg)} bytes") @@ -119,15 +119,18 @@ class GossipP2P(P2P): # Don't gossip the message if it was received from the node who is going to be the receiver, # which means that the node already knows the message. if receiver != self.message_cache[sender][msg_hash]: - self.env.process(self.send(msg, sender, receiver, cnt == 0)) + self.env.process(self.send(msg, hops, sender, receiver, cnt == 0)) cnt += 1 - def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): + def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"): # Receive/gossip the msg only if it hasn't been received before. If not, just ignore the msg. # i.e. each message is received/gossiped at most once by each node. msg_hash = hashlib.sha256(bytes(msg)).digest() if msg_hash not in self.message_cache[receiver]: self.message_cache[receiver][msg_hash] = sender + hops += 1 + self.measurement.update_message_hops(msg_hash, hops) + # Receive and gossip self.env.process(receiver.receive_message(msg)) - self.env.process(self.broadcast(receiver, msg)) + self.env.process(self.broadcast(receiver, msg, hops))