mirror of
https://github.com/logos-co/nomos-specs.git
synced 2025-01-21 21:09:39 +00:00
Merge branch 'da-poc-tests' into da-poc
This commit is contained in:
commit
30ea9f9de4
@ -15,5 +15,7 @@ DEFAULT_DATA_SIZE = 1024
|
||||
DEFAULT_SUBNETS = 256
|
||||
DEFAULT_NODES = 32
|
||||
DEFAULT_SAMPLE_THRESHOLD = 12
|
||||
# how many nodes per subnet minimum
|
||||
DEFAULT_REPLICATION_FACTOR = 4
|
||||
|
||||
DEBUG = False
|
||||
|
@ -2,11 +2,8 @@ from random import randint
|
||||
|
||||
from constants import *
|
||||
|
||||
# how many nodes per subnet minimum
|
||||
REPLICATION_FACTOR = 4
|
||||
|
||||
|
||||
def calculate_subnets(node_list, num_subnets):
|
||||
def calculate_subnets(node_list, num_subnets, replication_factor):
|
||||
"""
|
||||
Calculate in which subnet(s) to place each node.
|
||||
This PoC does NOT require this to be analyzed,
|
||||
@ -45,7 +42,7 @@ def calculate_subnets(node_list, num_subnets):
|
||||
i += 1
|
||||
|
||||
# if not each subnet has at least factor number of nodes, fill up
|
||||
if listlen < REPLICATION_FACTOR * num_subnets:
|
||||
if listlen < replication_factor * num_subnets:
|
||||
for subnet in subnets:
|
||||
last = subnets[subnet][len(subnets[subnet]) - 1].get_id()
|
||||
idx = -1
|
||||
@ -54,7 +51,7 @@ def calculate_subnets(node_list, num_subnets):
|
||||
if n.get_id() == last:
|
||||
idx = j + 1
|
||||
# fill up until factor
|
||||
while len(subnets[subnet]) < REPLICATION_FACTOR:
|
||||
while len(subnets[subnet]) < replication_factor:
|
||||
# wrap index if at end
|
||||
if idx > len(node_list) - 1:
|
||||
idx = 0
|
||||
|
284
da/network/tests.py
Normal file
284
da/network/tests.py
Normal file
@ -0,0 +1,284 @@
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
from random import randint
|
||||
|
||||
import multiaddr
|
||||
import trio
|
||||
from constants import *
|
||||
from executor import Executor
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
from network import DANetwork
|
||||
from subnet import calculate_subnets
|
||||
|
||||
"""
|
||||
Entry point for the poc.
|
||||
Handles cli arguments, initiates the network
|
||||
and waits for it to complete.
|
||||
|
||||
Also does some simple completion check.
|
||||
"""
|
||||
|
||||
|
||||
async def run_network(params):
|
||||
"""
|
||||
Create the network.
|
||||
Run the run_subnets
|
||||
"""
|
||||
|
||||
num_nodes = int(params.nodes)
|
||||
net = DANetwork(num_nodes)
|
||||
shutdown = trio.Event()
|
||||
disperse_send, disperse_recv = trio.open_memory_channel(0)
|
||||
async with trio.open_nursery() as nursery:
|
||||
nursery.start_soon(net.build, nursery, shutdown, disperse_send)
|
||||
nursery.start_soon(
|
||||
run_subnets, net, params, nursery, shutdown, disperse_send, disperse_recv
|
||||
)
|
||||
|
||||
|
||||
async def run_subnets(net, params, nursery, shutdown, disperse_send, disperse_recv):
|
||||
"""
|
||||
Run the actual PoC logic.
|
||||
Calculate the subnets.
|
||||
-> Establish connections based on the subnets <-
|
||||
Runs the executor.
|
||||
Runs simulated sampling.
|
||||
Runs simple completion check
|
||||
"""
|
||||
|
||||
num_nodes = int(params.nodes)
|
||||
num_subnets = int(params.subnets)
|
||||
data_size = int(params.data_size)
|
||||
sample_threshold = int(params.sample_threshold)
|
||||
fault_rate = int(params.fault_rate)
|
||||
replication_factor = int(params.replication_factor)
|
||||
|
||||
while len(net.get_nodes()) != num_nodes:
|
||||
print("nodes not ready yet")
|
||||
await trio.sleep(0.1)
|
||||
|
||||
print("Nodes ready")
|
||||
nodes = net.get_nodes()
|
||||
subnets = calculate_subnets(nodes, num_subnets, replication_factor)
|
||||
await print_subnet_info(subnets)
|
||||
|
||||
print("Establishing connections...")
|
||||
node_list = {}
|
||||
all_node_instances = set()
|
||||
await establish_connections(subnets, node_list, all_node_instances, fault_rate)
|
||||
|
||||
print("Starting executor...")
|
||||
exe = Executor.new(EXECUTOR_PORT, node_list, num_subnets, data_size)
|
||||
|
||||
print("Start dispersal and wait to complete...")
|
||||
print("depending on network and subnet size this may take a while...")
|
||||
global TIMESTAMP
|
||||
TIMESTAMP = time.time()
|
||||
async with trio.open_nursery() as subnursery:
|
||||
subnursery.start_soon(wait_disperse_finished, disperse_recv, num_subnets)
|
||||
subnursery.start_soon(exe.disperse, nursery)
|
||||
subnursery.start_soon(disperse_watcher, disperse_send.clone())
|
||||
|
||||
print()
|
||||
print()
|
||||
|
||||
print("OK. Start sampling...")
|
||||
checked = []
|
||||
for _ in range(sample_threshold):
|
||||
nursery.start_soon(sample_node, exe, subnets, checked)
|
||||
|
||||
print("Waiting for sampling to finish...")
|
||||
await check_complete(checked, sample_threshold)
|
||||
|
||||
print_connections(all_node_instances)
|
||||
|
||||
print("Test completed")
|
||||
shutdown.set()
|
||||
|
||||
|
||||
TIMESTAMP = time.time()
|
||||
|
||||
|
||||
def print_connections(node_list):
|
||||
for n in node_list:
|
||||
for p in n.net_iface().get_peerstore().peer_ids():
|
||||
if p == n.net_iface().get_id():
|
||||
continue
|
||||
print("node {} is connected to {}".format(n.get_id(), p))
|
||||
print()
|
||||
|
||||
|
||||
async def disperse_watcher(disperse_send):
|
||||
while time.time() - TIMESTAMP < 5:
|
||||
await trio.sleep(1)
|
||||
|
||||
await disperse_send.send(9999)
|
||||
print("canceled")
|
||||
|
||||
|
||||
async def wait_disperse_finished(disperse_recv, num_subnets):
|
||||
# the executor will be sending a packet
|
||||
# num_subnets times right away
|
||||
sends = num_subnets
|
||||
recvs = 0
|
||||
async for value in disperse_recv:
|
||||
if value == 9999:
|
||||
print("dispersal finished")
|
||||
return
|
||||
|
||||
print(".", end="")
|
||||
if value < 0:
|
||||
recvs += 1
|
||||
else:
|
||||
sends += 1
|
||||
|
||||
global TIMESTAMP
|
||||
TIMESTAMP = time.time()
|
||||
# print(sends)
|
||||
# print(recvs)
|
||||
# if sends == recvs:
|
||||
# disperse_recv.close()
|
||||
# print("close")
|
||||
# return
|
||||
|
||||
|
||||
async def print_subnet_info(subnets):
|
||||
"""
|
||||
Print which node is in what subnet
|
||||
"""
|
||||
|
||||
print()
|
||||
print("By subnets: ")
|
||||
for subnet in subnets:
|
||||
print("subnet: {} - ".format(subnet), end="")
|
||||
for n in subnets[subnet]:
|
||||
print(n.get_id().pretty()[:16], end=", ")
|
||||
print()
|
||||
|
||||
print()
|
||||
print()
|
||||
|
||||
|
||||
async def establish_connections(subnets, node_list, all_node_instances, fault_rate=0):
|
||||
"""
|
||||
Each node in a subnet connects to the other ones in that subnet.
|
||||
"""
|
||||
for subnet in subnets:
|
||||
# n is a DANode
|
||||
for n in subnets[subnet]:
|
||||
# while nodes connect to each other, they are **mutually** added
|
||||
# to their peer lists. Hence, we don't need to establish connections
|
||||
# again to peers we are already connected.
|
||||
# So in each iteration we get the peer list for the current node
|
||||
# to later check if we are already connected with the next peer
|
||||
this_nodes_peers = n.net_iface().get_peerstore().peer_ids()
|
||||
all_node_instances.add(n)
|
||||
faults = []
|
||||
for i in range(fault_rate):
|
||||
faults.append(randint(0, len(subnets[subnet])))
|
||||
for i, nn in enumerate(subnets[subnet]):
|
||||
# don't connect to self
|
||||
if nn.get_id() == n.get_id():
|
||||
continue
|
||||
if i in faults:
|
||||
continue
|
||||
remote_id = nn.get_id().pretty()
|
||||
remote_port = nn.get_port()
|
||||
# this script only works on localhost!
|
||||
addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port, remote_id)
|
||||
remote_addr = multiaddr.Multiaddr(addr)
|
||||
remote = info_from_p2p_addr(remote_addr)
|
||||
if subnet not in node_list:
|
||||
node_list[subnet] = []
|
||||
node_list[subnet].append(remote)
|
||||
# check if we are already connected with this peer. If yes, skip connecting
|
||||
if nn.get_id() in this_nodes_peers:
|
||||
continue
|
||||
if DEBUG:
|
||||
print("{} connecting to {}...".format(n.get_id(), addr))
|
||||
await n.net_iface().connect(remote)
|
||||
|
||||
print()
|
||||
|
||||
|
||||
async def check_complete(checked, sample_threshold):
|
||||
"""
|
||||
Simple completion check:
|
||||
Check how many nodes have already been "sampled"
|
||||
"""
|
||||
|
||||
while len(checked) < sample_threshold:
|
||||
await trio.sleep(0.5)
|
||||
print("check_complete exiting")
|
||||
return
|
||||
|
||||
|
||||
async def sample_node(exe, subnets, checked):
|
||||
"""
|
||||
Pick a random subnet.
|
||||
Pick a random node in that subnet.
|
||||
As the executor has a list of hashes per subnet,
|
||||
we can ask that node if it has that hash.
|
||||
"""
|
||||
|
||||
# s: subnet
|
||||
s = randint(0, len(subnets) - 1)
|
||||
# n: node (index)
|
||||
n = randint(0, len(subnets[s]) - 1)
|
||||
# actual node
|
||||
node = subnets[s][n]
|
||||
# pick the hash to check
|
||||
hashstr = exe.get_hash(s)
|
||||
# run the "sampling"
|
||||
has = await node.has_hash(hashstr)
|
||||
if has:
|
||||
print("node {} has hash {}".format(node.get_id().pretty(), hashstr))
|
||||
else:
|
||||
print("node {} does NOT HAVE hash {}".format(node.get_id().pretty(), hashstr))
|
||||
print("TEST FAILED")
|
||||
# signal we "sampled" another node
|
||||
checked.append(1)
|
||||
return
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-s", "--subnets", help="Number of subnets [default: 256]")
|
||||
parser.add_argument("-n", "--nodes", help="Number of nodes [default: 32]")
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--sample-threshold",
|
||||
help="Threshold for sampling request attempts [default: 12]",
|
||||
)
|
||||
parser.add_argument("-d", "--data-size", help="Size of packages [default: 1024]")
|
||||
parser.add_argument("-f", "--fault_rate", help="Fault rate [default: 0]")
|
||||
parser.add_argument(
|
||||
"-r", "--replication_factor", help="Replication factor [default: 4]"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.subnets:
|
||||
args.subnets = DEFAULT_SUBNETS
|
||||
if not args.nodes:
|
||||
args.nodes = DEFAULT_NODES
|
||||
if not args.sample_threshold:
|
||||
args.sample_threshold = DEFAULT_SAMPLE_THRESHOLD
|
||||
if not args.data_size:
|
||||
args.data_size = DEFAULT_DATA_SIZE
|
||||
if not args.replication_factor:
|
||||
args.replication_factor = DEFAULT_REPLICATION_FACTOR
|
||||
if not args.fault_rate:
|
||||
args.fault_rate = 0
|
||||
|
||||
print("Number of subnets will be: {}".format(args.subnets))
|
||||
print("Number of nodes will be: {}".format(args.nodes))
|
||||
print("Size of data package will be: {}".format(args.data_size))
|
||||
print("Threshold for sampling attempts will be: {}".format(args.sample_threshold))
|
||||
print("Fault rate will be: {}".format(args.fault_rate))
|
||||
|
||||
print()
|
||||
print("*******************")
|
||||
print("Starting network...")
|
||||
|
||||
trio.run(run_network, args)
|
Loading…
x
Reference in New Issue
Block a user