diff --git a/mixnet/client.py b/mixnet/client.py index d124cf4..8d21cf3 100644 --- a/mixnet/client.py +++ b/mixnet/client.py @@ -4,49 +4,47 @@ import asyncio from contextlib import suppress from typing import Self -from mixnet.config import MixnetConfig +from mixnet.config import MixClientConfig, MixnetTopology from mixnet.node import PacketQueue from mixnet.packet import PacketBuilder from mixnet.poisson import poisson_interval_sec class MixClient: - __config: MixnetConfig - - __real_packet_queue: PacketQueue - __outbound_socket: PacketQueue - __task: asyncio.Task # A reference just to prevent task from being garbage collected + config: MixClientConfig + real_packet_queue: PacketQueue + outbound_socket: PacketQueue + task: asyncio.Task # A reference just to prevent task from being garbage collected @classmethod async def new( cls, - config: MixnetConfig, + config: MixClientConfig, ) -> Self: self = cls() - self.__config = config - self.__real_packet_queue = asyncio.Queue() - self.__outbound_socket = asyncio.Queue() - self.__task = asyncio.create_task(self.__run()) + self.config = config + self.real_packet_queue = asyncio.Queue() + self.outbound_socket = asyncio.Queue() + self.task = asyncio.create_task(self.__run()) return self - def set_config(self, config: MixnetConfig) -> None: + def set_topology(self, topology: MixnetTopology) -> None: """ - Replace the old config with the new config received + Replace the old topology with the new topology received In real implementations, this method may be integrated in a long-running task. Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment. """ - self.__config = config + self.config.topology = topology - def get_config(self) -> MixnetConfig: - return self.__config + # Only for testing + def get_topology(self) -> MixnetTopology: + return self.config.topology async def send_message(self, msg: bytes) -> None: - packets_and_routes = PacketBuilder.build_real_packets( - msg, self.__config.topology - ) + packets_and_routes = PacketBuilder.build_real_packets(msg, self.config.topology) for packet, route in packets_and_routes: - await self.__real_packet_queue.put((route[0].addr, packet)) + await self.real_packet_queue.put((route[0].addr, packet)) def subscribe_messages(self) -> "asyncio.Queue[bytes]": """ @@ -54,10 +52,6 @@ class MixClient: """ return asyncio.Queue() - @property - def outbound_socket(self) -> PacketQueue: - return self.__outbound_socket - async def __run(self): """ Emit packets at the Poisson emission_rate_per_min. @@ -73,7 +67,7 @@ class MixClient: emission_notifier_queue = asyncio.Queue() _ = asyncio.create_task( self.__emission_notifier( - self.__config.emission_rate_per_min, emission_notifier_queue + self.config.emission_rate_per_min, emission_notifier_queue ) ) @@ -81,7 +75,7 @@ class MixClient: # Wait until the next emission time _ = await emission_notifier_queue.get() try: - await self.__emit(self.__config.redundancy, redundant_real_packet_queue) + await self.__emit(self.config.redundancy, redundant_real_packet_queue) finally: # Python convention: indicate that the previously enqueued task has been processed emission_notifier_queue.task_done() @@ -93,23 +87,23 @@ class MixClient: ): if not redundant_real_packet_queue.empty(): addr, packet = redundant_real_packet_queue.get_nowait() - await self.__outbound_socket.put((addr, packet)) + await self.outbound_socket.put((addr, packet)) return - if not self.__real_packet_queue.empty(): - addr, packet = self.__real_packet_queue.get_nowait() + if not self.real_packet_queue.empty(): + addr, packet = self.real_packet_queue.get_nowait() # Schedule redundant real packets for _ in range(redundancy - 1): redundant_real_packet_queue.put_nowait((addr, packet)) - await self.__outbound_socket.put((addr, packet)) + await self.outbound_socket.put((addr, packet)) packets_and_routes = PacketBuilder.build_drop_cover_packets( - b"drop cover", self.__config.topology + b"drop cover", self.config.topology ) # We have a for loop here, but we expect that the total num of packets is 1 # because the dummy message is short. for packet, route in packets_and_routes: - await self.__outbound_socket.put((route[0].addr, packet)) + await self.outbound_socket.put((route[0].addr, packet)) async def __emission_notifier( self, emission_rate_per_min: int, queue: asyncio.Queue @@ -119,6 +113,6 @@ class MixClient: queue.put_nowait(None) async def cancel(self) -> None: - self.__task.cancel() + self.task.cancel() with suppress(asyncio.CancelledError): - await self.__task + await self.task diff --git a/mixnet/config.py b/mixnet/config.py index 0880c3c..5b1a2d1 100644 --- a/mixnet/config.py +++ b/mixnet/config.py @@ -11,22 +11,59 @@ from cryptography.hazmat.primitives.asymmetric.x25519 import ( from pysphinx.node import Node from mixnet.bls import BlsPrivateKey, BlsPublicKey +from mixnet.fisheryates import FisherYates @dataclass class MixnetConfig: + topology_config: MixnetTopologyConfig + mixclient_config: MixClientConfig + mixnode_config: MixNodeConfig + + +@dataclass +class MixnetTopologyConfig: + mixnode_candidates: List[MixNodeInfo] + size: MixnetTopologySize + entropy: bytes + + +@dataclass +class MixClientConfig: emission_rate_per_min: int # Poisson rate parameter: lambda redundancy: int - delay_rate_per_min: int # Poisson rate parameter: mu topology: MixnetTopology +@dataclass +class MixNodeConfig: + encryption_private_key: X25519PrivateKey + delay_rate_per_min: int # Poisson rate parameter: mu + + @dataclass class MixnetTopology: # In production, this can be a 1-D array, which is accessible by indexes. # Here, we use a 2-D array for readability. layers: List[List[MixNodeInfo]] + def __init__( + self, + config: MixnetTopologyConfig, + ) -> None: + """ + Build a new topology deterministically using an entropy and a given set of candidates. + """ + shuffled = FisherYates.shuffle(config.mixnode_candidates, config.entropy) + sampled = shuffled[: config.size.num_total_mixnodes()] + + layers = [] + for layer_id in range(config.size.num_layers): + start = layer_id * config.size.num_mixnodes_per_layer + layer = sampled[start : start + config.size.num_mixnodes_per_layer] + layers.append(layer) + self.layers = layers + def generate_route(self, mix_destination: MixNodeInfo) -> list[MixNodeInfo]: """ Generate a mix route for a Sphinx packet. @@ -45,6 +82,15 @@ class MixnetTopology: return random.choice(self.layers[-1]) +@dataclass +class MixnetTopologySize: + num_layers: int + num_mixnodes_per_layer: int + + def num_total_mixnodes(self) -> int: + return self.num_layers * self.num_mixnodes_per_layer + + # 32-byte that represents an IP address and a port of a mix node. NodeAddress: TypeAlias = bytes diff --git a/mixnet/mixnet.py b/mixnet/mixnet.py index 5891e71..dedc9ef 100644 --- a/mixnet/mixnet.py +++ b/mixnet/mixnet.py @@ -1,51 +1,62 @@ from __future__ import annotations import asyncio -from typing import Self - -from cryptography.hazmat.primitives.asymmetric.x25519 import ( - X25519PrivateKey, -) +from contextlib import suppress +from typing import Self, TypeAlias from mixnet.client import MixClient -from mixnet.config import MixnetConfig +from mixnet.config import MixnetConfig, MixnetTopology, MixnetTopologyConfig from mixnet.node import MixNode +EntropyQueue: TypeAlias = "asyncio.Queue[bytes]" + class Mixnet: - __mixclient: MixClient - __mixnode: MixNode + topology_config: MixnetTopologyConfig + + mixclient: MixClient + mixnode: MixNode + entropy_queue: EntropyQueue + task: asyncio.Task # A reference just to prevent task from being garbage collected @classmethod async def new( cls, - encryption_private_key: X25519PrivateKey, config: MixnetConfig, + entropy_queue: EntropyQueue, ) -> Self: self = cls() - self.__mixclient = await MixClient.new(config) - self.__mixnode = await MixNode.new(encryption_private_key, config) + self.topology_config = config.topology_config + self.mixclient = await MixClient.new(config.mixclient_config) + self.mixnode = await MixNode.new(config.mixnode_config) + self.entropy_queue = entropy_queue + self.task = asyncio.create_task(self.__consume_entropy()) return self async def publish_message(self, msg: bytes) -> None: - await self.__mixclient.send_message(msg) + await self.mixclient.send_message(msg) def subscribe_messages(self) -> "asyncio.Queue[bytes]": - return self.__mixclient.subscribe_messages() + return self.mixclient.subscribe_messages() - def set_config(self, config: MixnetConfig) -> None: - """ - Replace the old config with the new config received. + async def __consume_entropy( + self, + ) -> None: + while True: + entropy = await self.entropy_queue.get() + self.topology_config.entropy = entropy - In real implementations, this method should be a long-running task, accepting configs periodically. - Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment. - """ - self.__mixclient.set_config(config) - self.__mixnode.set_config(config) - - def get_config(self) -> MixnetConfig: - return self.__mixclient.get_config() + topology = MixnetTopology(self.topology_config) + self.mixclient.set_topology(topology) async def cancel(self) -> None: - await self.__mixclient.cancel() - await self.__mixnode.cancel() + self.task.cancel() + with suppress(asyncio.CancelledError): + await self.task + + await self.mixclient.cancel() + await self.mixnode.cancel() + + # Only for testing + def get_topology(self) -> MixnetTopology: + return self.mixclient.get_topology() diff --git a/mixnet/node.py b/mixnet/node.py index abb397c..8ab4b77 100644 --- a/mixnet/node.py +++ b/mixnet/node.py @@ -15,7 +15,7 @@ from pysphinx.sphinx import ( UnknownHeaderTypeError, ) -from mixnet.config import MixnetConfig, NodeAddress +from mixnet.config import MixNodeConfig, NodeAddress from mixnet.poisson import poisson_interval_sec PacketQueue: TypeAlias = "asyncio.Queue[Tuple[NodeAddress, SphinxPacket]]" @@ -32,30 +32,24 @@ class MixNode: in order to define the MixNode as a simple dataclass for clarity. """ - __config: MixnetConfig - + config: MixNodeConfig inbound_socket: PacketQueue outbound_socket: PacketPayloadQueue - __task: asyncio.Task # A reference just to prevent task from being garbage collected + task: asyncio.Task # A reference just to prevent task from being garbage collected @classmethod async def new( cls, - encryption_private_key: X25519PrivateKey, - config: MixnetConfig, + config: MixNodeConfig, ) -> Self: self = cls() - self.__config = config - self.__establish_connections() + self.config = config self.inbound_socket = asyncio.Queue() self.outbound_socket = asyncio.Queue() - self.__task = asyncio.create_task(self.__run(encryption_private_key)) + self.task = asyncio.create_task(self.__run()) return self - async def __run( - self, - encryption_private_key: X25519PrivateKey, - ): + async def __run(self): """ Read SphinxPackets from inbound socket and spawn a thread for each packet to process it. @@ -70,7 +64,9 @@ class MixNode: _, packet = await self.inbound_socket.get() task = asyncio.create_task( self.__process_packet( - packet, encryption_private_key, self.__config.delay_rate_per_min + packet, + self.config.encryption_private_key, + self.config.delay_rate_per_min, ) ) self.tasks.add(task) @@ -105,30 +101,7 @@ class MixNode: case _: raise UnknownHeaderTypeError - def set_config(self, config: MixnetConfig) -> None: - """ - Replace the old config with the new config received. - If topology has been changed, start establishing new network connections in background. - - In real implementations, this method may be integrated in a long-running task. - Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment. - """ - if self.__config.topology != config.topology: - self.__establish_connections() - self.__config = config - - def __establish_connections(self) -> None: - """ - Establish network connections in advance based on the topology received. - - This is just a preparation to forward subsequent packets as quickly as possible, - but this is not a strict requirement. - - In real implementations, this should be a background task. - """ - pass - async def cancel(self) -> None: - self.__task.cancel() + self.task.cancel() with suppress(asyncio.CancelledError): - await self.__task + await self.task diff --git a/mixnet/robustness.py b/mixnet/robustness.py deleted file mode 100644 index d477162..0000000 --- a/mixnet/robustness.py +++ /dev/null @@ -1,100 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import List - -from mixnet.config import MixnetConfig, MixnetTopology, MixNodeInfo -from mixnet.fisheryates import FisherYates -from mixnet.mixnet import Mixnet - - -class Robustness: - """ - A robustness layer is placed on top of a mixnet layer and a consensus layer, - to separate their responsibilities and minimize dependencies between them. - - For v1, the role of robustness layer is building a new mixnet topology - and injecting it to the mixnet layer, - whenever a new entropy is received from the consensus layer. - A static list of nodes is used for building topologies deterministically. - This can be changed in later versions. - - In later versions, the robustness layer will have more responsibilities. - """ - - def __init__( - self, - config: RobustnessConfig, - mixnet: Mixnet, - ) -> None: - self.__config = config - self.__mixnet = mixnet - - def set_entropy(self, entropy: bytes) -> None: - """ - Given a entropy received, build a new topology and send it to mixnet. - In v1, this doesn't change any mixnet config except topology. - - In real implementations, this method should be a long-running task, consuming entropy periodically. - Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment. - """ - self.__config.mixnet.mixnet_layer_config.topology = self.build_topology( - self.__config.mixnet.mixnode_candidates, - self.__config.mixnet.topology_size, - entropy, - ) - self.__mixnet.set_config(self.__config.mixnet.mixnet_layer_config) - - @staticmethod - def build_topology( - mixnode_candidates: List[MixNodeInfo], - mixnet_topology_size: MixnetTopologySize, - entropy: bytes, - ) -> MixnetTopology: - """ - Build a new topology deterministically using an entropy and a given set of candidates. - """ - shuffled = FisherYates.shuffle(mixnode_candidates, entropy) - sampled = shuffled[: mixnet_topology_size.num_total_mixnodes()] - - layers = [] - for layer_id in range(mixnet_topology_size.num_layers): - start = layer_id * mixnet_topology_size.num_mixnodes_per_layer - layer = sampled[start : start + mixnet_topology_size.num_mixnodes_per_layer] - layers.append(layer) - return MixnetTopology(layers) - - -@dataclass -class RobustnessConfig: - """In v1, the robustness layer manages configs only for the mixnet layer.""" - - mixnet: RobustnessMixnetConfig - - -class RobustnessMixnetConfig: - """ - Configurations for the mixnet layer - These configurations are meant to be changed over time according to other parameters from other layers (e.g. consensus). - """ - - def __init__( - self, - mixnode_candidates: List[MixNodeInfo], - mixnet_topology_size: MixnetTopologySize, - mixnet_layer_config: MixnetConfig, - ) -> None: - assert mixnet_topology_size.num_total_mixnodes() <= len(mixnode_candidates) - self.mixnode_candidates = mixnode_candidates - self.topology_size = mixnet_topology_size - # A config to be injected to the mixnet layer whenever it is updated - self.mixnet_layer_config = mixnet_layer_config - - -@dataclass -class MixnetTopologySize: - num_layers: int - num_mixnodes_per_layer: int - - def num_total_mixnodes(self) -> int: - return self.num_layers * self.num_mixnodes_per_layer diff --git a/mixnet/test_client.py b/mixnet/test_client.py index 3cd5c9d..4327466 100644 --- a/mixnet/test_client.py +++ b/mixnet/test_client.py @@ -6,7 +6,7 @@ import numpy from mixnet.client import MixClient from mixnet.poisson import poisson_mean_interval_sec from mixnet.test_utils import ( - init_robustness_mixnet_config, + init_mixnet_config, with_test_timeout, ) from mixnet.utils import random_bytes @@ -15,7 +15,7 @@ from mixnet.utils import random_bytes class TestMixClient(IsolatedAsyncioTestCase): @with_test_timeout(100) async def test_mixclient(self): - config = init_robustness_mixnet_config().mixnet_layer_config + config = init_mixnet_config().mixclient_config config.emission_rate_per_min = 30 config.redundancy = 3 diff --git a/mixnet/test_mixnet.py b/mixnet/test_mixnet.py index 2b74a6d..9a0e4cb 100644 --- a/mixnet/test_mixnet.py +++ b/mixnet/test_mixnet.py @@ -1,26 +1,20 @@ +import asyncio from unittest import IsolatedAsyncioTestCase from mixnet.mixnet import Mixnet -from mixnet.robustness import Robustness, RobustnessConfig -from mixnet.test_utils import init_robustness_mixnet_config +from mixnet.test_utils import init_mixnet_config class TestMixnet(IsolatedAsyncioTestCase): async def test_topology_from_robustness(self): - robustness_mixnet_config = init_robustness_mixnet_config() + config = init_mixnet_config() + entropy_queue = asyncio.Queue() - mixnet = await Mixnet.new( - robustness_mixnet_config.mixnode_candidates[0].encryption_private_key, - robustness_mixnet_config.mixnet_layer_config, - ) + mixnet = await Mixnet.new(config, entropy_queue) try: - robustness = Robustness(RobustnessConfig(robustness_mixnet_config), mixnet) - self.assertEqual( - robustness_mixnet_config.mixnet_layer_config, mixnet.get_config() - ) - - old_topology = robustness_mixnet_config.mixnet_layer_config.topology - robustness.set_entropy(b"new entropy") - self.assertNotEqual(old_topology, mixnet.get_config().topology) + old_topology = config.mixclient_config.topology + await entropy_queue.put(b"new entropy") + await asyncio.sleep(1) + self.assertNotEqual(old_topology, mixnet.get_topology()) finally: await mixnet.cancel() diff --git a/mixnet/test_node.py b/mixnet/test_node.py index edc496c..26ab0c7 100644 --- a/mixnet/test_node.py +++ b/mixnet/test_node.py @@ -9,7 +9,7 @@ from mixnet.node import MixNode, NodeAddress, PacketQueue from mixnet.packet import PacketBuilder from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec from mixnet.test_utils import ( - init_robustness_mixnet_config, + init_mixnet_config, with_test_timeout, ) @@ -24,14 +24,17 @@ class TestMixNodeRunner(IsolatedAsyncioTestCase): and if processing is delayed according to an exponential distribution with a rate `mu`, the rate of outputs should be `lambda`. """ - config = init_robustness_mixnet_config().mixnet_layer_config - config.emission_rate_per_min = 120 # lambda (= 2msg/sec) - config.delay_rate_per_min = 30 # mu (= 2s delay on average) + config = init_mixnet_config() + config.mixclient_config.emission_rate_per_min = 120 # lambda (= 2msg/sec) + config.mixnode_config.delay_rate_per_min = 30 # mu (= 2s delay on average) - packet, route = PacketBuilder.build_real_packets(b"msg", config.topology)[0] + packet, route = PacketBuilder.build_real_packets( + b"msg", config.mixclient_config.topology + )[0] # Start only the first mix node for testing - mixnode = await MixNode.new(route[0].encryption_private_key, config) + config.mixnode_config.encryption_private_key = route[0].encryption_private_key + mixnode = await MixNode.new(config.mixnode_config) try: # Send packets to the first mix node in a Poisson distribution packet_count = 100 @@ -43,7 +46,7 @@ class TestMixNodeRunner(IsolatedAsyncioTestCase): packet, route[0].addr, packet_count, - config.emission_rate_per_min, + config.mixclient_config.emission_rate_per_min, sent_packet_queue, ) ) @@ -77,14 +80,19 @@ class TestMixNodeRunner(IsolatedAsyncioTestCase): # a mean interval between outputs must be `1/lambda`. self.assertAlmostEqual( float(numpy.mean(intervals)), - poisson_mean_interval_sec(config.emission_rate_per_min), + poisson_mean_interval_sec( + config.mixclient_config.emission_rate_per_min + ), delta=1.0, ) # If runner is a M/M/inf queue, # a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`. self.assertAlmostEqual( float(numpy.mean(num_jobs)), - round(config.emission_rate_per_min / config.delay_rate_per_min), + round( + config.mixclient_config.emission_rate_per_min + / config.mixnode_config.delay_rate_per_min + ), delta=1.5, ) finally: diff --git a/mixnet/test_packet.py b/mixnet/test_packet.py index 33329e8..d1d517a 100644 --- a/mixnet/test_packet.py +++ b/mixnet/test_packet.py @@ -10,13 +10,13 @@ from mixnet.packet import ( MessageReconstructor, PacketBuilder, ) -from mixnet.test_utils import init_robustness_mixnet_config +from mixnet.test_utils import init_mixnet_config from mixnet.utils import random_bytes class TestPacket(TestCase): def test_real_packet(self): - topology = init_robustness_mixnet_config().mixnet_layer_config.topology + topology = init_mixnet_config().mixclient_config.topology msg = random_bytes(3500) packets_and_routes = PacketBuilder.build_real_packets(msg, topology) self.assertEqual(4, len(packets_and_routes)) @@ -47,7 +47,7 @@ class TestPacket(TestCase): ) def test_cover_packet(self): - topology = init_robustness_mixnet_config().mixnet_layer_config.topology + topology = init_mixnet_config().mixclient_config.topology msg = b"cover" packets_and_routes = PacketBuilder.build_drop_cover_packets(msg, topology) self.assertEqual(1, len(packets_and_routes)) diff --git a/mixnet/test_robustness.py b/mixnet/test_robustness.py deleted file mode 100644 index 418767d..0000000 --- a/mixnet/test_robustness.py +++ /dev/null @@ -1,14 +0,0 @@ -from unittest import TestCase - -from mixnet.test_utils import init_robustness_mixnet_config - - -class TestRobustness(TestCase): - def test_build_topology(self): - robustness_mixnet_config = init_robustness_mixnet_config() - topology = robustness_mixnet_config.mixnet_layer_config.topology - topology_size = robustness_mixnet_config.topology_size - - self.assertEqual(len(topology.layers), topology_size.num_layers) - for layer in topology.layers: - self.assertEqual(len(layer), topology_size.num_mixnodes_per_layer) diff --git a/mixnet/test_utils.py b/mixnet/test_utils.py index 7820835..e3ba260 100644 --- a/mixnet/test_utils.py +++ b/mixnet/test_utils.py @@ -3,8 +3,15 @@ import asyncio from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey from mixnet.bls import generate_bls -from mixnet.config import MixnetConfig, MixNodeInfo -from mixnet.robustness import MixnetTopologySize, Robustness, RobustnessMixnetConfig +from mixnet.config import ( + MixClientConfig, + MixNodeConfig, + MixnetConfig, + MixNodeInfo, + MixnetTopology, + MixnetTopologyConfig, + MixnetTopologySize, +) from mixnet.utils import random_bytes @@ -19,22 +26,21 @@ def with_test_timeout(t): return wrapper -def init_robustness_mixnet_config() -> RobustnessMixnetConfig: - mixnode_candidates = [ - MixNodeInfo( - generate_bls(), - X25519PrivateKey.generate(), - random_bytes(32), - ) - for _ in range(12) - ] - topology_size = MixnetTopologySize(3, 3) - mixnet_layer_config = MixnetConfig( - 30, - 3, - 30, - Robustness.build_topology(mixnode_candidates, topology_size, b"entropy"), +def init_mixnet_config() -> MixnetConfig: + topology_config = MixnetTopologyConfig( + [ + MixNodeInfo( + generate_bls(), + X25519PrivateKey.generate(), + random_bytes(32), + ) + for _ in range(12) + ], + MixnetTopologySize(3, 3), + b"entropy", ) - return RobustnessMixnetConfig( - mixnode_candidates, topology_size, mixnet_layer_config + mixclient_config = MixClientConfig(30, 3, MixnetTopology(topology_config)) + mixnode_config = MixNodeConfig( + topology_config.mixnode_candidates[0].encryption_private_key, 30 ) + return MixnetConfig(topology_config, mixclient_config, mixnode_config)