diff --git a/da/network/constants.py b/da/network/constants.py index cd43262..34e3384 100644 --- a/da/network/constants.py +++ b/da/network/constants.py @@ -8,3 +8,5 @@ DATA_SIZE = 1024 COL_SIZE = 4096 SAMPLE_THRESHOLD = 12 + +DEBUG = False diff --git a/da/network/executor.py b/da/network/executor.py index c3bd88a..464e57b 100644 --- a/da/network/executor.py +++ b/da/network/executor.py @@ -20,7 +20,7 @@ class Executor: listen_addr: multiaddr.Multiaddr host: host port: int - node_list: [] + node_list: {} data: [] data_hashes: [] @@ -49,18 +49,19 @@ class Executor: return self.data_hashes[index] def __create_data(self): - for i in range(COL_SIZE - 1): + for i in range(COL_SIZE): self.data[i] = randbytes(DATA_SIZE) self.data_hashes[i] = sha256(self.data[i]).hexdigest() async def execute(self, nursery): """ """ async with self.host.run(listen_addrs=[self.listen_addr]): - for i, n in enumerate(self.node_list): + for subnet, nodes in self.node_list.items(): + n = nodes[0] await self.host.connect(n) stream = await self.host.new_stream(n.peer_id, [PROTOCOL_ID]) - nursery.start_soon(self.write_data, stream, i) + nursery.start_soon(self.write_data, stream, subnet) async def write_data(self, stream: INetStream, index: int) -> None: await stream.write(self.data[index]) diff --git a/da/network/network.py b/da/network/network.py index a2235a8..c009f46 100644 --- a/da/network/network.py +++ b/da/network/network.py @@ -10,12 +10,12 @@ class DANetwork: self.num_nodes = nodes self.nodes = [] - async def build(self, nursery, shut_down): + async def build(self, nursery, shutdown): port_idx = 7560 for _ in range(self.num_nodes): port_idx += 1 - nursery.start_soon(DANode.new, port_idx, self.nodes, nursery, shut_down) - return + nursery.start_soon(DANode.new, port_idx, self.nodes, nursery, shutdown) + print("net built") def get_nodes(self): return self.nodes diff --git a/da/network/node.py b/da/network/node.py index b514fcf..520fa14 100644 --- a/da/network/node.py +++ b/da/network/node.py @@ -19,7 +19,7 @@ class DANode: """ listen_addr: multiaddr.Multiaddr - host: host + libp2phost: host port: int node_list: [] hashes: set() @@ -28,7 +28,7 @@ class DANode: async def new(cls, port, node_list, nursery, shutdown): self = cls() self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - self.host = new_host() + self.libp2phost = new_host() self.port = port self.node_list = node_list self.hashes = set() @@ -36,24 +36,23 @@ class DANode: print("DA node at port {} initialized".format(port)) def get_id(self): - return self.host.get_id() + return self.libp2phost.get_id() def net_iface(self): - return self.host + return self.libp2phost def get_port(self): return self.port async def __run(self, nursery, shutdown): """ """ - async with self.host.run(listen_addrs=[self.listen_addr]): + async with self.libp2phost.run(listen_addrs=[self.listen_addr]): print("starting node at {}...".format(self.listen_addr)) async def stream_handler(stream: INetStream) -> None: nursery.start_soon(self.read_data, stream, nursery, shutdown) - # nursery.start_soon(self.write_data, stream) - self.host.set_stream_handler(PROTOCOL_ID, stream_handler) + self.libp2phost.set_stream_handler(PROTOCOL_ID, stream_handler) self.node_list.append(self) await shutdown.wait() @@ -70,21 +69,27 @@ class DANode: read_bytes = await stream.read(MAX_READ_LEN) if read_bytes is not None: hashstr = sha256(read_bytes).hexdigest() - self.hashes.add(hashstr) - print("{} stored {}".format(self.host.get_id().pretty(), hashstr)) + if hashstr not in self.hashes: + self.hashes.add(hashstr) + nursery.start_soon(self.disperse, read_bytes) + if DEBUG: + print( + "{} stored {}".format( + self.libp2phost.get_id().pretty(), hashstr + ) + ) else: print("read_bytes is None, unexpected!") nursery.start_soon(select_event, read_stream, nursery.cancel_scope) nursery.start_soon(select_event, shutdown.wait, nursery.cancel_scope) - """ - async def write_data(self, stream: INetStream) -> None: - async_f = trio.wrap_file(sys.stdin) - while True: - line = await async_f.readline() - await stream.write(line.encode()) - """ + async def disperse(self, packet) -> None: + for p_id in self.libp2phost.get_peerstore().peer_ids(): + if p_id == self.libp2phost.get_id(): + continue + stream = await self.libp2phost.new_stream(p_id, [PROTOCOL_ID]) + await stream.write(packet) async def has_hash(self, hashstr: str): return hashstr in self.hashes diff --git a/da/network/poc.py b/da/network/poc.py index 82029bd..451f705 100644 --- a/da/network/poc.py +++ b/da/network/poc.py @@ -33,7 +33,34 @@ async def run_subnets(net, num_nodes, nursery, shutdown): print("nodes ready") nodes = net.get_nodes() subnets = calculate_subnets(nodes) + await print_subnet_info(subnets) + print("Establishing connections...") + node_list = {} + all_node_instances = set() + await establish_connections(subnets, node_list, all_node_instances) + + print("starting executor...") + exe = Executor.new(7766, node_list) + await exe.execute(nursery) + + all_nodes = list(all_node_instances) + checked = [] + + await trio.sleep(1) + + print("starting sampling...") + for _ in range(SAMPLE_THRESHOLD): + nursery.start_soon(sample_node, exe, subnets, checked) + + print("waiting for sampling to finish...") + await check_complete(checked) + + print("Test completed") + shutdown.set() + + +async def print_subnet_info(subnets): print() print("By subnets: ") for subnet in subnets: @@ -44,15 +71,13 @@ async def run_subnets(net, num_nodes, nursery, shutdown): print() print() - print() - print("Establishing connections...") - node_list = [] - all_node_instances = set() +async def establish_connections(subnets, node_list, all_node_instances): for subnet in subnets: for n in subnets[subnet]: + this_nodes_peers = n.net_iface().get_peerstore().peer_ids() all_node_instances.add(n) for i, nn in enumerate(subnets[subnet]): if nn.get_id() == n.get_id(): @@ -61,45 +86,30 @@ async def run_subnets(net, num_nodes, nursery, shutdown): remote_port = nn.get_port() addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port, remote_id) remote_addr = multiaddr.Multiaddr(addr) - print("{} connecting to {}...".format(n.get_id(), addr)) remote = info_from_p2p_addr(remote_addr) - if i == 0: - node_list.append(remote) + if subnet not in node_list: + node_list[subnet] = [] + node_list[subnet].append(remote) + if nn.get_id() in this_nodes_peers: + continue + print("{} connecting to {}...".format(n.get_id(), addr)) await n.net_iface().connect(remote) print() - print("starting executor...") - exe = Executor.new(7766, node_list) - await exe.execute(nursery) - - all_nodes = list(all_node_instances) - checked = [] - - print("starting sampling...") - for _ in range(SAMPLE_THRESHOLD): - nursery.start_soon(sample_node, exe, all_nodes, checked) - - print("waiting for sampling to finish...") - await check_complete(checked) - - print("Test completed") - shutdown.set() async def check_complete(checked): while len(checked) < SAMPLE_THRESHOLD: await trio.sleep(0.5) - print(len(checked)) - print("waited") print("check_complete exiting") return -async def sample_node(exe, all_nodes, checked): - r = randint(0, COL_SIZE - 1) - hashstr = exe.get_hash(r) - n = randint(0, len(all_nodes) - 1) - node = all_nodes[n] +async def sample_node(exe, subnets, checked): + s = randint(0, len(subnets) - 1) + n = randint(0, len(subnets[s]) - 1) + node = subnets[s][n] + hashstr = exe.get_hash(s) has = await node.has_hash(hashstr) if has: print("node {} has hash {}".format(node.get_id().pretty(), hashstr)) diff --git a/da/network/subnet.py b/da/network/subnet.py index c416f55..8096c73 100644 --- a/da/network/subnet.py +++ b/da/network/subnet.py @@ -1,12 +1,15 @@ from random import randint -COLS = 10 +from constants import * + +# COL_SIZE = 10 REPLICATION_FACTOR = 4 + def calculate_subnets(node_list): - subnets = {} - for i,n in enumerate(node_list): - idx = i%COLS + subnets = {} + for i, n in enumerate(node_list): + idx = i % COL_SIZE if idx not in subnets: subnets[idx] = [] @@ -14,29 +17,25 @@ def calculate_subnets(node_list): listlen = len(node_list) i = listlen - while i < COLS: + while i < COL_SIZE: subnets[i] = [] - subnets[i].append(node_list[i%listlen]) + subnets[i].append(node_list[i % listlen]) i += 1 - if listlen < REPLICATION_FACTOR * COLS: + if listlen < REPLICATION_FACTOR * COL_SIZE: for subnet in subnets: - last = subnets[subnet][len(subnets[subnet])-1].get_id() + last = subnets[subnet][len(subnets[subnet]) - 1].get_id() idx = -1 - for j,n in enumerate(node_list): + for j, n in enumerate(node_list): if n.get_id() == last: - idx = j+1 + idx = j + 1 while len(subnets[subnet]) < REPLICATION_FACTOR: - if idx > len(node_list) -1: + if idx > len(node_list) - 1: idx = 0 if node_list[idx] in subnets[subnet]: idx += 1 continue subnets[subnet].append(node_list[idx]) idx += 1 - return subnets - - -