finalized first roundtrip; has bugs

This commit is contained in:
holisticode 2024-07-09 19:11:29 -05:00 committed by Gusto
parent 59f328c117
commit 43c6e7e672
No known key found for this signature in database
6 changed files with 85 additions and 68 deletions

View File

@ -8,3 +8,5 @@ DATA_SIZE = 1024
COL_SIZE = 4096
SAMPLE_THRESHOLD = 12
DEBUG = False

View File

@ -20,7 +20,7 @@ class Executor:
listen_addr: multiaddr.Multiaddr
host: host
port: int
node_list: []
node_list: {}
data: []
data_hashes: []
@ -49,18 +49,19 @@ class Executor:
return self.data_hashes[index]
def __create_data(self):
for i in range(COL_SIZE - 1):
for i in range(COL_SIZE):
self.data[i] = randbytes(DATA_SIZE)
self.data_hashes[i] = sha256(self.data[i]).hexdigest()
async def execute(self, nursery):
""" """
async with self.host.run(listen_addrs=[self.listen_addr]):
for i, n in enumerate(self.node_list):
for subnet, nodes in self.node_list.items():
n = nodes[0]
await self.host.connect(n)
stream = await self.host.new_stream(n.peer_id, [PROTOCOL_ID])
nursery.start_soon(self.write_data, stream, i)
nursery.start_soon(self.write_data, stream, subnet)
async def write_data(self, stream: INetStream, index: int) -> None:
await stream.write(self.data[index])

View File

@ -10,12 +10,12 @@ class DANetwork:
self.num_nodes = nodes
self.nodes = []
async def build(self, nursery, shut_down):
async def build(self, nursery, shutdown):
port_idx = 7560
for _ in range(self.num_nodes):
port_idx += 1
nursery.start_soon(DANode.new, port_idx, self.nodes, nursery, shut_down)
return
nursery.start_soon(DANode.new, port_idx, self.nodes, nursery, shutdown)
print("net built")
def get_nodes(self):
return self.nodes

View File

