From 6a3d0cb83a05f9971edc59ee354512ab8226f988 Mon Sep 17 00:00:00 2001
From: holisticode
Date: Thu, 18 Jul 2024 17:09:54 -0500
Subject: [PATCH] added test runs

---
 da/network/constants.py |   2 +
 da/network/subnet.py    |   9 +-
 da/network/tests.py     | 284 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 289 insertions(+), 6 deletions(-)
 create mode 100644 da/network/tests.py

diff --git a/da/network/constants.py b/da/network/constants.py
index 37d9e86..8d9efde 100644
--- a/da/network/constants.py
+++ b/da/network/constants.py
@@ -15,5 +15,7 @@ DEFAULT_DATA_SIZE = 1024
 DEFAULT_SUBNETS = 256
 DEFAULT_NODES = 32
 DEFAULT_SAMPLE_THRESHOLD = 12
+# how many nodes per subnet minimum
+DEFAULT_REPLICATION_FACTOR = 4
 
 DEBUG = False
diff --git a/da/network/subnet.py b/da/network/subnet.py
index 52b7ef5..77d5110 100644
--- a/da/network/subnet.py
+++ b/da/network/subnet.py
@@ -2,11 +2,8 @@ from random import randint
 
 from constants import *
 
-# how many nodes per subnet minimum
-REPLICATION_FACTOR = 4
-
 
-def calculate_subnets(node_list, num_subnets):
+def calculate_subnets(node_list, num_subnets, replication_factor):
     """
     Calculate in which subnet(s) to place each node.
     This PoC does NOT require this to be analyzed,
@@ -45,7 +42,7 @@
         i += 1
 
     # if not each subnet has at least factor number of nodes, fill up
-    if listlen < REPLICATION_FACTOR * num_subnets:
+    if listlen < replication_factor * num_subnets:
         for subnet in subnets:
             last = subnets[subnet][len(subnets[subnet]) - 1].get_id()
             idx = -1
@@ -54,7 +51,7 @@
                 if n.get_id() == last:
                     idx = j + 1
             # fill up until factor
-            while len(subnets[subnet]) < REPLICATION_FACTOR:
+            while len(subnets[subnet]) < replication_factor:
                 # wrap index if at end
                 if idx > len(node_list) - 1:
                     idx = 0
diff --git a/da/network/tests.py b/da/network/tests.py
new file mode 100644
index 0000000..a4f4ce8
--- /dev/null
+++ b/da/network/tests.py
@@ -0,0 +1,284 @@
+import argparse
+import sys
+import time
+from random import randint
+
+import multiaddr
+import trio
+from constants import *
+from executor import Executor
+from libp2p.peer.peerinfo import info_from_p2p_addr
+from network import DANetwork
+from subnet import calculate_subnets
+
+"""
+    Entry point for the PoC.
+    Handles CLI arguments, initiates the network,
+    and waits for it to complete.
+
+    Also performs a simple completion check.
+"""
+
+
+async def run_network(params):
+    """
+    Create the network.
+    Run run_subnets.
+    """
+
+    num_nodes = int(params.nodes)
+    net = DANetwork(num_nodes)
+    shutdown = trio.Event()
+    disperse_send, disperse_recv = trio.open_memory_channel(0)
+    async with trio.open_nursery() as nursery:
+        nursery.start_soon(net.build, nursery, shutdown, disperse_send)
+        nursery.start_soon(
+            run_subnets, net, params, nursery, shutdown, disperse_send, disperse_recv
+        )
+
+
+async def run_subnets(net, params, nursery, shutdown, disperse_send, disperse_recv):
+    """
+    Run the actual PoC logic.
+    Calculate the subnets.
+    -> Establish connections based on the subnets <-
+    Run the executor.
+    Run simulated sampling.
+    Run a simple completion check.
+    """
+
+    num_nodes = int(params.nodes)
+    num_subnets = int(params.subnets)
+    data_size = int(params.data_size)
+    sample_threshold = int(params.sample_threshold)
+    fault_rate = int(params.fault_rate)
+    replication_factor = int(params.replication_factor)
+
+    while len(net.get_nodes()) != num_nodes:
+        print("nodes not ready yet")
+        await trio.sleep(0.1)
+
+    print("Nodes ready")
+    nodes = net.get_nodes()
+    subnets = calculate_subnets(nodes, num_subnets, replication_factor)
+    await print_subnet_info(subnets)
+
+    print("Establishing connections...")
+    node_list = {}
+    all_node_instances = set()
+    await establish_connections(subnets, node_list, all_node_instances, fault_rate)
+
+    print("Starting executor...")
+    exe = Executor.new(EXECUTOR_PORT, node_list, num_subnets, data_size)
+
+    print("Start dispersal and wait to complete...")
+    print("depending on network and subnet size this may take a while...")
+    global TIMESTAMP
+    TIMESTAMP = time.time()
+    async with trio.open_nursery() as subnursery:
+        subnursery.start_soon(wait_disperse_finished, disperse_recv, num_subnets)
+        subnursery.start_soon(exe.disperse, nursery)
+        subnursery.start_soon(disperse_watcher, disperse_send.clone())
+
+    print()
+    print()
+
+    print("OK. Start sampling...")
+    checked = []
+    for _ in range(sample_threshold):
+        nursery.start_soon(sample_node, exe, subnets, checked)
+
+    print("Waiting for sampling to finish...")
+    await check_complete(checked, sample_threshold)
+
+    print_connections(all_node_instances)
+
+    print("Test completed")
+    shutdown.set()
+
+
+TIMESTAMP = time.time()
+
+
+def print_connections(node_list):
+    for n in node_list:
+        for p in n.net_iface().get_peerstore().peer_ids():
+            if p == n.net_iface().get_id():
+                continue
+            print("node {} is connected to {}".format(n.get_id(), p))
+    print()
+
+
+async def disperse_watcher(disperse_send):
+    while time.time() - TIMESTAMP < 5:
+        await trio.sleep(1)
+
+    await disperse_send.send(9999)
+    print("canceled")
+
+
+async def wait_disperse_finished(disperse_recv, num_subnets):
+    # the executor will be sending a packet
+    # num_subnets times right away
+    sends = num_subnets
+    recvs = 0
+    async for value in disperse_recv:
+        if value == 9999:
+            print("dispersal finished")
+            return
+
+        print(".", end="")
+        if value < 0:
+            recvs += 1
+        else:
+            sends += 1
+
+        global TIMESTAMP
+        TIMESTAMP = time.time()
+        # print(sends)
+        # print(recvs)
+        # if sends == recvs:
+        #     disperse_recv.close()
+        #     print("close")
+        #     return
+
+
+async def print_subnet_info(subnets):
+    """
+    Print which node is in what subnet
+    """
+
+    print()
+    print("By subnets: ")
+    for subnet in subnets:
+        print("subnet: {} - ".format(subnet), end="")
+        for n in subnets[subnet]:
+            print(n.get_id().pretty()[:16], end=", ")
+        print()
+
+    print()
+    print()
+
+
+async def establish_connections(subnets, node_list, all_node_instances, fault_rate=0):
+    """
+    Each node in a subnet connects to the other ones in that subnet.
+    """
+    for subnet in subnets:
+        # n is a DANode
+        for n in subnets[subnet]:
+            # while nodes connect to each other, they are **mutually** added
+            # to their peer lists. Hence, we don't need to establish connections
+            # again to peers we are already connected to.
+            # So in each iteration we get the peer list for the current node
+            # to later check whether we are already connected to the next peer
+            this_nodes_peers = n.net_iface().get_peerstore().peer_ids()
+            all_node_instances.add(n)
+            faults = []
+            for i in range(fault_rate):
+                faults.append(randint(0, len(subnets[subnet]) - 1))
+            for i, nn in enumerate(subnets[subnet]):
+                # don't connect to self
+                if nn.get_id() == n.get_id():
+                    continue
+                if i in faults:
+                    continue
+                remote_id = nn.get_id().pretty()
+                remote_port = nn.get_port()
+                # this script only works on localhost!
+                addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port, remote_id)
+                remote_addr = multiaddr.Multiaddr(addr)
+                remote = info_from_p2p_addr(remote_addr)
+                if subnet not in node_list:
+                    node_list[subnet] = []
+                node_list[subnet].append(remote)
+                # check if we are already connected with this peer. If yes, skip connecting
+                if nn.get_id() in this_nodes_peers:
+                    continue
+                if DEBUG:
+                    print("{} connecting to {}...".format(n.get_id(), addr))
+                await n.net_iface().connect(remote)
+
+    print()
+
+
+async def check_complete(checked, sample_threshold):
+    """
+    Simple completion check:
+    Check how many nodes have already been "sampled"
+    """
+
+    while len(checked) < sample_threshold:
+        await trio.sleep(0.5)
+    print("check_complete exiting")
+    return
+
+
+async def sample_node(exe, subnets, checked):
+    """
+    Pick a random subnet.
+    Pick a random node in that subnet.
+    As the executor has a list of hashes per subnet,
+    we can ask that node if it has that hash.
+    """
+
+    # s: subnet
+    s = randint(0, len(subnets) - 1)
+    # n: node (index)
+    n = randint(0, len(subnets[s]) - 1)
+    # actual node
+    node = subnets[s][n]
+    # pick the hash to check
+    hashstr = exe.get_hash(s)
+    # run the "sampling"
+    has = await node.has_hash(hashstr)
+    if has:
+        print("node {} has hash {}".format(node.get_id().pretty(), hashstr))
+    else:
+        print("node {} does NOT HAVE hash {}".format(node.get_id().pretty(), hashstr))
+        print("TEST FAILED")
+    # signal we "sampled" another node
+    checked.append(1)
+    return
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-s", "--subnets", help="Number of subnets [default: 256]")
+    parser.add_argument("-n", "--nodes", help="Number of nodes [default: 32]")
+    parser.add_argument(
+        "-t",
+        "--sample-threshold",
+        help="Threshold for sampling request attempts [default: 12]",
+    )
+    parser.add_argument("-d", "--data-size", help="Size of packages [default: 1024]")
+    parser.add_argument("-f", "--fault_rate", help="Fault rate [default: 0]")
+    parser.add_argument(
+        "-r", "--replication_factor", help="Replication factor [default: 4]"
+    )
+    args = parser.parse_args()
+
+    if not args.subnets:
+        args.subnets = DEFAULT_SUBNETS
+    if not args.nodes:
+        args.nodes = DEFAULT_NODES
+    if not args.sample_threshold:
+        args.sample_threshold = DEFAULT_SAMPLE_THRESHOLD
+    if not args.data_size:
+        args.data_size = DEFAULT_DATA_SIZE
+    if not args.replication_factor:
+        args.replication_factor = DEFAULT_REPLICATION_FACTOR
+    if not args.fault_rate:
+        args.fault_rate = 0
+
+    print("Number of subnets will be: {}".format(args.subnets))
+    print("Number of nodes will be: {}".format(args.nodes))
+    print("Size of data package will be: {}".format(args.data_size))
+    print("Threshold for sampling attempts will be: {}".format(args.sample_threshold))
+    print("Fault rate will be: {}".format(args.fault_rate))
+
+    print()
+    print("*******************")
+    print("Starting network...")
+
+    trio.run(run_network, args)
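
A quick way to exercise the new test runner, assuming the Python dependencies (trio, multiaddr, and the libp2p package providing info_from_p2p_addr) are installed and the script is started from da/network/ so its sibling imports (constants, executor, network, subnet) resolve. The flags are the ones defined in the argparse block above; the concrete values here are only illustrative, not defaults taken from the patch:

    cd da/network
    python tests.py --nodes 16 --subnets 4 --sample-threshold 6 --data-size 1024 --fault_rate 0 --replication_factor 4

Any flag left out falls back to the DEFAULT_* constants (or 0 for the fault rate), as handled in the __main__ block.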