nomos-specs/mixnet/node.py

131 lines
4.0 KiB
Python
Raw Normal View History

2024-01-23 01:29:14 +00:00
from __future__ import annotations
2024-01-25 09:04:55 +00:00
import asyncio
2024-01-23 01:29:14 +00:00
from dataclasses import dataclass
from typing import Tuple, TypeAlias
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from pysphinx.node import Node
from pysphinx.sphinx import (
Payload,
ProcessedFinalHopPacket,
ProcessedForwardHopPacket,
SphinxPacket,
UnknownHeaderTypeError,
)
from mixnet.bls import BlsPrivateKey, BlsPublicKey
from mixnet.poisson import poisson_interval_sec
# A mix node is identified by its BLS public key.
NodeId: TypeAlias = BlsPublicKey

# A 32-byte value that represents the IP address and port of a mix node.
NodeAddress: TypeAlias = bytes

# Queue of (address, packet) pairs; used as the type of a node's inbound socket.
# Kept as a string so the subscripted generic is never evaluated at import time.
PacketQueue: TypeAlias = "asyncio.Queue[Tuple[NodeAddress, SphinxPacket]]"

# Queue of (address, item) pairs where the item is either a Sphinx packet
# or a payload; used as the type of a node's outbound socket.
PacketPayloadQueue: TypeAlias = "asyncio.Queue[Tuple[NodeAddress, SphinxPacket | Payload]]"
@dataclass
class MixNode:
    """
    Key material and network address of a single mix node.

    The node itself holds no running state: `start` hands the encryption key
    and the sockets to a MixNodeRunner, which does the packet processing.
    """

    # BLS private key; its public counterpart is returned by identity_public_key().
    identity_private_key: BlsPrivateKey
    # X25519 private key; passed to the Sphinx node and to the packet runner.
    encryption_private_key: X25519PrivateKey
    # Address of this node (see NodeAddress).
    addr: NodeAddress

    def identity_public_key(self) -> BlsPublicKey:
        """Return the public key obtained from the identity private key via `get_g1()`."""
        private_key = self.identity_private_key
        return private_key.get_g1()

    def encryption_public_key(self) -> X25519PublicKey:
        """Return the public key matching the encryption private key."""
        private_key = self.encryption_private_key
        return private_key.public_key()

    def sphinx_node(self) -> Node:
        """Build the pysphinx `Node` for this mix node from its encryption key and address."""
        return Node(self.encryption_private_key, self.addr)

    def start(
        self,
        delay_rate_per_min: int,  # Poisson rate parameter: mu
        inbound_socket: PacketQueue,
        outbound_socket: PacketPayloadQueue,
    ) -> asyncio.Task:
        """
        Launch packet processing for this node as a background asyncio task.

        A MixNodeRunner is created with this node's encryption private key,
        the given delay rate and the two sockets, and its `run()` coroutine is
        scheduled with `asyncio.create_task`, so an event loop must already
        be running.

        Returns the task that wraps the runner's `run()` coroutine.
        """
        runner = MixNodeRunner(
            self.encryption_private_key,
            delay_rate_per_min,
            inbound_socket,
            outbound_socket,
        )
        return asyncio.create_task(runner.run())
2024-01-25 09:04:55 +00:00
class MixNodeRunner:
2024-01-23 01:29:14 +00:00
"""
2024-01-25 09:04:55 +00:00
A class handling incoming packets with delays
2024-01-23 01:29:14 +00:00
2024-01-25 09:04:55 +00:00
This class is defined separated with the MixNode class,
in order to define the MixNode as a simple dataclass for clarity.
2024-01-23 01:29:14 +00:00
"""
def __init__(
self,
encryption_private_key: X25519PrivateKey,
delay_rate_per_min: int, # Poisson rate parameter: mu
inbound_socket: PacketQueue,
outbound_socket: PacketPayloadQueue,
):
self.encryption_private_key = encryption_private_key
self.delay_rate_per_min = delay_rate_per_min
self.inbound_socket = inbound_socket
self.outbound_socket = outbound_socket
2024-01-25 09:04:55 +00:00
async def run(self):
2024-01-23 01:29:14 +00:00
"""
2024-01-25 09:04:55 +00:00
Read SphinxPackets from inbound socket and spawn a thread for each packet to process it.
2024-01-23 01:29:14 +00:00
2024-01-25 09:04:55 +00:00
This thread approximates a M/M/inf queue.
2024-01-23 01:29:14 +00:00
"""
2024-01-25 09:04:55 +00:00
# A set just for gathering a reference of tasks to prevent them from being garbage collected.
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
self.tasks = set()
2024-01-23 01:29:14 +00:00
2024-01-25 09:04:55 +00:00
while True:
_, packet = await self.inbound_socket.get()
task = asyncio.create_task(
self.process_packet(
packet,
)
)
self.tasks.add(task)
# To discard the task from the set automatically when it is done.
task.add_done_callback(self.tasks.discard)
2024-01-23 01:29:14 +00:00
2024-01-25 09:04:55 +00:00
async def process_packet(
2024-01-23 01:29:14 +00:00
self,
packet: SphinxPacket,
):
2024-01-25 09:04:55 +00:00
"""
Process a single packet with a delay that follows exponential distribution,
and forward it to the next mix node or the mix destination
2024-01-23 01:29:14 +00:00
2024-01-25 09:04:55 +00:00
This thread is a single server (worker) in a M/M/inf queue that MixNodeRunner approximates.
"""
2024-01-23 01:29:14 +00:00
delay_sec = poisson_interval_sec(self.delay_rate_per_min)
2024-01-25 09:04:55 +00:00
await asyncio.sleep(delay_sec)
2024-01-23 01:29:14 +00:00
2024-01-25 09:04:55 +00:00
processed = packet.process(self.encryption_private_key)
2024-01-23 01:29:14 +00:00
match processed:
case ProcessedForwardHopPacket():
2024-01-25 09:04:55 +00:00
await self.outbound_socket.put(
2024-01-23 01:29:14 +00:00
(processed.next_node_address, processed.next_packet)
)
case ProcessedFinalHopPacket():
2024-01-25 09:04:55 +00:00
await self.outbound_socket.put(
2024-01-23 01:29:14 +00:00
(processed.destination_node_address, processed.payload)
)
case _:
raise UnknownHeaderTypeError