diff --git a/da/network/network.py b/da/network/network.py new file mode 100644 index 0000000..c19674a --- /dev/null +++ b/da/network/network.py @@ -0,0 +1,23 @@ +import trio + +from node import DANode + +class DANetwork: + num_nodes: int + nodes: [] + + def __init__(self, nodes): + self.num_nodes = nodes + self.nodes = [] + + async def build(self, nursery): + node_list = [] + port_idx = 7560 + for _ in range(self.num_nodes): + port_idx += 1 + nursery.start_soon(DANode.new,port_idx, node_list, nursery) + self.nodes = node_list + + def get_nodes(self): + return self.nodes + diff --git a/da/network/node.py b/da/network/node.py new file mode 100644 index 0000000..d6ffbbd --- /dev/null +++ b/da/network/node.py @@ -0,0 +1,104 @@ +import sys +import trio +import multiaddr + +from random import randint + +from libp2p import new_host, host + +from libp2p.network.stream.net_stream_interface import ( + INetStream, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) +from libp2p.typing import ( + TProtocol, +) + +from blspy import PrivateKey, BasicSchemeMPL, G1Element + +PROTOCOL_ID = TProtocol("/nomosda/1.0.0") +MAX_READ_LEN = 2 ^ 32 - 1 + + +class DANode: + """ + A class handling Data Availability (DA) + + """ + + pk: PrivateKey + id: G1Element + listen_addr: multiaddr.Multiaddr + host: host + port: int + node_list: [] + #inbound_socket: asyncio.Queue + #outbound_socket: asyncio.Queue + + @classmethod + async def new(cls, port, node_list, nursery): + self = cls() + #self.pk = generate_random_sk() + #self.id = self.pk.get_g1() + self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + self.host = new_host() + self.port = port + self.node_list = node_list + nursery.start_soon(self.__run, nursery) + print("DA node at port {} initialized".format(port)) + #self.inbound_socket = asyncio.Queue() + #self.outbound_socket = asyncio.Queue() + + def hex_id(self): + return bytes(self.id).hex() + + def get_id(self): + return self.host.get_id() + + def net_iface(self): + return self.host + + def get_port(self): + return self.port + + async def __run(self, nursery): + """ + """ + async with self.host.run(listen_addrs=[self.listen_addr]): + print("starting node at {}...".format(self.listen_addr)) + + async def stream_handler(self, stream: INetStream) -> None: + nursery.start_soon(self.read_data,stream) + nursery.start_soon(self.write_data,stream) + + self.host.set_stream_handler(PROTOCOL_ID, stream_handler) + self.node_list.append(self) + await trio.sleep_forever() + + async def read_data(self, stream: INetStream) -> None: + print("read_data") + while True: + read_bytes = await stream.read(MAX_READ_LEN) + if read_bytes is not None: + len = len(read_bytes) + # Green console colour: \x1b[32m + # Reset console colour: \x1b[0m + print("\x1b[32m got {} bytes\x1b[0m ".format(len)) + else: + print("read_bytes is None, unexpected!") + print("read_data exited") + + async def write_data(self, stream: INetStream) -> None: + print("write_data") + async_f = trio.wrap_file(sys.stdin) + while True: + line = await async_f.readline() + await stream.write(line.encode()) + print("write_data exited") + + +def generate_random_sk() -> PrivateKey: + seed = bytes([randint(0, 255) for _ in range(32)]) + return BasicSchemeMPL.key_gen(seed) diff --git a/da/network/poc.py b/da/network/poc.py new file mode 100644 index 0000000..be27c96 --- /dev/null +++ b/da/network/poc.py @@ -0,0 +1,68 @@ +import sys +import trio +import multiaddr + +from network import DANetwork +from subnet import calculate_subnets + +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) + + +default_nodes = 32 + +async def run_network(): + if len(sys.argv) == 1: + num_nodes = default_nodes + else: + num_nodes = int(sys.argv[1]) + + net = DANetwork(num_nodes) + async with trio.open_nursery() as nursery: + nursery.start_soon(net.build, nursery) + nursery.start_soon(run_subnets,net, num_nodes) + + await trio.sleep_forever() + + +async def run_subnets(net, num_nodes): + while len(net.get_nodes()) != num_nodes: + print("nodes not ready yet") + await trio.sleep(1) + + print("nodes ready") + nodes = net.get_nodes() + subnets = calculate_subnets(nodes) + + print() + print("By subnets: ") + for subnet in subnets: + print("subnet: {} - ".format(subnet), end="") + for n in subnets[subnet]: + print(n.get_id().pretty()[:16], end=", ") + print() + print() + + print() + + print() + print("Establishing connections...") + + for subnet in subnets: + for n in subnets[subnet]: + for nn in subnets[subnet]: + if nn.get_id() == n.get_id(): + continue + remote_id = nn.get_id().pretty() + remote_port = nn.get_port() + addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port,remote_id) + remote_addr = multiaddr.Multiaddr(addr) + print("{} connecting to {}...".format(n.get_id(), addr)) + remote = info_from_p2p_addr(remote_addr) + await n.net_iface().connect(remote) + + print() + +if __name__ == "__main__": + trio.run(run_network) diff --git a/da/network/subnet.py b/da/network/subnet.py new file mode 100644 index 0000000..c416f55 --- /dev/null +++ b/da/network/subnet.py @@ -0,0 +1,42 @@ +from random import randint + +COLS = 10 +REPLICATION_FACTOR = 4 + +def calculate_subnets(node_list): + subnets = {} + for i,n in enumerate(node_list): + idx = i%COLS + + if idx not in subnets: + subnets[idx] = [] + subnets[idx].append(n) + + listlen = len(node_list) + i = listlen + while i < COLS: + subnets[i] = [] + subnets[i].append(node_list[i%listlen]) + i += 1 + + if listlen < REPLICATION_FACTOR * COLS: + for subnet in subnets: + last = subnets[subnet][len(subnets[subnet])-1].get_id() + idx = -1 + for j,n in enumerate(node_list): + if n.get_id() == last: + idx = j+1 + while len(subnets[subnet]) < REPLICATION_FACTOR: + if idx > len(node_list) -1: + idx = 0 + if node_list[idx] in subnets[subnet]: + idx += 1 + continue + subnets[subnet].append(node_list[idx]) + idx += 1 + + + return subnets + + +