nomos-specs/mixnet/gossip.py

63 lines
2.1 KiB
Python
Raw Normal View History

2024-07-10 09:23:33 +09:00
import asyncio
import hashlib
from typing import Awaitable, Callable
from mixnet.config import GossipConfig
from mixnet.connection import DuplexConnection
class GossipChannel:
2024-07-10 09:55:51 +09:00
"""
A gossip channel that broadcasts messages to all connected peers.
Peers are connected via DuplexConnection.
This class simplifies and simulates the libp2p gossipsub.
"""
2024-07-10 09:23:33 +09:00
config: GossipConfig
conns: list[DuplexConnection]
2024-07-10 09:55:51 +09:00
# A handler to process inbound messages.
2024-07-10 09:23:33 +09:00
handler: Callable[[bytes], Awaitable[bytes | None]]
2024-07-10 09:55:51 +09:00
# A set of message hashes to prevent processing the same message twice.
2024-07-10 09:23:33 +09:00
msg_cache: set[bytes]
def __init__(
self,
config: GossipConfig,
handler: Callable[[bytes], Awaitable[bytes | None]],
):
self.config = config
self.conns = []
self.handler = handler
self.msg_cache = set()
# A set just for gathering a reference of tasks to prevent them from being garbage collected.
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
self.tasks = set()
def add_conn(self, conn: DuplexConnection):
if len(self.conns) >= self.config.peering_degree:
# For simplicity of the spec, reject the connection if the peering degree is reached.
raise ValueError("The peering degree is reached.")
self.conns.append(conn)
task = asyncio.create_task(self.__process_inbound_conn(conn))
self.tasks.add(task)
# To discard the task from the set automatically when it is done.
task.add_done_callback(self.tasks.discard)
async def __process_inbound_conn(self, conn: DuplexConnection):
while True:
msg = await conn.recv()
# Don't process the same message twice.
msg_hash = hashlib.sha256(msg).digest()
if msg_hash in self.msg_cache:
continue
self.msg_cache.add(msg_hash)
new_msg = await self.handler(msg)
if new_msg is not None:
await self.gossip(new_msg)
async def gossip(self, packet: bytes):
for conn in self.conns:
await conn.send(packet)