From 65644fd1d222f914d27f9252bbfbec7ab8e10c56 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:22:16 +0900 Subject: [PATCH] mixnet v2 --- mixnet/bls.py | 13 -- mixnet/client.py | 118 ----------- mixnet/config.py | 99 +++------- mixnet/fisheryates.py | 21 -- mixnet/mixnet.py | 62 ------ mixnet/node.py | 183 ++++++++++-------- mixnet/packet.py | 24 +-- mixnet/poisson.py | 13 -- mixnet/test_client.py | 45 ----- mixnet/test_fisheryates.py | 21 -- mixnet/test_mixnet.py | 20 -- mixnet/test_node.py | 118 +---------- mixnet/test_packet.py | 16 +- mixnet/test_utils.py | 38 ++-- mixnet/v2/sim/adversary.py | 24 ++- .../sim/bulk-attack-2024-06-11T23:03:46.csv | 33 ++++ .../sim/bulk-attack-2024-06-13T16:32:29.csv | 33 ++++ 17 files changed, 254 insertions(+), 627 deletions(-) delete mode 100644 mixnet/bls.py delete mode 100644 mixnet/client.py delete mode 100644 mixnet/fisheryates.py delete mode 100644 mixnet/mixnet.py delete mode 100644 mixnet/poisson.py delete mode 100644 mixnet/test_client.py delete mode 100644 mixnet/test_fisheryates.py delete mode 100644 mixnet/test_mixnet.py create mode 100644 mixnet/v2/sim/bulk-attack-2024-06-11T23:03:46.csv create mode 100644 mixnet/v2/sim/bulk-attack-2024-06-13T16:32:29.csv diff --git a/mixnet/bls.py b/mixnet/bls.py deleted file mode 100644 index a4278b0..0000000 --- a/mixnet/bls.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import TypeAlias - -import blspy - -from mixnet.utils import random_bytes - -BlsPrivateKey: TypeAlias = blspy.PrivateKey -BlsPublicKey: TypeAlias = blspy.G1Element - - -def generate_bls() -> BlsPrivateKey: - seed = random_bytes(32) - return blspy.BasicSchemeMPL.key_gen(seed) diff --git a/mixnet/client.py b/mixnet/client.py deleted file mode 100644 index 8d21cf3..0000000 --- a/mixnet/client.py +++ /dev/null @@ -1,118 +0,0 @@ -from __future__ import annotations - -import asyncio -from contextlib import suppress -from typing import Self - -from mixnet.config import MixClientConfig, MixnetTopology -from mixnet.node import PacketQueue -from mixnet.packet import PacketBuilder -from mixnet.poisson import poisson_interval_sec - - -class MixClient: - config: MixClientConfig - real_packet_queue: PacketQueue - outbound_socket: PacketQueue - task: asyncio.Task # A reference just to prevent task from being garbage collected - - @classmethod - async def new( - cls, - config: MixClientConfig, - ) -> Self: - self = cls() - self.config = config - self.real_packet_queue = asyncio.Queue() - self.outbound_socket = asyncio.Queue() - self.task = asyncio.create_task(self.__run()) - return self - - def set_topology(self, topology: MixnetTopology) -> None: - """ - Replace the old topology with the new topology received - - In real implementations, this method may be integrated in a long-running task. - Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment. - """ - self.config.topology = topology - - # Only for testing - def get_topology(self) -> MixnetTopology: - return self.config.topology - - async def send_message(self, msg: bytes) -> None: - packets_and_routes = PacketBuilder.build_real_packets(msg, self.config.topology) - for packet, route in packets_and_routes: - await self.real_packet_queue.put((route[0].addr, packet)) - - def subscribe_messages(self) -> "asyncio.Queue[bytes]": - """ - Subscribe messages, which went through mix nodes and were broadcasted via gossip - """ - return asyncio.Queue() - - async def __run(self): - """ - Emit packets at the Poisson emission_rate_per_min. - - If a real packet is scheduled to be sent, this thread sends the real packet to the mixnet, - and schedules redundant real packets to be emitted in the next turns. - - If no real packet is not scheduled, this thread emits a cover packet according to the emission_rate_per_min. - """ - - redundant_real_packet_queue: PacketQueue = asyncio.Queue() - - emission_notifier_queue = asyncio.Queue() - _ = asyncio.create_task( - self.__emission_notifier( - self.config.emission_rate_per_min, emission_notifier_queue - ) - ) - - while True: - # Wait until the next emission time - _ = await emission_notifier_queue.get() - try: - await self.__emit(self.config.redundancy, redundant_real_packet_queue) - finally: - # Python convention: indicate that the previously enqueued task has been processed - emission_notifier_queue.task_done() - - async def __emit( - self, - redundancy: int, # b in the spec - redundant_real_packet_queue: PacketQueue, - ): - if not redundant_real_packet_queue.empty(): - addr, packet = redundant_real_packet_queue.get_nowait() - await self.outbound_socket.put((addr, packet)) - return - - if not self.real_packet_queue.empty(): - addr, packet = self.real_packet_queue.get_nowait() - # Schedule redundant real packets - for _ in range(redundancy - 1): - redundant_real_packet_queue.put_nowait((addr, packet)) - await self.outbound_socket.put((addr, packet)) - - packets_and_routes = PacketBuilder.build_drop_cover_packets( - b"drop cover", self.config.topology - ) - # We have a for loop here, but we expect that the total num of packets is 1 - # because the dummy message is short. - for packet, route in packets_and_routes: - await self.outbound_socket.put((route[0].addr, packet)) - - async def __emission_notifier( - self, emission_rate_per_min: int, queue: asyncio.Queue - ): - while True: - await asyncio.sleep(poisson_interval_sec(emission_rate_per_min)) - queue.put_nowait(None) - - async def cancel(self) -> None: - self.task.cancel() - with suppress(asyncio.CancelledError): - await self.task diff --git a/mixnet/config.py b/mixnet/config.py index 5b1a2d1..4bfa1c3 100644 --- a/mixnet/config.py +++ b/mixnet/config.py @@ -2,110 +2,57 @@ from __future__ import annotations import random from dataclasses import dataclass -from typing import List, TypeAlias +from typing import List from cryptography.hazmat.primitives.asymmetric.x25519 import ( X25519PrivateKey, X25519PublicKey, ) -from pysphinx.node import Node - -from mixnet.bls import BlsPrivateKey, BlsPublicKey -from mixnet.fisheryates import FisherYates +from pysphinx.sphinx import Node as SphinxNode @dataclass class MixnetConfig: - topology_config: MixnetTopologyConfig - mixclient_config: MixClientConfig - mixnode_config: MixNodeConfig + node_configs: List[NodeConfig] + membership: MixMembership @dataclass -class MixnetTopologyConfig: - mixnode_candidates: List[MixNodeInfo] - size: MixnetTopologySize - entropy: bytes +class NodeConfig: + private_key: X25519PrivateKey + conn_degree: int # Connection Degree (default: 6) + transmission_rate_per_sec: int # Global Transmission Rate @dataclass -class MixClientConfig: - emission_rate_per_min: int # Poisson rate parameter: lambda - redundancy: int - topology: MixnetTopology +class MixMembership: + nodes: List[NodePublicInfo] - -@dataclass -class MixNodeConfig: - encryption_private_key: X25519PrivateKey - delay_rate_per_min: int # Poisson rate parameter: mu - - -@dataclass -class MixnetTopology: - # In production, this can be a 1-D array, which is accessible by indexes. - # Here, we use a 2-D array for readability. - layers: List[List[MixNodeInfo]] - - def __init__( - self, - config: MixnetTopologyConfig, - ) -> None: - """ - Build a new topology deterministically using an entropy and a given set of candidates. - """ - shuffled = FisherYates.shuffle(config.mixnode_candidates, config.entropy) - sampled = shuffled[: config.size.num_total_mixnodes()] - - layers = [] - for layer_id in range(config.size.num_layers): - start = layer_id * config.size.num_mixnodes_per_layer - layer = sampled[start : start + config.size.num_mixnodes_per_layer] - layers.append(layer) - self.layers = layers - - def generate_route(self, mix_destination: MixNodeInfo) -> list[MixNodeInfo]: + def generate_route( + self, num_hops: int, last_mix: NodePublicInfo + ) -> list[NodePublicInfo]: """ Generate a mix route for a Sphinx packet. The pre-selected mix_destination is used as a last mix node in the route, so that associated packets can be merged together into a original message. """ - route = [random.choice(layer) for layer in self.layers[:-1]] - route.append(mix_destination) + route = [self.choose() for _ in range(num_hops - 1)] + route.append(last_mix) return route - def choose_mix_destination(self) -> MixNodeInfo: + def choose(self) -> NodePublicInfo: """ - Choose a mix node from the last mix layer as a mix destination - that will reconstruct a message from Sphinx packets. + Choose a mix node as a mix destination that will reconstruct a message from Sphinx packets. """ - return random.choice(self.layers[-1]) + return random.choice(self.nodes) @dataclass -class MixnetTopologySize: - num_layers: int - num_mixnodes_per_layer: int - - def num_total_mixnodes(self) -> int: - return self.num_layers * self.num_mixnodes_per_layer - - -# 32-byte that represents an IP address and a port of a mix node. -NodeAddress: TypeAlias = bytes - - -@dataclass -class MixNodeInfo: - identity_private_key: BlsPrivateKey - encryption_private_key: X25519PrivateKey - addr: NodeAddress - - def identity_public_key(self) -> BlsPublicKey: - return self.identity_private_key.get_g1() +class NodePublicInfo: + private_key: X25519PrivateKey def encryption_public_key(self) -> X25519PublicKey: - return self.encryption_private_key.public_key() + return self.private_key.public_key() - def sphinx_node(self) -> Node: - return Node(self.encryption_private_key, self.addr) + def sphinx_node(self) -> SphinxNode: + return SphinxNode(self.private_key, bytes(32)) diff --git a/mixnet/fisheryates.py b/mixnet/fisheryates.py deleted file mode 100644 index 70c92ff..0000000 --- a/mixnet/fisheryates.py +++ /dev/null @@ -1,21 +0,0 @@ -import random -from typing import List - - -class FisherYates: - @staticmethod - def shuffle(elements: List, entropy: bytes) -> List: - """ - Fisher-Yates shuffling algorithm. - In Python, random.shuffle implements the Fisher-Yates shuffling. - https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle - https://softwareengineering.stackexchange.com/a/215780 - :param elements: elements to be shuffled - :param entropy: a seed for deterministic sampling - """ - out = elements.copy() - random.seed(a=entropy, version=2) - random.shuffle(out) - # reset seed - random.seed() - return out diff --git a/mixnet/mixnet.py b/mixnet/mixnet.py deleted file mode 100644 index dedc9ef..0000000 --- a/mixnet/mixnet.py +++ /dev/null @@ -1,62 +0,0 @@ -from __future__ import annotations - -import asyncio -from contextlib import suppress -from typing import Self, TypeAlias - -from mixnet.client import MixClient -from mixnet.config import MixnetConfig, MixnetTopology, MixnetTopologyConfig -from mixnet.node import MixNode - -EntropyQueue: TypeAlias = "asyncio.Queue[bytes]" - - -class Mixnet: - topology_config: MixnetTopologyConfig - - mixclient: MixClient - mixnode: MixNode - entropy_queue: EntropyQueue - task: asyncio.Task # A reference just to prevent task from being garbage collected - - @classmethod - async def new( - cls, - config: MixnetConfig, - entropy_queue: EntropyQueue, - ) -> Self: - self = cls() - self.topology_config = config.topology_config - self.mixclient = await MixClient.new(config.mixclient_config) - self.mixnode = await MixNode.new(config.mixnode_config) - self.entropy_queue = entropy_queue - self.task = asyncio.create_task(self.__consume_entropy()) - return self - - async def publish_message(self, msg: bytes) -> None: - await self.mixclient.send_message(msg) - - def subscribe_messages(self) -> "asyncio.Queue[bytes]": - return self.mixclient.subscribe_messages() - - async def __consume_entropy( - self, - ) -> None: - while True: - entropy = await self.entropy_queue.get() - self.topology_config.entropy = entropy - - topology = MixnetTopology(self.topology_config) - self.mixclient.set_topology(topology) - - async def cancel(self) -> None: - self.task.cancel() - with suppress(asyncio.CancelledError): - await self.task - - await self.mixclient.cancel() - await self.mixnode.cancel() - - # Only for testing - def get_topology(self) -> MixnetTopology: - return self.mixclient.get_topology() diff --git a/mixnet/node.py b/mixnet/node.py index 8ab4b77..ced0576 100644 --- a/mixnet/node.py +++ b/mixnet/node.py @@ -1,107 +1,134 @@ from __future__ import annotations import asyncio -from contextlib import suppress -from typing import Self, Tuple, TypeAlias +from typing import Awaitable, Callable, TypeAlias -from cryptography.hazmat.primitives.asymmetric.x25519 import ( - X25519PrivateKey, -) +from pysphinx.payload import DEFAULT_PAYLOAD_SIZE from pysphinx.sphinx import ( Payload, ProcessedFinalHopPacket, ProcessedForwardHopPacket, SphinxPacket, - UnknownHeaderTypeError, ) -from mixnet.config import MixNodeConfig, NodeAddress -from mixnet.poisson import poisson_interval_sec +from mixnet.config import MixMembership, NodeConfig +from mixnet.packet import Fragment, MessageFlag, MessageReconstructor, PacketBuilder -PacketQueue: TypeAlias = "asyncio.Queue[Tuple[NodeAddress, SphinxPacket]]" -PacketPayloadQueue: TypeAlias = ( - "asyncio.Queue[Tuple[NodeAddress, SphinxPacket | Payload]]" -) +NetworkPacket: TypeAlias = "SphinxPacket | bytes" +NetworkPacketQueue: TypeAlias = "asyncio.Queue[NetworkPacket]" +Connection: TypeAlias = NetworkPacketQueue +BroadcastChannel: TypeAlias = "asyncio.Queue[bytes]" -class MixNode: - """ - A class handling incoming packets with delays +class Node: + config: NodeConfig + membership: MixMembership + mixgossip_channel: MixGossipChannel + reconstructor: MessageReconstructor + broadcast_channel: BroadcastChannel - This class is defined separated with the MixNode class, - in order to define the MixNode as a simple dataclass for clarity. - """ - - config: MixNodeConfig - inbound_socket: PacketQueue - outbound_socket: PacketPayloadQueue - task: asyncio.Task # A reference just to prevent task from being garbage collected - - @classmethod - async def new( - cls, - config: MixNodeConfig, - ) -> Self: - self = cls() + def __init__(self, config: NodeConfig, membership: MixMembership): self.config = config - self.inbound_socket = asyncio.Queue() - self.outbound_socket = asyncio.Queue() - self.task = asyncio.create_task(self.__run()) - return self + self.membership = membership + self.mixgossip_channel = MixGossipChannel(config, self.__process_sphinx_packet) + self.reconstructor = MessageReconstructor() + self.broadcast_channel = asyncio.Queue() - async def __run(self): - """ - Read SphinxPackets from inbound socket and spawn a thread for each packet to process it. + async def __process_sphinx_packet( + self, packet: SphinxPacket + ) -> NetworkPacket | None: + try: + processed = packet.process(self.config.private_key) + match processed: + case ProcessedForwardHopPacket(): + return processed.next_packet + case ProcessedFinalHopPacket(): + await self.__process_sphinx_payload(processed.payload) + except Exception: + # Return SphinxPacket as it is, if this node cannot unwrap it. + return packet - This thread approximates a M/M/inf queue. - """ + async def __process_sphinx_payload(self, payload: Payload): + msg_with_flag = self.reconstructor.add( + Fragment.from_bytes(payload.recover_plain_playload()) + ) + if msg_with_flag is not None: + flag, msg = PacketBuilder.parse_msg_and_flag(msg_with_flag) + if flag == MessageFlag.MESSAGE_FLAG_REAL: + await self.broadcast_channel.put(msg) + + async def send_message(self, msg: bytes): + for packet, _ in PacketBuilder.build_real_packets(msg, self.membership): + await self.mixgossip_channel.gossip(packet) + + +class MixGossipChannel: + inbound_conns: list[Connection] + outbound_conns: list[MixOutboundConnection] + handler: Callable[[SphinxPacket], Awaitable[NetworkPacket | None]] + + def __init__( + self, + config: NodeConfig, + handler: Callable[[SphinxPacket], Awaitable[NetworkPacket | None]], + ): + self.inbound_conns = [asyncio.Queue() for _ in range(config.conn_degree)] + self.outbound_conns = [ + MixOutboundConnection(config.transmission_rate_per_sec) + for _ in range(config.conn_degree) + ] + self.handler = handler # A set just for gathering a reference of tasks to prevent them from being garbage collected. # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task self.tasks = set() - - while True: - _, packet = await self.inbound_socket.get() - task = asyncio.create_task( - self.__process_packet( - packet, - self.config.encryption_private_key, - self.config.delay_rate_per_min, - ) - ) + for conn in self.inbound_conns: + task = asyncio.create_task(self.__process_inbound_conn(conn)) self.tasks.add(task) # To discard the task from the set automatically when it is done. task.add_done_callback(self.tasks.discard) - async def __process_packet( - self, - packet: SphinxPacket, - encryption_private_key: X25519PrivateKey, - delay_rate_per_min: int, # Poisson rate parameter: mu - ): - """ - Process a single packet with a delay that follows exponential distribution, - and forward it to the next mix node or the mix destination + async def __process_inbound_conn(self, conn: Connection): + while True: + elem = await conn.get() + if isinstance(elem, bytes): + assert elem == build_noise_packet() + # Drop packet + continue + elif isinstance(elem, SphinxPacket): + net_packet = await self.handler(elem) + if net_packet is not None: + await self.gossip(net_packet) - This thread is a single server (worker) in a M/M/inf queue that MixNodeRunner approximates. - """ - delay_sec = poisson_interval_sec(delay_rate_per_min) - await asyncio.sleep(delay_sec) + async def gossip(self, packet: NetworkPacket): + for conn in self.outbound_conns: + await conn.send(packet) - processed = packet.process(encryption_private_key) - match processed: - case ProcessedForwardHopPacket(): - await self.outbound_socket.put( - (processed.next_node_address, processed.next_packet) - ) - case ProcessedFinalHopPacket(): - await self.outbound_socket.put( - (processed.destination_node_address, processed.payload) - ) - case _: - raise UnknownHeaderTypeError - async def cancel(self) -> None: - self.task.cancel() - with suppress(asyncio.CancelledError): - await self.task +class MixOutboundConnection: + transmission_rate_per_sec: int + queue: NetworkPacketQueue + conn: Connection + + def __init__(self, transmission_rate_per_sec: int): + self.transmission_rate_per_sec = transmission_rate_per_sec + self.queue = asyncio.Queue() + self.conn = asyncio.Queue() + self.task = asyncio.create_task(self.__run()) + + async def __run(self): + while True: + await asyncio.sleep(1 / self.transmission_rate_per_sec) + # TODO: time mixing + if self.queue.empty(): + elem = build_noise_packet() + else: + elem = self.queue.get_nowait() + await self.conn.put(elem) + + async def send(self, elem: NetworkPacket): + await self.queue.put(elem) + + +def build_noise_packet() -> bytes: + return bytes(DEFAULT_PAYLOAD_SIZE) diff --git a/mixnet/packet.py b/mixnet/packet.py index 71fe687..f40a290 100644 --- a/mixnet/packet.py +++ b/mixnet/packet.py @@ -9,7 +9,7 @@ from typing import Dict, List, Self, Tuple, TypeAlias from pysphinx.payload import Payload from pysphinx.sphinx import SphinxPacket -from mixnet.config import MixnetTopology, MixNodeInfo +from mixnet.config import MixMembership, NodePublicInfo class MessageFlag(Enum): @@ -23,25 +23,25 @@ class MessageFlag(Enum): class PacketBuilder: @staticmethod def build_real_packets( - message: bytes, topology: MixnetTopology - ) -> List[Tuple[SphinxPacket, List[MixNodeInfo]]]: + message: bytes, membership: MixMembership + ) -> List[Tuple[SphinxPacket, List[NodePublicInfo]]]: return PacketBuilder.__build_packets( - MessageFlag.MESSAGE_FLAG_REAL, message, topology + MessageFlag.MESSAGE_FLAG_REAL, message, membership ) @staticmethod def build_drop_cover_packets( - message: bytes, topology: MixnetTopology - ) -> List[Tuple[SphinxPacket, List[MixNodeInfo]]]: + message: bytes, membership: MixMembership + ) -> List[Tuple[SphinxPacket, List[NodePublicInfo]]]: return PacketBuilder.__build_packets( - MessageFlag.MESSAGE_FLAG_DROP_COVER, message, topology + MessageFlag.MESSAGE_FLAG_DROP_COVER, message, membership ) @staticmethod def __build_packets( - flag: MessageFlag, message: bytes, topology: MixnetTopology - ) -> List[Tuple[SphinxPacket, List[MixNodeInfo]]]: - destination = topology.choose_mix_destination() + flag: MessageFlag, message: bytes, membership: MixMembership + ) -> List[Tuple[SphinxPacket, List[NodePublicInfo]]]: + last_mix = membership.choose() msg_with_flag = flag.bytes() + message # NOTE: We don't encrypt msg_with_flag for destination. @@ -50,11 +50,11 @@ class PacketBuilder: out = [] for fragment in fragment_set.fragments: - route = topology.generate_route(destination) + route = membership.generate_route(3, last_mix) packet = SphinxPacket.build( fragment.bytes(), [mixnode.sphinx_node() for mixnode in route], - destination.sphinx_node(), + last_mix.sphinx_node(), ) out.append((packet, route)) diff --git a/mixnet/poisson.py b/mixnet/poisson.py deleted file mode 100644 index 86a4b66..0000000 --- a/mixnet/poisson.py +++ /dev/null @@ -1,13 +0,0 @@ -import numpy - - -def poisson_interval_sec(rate_per_min: int) -> float: - # If events occur in a Poisson distribution with rate_per_min, - # the interval between events follows the exponential distribution - # with the rate_per_min (i.e. with the scale 1/rate_per_min). - interval_min = numpy.random.exponential(scale=1 / rate_per_min, size=1)[0] - return interval_min * 60 - - -def poisson_mean_interval_sec(rate_per_min: int) -> float: - return 1 / rate_per_min * 60 diff --git a/mixnet/test_client.py b/mixnet/test_client.py deleted file mode 100644 index 4327466..0000000 --- a/mixnet/test_client.py +++ /dev/null @@ -1,45 +0,0 @@ -from datetime import datetime -from unittest import IsolatedAsyncioTestCase - -import numpy - -from mixnet.client import MixClient -from mixnet.poisson import poisson_mean_interval_sec -from mixnet.test_utils import ( - init_mixnet_config, - with_test_timeout, -) -from mixnet.utils import random_bytes - - -class TestMixClient(IsolatedAsyncioTestCase): - @with_test_timeout(100) - async def test_mixclient(self): - config = init_mixnet_config().mixclient_config - config.emission_rate_per_min = 30 - config.redundancy = 3 - - mixclient = await MixClient.new(config) - try: - # Send a 3500-byte msg, expecting that it is split into at least two packets - await mixclient.send_message(random_bytes(3500)) - - # Calculate intervals between packet emissions from the mix client - intervals = [] - ts = datetime.now() - for _ in range(30): - _ = await mixclient.outbound_socket.get() - now = datetime.now() - intervals.append((now - ts).total_seconds()) - ts = now - - # Check if packets were emitted at the Poisson emission_rate - # If emissions follow the Poisson distribution with a rate `lambda`, - # a mean interval between emissions must be `1/lambda`. - self.assertAlmostEqual( - float(numpy.mean(intervals)), - poisson_mean_interval_sec(config.emission_rate_per_min), - delta=1.0, - ) - finally: - await mixclient.cancel() diff --git a/mixnet/test_fisheryates.py b/mixnet/test_fisheryates.py deleted file mode 100644 index a32554c..0000000 --- a/mixnet/test_fisheryates.py +++ /dev/null @@ -1,21 +0,0 @@ -from unittest import TestCase - -from mixnet.fisheryates import FisherYates - - -class TestFisherYates(TestCase): - def test_shuffle(self): - entropy = b"hello" - elems = [1, 2, 3, 4, 5] - - shuffled1 = FisherYates.shuffle(elems, entropy) - self.assertEqual(sorted(elems), sorted(shuffled1)) - - # shuffle again with the same entropy - shuffled2 = FisherYates.shuffle(elems, entropy) - self.assertEqual(shuffled1, shuffled2) - - # shuffle with a different entropy - shuffled3 = FisherYates.shuffle(elems, b"world") - self.assertNotEqual(shuffled1, shuffled3) - self.assertEqual(sorted(elems), sorted(shuffled3)) diff --git a/mixnet/test_mixnet.py b/mixnet/test_mixnet.py deleted file mode 100644 index 9a0e4cb..0000000 --- a/mixnet/test_mixnet.py +++ /dev/null @@ -1,20 +0,0 @@ -import asyncio -from unittest import IsolatedAsyncioTestCase - -from mixnet.mixnet import Mixnet -from mixnet.test_utils import init_mixnet_config - - -class TestMixnet(IsolatedAsyncioTestCase): - async def test_topology_from_robustness(self): - config = init_mixnet_config() - entropy_queue = asyncio.Queue() - - mixnet = await Mixnet.new(config, entropy_queue) - try: - old_topology = config.mixclient_config.topology - await entropy_queue.put(b"new entropy") - await asyncio.sleep(1) - self.assertNotEqual(old_topology, mixnet.get_topology()) - finally: - await mixnet.cancel() diff --git a/mixnet/test_node.py b/mixnet/test_node.py index 26ab0c7..10ef3e2 100644 --- a/mixnet/test_node.py +++ b/mixnet/test_node.py @@ -1,117 +1,15 @@ -import asyncio -from datetime import datetime from unittest import IsolatedAsyncioTestCase -import numpy -from pysphinx.sphinx import SphinxPacket - -from mixnet.node import MixNode, NodeAddress, PacketQueue -from mixnet.packet import PacketBuilder -from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec +from mixnet.node import Node from mixnet.test_utils import ( init_mixnet_config, - with_test_timeout, ) -class TestMixNodeRunner(IsolatedAsyncioTestCase): - @with_test_timeout(180) - async def test_mixnode_emission_rate(self): - """ - Test if MixNodeRunner works as a M/M/inf queue. - - If inputs are arrived at Poisson rate `lambda`, - and if processing is delayed according to an exponential distribution with a rate `mu`, - the rate of outputs should be `lambda`. - """ - config = init_mixnet_config() - config.mixclient_config.emission_rate_per_min = 120 # lambda (= 2msg/sec) - config.mixnode_config.delay_rate_per_min = 30 # mu (= 2s delay on average) - - packet, route = PacketBuilder.build_real_packets( - b"msg", config.mixclient_config.topology - )[0] - - # Start only the first mix node for testing - config.mixnode_config.encryption_private_key = route[0].encryption_private_key - mixnode = await MixNode.new(config.mixnode_config) - try: - # Send packets to the first mix node in a Poisson distribution - packet_count = 100 - # This queue is just for counting how many packets have been sent so far. - sent_packet_queue: PacketQueue = asyncio.Queue() - sender_task = asyncio.create_task( - self.send_packets( - mixnode.inbound_socket, - packet, - route[0].addr, - packet_count, - config.mixclient_config.emission_rate_per_min, - sent_packet_queue, - ) - ) - try: - # Calculate intervals between outputs and gather num_jobs in the first mix node. - intervals = [] - num_jobs = [] - ts = datetime.now() - for _ in range(packet_count): - _ = await mixnode.outbound_socket.get() - now = datetime.now() - intervals.append((now - ts).total_seconds()) - - # Calculate the current # of jobs staying in the mix node - num_packets_emitted_from_mixnode = len(intervals) - num_packets_sent_to_mixnode = sent_packet_queue.qsize() - num_jobs.append( - num_packets_sent_to_mixnode - num_packets_emitted_from_mixnode - ) - - ts = now - - # Remove the first interval that would be much larger than other intervals, - # because of the delay in mix node. - intervals = intervals[1:] - num_jobs = num_jobs[1:] - - # Check if the emission rate of the first mix node is the same as - # the emission rate of the message sender, but with a delay. - # If outputs follow the Poisson distribution with a rate `lambda`, - # a mean interval between outputs must be `1/lambda`. - self.assertAlmostEqual( - float(numpy.mean(intervals)), - poisson_mean_interval_sec( - config.mixclient_config.emission_rate_per_min - ), - delta=1.0, - ) - # If runner is a M/M/inf queue, - # a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`. - self.assertAlmostEqual( - float(numpy.mean(num_jobs)), - round( - config.mixclient_config.emission_rate_per_min - / config.mixnode_config.delay_rate_per_min - ), - delta=1.5, - ) - finally: - await sender_task - finally: - await mixnode.cancel() - - @staticmethod - async def send_packets( - inbound_socket: PacketQueue, - packet: SphinxPacket, - node_addr: NodeAddress, - cnt: int, - rate_per_min: int, - # For testing purpose, to inform the caller how many packets have been sent to the inbound_socket - sent_packet_queue: PacketQueue, - ): - for _ in range(cnt): - # Since the task is not heavy, just sleep for seconds instead of using emission_notifier - await asyncio.sleep(poisson_interval_sec(rate_per_min)) - await inbound_socket.put((node_addr, packet)) - await sent_packet_queue.put((node_addr, packet)) +class TestNode(IsolatedAsyncioTestCase): + async def test_node(self): + config = init_mixnet_config(10) + nodes = [ + Node(node_config, config.membership) for node_config in config.node_configs + ] + await nodes[0].send_message(b"block selection") diff --git a/mixnet/test_packet.py b/mixnet/test_packet.py index d1d517a..a923d78 100644 --- a/mixnet/test_packet.py +++ b/mixnet/test_packet.py @@ -3,7 +3,7 @@ from unittest import TestCase from pysphinx.sphinx import ProcessedFinalHopPacket, SphinxPacket -from mixnet.config import MixNodeInfo +from mixnet.config import NodePublicInfo from mixnet.packet import ( Fragment, MessageFlag, @@ -16,9 +16,9 @@ from mixnet.utils import random_bytes class TestPacket(TestCase): def test_real_packet(self): - topology = init_mixnet_config().mixclient_config.topology + membership = init_mixnet_config(10).membership msg = random_bytes(3500) - packets_and_routes = PacketBuilder.build_real_packets(msg, topology) + packets_and_routes = PacketBuilder.build_real_packets(msg, membership) self.assertEqual(4, len(packets_and_routes)) reconstructor = MessageReconstructor() @@ -47,9 +47,9 @@ class TestPacket(TestCase): ) def test_cover_packet(self): - topology = init_mixnet_config().mixclient_config.topology + membership = init_mixnet_config(10).membership msg = b"cover" - packets_and_routes = PacketBuilder.build_drop_cover_packets(msg, topology) + packets_and_routes = PacketBuilder.build_drop_cover_packets(msg, membership) self.assertEqual(1, len(packets_and_routes)) reconstructor = MessageReconstructor() @@ -63,14 +63,14 @@ class TestPacket(TestCase): ) @staticmethod - def process_packet(packet: SphinxPacket, route: List[MixNodeInfo]) -> Fragment: - processed = packet.process(route[0].encryption_private_key) + def process_packet(packet: SphinxPacket, route: List[NodePublicInfo]) -> Fragment: + processed = packet.process(route[0].private_key) if isinstance(processed, ProcessedFinalHopPacket): return Fragment.from_bytes(processed.payload.recover_plain_playload()) else: processed = processed for node in route[1:]: - p = processed.next_packet.process(node.encryption_private_key) + p = processed.next_packet.process(node.private_key) if isinstance(p, ProcessedFinalHopPacket): return Fragment.from_bytes(p.payload.recover_plain_playload()) else: diff --git a/mixnet/test_utils.py b/mixnet/test_utils.py index e3ba260..eff02d5 100644 --- a/mixnet/test_utils.py +++ b/mixnet/test_utils.py @@ -2,17 +2,12 @@ import asyncio from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey -from mixnet.bls import generate_bls from mixnet.config import ( - MixClientConfig, - MixNodeConfig, + MixMembership, MixnetConfig, - MixNodeInfo, - MixnetTopology, - MixnetTopologyConfig, - MixnetTopologySize, + NodeConfig, + NodePublicInfo, ) -from mixnet.utils import random_bytes def with_test_timeout(t): @@ -26,21 +21,14 @@ def with_test_timeout(t): return wrapper -def init_mixnet_config() -> MixnetConfig: - topology_config = MixnetTopologyConfig( - [ - MixNodeInfo( - generate_bls(), - X25519PrivateKey.generate(), - random_bytes(32), - ) - for _ in range(12) - ], - MixnetTopologySize(3, 3), - b"entropy", +def init_mixnet_config(num_nodes: int) -> MixnetConfig: + conn_degree = 4 + transmission_rate_per_sec = 3 + node_configs = [ + NodeConfig(X25519PrivateKey.generate(), conn_degree, transmission_rate_per_sec) + for _ in range(num_nodes) + ] + membership = MixMembership( + [NodePublicInfo(node_config.private_key) for node_config in node_configs] ) - mixclient_config = MixClientConfig(30, 3, MixnetTopology(topology_config)) - mixnode_config = MixNodeConfig( - topology_config.mixnode_candidates[0].encryption_private_key, 30 - ) - return MixnetConfig(topology_config, mixclient_config, mixnode_config) + return MixnetConfig(node_configs, membership) diff --git a/mixnet/v2/sim/adversary.py b/mixnet/v2/sim/adversary.py index 6ce63ef..adcbd15 100644 --- a/mixnet/v2/sim/adversary.py +++ b/mixnet/v2/sim/adversary.py @@ -1,6 +1,6 @@ from __future__ import annotations -from collections import defaultdict, deque, Counter +from collections import Counter, defaultdict, deque from enum import Enum from typing import TYPE_CHECKING @@ -46,14 +46,25 @@ class Adversary: self.senders_around_interval.update({sender}) # self.node_states[self.env.now][node] = NodeState.SENDING - def observe_if_final_msg(self, sender: "Node", receiver: "Node", time_sent: Time, msg: SphinxPacket | bytes): + def observe_if_final_msg( + self, + sender: "Node", + receiver: "Node", + time_sent: Time, + msg: SphinxPacket | bytes, + ): origin_id = receiver.inspect_message(msg) if origin_id is not None: cur_time = len(self.msgs_received_per_time) - 1 - self.final_msgs_received[receiver][cur_time].append((sender, time_sent, origin_id)) + self.final_msgs_received[receiver][cur_time].append( + (sender, time_sent, origin_id) + ) def is_around_message_interval(self, time: Time) -> bool: - return time % self.config.mixnet.message_interval <= self.config.mixnet.max_message_prep_time + return ( + time % self.config.mixnet.message_interval + <= self.config.mixnet.max_message_prep_time + ) def update_observation_time(self): while True: @@ -66,7 +77,10 @@ class Adversary: for time_received in msg_queue: # If the message is likely to be still pending and be emitted soon, # pass it on to the next time slot. - if self.env.now() - time_received < self.config.mixnet.max_mix_delay: + if ( + self.env.now() - time_received + < self.config.mixnet.max_mix_delay + ): new_msg_pool[receiver][0].append(time_received) self.msg_pools_per_time.append(new_msg_pool) diff --git a/mixnet/v2/sim/bulk-attack-2024-06-11T23:03:46.csv b/mixnet/v2/sim/bulk-attack-2024-06-11T23:03:46.csv new file mode 100644 index 0000000..f0b0d85 --- /dev/null +++ b/mixnet/v2/sim/bulk-attack-2024-06-11T23:03:46.csv @@ -0,0 +1,33 @@ +p2p_type,num_mix_layers,cover_message_prob,mix_delay,global_precision,global_recall,global_f1_score,target_median,target_std,target_min,target_25%,target_mean,target_75%,target_max +1-to-all,0,0.0,0,100.0,95.23809523809523,97.56097560975608,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +1-to-all,0,0.1,0,26.595744680851062,100.0,42.016806722689076,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +1-to-all,0,0.2,0,26.262626262626267,100.0,41.60000000000001,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +1-to-all,0,0.3,0,25.252525252525253,96.15384615384616,39.99999999999999,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +1-to-all,1,0.0,0,100.0,100.0,100.0,100.0,27.386127875258307,50.0,50.0,80.0,100.0,100.0 +1-to-all,1,0.1,0,21.50537634408602,95.23809523809523,35.08771929824562,0.0,14.907119849998598,0.0,0.0,6.666666666666666,0.0,33.33333333333333 +1-to-all,1,0.2,0,23.232323232323232,95.83333333333334,37.39837398373983,0.0,0.0,0.0,0.0,0.0,0.0,0.0 +1-to-all,1,0.3,0,24.242424242424242,100.0,39.02439024390243,0.0,0.0,0.0,0.0,0.0,0.0,0.0 +1-to-all,2,0.0,0,100.0,100.0,100.0,50.0,22.360679774997898,50.0,50.0,60.0,50.0,100.0 +1-to-all,2,0.1,0,21.875,100.0,35.8974358974359,33.33333333333333,14.907119849998596,0.0,33.33333333333333,26.666666666666664,33.33333333333333,33.33333333333333 +1-to-all,2,0.2,0,21.21212121212121,100.0,34.99999999999999,0.0,14.907119849998598,0.0,0.0,6.666666666666666,0.0,33.33333333333333 +1-to-all,2,0.3,0,28.28282828282828,100.0,44.09448818897637,0.0,14.907119849998598,0.0,0.0,6.666666666666666,0.0,33.33333333333333 +1-to-all,3,0.0,0,100.0,100.0,100.0,33.33333333333333,36.51483716701107,33.33333333333333,33.33333333333333,59.999999999999986,100.0,100.0 +1-to-all,3,0.1,0,28.421052631578945,100.0,44.26229508196721,0.0,0.0,0.0,0.0,0.0,0.0,0.0 +1-to-all,3,0.2,0,23.46938775510204,100.0,38.01652892561983,0.0,18.257418583505537,0.0,0.0,13.333333333333332,33.33333333333333,33.33333333333333 +1-to-all,3,0.3,0,18.181818181818183,100.0,30.76923076923077,0.0,14.907119849998598,0.0,0.0,6.666666666666666,0.0,33.33333333333333 +gossip,0,0.0,0,100.0,100.0,100.0,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +gossip,0,0.1,0,23.655913978494624,100.0,38.26086956521739,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +gossip,0,0.2,0,18.0,100.0,30.508474576271187,25.0,45.86072499703549,11.11111111111111,14.285714285714285,50.079365079365076,100.0,100.0 +gossip,0,0.3,0,24.0,100.0,38.70967741935484,100.0,32.48931448269655,33.33333333333333,50.0,76.66666666666666,100.0,100.0 +gossip,1,0.0,0,100.0,100.0,100.0,50.0,27.386127875258307,50.0,50.0,70.0,100.0,100.0 +gossip,1,0.1,0,21.875,100.0,35.8974358974359,9.090909090909092,1.8836560973128214,5.88235294117647,8.333333333333332,8.70172311348782,9.090909090909092,11.11111111111111 +gossip,1,0.2,0,16.0,100.0,27.586206896551722,6.25,3.7611412893852796,0.0,4.545454545454546,5.697552447552448,7.6923076923076925,10.0 +gossip,1,0.3,0,22.0,100.0,36.0655737704918,3.571428571428571,4.645380092823903,0.0,3.225806451612903,4.811827956989248,4.761904761904762,12.5 +gossip,2,0.0,0,25.882352941176475,100.0,41.12149532710281,50.0,23.47382389307855,7.142857142857142,7.142857142857142,32.857142857142854,50.0,50.0 +gossip,2,0.1,0,27.27272727272727,100.0,42.857142857142854,7.6923076923076925,3.1467257425263124,5.555555555555555,7.6923076923076925,9.18803418803419,12.5,12.5 +gossip,2,0.2,0,25.0,100.0,40.0,4.166666666666666,1.611565442570853,3.225806451612903,3.225806451612903,4.385560675883257,4.166666666666666,7.142857142857142 +gossip,2,0.3,0,31.0,100.0,47.32824427480916,3.4482758620689653,0.3762583509436308,3.0303030303030303,3.0303030303030303,3.411747894506515,3.7037037037037033,3.8461538461538463 +gossip,3,0.0,0,25.0,100.0,40.0,3.225806451612903,25.783526630398008,2.7777777777777777,2.7777777777777777,21.756272401433687,50.0,50.0 +gossip,3,0.1,0,24.0,100.0,38.70967741935484,7.6923076923076925,3.5018591849872105,4.0,4.0,7.176923076923077,7.6923076923076925,12.5 +gossip,3,0.2,0,22.0,100.0,36.0655737704918,1.694915254237288,1.8261038200732775,0.0,1.5384615384615385,2.010311722176129,1.8181818181818181,5.0 +gossip,3,0.3,0,22.0,100.0,36.0655737704918,3.125,0.6566081240247693,1.6129032258064515,2.857142857142857,2.769009216589862,3.125,3.125 diff --git a/mixnet/v2/sim/bulk-attack-2024-06-13T16:32:29.csv b/mixnet/v2/sim/bulk-attack-2024-06-13T16:32:29.csv new file mode 100644 index 0000000..1983e54 --- /dev/null +++ b/mixnet/v2/sim/bulk-attack-2024-06-13T16:32:29.csv @@ -0,0 +1,33 @@ +p2p_type,num_mix_layers,cover_message_prob,mix_delay,global_precision,global_recall,global_f1_score,target_accuracy_median,target_accuracy_std,target_accuracy_min,target_accuracy_25p,target_accuracy_mean,target_accuracy_75p,target_accuracy_max +1-to-all,0,0.0,0,100.0,100.0,100.0,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +1-to-all,0,0.1,0,18.6046511627907,100.0,31.372549019607845,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +1-to-all,0,0.2,0,23.157894736842106,100.0,37.6068376068376,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +1-to-all,0,0.3,0,17.17171717171717,100.0,29.3103448275862,100.0,0.0,100.0,100.0,100.0,100.0,100.0 +1-to-all,1,0.0,0,100.0,100.0,100.0,50.0,27.386127875258307,33.33333333333333,50.0,66.66666666666667,100.0,100.0 +1-to-all,1,0.1,0,21.518987341772153,100.0,35.41666666666667,10.555555555555555,4.289387252569091,6.25,9.318181818181818,11.499403374403373,12.5,25.0 +1-to-all,1,0.2,0,20.408163265306122,100.0,33.898305084745765,4.3478260869565215,1.064716488345387,2.7777777777777777,4.166666666666666,4.748639137369114,5.263157894736842,7.142857142857142 +1-to-all,1,0.3,0,13.131313131313133,100.0,23.21428571428572,3.3333333333333335,0.4835588183696303,2.4390243902439024,3.0539772727272725,3.289326227500298,3.4482758620689653,4.0 +1-to-all,2,0.0,0,100.0,100.0,100.0,50.0,30.731814857642963,33.33333333333333,33.33333333333333,62.5,100.0,100.0 +1-to-all,2,0.1,0,21.686746987951807,100.0,35.64356435643564,9.090909090909092,2.9511091437139054,7.6923076923076925,8.333333333333332,10.668458525601384,12.5,16.666666666666664 +1-to-all,2,0.2,0,17.17171717171717,100.0,29.3103448275862,4.772727272727273,1.517418864339985,3.7037037037037033,4.21195652173913,5.229772314519028,5.88235294117647,8.333333333333332 +1-to-all,2,0.3,0,19.19191919191919,100.0,32.20338983050847,3.225806451612903,0.39312356236984713,2.7777777777777777,3.125,3.350887277298567,3.7037037037037033,3.8461538461538463 +1-to-all,3,0.0,0,100.0,100.0,100.0,100.0,32.338083338177725,33.33333333333333,33.33333333333333,72.22222222222221,100.0,100.0 +1-to-all,3,0.1,0,16.470588235294116,100.0,28.282828282828284,9.166666666666666,2.2034971259087777,6.25,7.852564102564102,9.674145299145298,12.152777777777779,12.5 +1-to-all,3,0.2,0,16.161616161616163,100.0,27.82608695652174,5.131578947368421,0.8437177952608307,3.125,4.5995670995671,5.08918183464537,5.88235294117647,5.88235294117647 +1-to-all,3,0.3,0,16.161616161616163,100.0,27.82608695652174,3.571428571428571,0.6071582079904736,2.941176470588235,2.941176470588235,3.5602522403023538,4.0,5.0 +gossip,0,0.0,0,100.0,100.0,100.0,100.0,13.844373104863458,50.0,100.0,96.0,100.0,100.0 +gossip,0,0.1,0,25.581395348837212,100.0,40.74074074074075,33.33333333333333,33.355058782757894,6.25,16.666666666666664,42.353814988430365,50.0,100.0 +gossip,0,0.2,0,21.21212121212121,100.0,34.99999999999999,16.666666666666664,29.65823962793278,4.545454545454546,8.333333333333332,28.250577087558728,33.33333333333333,100.0 +gossip,0,0.3,0,16.0,100.0,27.586206896551722,9.090909090909092,26.889855352443877,2.5,6.25,22.139198132019306,27.083333333333332,100.0 +gossip,1,0.0,0,100.0,100.0,100.0,100.0,26.111648393354674,50.0,50.0,77.27272727272727,100.0,100.0 +gossip,1,0.1,0,17.24137931034483,100.0,29.41176470588236,9.545454545454547,6.059468261509824,8.333333333333332,9.090909090909092,11.66779401154401,12.5,33.33333333333333 +gossip,1,0.2,0,19.587628865979383,100.0,32.758620689655174,5.263157894736842,2.6675474246775224,3.7037037037037033,4.545454545454546,5.609466489598406,5.263157894736842,16.666666666666664 +gossip,1,0.3,0,22.0,100.0,36.0655737704918,3.7037037037037033,0.9160598857343621,2.857142857142857,3.3620689655172415,3.843721629198569,4.050925925925926,6.25 +gossip,2,0.0,0,23.157894736842106,100.0,37.6068376068376,33.33333333333333,37.12662068164611,2.857142857142857,10.795454545454547,42.426595143986454,75.0,100.0 +gossip,2,0.1,0,18.181818181818183,100.0,30.76923076923077,7.6923076923076925,2.7804557733541913,3.0303030303030303,5.0,7.409481356849778,10.0,11.11111111111111 +gossip,2,0.2,0,15.0,100.0,26.08695652173913,3.8461538461538463,1.2241427103675966,1.4705882352941175,3.0303030303030303,3.5156305993901906,4.0,6.25 +gossip,2,0.3,0,19.0,100.0,31.932773109243698,3.125,0.8305457174085623,1.2345679012345678,2.272727272727273,2.8054490290275624,3.3122119815668203,3.7037037037037033 +gossip,3,0.0,0,17.0,100.0,29.05982905982906,3.3333333333333335,34.44621772314927,1.6129032258064515,2.1165780141843973,22.259486584613253,31.25,100.0 +gossip,3,0.1,0,12.0,100.0,21.428571428571427,9.090909090909092,4.350041393990425,2.1739130434782608,9.090909090909092,8.88079119963178,10.0,16.666666666666664 +gossip,3,0.2,0,14.14141414141414,100.0,24.778761061946902,5.0,1.4150913057792438,1.5151515151515151,3.225806451612903,4.217988467751566,5.0,6.25 +gossip,3,0.3,0,21.0,100.0,34.710743801652896,3.0303030303030303,0.9983480122032289,1.3157894736842104,1.4870500438981562,2.583031727692883,3.134796238244514,3.8461538461538463