add hops measurement

This commit is contained in:
Youngjoon Lee 2024-05-30 18:36:56 +09:00
parent 2d04ba6fa4
commit e303186ac6
No known key found for this signature in database
GPG Key ID: 09B750B5BD6F08A2
3 changed files with 28 additions and 12 deletions

View File

@ -18,6 +18,7 @@ class Analysis:
self.messages_emitted_around_interval()
self.messages_in_node_over_time()
# self.node_states()
self.message_hops()
def bandwidth(self, message_size_df: pd.DataFrame):
dataframes = []
@ -148,3 +149,11 @@ class Analysis:
plt.ylabel("Node ID")
plt.legend(title="state")
plt.show()
def message_hops(self):
df = pd.DataFrame(self.sim.p2p.measurement.message_hops.values(), columns=["hops"])
print(df.describe())
plt.figure(figsize=(6, 6))
seaborn.boxplot(data=df, y="hops", medianprops={"color": "red", "linewidth": 2.5})
plt.title("Message hops distribution")
plt.show()

View File

@ -17,6 +17,7 @@ class Measurement:
self.config = config
self.egress_bandwidth_per_time = []
self.ingress_bandwidth_per_time = []
self.message_hops = defaultdict(int) # dict[msg_hash, hops]
self.env.process(self._update_bandwidth_window())
@ -26,6 +27,9 @@ class Measurement:
def measure_ingress(self, node: "Node", msg: SphinxPacket | bytes):
self.ingress_bandwidth_per_time[-1][node] += len(msg)
def update_message_hops(self, msg_hash: bytes, hops: int):
self.message_hops[msg_hash] = max(hops, self.message_hops[msg_hash])
def _update_bandwidth_window(self):
while True:
self.ingress_bandwidth_per_time.append(defaultdict(int))

View File

@ -34,12 +34,12 @@ class P2P(ABC):
# This should accept only bytes in practice,
# but we accept SphinxPacket as well because we don't implement Sphinx deserialization.
@abstractmethod
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes):
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0):
# Yield 0 to ensure that the broadcast is done in the same time step.
# Without any yield, SimPy complains that the broadcast func is not a generator.
yield self.env.timeout(0)
def send(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node", is_first_of_msg: bool):
def send(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node", is_first_of_msg: bool):
if is_first_of_msg:
self.adversary.inspect_message_size(msg)
self.adversary.observe_sending_node(sender, receiver)
@ -50,10 +50,10 @@ class P2P(ABC):
self.measurement.measure_ingress(receiver, msg)
self.adversary.observe_receiving_node(sender, receiver)
self.receive(msg, sender, receiver)
self.receive(msg, hops, sender, receiver)
@abstractmethod
def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"):
def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"):
pass
def log(self, msg):
@ -67,14 +67,14 @@ class NaiveBroadcastP2P(P2P):
# This should accept only bytes in practice,
# but we accept SphinxPacket as well because we don't implement Sphinx deserialization.
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes):
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0):
yield from super().broadcast(sender, msg)
self.log(f"Node:{sender.id}: Broadcasting a msg: {len(msg)} bytes")
for i, receiver in enumerate(self.nodes):
self.env.process(self.send(msg, sender, receiver, i == 0))
self.env.process(self.send(msg, hops, sender, receiver, i == 0))
def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"):
def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"):
self.env.process(receiver.receive_message(msg))
@ -82,7 +82,7 @@ class GossipP2P(P2P):
def __init__(self, env: simpy.Environment, config: Config):
super().__init__(env, config)
self.topology = defaultdict(set)
self.message_cache = defaultdict(dict)
self.message_cache = defaultdict(dict) # dict[receiver, dict[msg_hash, sender]]
def set_nodes(self, nodes: list["Node"]):
super().set_nodes(nodes)
@ -105,7 +105,7 @@ class GossipP2P(P2P):
conns.add(neighbor)
self.topology[node] = conns
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes):
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0):
yield from super().broadcast(sender, msg)
self.log(f"Node:{sender.id}: Gossiping a msg: {len(msg)} bytes")
@ -119,15 +119,18 @@ class GossipP2P(P2P):
# Don't gossip the message if it was received from the node who is going to be the receiver,
# which means that the node already knows the message.
if receiver != self.message_cache[sender][msg_hash]:
self.env.process(self.send(msg, sender, receiver, cnt == 0))
self.env.process(self.send(msg, hops, sender, receiver, cnt == 0))
cnt += 1
def receive(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node"):
def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"):
# Receive/gossip the msg only if it hasn't been received before. If not, just ignore the msg.
# i.e. each message is received/gossiped at most once by each node.
msg_hash = hashlib.sha256(bytes(msg)).digest()
if msg_hash not in self.message_cache[receiver]:
self.message_cache[receiver][msg_hash] = sender
hops += 1
self.measurement.update_message_hops(msg_hash, hops)
# Receive and gossip
self.env.process(receiver.receive_message(msg))
self.env.process(self.broadcast(receiver, msg))
self.env.process(self.broadcast(receiver, msg, hops))