Embed Robustness into `mixnet.py` (#61)

This commit is contained in:
Youngjoon Lee 2024-02-08 15:39:50 +09:00 committed by GitHub
parent cde1e92c9e
commit 5dd7b2730a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 180 additions and 262 deletions

View File

@ -4,49 +4,47 @@ import asyncio
from contextlib import suppress
from typing import Self
from mixnet.config import MixnetConfig
from mixnet.config import MixClientConfig, MixnetTopology
from mixnet.node import PacketQueue
from mixnet.packet import PacketBuilder
from mixnet.poisson import poisson_interval_sec
class MixClient:
__config: MixnetConfig
__real_packet_queue: PacketQueue
__outbound_socket: PacketQueue
__task: asyncio.Task # A reference just to prevent task from being garbage collected
config: MixClientConfig
real_packet_queue: PacketQueue
outbound_socket: PacketQueue
task: asyncio.Task # A reference just to prevent task from being garbage collected
@classmethod
async def new(
cls,
config: MixnetConfig,
config: MixClientConfig,
) -> Self:
self = cls()
self.__config = config
self.__real_packet_queue = asyncio.Queue()
self.__outbound_socket = asyncio.Queue()
self.__task = asyncio.create_task(self.__run())
self.config = config
self.real_packet_queue = asyncio.Queue()
self.outbound_socket = asyncio.Queue()
self.task = asyncio.create_task(self.__run())
return self
def set_config(self, config: MixnetConfig) -> None:
def set_topology(self, topology: MixnetTopology) -> None:
"""
Replace the old config with the new config received
Replace the old topology with the new topology received
In real implementations, this method may be integrated in a long-running task.
Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment.
"""
self.__config = config
self.config.topology = topology
def get_config(self) -> MixnetConfig:
return self.__config
# Only for testing
def get_topology(self) -> MixnetTopology:
return self.config.topology
async def send_message(self, msg: bytes) -> None:
packets_and_routes = PacketBuilder.build_real_packets(
msg, self.__config.topology
)
packets_and_routes = PacketBuilder.build_real_packets(msg, self.config.topology)
for packet, route in packets_and_routes:
await self.__real_packet_queue.put((route[0].addr, packet))
await self.real_packet_queue.put((route[0].addr, packet))
def subscribe_messages(self) -> "asyncio.Queue[bytes]":
"""
@ -54,10 +52,6 @@ class MixClient:
"""
return asyncio.Queue()
@property
def outbound_socket(self) -> PacketQueue:
return self.__outbound_socket
async def __run(self):
"""
Emit packets at the Poisson emission_rate_per_min.
@ -73,7 +67,7 @@ class MixClient:
emission_notifier_queue = asyncio.Queue()
_ = asyncio.create_task(
self.__emission_notifier(
self.__config.emission_rate_per_min, emission_notifier_queue
self.config.emission_rate_per_min, emission_notifier_queue
)
)
@ -81,7 +75,7 @@ class MixClient:
# Wait until the next emission time
_ = await emission_notifier_queue.get()
try:
await self.__emit(self.__config.redundancy, redundant_real_packet_queue)
await self.__emit(self.config.redundancy, redundant_real_packet_queue)
finally:
# Python convention: indicate that the previously enqueued task has been processed
emission_notifier_queue.task_done()
@ -93,23 +87,23 @@ class MixClient:
):
if not redundant_real_packet_queue.empty():
addr, packet = redundant_real_packet_queue.get_nowait()
await self.__outbound_socket.put((addr, packet))
await self.outbound_socket.put((addr, packet))
return
if not self.__real_packet_queue.empty():
addr, packet = self.__real_packet_queue.get_nowait()
if not self.real_packet_queue.empty():
addr, packet = self.real_packet_queue.get_nowait()
# Schedule redundant real packets
for _ in range(redundancy - 1):
redundant_real_packet_queue.put_nowait((addr, packet))
await self.__outbound_socket.put((addr, packet))
await self.outbound_socket.put((addr, packet))
packets_and_routes = PacketBuilder.build_drop_cover_packets(
b"drop cover", self.__config.topology
b"drop cover", self.config.topology
)
# We have a for loop here, but we expect that the total num of packets is 1
# because the dummy message is short.
for packet, route in packets_and_routes:
await self.__outbound_socket.put((route[0].addr, packet))
await self.outbound_socket.put((route[0].addr, packet))
async def __emission_notifier(
self, emission_rate_per_min: int, queue: asyncio.Queue
@ -119,6 +113,6 @@ class MixClient:
queue.put_nowait(None)
async def cancel(self) -> None:
self.__task.cancel()
self.task.cancel()
with suppress(asyncio.CancelledError):
await self.__task
await self.task

View File

@ -11,22 +11,59 @@ from cryptography.hazmat.primitives.asymmetric.x25519 import (
from pysphinx.node import Node
from mixnet.bls import BlsPrivateKey, BlsPublicKey
from mixnet.fisheryates import FisherYates
@dataclass
class MixnetConfig:
topology_config: MixnetTopologyConfig
mixclient_config: MixClientConfig
mixnode_config: MixNodeConfig
@dataclass
class MixnetTopologyConfig:
mixnode_candidates: List[MixNodeInfo]
size: MixnetTopologySize
entropy: bytes
@dataclass
class MixClientConfig:
emission_rate_per_min: int # Poisson rate parameter: lambda
redundancy: int
delay_rate_per_min: int # Poisson rate parameter: mu
topology: MixnetTopology
@dataclass
class MixNodeConfig:
encryption_private_key: X25519PrivateKey
delay_rate_per_min: int # Poisson rate parameter: mu
@dataclass
class MixnetTopology:
# In production, this can be a 1-D array, which is accessible by indexes.
# Here, we use a 2-D array for readability.
layers: List[List[MixNodeInfo]]
def __init__(
self,
config: MixnetTopologyConfig,
) -> None:
"""
Build a new topology deterministically using an entropy and a given set of candidates.
"""
shuffled = FisherYates.shuffle(config.mixnode_candidates, config.entropy)
sampled = shuffled[: config.size.num_total_mixnodes()]
layers = []
for layer_id in range(config.size.num_layers):
start = layer_id * config.size.num_mixnodes_per_layer
layer = sampled[start : start + config.size.num_mixnodes_per_layer]
layers.append(layer)
self.layers = layers
def generate_route(self, mix_destination: MixNodeInfo) -> list[MixNodeInfo]:
"""
Generate a mix route for a Sphinx packet.
@ -45,6 +82,15 @@ class MixnetTopology:
return random.choice(self.layers[-1])
@dataclass
class MixnetTopologySize:
num_layers: int
num_mixnodes_per_layer: int
def num_total_mixnodes(self) -> int:
return self.num_layers * self.num_mixnodes_per_layer
# 32-byte that represents an IP address and a port of a mix node.
NodeAddress: TypeAlias = bytes

View File

@ -1,51 +1,62 @@
from __future__ import annotations
import asyncio
from typing import Self
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
)
from contextlib import suppress
from typing import Self, TypeAlias
from mixnet.client import MixClient
from mixnet.config import MixnetConfig
from mixnet.config import MixnetConfig, MixnetTopology, MixnetTopologyConfig
from mixnet.node import MixNode
EntropyQueue: TypeAlias = "asyncio.Queue[bytes]"
class Mixnet:
__mixclient: MixClient
__mixnode: MixNode
topology_config: MixnetTopologyConfig
mixclient: MixClient
mixnode: MixNode
entropy_queue: EntropyQueue
task: asyncio.Task # A reference just to prevent task from being garbage collected
@classmethod
async def new(
cls,
encryption_private_key: X25519PrivateKey,
config: MixnetConfig,
entropy_queue: EntropyQueue,
) -> Self:
self = cls()
self.__mixclient = await MixClient.new(config)
self.__mixnode = await MixNode.new(encryption_private_key, config)
self.topology_config = config.topology_config
self.mixclient = await MixClient.new(config.mixclient_config)
self.mixnode = await MixNode.new(config.mixnode_config)
self.entropy_queue = entropy_queue
self.task = asyncio.create_task(self.__consume_entropy())
return self
async def publish_message(self, msg: bytes) -> None:
await self.__mixclient.send_message(msg)
await self.mixclient.send_message(msg)
def subscribe_messages(self) -> "asyncio.Queue[bytes]":
return self.__mixclient.subscribe_messages()
return self.mixclient.subscribe_messages()
def set_config(self, config: MixnetConfig) -> None:
"""
Replace the old config with the new config received.
async def __consume_entropy(
self,
) -> None:
while True:
entropy = await self.entropy_queue.get()
self.topology_config.entropy = entropy
In real implementations, this method should be a long-running task, accepting configs periodically.
Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment.
"""
self.__mixclient.set_config(config)
self.__mixnode.set_config(config)
def get_config(self) -> MixnetConfig:
return self.__mixclient.get_config()
topology = MixnetTopology(self.topology_config)
self.mixclient.set_topology(topology)
async def cancel(self) -> None:
await self.__mixclient.cancel()
await self.__mixnode.cancel()
self.task.cancel()
with suppress(asyncio.CancelledError):
await self.task
await self.mixclient.cancel()
await self.mixnode.cancel()
# Only for testing
def get_topology(self) -> MixnetTopology:
return self.mixclient.get_topology()

