fixed sending and storing hashes; fixed proper test shutdown

This commit is contained in:
holisticode 2024-07-08 18:11:39 -05:00 committed by Gusto
parent e95d7e27ca
commit 59f328c117
No known key found for this signature in database
5 changed files with 101 additions and 61 deletions

10
da/network/constants.py Normal file
View File

@ -0,0 +1,10 @@
from libp2p.typing import TProtocol
PROTOCOL_ID = TProtocol("/nomosda/1.0.0")
MAX_READ_LEN = 2**32 - 1
# make this ocnfigurable
DATA_SIZE = 1024
# make this ocnfigurable
COL_SIZE = 4096
SAMPLE_THRESHOLD = 12

View File

@ -5,17 +5,10 @@ from typing import Self
import multiaddr
import trio
from constants import *
from libp2p import host, new_host
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.typing import TProtocol
PROTOCOL_ID = TProtocol("/nomosda/1.0.0")
MAX_READ_LEN = 2**32 - 1
# make this ocnfigurable
DATA_SIZE = 1024
# make this ocnfigurable
COL_SIZE = 4096
class Executor:
@ -52,8 +45,11 @@ class Executor:
def get_port(self):
return self.port
def get_hash(self, index: int):
return self.data_hashes[index]
def __create_data(self):
for i in range(COL_SIZE):
for i in range(COL_SIZE - 1):
self.data[i] = randbytes(DATA_SIZE)
self.data_hashes[i] = sha256(self.data[i]).hexdigest()

View File

@ -1,23 +1,21 @@
import trio
from node import DANode
class DANetwork:
num_nodes: int
num_nodes: int
nodes: []
def __init__(self, nodes):
def __init__(self, nodes):
self.num_nodes = nodes
self.nodes = []
async def build(self, nursery):
node_list = []
async def build(self, nursery, shut_down):
port_idx = 7560
for _ in range(self.num_nodes):
port_idx += 1
nursery.start_soon(DANode.new,port_idx, node_list, nursery)
self.nodes = node_list
nursery.start_soon(DANode.new, port_idx, self.nodes, nursery, shut_down)
return
def get_nodes(self):
return self.nodes

View File

