initial iteration
This commit is contained in:
parent
3d2459052c
commit
0d3331c103
|
@ -0,0 +1,23 @@
|
|||
import trio
|
||||
|
||||
from node import DANode
|
||||
|
||||
class DANetwork:
|
||||
num_nodes: int
|
||||
nodes: []
|
||||
|
||||
def __init__(self, nodes):
|
||||
self.num_nodes = nodes
|
||||
self.nodes = []
|
||||
|
||||
async def build(self, nursery):
|
||||
node_list = []
|
||||
port_idx = 7560
|
||||
for _ in range(self.num_nodes):
|
||||
port_idx += 1
|
||||
nursery.start_soon(DANode.new,port_idx, node_list, nursery)
|
||||
self.nodes = node_list
|
||||
|
||||
def get_nodes(self):
|
||||
return self.nodes
|
||||
|
|
@ -0,0 +1,104 @@
|
|||
import sys
|
||||
import trio
|
||||
import multiaddr
|
||||
|
||||
from random import randint
|
||||
|
||||
from libp2p import new_host, host
|
||||
|
||||
from libp2p.network.stream.net_stream_interface import (
|
||||
INetStream,
|
||||
)
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
)
|
||||
from libp2p.typing import (
|
||||
TProtocol,
|
||||
)
|
||||
|
||||
from blspy import PrivateKey, BasicSchemeMPL, G1Element
|
||||
|
||||
PROTOCOL_ID = TProtocol("/nomosda/1.0.0")
|
||||
MAX_READ_LEN = 2 ^ 32 - 1
|
||||
|
||||
|
||||
class DANode:
|
||||
"""
|
||||
A class handling Data Availability (DA)
|
||||
|
||||
"""
|
||||
|
||||
pk: PrivateKey
|
||||
id: G1Element
|
||||
listen_addr: multiaddr.Multiaddr
|
||||
host: host
|
||||
port: int
|
||||
node_list: []
|
||||
#inbound_socket: asyncio.Queue
|
||||
#outbound_socket: asyncio.Queue
|
||||
|
||||
@classmethod
|
||||
async def new(cls, port, node_list, nursery):
|
||||
self = cls()
|
||||
#self.pk = generate_random_sk()
|
||||
#self.id = self.pk.get_g1()
|
||||
self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||
self.host = new_host()
|
||||
self.port = port
|
||||
self.node_list = node_list
|
||||
nursery.start_soon(self.__run, nursery)
|
||||
print("DA node at port {} initialized".format(port))
|
||||
#self.inbound_socket = asyncio.Queue()
|
||||
#self.outbound_socket = asyncio.Queue()
|
||||
|
||||
def hex_id(self):
|
||||
return bytes(self.id).hex()
|
||||
|
||||
def get_id(self):
|
||||
return self.host.get_id()
|
||||
|
||||
def net_iface(self):
|
||||
return self.host
|
||||
|
||||
def get_port(self):
|
||||
return self.port
|
||||
|
||||
async def __run(self, nursery):
|
||||
"""
|
||||
"""
|
||||
async with self.host.run(listen_addrs=[self.listen_addr]):
|
||||
print("starting node at {}...".format(self.listen_addr))
|
||||
|
||||
async def stream_handler(self, stream: INetStream) -> None:
|
||||
nursery.start_soon(self.read_data,stream)
|
||||
nursery.start_soon(self.write_data,stream)
|
||||
|
||||
self.host.set_stream_handler(PROTOCOL_ID, stream_handler)
|
||||
self.node_list.append(self)
|
||||
await trio.sleep_forever()
|
||||
|
||||
async def read_data(self, stream: INetStream) -> None:
|
||||
print("read_data")
|
||||
while True:
|
||||
read_bytes = await stream.read(MAX_READ_LEN)
|
||||
if read_bytes is not None:
|
||||
len = len(read_bytes)
|
||||
# Green console colour: \x1b[32m
|
||||
# Reset console colour: \x1b[0m
|
||||
print("\x1b[32m got {} bytes\x1b[0m ".format(len))
|
||||
else:
|
||||
print("read_bytes is None, unexpected!")
|
||||
print("read_data exited")
|
||||
|
||||
async def write_data(self, stream: INetStream) -> None:
|
||||
print("write_data")
|
||||
async_f = trio.wrap_file(sys.stdin)
|
||||
while True:
|
||||
line = await async_f.readline()
|
||||
await stream.write(line.encode())
|
||||
print("write_data exited")
|
||||
|
||||
|
||||
def generate_random_sk() -> PrivateKey:
|
||||
seed = bytes([randint(0, 255) for _ in range(32)])
|
||||
return BasicSchemeMPL.key_gen(seed)
|
|
@ -0,0 +1,68 @@
|
|||
import sys
|
||||
import trio
|
||||
import multiaddr
|
||||
|
||||
from network import DANetwork
|
||||
from subnet import calculate_subnets
|
||||
|
||||
from libp2p.peer.peerinfo import (
|
||||
info_from_p2p_addr,
|
||||
)
|
||||
|
||||
|
||||
default_nodes = 32
|
||||
|
||||
async def run_network():
|
||||
if len(sys.argv) == 1:
|
||||
num_nodes = default_nodes
|
||||
else:
|
||||
num_nodes = int(sys.argv[1])
|
||||
|
||||
net = DANetwork(num_nodes)
|
||||
async with trio.open_nursery() as nursery:
|
||||
nursery.start_soon(net.build, nursery)
|
||||
nursery.start_soon(run_subnets,net, num_nodes)
|
||||
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
async def run_subnets(net, num_nodes):
|
||||
while len(net.get_nodes()) != num_nodes:
|
||||
print("nodes not ready yet")
|
||||
await trio.sleep(1)
|
||||
|
||||
print("nodes ready")
|
||||
nodes = net.get_nodes()
|
||||
subnets = calculate_subnets(nodes)
|
||||
|
||||
print()
|
||||
print("By subnets: ")
|
||||
for subnet in subnets:
|
||||
print("subnet: {} - ".format(subnet), end="")
|
||||
for n in subnets[subnet]:
|
||||
print(n.get_id().pretty()[:16], end=", ")
|
||||
print()
|
||||
print()
|
||||
|
||||
print()
|
||||
|
||||
print()
|
||||
print("Establishing connections...")
|
||||
|
||||
for subnet in subnets:
|
||||
for n in subnets[subnet]:
|
||||
for nn in subnets[subnet]:
|
||||
if nn.get_id() == n.get_id():
|
||||
continue
|
||||
remote_id = nn.get_id().pretty()
|
||||
remote_port = nn.get_port()
|
||||
addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port,remote_id)
|
||||
remote_addr = multiaddr.Multiaddr(addr)
|
||||
print("{} connecting to {}...".format(n.get_id(), addr))
|
||||
remote = info_from_p2p_addr(remote_addr)
|
||||
await n.net_iface().connect(remote)
|
||||
|
||||
print()
|
||||
|
||||
if __name__ == "__main__":
|
||||
trio.run(run_network)
|
|
@ -0,0 +1,42 @@
|
|||
from random import randint
|
||||
|
||||
COLS = 10
|
||||
REPLICATION_FACTOR = 4
|
||||
|
||||
def calculate_subnets(node_list):
|
||||
subnets = {}
|
||||
for i,n in enumerate(node_list):
|
||||
idx = i%COLS
|
||||
|
||||
if idx not in subnets:
|
||||
subnets[idx] = []
|
||||
subnets[idx].append(n)
|
||||
|
||||
listlen = len(node_list)
|
||||
i = listlen
|
||||
while i < COLS:
|
||||
subnets[i] = []
|
||||
subnets[i].append(node_list[i%listlen])
|
||||
i += 1
|
||||
|
||||
if listlen < REPLICATION_FACTOR * COLS:
|
||||
for subnet in subnets:
|
||||
last = subnets[subnet][len(subnets[subnet])-1].get_id()
|
||||
idx = -1
|
||||
for j,n in enumerate(node_list):
|
||||
if n.get_id() == last:
|
||||
idx = j+1
|
||||
while len(subnets[subnet]) < REPLICATION_FACTOR:
|
||||
if idx > len(node_list) -1:
|
||||
idx = 0
|
||||
if node_list[idx] in subnets[subnet]:
|
||||
idx += 1
|
||||
continue
|
||||
subnets[subnet].append(node_list[idx])
|
||||
idx += 1
|
||||
|
||||
|
||||
return subnets
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue