nomos-pocs/da/subnets/network/node.py

130 lines
4.5 KiB
Python

import sys
from hashlib import sha256
from random import randint
import dispersal.proto as proto
import multiaddr
import trio
from blspy import BasicSchemeMPL, G1Element, PrivateKey
from constants import *
from libp2p import host, new_host
from libp2p.network.stream.exceptions import StreamReset
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.peerinfo import info_from_p2p_addr
class DANode:
"""
A class handling Data Availability (DA).
Runs on a hardcoded port.
Starts a libp2p node.
Listens on a handler for receiving data.
Resends all data it receives to all peers it is connected to
(therefore assumes connection logic is established elsewhere)
"""
listen_addr: multiaddr.Multiaddr
libp2phost: host
port: int
node_list: []
# list of packet hashes it "stores"
hashes: set()
@classmethod
async def new(cls, port, node_list, nursery, shutdown, disperse_send):
self = cls()
self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
self.libp2phost = new_host()
self.port = port
self.node_list = node_list
self.hashes = set()
nursery.start_soon(self.__run, nursery, shutdown, disperse_send)
if DEBUG:
print("DA node at port {} initialized".format(port))
def get_id(self):
return self.libp2phost.get_id()
def net_iface(self):
return self.libp2phost
def get_port(self):
return self.port
async def __run(self, nursery, shutdown, disperse_send):
"""
Run the node. Starts libp2p host, and listener for data
"""
async with self.libp2phost.run(listen_addrs=[self.listen_addr]):
print("started node at {}...".format(self.listen_addr))
# handler to run when data is received
async def stream_handler(stream: INetStream) -> None:
nursery.start_soon(
self.read_data, stream, nursery, shutdown, disperse_send
)
# set the above handler
self.libp2phost.set_stream_handler(PROTOCOL_ID, stream_handler)
# at this point the node is "initialized" - signal it's "ready"
self.node_list.append(self)
# run until we shutdown
await shutdown.wait()
async def read_data(
self, stream: INetStream, nursery, shutdown, disperse_send
) -> None:
"""
We need to wait for incoming data, but also we want to shutdown
when the test is finished.
The following code makes sure that both events are listened to
and the first which occurs is handled.
"""
first_event = None
async def select_event(async_fn, cancel_scope):
nonlocal first_event
first_event = await async_fn()
cancel_scope.cancel()
disperse_send.close()
async def read_stream():
while True:
read_bytes = await stream.read(MAX_READ_LEN)
if read_bytes is not None:
message = proto.unpack_from_bytes(read_bytes)
hashstr = sha256(message.dispersal_req.blob.data).hexdigest()
if hashstr not in self.hashes:
# "store" the received packet
self.hashes.add(hashstr)
# now disperse this hash to all peers
nursery.start_soon(self.disperse, read_bytes, disperse_send)
if DEBUG:
print(
"{} stored {}".format(
self.libp2phost.get_id().pretty(), hashstr
)
)
await disperse_send.send(-1)
else:
print("read_bytes is None, unexpected!")
nursery.start_soon(select_event, read_stream, nursery.cancel_scope)
nursery.start_soon(select_event, shutdown.wait, nursery.cancel_scope)
async def disperse(self, packet, disperse_send) -> None:
# disperse the given packet to all peers
for p_id in self.libp2phost.get_peerstore().peer_ids():
if p_id == self.libp2phost.get_id():
continue
await disperse_send.send(1)
stream = await self.libp2phost.new_stream(p_id, [PROTOCOL_ID])
await stream.write(packet)
async def has_hash(self, hashstr: str):
return hashstr in self.hashes