View File

@ -15,7 +15,7 @@ from pysphinx.sphinx import (
UnknownHeaderTypeError,
)
from mixnet.config import MixnetConfig, NodeAddress
from mixnet.config import MixNodeConfig, NodeAddress
from mixnet.poisson import poisson_interval_sec
PacketQueue: TypeAlias = "asyncio.Queue[Tuple[NodeAddress, SphinxPacket]]"
@ -32,30 +32,24 @@ class MixNode:
in order to define the MixNode as a simple dataclass for clarity.
"""
__config: MixnetConfig
config: MixNodeConfig
inbound_socket: PacketQueue
outbound_socket: PacketPayloadQueue
__task: asyncio.Task # A reference just to prevent task from being garbage collected
task: asyncio.Task # A reference just to prevent task from being garbage collected
@classmethod
async def new(
cls,
encryption_private_key: X25519PrivateKey,
config: MixnetConfig,
config: MixNodeConfig,
) -> Self:
self = cls()
self.__config = config
self.__establish_connections()
self.config = config
self.inbound_socket = asyncio.Queue()
self.outbound_socket = asyncio.Queue()
self.__task = asyncio.create_task(self.__run(encryption_private_key))
self.task = asyncio.create_task(self.__run())
return self
async def __run(
self,
encryption_private_key: X25519PrivateKey,
):
async def __run(self):
"""
Read SphinxPackets from inbound socket and spawn a thread for each packet to process it.
@ -70,7 +64,9 @@ class MixNode:
_, packet = await self.inbound_socket.get()
task = asyncio.create_task(
self.__process_packet(
packet, encryption_private_key, self.__config.delay_rate_per_min
packet,
self.config.encryption_private_key,
self.config.delay_rate_per_min,
)
)
self.tasks.add(task)
@ -105,30 +101,7 @@ class MixNode:
case _:
raise UnknownHeaderTypeError
def set_config(self, config: MixnetConfig) -> None:
"""
Replace the old config with the new config received.
If topology has been changed, start establishing new network connections in background.
In real implementations, this method may be integrated in a long-running task.
Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment.
"""
if self.__config.topology != config.topology:
self.__establish_connections()
self.__config = config
def __establish_connections(self) -> None:
"""
Establish network connections in advance based on the topology received.
This is just a preparation to forward subsequent packets as quickly as possible,
but this is not a strict requirement.
In real implementations, this should be a background task.
"""
pass
async def cancel(self) -> None:
self.__task.cancel()
self.task.cancel()
with suppress(asyncio.CancelledError):
await self.__task
await self.task

