mirror of
https://github.com/logos-co/nomos-specs.git
synced 2025-01-10 23:56:31 +00:00
Mixnet: Packet delay in mix node (#49)
This commit is contained in:
parent
1fc319de9e
commit
d963d6cb51
@ -2,20 +2,10 @@ from __future__ import annotations
|
||||
|
||||
import random
|
||||
from dataclasses import dataclass
|
||||
from typing import List, TypeAlias
|
||||
from typing import List
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.x25519 import (
|
||||
X25519PrivateKey,
|
||||
X25519PublicKey,
|
||||
)
|
||||
from pysphinx.node import Node
|
||||
|
||||
from mixnet.bls import BlsPrivateKey, BlsPublicKey
|
||||
from mixnet.fisheryates import FisherYates
|
||||
|
||||
NodeId: TypeAlias = BlsPublicKey
|
||||
# 32-byte that represents an IP address and a port of a mix node.
|
||||
NodeAddress: TypeAlias = bytes
|
||||
from mixnet.node import MixNode
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -49,22 +39,6 @@ class Mixnet:
|
||||
return random.choice(self.mix_nodes)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MixNode:
|
||||
identity_private_key: BlsPrivateKey
|
||||
encryption_private_key: X25519PrivateKey
|
||||
addr: NodeAddress
|
||||
|
||||
def identity_public_key(self) -> BlsPublicKey:
|
||||
return self.identity_private_key.get_g1()
|
||||
|
||||
def encryption_public_key(self) -> X25519PublicKey:
|
||||
return self.encryption_private_key.public_key()
|
||||
|
||||
def sphinx_node(self) -> Node:
|
||||
return Node(self.encryption_private_key, self.addr)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MixnetTopology:
|
||||
layers: List[List[MixNode]]
|
||||
|
174
mixnet/node.py
Normal file
174
mixnet/node.py
Normal file
@ -0,0 +1,174 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from threading import Thread
|
||||
from typing import Tuple, TypeAlias
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.x25519 import (
|
||||
X25519PrivateKey,
|
||||
X25519PublicKey,
|
||||
)
|
||||
from pysphinx.node import Node
|
||||
from pysphinx.sphinx import (
|
||||
Payload,
|
||||
ProcessedFinalHopPacket,
|
||||
ProcessedForwardHopPacket,
|
||||
SphinxPacket,
|
||||
UnknownHeaderTypeError,
|
||||
)
|
||||
|
||||
from mixnet.bls import BlsPrivateKey, BlsPublicKey
|
||||
from mixnet.poisson import poisson_interval_sec
|
||||
|
||||
NodeId: TypeAlias = BlsPublicKey
|
||||
# 32-byte that represents an IP address and a port of a mix node.
|
||||
NodeAddress: TypeAlias = bytes
|
||||
|
||||
PacketQueue: TypeAlias = "queue.Queue[Tuple[NodeAddress, SphinxPacket]]"
|
||||
PacketPayloadQueue: TypeAlias = (
|
||||
"queue.Queue[Tuple[NodeAddress, SphinxPacket | Payload]]"
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MixNode:
|
||||
identity_private_key: BlsPrivateKey
|
||||
encryption_private_key: X25519PrivateKey
|
||||
addr: NodeAddress
|
||||
|
||||
def identity_public_key(self) -> BlsPublicKey:
|
||||
return self.identity_private_key.get_g1()
|
||||
|
||||
def encryption_public_key(self) -> X25519PublicKey:
|
||||
return self.encryption_private_key.public_key()
|
||||
|
||||
def sphinx_node(self) -> Node:
|
||||
return Node(self.encryption_private_key, self.addr)
|
||||
|
||||
def start(
|
||||
self,
|
||||
delay_rate_per_min: int,
|
||||
inbound_socket: PacketQueue,
|
||||
outbound_socket: PacketPayloadQueue,
|
||||
) -> MixNodeRunner:
|
||||
thread = MixNodeRunner(
|
||||
self.encryption_private_key,
|
||||
delay_rate_per_min,
|
||||
inbound_socket,
|
||||
outbound_socket,
|
||||
)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
return thread
|
||||
|
||||
|
||||
class MixNodeRunner(Thread):
|
||||
"""
|
||||
Read SphinxPackets from inbound socket and spawn a thread for each packet to process it.
|
||||
|
||||
This thread approximates a M/M/inf queue.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
encryption_private_key: X25519PrivateKey,
|
||||
delay_rate_per_min: int, # Poisson rate parameter: mu
|
||||
inbound_socket: PacketQueue,
|
||||
outbound_socket: PacketPayloadQueue,
|
||||
):
|
||||
super().__init__()
|
||||
self.encryption_private_key = encryption_private_key
|
||||
self.delay_rate_per_min = delay_rate_per_min
|
||||
self.inbound_socket = inbound_socket
|
||||
self.outbound_socket = outbound_socket
|
||||
self.num_processing = AtomicInt(0)
|
||||
|
||||
def run(self) -> None:
|
||||
# Here in Python, this thread is implemented in synchronous manner.
|
||||
# In the real implementation, consider implementing this in asynchronous if possible,
|
||||
# to approximate a M/M/inf queue
|
||||
while True:
|
||||
_, packet = self.inbound_socket.get()
|
||||
thread = MixNodePacketProcessor(
|
||||
packet,
|
||||
self.encryption_private_key,
|
||||
self.delay_rate_per_min,
|
||||
self.outbound_socket,
|
||||
self.num_processing,
|
||||
)
|
||||
thread.daemon = True
|
||||
self.num_processing.add(1)
|
||||
thread.start()
|
||||
|
||||
def num_jobs(self) -> int:
|
||||
"""
|
||||
Return the number of packets that are being processed or still in the inbound socket.
|
||||
|
||||
If this thread works as a M/M/inf queue completely,
|
||||
the number of packets that are still in the inbound socket must be always 0.
|
||||
"""
|
||||
return self.num_processing.get() + self.inbound_socket.qsize()
|
||||
|
||||
|
||||
class MixNodePacketProcessor(Thread):
|
||||
"""
|
||||
Process a single packet with a delay that follows exponential distribution,
|
||||
and forward it to the next mix node or the mix destination
|
||||
|
||||
This thread is a single server (worker) in a M/M/inf queue that MixNodeRunner approximates.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
packet: SphinxPacket,
|
||||
encryption_private_key: X25519PrivateKey,
|
||||
delay_rate_per_min: int, # Poisson rate parameter: mu
|
||||
outbound_socket: PacketPayloadQueue,
|
||||
num_processing: AtomicInt,
|
||||
):
|
||||
super().__init__()
|
||||
self.packet = packet
|
||||
self.encryption_private_key = encryption_private_key
|
||||
self.delay_rate_per_min = delay_rate_per_min
|
||||
self.outbound_socket = outbound_socket
|
||||
self.num_processing = num_processing
|
||||
|
||||
def run(self) -> None:
|
||||
delay_sec = poisson_interval_sec(self.delay_rate_per_min)
|
||||
time.sleep(delay_sec)
|
||||
|
||||
processed = self.packet.process(self.encryption_private_key)
|
||||
match processed:
|
||||
case ProcessedForwardHopPacket():
|
||||
self.outbound_socket.put(
|
||||
(processed.next_node_address, processed.next_packet)
|
||||
)
|
||||
case ProcessedFinalHopPacket():
|
||||
self.outbound_socket.put(
|
||||
(processed.destination_node_address, processed.payload)
|
||||
)
|
||||
case _:
|
||||
raise UnknownHeaderTypeError
|
||||
|
||||
self.num_processing.sub(1)
|
||||
|
||||
|
||||
class AtomicInt:
|
||||
def __init__(self, initial: int) -> None:
|
||||
self.lock = threading.Lock()
|
||||
self.value = initial
|
||||
|
||||
def add(self, v: int):
|
||||
with self.lock:
|
||||
self.value += v
|
||||
|
||||
def sub(self, v: int):
|
||||
with self.lock:
|
||||
self.value -= v
|
||||
|
||||
def get(self) -> int:
|
||||
with self.lock:
|
||||
return self.value
|
13
mixnet/poisson.py
Normal file
13
mixnet/poisson.py
Normal file
@ -0,0 +1,13 @@
|
||||
import numpy
|
||||
|
||||
|
||||
def poisson_interval_sec(rate_per_min: int) -> float:
|
||||
# If events occur in a Poisson distribution with rate_per_min,
|
||||
# the interval between events follows the exponential distribution
|
||||
# with the rate_per_min (i.e. with the scale 1/rate_per_min).
|
||||
interval_min = numpy.random.exponential(scale=1 / rate_per_min, size=1)[0]
|
||||
return interval_min * 60
|
||||
|
||||
|
||||
def poisson_mean_interval_sec(rate_per_min: int) -> float:
|
||||
return 1 / rate_per_min * 60
|
114
mixnet/test_node.py
Normal file
114
mixnet/test_node.py
Normal file
@ -0,0 +1,114 @@
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Tuple
|
||||
from unittest import TestCase
|
||||
|
||||
import numpy
|
||||
import timeout_decorator
|
||||
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
|
||||
from pysphinx.sphinx import SphinxPacket
|
||||
|
||||
from mixnet.bls import generate_bls
|
||||
from mixnet.mixnet import Mixnet, MixnetTopology
|
||||
from mixnet.node import MixNode, NodeAddress, PacketPayloadQueue, PacketQueue
|
||||
from mixnet.packet import PacketBuilder
|
||||
from mixnet.poisson import poisson_interval_sec, poisson_mean_interval_sec
|
||||
from mixnet.utils import random_bytes
|
||||
|
||||
|
||||
class TestMixNodeRunner(TestCase):
|
||||
@timeout_decorator.timeout(180)
|
||||
def test_mixnode_runner_emission_rate(self):
|
||||
"""
|
||||
Test if MixNodeRunner works as a M/M/inf queue.
|
||||
|
||||
If inputs are arrived at Poisson rate `lambda`,
|
||||
and if processing is delayed according to an exponential distribution with a rate `mu`,
|
||||
the rate of outputs should be `lambda`.
|
||||
"""
|
||||
mixnet, topology = self.init()
|
||||
inbound_socket: PacketQueue = queue.Queue()
|
||||
outbound_socket: PacketPayloadQueue = queue.Queue()
|
||||
|
||||
packet, route = PacketBuilder.real(b"msg", mixnet, topology).next()
|
||||
|
||||
delay_rate_per_min = 30 # mu (= 2s delay on average)
|
||||
# Start only the first mix node for testing
|
||||
runner = route[0].start(delay_rate_per_min, inbound_socket, outbound_socket)
|
||||
|
||||
# Send packets to the first mix node in a Poisson distribution
|
||||
packet_count = 100
|
||||
emission_rate_per_min = 120 # lambda (= 2msg/sec)
|
||||
sender = threading.Thread(
|
||||
target=self.send_packets,
|
||||
args=(
|
||||
inbound_socket,
|
||||
packet,
|
||||
route[0].addr,
|
||||
packet_count,
|
||||
emission_rate_per_min,
|
||||
),
|
||||
)
|
||||
sender.daemon = True
|
||||
sender.start()
|
||||
|
||||
# Calculate intervals between outputs and gather num_jobs in the first mix node.
|
||||
intervals = []
|
||||
num_jobs = []
|
||||
ts = datetime.now()
|
||||
for _ in range(packet_count):
|
||||
_ = outbound_socket.get()
|
||||
now = datetime.now()
|
||||
intervals.append((now - ts).total_seconds())
|
||||
num_jobs.append(runner.num_jobs())
|
||||
ts = now
|
||||
# Remove the first interval that would be much larger than other intervals,
|
||||
# because of the delay in mix node.
|
||||
intervals = intervals[1:]
|
||||
num_jobs = num_jobs[1:]
|
||||
|
||||
# Check if the emission rate of the first mix node is the same as
|
||||
# the emission rate of the message sender, but with a delay.
|
||||
# If outputs follow the Poisson distribution with a rate `lambda`,
|
||||
# a mean interval between outputs must be `1/lambda`.
|
||||
self.assertAlmostEqual(
|
||||
float(numpy.mean(intervals)),
|
||||
poisson_mean_interval_sec(emission_rate_per_min),
|
||||
delta=1.0,
|
||||
)
|
||||
# If runner is a M/M/inf queue,
|
||||
# a mean number of jobs being processed/scheduled in the runner must be `lambda/mu`.
|
||||
self.assertAlmostEqual(
|
||||
float(numpy.mean(num_jobs)),
|
||||
round(emission_rate_per_min / delay_rate_per_min),
|
||||
delta=1.0,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def send_packets(
|
||||
inbound_socket: PacketQueue,
|
||||
packet: SphinxPacket,
|
||||
node_addr: NodeAddress,
|
||||
cnt: int,
|
||||
rate_per_min: int,
|
||||
):
|
||||
for _ in range(cnt):
|
||||
time.sleep(poisson_interval_sec(rate_per_min))
|
||||
inbound_socket.put((node_addr, packet))
|
||||
|
||||
@staticmethod
|
||||
def init() -> Tuple[Mixnet, MixnetTopology]:
|
||||
mixnet = Mixnet(
|
||||
[
|
||||
MixNode(
|
||||
generate_bls(),
|
||||
X25519PrivateKey.generate(),
|
||||
random_bytes(32),
|
||||
)
|
||||
for _ in range(12)
|
||||
]
|
||||
)
|
||||
topology = mixnet.build_topology(b"entropy", 3, 3)
|
||||
return mixnet, topology
|
@ -5,7 +5,8 @@ from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
|
||||
from pysphinx.sphinx import ProcessedFinalHopPacket, SphinxPacket
|
||||
|
||||
from mixnet.bls import generate_bls
|
||||
from mixnet.mixnet import Mixnet, MixnetTopology, MixNode
|
||||
from mixnet.mixnet import Mixnet, MixnetTopology
|
||||
from mixnet.node import MixNode
|
||||
from mixnet.packet import (
|
||||
Fragment,
|
||||
MessageFlag,
|
||||
|
@ -6,4 +6,5 @@ pycparser==2.21
|
||||
pysphinx==0.0.1
|
||||
scipy==1.11.4
|
||||
setuptools==69.0.3
|
||||
timeout-decorator==0.5.0
|
||||
wheel==0.42.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user