diff --git a/mixnet/README.md b/mixnet/README.md deleted file mode 100644 index 0cc18d9..0000000 --- a/mixnet/README.md +++ /dev/null @@ -1,21 +0,0 @@ -# Mixnet Specification - -This is the executable specification of Mixnet, which can be used as a networking layer of the Nomos network. - -![](structure.png) - -## Public Components - -- [`mixnet.py`](mixnet.py): A public interface of the Mixnet layer, which can be used by upper layers -- [`robustness.py`](robustness.py): A public interface of the Robustness layer, which can be on top of the Mixnet layer and used by upper layers - -## Private Components - -There are two primary components in the Mixnet layer. - -- [`client.py`](client.py): A mix client interface, which splits a message into Sphinx packets, sends packets to mix nodes, and receives messages via gossip. Also, this emits cover packets periodically. -- [`node.py`](node.py): A mix node interface, which receives Sphinx packets from other mix nodes, processes packets, and forwards packets to other mix nodes. This works only when selected by the topology construction. - -Each component receives a new topology from the Robustness layer. - -There is no interaction between mix client and mix node components. diff --git a/mixnet/bls.py b/mixnet/bls.py deleted file mode 100644 index a4278b0..0000000 --- a/mixnet/bls.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import TypeAlias - -import blspy - -from mixnet.utils import random_bytes - -BlsPrivateKey: TypeAlias = blspy.PrivateKey -BlsPublicKey: TypeAlias = blspy.G1Element - - -def generate_bls() -> BlsPrivateKey: - seed = random_bytes(32) - return blspy.BasicSchemeMPL.key_gen(seed) diff --git a/mixnet/client.py b/mixnet/client.py deleted file mode 100644 index 8d21cf3..0000000 --- a/mixnet/client.py +++ /dev/null @@ -1,118 +0,0 @@ -from __future__ import annotations - -import asyncio -from contextlib import suppress -from typing import Self - -from mixnet.config import MixClientConfig, MixnetTopology -from mixnet.node import PacketQueue -from mixnet.packet import PacketBuilder -from mixnet.poisson import poisson_interval_sec - - -class MixClient: - config: MixClientConfig - real_packet_queue: PacketQueue - outbound_socket: PacketQueue - task: asyncio.Task # A reference just to prevent task from being garbage collected - - @classmethod - async def new( - cls, - config: MixClientConfig, - ) -> Self: - self = cls() - self.config = config - self.real_packet_queue = asyncio.Queue() - self.outbound_socket = asyncio.Queue() - self.task = asyncio.create_task(self.__run()) - return self - - def set_topology(self, topology: MixnetTopology) -> None: - """ - Replace the old topology with the new topology received - - In real implementations, this method may be integrated in a long-running task. - Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment. - """ - self.config.topology = topology - - # Only for testing - def get_topology(self) -> MixnetTopology: - return self.config.topology - - async def send_message(self, msg: bytes) -> None: - packets_and_routes = PacketBuilder.build_real_packets(msg, self.config.topology) - for packet, route in packets_and_routes: - await self.real_packet_queue.put((route[0].addr, packet)) - - def subscribe_messages(self) -> "asyncio.Queue[bytes]": - """ - Subscribe messages, which went through mix nodes and were broadcasted via gossip - """ - return asyncio.Queue() - - async def __run(self): - """ - Emit packets at the Poisson emission_rate_per_min. - - If a real packet is scheduled to be sent, this thread sends the real packet to the mixnet, - and schedules redundant real packets to be emitted in the next turns. - - If no real packet is not scheduled, this thread emits a cover packet according to the emission_rate_per_min. - """ - - redundant_real_packet_queue: PacketQueue = asyncio.Queue() - - emission_notifier_queue = asyncio.Queue() - _ = asyncio.create_task( - self.__emission_notifier( - self.config.emission_rate_per_min, emission_notifier_queue - ) - ) - - while True: - # Wait until the next emission time - _ = await emission_notifier_queue.get() - try: - await self.__emit(self.config.redundancy, redundant_real_packet_queue) - finally: - # Python convention: indicate that the previously enqueued task has been processed - emission_notifier_queue.task_done() - - async def __emit( - self, - redundancy: int, # b in the spec - redundant_real_packet_queue: PacketQueue, - ): - if not redundant_real_packet_queue.empty(): - addr, packet = redundant_real_packet_queue.get_nowait() - await self.outbound_socket.put((addr, packet)) - return - - if not self.real_packet_queue.empty(): - addr, packet = self.real_packet_queue.get_nowait() - # Schedule redundant real packets - for _ in range(redundancy - 1): - redundant_real_packet_queue.put_nowait((addr, packet)) - await self.outbound_socket.put((addr, packet)) - - packets_and_routes = PacketBuilder.build_drop_cover_packets( - b"drop cover", self.config.topology - ) - # We have a for loop here, but we expect that the total num of packets is 1 - # because the dummy message is short. - for packet, route in packets_and_routes: - await self.outbound_socket.put((route[0].addr, packet)) - - async def __emission_notifier( - self, emission_rate_per_min: int, queue: asyncio.Queue - ): - while True: - await asyncio.sleep(poisson_interval_sec(emission_rate_per_min)) - queue.put_nowait(None) - - async def cancel(self) -> None: - self.task.cancel() - with suppress(asyncio.CancelledError): - await self.task diff --git a/mixnet/config.py b/mixnet/config.py index 5b1a2d1..e09983d 100644 --- a/mixnet/config.py +++ b/mixnet/config.py @@ -2,110 +2,56 @@ from __future__ import annotations import random from dataclasses import dataclass -from typing import List, TypeAlias +from typing import List from cryptography.hazmat.primitives.asymmetric.x25519 import ( X25519PrivateKey, X25519PublicKey, ) -from pysphinx.node import Node - -from mixnet.bls import BlsPrivateKey, BlsPublicKey -from mixnet.fisheryates import FisherYates +from pysphinx.sphinx import Node as SphinxNode @dataclass class MixnetConfig: - topology_config: MixnetTopologyConfig - mixclient_config: MixClientConfig - mixnode_config: MixNodeConfig + node_configs: List[NodeConfig] + membership: MixMembership @dataclass -class MixnetTopologyConfig: - mixnode_candidates: List[MixNodeInfo] - size: MixnetTopologySize - entropy: bytes +class NodeConfig: + private_key: X25519PrivateKey + transmission_rate_per_sec: int # Global Transmission Rate @dataclass -class MixClientConfig: - emission_rate_per_min: int # Poisson rate parameter: lambda - redundancy: int - topology: MixnetTopology +class MixMembership: + nodes: List[NodeInfo] - -@dataclass -class MixNodeConfig: - encryption_private_key: X25519PrivateKey - delay_rate_per_min: int # Poisson rate parameter: mu - - -@dataclass -class MixnetTopology: - # In production, this can be a 1-D array, which is accessible by indexes. - # Here, we use a 2-D array for readability. - layers: List[List[MixNodeInfo]] - - def __init__( - self, - config: MixnetTopologyConfig, - ) -> None: - """ - Build a new topology deterministically using an entropy and a given set of candidates. - """ - shuffled = FisherYates.shuffle(config.mixnode_candidates, config.entropy) - sampled = shuffled[: config.size.num_total_mixnodes()] - - layers = [] - for layer_id in range(config.size.num_layers): - start = layer_id * config.size.num_mixnodes_per_layer - layer = sampled[start : start + config.size.num_mixnodes_per_layer] - layers.append(layer) - self.layers = layers - - def generate_route(self, mix_destination: MixNodeInfo) -> list[MixNodeInfo]: + def generate_route(self, num_hops: int, last_mix: NodeInfo) -> list[NodeInfo]: """ Generate a mix route for a Sphinx packet. The pre-selected mix_destination is used as a last mix node in the route, so that associated packets can be merged together into a original message. """ - route = [random.choice(layer) for layer in self.layers[:-1]] - route.append(mix_destination) + route = [self.choose() for _ in range(num_hops - 1)] + route.append(last_mix) return route - def choose_mix_destination(self) -> MixNodeInfo: + def choose(self) -> NodeInfo: """ - Choose a mix node from the last mix layer as a mix destination - that will reconstruct a message from Sphinx packets. + Choose a mix node as a mix destination that will reconstruct a message from Sphinx packets. """ - return random.choice(self.layers[-1]) + return random.choice(self.nodes) @dataclass -class MixnetTopologySize: - num_layers: int - num_mixnodes_per_layer: int +class NodeInfo: + private_key: X25519PrivateKey - def num_total_mixnodes(self) -> int: - return self.num_layers * self.num_mixnodes_per_layer + def public_key(self) -> X25519PublicKey: + return self.private_key.public_key() - -# 32-byte that represents an IP address and a port of a mix node. -NodeAddress: TypeAlias = bytes - - -@dataclass -class MixNodeInfo: - identity_private_key: BlsPrivateKey - encryption_private_key: X25519PrivateKey - addr: NodeAddress - - def identity_public_key(self) -> BlsPublicKey: - return self.identity_private_key.get_g1() - - def encryption_public_key(self) -> X25519PublicKey: - return self.encryption_private_key.public_key() - - def sphinx_node(self) -> Node: - return Node(self.encryption_private_key, self.addr) + def sphinx_node(self) -> SphinxNode: + # TODO: Use a pre-signed incentive tx, instead of NodeAddress + dummy_node_addr = bytes(32) + return SphinxNode(self.private_key, dummy_node_addr) diff --git a/mixnet/fisheryates.py b/mixnet/fisheryates.py deleted file mode 100644 index 70c92ff..0000000 --- a/mixnet/fisheryates.py +++ /dev/null @@ -1,21 +0,0 @@ -import random -from typing import List - - -class FisherYates: - @staticmethod - def shuffle(elements: List, entropy: bytes) -> List: - """ - Fisher-Yates shuffling algorithm. - In Python, random.shuffle implements the Fisher-Yates shuffling. - https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle - https://softwareengineering.stackexchange.com/a/215780 - :param elements: elements to be shuffled - :param entropy: a seed for deterministic sampling - """ - out = elements.copy() - random.seed(a=entropy, version=2) - random.shuffle(out) - # reset seed - random.seed() - return out diff --git a/mixnet/mixnet.py b/mixnet/mixnet.py deleted file mode 100644 index dedc9ef..0000000 --- a/mixnet/mixnet.py +++ /dev/null @@ -1,62 +0,0 @@ -from __future__ import annotations - -import asyncio -from contextlib import suppress -from typing import Self, TypeAlias - -from mixnet.client import MixClient -from mixnet.config import MixnetConfig, MixnetTopology, MixnetTopologyConfig -from mixnet.node import MixNode - -EntropyQueue: TypeAlias = "asyncio.Queue[bytes]" - - -class Mixnet: - topology_config: MixnetTopologyConfig - - mixclient: MixClient - mixnode: MixNode - entropy_queue: EntropyQueue - task: asyncio.Task # A reference just to prevent task from being garbage collected - - @classmethod - async def new( - cls, - config: MixnetConfig, - entropy_queue: EntropyQueue, - ) -> Self: - self = cls() - self.topology_config = config.topology_config - self.mixclient = await MixClient.new(config.mixclient_config) - self.mixnode = await MixNode.new(config.mixnode_config) - self.entropy_queue = entropy_queue - self.task = asyncio.create_task(self.__consume_entropy()) - return self - - async def publish_message(self, msg: bytes) -> None: - await self.mixclient.send_message(msg) - - def subscribe_messages(self) -> "asyncio.Queue[bytes]": - return self.mixclient.subscribe_messages() - - async def __consume_entropy( - self, - ) -> None: - while True: - entropy = await self.entropy_queue.get() - self.topology_config.entropy = entropy - - topology = MixnetTopology(self.topology_config) - self.mixclient.set_topology(topology) - - async def cancel(self) -> None: - self.task.cancel() - with suppress(asyncio.CancelledError): - await self.task - - await self.mixclient.cancel() - await self.mixnode.cancel() - - # Only for testing - def get_topology(self) -> MixnetTopology: - return self.mixclient.get_topology() diff --git a/mixnet/node.py b/mixnet/node.py index 8ab4b77..4c931ef 100644 --- a/mixnet/node.py +++ b/mixnet/node.py @@ -1,107 +1,141 @@ from __future__ import annotations import asyncio -from contextlib import suppress -from typing import Self, Tuple, TypeAlias +from typing import Awaitable, Callable, TypeAlias -from cryptography.hazmat.primitives.asymmetric.x25519 import ( - X25519PrivateKey, -) +from pysphinx.payload import DEFAULT_PAYLOAD_SIZE from pysphinx.sphinx import ( Payload, ProcessedFinalHopPacket, ProcessedForwardHopPacket, SphinxPacket, - UnknownHeaderTypeError, ) -from mixnet.config import MixNodeConfig, NodeAddress -from mixnet.poisson import poisson_interval_sec +from mixnet.config import MixMembership, NodeConfig +from mixnet.packet import Fragment, MessageFlag, MessageReconstructor, PacketBuilder -PacketQueue: TypeAlias = "asyncio.Queue[Tuple[NodeAddress, SphinxPacket]]" -PacketPayloadQueue: TypeAlias = ( - "asyncio.Queue[Tuple[NodeAddress, SphinxPacket | Payload]]" -) +NetworkPacket: TypeAlias = "SphinxPacket | bytes" +NetworkPacketQueue: TypeAlias = "asyncio.Queue[NetworkPacket]" +Connection: TypeAlias = NetworkPacketQueue +BroadcastChannel: TypeAlias = "asyncio.Queue[bytes]" -class MixNode: - """ - A class handling incoming packets with delays +class Node: + config: NodeConfig + membership: MixMembership + mixgossip_channel: MixGossipChannel + reconstructor: MessageReconstructor + broadcast_channel: BroadcastChannel - This class is defined separated with the MixNode class, - in order to define the MixNode as a simple dataclass for clarity. - """ - - config: MixNodeConfig - inbound_socket: PacketQueue - outbound_socket: PacketPayloadQueue - task: asyncio.Task # A reference just to prevent task from being garbage collected - - @classmethod - async def new( - cls, - config: MixNodeConfig, - ) -> Self: - self = cls() + def __init__(self, config: NodeConfig, membership: MixMembership): self.config = config - self.inbound_socket = asyncio.Queue() - self.outbound_socket = asyncio.Queue() - self.task = asyncio.create_task(self.__run()) - return self + self.membership = membership + self.mixgossip_channel = MixGossipChannel(self.__process_sphinx_packet) + self.reconstructor = MessageReconstructor() + self.broadcast_channel = asyncio.Queue() - async def __run(self): - """ - Read SphinxPackets from inbound socket and spawn a thread for each packet to process it. + async def __process_sphinx_packet( + self, packet: SphinxPacket + ) -> NetworkPacket | None: + try: + processed = packet.process(self.config.private_key) + match processed: + case ProcessedForwardHopPacket(): + return processed.next_packet + case ProcessedFinalHopPacket(): + await self.__process_sphinx_payload(processed.payload) + except Exception: + # Return SphinxPacket as it is, if this node cannot unwrap it. + return packet - This thread approximates a M/M/inf queue. - """ + async def __process_sphinx_payload(self, payload: Payload): + msg_with_flag = self.reconstructor.add( + Fragment.from_bytes(payload.recover_plain_playload()) + ) + if msg_with_flag is not None: + flag, msg = PacketBuilder.parse_msg_and_flag(msg_with_flag) + if flag == MessageFlag.MESSAGE_FLAG_REAL: + await self.broadcast_channel.put(msg) + def connect(self, peer: Node): + conn = asyncio.Queue() + peer.mixgossip_channel.add_inbound(conn) + self.mixgossip_channel.add_outbound( + MixOutboundConnection(conn, self.config.transmission_rate_per_sec) + ) + + async def send_message(self, msg: bytes): + for packet, _ in PacketBuilder.build_real_packets(msg, self.membership): + await self.mixgossip_channel.gossip(packet) + + +class MixGossipChannel: + inbound_conns: list[Connection] + outbound_conns: list[MixOutboundConnection] + handler: Callable[[SphinxPacket], Awaitable[NetworkPacket | None]] + + def __init__( + self, + handler: Callable[[SphinxPacket], Awaitable[NetworkPacket | None]], + ): + self.inbound_conns = [] + self.outbound_conns = [] + self.handler = handler # A set just for gathering a reference of tasks to prevent them from being garbage collected. # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task self.tasks = set() + def add_inbound(self, conn: Connection): + self.inbound_conns.append(conn) + task = asyncio.create_task(self.__process_inbound_conn(conn)) + self.tasks.add(task) + # To discard the task from the set automatically when it is done. + task.add_done_callback(self.tasks.discard) + + def add_outbound(self, conn: MixOutboundConnection): + self.outbound_conns.append(conn) + + async def __process_inbound_conn(self, conn: Connection): while True: - _, packet = await self.inbound_socket.get() - task = asyncio.create_task( - self.__process_packet( - packet, - self.config.encryption_private_key, - self.config.delay_rate_per_min, - ) - ) - self.tasks.add(task) - # To discard the task from the set automatically when it is done. - task.add_done_callback(self.tasks.discard) + elem = await conn.get() + if isinstance(elem, bytes): + assert elem == build_noise_packet() + # Drop packet + continue + elif isinstance(elem, SphinxPacket): + net_packet = await self.handler(elem) + if net_packet is not None: + await self.gossip(net_packet) - async def __process_packet( - self, - packet: SphinxPacket, - encryption_private_key: X25519PrivateKey, - delay_rate_per_min: int, # Poisson rate parameter: mu - ): - """ - Process a single packet with a delay that follows exponential distribution, - and forward it to the next mix node or the mix destination + async def gossip(self, packet: NetworkPacket): + for conn in self.outbound_conns: + await conn.send(packet) - This thread is a single server (worker) in a M/M/inf queue that MixNodeRunner approximates. - """ - delay_sec = poisson_interval_sec(delay_rate_per_min) - await asyncio.sleep(delay_sec) - processed = packet.process(encryption_private_key) - match processed: - case ProcessedForwardHopPacket(): - await self.outbound_socket.put( - (processed.next_node_address, processed.next_packet) - ) - case ProcessedFinalHopPacket(): - await self.outbound_socket.put( - (processed.destination_node_address, processed.payload) - ) - case _: - raise UnknownHeaderTypeError +class MixOutboundConnection: + queue: NetworkPacketQueue + conn: Connection + transmission_rate_per_sec: int - async def cancel(self) -> None: - self.task.cancel() - with suppress(asyncio.CancelledError): - await self.task + def __init__(self, conn: Connection, transmission_rate_per_sec: int): + self.queue = asyncio.Queue() + self.conn = conn + self.transmission_rate_per_sec = transmission_rate_per_sec + self.task = asyncio.create_task(self.__run()) + + async def __run(self): + while True: + await asyncio.sleep(1 / self.transmission_rate_per_sec) + # TODO: time mixing + if self.queue.empty(): + elem = build_noise_packet() + else: + elem = self.queue.get_nowait() + await self.conn.put(elem) + + async def send(self, elem: NetworkPacket): + await self.queue.put(elem) + + +def build_noise_packet() -> bytes: + return bytes(DEFAULT_PAYLOAD_SIZE) diff --git a/mixnet/packet.py b/mixnet/packet.py index 71fe687..1ac833b 100644 --- a/mixnet/packet.py +++ b/mixnet/packet.py @@ -9,7 +9,7 @@ from typing import Dict, List, Self, Tuple, TypeAlias from pysphinx.payload import Payload from pysphinx.sphinx import SphinxPacket -from mixnet.config import MixnetTopology, MixNodeInfo +from mixnet.config import MixMembership, NodeInfo class MessageFlag(Enum): @@ -23,25 +23,25 @@ class MessageFlag(Enum): class PacketBuilder: @staticmethod def build_real_packets( - message: bytes, topology: MixnetTopology - ) -> List[Tuple[SphinxPacket, List[MixNodeInfo]]]: + message: bytes, membership: MixMembership + ) -> List[Tuple[SphinxPacket, List[NodeInfo]]]: return PacketBuilder.__build_packets( - MessageFlag.MESSAGE_FLAG_REAL, message, topology + MessageFlag.MESSAGE_FLAG_REAL, message, membership ) @staticmethod def build_drop_cover_packets( - message: bytes, topology: MixnetTopology - ) -> List[Tuple[SphinxPacket, List[MixNodeInfo]]]: + message: bytes, membership: MixMembership + ) -> List[Tuple[SphinxPacket, List[NodeInfo]]]: return PacketBuilder.__build_packets( - MessageFlag.MESSAGE_FLAG_DROP_COVER, message, topology + MessageFlag.MESSAGE_FLAG_DROP_COVER, message, membership ) @staticmethod def __build_packets( - flag: MessageFlag, message: bytes, topology: MixnetTopology - ) -> List[Tuple[SphinxPacket, List[MixNodeInfo]]]: - destination = topology.choose_mix_destination() + flag: MessageFlag, message: bytes, membership: MixMembership + ) -> List[Tuple[SphinxPacket, List[NodeInfo]]]: + last_mix = membership.choose() msg_with_flag = flag.bytes() + message # NOTE: We don't encrypt msg_with_flag for destination. @@ -50,11 +50,11 @@ class PacketBuilder: out = [] for fragment in fragment_set.fragments: - route = topology.generate_route(destination) + route = membership.generate_route(3, last_mix) packet = SphinxPacket.build( fragment.bytes(), [mixnode.sphinx_node() for mixnode in route], - destination.sphinx_node(), + last_mix.sphinx_node(), ) out.append((packet, route)) diff --git a/mixnet/poisson.py b/mixnet/poisson.py deleted file mode 100644 index 86a4b66..0000000 --- a/mixnet/poisson.py +++ /dev/null @@ -1,13 +0,0 @@ -import numpy - - -def poisson_interval_sec(rate_per_min: int) -> float: - # If events occur in a Poisson distribution with rate_per_min, - # the interval between events follows the exponential distribution - # with the rate_per_min (i.e. with the scale 1/rate_per_min). - interval_min = numpy.random.exponential(scale=1 / rate_per_min, size=1)[0] - return interval_min * 60 - - -def poisson_mean_interval_sec(rate_per_min: int) -> float: - return 1 / rate_per_min * 60 diff --git a/mixnet/structure.png b/mixnet/structure.png deleted file mode 100644 index 994313c..0000000 Binary files a/mixnet/structure.png and /dev/null differ diff --git a/mixnet/test_client.py b/mixnet/test_client.py deleted file mode 100644 index 4327466..0000000 --- a/mixnet/test_client.py +++ /dev/null @@ -1,45 +0,0 @@ -from datetime import datetime -from unittest import IsolatedAsyncioTestCase - -import numpy - -from mixnet.client import MixClient -from mixnet.poisson import poisson_mean_interval_sec -from mixnet.test_utils import ( - init_mixnet_config, - with_test_timeout, -) -from mixnet.utils import random_bytes - - -class TestMixClient(IsolatedAsyncioTestCase): - @with_test_timeout(100) - async def test_mixclient(self): - config = init_mixnet_config().mixclient_config - config.emission_rate_per_min = 30 - config.redundancy = 3 - - mixclient = await MixClient.new(config) - try: - # Send a 3500-byte msg, expecting that it is split into at least two packets - await mixclient.send_message(random_bytes(3500)) - - # Calculate intervals between packet emissions from the mix client - intervals = [] - ts = datetime.now() - for _ in range(30): - _ = await mixclient.outbound_socket.get() - now = datetime.now() - intervals.append((now - ts).total_seconds()) - ts = now - - # Check if packets were emitted at the Poisson emission_rate - # If emissions follow the Poisson distribution with a rate `lambda`, - # a mean interval between emissions must be `1/lambda`. - self.assertAlmostEqual( - float(numpy.mean(intervals)), - poisson_mean_interval_sec(config.emission_rate_per_min), - delta=1.0, - ) - finally: - await mixclient.cancel() diff --git a/mixnet/test_fisheryates.py b/mixnet/test_fisheryates.py deleted file mode 100644 index a32554c..0000000 --- a/mixnet/test_fisheryates.py +++ /dev/null @@ -1,21 +0,0 @@ -from unittest import TestCase - -from mixnet.fisheryates import FisherYates - - -class TestFisherYates(TestCase): - def test_shuffle(self): - entropy = b"hello" - elems = [1, 2, 3, 4, 5] - - shuffled1 = FisherYates.shuffle(elems, entropy) - self.assertEqual(sorted(elems), sorted(shuffled1)) - - # shuffle again with the same entropy - shuffled2 = FisherYates.shuffle(elems, entropy) - self.assertEqual(shuffled1, shuffled2) - - # shuffle with a different entropy - shuffled3 = FisherYates.shuffle(elems, b"world") - self.assertNotEqual(shuffled1, shuffled3) - self.assertEqual(sorted(elems), sorted(shuffled3)) diff --git a/mixnet/test_mixnet.py b/mixnet/test_mixnet.py deleted file mode 100644 index 9a0e4cb..0000000 --- a/mixnet/test_mixnet.py +++ /dev/null @@ -1,20 +0,0 @@ -import asyncio -from unittest import IsolatedAsyncioTestCase - -from mixnet.mixnet import Mixnet -from mixnet.test_utils import init_mixnet_config - - -class TestMixnet(IsolatedAsyncioTestCase): - async def test_topology_from_robustness(self): - config = init_mixnet_config() - entropy_queue = asyncio.Queue() - - mixnet = await Mixnet.new(config, entropy_queue) - try: - old_topology = config.mixclient_config.topology - await entropy_queue.put(b"new entropy") - await asyncio.sleep(1) - self.assertNotEqual(old_topology, mixnet.get_topology()) - finally: - await mixnet.cancel() diff --git a/mixnet/test_node.py b/mixnet/test_node.py index 26ab0c7..89ddf6a 100644 --- a/mixnet/test_node.py +++ b/mixnet/test_node.py @@ -1,117 +1,37 @@ import asyncio -from datetime import datetime from unittest import IsolatedAsyncioTestCase -import numpy -from pysphinx.sphinx import SphinxPacket - -from mixnet.node import MixNode, NodeAddress, PacketQueue -from mixnet.packet import PacketBuilder -from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec +from mixnet.node import Node from mixnet.test_utils import ( init_mixnet_config, - with_test_timeout, ) -class TestMixNodeRunner(IsolatedAsyncioTestCase): - @with_test_timeout(180) - async def test_mixnode_emission_rate(self): - """ - Test if MixNodeRunner works as a M/M/inf queue. +class TestNode(IsolatedAsyncioTestCase): + async def test_node(self): + config = init_mixnet_config(10) + nodes = [ + Node(node_config, config.membership) for node_config in config.node_configs + ] + for i, node in enumerate(nodes): + node.connect(nodes[(i + 1) % len(nodes)]) - If inputs are arrived at Poisson rate `lambda`, - and if processing is delayed according to an exponential distribution with a rate `mu`, - the rate of outputs should be `lambda`. - """ - config = init_mixnet_config() - config.mixclient_config.emission_rate_per_min = 120 # lambda (= 2msg/sec) - config.mixnode_config.delay_rate_per_min = 30 # mu (= 2s delay on average) + await nodes[0].send_message(b"block selection") - packet, route = PacketBuilder.build_real_packets( - b"msg", config.mixclient_config.topology - )[0] + timeout = 15 + for _ in range(timeout): + broadcasted_msgs = [] + for node in nodes: + if not node.broadcast_channel.empty(): + broadcasted_msgs.append(node.broadcast_channel.get_nowait()) - # Start only the first mix node for testing - config.mixnode_config.encryption_private_key = route[0].encryption_private_key - mixnode = await MixNode.new(config.mixnode_config) - try: - # Send packets to the first mix node in a Poisson distribution - packet_count = 100 - # This queue is just for counting how many packets have been sent so far. - sent_packet_queue: PacketQueue = asyncio.Queue() - sender_task = asyncio.create_task( - self.send_packets( - mixnode.inbound_socket, - packet, - route[0].addr, - packet_count, - config.mixclient_config.emission_rate_per_min, - sent_packet_queue, - ) - ) - try: - # Calculate intervals between outputs and gather num_jobs in the first mix node. - intervals = [] - num_jobs = [] - ts = datetime.now() - for _ in range(packet_count): - _ = await mixnode.outbound_socket.get() - now = datetime.now() - intervals.append((now - ts).total_seconds()) + if len(broadcasted_msgs) == 0: + await asyncio.sleep(1) + else: + # We expect only one node to broadcast the message. + assert len(broadcasted_msgs) == 1 + self.assertEqual(b"block selection", broadcasted_msgs[0]) + return + self.fail("timeout") - # Calculate the current # of jobs staying in the mix node - num_packets_emitted_from_mixnode = len(intervals) - num_packets_sent_to_mixnode = sent_packet_queue.qsize() - num_jobs.append( - num_packets_sent_to_mixnode - num_packets_emitted_from_mixnode - ) - - ts = now - - # Remove the first interval that would be much larger than other intervals, - # because of the delay in mix node. - intervals = intervals[1:] - num_jobs = num_jobs[1:] - - # Check if the emission rate of the first mix node is the same as - # the emission rate of the message sender, but with a delay. - # If outputs follow the Poisson distribution with a rate `lambda`, - # a mean interval between outputs must be `1/lambda`. - self.assertAlmostEqual( - float(numpy.mean(intervals)), - poisson_mean_interval_sec( - config.mixclient_config.emission_rate_per_min - ), - delta=1.0, - ) - # If runner is a M/M/inf queue, - # a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`. - self.assertAlmostEqual( - float(numpy.mean(num_jobs)), - round( - config.mixclient_config.emission_rate_per_min - / config.mixnode_config.delay_rate_per_min - ), - delta=1.5, - ) - finally: - await sender_task - finally: - await mixnode.cancel() - - @staticmethod - async def send_packets( - inbound_socket: PacketQueue, - packet: SphinxPacket, - node_addr: NodeAddress, - cnt: int, - rate_per_min: int, - # For testing purpose, to inform the caller how many packets have been sent to the inbound_socket - sent_packet_queue: PacketQueue, - ): - for _ in range(cnt): - # Since the task is not heavy, just sleep for seconds instead of using emission_notifier - await asyncio.sleep(poisson_interval_sec(rate_per_min)) - await inbound_socket.put((node_addr, packet)) - await sent_packet_queue.put((node_addr, packet)) + # TODO: check noise diff --git a/mixnet/test_packet.py b/mixnet/test_packet.py index d1d517a..81e3a25 100644 --- a/mixnet/test_packet.py +++ b/mixnet/test_packet.py @@ -1,9 +1,10 @@ +from random import randint from typing import List from unittest import TestCase from pysphinx.sphinx import ProcessedFinalHopPacket, SphinxPacket -from mixnet.config import MixNodeInfo +from mixnet.config import NodeInfo from mixnet.packet import ( Fragment, MessageFlag, @@ -11,14 +12,13 @@ from mixnet.packet import ( PacketBuilder, ) from mixnet.test_utils import init_mixnet_config -from mixnet.utils import random_bytes class TestPacket(TestCase): def test_real_packet(self): - topology = init_mixnet_config().mixclient_config.topology - msg = random_bytes(3500) - packets_and_routes = PacketBuilder.build_real_packets(msg, topology) + membership = init_mixnet_config(10).membership + msg = self.random_bytes(3500) + packets_and_routes = PacketBuilder.build_real_packets(msg, membership) self.assertEqual(4, len(packets_and_routes)) reconstructor = MessageReconstructor() @@ -47,9 +47,9 @@ class TestPacket(TestCase): ) def test_cover_packet(self): - topology = init_mixnet_config().mixclient_config.topology + membership = init_mixnet_config(10).membership msg = b"cover" - packets_and_routes = PacketBuilder.build_drop_cover_packets(msg, topology) + packets_and_routes = PacketBuilder.build_drop_cover_packets(msg, membership) self.assertEqual(1, len(packets_and_routes)) reconstructor = MessageReconstructor() @@ -63,16 +63,21 @@ class TestPacket(TestCase): ) @staticmethod - def process_packet(packet: SphinxPacket, route: List[MixNodeInfo]) -> Fragment: - processed = packet.process(route[0].encryption_private_key) + def process_packet(packet: SphinxPacket, route: List[NodeInfo]) -> Fragment: + processed = packet.process(route[0].private_key) if isinstance(processed, ProcessedFinalHopPacket): return Fragment.from_bytes(processed.payload.recover_plain_playload()) else: processed = processed for node in route[1:]: - p = processed.next_packet.process(node.encryption_private_key) + p = processed.next_packet.process(node.private_key) if isinstance(p, ProcessedFinalHopPacket): return Fragment.from_bytes(p.payload.recover_plain_playload()) else: processed = p assert False + + @staticmethod + def random_bytes(size: int) -> bytes: + assert size >= 0 + return bytes([randint(0, 255) for _ in range(size)]) diff --git a/mixnet/test_utils.py b/mixnet/test_utils.py index e3ba260..4ff1d4f 100644 --- a/mixnet/test_utils.py +++ b/mixnet/test_utils.py @@ -1,46 +1,20 @@ -import asyncio - from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey -from mixnet.bls import generate_bls from mixnet.config import ( - MixClientConfig, - MixNodeConfig, + MixMembership, MixnetConfig, - MixNodeInfo, - MixnetTopology, - MixnetTopologyConfig, - MixnetTopologySize, + NodeConfig, + NodeInfo, ) -from mixnet.utils import random_bytes -def with_test_timeout(t): - def wrapper(coroutine): - async def run(*args, **kwargs): - async with asyncio.timeout(t): - return await coroutine(*args, **kwargs) - - return run - - return wrapper - - -def init_mixnet_config() -> MixnetConfig: - topology_config = MixnetTopologyConfig( - [ - MixNodeInfo( - generate_bls(), - X25519PrivateKey.generate(), - random_bytes(32), - ) - for _ in range(12) - ], - MixnetTopologySize(3, 3), - b"entropy", +def init_mixnet_config(num_nodes: int) -> MixnetConfig: + transmission_rate_per_sec = 3 + node_configs = [ + NodeConfig(X25519PrivateKey.generate(), transmission_rate_per_sec) + for _ in range(num_nodes) + ] + membership = MixMembership( + [NodeInfo(node_config.private_key) for node_config in node_configs] ) - mixclient_config = MixClientConfig(30, 3, MixnetTopology(topology_config)) - mixnode_config = MixNodeConfig( - topology_config.mixnode_candidates[0].encryption_private_key, 30 - ) - return MixnetConfig(topology_config, mixclient_config, mixnode_config) + return MixnetConfig(node_configs, membership) diff --git a/mixnet/utils.py b/mixnet/utils.py deleted file mode 100644 index 6b45176..0000000 --- a/mixnet/utils.py +++ /dev/null @@ -1,6 +0,0 @@ -from random import randint - - -def random_bytes(size: int) -> bytes: - assert size >= 0 - return bytes([randint(0, 255) for _ in range(size)])