diff --git a/mixnet/v2/sim/p2p.py b/mixnet/v2/sim/p2p.py index 3a44e95..365584d 100644 --- a/mixnet/v2/sim/p2p.py +++ b/mixnet/v2/sim/p2p.py @@ -35,20 +35,14 @@ class P2P(ABC): # but we accept SphinxPacket as well because we don't implement Sphinx deserialization. @abstractmethod def broadcast(self, sender: "Node", msg: SphinxPacket | bytes): - # Adversary - self.adversary.inspect_message_size(msg) - self.adversary.observe_outgoing_message(sender) # Yield 0 to ensure that the broadcast is done in the same time step. # Without any yield, SimPy complains that the broadcast func is not a generator. yield self.env.timeout(0) @abstractmethod - def send(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): + def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): # simulate network latency yield self.env.timeout(random.uniform(0, self.config.p2p.max_network_latency)) - # Measurement and adversary - self.measurement.measure_ingress(receiver, msg) - self.adversary.observe_incoming_message(receiver) def log(self, msg): print(f"t={self.env.now:.3f}: P2P: {msg}") @@ -64,12 +58,25 @@ class NaiveBroadcastP2P(P2P): def broadcast(self, sender: "Node", msg: SphinxPacket | bytes): yield from super().broadcast(sender, msg) self.log(f"Node:{sender.id}: Broadcasting a msg: {len(msg)} bytes") + + cnt = 0 for receiver in self.nodes: self.measurement.measure_egress(sender, msg) - self.env.process(self.send(msg, sender, receiver)) + self.env.process(self.receive(msg, sender, receiver)) + cnt += 1 + + if cnt > 0: + # Adversary + self.adversary.inspect_message_size(msg) + self.adversary.observe_outgoing_message(sender) + + def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): + yield from super().receive(msg, sender, receiver) + + # Measurement and adversary + self.measurement.measure_ingress(receiver, msg) + self.adversary.observe_incoming_message(receiver) - def send(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): - yield from super().send(msg, sender, receiver) self.env.process(receiver.receive_message(msg)) @@ -109,19 +116,33 @@ class GossipP2P(P2P): if msg_hash not in self.message_cache[sender]: self.message_cache[sender][msg_hash] = sender + cnt = 0 for receiver in self.topology[sender]: # Don't gossip the message if it was received from the node who is going to be the receiver, # which means that the node already knows the message. if receiver != self.message_cache[sender][msg_hash]: self.measurement.measure_egress(sender, msg) - self.env.process(self.send(msg, sender, receiver)) + self.env.process(self.receive(msg, sender, receiver)) + cnt += 1 - def send(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): - yield from super().send(msg, sender, receiver) - # Receive/gossip the msg only if it hasn't been received before. + if cnt > 0: + # Adversary + self.adversary.inspect_message_size(msg) + self.adversary.observe_outgoing_message(sender) + + def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"): + yield from super().receive(msg, sender, receiver) + + # Receive/gossip the msg only if it hasn't been received before. If not, just ignore the msg. # i.e. each message is received/gossiped at most once by each node. msg_hash = hashlib.sha256(bytes(msg)).digest() if msg_hash not in self.message_cache[receiver]: self.message_cache[receiver][msg_hash] = sender + + # Measurement and adversary + self.measurement.measure_ingress(receiver, msg) + self.adversary.observe_incoming_message(receiver) + + # Receive and gossip self.env.process(receiver.receive_message(msg)) self.env.process(self.broadcast(receiver, msg))