@ -19,7 +19,7 @@ class DANode:
"""
listen_addr: multiaddr.Multiaddr
host: host
libp2phost: host
port: int
node_list: []
hashes: set()
@ -28,7 +28,7 @@ class DANode:
async def new(cls, port, node_list, nursery, shutdown):
self = cls()
self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
self.host = new_host()
self.libp2phost = new_host()
self.port = port
self.node_list = node_list
self.hashes = set()
@ -36,24 +36,23 @@ class DANode:
print("DA node at port {} initialized".format(port))
def get_id(self):
return self.host.get_id()
return self.libp2phost.get_id()
def net_iface(self):
return self.host
return self.libp2phost
def get_port(self):
return self.port
async def __run(self, nursery, shutdown):
""" """
async with self.host.run(listen_addrs=[self.listen_addr]):
async with self.libp2phost.run(listen_addrs=[self.listen_addr]):
print("starting node at {}...".format(self.listen_addr))
async def stream_handler(stream: INetStream) -> None:
nursery.start_soon(self.read_data, stream, nursery, shutdown)
# nursery.start_soon(self.write_data, stream)
self.host.set_stream_handler(PROTOCOL_ID, stream_handler)
self.libp2phost.set_stream_handler(PROTOCOL_ID, stream_handler)
self.node_list.append(self)
await shutdown.wait()
@ -70,21 +69,27 @@ class DANode:
read_bytes = await stream.read(MAX_READ_LEN)
if read_bytes is not None:
hashstr = sha256(read_bytes).hexdigest()
self.hashes.add(hashstr)
print("{} stored {}".format(self.host.get_id().pretty(), hashstr))
if hashstr not in self.hashes:
self.hashes.add(hashstr)
nursery.start_soon(self.disperse, read_bytes)
if DEBUG:
print(
"{} stored {}".format(
self.libp2phost.get_id().pretty(), hashstr
)
)
else:
print("read_bytes is None, unexpected!")
nursery.start_soon(select_event, read_stream, nursery.cancel_scope)
nursery.start_soon(select_event, shutdown.wait, nursery.cancel_scope)
"""
async def write_data(self, stream: INetStream) -> None:
async_f = trio.wrap_file(sys.stdin)
while True:
line = await async_f.readline()
await stream.write(line.encode())
"""
async def disperse(self, packet) -> None:
for p_id in self.libp2phost.get_peerstore().peer_ids():
if p_id == self.libp2phost.get_id():
continue
stream = await self.libp2phost.new_stream(p_id, [PROTOCOL_ID])
await stream.write(packet)
async def has_hash(self, hashstr: str):
return hashstr in self.hashes

View File

@ -33,7 +33,34 @@ async def run_subnets(net, num_nodes, nursery, shutdown):
print("nodes ready")
nodes = net.get_nodes()
subnets = calculate_subnets(nodes)
await print_subnet_info(subnets)
print("Establishing connections...")
node_list = {}
all_node_instances = set()
await establish_connections(subnets, node_list, all_node_instances)
print("starting executor...")
exe = Executor.new(7766, node_list)
await exe.execute(nursery)
all_nodes = list(all_node_instances)
checked = []
await trio.sleep(1)
print("starting sampling...")
for _ in range(SAMPLE_THRESHOLD):
nursery.start_soon(sample_node, exe, subnets, checked)
print("waiting for sampling to finish...")
await check_complete(checked)
print("Test completed")
shutdown.set()
async def print_subnet_info(subnets):
print()
print("By subnets: ")
for subnet in subnets:
@ -44,15 +71,13 @@ async def run_subnets(net, num_nodes, nursery, shutdown):
print()
print()
print()
print("Establishing connections...")
node_list = []
all_node_instances = set()
async def establish_connections(subnets, node_list, all_node_instances):
for subnet in subnets:
for n in subnets[subnet]:
this_nodes_peers = n.net_iface().get_peerstore().peer_ids()
all_node_instances.add(n)
for i, nn in enumerate(subnets[subnet]):
if nn.get_id() == n.get_id():
@ -61,45 +86,30 @@ async def run_subnets(net, num_nodes, nursery, shutdown):
remote_port = nn.get_port()
addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port, remote_id)
remote_addr = multiaddr.Multiaddr(addr)
print("{} connecting to {}...".format(n.get_id(), addr))
remote = info_from_p2p_addr(remote_addr)
if i == 0:
node_list.append(remote)
if subnet not in node_list:
node_list[subnet] = []
node_list[subnet].append(remote)
if nn.get_id() in this_nodes_peers:
continue
print("{} connecting to {}...".format(n.get_id(), addr))
await n.net_iface().connect(remote)
print()
print("starting executor...")
exe = Executor.new(7766, node_list)
await exe.execute(nursery)
all_nodes = list(all_node_instances)
checked = []
print("starting sampling...")
for _ in range(SAMPLE_THRESHOLD):
nursery.start_soon(sample_node, exe, all_nodes, checked)
print("waiting for sampling to finish...")
await check_complete(checked)
print("Test completed")
shutdown.set()
async def check_complete(checked):
while len(checked) < SAMPLE_THRESHOLD:
await trio.sleep(0.5)
print(len(checked))
print("waited")
print("check_complete exiting")
return
async def sample_node(exe, all_nodes, checked):
r = randint(0, COL_SIZE - 1)
hashstr = exe.get_hash(r)
n = randint(0, len(all_nodes) - 1)
node = all_nodes[n]
async def sample_node(exe, subnets, checked):
s = randint(0, len(subnets) - 1)
n = randint(0, len(subnets[s]) - 1)
node = subnets[s][n]
hashstr = exe.get_hash(s)
has = await node.has_hash(hashstr)
if has:
print("node {} has hash {}".format(node.get_id().pretty(), hashstr))

View File

@ -1,12 +1,15 @@
from random import randint
COLS = 10
from constants import *
# COL_SIZE = 10
REPLICATION_FACTOR = 4
def calculate_subnets(node_list):
subnets = {}
for i,n in enumerate(node_list):
idx = i%COLS
subnets = {}
for i, n in enumerate(node_list):
idx = i % COL_SIZE
if idx not in subnets:
subnets[idx] = []
@ -14,29 +17,25 @@ def calculate_subnets(node_list):
listlen = len(node_list)
i = listlen
while i < COLS:
while i < COL_SIZE:
subnets[i] = []
subnets[i].append(node_list[i%listlen])
subnets[i].append(node_list[i % listlen])
i += 1
if listlen < REPLICATION_FACTOR * COLS:
if listlen < REPLICATION_FACTOR * COL_SIZE:
for subnet in subnets:
last = subnets[subnet][len(subnets[subnet])-1].get_id()
last = subnets[subnet][len(subnets[subnet]) - 1].get_id()
idx = -1
for j,n in enumerate(node_list):
for j, n in enumerate(node_list):
if n.get_id() == last:
idx = j+1
idx = j + 1
while len(subnets[subnet]) < REPLICATION_FACTOR:
if idx > len(node_list) -1:
if idx > len(node_list) - 1:
idx = 0
if node_list[idx] in subnets[subnet]:
idx += 1
continue
subnets[subnet].append(node_list[idx])
idx += 1
return subnets