added 1st version executor

This commit is contained in:
holisticode 2024-07-05 16:56:36 -05:00 committed by Gusto
parent 0d3331c103
commit e95d7e27ca
No known key found for this signature in database
3 changed files with 115 additions and 49 deletions

70
da/network/executor.py Normal file
View File

@ -0,0 +1,70 @@
import sys
from hashlib import sha256
from random import randbytes
from typing import Self
import multiaddr
import trio
from libp2p import host, new_host
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.typing import TProtocol
PROTOCOL_ID = TProtocol("/nomosda/1.0.0")
MAX_READ_LEN = 2**32 - 1
# make this ocnfigurable
DATA_SIZE = 1024
# make this ocnfigurable
COL_SIZE = 4096
class Executor:
"""
A class for simulating a simple executor
"""
listen_addr: multiaddr.Multiaddr
host: host
port: int
node_list: []
data: []
data_hashes: []
@classmethod
def new(cls, port, node_list) -> Self:
self = cls()
self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
self.host = new_host()
self.port = port
self.data = [[] * DATA_SIZE] * COL_SIZE
self.data_hashes = [[] * 256] * COL_SIZE
self.node_list = node_list
self.__create_data()
return self
def get_id(self):
return self.host.get_id()
def net_iface(self):
return self.host
def get_port(self):
return self.port
def __create_data(self):
for i in range(COL_SIZE):
self.data[i] = randbytes(DATA_SIZE)
self.data_hashes[i] = sha256(self.data[i]).hexdigest()
async def execute(self, nursery):
""" """
async with self.host.run(listen_addrs=[self.listen_addr]):
for i, n in enumerate(self.node_list):
await self.host.connect(n)
stream = await self.host.new_stream(n.peer_id, [PROTOCOL_ID])
nursery.start_soon(self.write_data, stream, i)
async def write_data(self, stream: INetStream, index: int) -> None:
await stream.write(self.data[index])

View File

@ -1,25 +1,17 @@
import sys
import trio
import multiaddr
from hashlib import sha256
from random import randint
from libp2p import new_host, host
from libp2p.network.stream.net_stream_interface import (
INetStream,
)
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
from libp2p.typing import (
TProtocol,
)
from blspy import PrivateKey, BasicSchemeMPL, G1Element
import multiaddr
import trio
from blspy import BasicSchemeMPL, G1Element, PrivateKey
from libp2p import host, new_host
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.typing import TProtocol
PROTOCOL_ID = TProtocol("/nomosda/1.0.0")
MAX_READ_LEN = 2 ^ 32 - 1
MAX_READ_LEN = 2**32 - 1
class DANode:
@ -34,22 +26,26 @@ class DANode:
host: host
port: int
node_list: []
#inbound_socket: asyncio.Queue
#outbound_socket: asyncio.Queue
data: []
hash: str
# inbound_socket: asyncio.Queue
# outbound_socket: asyncio.Queue
@classmethod
async def new(cls, port, node_list, nursery):
self = cls()
#self.pk = generate_random_sk()
#self.id = self.pk.get_g1()
# self.pk = generate_random_sk()
# self.id = self.pk.get_g1()
self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
self.host = new_host()
self.port = port
self.node_list = node_list
self.data = []
self.hash = ""
nursery.start_soon(self.__run, nursery)
print("DA node at port {} initialized".format(port))
#self.inbound_socket = asyncio.Queue()
#self.outbound_socket = asyncio.Queue()
# self.inbound_socket = asyncio.Queue()
# self.outbound_socket = asyncio.Queue()
def hex_id(self):
return bytes(self.id).hex()
@ -64,39 +60,33 @@ class DANode:
return self.port
async def __run(self, nursery):
"""
"""
""" """
async with self.host.run(listen_addrs=[self.listen_addr]):
print("starting node at {}...".format(self.listen_addr))
async def stream_handler(self, stream: INetStream) -> None:
nursery.start_soon(self.read_data,stream)
nursery.start_soon(self.write_data,stream)
async def stream_handler(stream: INetStream) -> None:
nursery.start_soon(self.read_data, stream)
nursery.start_soon(self.write_data, stream)
self.host.set_stream_handler(PROTOCOL_ID, stream_handler)
self.node_list.append(self)
await trio.sleep_forever()
async def read_data(self, stream: INetStream) -> None:
print("read_data")
while True:
read_bytes = await stream.read(MAX_READ_LEN)
if read_bytes is not None:
len = len(read_bytes)
# Green console colour: \x1b[32m
# Reset console colour: \x1b[0m
print("\x1b[32m got {} bytes\x1b[0m ".format(len))
self.data = read_bytes
self.hash = sha256(self.data).hexdigest()
print("{} stored {}".format(self.host.get_id().pretty(), self.hash))
else:
print("read_bytes is None, unexpected!")
print("read_data exited")
async def write_data(self, stream: INetStream) -> None:
print("write_data")
async_f = trio.wrap_file(sys.stdin)
while True:
line = await async_f.readline()
await stream.write(line.encode())
print("write_data exited")
def generate_random_sk() -> PrivateKey:

View File

@ -1,32 +1,30 @@
import sys
import trio
import multiaddr
import multiaddr
import trio
from executor import Executor
from libp2p.peer.peerinfo import info_from_p2p_addr
from network import DANetwork
from subnet import calculate_subnets
from libp2p.peer.peerinfo import (
info_from_p2p_addr,
)
default_nodes = 32
default_nodes = 32
async def run_network():
if len(sys.argv) == 1:
num_nodes = default_nodes
num_nodes = default_nodes
else:
num_nodes = int(sys.argv[1])
net = DANetwork(num_nodes)
async with trio.open_nursery() as nursery:
nursery.start_soon(net.build, nursery)
nursery.start_soon(run_subnets,net, num_nodes)
nursery.start_soon(run_subnets, net, num_nodes, nursery)
await trio.sleep_forever()
async def run_subnets(net, num_nodes):
async def run_subnets(net, num_nodes, nursery):
while len(net.get_nodes()) != num_nodes:
print("nodes not ready yet")
await trio.sleep(1)
@ -49,20 +47,28 @@ async def run_subnets(net, num_nodes):
print()
print("Establishing connections...")
node_list = []
for subnet in subnets:
for n in subnets[subnet]:
for nn in subnets[subnet]:
for i, nn in enumerate(subnets[subnet]):
if nn.get_id() == n.get_id():
continue
remote_id = nn.get_id().pretty()
remote_port = nn.get_port()
addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port,remote_id)
addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port, remote_id)
remote_addr = multiaddr.Multiaddr(addr)
print("{} connecting to {}...".format(n.get_id(), addr))
remote = info_from_p2p_addr(remote_addr)
if i == 0:
node_list.append(remote)
await n.net_iface().connect(remote)
print()
print("starting executor...")
exe = Executor.new(7766, node_list)
await exe.execute(nursery)
if __name__ == "__main__":
trio.run(run_network)