View File

@ -1,100 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import List
from mixnet.config import MixnetConfig, MixnetTopology, MixNodeInfo
from mixnet.fisheryates import FisherYates
from mixnet.mixnet import Mixnet
class Robustness:
"""
A robustness layer is placed on top of a mixnet layer and a consensus layer,
to separate their responsibilities and minimize dependencies between them.
For v1, the role of robustness layer is building a new mixnet topology
and injecting it to the mixnet layer,
whenever a new entropy is received from the consensus layer.
A static list of nodes is used for building topologies deterministically.
This can be changed in later versions.
In later versions, the robustness layer will have more responsibilities.
"""
def __init__(
self,
config: RobustnessConfig,
mixnet: Mixnet,
) -> None:
self.__config = config
self.__mixnet = mixnet
def set_entropy(self, entropy: bytes) -> None:
"""
Given a entropy received, build a new topology and send it to mixnet.
In v1, this doesn't change any mixnet config except topology.
In real implementations, this method should be a long-running task, consuming entropy periodically.
Here in the spec, this method has been simplified as a setter, assuming the single-thread test environment.
"""
self.__config.mixnet.mixnet_layer_config.topology = self.build_topology(
self.__config.mixnet.mixnode_candidates,
self.__config.mixnet.topology_size,
entropy,
)
self.__mixnet.set_config(self.__config.mixnet.mixnet_layer_config)
@staticmethod
def build_topology(
mixnode_candidates: List[MixNodeInfo],
mixnet_topology_size: MixnetTopologySize,
entropy: bytes,
) -> MixnetTopology:
"""
Build a new topology deterministically using an entropy and a given set of candidates.
"""
shuffled = FisherYates.shuffle(mixnode_candidates, entropy)
sampled = shuffled[: mixnet_topology_size.num_total_mixnodes()]
layers = []
for layer_id in range(mixnet_topology_size.num_layers):
start = layer_id * mixnet_topology_size.num_mixnodes_per_layer
layer = sampled[start : start + mixnet_topology_size.num_mixnodes_per_layer]
layers.append(layer)
return MixnetTopology(layers)
@dataclass
class RobustnessConfig:
"""In v1, the robustness layer manages configs only for the mixnet layer."""
mixnet: RobustnessMixnetConfig
class RobustnessMixnetConfig:
"""
Configurations for the mixnet layer
These configurations are meant to be changed over time according to other parameters from other layers (e.g. consensus).
"""
def __init__(
self,
mixnode_candidates: List[MixNodeInfo],
mixnet_topology_size: MixnetTopologySize,
mixnet_layer_config: MixnetConfig,
) -> None:
assert mixnet_topology_size.num_total_mixnodes() <= len(mixnode_candidates)
self.mixnode_candidates = mixnode_candidates
self.topology_size = mixnet_topology_size
# A config to be injected to the mixnet layer whenever it is updated
self.mixnet_layer_config = mixnet_layer_config
@dataclass
class MixnetTopologySize:
num_layers: int
num_mixnodes_per_layer: int
def num_total_mixnodes(self) -> int:
return self.num_layers * self.num_mixnodes_per_layer