@ -5,13 +5,11 @@ from random import randint
import multiaddr
import trio
from blspy import BasicSchemeMPL, G1Element, PrivateKey
from constants import *
from libp2p import host, new_host
from libp2p.network.stream.exceptions import StreamReset
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.typing import TProtocol
PROTOCOL_ID = TProtocol("/nomosda/1.0.0")
MAX_READ_LEN = 2**32 - 1
class DANode:
@ -20,35 +18,22 @@ class DANode:
"""
pk: PrivateKey
id: G1Element
listen_addr: multiaddr.Multiaddr
host: host
port: int
node_list: []
data: []
hash: str
# inbound_socket: asyncio.Queue
# outbound_socket: asyncio.Queue
hashes: set()
@classmethod
async def new(cls, port, node_list, nursery):
async def new(cls, port, node_list, nursery, shutdown):
self = cls()
# self.pk = generate_random_sk()
# self.id = self.pk.get_g1()
self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
self.host = new_host()
self.port = port
self.node_list = node_list
self.data = []
self.hash = ""
nursery.start_soon(self.__run, nursery)
self.hashes = set()
nursery.start_soon(self.__run, nursery, shutdown)
print("DA node at port {} initialized".format(port))
# self.inbound_socket = asyncio.Queue()
# self.outbound_socket = asyncio.Queue()
def hex_id(self):
return bytes(self.id).hex()
def get_id(self):
return self.host.get_id()
@ -59,36 +44,47 @@ class DANode:
def get_port(self):
return self.port
async def __run(self, nursery):
async def __run(self, nursery, shutdown):
""" """
async with self.host.run(listen_addrs=[self.listen_addr]):
print("starting node at {}...".format(self.listen_addr))
async def stream_handler(stream: INetStream) -> None:
nursery.start_soon(self.read_data, stream)
nursery.start_soon(self.write_data, stream)
nursery.start_soon(self.read_data, stream, nursery, shutdown)
# nursery.start_soon(self.write_data, stream)
self.host.set_stream_handler(PROTOCOL_ID, stream_handler)
self.node_list.append(self)
await trio.sleep_forever()
await shutdown.wait()
async def read_data(self, stream: INetStream) -> None:
while True:
read_bytes = await stream.read(MAX_READ_LEN)
if read_bytes is not None:
self.data = read_bytes
self.hash = sha256(self.data).hexdigest()
print("{} stored {}".format(self.host.get_id().pretty(), self.hash))
else:
print("read_bytes is None, unexpected!")
async def read_data(self, stream: INetStream, nursery, shutdown) -> None:
first_event = None
async def select_event(async_fn, cancel_scope):
nonlocal first_event
first_event = await async_fn()
cancel_scope.cancel()
async def read_stream():
while True:
read_bytes = await stream.read(MAX_READ_LEN)
if read_bytes is not None:
hashstr = sha256(read_bytes).hexdigest()
self.hashes.add(hashstr)
print("{} stored {}".format(self.host.get_id().pretty(), hashstr))
else:
print("read_bytes is None, unexpected!")
nursery.start_soon(select_event, read_stream, nursery.cancel_scope)
nursery.start_soon(select_event, shutdown.wait, nursery.cancel_scope)
"""
async def write_data(self, stream: INetStream) -> None:
async_f = trio.wrap_file(sys.stdin)
while True:
line = await async_f.readline()
await stream.write(line.encode())
"""
def generate_random_sk() -> PrivateKey:
seed = bytes([randint(0, 255) for _ in range(32)])
return BasicSchemeMPL.key_gen(seed)
async def has_hash(self, hashstr: str):
return hashstr in self.hashes

View File

@ -1,7 +1,9 @@
import sys
from random import randint
import multiaddr
import trio
from constants import *
from executor import Executor
from libp2p.peer.peerinfo import info_from_p2p_addr
from network import DANetwork
@ -17,17 +19,16 @@ async def run_network():
num_nodes = int(sys.argv[1])
net = DANetwork(num_nodes)
shutdown = trio.Event()
async with trio.open_nursery() as nursery:
nursery.start_soon(net.build, nursery)
nursery.start_soon(run_subnets, net, num_nodes, nursery)
await trio.sleep_forever()
nursery.start_soon(net.build, nursery, shutdown)
nursery.start_soon(run_subnets, net, num_nodes, nursery, shutdown)
async def run_subnets(net, num_nodes, nursery):
async def run_subnets(net, num_nodes, nursery, shutdown):
while len(net.get_nodes()) != num_nodes:
print("nodes not ready yet")
await trio.sleep(1)
await trio.sleep(0.1)
print("nodes ready")
nodes = net.get_nodes()
@ -48,9 +49,11 @@ async def run_subnets(net, num_nodes, nursery):
print("Establishing connections...")
node_list = []
all_node_instances = set()
for subnet in subnets:
for n in subnets[subnet]:
all_node_instances.add(n)
for i, nn in enumerate(subnets[subnet]):
if nn.get_id() == n.get_id():
continue
@ -69,6 +72,43 @@ async def run_subnets(net, num_nodes, nursery):
exe = Executor.new(7766, node_list)
await exe.execute(nursery)
all_nodes = list(all_node_instances)
checked = []
print("starting sampling...")
for _ in range(SAMPLE_THRESHOLD):
nursery.start_soon(sample_node, exe, all_nodes, checked)
print("waiting for sampling to finish...")
await check_complete(checked)
print("Test completed")
shutdown.set()
async def check_complete(checked):
while len(checked) < SAMPLE_THRESHOLD:
await trio.sleep(0.5)
print(len(checked))
print("waited")
print("check_complete exiting")
return
async def sample_node(exe, all_nodes, checked):
r = randint(0, COL_SIZE - 1)
hashstr = exe.get_hash(r)
n = randint(0, len(all_nodes) - 1)
node = all_nodes[n]
has = await node.has_hash(hashstr)
if has:
print("node {} has hash {}".format(node.get_id().pretty(), hashstr))
else:
print("node {} does NOT HAVE hash {}".format(node.get_id().pretty(), hashstr))
print("TEST FAILED")
checked.append(1)
return
if __name__ == "__main__":
trio.run(run_network)