From 59f328c1176664cb84856ca7f6b92695ee8bbf88 Mon Sep 17 00:00:00 2001 From: holisticode Date: Mon, 8 Jul 2024 18:11:39 -0500 Subject: [PATCH] fixed sending and storing hashes; fixed proper test shutdown --- da/network/constants.py | 10 ++++++ da/network/executor.py | 14 +++----- da/network/network.py | 14 ++++---- da/network/node.py | 72 +++++++++++++++++++---------------------- da/network/poc.py | 52 +++++++++++++++++++++++++---- 5 files changed, 101 insertions(+), 61 deletions(-) create mode 100644 da/network/constants.py diff --git a/da/network/constants.py b/da/network/constants.py new file mode 100644 index 0000000..cd43262 --- /dev/null +++ b/da/network/constants.py @@ -0,0 +1,10 @@ +from libp2p.typing import TProtocol + +PROTOCOL_ID = TProtocol("/nomosda/1.0.0") +MAX_READ_LEN = 2**32 - 1 +# make this ocnfigurable +DATA_SIZE = 1024 +# make this ocnfigurable +COL_SIZE = 4096 + +SAMPLE_THRESHOLD = 12 diff --git a/da/network/executor.py b/da/network/executor.py index 3b8713c..c3bd88a 100644 --- a/da/network/executor.py +++ b/da/network/executor.py @@ -5,17 +5,10 @@ from typing import Self import multiaddr import trio +from constants import * from libp2p import host, new_host from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.typing import TProtocol - -PROTOCOL_ID = TProtocol("/nomosda/1.0.0") -MAX_READ_LEN = 2**32 - 1 -# make this ocnfigurable -DATA_SIZE = 1024 -# make this ocnfigurable -COL_SIZE = 4096 class Executor: @@ -52,8 +45,11 @@ class Executor: def get_port(self): return self.port + def get_hash(self, index: int): + return self.data_hashes[index] + def __create_data(self): - for i in range(COL_SIZE): + for i in range(COL_SIZE - 1): self.data[i] = randbytes(DATA_SIZE) self.data_hashes[i] = sha256(self.data[i]).hexdigest() diff --git a/da/network/network.py b/da/network/network.py index c19674a..a2235a8 100644 --- a/da/network/network.py +++ b/da/network/network.py @@ -1,23 +1,21 @@ import trio - from node import DANode + class DANetwork: - num_nodes: int + num_nodes: int nodes: [] - def __init__(self, nodes): + def __init__(self, nodes): self.num_nodes = nodes self.nodes = [] - async def build(self, nursery): - node_list = [] + async def build(self, nursery, shut_down): port_idx = 7560 for _ in range(self.num_nodes): port_idx += 1 - nursery.start_soon(DANode.new,port_idx, node_list, nursery) - self.nodes = node_list + nursery.start_soon(DANode.new, port_idx, self.nodes, nursery, shut_down) + return def get_nodes(self): return self.nodes - diff --git a/da/network/node.py b/da/network/node.py index d033761..b514fcf 100644 --- a/da/network/node.py +++ b/da/network/node.py @@ -5,13 +5,11 @@ from random import randint import multiaddr import trio from blspy import BasicSchemeMPL, G1Element, PrivateKey +from constants import * from libp2p import host, new_host +from libp2p.network.stream.exceptions import StreamReset from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.typing import TProtocol - -PROTOCOL_ID = TProtocol("/nomosda/1.0.0") -MAX_READ_LEN = 2**32 - 1 class DANode: @@ -20,35 +18,22 @@ class DANode: """ - pk: PrivateKey - id: G1Element listen_addr: multiaddr.Multiaddr host: host port: int node_list: [] - data: [] - hash: str - # inbound_socket: asyncio.Queue - # outbound_socket: asyncio.Queue + hashes: set() @classmethod - async def new(cls, port, node_list, nursery): + async def new(cls, port, node_list, nursery, shutdown): self = cls() - # self.pk = generate_random_sk() - # self.id = self.pk.get_g1() self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") self.host = new_host() self.port = port self.node_list = node_list - self.data = [] - self.hash = "" - nursery.start_soon(self.__run, nursery) + self.hashes = set() + nursery.start_soon(self.__run, nursery, shutdown) print("DA node at port {} initialized".format(port)) - # self.inbound_socket = asyncio.Queue() - # self.outbound_socket = asyncio.Queue() - - def hex_id(self): - return bytes(self.id).hex() def get_id(self): return self.host.get_id() @@ -59,36 +44,47 @@ class DANode: def get_port(self): return self.port - async def __run(self, nursery): + async def __run(self, nursery, shutdown): """ """ async with self.host.run(listen_addrs=[self.listen_addr]): print("starting node at {}...".format(self.listen_addr)) async def stream_handler(stream: INetStream) -> None: - nursery.start_soon(self.read_data, stream) - nursery.start_soon(self.write_data, stream) + nursery.start_soon(self.read_data, stream, nursery, shutdown) + # nursery.start_soon(self.write_data, stream) self.host.set_stream_handler(PROTOCOL_ID, stream_handler) self.node_list.append(self) - await trio.sleep_forever() + await shutdown.wait() - async def read_data(self, stream: INetStream) -> None: - while True: - read_bytes = await stream.read(MAX_READ_LEN) - if read_bytes is not None: - self.data = read_bytes - self.hash = sha256(self.data).hexdigest() - print("{} stored {}".format(self.host.get_id().pretty(), self.hash)) - else: - print("read_bytes is None, unexpected!") + async def read_data(self, stream: INetStream, nursery, shutdown) -> None: + first_event = None + async def select_event(async_fn, cancel_scope): + nonlocal first_event + first_event = await async_fn() + cancel_scope.cancel() + + async def read_stream(): + while True: + read_bytes = await stream.read(MAX_READ_LEN) + if read_bytes is not None: + hashstr = sha256(read_bytes).hexdigest() + self.hashes.add(hashstr) + print("{} stored {}".format(self.host.get_id().pretty(), hashstr)) + else: + print("read_bytes is None, unexpected!") + + nursery.start_soon(select_event, read_stream, nursery.cancel_scope) + nursery.start_soon(select_event, shutdown.wait, nursery.cancel_scope) + + """ async def write_data(self, stream: INetStream) -> None: async_f = trio.wrap_file(sys.stdin) while True: line = await async_f.readline() await stream.write(line.encode()) + """ - -def generate_random_sk() -> PrivateKey: - seed = bytes([randint(0, 255) for _ in range(32)]) - return BasicSchemeMPL.key_gen(seed) + async def has_hash(self, hashstr: str): + return hashstr in self.hashes diff --git a/da/network/poc.py b/da/network/poc.py index ea5a982..82029bd 100644 --- a/da/network/poc.py +++ b/da/network/poc.py @@ -1,7 +1,9 @@ import sys +from random import randint import multiaddr import trio +from constants import * from executor import Executor from libp2p.peer.peerinfo import info_from_p2p_addr from network import DANetwork @@ -17,17 +19,16 @@ async def run_network(): num_nodes = int(sys.argv[1]) net = DANetwork(num_nodes) + shutdown = trio.Event() async with trio.open_nursery() as nursery: - nursery.start_soon(net.build, nursery) - nursery.start_soon(run_subnets, net, num_nodes, nursery) - - await trio.sleep_forever() + nursery.start_soon(net.build, nursery, shutdown) + nursery.start_soon(run_subnets, net, num_nodes, nursery, shutdown) -async def run_subnets(net, num_nodes, nursery): +async def run_subnets(net, num_nodes, nursery, shutdown): while len(net.get_nodes()) != num_nodes: print("nodes not ready yet") - await trio.sleep(1) + await trio.sleep(0.1) print("nodes ready") nodes = net.get_nodes() @@ -48,9 +49,11 @@ async def run_subnets(net, num_nodes, nursery): print("Establishing connections...") node_list = [] + all_node_instances = set() for subnet in subnets: for n in subnets[subnet]: + all_node_instances.add(n) for i, nn in enumerate(subnets[subnet]): if nn.get_id() == n.get_id(): continue @@ -69,6 +72,43 @@ async def run_subnets(net, num_nodes, nursery): exe = Executor.new(7766, node_list) await exe.execute(nursery) + all_nodes = list(all_node_instances) + checked = [] + + print("starting sampling...") + for _ in range(SAMPLE_THRESHOLD): + nursery.start_soon(sample_node, exe, all_nodes, checked) + + print("waiting for sampling to finish...") + await check_complete(checked) + + print("Test completed") + shutdown.set() + + +async def check_complete(checked): + while len(checked) < SAMPLE_THRESHOLD: + await trio.sleep(0.5) + print(len(checked)) + print("waited") + print("check_complete exiting") + return + + +async def sample_node(exe, all_nodes, checked): + r = randint(0, COL_SIZE - 1) + hashstr = exe.get_hash(r) + n = randint(0, len(all_nodes) - 1) + node = all_nodes[n] + has = await node.has_hash(hashstr) + if has: + print("node {} has hash {}".format(node.get_id().pretty(), hashstr)) + else: + print("node {} does NOT HAVE hash {}".format(node.get_id().pretty(), hashstr)) + print("TEST FAILED") + checked.append(1) + return + if __name__ == "__main__": trio.run(run_network)