View File

@ -6,7 +6,7 @@ import numpy
from mixnet.client import MixClient
from mixnet.poisson import poisson_mean_interval_sec
from mixnet.test_utils import (
init_robustness_mixnet_config,
init_mixnet_config,
with_test_timeout,
)
from mixnet.utils import random_bytes
@ -15,7 +15,7 @@ from mixnet.utils import random_bytes
class TestMixClient(IsolatedAsyncioTestCase):
@with_test_timeout(100)
async def test_mixclient(self):
config = init_robustness_mixnet_config().mixnet_layer_config
config = init_mixnet_config().mixclient_config
config.emission_rate_per_min = 30
config.redundancy = 3

View File

@ -1,26 +1,20 @@
import asyncio
from unittest import IsolatedAsyncioTestCase
from mixnet.mixnet import Mixnet
from mixnet.robustness import Robustness, RobustnessConfig
from mixnet.test_utils import init_robustness_mixnet_config
from mixnet.test_utils import init_mixnet_config
class TestMixnet(IsolatedAsyncioTestCase):
async def test_topology_from_robustness(self):
robustness_mixnet_config = init_robustness_mixnet_config()
config = init_mixnet_config()
entropy_queue = asyncio.Queue()
mixnet = await Mixnet.new(
robustness_mixnet_config.mixnode_candidates[0].encryption_private_key,
robustness_mixnet_config.mixnet_layer_config,
)
mixnet = await Mixnet.new(config, entropy_queue)
try:
robustness = Robustness(RobustnessConfig(robustness_mixnet_config), mixnet)
self.assertEqual(
robustness_mixnet_config.mixnet_layer_config, mixnet.get_config()
)
old_topology = robustness_mixnet_config.mixnet_layer_config.topology
robustness.set_entropy(b"new entropy")
self.assertNotEqual(old_topology, mixnet.get_config().topology)
old_topology = config.mixclient_config.topology
await entropy_queue.put(b"new entropy")
await asyncio.sleep(1)
self.assertNotEqual(old_topology, mixnet.get_topology())
finally:
await mixnet.cancel()

View File

