diff --git a/da/network/constants.py b/da/network/constants.py index 34e3384..9ac2e12 100644 --- a/da/network/constants.py +++ b/da/network/constants.py @@ -2,11 +2,10 @@ from libp2p.typing import TProtocol PROTOCOL_ID = TProtocol("/nomosda/1.0.0") MAX_READ_LEN = 2**32 - 1 -# make this ocnfigurable -DATA_SIZE = 1024 -# make this ocnfigurable -COL_SIZE = 4096 -SAMPLE_THRESHOLD = 12 +DEFAULT_DATA_SIZE = 1024 +DEFAULT_SUBNETS = 256 +DEFAULT_NODES = 32 +DEFAULT_SAMPLE_THRESHOLD = 12 DEBUG = False diff --git a/da/network/executor.py b/da/network/executor.py index 464e57b..1a1cf0d 100644 --- a/da/network/executor.py +++ b/da/network/executor.py @@ -20,18 +20,22 @@ class Executor: listen_addr: multiaddr.Multiaddr host: host port: int + num_subnets: int + data_size: int node_list: {} data: [] data_hashes: [] @classmethod - def new(cls, port, node_list) -> Self: + def new(cls, port, node_list, num_subnets, data_size) -> Self: self = cls() self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") self.host = new_host() self.port = port - self.data = [[] * DATA_SIZE] * COL_SIZE - self.data_hashes = [[] * 256] * COL_SIZE + self.num_subnets = num_subnets + self.data_size = data_size + self.data = [[] * data_size] * num_subnets + self.data_hashes = [[] * 256] * num_subnets self.node_list = node_list self.__create_data() return self @@ -49,8 +53,8 @@ class Executor: return self.data_hashes[index] def __create_data(self): - for i in range(COL_SIZE): - self.data[i] = randbytes(DATA_SIZE) + for i in range(self.num_subnets): + self.data[i] = randbytes(self.data_size) self.data_hashes[i] = sha256(self.data[i]).hexdigest() async def execute(self, nursery): diff --git a/da/network/network.py b/da/network/network.py index c009f46..d72f46b 100644 --- a/da/network/network.py +++ b/da/network/network.py @@ -1,4 +1,5 @@ import trio +from constants import DEBUG from node import DANode @@ -15,7 +16,8 @@ class DANetwork: for _ in range(self.num_nodes): port_idx += 1 nursery.start_soon(DANode.new, port_idx, self.nodes, nursery, shutdown) - print("net built") + if DEBUG: + print("net built") def get_nodes(self): return self.nodes diff --git a/da/network/poc.py b/da/network/poc.py index 451f705..178dd97 100644 --- a/da/network/poc.py +++ b/da/network/poc.py @@ -1,3 +1,4 @@ +import argparse import sys from random import randint @@ -12,27 +13,27 @@ from subnet import calculate_subnets default_nodes = 32 -async def run_network(): - if len(sys.argv) == 1: - num_nodes = default_nodes - else: - num_nodes = int(sys.argv[1]) - +async def run_network(args): + num_nodes = int(args.nodes) net = DANetwork(num_nodes) shutdown = trio.Event() async with trio.open_nursery() as nursery: nursery.start_soon(net.build, nursery, shutdown) - nursery.start_soon(run_subnets, net, num_nodes, nursery, shutdown) + nursery.start_soon(run_subnets, net, args, nursery, shutdown) -async def run_subnets(net, num_nodes, nursery, shutdown): +async def run_subnets(net, args, nursery, shutdown): + num_nodes = int(args.nodes) + num_subnets = int(args.subnets) + data_size = int(args.data_size) + sample_threshold = int(args.sample_threshold) while len(net.get_nodes()) != num_nodes: print("nodes not ready yet") await trio.sleep(0.1) - print("nodes ready") + print("Nodes ready") nodes = net.get_nodes() - subnets = calculate_subnets(nodes) + subnets = calculate_subnets(nodes, num_subnets) await print_subnet_info(subnets) print("Establishing connections...") @@ -40,21 +41,24 @@ async def run_subnets(net, num_nodes, nursery, shutdown): all_node_instances = set() await establish_connections(subnets, node_list, all_node_instances) - print("starting executor...") - exe = Executor.new(7766, node_list) + print("Starting executor...") + exe = Executor.new(7766, node_list, num_subnets, data_size) + print( + "Disperse packets...(depending on the size of the network and number of subnets, this may take a while...)" + ) await exe.execute(nursery) all_nodes = list(all_node_instances) checked = [] - await trio.sleep(1) + await trio.sleep(20) - print("starting sampling...") - for _ in range(SAMPLE_THRESHOLD): + print("Starting sampling...") + for _ in range(sample_threshold): nursery.start_soon(sample_node, exe, subnets, checked) - print("waiting for sampling to finish...") - await check_complete(checked) + print("Waiting for sampling to finish...") + await check_complete(checked, sample_threshold) print("Test completed") shutdown.set() @@ -92,14 +96,15 @@ async def establish_connections(subnets, node_list, all_node_instances): node_list[subnet].append(remote) if nn.get_id() in this_nodes_peers: continue - print("{} connecting to {}...".format(n.get_id(), addr)) + if DEBUG: + print("{} connecting to {}...".format(n.get_id(), addr)) await n.net_iface().connect(remote) print() -async def check_complete(checked): - while len(checked) < SAMPLE_THRESHOLD: +async def check_complete(checked, sample_threshold): + while len(checked) < sample_threshold: await trio.sleep(0.5) print("check_complete exiting") return @@ -121,4 +126,32 @@ async def sample_node(exe, subnets, checked): if __name__ == "__main__": - trio.run(run_network) + parser = argparse.ArgumentParser() + parser.add_argument("-s", "--subnets", help="Number of subnets [default: 256]") + parser.add_argument("-n", "--nodes", help="Number of nodes [default: 32]") + parser.add_argument( + "-t", + "--sample-threshold", + help="Threshold for sampling request attempts [default: 12]", + ) + parser.add_argument("-d", "--data-size", help="Size of packages [default: 1024]") + args = parser.parse_args() + + if not args.subnets: + args.subnets = DEFAULT_SUBNETS + if not args.nodes: + args.nodes = DEFAULT_NODES + if not args.sample_threshold: + args.sample_threshold = DEFAULT_SAMPLE_THRESHOLD + if not args.data_size: + args.data_size = DEFAULT_DATA_SIZE + + print("Number of subnets will be: {}".format(args.subnets)) + print("Number of nodes will be: {}".format(args.nodes)) + print("Size of data package will be: {}".format(args.data_size)) + print("Threshold for sampling attempts will be: {}".format(args.sample_threshold)) + + print() + print("*******************") + print("Starting network...") + trio.run(run_network, args) diff --git a/da/network/subnet.py b/da/network/subnet.py index 8096c73..d9c145d 100644 --- a/da/network/subnet.py +++ b/da/network/subnet.py @@ -6,10 +6,10 @@ from constants import * REPLICATION_FACTOR = 4 -def calculate_subnets(node_list): +def calculate_subnets(node_list, num_subnets): subnets = {} for i, n in enumerate(node_list): - idx = i % COL_SIZE + idx = i % num_subnets if idx not in subnets: subnets[idx] = [] @@ -17,12 +17,12 @@ def calculate_subnets(node_list): listlen = len(node_list) i = listlen - while i < COL_SIZE: + while i < num_subnets: subnets[i] = [] subnets[i].append(node_list[i % listlen]) i += 1 - if listlen < REPLICATION_FACTOR * COL_SIZE: + if listlen < REPLICATION_FACTOR * num_subnets: for subnet in subnets: last = subnets[subnet][len(subnets[subnet]) - 1].get_id() idx = -1