analyze the diversity of message senders over time

This commit is contained in:
Youngjoon Lee 2024-05-28 11:44:38 +09:00
parent 55f0f67554
commit ba22e3529a
No known key found for this signature in database
GPG Key ID: 09B750B5BD6F08A2
4 changed files with 31 additions and 16 deletions

View File

@ -21,7 +21,7 @@ class Adversary:
self.config = config
self.message_sizes = []
self.senders_around_interval = defaultdict(int)
self.msgs_in_node_per_window = [] # [<node, int>]
self.msgs_in_node_per_window = [] # [<node, (int, int)>]: The 1st int is msg count, the 2nd int is sender count
self.cur_window_per_node = defaultdict(lambda: deque()) # <node, [(time, int)]>: int is + or -.
# self.node_states = defaultdict(dict)
@ -30,15 +30,15 @@ class Adversary:
def inspect_message_size(self, msg: SphinxPacket | bytes):
self.message_sizes.append(len(msg))
def observe_receiving_node(self, node: "Node"):
self.cur_window_per_node[node].append((self.env.now, 1))
def observe_receiving_node(self, sender: "Node", receiver: "Node"):
self.cur_window_per_node[receiver].append((self.env.now, 1, sender))
# if node not in self.node_states[self.env.now]:
# self.node_states[self.env.now][node] = NodeState.RECEIVING
def observe_sending_node(self, node: "Node"):
self.cur_window_per_node[node].append((self.env.now, -1))
def observe_sending_node(self, sender: "Node", receiver: "Node"):
self.cur_window_per_node[sender].append((self.env.now, -1, receiver))
if self.is_around_message_interval(self.env.now):
self.senders_around_interval[node] += 1
self.senders_around_interval[sender] += 1
# self.node_states[self.env.now][node] = NodeState.SENDING
def is_around_message_interval(self, time: SimTime):
@ -49,17 +49,20 @@ class Adversary:
while True:
yield self.env.timeout(self.config.adversary.io_window_moving_interval)
self.msgs_in_node_per_window.append(defaultdict(int)) # <node, int>
self.msgs_in_node_per_window.append(defaultdict(lambda: (0, 0))) # <node, (int, int)>
for node, queue in self.cur_window_per_node.items():
msg_cnt = 0.0
senders = set()
# Pop old events that are out of the new window, and accumulate msg_cnt
while queue and queue[0][0] < self.env.now - self.config.adversary.io_window_size:
_, delta = queue.popleft()
_, delta, sender = queue.popleft()
msg_cnt += delta
senders.add(sender)
# Iterate remaining events that will remain in the new window, and accumulate msg_cnt
for _, delta in queue:
for _, delta, sender in queue:
msg_cnt += delta
self.msgs_in_node_per_window[-1][node] = msg_cnt
senders.add(sender)
self.msgs_in_node_per_window[-1][node] = (msg_cnt, len(senders))
class NodeState(Enum):

View File

@ -91,12 +91,12 @@ class Analysis:
dataframes = []
for i, msgs_in_node in enumerate(self.sim.p2p.adversary.msgs_in_node_per_window):
time = i * self.config.adversary.io_window_moving_interval
df = pd.DataFrame([(time, node.id, cnt) for node, cnt in msgs_in_node.items()],
columns=["time", "node_id", "msg_count"])
df = pd.DataFrame([(time, node.id, msg_cnt, sender_cnt) for node, (msg_cnt, sender_cnt) in msgs_in_node.items()],
columns=["time", "node_id", "msg_cnt", "sender_cnt"])
if not df.empty:
dataframes.append(df)
df = pd.concat(dataframes, ignore_index=True)
df_pivot = df.pivot(index="time", columns="node_id", values="msg_count")
df_pivot = df.pivot(index="time", columns="node_id", values="msg_cnt")
plt.figure(figsize=(12, 6))
for column in df_pivot.columns:
plt.plot(df_pivot.index, df_pivot[column], marker=None, label=column)
@ -108,6 +108,18 @@ class Analysis:
plt.tight_layout()
plt.show()
df_pivot = df.pivot(index="time", columns="node_id", values="sender_cnt")
plt.figure(figsize=(12, 6))
for column in df_pivot.columns:
plt.plot(df_pivot.index, df_pivot[column], marker=None, label=column)
plt.title("Senders of messages in each node over time")
plt.xlabel("Time")
plt.ylabel("Sender Count")
plt.ylim(bottom=0)
plt.grid(True)
plt.tight_layout()
plt.show()
def node_states(self):
rows = []
for time, node_states in self.sim.p2p.adversary.node_states.items():

View File

@ -6,7 +6,7 @@ mixnet:
num_nodes: 100
# A number of mix nodes selected by a message sender through which the Sphinx message goes through
# If 0, the message is broadcast directly to all nodes without being Sphinx-encoded.
num_mix_layers: 0
num_mix_layers: 4
# A size of a message payload in bytes (e.g. the size of a block proposal)
payload_size: 320
# An interval of sending a new real/cover message

View File

@ -42,14 +42,14 @@ class P2P(ABC):
def send(self, msg: SphinxPacket | bytes, sender: "Node", receiver: "Node", is_first_of_msg: bool):
if is_first_of_msg:
self.adversary.inspect_message_size(msg)
self.adversary.observe_sending_node(sender)
self.adversary.observe_sending_node(sender, receiver)
self.measurement.measure_egress(sender, msg)
# simulate network latency
yield self.env.timeout(random.uniform(0, self.config.p2p.max_network_latency))
self.measurement.measure_ingress(receiver, msg)
self.adversary.observe_receiving_node(receiver)
self.adversary.observe_receiving_node(sender, receiver)
self.receive(msg, sender, receiver)
@abstractmethod