@ -9,7 +9,7 @@ from mixnet.node import MixNode, NodeAddress, PacketQueue
from mixnet.packet import PacketBuilder
from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec
from mixnet.test_utils import (
init_robustness_mixnet_config,
init_mixnet_config,
with_test_timeout,
)
@ -24,14 +24,17 @@ class TestMixNodeRunner(IsolatedAsyncioTestCase):
and if processing is delayed according to an exponential distribution with a rate `mu`,
the rate of outputs should be `lambda`.
"""
config = init_robustness_mixnet_config().mixnet_layer_config
config.emission_rate_per_min = 120 # lambda (= 2msg/sec)
config.delay_rate_per_min = 30 # mu (= 2s delay on average)
config = init_mixnet_config()
config.mixclient_config.emission_rate_per_min = 120 # lambda (= 2msg/sec)
config.mixnode_config.delay_rate_per_min = 30 # mu (= 2s delay on average)
packet, route = PacketBuilder.build_real_packets(b"msg", config.topology)[0]
packet, route = PacketBuilder.build_real_packets(
b"msg", config.mixclient_config.topology
)[0]
# Start only the first mix node for testing
mixnode = await MixNode.new(route[0].encryption_private_key, config)
config.mixnode_config.encryption_private_key = route[0].encryption_private_key
mixnode = await MixNode.new(config.mixnode_config)
try:
# Send packets to the first mix node in a Poisson distribution
packet_count = 100
@ -43,7 +46,7 @@ class TestMixNodeRunner(IsolatedAsyncioTestCase):
packet,
route[0].addr,
packet_count,
config.emission_rate_per_min,
config.mixclient_config.emission_rate_per_min,
sent_packet_queue,
)
)
@ -77,14 +80,19 @@ class TestMixNodeRunner(IsolatedAsyncioTestCase):
# a mean interval between outputs must be `1/lambda`.
self.assertAlmostEqual(
float(numpy.mean(intervals)),
poisson_mean_interval_sec(config.emission_rate_per_min),
poisson_mean_interval_sec(
config.mixclient_config.emission_rate_per_min
),
delta=1.0,
)
# If runner is a M/M/inf queue,
# a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`.
self.assertAlmostEqual(
float(numpy.mean(num_jobs)),
round(config.emission_rate_per_min / config.delay_rate_per_min),
round(
config.mixclient_config.emission_rate_per_min
/ config.mixnode_config.delay_rate_per_min
),
delta=1.5,
)
finally:

View File

@ -10,13 +10,13 @@ from mixnet.packet import (
MessageReconstructor,
PacketBuilder,
)
from mixnet.test_utils import init_robustness_mixnet_config
from mixnet.test_utils import init_mixnet_config
from mixnet.utils import random_bytes
class TestPacket(TestCase):
def test_real_packet(self):
topology = init_robustness_mixnet_config().mixnet_layer_config.topology
topology = init_mixnet_config().mixclient_config.topology
msg = random_bytes(3500)
packets_and_routes = PacketBuilder.build_real_packets(msg, topology)
self.assertEqual(4, len(packets_and_routes))
@ -47,7 +47,7 @@ class TestPacket(TestCase):
)
def test_cover_packet(self):
topology = init_robustness_mixnet_config().mixnet_layer_config.topology
topology = init_mixnet_config().mixclient_config.topology
msg = b"cover"
packets_and_routes = PacketBuilder.build_drop_cover_packets(msg, topology)
self.assertEqual(1, len(packets_and_routes))

View File

@ -1,14 +0,0 @@
from unittest import TestCase
from mixnet.test_utils import init_robustness_mixnet_config
class TestRobustness(TestCase):
def test_build_topology(self):
robustness_mixnet_config = init_robustness_mixnet_config()
topology = robustness_mixnet_config.mixnet_layer_config.topology
topology_size = robustness_mixnet_config.topology_size
self.assertEqual(len(topology.layers), topology_size.num_layers)
for layer in topology.layers:
self.assertEqual(len(layer), topology_size.num_mixnodes_per_layer)

View File

@ -3,8 +3,15 @@ import asyncio
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from mixnet.bls import generate_bls
from mixnet.config import MixnetConfig, MixNodeInfo
from mixnet.robustness import MixnetTopologySize, Robustness, RobustnessMixnetConfig
from mixnet.config import (
MixClientConfig,
MixNodeConfig,
MixnetConfig,
MixNodeInfo,
MixnetTopology,
MixnetTopologyConfig,
MixnetTopologySize,
)
from mixnet.utils import random_bytes
@ -19,22 +26,21 @@ def with_test_timeout(t):
return wrapper
def init_robustness_mixnet_config() -> RobustnessMixnetConfig:
mixnode_candidates = [
MixNodeInfo(
generate_bls(),
X25519PrivateKey.generate(),
random_bytes(32),
)
for _ in range(12)
]
topology_size = MixnetTopologySize(3, 3)
mixnet_layer_config = MixnetConfig(
30,
3,
30,
Robustness.build_topology(mixnode_candidates, topology_size, b"entropy"),
def init_mixnet_config() -> MixnetConfig:
topology_config = MixnetTopologyConfig(
[
MixNodeInfo(
generate_bls(),
X25519PrivateKey.generate(),
random_bytes(32),
)
for _ in range(12)
],
MixnetTopologySize(3, 3),
b"entropy",
)
return RobustnessMixnetConfig(
mixnode_candidates, topology_size, mixnet_layer_config
mixclient_config = MixClientConfig(30, 3, MixnetTopology(topology_config))
mixnode_config = MixNodeConfig(
topology_config.mixnode_candidates[0].encryption_private_key, 30
)
return MixnetConfig(topology_config, mixclient_config, mixnode_config)