Interconnect waku and nomos nodes via the same method

Gusto Bacvinka 2023-02-25 15:04:07 +02:00 committed by Alberto Soutullo Rendo
parent b469a63ccb
commit 600c9e9693
6 changed files with 208 additions and 193 deletions

View File

@@ -14,7 +14,7 @@
"num_partitions": 1,
"num_subnets": 1,
"container_size": "1",
"node_type_distribution": { "nwaku":100, "gowaku":0 },
"node_type_distribution": { "nwaku":100, "gowaku":0, "nomos":0 },
"node_type": "desktop",
"network_type": "newmanwattsstrogatz",
"output_dir": "network_data",

View File

@@ -34,7 +34,8 @@ def run(plan, args):
grafana_service = grafana.set_up_grafana(plan, prometheus_service)
waku.interconnect_waku_nodes(plan, network_topology, interconnection_batch)
# nomos.interconnect_nomos_nodes(plan, waku_topology, services)
nodes.interconnect_nodes(plan, network_topology, services, interconnection_batch)
# Setup WLS & Start the Simulation
wls_service = wls.init(plan, network_topology, wls_config)
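Reviewer note: the waku-specific interconnection call (and the commented-out nomos one) is replaced by a single node-type-agnostic entry point. A plain-Python sketch of the dispatch idea behind nodes.interconnect_nodes, with illustrative names (the real implementation is the Starlark shown further down in this commit):

# Python sketch: per-image operations are looked up in a table so waku and
# nomos nodes are wired the same way. Names here are illustrative.
from collections import namedtuple

NodeOps = namedtuple("NodeOps", ["create_id", "connect_peers"])

def interconnect_nodes(topology, services, dispatcher, batch_size):
    for name, info in services.items():
        ops = dispatcher[info["image"]]
        peers = topology[name]["static_nodes"]
        for i in range(0, len(peers), batch_size):
            peer_ids = [ops.create_id(services[p]) for p in peers[i:i + batch_size]]
            ops.connect_peers(name, peer_ids)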

View File

@@ -71,15 +71,7 @@ def prepare_gowaku_service(plan, gowakunode_name, all_services, use_general_conf
all_services[gowakunode_name] = add_service_config
def prepare_nomos_service(plan, node_name, all_services, use_general_configuration):
plan.print("nomos")
artifact_id, configuration_file = files.get_toml_configuration_artifact(plan, node_name,
use_general_configuration,
node_name)
plan.print("Configuration being used file is " + configuration_file)
plan.print("Entrypoint is "+ str(vars.NOMOS_ENTRYPOINT))
def prepare_nomos_service(node_name, all_services, config_file, artifact_id):
nomos_service_config = ServiceConfig(
image=vars.NOMOS_IMAGE,
ports={
@@ -104,7 +96,21 @@ def prepare_nomos_service(plan, node_name, all_services, use_general_configurati
all_services[node_name] = nomos_service_config
def instantiate_services(plan, network_topology, use_general_configuration):
def interconnect_nodes(plan, topology_information, services, interconnection_batch):
for waku_service_name in services.keys():
peers = topology_information[waku_service_name]["static_nodes"]
for i in range(0, len(peers), interconnection_batch):
image = services[waku_service_name]["image"]
create_id = service_dispatcher[image].create_id
connect_peers = service_dispatcher[image].connect_peers
peer_ids = [create_id(services[peer]) for peer in peers[i:i + interconnection_batch]]
connect_peers(plan, waku_service_name, vars.WAKU_RPC_PORT_ID, peer_ids)
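Reviewer note: the range/slice pair above walks the peer list in windows of interconnection_batch; a quick illustration in plain Python:

# Batching via slicing: the final window may be shorter than the batch size.
peers = ["p1", "p2", "p3", "p4", "p5"]
batch = 2
print([peers[i:i + batch] for i in range(0, len(peers), batch)])
# prints [['p1', 'p2'], ['p3', 'p4'], ['p5']]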
def instantiate_services(plan, network_topology, testing):
"""
As we will need to access the service information later, the structure is as follows:
@@ -134,19 +140,14 @@ def instantiate_services(plan, network_topology, use_general_configuration):
for service_name in network_topology.keys():
image = network_topology[service_name]["image"]
service_builder = service_dispatcher[image][0]
service_builder = service_dispatcher[image].prepare_service
service_builder(plan, service_name, all_services, use_general_configuration)
all_services_information = plan.add_services(
configs = all_services
)
<<<<<<< HEAD
#services_information = _add_waku_service_information(plan, all_services_information)
services_information = _add_nomos_service_information(plan, all_services_information)
=======
services_information = add_service_information(plan, all_services_information, network_topology)
>>>>>>> 1e3f147 (Make node init code agnostic to the node type)
return services_information
@@ -155,20 +156,12 @@ def add_service_information(plan, all_services_information, network_topology):
new_services_information = {}
for service_name in all_services_information:
<<<<<<< HEAD
node_peer_id = waku.get_wakunode_peer_id(plan, service_name, system_variables.WAKU_RPC_PORT_ID)
new_services_information[service_name] = {}
new_services_information[service_name]["peer_id"] = node_peer_id
new_services_information[service_name]["service_info"] = all_services_information[service_name]
=======
image = network_topology[service_name]["image"]
info_getter = service_dispatcher[image][1]
info_getter = service_dispatcher[image].add_service_information
service_info = all_services_information[service_name]
new_service_info = info_getter(plan, service_name, service_info)
new_service_info["image"] = image
new_services_information[service_name] = new_service_info
>>>>>>> 1e3f147 (Make node init code agnostic to the node type)
return new_services_information
@@ -192,7 +185,22 @@ def _add_nomos_service_information(plan, service_name, service_info):
service_dispatcher = {
"go-waku": [prepare_gowaku_service, _add_waku_service_information],
"nim-waku": [prepare_nwaku_service, _add_waku_service_information],
"nomos": [prepare_nomos_service, _add_nomos_service_information]
"go-waku": struct(
prepare_service = prepare_gowaku_service,
add_service_information = _add_waku_service_information,
create_id = waku.create_waku_id,
connect_peers = waku.connect_wakunode_to_peers
),
"nim-waku": struct(
prepare_service = prepare_nwaku_service,
add_service_information = _add_waku_service_information,
create_id = waku.create_waku_id,
connect_peers = waku.connect_wakunode_to_peers
),
"nomos": struct(
prepare_service = prepare_nomos_service,
add_service_information = _add_nomos_service_information,
create_id = nomos.create_nomos_id,
connect_peers = nomos.connect_nomos_to_peers
),
}
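Reviewer note: the dispatcher now uses named struct fields instead of positional list entries (the old dispatcher[image][0] / [1] indexing). A runnable Python approximation with namedtuple standing in for the Starlark struct; the lambdas are stand-ins for the real prepare/connect functions:

# Python approximation of the struct-based dispatcher above.
from collections import namedtuple

ServiceOps = namedtuple("ServiceOps",
                        ["prepare_service", "add_service_information",
                         "create_id", "connect_peers"])

dispatcher = {
    "nomos": ServiceOps(
        prepare_service=lambda name: print("prepare %s" % name),
        add_service_information=lambda name, info: dict(info, name=name),
        create_id=lambda service: service["peer_id"],
        connect_peers=lambda name, ids: print("%s -> %s" % (name, ids)),
    ),
}

ops = dispatcher["nomos"]
ops.connect_peers("nomos_1", [ops.create_id({"peer_id": "QmPeer"})])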

View File

@@ -1,8 +1,13 @@
import sys
import requests
import time
import json
import random
import matplotlib.pyplot as plt
import networkx as nx
from PIL import Image
LOGGER = None
# Histogram of time delta in millis of tx being sent
# and received by all nodes.
def hist_delta(name, iterations):
@@ -33,23 +38,178 @@ def network_graph(name, topology):
def concat_images(name, images):
images = [Image.open(image) for image in images]
# Get the width and height of the first image
widths, heights = zip(*(i.size for i in images))
# Calculate the total width and height of the collage
total_width = sum(widths)
max_height = max(heights)
# Create a new image with the calculated size
collage = Image.new('RGB', (total_width, max_height))
# Paste the images into the collage
x_offset = 0
for image in images:
collage.paste(image, (x_offset, 0))
x_offset += image.size[0]
# Save the collage
collage.save(name)
def check_nomos_node(node_address):
url = node_address + "network/info"
try:
response = requests.get(url)
except Exception as e:
LOGGER.debug('%s: %s' % (e.__doc__, e))
return False
try:
response_obj = response.json()
except Exception as e:
LOGGER.debug('%s: %s' % (e.__doc__, e))
return False
LOGGER.debug('Response from %s: %s' %(node_address, response_obj))
return True
def add_nomos_tx(node_address, tx):
url = node_address + "mempool/addtx"
try:
response = requests.post(url, data=json.dumps(tx), headers={'content-type': 'application/json'})
except Exception as e:
LOGGER.debug('%s: %s' % (e.__doc__, e))
return False
if len(response.text) > 0:
LOGGER.debug('Response from %s: %s' %(url, response.text))
return False
return True
def get_nomos_mempool_metrics(node_address, iteration_s):
url = node_address + "mempool/metrics"
try:
response = requests.get(url)
except Exception as e:
LOGGER.debug('%s: %s' % (e.__doc__, e))
return "error", -1
try:
response_obj = response.json()
except Exception as e:
LOGGER.debug('%s: %s' % (e.__doc__, e))
return "error", -1
LOGGER.debug('Response from %s: %s' %(node_address, response_obj))
time_e = int(time.time() * 1000)
return response_obj, time_e - iteration_s
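Reviewer note: a hedged usage sketch of the three HTTP helpers above. The module name follows the "import nomos" seen later in this commit; the node address and logger setup are assumptions for illustration:

# Assumed standalone usage; the node address is hypothetical.
import logging
import nomos

logging.basicConfig(level=logging.DEBUG)
nomos.LOGGER = logging.getLogger("nomos")  # run_tests normally sets this

node = "http://127.0.0.1:8080/"
if nomos.check_nomos_node(node):
    nomos.add_nomos_tx(node, "tx1")
    metrics, elapsed_ms = nomos.get_nomos_mempool_metrics(node, 0)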
def run_tests(logger, targets, topology):
global LOGGER
LOGGER = logger
""" Check all nodes are reachable """
for i, target in enumerate(targets):
if not check_nomos_node('http://%s/' %target):
LOGGER.error('Node %d (%s) is not online. Aborted.' %(i, target))
sys.exit(1)
LOGGER.info('All %d nomos nodes are reachable.' %len(targets))
""" Start simulation """
msg_cnt = 0
failed_addtx_cnt = 0
failed_metrics_cnt = 0
bytes_cnt = 0
s_time = time.time()
last_msg_time = 0
next_time_to_msg = 0
failed_dissemination_cnt = 0
batch_size = 40
iterations = []
tx_id = 0
LOGGER.info('Tx addition start time: %d' %int(round(time.time() * 1000)))
""" Add new transaction to every node """
for i, target in enumerate(targets):
iteration_s = int(time.time() * 1000)
last_tx_sent = iteration_s
tx_id = tx_id + msg_cnt + failed_addtx_cnt + 1
for j in range(batch_size):
tx_id += j
tx_target = random.choice(targets)
LOGGER.debug('sending tx_id: %s to target: %s' %(tx_id, tx_target))
if not add_nomos_tx('http://%s/' %tx_target, 'tx%s' %tx_id):
LOGGER.error('Unable to add new tx. Node %s.' %(tx_target))
failed_addtx_cnt += 1
continue
last_tx_sent = int(time.time() * 1000)
msg_cnt += 1
time.sleep(1.5)
results = []
""" Collect mempool metrics from nodes """
for n, target in enumerate(targets):
res, t = get_nomos_mempool_metrics('http://%s/' %target, iteration_s)
if 'error' in res:
LOGGER.error('Unable to pull metrics. Node %d (%s).' %(n, target))
failed_metrics_cnt += 1
continue
is_ok = True
delta = res['last_tx'] - last_tx_sent
start_finish = res['last_tx'] - iteration_s
# Tolerate one second difference between finish and start times.
if -1000 < delta < 0:
delta = 0
if delta < 0:
LOGGER.error('delta should be greater than zero: %d' %delta)
delta = -1
LOGGER.debug('expected pending_tx: %s' %msg_cnt)
if res['pending_tx'] != msg_cnt:
delta = -1
is_ok = False
failed_dissemination_cnt += 1
results.append({
"node": n,
"is_ok": is_ok,
"delta": delta,
"start_finish": start_finish
})
iterations.append({
"iteration": iteration_s,
"results": results
})
stats = {
"msg_cnt": msg_cnt,
"failed_addtx_cnt": failed_addtx_cnt,
"failed_metrics_cnt": failed_metrics_cnt,
"failed_dissemination_cnt": failed_dissemination_cnt,
"batch_size": batch_size,
"bytes_cnt": bytes_cnt,
"s_time": s_time,
"last_msg_time": last_msg_time,
"next_time_to_msg": next_time_to_msg,
"iterations": iterations,
}
LOGGER.info("Results: %s" %json.dumps(stats))
with open('./summary.json', 'w') as summary_file:
summary_file.write(json.dumps(stats, indent=4))
network_graph("1.png", topology)
hist_delta("2.png", stats['iterations'])
concat_images("collage.png", ["1.png", "2.png"])
""" We are done """
LOGGER.info('Ended')
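Reviewer note: a worked example of the delta tolerance applied in run_tests above. A last_tx timestamp up to one second before last_tx_sent is clamped to 0; anything earlier is flagged with -1 (the function name is illustrative, not from this commit):

def normalize_delta(last_tx_ms, last_tx_sent_ms):
    delta = last_tx_ms - last_tx_sent_ms
    if -1000 < delta < 0:
        return 0
    return -1 if delta < 0 else delta

print(normalize_delta(5600, 5000))  # 600 ms after the last send
print(normalize_delta(4500, 5000))  # 0, within the one-second tolerance
print(normalize_delta(3000, 5000))  # -1, flagged as out of order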

View File

@@ -8,6 +8,7 @@ Description: Wakurtosis load simulator
import sys, logging, yaml, json, time, random, os, argparse, tomllib, glob, hashlib
import requests
import rtnorm
import nomos
# from pathlib import Path
# import numpy as np
# import pandas as pd

View File

@@ -5,7 +5,7 @@ Description: Wakurtosis load simulator
"""
""" Dependencies """
import sys, logging, yaml, json, time, random, os, argparse, tomllib, glob
import sys, logging, yaml, json, random, os, argparse, tomllib, glob
import requests
import rtnorm
import nomos
@@ -41,58 +41,6 @@ class CustomFormatter(logging.Formatter):
formatter = logging.Formatter(log_fmt, '%d-%m-%Y %H:%M:%S')
return formatter.format(record)
def check_nomos_node(node_address):
url = node_address + "network/info"
try:
response = requests.get(url)
except Exception as e:
G_LOGGER.debug('%s: %s' % (e.__doc__, e))
return False
try:
response_obj = response.json()
except Exception as e:
G_LOGGER.debug('%s: %s' % (e.__doc__, e))
return False
G_LOGGER.debug('Response from %s: %s' %(node_address, response_obj))
return True
def add_nomos_tx(node_address, tx):
url = node_address + "mempool/addtx"
try:
response = requests.post(url, data=json.dumps(tx), headers={'content-type': 'application/json'})
except Exception as e:
G_LOGGER.debug('%s: %s' % (e.__doc__, e))
return False
if len(response.text) > 0:
G_LOGGER.debug('Response from %s: %s' %(url, response.text))
return False
return True
def get_nomos_mempool_metrics(node_address, iteration_s):
url = node_address + "mempool/metrics"
try:
response = requests.get(url)
except Exception as e:
G_LOGGER.debug('%s: %s' % (e.__doc__, e))
return "error", -1
try:
response_obj = response.json()
except Exception as e:
G_LOGGER.debug('%s: %s' % (e.__doc__, e))
return "error", -1
G_LOGGER.debug('Response from %s: %s' %(node_address, response_obj))
time_e = int(time.time() * 1000)
return response_obj, time_e - iteration_s
def main():
@@ -155,111 +103,8 @@ def main():
G_LOGGER.debug(targets)
G_LOGGER.info('%d targets loaded' %len(targets))
""" Check all nodes are reachable """
for i, target in enumerate(targets):
if not check_nomos_node('http://%s/' %target):
G_LOGGER.error('Node %d (%s) is not online. Aborted.' %(i, target))
sys.exit(1)
G_LOGGER.info('All %d Waku nodes are reachable.' %len(targets))
nomos.run_tests(G_LOGGER, targets, topology)
""" Start simulation """
msg_cnt = 0
failed_addtx_cnt = 0
failed_metrics_cnt = 0
bytes_cnt = 0
s_time = time.time()
last_msg_time = 0
next_time_to_msg = 0
failed_dissemination_cnt = 0
batch_size = 40
iterations = []
tx_id = 0
G_LOGGER.info('Tx addition start time: %d' %int(round(time.time() * 1000)))
""" Add new transaction to every node """
for i, target in enumerate(targets):
iteration_s = int(time.time() * 1000)
last_tx_sent = iteration_s
tx_id = tx_id + msg_cnt+failed_addtx_cnt+1
for j in range(batch_size):
tx_id += j
tx_target = random.choice(targets)
G_LOGGER.debug('sending tx_id: %s to target: %s' %(tx_id, tx_target))
if not add_nomos_tx('http://%s/' %tx_target, 'tx%s' %tx_id):
G_LOGGER.error('Unable to add new tx. Node %s.' %(tx_target))
failed_addtx_cnt += 1
continue
last_tx_sent = int(time.time() * 1000)
msg_cnt += 1
time.sleep(1.5)
results = []
""" Collect mempool metrics from nodes """
for n, target in enumerate(targets):
res, t = get_nomos_mempool_metrics('http://%s/' %target, iteration_s)
if 'error' in res:
G_LOGGER.error('Unable to pull metrics. Node %d (%s).' %(n, target))
failed_metrics_cnt += 1
continue
is_ok = True
delta = res['last_tx'] - last_tx_sent
start_finish = res['last_tx'] - iteration_s
# Tolerate one second difference between finish and start times.
if -1000 < delta < 0:
delta = 0
if delta < 0:
G_LOGGER.error('delta should be gt that zero: %d' %delta)
delta = -1
G_LOGGER.debug('should be %s' %msg_cnt)
if res['pending_tx'] != msg_cnt:
delta = -1
is_ok = False
failed_dissemination_cnt += 1
results.append({
"node": n,
"is_ok": is_ok,
"delta": delta,
"start_finish": start_finish
})
iterations.append({
"iteration": iteration_s,
"results": results
})
stats = {
"msg_cnt": msg_cnt,
"failed_addtx_cnt": failed_addtx_cnt,
"failed_metrics_cnt": failed_metrics_cnt,
"failed_dissemination_cnt": failed_dissemination_cnt,
"batch_size": batch_size,
"bytes_cnt": bytes_cnt,
"s_time": s_time,
"last_msg_time": last_msg_time,
"next_time_to_msg": next_time_to_msg,
"iterations": iterations,
}
G_LOGGER.info("Results: %s" %json.dumps(stats))
with open('./summary.json', 'w') as summary_file:
summary_file.write(json.dumps(stats, indent=4))
nomos.network_graph("1.png", topology)
nomos.hist_delta("2.png", stats['iterations'])
nomos.concat_images("collage.png", ["1.png", "2.png"])
""" We are done """
G_LOGGER.info('Ended')
if __name__ == "__main__":