fix: original sender counts
This commit is contained in:
parent
7dc003456d
commit
e1421977ea
|
@ -1,5 +1,4 @@
|
||||||
import random
|
from collections import Counter
|
||||||
from collections import defaultdict, Counter
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -102,8 +101,9 @@ class Analysis:
|
||||||
dataframes = []
|
dataframes = []
|
||||||
for i, msgs_in_node in enumerate(self.sim.p2p.adversary.msgs_in_node_per_window):
|
for i, msgs_in_node in enumerate(self.sim.p2p.adversary.msgs_in_node_per_window):
|
||||||
time = i * self.config.adversary.io_window_moving_interval
|
time = i * self.config.adversary.io_window_moving_interval
|
||||||
df = pd.DataFrame([(time, node.id, msg_cnt, len(senders)) for node, (msg_cnt, senders) in msgs_in_node.items()],
|
df = pd.DataFrame(
|
||||||
columns=["time", "node_id", "msg_cnt", "sender_cnt"])
|
[(time, node.id, msg_cnt, len(senders)) for node, (msg_cnt, senders) in msgs_in_node.items()],
|
||||||
|
columns=["time", "node_id", "msg_cnt", "sender_cnt"])
|
||||||
if not df.empty:
|
if not df.empty:
|
||||||
dataframes.append(df)
|
dataframes.append(df)
|
||||||
df = pd.concat(dataframes, ignore_index=True)
|
df = pd.concat(dataframes, ignore_index=True)
|
||||||
|
@ -244,7 +244,11 @@ class Analysis:
|
||||||
_, senders = self.sim.p2p.adversary.msgs_in_node_per_window[starting_window][starting_node]
|
_, senders = self.sim.p2p.adversary.msgs_in_node_per_window[starting_window][starting_node]
|
||||||
nodes_per_hop = [Counter(senders)]
|
nodes_per_hop = [Counter(senders)]
|
||||||
|
|
||||||
MAX_HOPS = 4 * 8
|
if self.config.p2p.type == self.config.p2p.TYPE_ONE_TO_ALL:
|
||||||
|
MAX_HOPS = 1 + self.config.mixnet.num_mix_layers
|
||||||
|
else:
|
||||||
|
MAX_HOPS = (1 + self.config.mixnet.num_mix_layers) * 8
|
||||||
|
|
||||||
for window in range(starting_window - 1, 0, -1):
|
for window in range(starting_window - 1, 0, -1):
|
||||||
if len(nodes_per_hop) >= MAX_HOPS:
|
if len(nodes_per_hop) >= MAX_HOPS:
|
||||||
break
|
break
|
||||||
|
|
|
@ -9,6 +9,7 @@ from cryptography.hazmat.primitives import serialization
|
||||||
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
|
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
|
||||||
|
|
||||||
from config import Config
|
from config import Config
|
||||||
|
from measurement import Measurement
|
||||||
from sphinx import SphinxPacket, Attachment
|
from sphinx import SphinxPacket, Attachment
|
||||||
from p2p import P2P
|
from p2p import P2P
|
||||||
|
|
||||||
|
@ -17,7 +18,7 @@ class Node:
|
||||||
INCENTIVE_TX_SIZE = 512
|
INCENTIVE_TX_SIZE = 512
|
||||||
PADDING_SEPARATOR = b'\x01'
|
PADDING_SEPARATOR = b'\x01'
|
||||||
|
|
||||||
def __init__(self, id: int, env: simpy.Environment, p2p: P2P, config: Config):
|
def __init__(self, id: int, env: simpy.Environment, p2p: P2P, config: Config, measurement: Measurement):
|
||||||
self.id = id
|
self.id = id
|
||||||
self.env = env
|
self.env = env
|
||||||
self.p2p = p2p
|
self.p2p = p2p
|
||||||
|
@ -25,6 +26,7 @@ class Node:
|
||||||
self.public_key = self.private_key.public_key()
|
self.public_key = self.private_key.public_key()
|
||||||
self.config = config
|
self.config = config
|
||||||
self.payload_id = 0
|
self.payload_id = 0
|
||||||
|
self.measurement = measurement
|
||||||
self.action = self.env.process(self.send_message())
|
self.action = self.env.process(self.send_message())
|
||||||
|
|
||||||
def send_message(self):
|
def send_message(self):
|
||||||
|
@ -37,6 +39,8 @@ class Node:
|
||||||
message_type = self.message_type_to_send()
|
message_type = self.message_type_to_send()
|
||||||
if message_type is None: # nothing to send in this turn
|
if message_type is None: # nothing to send in this turn
|
||||||
continue
|
continue
|
||||||
|
elif message_type == MessageType.REAL:
|
||||||
|
self.measurement.count_original_sender(self)
|
||||||
|
|
||||||
msg = self.create_message(message_type)
|
msg = self.create_message(message_type)
|
||||||
prep_time = random.uniform(0, self.config.mixnet.max_message_prep_time)
|
prep_time = random.uniform(0, self.config.mixnet.max_message_prep_time)
|
||||||
|
|
|
@ -35,16 +35,14 @@ class P2P(ABC):
|
||||||
# This should accept only bytes in practice,
|
# This should accept only bytes in practice,
|
||||||
# but we accept SphinxPacket as well because we don't implement Sphinx deserialization.
|
# but we accept SphinxPacket as well because we don't implement Sphinx deserialization.
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0):
|
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops_traveled: int = 0):
|
||||||
# Yield 0 to ensure that the broadcast is done in the same time step.
|
# Yield 0 to ensure that the broadcast is done in the same time step.
|
||||||
# Without any yield, SimPy complains that the broadcast func is not a generator.
|
# Without any yield, SimPy complains that the broadcast func is not a generator.
|
||||||
yield self.env.timeout(0)
|
yield self.env.timeout(0)
|
||||||
|
|
||||||
def send(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node", is_first_of_msg: bool):
|
def send(self, msg: SphinxPacket | bytes, hops_traveled: int, sender: "Node", receiver: "Node",
|
||||||
if hops == 0:
|
is_first_of_broadcasting: bool):
|
||||||
self.measurement.count_original_sender(sender)
|
if is_first_of_broadcasting:
|
||||||
|
|
||||||
if is_first_of_msg:
|
|
||||||
self.adversary.inspect_message_size(msg)
|
self.adversary.inspect_message_size(msg)
|
||||||
self.adversary.observe_sending_node(sender, receiver)
|
self.adversary.observe_sending_node(sender, receiver)
|
||||||
self.measurement.measure_egress(sender, msg)
|
self.measurement.measure_egress(sender, msg)
|
||||||
|
@ -54,10 +52,10 @@ class P2P(ABC):
|
||||||
|
|
||||||
self.measurement.measure_ingress(receiver, msg)
|
self.measurement.measure_ingress(receiver, msg)
|
||||||
self.adversary.observe_receiving_node(sender, receiver)
|
self.adversary.observe_receiving_node(sender, receiver)
|
||||||
self.receive(msg, hops, sender, receiver)
|
self.receive(msg, hops_traveled + 1, sender, receiver)
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"):
|
def receive(self, msg: SphinxPacket | bytes, hops_traveled: int, sender: "Node", receiver: "Node"):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def log(self, msg):
|
def log(self, msg):
|
||||||
|
@ -71,14 +69,14 @@ class NaiveBroadcastP2P(P2P):
|
||||||
|
|
||||||
# This should accept only bytes in practice,
|
# This should accept only bytes in practice,
|
||||||
# but we accept SphinxPacket as well because we don't implement Sphinx deserialization.
|
# but we accept SphinxPacket as well because we don't implement Sphinx deserialization.
|
||||||
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0):
|
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops_traveled: int = 0):
|
||||||
yield from super().broadcast(sender, msg)
|
yield from super().broadcast(sender, msg)
|
||||||
|
|
||||||
self.log(f"Node:{sender.id}: Broadcasting a msg: {len(msg)} bytes")
|
self.log(f"Node:{sender.id}: Broadcasting a msg: {len(msg)} bytes")
|
||||||
for i, receiver in enumerate(self.nodes):
|
for i, receiver in enumerate(self.nodes):
|
||||||
self.env.process(self.send(msg, hops, sender, receiver, i == 0))
|
self.env.process(self.send(msg, 0, sender, receiver, i == 0))
|
||||||
|
|
||||||
def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"):
|
def receive(self, msg: SphinxPacket | bytes, hops_traveled: int, sender: "Node", receiver: "Node"):
|
||||||
self.env.process(receiver.receive_message(msg))
|
self.env.process(receiver.receive_message(msg))
|
||||||
|
|
||||||
|
|
||||||
|
@ -109,7 +107,7 @@ class GossipP2P(P2P):
|
||||||
conns.add(neighbor)
|
conns.add(neighbor)
|
||||||
self.topology[node] = conns
|
self.topology[node] = conns
|
||||||
|
|
||||||
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops: int = 0):
|
def broadcast(self, sender: "Node", msg: SphinxPacket | bytes, hops_traveled: int = 0):
|
||||||
yield from super().broadcast(sender, msg)
|
yield from super().broadcast(sender, msg)
|
||||||
self.log(f"Node:{sender.id}: Gossiping a msg: {len(msg)} bytes")
|
self.log(f"Node:{sender.id}: Gossiping a msg: {len(msg)} bytes")
|
||||||
|
|
||||||
|
@ -123,17 +121,17 @@ class GossipP2P(P2P):
|
||||||
# Don't gossip the message if it was received from the node who is going to be the receiver,
|
# Don't gossip the message if it was received from the node who is going to be the receiver,
|
||||||
# which means that the node already knows the message.
|
# which means that the node already knows the message.
|
||||||
if receiver != self.message_cache[sender][msg_hash]:
|
if receiver != self.message_cache[sender][msg_hash]:
|
||||||
self.env.process(self.send(msg, hops, sender, receiver, cnt == 0))
|
self.env.process(self.send(msg, hops_traveled, sender, receiver, cnt == 0))
|
||||||
cnt += 1
|
cnt += 1
|
||||||
|
|
||||||
def receive(self, msg: SphinxPacket | bytes, hops: int, sender: "Node", receiver: "Node"):
|
def receive(self, msg: SphinxPacket | bytes, hops_traveled: int, sender: "Node", receiver: "Node"):
|
||||||
# Receive/gossip the msg only if it hasn't been received before. If not, just ignore the msg.
|
# Receive/gossip the msg only if it hasn't been received before. If not, just ignore the msg.
|
||||||
# i.e. each message is received/gossiped at most once by each node.
|
# i.e. each message is received/gossiped at most once by each node.
|
||||||
msg_hash = hashlib.sha256(bytes(msg)).digest()
|
msg_hash = hashlib.sha256(bytes(msg)).digest()
|
||||||
if msg_hash not in self.message_cache[receiver]:
|
if msg_hash not in self.message_cache[receiver]:
|
||||||
self.message_cache[receiver][msg_hash] = sender
|
self.message_cache[receiver][msg_hash] = sender
|
||||||
|
self.measurement.update_message_hops(msg_hash, hops_traveled)
|
||||||
|
|
||||||
# Receive and gossip
|
# Receive and gossip
|
||||||
self.env.process(receiver.receive_message(msg))
|
self.env.process(receiver.receive_message(msg))
|
||||||
hops += 1
|
self.env.process(self.broadcast(receiver, msg, hops_traveled))
|
||||||
self.measurement.update_message_hops(msg_hash, hops)
|
|
||||||
self.env.process(self.broadcast(receiver, msg, hops))
|
|
||||||
|
|
|
@ -13,7 +13,7 @@ class Simulation:
|
||||||
self.config = config
|
self.config = config
|
||||||
self.env = simpy.Environment()
|
self.env = simpy.Environment()
|
||||||
self.p2p = Simulation.init_p2p(self.env, config)
|
self.p2p = Simulation.init_p2p(self.env, config)
|
||||||
nodes = [Node(i, self.env, self.p2p, config) for i in range(config.mixnet.num_nodes)]
|
nodes = [Node(i, self.env, self.p2p, config, self.p2p.measurement) for i in range(config.mixnet.num_nodes)]
|
||||||
self.p2p.set_nodes(nodes)
|
self.p2p.set_nodes(nodes)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
|
Loading…
Reference in New Issue