diff --git a/mixnet/client.py b/mixnet/client.py new file mode 100644 index 0000000..cab2b55 --- /dev/null +++ b/mixnet/client.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import queue +import time +from datetime import datetime, timedelta +from threading import Thread + +from mixnet.mixnet import Mixnet, MixnetTopology +from mixnet.node import PacketQueue +from mixnet.packet import PacketBuilder +from mixnet.poisson import poisson_interval_sec + + +class MixClientRunner(Thread): + """ + Emit packets at the Poisson emission_rate_per_min. + + If a real packet is scheduled to be sent, this thread sends the real packet to the mixnet, + and schedules redundant real packets to be emitted in the next turns. + + If no real packet is not scheduled, this thread emits a cover packet according to the emission_rate_per_min. + """ + + def __init__( + self, + mixnet: Mixnet, + topology: MixnetTopology, + emission_rate_per_min: int, # Poisson rate parameter: lambda in the spec + redundancy: int, # b in the spec + real_packet_queue: PacketQueue, + outbound_socket: PacketQueue, + ): + super().__init__() + self.mixnet = mixnet + self.topology = topology + self.emission_rate_per_min = emission_rate_per_min + self.redundancy = redundancy + self.real_packet_queue = real_packet_queue + self.redundant_real_packet_queue: PacketQueue = queue.Queue() + self.outbound_socket = outbound_socket + + def run(self) -> None: + # Here in Python, this thread is implemented in synchronous manner. + # In the real implementation, consider implementing this in asynchronous if possible. + + next_emission_ts = datetime.now() + timedelta( + seconds=poisson_interval_sec(self.emission_rate_per_min) + ) + + while True: + time.sleep(1 / 1000) + + if datetime.now() < next_emission_ts: + continue + + next_emission_ts += timedelta( + seconds=poisson_interval_sec(self.emission_rate_per_min) + ) + + if not self.redundant_real_packet_queue.empty(): + addr, packet = self.redundant_real_packet_queue.get() + self.outbound_socket.put((addr, packet)) + continue + + if not self.real_packet_queue.empty(): + addr, packet = self.real_packet_queue.get() + # Schedule redundant real packets + for _ in range(self.redundancy - 1): + self.redundant_real_packet_queue.put((addr, packet)) + self.outbound_socket.put((addr, packet)) + + packet, route = PacketBuilder.drop_cover( + b"drop cover", self.mixnet, self.topology + ).next() + self.outbound_socket.put((route[0].addr, packet)) diff --git a/mixnet/test_client.py b/mixnet/test_client.py new file mode 100644 index 0000000..1f9f31b --- /dev/null +++ b/mixnet/test_client.py @@ -0,0 +1,78 @@ +import queue +from datetime import datetime +from typing import Tuple +from unittest import TestCase + +import numpy +import timeout_decorator +from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + +from mixnet.bls import generate_bls +from mixnet.client import MixClientRunner +from mixnet.mixnet import Mixnet, MixnetTopology +from mixnet.node import MixNode, PacketQueue +from mixnet.packet import PacketBuilder +from mixnet.poisson import poisson_mean_interval_sec +from mixnet.utils import random_bytes + + +class TestMixClientRunner(TestCase): + @timeout_decorator.timeout(180) + def test_mixclient_runner_emission_rate(self): + mixnet, topology = self.init() + real_packet_queue: PacketQueue = queue.Queue() + outbound_socket: PacketQueue = queue.Queue() + + emission_rate_per_min = 30 + redundancy = 3 + client = MixClientRunner( + mixnet, + topology, + emission_rate_per_min, + redundancy, + real_packet_queue, + outbound_socket, + ) + client.daemon = True + client.start() + + # Create packets. At least two packets are expected to be generated from a 3500-byte msg + builder = PacketBuilder.real(random_bytes(3500), mixnet, topology) + # Schedule two packets to the mix client without any interval + packet, route = builder.next() + real_packet_queue.put((route[0].addr, packet)) + packet, route = builder.next() + real_packet_queue.put((route[0].addr, packet)) + + # Calculate intervals between packet emissions from the mix client + intervals = [] + ts = datetime.now() + for _ in range(30): + _ = outbound_socket.get() + now = datetime.now() + intervals.append((now - ts).total_seconds()) + ts = now + + # Check if packets were emitted at the Poisson emission_rate + # If emissions follow the Poisson distribution with a rate `lambda`, + # a mean interval between emissions must be `1/lambda`. + self.assertAlmostEqual( + float(numpy.mean(intervals)), + poisson_mean_interval_sec(emission_rate_per_min), + delta=1.0, + ) + + @staticmethod + def init() -> Tuple[Mixnet, MixnetTopology]: + mixnet = Mixnet( + [ + MixNode( + generate_bls(), + X25519PrivateKey.generate(), + random_bytes(32), + ) + for _ in range(12) + ] + ) + topology = mixnet.build_topology(b"entropy", 3, 3) + return mixnet, topology diff --git a/mixnet/test_node.py b/mixnet/test_node.py index 8783e9d..1c5c90c 100644 --- a/mixnet/test_node.py +++ b/mixnet/test_node.py @@ -83,7 +83,7 @@ class TestMixNodeRunner(TestCase): self.assertAlmostEqual( float(numpy.mean(num_jobs)), round(emission_rate_per_min / delay_rate_per_min), - delta=1.0, + delta=1.5, ) @staticmethod