diff --git a/da/network/__init__.py b/da/network/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/da/network/dispersal/proto.py b/da/network/dispersal/proto.py
index 707a885..19378bc 100644
--- a/da/network/dispersal/proto.py
+++ b/da/network/dispersal/proto.py
@@ -1,24 +1,29 @@
-import dispersal_pb2
 from itertools import count
 
+import dispersal.dispersal_pb2 as dispersal_pb2
+
 MAX_MSG_LEN_BYTES = 2
 
+
 def pack_message(message):
     # SerializeToString method returns an instance of bytes.
     data = message.SerializeToString()
-    length_prefix = len(data).to_bytes(MAX_MSG_LEN_BYTES, byteorder='big')
+    length_prefix = len(data).to_bytes(MAX_MSG_LEN_BYTES, byteorder="big")
     return length_prefix + data
 
+
 async def unpack_from_reader(reader):
     length_prefix = await reader.readexactly(MAX_MSG_LEN_BYTES)
-    data_length = int.from_bytes(length_prefix, byteorder='big')
+    data_length = int.from_bytes(length_prefix, byteorder="big")
     data = await reader.readexactly(data_length)
     return parse(data)
 
+
 def unpack_from_bytes(data):
     length_prefix = data[:MAX_MSG_LEN_BYTES]
-    data_length = int.from_bytes(length_prefix, byteorder='big')
-    return parse(data[MAX_MSG_LEN_BYTES:MAX_MSG_LEN_BYTES + data_length])
+    data_length = int.from_bytes(length_prefix, byteorder="big")
+    return parse(data[MAX_MSG_LEN_BYTES : MAX_MSG_LEN_BYTES + data_length])
+
 
 def parse(data):
     message = dispersal_pb2.DispersalMessage()
@@ -28,30 +33,36 @@ def parse(data):
 
 # DISPERSAL
 
+
 def new_dispersal_req_msg(blob_id, data):
     blob = dispersal_pb2.Blob(blob_id=blob_id, data=data)
     dispersal_req = dispersal_pb2.DispersalReq(blob=blob)
     dispersal_message = dispersal_pb2.DispersalMessage(dispersal_req=dispersal_req)
     return pack_message(dispersal_message)
 
+
 def new_dispersal_res_success_msg(blob_id):
     dispersal_res = dispersal_pb2.DispersalRes(blob_id=blob_id)
     dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
     return pack_message(dispersal_message)
 
+
 def new_dispersal_res_chunk_size_error_msg(blob_id, description):
     dispersal_err = dispersal_pb2.DispersalErr(
-        blob_id=blob_id, err_type=dispersal_pb2.DispersalErr.CHUNK_SIZE,
-        err_description=description
+        blob_id=blob_id,
+        err_type=dispersal_pb2.DispersalErr.CHUNK_SIZE,
+        err_description=description,
     )
     dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err)
     dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
     return pack_message(dispersal_message)
 
+
 def new_dispersal_res_verification_error_msg(blob_id, description):
     dispersal_err = dispersal_pb2.DispersalErr(
-        blob_id=blob_id, err_type=dispersal_pb2.DispersalErr.VERIFICATION,
-        err_description=description
+        blob_id=blob_id,
+        err_type=dispersal_pb2.DispersalErr.VERIFICATION,
+        err_description=description,
     )
     dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err)
     dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
@@ -60,21 +71,25 @@ def new_dispersal_res_verification_error_msg(blob_id, description):
 
 # SAMPLING
 
+
 def new_sample_req_msg(blob_id):
     sample_req = dispersal_pb2.SampleReq(blob_id=blob_id)
     dispersal_message = dispersal_pb2.DispersalMessage(sample_req=sample_req)
     return pack_message(dispersal_message)
 
+
 def new_sample_res_success_msg(blob_id, data):
     blob = dispersal_pb2.Blob(blob_id=blob_id, data=data)
     sample_res = dispersal_pb2.SampleRes(blob=blob)
     dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res)
     return pack_message(dispersal_message)
 
+
def new_sample_res_not_found_error_msg(blob_id, description):
     sample_err = dispersal_pb2.SampleErr(
-        blob_id=blob_id, err_type=dispersal_pb2.SampleErr.NOT_FOUND,
-        err_description=description
+        blob_id=blob_id,
+        err_type=dispersal_pb2.SampleErr.NOT_FOUND,
+        err_description=description,
     )
     sample_res = dispersal_pb2.SampleRes(err=sample_err)
     dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res)
@@ -83,24 +98,29 @@ def new_sample_res_not_found_error_msg(blob_id, description):
 
 # SESSION CONTROL
 
+
 def new_close_msg(reason):
     close_msg = dispersal_pb2.CloseMsg(reason=reason)
     return close_msg
 
+
 def new_session_req_close_msg(reason):
     close_msg = new_close_msg(reason)
     session_req = dispersal_pb2.SessionReq(close_msg=close_msg)
     dispersal_message = dispersal_pb2.DispersalMessage(session_req=session_req)
     return dispersal_message
 
+
 def new_session_req_graceful_shutdown_msg():
     message = new_session_req_close_msg(dispersal_pb2.CloseMsg.GRACEFUL_SHUTDOWN)
     return pack_message(message)
 
+
 def new_session_req_subnet_change_msg():
     message = new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_CHANGE)
     return pack_message(message)
 
+
 def new_session_req_subnet_sample_fail_msg():
     message = new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_SAMPLE_FAIL)
     return pack_message(message)
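For context on the framing used throughout proto.py: pack_message prepends a 2-byte big-endian length to the serialized protobuf, which caps a single message at 65535 bytes, and the unpack helpers read the prefix first and then exactly that many payload bytes. Below is a minimal, dependency-free sketch of the same round-trip; the frame/unframe names are illustrative only, and the protobuf payload is stubbed out as raw bytes.

```python
# Sketch of the 2-byte big-endian length framing from proto.py,
# with the protobuf payload replaced by raw bytes for illustration.
MAX_MSG_LEN_BYTES = 2  # caps a single frame at 2**16 - 1 = 65535 bytes


def frame(payload: bytes) -> bytes:
    assert len(payload) < 2**16, "payload too large for a 2-byte prefix"
    return len(payload).to_bytes(MAX_MSG_LEN_BYTES, byteorder="big") + payload


def unframe(buf: bytes) -> bytes:
    # read the prefix, then slice out exactly that many payload bytes
    length = int.from_bytes(buf[:MAX_MSG_LEN_BYTES], byteorder="big")
    return buf[MAX_MSG_LEN_BYTES : MAX_MSG_LEN_BYTES + length]


assert unframe(frame(b"hello")) == b"hello"
```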
diff --git a/da/network/executor.py b/da/network/executor.py
index 4bd0f10..84dced9 100644
--- a/da/network/executor.py
+++ b/da/network/executor.py
@@ -1,15 +1,14 @@
-import sys
 from hashlib import sha256
 from random import randbytes
 from typing import Self
 
+import dispersal.proto as proto
 import multiaddr
 import trio
 from constants import HASH_LENGTH, PROTOCOL_ID
 from libp2p import host, new_host
 from libp2p.network.stream.net_stream_interface import INetStream
 from libp2p.peer.peerinfo import info_from_p2p_addr
-from da.network.dispersal import proto
 
 
 class Executor:
@@ -34,6 +33,7 @@ class Executor:
     data: []
     # stores hashes of the data for later verification
     data_hashes: []
+    blob_id: bytes
 
     @classmethod
     def new(cls, port, node_list, num_subnets, data_size) -> Self:
@@ -69,9 +69,12 @@ class Executor:
         Create random data for dispersal
         One packet of self.data_size length per subnet
         """
+        hasher = sha256()
         for i in range(self.num_subnets):
             self.data[i] = randbytes(self.data_size)
             self.data_hashes[i] = sha256(self.data[i]).hexdigest()
+            hasher.update(self.data[i])
+        self.blob_id = hasher.digest()
 
     async def disperse(self, nursery):
         """
@@ -96,7 +99,7 @@ class Executor:
         The index is the subnet number
         """
 
-        blob_id = sha256(self.data)
+        blob_id = self.blob_id
         blob_data = self.data[index]
 
         message = proto.new_dispersal_req_msg(blob_id, blob_data)
diff --git a/da/network/node.py b/da/network/node.py
index c88fe51..2170e37 100644
--- a/da/network/node.py
+++ b/da/network/node.py
@@ -2,6 +2,7 @@ import sys
 from hashlib import sha256
 from random import randint
 
+import dispersal.proto as proto
 import multiaddr
 import trio
 from blspy import BasicSchemeMPL, G1Element, PrivateKey
@@ -94,7 +95,8 @@ class DANode:
         while True:
             read_bytes = await stream.read(MAX_READ_LEN)
             if read_bytes is not None:
-                hashstr = sha256(read_bytes).hexdigest()
+                message = proto.unpack_from_bytes(read_bytes)
+                hashstr = sha256(message.dispersal_req.blob.data).hexdigest()
                 if hashstr not in self.hashes:
                     # "store" the received packet
                     self.hashes.add(hashstr)
diff --git a/da/network/poc.py b/da/network/poc.py
index 41c8e09..a53790e 100644
--- a/da/network/poc.py
+++ b/da/network/poc.py
@@ -1,5 +1,6 @@
 import argparse
 import sys
+import time
 from random import randint
 
 import multiaddr
@@ -31,10 +32,12 @@ async def run_network(params):
     disperse_send, disperse_recv = trio.open_memory_channel(0)
     async with trio.open_nursery() as nursery:
         nursery.start_soon(net.build, nursery, shutdown, disperse_send)
-        nursery.start_soon(run_subnets, net, params, nursery, shutdown, disperse_recv)
+        nursery.start_soon(
+            run_subnets, net, params, nursery, shutdown, disperse_send, disperse_recv
+        )
 
 
-async def run_subnets(net, params, nursery, shutdown, disperse_recv):
+async def run_subnets(net, params, nursery, shutdown, disperse_send, disperse_recv):
     """
     Run the actual PoC logic.
     Calculate the subnets.
@@ -48,28 +51,34 @@ async def run_subnets(net, params, nursery, shutdown, disperse_recv):
     num_subnets = int(params.subnets)
     data_size = int(params.data_size)
     sample_threshold = int(params.sample_threshold)
+    fault_rate = int(params.fault_rate)
+    replication_factor = int(params.replication_factor)
+
     while len(net.get_nodes()) != num_nodes:
         print("nodes not ready yet")
         await trio.sleep(0.1)
 
     print("Nodes ready")
     nodes = net.get_nodes()
-    subnets = calculate_subnets(nodes, num_subnets)
+    subnets = calculate_subnets(nodes, num_subnets, replication_factor)
     await print_subnet_info(subnets)
 
     print("Establishing connections...")
     node_list = {}
     all_node_instances = set()
-    await establish_connections(subnets, node_list, all_node_instances)
+    await establish_connections(subnets, node_list, all_node_instances, fault_rate)
 
     print("Starting executor...")
     exe = Executor.new(EXECUTOR_PORT, node_list, num_subnets, data_size)
 
     print("Start dispersal and wait to complete...")
     print("depending on network and subnet size this may take a while...")
+    global TIMESTAMP
+    TIMESTAMP = time.time()
     async with trio.open_nursery() as subnursery:
         subnursery.start_soon(wait_disperse_finished, disperse_recv, num_subnets)
         subnursery.start_soon(exe.disperse, nursery)
+        subnursery.start_soon(disperse_watcher, disperse_send.clone())
 
     print()
     print()
@@ -82,25 +91,44 @@ async def run_subnets(net, params, nursery, shutdown, disperse_recv):
     print("Waiting for sampling to finish...")
     await check_complete(checked, sample_threshold)
 
+    print_connections(all_node_instances)
+
     print("Test completed")
     shutdown.set()
 
 
+TIMESTAMP = time.time()
+
+
+def print_connections(node_list):
+    for n in node_list:
+        for p in n.net_iface().get_peerstore().peer_ids():
+            if p == n.net_iface().get_id():
+                continue
+            print("node {} is connected to {}".format(n.get_id(), p))
+        print()
+
+
+async def disperse_watcher(disperse_send):
+    while time.time() - TIMESTAMP < 5:
+        await trio.sleep(1)
+
+    await disperse_send.send(9999)
+    print("canceled")
+
+
 async def wait_disperse_finished(disperse_recv, num_subnets):
-    # the executor will be sending a packet
-    # num_subnets times right away
-    sends = num_subnets
-    recvs = 0
+    # consume events until the watcher signals idle (sentinel 9999)
     async for value in disperse_recv:
-        print(".", end="")
-        if value < 0:
-            recvs += 1
-        else:
-            sends += 1
-        if sends == recvs:
-            disperse_recv.close()
+        if value == 9999:
+            print("dispersal finished")
             return
+        print(".", end="")
+
+        global TIMESTAMP
+        TIMESTAMP = time.time()
 
 
 async def print_subnet_info(subnets):
     """
@@ -119,7 +147,7 @@ async def print_subnet_info(subnets):
     print()
 
 
-async def establish_connections(subnets, node_list, all_node_instances):
+async def establish_connections(subnets, node_list, all_node_instances, fault_rate=0):
     """
     Each node in a subnet connects to the other ones in that subnet.
     """
@@ -133,10 +161,15 @@ async def establish_connections(subnets, node_list, all_node_instances):
         # to later check if we are already connected with the next peer
         this_nodes_peers = n.net_iface().get_peerstore().peer_ids()
         all_node_instances.add(n)
-        for nn in subnets[subnet]:
+        faults = []
+        for i in range(fault_rate):
+            faults.append(randint(0, len(subnets[subnet]) - 1))
+        for i, nn in enumerate(subnets[subnet]):
             # don't connect to self
             if nn.get_id() == n.get_id():
                 continue
+            if i in faults:
+                continue
             remote_id = nn.get_id().pretty()
             remote_port = nn.get_port()
             # this script only works on localhost!
@@ -206,6 +239,10 @@ if __name__ == "__main__":
         help="Threshold for sampling request attempts [default: 12]",
     )
     parser.add_argument("-d", "--data-size", help="Size of packages [default: 1024]")
+    parser.add_argument("-f", "--fault_rate", help="Fault rate [default: 0]")
+    parser.add_argument(
+        "-r", "--replication_factor", help="Replication factor [default: 4]"
+    )
     args = parser.parse_args()
 
     if not args.subnets:
@@ -216,11 +253,16 @@ if __name__ == "__main__":
         args.sample_threshold = DEFAULT_SAMPLE_THRESHOLD
     if not args.data_size:
         args.data_size = DEFAULT_DATA_SIZE
+    if not args.replication_factor:
+        args.replication_factor = DEFAULT_REPLICATION_FACTOR
+    if not args.fault_rate:
+        args.fault_rate = 0
 
     print("Number of subnets will be: {}".format(args.subnets))
     print("Number of nodes will be: {}".format(args.nodes))
     print("Size of data package will be: {}".format(args.data_size))
     print("Threshold for sampling attempts will be: {}".format(args.sample_threshold))
+    print("Fault rate will be: {}".format(args.fault_rate))
 
     print()
     print("*******************")
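The completion logic above replaces the old send/receive counting with an idle timeout: every dispersal event refreshes the module-level TIMESTAMP, and disperse_watcher injects the 9999 sentinel into the channel once nothing has arrived for 5 seconds. A self-contained trio sketch of that pattern follows; the names and the 1-second idle window are illustrative, not part of the patch.

```python
import time

import trio

SENTINEL = 9999  # same sentinel value the patch uses
last_event = time.time()


async def producer(send):
    # stand-in for the executor: emit a few progress events
    async with send:
        for _ in range(3):
            await trio.sleep(0.2)
            await send.send(1)


async def watcher(send, idle=1.0):
    # fire the sentinel once no progress was seen for `idle` seconds
    async with send:
        while time.time() - last_event < idle:
            await trio.sleep(0.1)
        await send.send(SENTINEL)


async def consumer(recv):
    global last_event
    async for value in recv:
        if value == SENTINEL:
            print("dispersal finished")
            return
        last_event = time.time()  # any progress event resets the idle clock


async def main():
    send, recv = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send.clone())
        nursery.start_soon(watcher, send.clone())
        nursery.start_soon(consumer, recv)


trio.run(main)
```

One caveat worth flagging in review: a fixed quiet period trades determinism for simplicity, so a sufficiently slow network could be declared finished before dispersal actually completes.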
""" @@ -133,10 +161,15 @@ async def establish_connections(subnets, node_list, all_node_instances): # to later check if we are already connected with the next peer this_nodes_peers = n.net_iface().get_peerstore().peer_ids() all_node_instances.add(n) - for nn in subnets[subnet]: + faults = [] + for i in range(fault_rate): + faults.append(randint(0, len(subnets[subnet]))) + for i, nn in enumerate(subnets[subnet]): # don't connect to self if nn.get_id() == n.get_id(): continue + if i in faults: + continue remote_id = nn.get_id().pretty() remote_port = nn.get_port() # this script only works on localhost! @@ -206,6 +239,10 @@ if __name__ == "__main__": help="Threshold for sampling request attempts [default: 12]", ) parser.add_argument("-d", "--data-size", help="Size of packages [default: 1024]") + parser.add_argument("-f", "--fault_rate", help="Fault rate [default: 0]") + parser.add_argument( + "-r", "--replication_factor", help="Replication factor [default: 4]" + ) args = parser.parse_args() if not args.subnets: @@ -216,11 +253,16 @@ if __name__ == "__main__": args.sample_threshold = DEFAULT_SAMPLE_THRESHOLD if not args.data_size: args.data_size = DEFAULT_DATA_SIZE + if not args.replication_factor: + args.replication_factor = DEFAULT_REPLICATION_FACTOR + if not args.fault_rate: + args.fault_rate = 0 print("Number of subnets will be: {}".format(args.subnets)) print("Number of nodes will be: {}".format(args.nodes)) print("Size of data package will be: {}".format(args.data_size)) print("Threshold for sampling attempts will be: {}".format(args.sample_threshold)) + print("Fault rate will be: {}".format(args.fault_rate)) print() print("*******************") diff --git a/da/network/tests.py b/da/network/tests.py deleted file mode 100644 index a4f4ce8..0000000 --- a/da/network/tests.py +++ /dev/null @@ -1,284 +0,0 @@ -import argparse -import sys -import time -from random import randint - -import multiaddr -import trio -from constants import * -from executor import Executor -from libp2p.peer.peerinfo import info_from_p2p_addr -from network import DANetwork -from subnet import calculate_subnets - -""" - Entry point for the poc. - Handles cli arguments, initiates the network - and waits for it to complete. - - Also does some simple completion check. -""" - - -async def run_network(params): - """ - Create the network. - Run the run_subnets - """ - - num_nodes = int(params.nodes) - net = DANetwork(num_nodes) - shutdown = trio.Event() - disperse_send, disperse_recv = trio.open_memory_channel(0) - async with trio.open_nursery() as nursery: - nursery.start_soon(net.build, nursery, shutdown, disperse_send) - nursery.start_soon( - run_subnets, net, params, nursery, shutdown, disperse_send, disperse_recv - ) - - -async def run_subnets(net, params, nursery, shutdown, disperse_send, disperse_recv): - """ - Run the actual PoC logic. - Calculate the subnets. - -> Establish connections based on the subnets <- - Runs the executor. - Runs simulated sampling. 
- Runs simple completion check - """ - - num_nodes = int(params.nodes) - num_subnets = int(params.subnets) - data_size = int(params.data_size) - sample_threshold = int(params.sample_threshold) - fault_rate = int(params.fault_rate) - replication_factor = int(params.replication_factor) - - while len(net.get_nodes()) != num_nodes: - print("nodes not ready yet") - await trio.sleep(0.1) - - print("Nodes ready") - nodes = net.get_nodes() - subnets = calculate_subnets(nodes, num_subnets, replication_factor) - await print_subnet_info(subnets) - - print("Establishing connections...") - node_list = {} - all_node_instances = set() - await establish_connections(subnets, node_list, all_node_instances, fault_rate) - - print("Starting executor...") - exe = Executor.new(EXECUTOR_PORT, node_list, num_subnets, data_size) - - print("Start dispersal and wait to complete...") - print("depending on network and subnet size this may take a while...") - global TIMESTAMP - TIMESTAMP = time.time() - async with trio.open_nursery() as subnursery: - subnursery.start_soon(wait_disperse_finished, disperse_recv, num_subnets) - subnursery.start_soon(exe.disperse, nursery) - subnursery.start_soon(disperse_watcher, disperse_send.clone()) - - print() - print() - - print("OK. Start sampling...") - checked = [] - for _ in range(sample_threshold): - nursery.start_soon(sample_node, exe, subnets, checked) - - print("Waiting for sampling to finish...") - await check_complete(checked, sample_threshold) - - print_connections(all_node_instances) - - print("Test completed") - shutdown.set() - - -TIMESTAMP = time.time() - - -def print_connections(node_list): - for n in node_list: - for p in n.net_iface().get_peerstore().peer_ids(): - if p == n.net_iface().get_id(): - continue - print("node {} is connected to {}".format(n.get_id(), p)) - print() - - -async def disperse_watcher(disperse_send): - while time.time() - TIMESTAMP < 5: - await trio.sleep(1) - - await disperse_send.send(9999) - print("canceled") - - -async def wait_disperse_finished(disperse_recv, num_subnets): - # the executor will be sending a packet - # num_subnets times right away - sends = num_subnets - recvs = 0 - async for value in disperse_recv: - if value == 9999: - print("dispersal finished") - return - - print(".", end="") - if value < 0: - recvs += 1 - else: - sends += 1 - - global TIMESTAMP - TIMESTAMP = time.time() - # print(sends) - # print(recvs) - # if sends == recvs: - # disperse_recv.close() - # print("close") - # return - - -async def print_subnet_info(subnets): - """ - Print which node is in what subnet - """ - - print() - print("By subnets: ") - for subnet in subnets: - print("subnet: {} - ".format(subnet), end="") - for n in subnets[subnet]: - print(n.get_id().pretty()[:16], end=", ") - print() - - print() - print() - - -async def establish_connections(subnets, node_list, all_node_instances, fault_rate=0): - """ - Each node in a subnet connects to the other ones in that subnet. - """ - for subnet in subnets: - # n is a DANode - for n in subnets[subnet]: - # while nodes connect to each other, they are **mutually** added - # to their peer lists. Hence, we don't need to establish connections - # again to peers we are already connected. 
- # So in each iteration we get the peer list for the current node - # to later check if we are already connected with the next peer - this_nodes_peers = n.net_iface().get_peerstore().peer_ids() - all_node_instances.add(n) - faults = [] - for i in range(fault_rate): - faults.append(randint(0, len(subnets[subnet]))) - for i, nn in enumerate(subnets[subnet]): - # don't connect to self - if nn.get_id() == n.get_id(): - continue - if i in faults: - continue - remote_id = nn.get_id().pretty() - remote_port = nn.get_port() - # this script only works on localhost! - addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port, remote_id) - remote_addr = multiaddr.Multiaddr(addr) - remote = info_from_p2p_addr(remote_addr) - if subnet not in node_list: - node_list[subnet] = [] - node_list[subnet].append(remote) - # check if we are already connected with this peer. If yes, skip connecting - if nn.get_id() in this_nodes_peers: - continue - if DEBUG: - print("{} connecting to {}...".format(n.get_id(), addr)) - await n.net_iface().connect(remote) - - print() - - -async def check_complete(checked, sample_threshold): - """ - Simple completion check: - Check how many nodes have already been "sampled" - """ - - while len(checked) < sample_threshold: - await trio.sleep(0.5) - print("check_complete exiting") - return - - -async def sample_node(exe, subnets, checked): - """ - Pick a random subnet. - Pick a random node in that subnet. - As the executor has a list of hashes per subnet, - we can ask that node if it has that hash. - """ - - # s: subnet - s = randint(0, len(subnets) - 1) - # n: node (index) - n = randint(0, len(subnets[s]) - 1) - # actual node - node = subnets[s][n] - # pick the hash to check - hashstr = exe.get_hash(s) - # run the "sampling" - has = await node.has_hash(hashstr) - if has: - print("node {} has hash {}".format(node.get_id().pretty(), hashstr)) - else: - print("node {} does NOT HAVE hash {}".format(node.get_id().pretty(), hashstr)) - print("TEST FAILED") - # signal we "sampled" another node - checked.append(1) - return - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("-s", "--subnets", help="Number of subnets [default: 256]") - parser.add_argument("-n", "--nodes", help="Number of nodes [default: 32]") - parser.add_argument( - "-t", - "--sample-threshold", - help="Threshold for sampling request attempts [default: 12]", - ) - parser.add_argument("-d", "--data-size", help="Size of packages [default: 1024]") - parser.add_argument("-f", "--fault_rate", help="Fault rate [default: 0]") - parser.add_argument( - "-r", "--replication_factor", help="Replication factor [default: 4]" - ) - args = parser.parse_args() - - if not args.subnets: - args.subnets = DEFAULT_SUBNETS - if not args.nodes: - args.nodes = DEFAULT_NODES - if not args.sample_threshold: - args.sample_threshold = DEFAULT_SAMPLE_THRESHOLD - if not args.data_size: - args.data_size = DEFAULT_DATA_SIZE - if not args.replication_factor: - args.replication_factor = DEFAULT_REPLICATION_FACTOR - if not args.fault_rate: - args.fault_rate = 0 - - print("Number of subnets will be: {}".format(args.subnets)) - print("Number of nodes will be: {}".format(args.nodes)) - print("Size of data package will be: {}".format(args.data_size)) - print("Threshold for sampling attempts will be: {}".format(args.sample_threshold)) - print("Fault rate will be: {}".format(args.fault_rate)) - - print() - print("*******************") - print("Starting network...") - - trio.run(run_network, args)
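With tests.py removed, poc.py is again the single entry point. A hypothetical smoke run follows, assuming it is launched from da/network/ so the local imports resolve; the Namespace fields mirror the argparse flags defined in poc.py.

```python
# Hypothetical programmatic run of the PoC (equivalent to the CLI entry point).
import argparse

import trio
from poc import run_network  # assumes the working directory is da/network/

args = argparse.Namespace(
    nodes=8,
    subnets=4,
    data_size=1024,
    sample_threshold=4,
    fault_rate=1,
    replication_factor=2,
)
trio.run(run_network, args)
```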