From e95d7e27ca140f359f6f58e8cc31a116f7572641 Mon Sep 17 00:00:00 2001 From: holisticode Date: Fri, 5 Jul 2024 16:56:36 -0500 Subject: [PATCH] added 1st version executor --- da/network/executor.py | 70 ++++++++++++++++++++++++++++++++++++++++++ da/network/node.py | 62 ++++++++++++++++--------------------- da/network/poc.py | 32 +++++++++++-------- 3 files changed, 115 insertions(+), 49 deletions(-) create mode 100644 da/network/executor.py diff --git a/da/network/executor.py b/da/network/executor.py new file mode 100644 index 0000000..3b8713c --- /dev/null +++ b/da/network/executor.py @@ -0,0 +1,70 @@ +import sys +from hashlib import sha256 +from random import randbytes +from typing import Self + +import multiaddr +import trio +from libp2p import host, new_host +from libp2p.network.stream.net_stream_interface import INetStream +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.typing import TProtocol + +PROTOCOL_ID = TProtocol("/nomosda/1.0.0") +MAX_READ_LEN = 2**32 - 1 +# make this ocnfigurable +DATA_SIZE = 1024 +# make this ocnfigurable +COL_SIZE = 4096 + + +class Executor: + """ + A class for simulating a simple executor + + """ + + listen_addr: multiaddr.Multiaddr + host: host + port: int + node_list: [] + data: [] + data_hashes: [] + + @classmethod + def new(cls, port, node_list) -> Self: + self = cls() + self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + self.host = new_host() + self.port = port + self.data = [[] * DATA_SIZE] * COL_SIZE + self.data_hashes = [[] * 256] * COL_SIZE + self.node_list = node_list + self.__create_data() + return self + + def get_id(self): + return self.host.get_id() + + def net_iface(self): + return self.host + + def get_port(self): + return self.port + + def __create_data(self): + for i in range(COL_SIZE): + self.data[i] = randbytes(DATA_SIZE) + self.data_hashes[i] = sha256(self.data[i]).hexdigest() + + async def execute(self, nursery): + """ """ + async with self.host.run(listen_addrs=[self.listen_addr]): + for i, n in enumerate(self.node_list): + await self.host.connect(n) + + stream = await self.host.new_stream(n.peer_id, [PROTOCOL_ID]) + nursery.start_soon(self.write_data, stream, i) + + async def write_data(self, stream: INetStream, index: int) -> None: + await stream.write(self.data[index]) diff --git a/da/network/node.py b/da/network/node.py index d6ffbbd..d033761 100644 --- a/da/network/node.py +++ b/da/network/node.py @@ -1,25 +1,17 @@ import sys -import trio -import multiaddr - +from hashlib import sha256 from random import randint -from libp2p import new_host, host - -from libp2p.network.stream.net_stream_interface import ( - INetStream, -) -from libp2p.peer.peerinfo import ( - info_from_p2p_addr, -) -from libp2p.typing import ( - TProtocol, -) - -from blspy import PrivateKey, BasicSchemeMPL, G1Element +import multiaddr +import trio +from blspy import BasicSchemeMPL, G1Element, PrivateKey +from libp2p import host, new_host +from libp2p.network.stream.net_stream_interface import INetStream +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.typing import TProtocol PROTOCOL_ID = TProtocol("/nomosda/1.0.0") -MAX_READ_LEN = 2 ^ 32 - 1 +MAX_READ_LEN = 2**32 - 1 class DANode: @@ -34,22 +26,26 @@ class DANode: host: host port: int node_list: [] - #inbound_socket: asyncio.Queue - #outbound_socket: asyncio.Queue + data: [] + hash: str + # inbound_socket: asyncio.Queue + # outbound_socket: asyncio.Queue @classmethod async def new(cls, port, node_list, nursery): self = cls() - #self.pk = generate_random_sk() - #self.id = self.pk.get_g1() + # self.pk = generate_random_sk() + # self.id = self.pk.get_g1() self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") self.host = new_host() self.port = port self.node_list = node_list + self.data = [] + self.hash = "" nursery.start_soon(self.__run, nursery) print("DA node at port {} initialized".format(port)) - #self.inbound_socket = asyncio.Queue() - #self.outbound_socket = asyncio.Queue() + # self.inbound_socket = asyncio.Queue() + # self.outbound_socket = asyncio.Queue() def hex_id(self): return bytes(self.id).hex() @@ -64,39 +60,33 @@ class DANode: return self.port async def __run(self, nursery): - """ - """ + """ """ async with self.host.run(listen_addrs=[self.listen_addr]): print("starting node at {}...".format(self.listen_addr)) - async def stream_handler(self, stream: INetStream) -> None: - nursery.start_soon(self.read_data,stream) - nursery.start_soon(self.write_data,stream) + async def stream_handler(stream: INetStream) -> None: + nursery.start_soon(self.read_data, stream) + nursery.start_soon(self.write_data, stream) self.host.set_stream_handler(PROTOCOL_ID, stream_handler) self.node_list.append(self) await trio.sleep_forever() async def read_data(self, stream: INetStream) -> None: - print("read_data") while True: read_bytes = await stream.read(MAX_READ_LEN) if read_bytes is not None: - len = len(read_bytes) - # Green console colour: \x1b[32m - # Reset console colour: \x1b[0m - print("\x1b[32m got {} bytes\x1b[0m ".format(len)) + self.data = read_bytes + self.hash = sha256(self.data).hexdigest() + print("{} stored {}".format(self.host.get_id().pretty(), self.hash)) else: print("read_bytes is None, unexpected!") - print("read_data exited") async def write_data(self, stream: INetStream) -> None: - print("write_data") async_f = trio.wrap_file(sys.stdin) while True: line = await async_f.readline() await stream.write(line.encode()) - print("write_data exited") def generate_random_sk() -> PrivateKey: diff --git a/da/network/poc.py b/da/network/poc.py index be27c96..ea5a982 100644 --- a/da/network/poc.py +++ b/da/network/poc.py @@ -1,32 +1,30 @@ import sys -import trio -import multiaddr +import multiaddr +import trio +from executor import Executor +from libp2p.peer.peerinfo import info_from_p2p_addr from network import DANetwork from subnet import calculate_subnets -from libp2p.peer.peerinfo import ( - info_from_p2p_addr, -) +default_nodes = 32 -default_nodes = 32 - async def run_network(): if len(sys.argv) == 1: - num_nodes = default_nodes + num_nodes = default_nodes else: num_nodes = int(sys.argv[1]) net = DANetwork(num_nodes) async with trio.open_nursery() as nursery: nursery.start_soon(net.build, nursery) - nursery.start_soon(run_subnets,net, num_nodes) - + nursery.start_soon(run_subnets, net, num_nodes, nursery) + await trio.sleep_forever() -async def run_subnets(net, num_nodes): +async def run_subnets(net, num_nodes, nursery): while len(net.get_nodes()) != num_nodes: print("nodes not ready yet") await trio.sleep(1) @@ -49,20 +47,28 @@ async def run_subnets(net, num_nodes): print() print("Establishing connections...") + node_list = [] + for subnet in subnets: for n in subnets[subnet]: - for nn in subnets[subnet]: + for i, nn in enumerate(subnets[subnet]): if nn.get_id() == n.get_id(): continue remote_id = nn.get_id().pretty() remote_port = nn.get_port() - addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port,remote_id) + addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port, remote_id) remote_addr = multiaddr.Multiaddr(addr) print("{} connecting to {}...".format(n.get_id(), addr)) remote = info_from_p2p_addr(remote_addr) + if i == 0: + node_list.append(remote) await n.net_iface().connect(remote) print() + print("starting executor...") + exe = Executor.new(7766, node_list) + await exe.execute(nursery) + if __name__ == "__main__": trio.run(run_network)