nomos-specs/mixnet/test_node.py

118 lines
4.7 KiB
Python

import asyncio
from datetime import datetime
from unittest import IsolatedAsyncioTestCase
import numpy
from pysphinx.sphinx import SphinxPacket
from mixnet.node import MixNode, NodeAddress, PacketQueue
from mixnet.packet import PacketBuilder
from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec
from mixnet.test_utils import (
init_mixnet_config,
with_test_timeout,
)
class TestMixNodeRunner(IsolatedAsyncioTestCase):
    """Statistical test that a single mix node behaves like an M/M/inf queue."""

    @with_test_timeout(180)
    async def test_mixnode_emission_rate(self):
        """
        Test if MixNodeRunner works as a M/M/inf queue.

        If inputs arrive at a Poisson rate `lambda`,
        and if processing is delayed according to an exponential distribution with a rate `mu`,
        the rate of outputs should be `lambda`.
        """
        config = init_mixnet_config()
        config.mixclient_config.emission_rate_per_min = 120  # lambda (= 2msg/sec)
        config.mixnode_config.delay_rate_per_min = 30  # mu (= 2s delay on average)

        packet, route = PacketBuilder.build_real_packets(
            b"msg", config.mixclient_config.topology
        )[0]

        # Start only the first mix node for testing
        config.mixnode_config.encryption_private_key = route[0].encryption_private_key
        mixnode = await MixNode.new(config.mixnode_config)
        try:
            # Send packets to the first mix node in a Poisson distribution
            packet_count = 100
            # This queue is just for counting how many packets have been sent so far.
            sent_packet_queue: PacketQueue = asyncio.Queue()
            sender_task = asyncio.create_task(
                self.send_packets(
                    mixnode.inbound_socket,
                    packet,
                    route[0].addr,
                    packet_count,
                    config.mixclient_config.emission_rate_per_min,
                    sent_packet_queue,
                )
            )
            try:
                # Measure elapsed time with the event loop's monotonic clock:
                # unlike wall-clock time it cannot jump when the system clock
                # is adjusted, which would distort the measured intervals.
                clock = asyncio.get_running_loop().time

                # Calculate intervals between outputs and gather num_jobs in the first mix node.
                intervals = []
                num_jobs = []
                ts = clock()
                for _ in range(packet_count):
                    # Only the arrival time matters; the emitted packet itself is discarded.
                    await mixnode.outbound_socket.get()
                    now = clock()
                    intervals.append(now - ts)

                    # Calculate the current # of jobs staying in the mix node
                    num_packets_emitted_from_mixnode = len(intervals)
                    num_packets_sent_to_mixnode = sent_packet_queue.qsize()
                    num_jobs.append(
                        num_packets_sent_to_mixnode - num_packets_emitted_from_mixnode
                    )

                    ts = now

                # Remove the first interval that would be much larger than other intervals,
                # because of the delay in mix node.
                intervals = intervals[1:]
                num_jobs = num_jobs[1:]

                # Check if the emission rate of the first mix node is the same as
                # the emission rate of the message sender, but with a delay.
                # If outputs follow the Poisson distribution with a rate `lambda`,
                # a mean interval between outputs must be `1/lambda`.
                self.assertAlmostEqual(
                    float(numpy.mean(intervals)),
                    poisson_mean_interval_sec(
                        config.mixclient_config.emission_rate_per_min
                    ),
                    delta=1.0,
                )
                # If runner is a M/M/inf queue,
                # a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`.
                self.assertAlmostEqual(
                    float(numpy.mean(num_jobs)),
                    round(
                        config.mixclient_config.emission_rate_per_min
                        / config.mixnode_config.delay_rate_per_min
                    ),
                    delta=1.5,
                )
            except BaseException:
                # The measurement failed or was cancelled: stop the sender
                # instead of waiting for it to pace out its remaining packets,
                # so the failure and the node cleanup are not delayed.
                sender_task.cancel()
                raise
            else:
                # Success path: surface any error raised inside the sender.
                await sender_task
        finally:
            await mixnode.cancel()

    @staticmethod
    async def send_packets(
        inbound_socket: PacketQueue,
        packet: SphinxPacket,
        node_addr: NodeAddress,
        cnt: int,
        rate_per_min: int,
        # For testing purpose, to inform the caller how many packets have been sent to the inbound_socket
        sent_packet_queue: PacketQueue,
    ) -> None:
        """
        Put `(node_addr, packet)` on `inbound_socket` `cnt` times.

        Before each put, sleep for `poisson_interval_sec(rate_per_min)` seconds.
        Every item put on `inbound_socket` is also put on `sent_packet_queue`,
        so the size of that queue tells the caller how many packets have been sent.
        """
        for _ in range(cnt):
            # Since the task is not heavy, just sleep for seconds instead of using emission_notifier
            await asyncio.sleep(poisson_interval_sec(rate_per_min))
            await inbound_socket.put((node_addr, packet))
            await sent_packet_queue.put((node_addr, packet))