diff --git a/mixnet/client.py b/mixnet/client.py index 295b7c6..e11128b 100644 --- a/mixnet/client.py +++ b/mixnet/client.py @@ -2,7 +2,7 @@ from __future__ import annotations import asyncio -from mixnet.mixnet import Mixnet, MixnetTopology +from mixnet.mixnet import Mixnet from mixnet.node import PacketQueue from mixnet.packet import PacketBuilder from mixnet.poisson import poisson_interval_sec @@ -10,7 +10,6 @@ from mixnet.poisson import poisson_interval_sec async def mixclient_emitter( mixnet: Mixnet, - topology: MixnetTopology, emission_rate_per_min: int, # Poisson rate parameter: lambda in the spec redundancy: int, # b in the spec real_packet_queue: PacketQueue, @@ -38,7 +37,6 @@ async def mixclient_emitter( try: await emit( mixnet, - topology, redundancy, real_packet_queue, redundant_real_packet_queue, @@ -51,7 +49,6 @@ async def mixclient_emitter( async def emit( mixnet: Mixnet, - topology: MixnetTopology, redundancy: int, # b in the spec real_packet_queue: PacketQueue, redundant_real_packet_queue: PacketQueue, @@ -69,7 +66,7 @@ async def emit( redundant_real_packet_queue.put_nowait((addr, packet)) await outbound_socket.put((addr, packet)) - packet, route = PacketBuilder.drop_cover(b"drop cover", mixnet, topology).next() + packet, route = PacketBuilder.drop_cover(b"drop cover", mixnet).next() await outbound_socket.put((route[0].addr, packet)) diff --git a/mixnet/mixnet.py b/mixnet/mixnet.py index caa75f4..42a54f0 100644 --- a/mixnet/mixnet.py +++ b/mixnet/mixnet.py @@ -4,44 +4,67 @@ import random from dataclasses import dataclass from typing import List -from mixnet.fisheryates import FisherYates from mixnet.node import MixNode -@dataclass class Mixnet: - mix_nodes: List[MixNode] + __topology: MixnetTopology | None = None - # Build a new topology deterministically using an entropy. - # The entropy is expected to be injected from outside. - # - # TODO: Implement constructing a new topology in advance to minimize the topology transition time. - # https://www.notion.so/Mixnet-Specification-807b624444a54a4b88afa1cc80e100c2?pvs=4#9a7f6089e210454bb11fe1c10fceff68 - def build_topology( - self, - entropy: bytes, - n_layers: int, - n_nodes_per_layer: int, - ) -> MixnetTopology: - num_nodes = n_nodes_per_layer * n_layers - assert num_nodes < len(self.mix_nodes) + def get_topology(self) -> MixnetTopology: + if self.__topology is None: + raise RuntimeError("topology is not set yet") + return self.__topology - shuffled = FisherYates.shuffle(self.mix_nodes, entropy) - sampled = shuffled[:num_nodes] - layers = [] - for l in range(n_layers): - start = l * n_nodes_per_layer - layer = sampled[start : start + n_nodes_per_layer] - layers.append(layer) - return MixnetTopology(layers) + def set_topology(self, topology: MixnetTopology) -> None: + """ + Replace the old topology with the new topology received, and start establishing new network connections in background. - def choose_mixnode(self) -> MixNode: - return random.choice(self.mix_nodes) + In real implementations, this method should be a long-running task, accepting topologies periodically. + Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment. + """ + self.__topology = topology + self.__establish_connections() + + def __establish_connections(self) -> None: + """ + Establish network connections in advance based on the topology received. + + This is just a preparation to forward subsequent packets as quickly as possible, + but this is not a strict requirement. + + In real implementations, this should be a background task. + """ + pass @dataclass class MixnetTopology: + # In production, this can be a 1-D array, which is accessible by indexes. + # Here, we use a 2-D array for readability. layers: List[List[MixNode]] - def generate_route(self) -> list[MixNode]: - return [random.choice(layer) for layer in self.layers] + def generate_route(self, mix_destination: MixNode) -> list[MixNode]: + """ + Generate a mix route for a Sphinx packet. + The pre-selected mix_destination is used as a last mix node in the route, + so that associated packets can be merged together into a original message. + """ + route = [random.choice(layer) for layer in self.layers[:-1]] + route.append(mix_destination) + return route + + def choose_mix_destination(self) -> MixNode: + """ + Choose a mix node from the last mix layer as a mix destination + that will reconstruct a message from Sphinx packets. + """ + return random.choice(self.layers[-1]) + + +@dataclass +class MixnetTopologySize: + num_layers: int + num_mixnodes_per_layer: int + + def num_total_mixnodes(self) -> int: + return self.num_layers * self.num_mixnodes_per_layer diff --git a/mixnet/packet.py b/mixnet/packet.py index 50b721b..0b41e05 100644 --- a/mixnet/packet.py +++ b/mixnet/packet.py @@ -9,7 +9,7 @@ from typing import Dict, Iterator, List, Self, Tuple, TypeAlias from pysphinx.payload import Payload from pysphinx.sphinx import SphinxPacket -from mixnet.mixnet import Mixnet, MixnetTopology, MixNode +from mixnet.mixnet import Mixnet, MixNode class MessageFlag(Enum): @@ -28,9 +28,9 @@ class PacketBuilder: flag: MessageFlag, message: bytes, mixnet: Mixnet, - topology: MixnetTopology, ): - destination = mixnet.choose_mixnode() + topology = mixnet.get_topology() + destination = topology.choose_mix_destination() msg_with_flag = flag.bytes() + message # NOTE: We don't encrypt msg_with_flag for destination. @@ -39,7 +39,7 @@ class PacketBuilder: packets_and_routes = [] for fragment in fragment_set.fragments: - route = topology.generate_route() + route = topology.generate_route(destination) packet = SphinxPacket.build( fragment.bytes(), [mixnode.sphinx_node() for mixnode in route], @@ -50,14 +50,12 @@ class PacketBuilder: self.iter = iter(packets_and_routes) @classmethod - def real(cls, message: bytes, mixnet: Mixnet, topology: MixnetTopology) -> Self: - return cls(MessageFlag.MESSAGE_FLAG_REAL, message, mixnet, topology) + def real(cls, message: bytes, mixnet: Mixnet) -> Self: + return cls(MessageFlag.MESSAGE_FLAG_REAL, message, mixnet) @classmethod - def drop_cover( - cls, message: bytes, mixnet: Mixnet, topology: MixnetTopology - ) -> Self: - return cls(MessageFlag.MESSAGE_FLAG_DROP_COVER, message, mixnet, topology) + def drop_cover(cls, message: bytes, mixnet: Mixnet) -> Self: + return cls(MessageFlag.MESSAGE_FLAG_DROP_COVER, message, mixnet) def next(self) -> Tuple[SphinxPacket, List[MixNode]]: return next(self.iter) diff --git a/mixnet/robustness.py b/mixnet/robustness.py new file mode 100644 index 0000000..fa6aa6c --- /dev/null +++ b/mixnet/robustness.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from typing import List + +from mixnet.fisheryates import FisherYates +from mixnet.mixnet import ( + Mixnet, + MixnetTopology, + MixnetTopologySize, +) +from mixnet.node import MixNode + + +class Robustness: + """ + A robustness layer is placed on top of a mixnet layer and a consensus layer, + to separate their responsibilities and minimize dependencies between them. + + For v1, the role of robustness layer is building a new mixnet topology + and injecting it to the mixnet layer, + whenever a new entropy is received from the consensus layer. + A static list of nodes is used for building topologies deterministically. + This can be changed in later versions. + + In later versions, the robustness layer will have more responsibilities. + """ + + def __init__( + self, + mixnode_candidates: List[MixNode], + mixnet_topology_size: MixnetTopologySize, + mixnet: Mixnet, + ) -> None: + assert mixnet_topology_size.num_total_mixnodes() <= len(mixnode_candidates) + self.mixnode_candidates = mixnode_candidates + self.mixnet_topology_size = mixnet_topology_size + self.mixnet = mixnet + + def set_entropy(self, entropy: bytes) -> None: + """ + Given a entropy received, build a new topology and send it to mixnet. + + In real implementations, this method should be a long-running task, consuming entropy periodically. + Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment. + """ + topology = self.build_topology(entropy) + self.mixnet.set_topology(topology) + + def build_topology(self, entropy: bytes) -> MixnetTopology: + """ + Build a new topology deterministically using an entropy and a given set of candidates. + """ + shuffled = FisherYates.shuffle(self.mixnode_candidates, entropy) + sampled = shuffled[: self.mixnet_topology_size.num_total_mixnodes()] + + layers = [] + for layer_id in range(self.mixnet_topology_size.num_layers): + start = layer_id * self.mixnet_topology_size.num_mixnodes_per_layer + layer = sampled[ + start : start + self.mixnet_topology_size.num_mixnodes_per_layer + ] + layers.append(layer) + return MixnetTopology(layers) diff --git a/mixnet/test_client.py b/mixnet/test_client.py index 2aea83f..1999bb1 100644 --- a/mixnet/test_client.py +++ b/mixnet/test_client.py @@ -1,25 +1,21 @@ import asyncio from datetime import datetime -from typing import Tuple -from unittest import IsolatedAsyncioTestCase import numpy -from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey -from mixnet.bls import generate_bls from mixnet.client import mixclient_emitter -from mixnet.mixnet import Mixnet, MixnetTopology -from mixnet.node import MixNode, PacketQueue +from mixnet.node import PacketQueue from mixnet.packet import PacketBuilder from mixnet.poisson import poisson_mean_interval_sec -from mixnet.utils import random_bytes +from mixnet.test_mixnet import TestMixnet from mixnet.test_utils import with_test_timeout +from mixnet.utils import random_bytes -class TestMixClient(IsolatedAsyncioTestCase): +class TestMixClient(TestMixnet): @with_test_timeout(100) async def test_mixclient_emitter(self): - mixnet, topology = self.init() + mixnet, _ = self.init() real_packet_queue: PacketQueue = asyncio.Queue() outbound_socket: PacketQueue = asyncio.Queue() @@ -28,7 +24,6 @@ class TestMixClient(IsolatedAsyncioTestCase): _ = asyncio.create_task( mixclient_emitter( mixnet, - topology, emission_rate_per_min, redundancy, real_packet_queue, @@ -37,7 +32,7 @@ class TestMixClient(IsolatedAsyncioTestCase): ) # Create packets. At least two packets are expected to be generated from a 3500-byte msg - builder = PacketBuilder.real(random_bytes(3500), mixnet, topology) + builder = PacketBuilder.real(random_bytes(3500), mixnet) # Schedule two packets to the mix client without any interval packet, route = builder.next() await real_packet_queue.put((route[0].addr, packet)) @@ -61,18 +56,3 @@ class TestMixClient(IsolatedAsyncioTestCase): poisson_mean_interval_sec(emission_rate_per_min), delta=1.0, ) - - @staticmethod - def init() -> Tuple[Mixnet, MixnetTopology]: - mixnet = Mixnet( - [ - MixNode( - generate_bls(), - X25519PrivateKey.generate(), - random_bytes(32), - ) - for _ in range(12) - ] - ) - topology = mixnet.build_topology(b"entropy", 3, 3) - return mixnet, topology diff --git a/mixnet/test_mixnet.py b/mixnet/test_mixnet.py index d63e914..a14345c 100644 --- a/mixnet/test_mixnet.py +++ b/mixnet/test_mixnet.py @@ -1,21 +1,40 @@ -from unittest import TestCase +from typing import Tuple +from unittest import IsolatedAsyncioTestCase from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey from mixnet.bls import generate_bls -from mixnet.mixnet import Mixnet, MixNode +from mixnet.mixnet import Mixnet, MixnetTopologySize, MixNode +from mixnet.robustness import Robustness from mixnet.utils import random_bytes -class TestMixnet(TestCase): - def test_build_topology(self): - nodes = [ - MixNode(generate_bls(), X25519PrivateKey.generate(), random_bytes(32)) - for _ in range(12) - ] - mixnet = Mixnet(nodes) +class TestMixnet(IsolatedAsyncioTestCase): + @staticmethod + def init() -> Tuple[Mixnet, Robustness]: + mixnet = Mixnet() + robustness = Robustness( + [ + MixNode( + generate_bls(), + X25519PrivateKey.generate(), + random_bytes(32), + ) + for _ in range(12) + ], + MixnetTopologySize(3, 3), + mixnet, + ) + robustness.set_entropy(b"entropy") - topology = mixnet.build_topology(b"entropy", 3, 3) - self.assertEqual(len(topology.layers), 3) - for layer in topology.layers: - self.assertEqual(len(layer), 3) + return (mixnet, robustness) + + def test_topology_from_robustness(self): + mixnet, robustness = self.init() + + topology1 = mixnet.get_topology() + + robustness.set_entropy(b"new entropy") + topology2 = mixnet.get_topology() + + self.assertNotEqual(topology1, topology2) diff --git a/mixnet/test_node.py b/mixnet/test_node.py index b7a33d5..0a0cb18 100644 --- a/mixnet/test_node.py +++ b/mixnet/test_node.py @@ -1,22 +1,17 @@ import asyncio from datetime import datetime -from typing import Tuple -from unittest import IsolatedAsyncioTestCase import numpy -from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey from pysphinx.sphinx import SphinxPacket -from mixnet.bls import generate_bls -from mixnet.mixnet import Mixnet, MixnetTopology -from mixnet.node import MixNode, NodeAddress, PacketPayloadQueue, PacketQueue +from mixnet.node import NodeAddress, PacketPayloadQueue, PacketQueue from mixnet.packet import PacketBuilder from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec +from mixnet.test_mixnet import TestMixnet from mixnet.test_utils import with_test_timeout -from mixnet.utils import random_bytes -class TestMixNodeRunner(IsolatedAsyncioTestCase): +class TestMixNodeRunner(TestMixnet): @with_test_timeout(180) async def test_mixnode_runner_emission_rate(self): """ @@ -26,11 +21,11 @@ class TestMixNodeRunner(IsolatedAsyncioTestCase): and if processing is delayed according to an exponential distribution with a rate `mu`, the rate of outputs should be `lambda`. """ - mixnet, topology = self.init() + mixnet, _ = self.init() inbound_socket: PacketQueue = asyncio.Queue() outbound_socket: PacketPayloadQueue = asyncio.Queue() - packet, route = PacketBuilder.real(b"msg", mixnet, topology).next() + packet, route = PacketBuilder.real(b"msg", mixnet).next() delay_rate_per_min = 30 # mu (= 2s delay on average) # Start only the first mix node for testing @@ -107,18 +102,3 @@ class TestMixNodeRunner(IsolatedAsyncioTestCase): await asyncio.sleep(poisson_interval_sec(rate_per_min)) await inbound_socket.put((node_addr, packet)) await sent_packet_queue.put((node_addr, packet)) - - @staticmethod - def init() -> Tuple[Mixnet, MixnetTopology]: - mixnet = Mixnet( - [ - MixNode( - generate_bls(), - X25519PrivateKey.generate(), - random_bytes(32), - ) - for _ in range(12) - ] - ) - topology = mixnet.build_topology(b"entropy", 3, 3) - return mixnet, topology diff --git a/mixnet/test_packet.py b/mixnet/test_packet.py index e306e3a..576f392 100644 --- a/mixnet/test_packet.py +++ b/mixnet/test_packet.py @@ -1,11 +1,7 @@ -from typing import List, Tuple -from unittest import TestCase +from typing import List -from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey from pysphinx.sphinx import ProcessedFinalHopPacket, SphinxPacket -from mixnet.bls import generate_bls -from mixnet.mixnet import Mixnet, MixnetTopology from mixnet.node import MixNode from mixnet.packet import ( Fragment, @@ -13,15 +9,16 @@ from mixnet.packet import ( MessageReconstructor, PacketBuilder, ) +from mixnet.test_mixnet import TestMixnet from mixnet.utils import random_bytes -class TestPacket(TestCase): +class TestPacket(TestMixnet): def test_real_packet(self): - mixnet, topology = self.init() + mixnet, _ = self.init() msg = random_bytes(3500) - builder = PacketBuilder.real(msg, mixnet, topology) + builder = PacketBuilder.real(msg, mixnet) packet0, route0 = builder.next() packet1, route1 = builder.next() packet2, route2 = builder.next() @@ -46,10 +43,10 @@ class TestPacket(TestCase): ) def test_cover_packet(self): - mixnet, topology = self.init() + mixnet, _ = self.init() msg = b"cover" - builder = PacketBuilder.drop_cover(msg, mixnet, topology) + builder = PacketBuilder.drop_cover(msg, mixnet) packet, route = builder.next() self.assertRaises(StopIteration, builder.next) @@ -61,21 +58,6 @@ class TestPacket(TestCase): (MessageFlag.MESSAGE_FLAG_DROP_COVER, msg), ) - @staticmethod - def init() -> Tuple[Mixnet, MixnetTopology]: - mixnet = Mixnet( - [ - MixNode( - generate_bls(), - X25519PrivateKey.generate(), - random_bytes(32), - ) - for _ in range(12) - ] - ) - topology = mixnet.build_topology(b"entropy", 3, 3) - return mixnet, topology - @staticmethod def process_packet(packet: SphinxPacket, route: List[MixNode]) -> Fragment: processed = packet.process(route[0].encryption_private_key) diff --git a/mixnet/test_robustness.py b/mixnet/test_robustness.py new file mode 100644 index 0000000..7f796fc --- /dev/null +++ b/mixnet/test_robustness.py @@ -0,0 +1,33 @@ +from unittest import TestCase + +from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + +from mixnet.bls import generate_bls +from mixnet.mixnet import Mixnet, MixnetTopologySize, MixNode +from mixnet.robustness import Robustness +from mixnet.utils import random_bytes + + +class TestRobustness(TestCase): + def test_build_topology(self): + robustness = Robustness( + [ + MixNode( + generate_bls(), + X25519PrivateKey.generate(), + random_bytes(32), + ) + for _ in range(12) + ], + MixnetTopologySize(3, 3), + Mixnet(), + ) + + topology = robustness.build_topology(b"entropy") + self.assertEqual( + len(topology.layers), robustness.mixnet_topology_size.num_layers + ) + for layer in topology.layers: + self.assertEqual( + len(layer), robustness.mixnet_topology_size.num_mixnodes_per_layer + )