Merge branch 'master' into Alberto/new_info_to_wls

Alberto Soutullo 2023-03-16 14:03:51 +01:00 committed by GitHub
commit 41d5f67b20
9 changed files with 132 additions and 79 deletions

View File

@@ -9,13 +9,17 @@
"interconnection_batch": 10
},
"gennet": {
"num_nodes": 10,
"num_nodes": 3,
"fanout": 3,
"num_topics": 1,
"num_partitions": 1,
"num_subnets": 1,
"container_size": "3",
"node_type_distribution": { "nwaku":100, "gowaku":0, "nomos":0 },
"node_type": "desktop",
"container_size": "1",
"node_type_distribution": { "nwaku":100, "gowaku":0 },
"node_config": {
"nwaku": "rpc-admin = true\nkeep-alive = true\nmetrics-server = true\ndiscv5-discovery = true\n",
"gowaku": "rpc-admin = true\nmetrics-server = true\nrpc = true\n"
},
"network_type": "newmanwattsstrogatz",
"output_dir": "network_data",
"benchmark": "False"

View File

@@ -19,20 +19,18 @@ from enum import Enum
# To add a new node type, add appropriate entries to the nodeType and nodeTypeSwitch
class nodeType(Enum):
NWAKU = "nwaku" # waku desktop config
GOWAKU = "gowaku" # waku mobile config
NOMOS = "nomos"
NWAKU = "nwaku" # waku desktop config
GOWAKU = "gowaku" # waku mobile config
nodeTypeToToml = {
nodeTypeToTomlDefault = {
nodeType.NWAKU: "rpc-admin = true\nkeep-alive = true\nmetrics-server = true\n",
nodeType.GOWAKU: "rpc-admin = true\nmetrics-server = true\nrpc = true\n"
}
nodeTypeToDocker = {
nodeType.NWAKU: "nim-waku",
nodeType.GOWAKU: "go-waku",
nodeType.NOMOS: "nomos"
nodeType.GOWAKU: "go-waku"
}
# To add a new network type, add appropriate entries to the networkType and networkTypeSwitch
@@ -43,12 +41,14 @@ class networkType(Enum):
NEWMANWATTSSTROGATZ = "newmanwattsstrogatz" # mesh, smallworld
BARBELL = "barbell" # partition
BALANCEDTREE = "balancedtree" # committees?
NOMOSTREE = "nomostree" # balanced binary tree with even # of leaves
STAR = "star" # spof
NW_DATA_FNAME = "network_data.json"
EXTERNAL_NODES_PREFIX, NODE_PREFIX, SUBNET_PREFIX, CONTAINER_PREFIX = \
"nodes", "node", "subnetwork", "containers"
ID_STR_SEPARATOR = "-"
### I/O related fns ##############################################################
@@ -66,7 +66,7 @@ def write_toml(dirname, node_name, toml):
# Draw the network and output the image to a file; does not account for subnets yet
def draw(dirname, H):
def draw_network(dirname, H):
nx.draw(H, pos=nx.kamada_kawai_layout(H), with_labels=True)
fname = os.path.join(dirname, NW_DATA_FNAME)
plt.savefig(f"{os.path.splitext(fname)[0]}.png", format="png")
@@ -121,34 +121,65 @@ def get_random_sublist(topics):
### network processing related fns #################################################
# Network Types
def generate_config_model(n):
# |G| = n
def generate_config_model(ctx):
n = ctx.params["num_nodes"]
# degrees = nx.random_powerlaw_tree_sequence(n, tries=10000)
degrees = [random.randint(1, n) for i in range(n)]
if (sum(degrees)) % 2 != 0: # adjust the degree to be even
degrees[-1] += 1
return nx.configuration_model(degrees) # generate the graph
def generate_scalefree_graph(n):
# |G| = n
def generate_scalefree_graph(ctx):
n = ctx.params["num_nodes"]
return nx.scale_free_graph(n)
# |G| = n; n must be larger than k=D=3
def generate_newmanwattsstrogatz_graph(ctx):
n = ctx.params["num_nodes"]
fanout = ctx.params["fanout"]
return nx.newman_watts_strogatz_graph(n, fanout, 0.5)
# n must be larger than k=D=3
def generate_newmanwattsstrogatz_graph(n):
return nx.newman_watts_strogatz_graph(n, 3, 0.5)
def generate_barbell_graph(n):
# |G| = n (if odd); n+1 (if even)
def generate_barbell_graph(ctx):
n = ctx.params["num_nodes"]
return nx.barbell_graph(int(n / 2), 1)
def generate_balanced_tree(n, fanout=3):
# |G| > fanout^{\floor{log_n} + 1}
def generate_balanced_tree(ctx):
n = ctx.params["num_nodes"]
fanout = ctx.params["fanout"]
height = int(math.log(n) / math.log(fanout))
return nx.balanced_tree(fanout, height)
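# Worked example (editor's illustration, not part of the diff): with
# num_nodes = 10 and fanout = 3, height = int(log(10)/log(3)) = 2, so
# nx.balanced_tree(3, 2) has 1 + 3 + 9 = 13 nodes, i.e. |G| may exceed the
# requested n.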
# nomostree is a balanced binary tree with even number of leaves
# |G| = n (if odd); n+1 (if even)
def generate_nomos_tree(ctx):
n = ctx.params["num_nodes"]
fanout = ctx.params["fanout"]
# nomos currently insists on binary trees
assert(fanout == 2)
height = int(math.log(n) / math.log(fanout))
G = nx.balanced_tree(fanout, height)
i, diff = 0, G.number_of_nodes() - n
leaves = [x for x in G.nodes() if G.degree(x) == 1]
nleaves = len(leaves)
if (nleaves - diff) % 2 != 0 :
diff -= 1
for node in leaves :
if i == diff:
break
G.remove_node(node)
i += 1
G = nx.convert_node_labels_to_integers(G)
return G
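# Worked example (editor's illustration, not part of the diff): with
# num_nodes = 10 and fanout = 2, height = int(log(10)/log(2)) = 3, so the
# starting balanced tree has 15 nodes and 8 leaves; diff = 5 is reduced to 4
# to keep the leaf count even, and pruning 4 leaves gives |G| = 11 = n + 1,
# matching the comment above for even n.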
def generate_star_graph(n):
return nx.star_graph(n)
# |G| = n
def generate_star_graph(ctx):
n = ctx.params["num_nodes"]
return nx.star_graph(n-1)
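# Editor's note (not part of the diff): nx.star_graph(k) builds a hub plus k
# leaves, i.e. k + 1 nodes, so star_graph(n - 1) yields exactly n nodes.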
networkTypeSwitch = {
@@ -157,27 +188,29 @@ networkTypeSwitch = {
networkType.NEWMANWATTSSTROGATZ: generate_newmanwattsstrogatz_graph,
networkType.BARBELL: generate_barbell_graph,
networkType.BALANCEDTREE: generate_balanced_tree,
networkType.NOMOSTREE: generate_nomos_tree,
networkType.STAR: generate_star_graph
}
# Generate the network from nw type
def generate_network(n, network_type):
return postprocess_network(networkTypeSwitch.get(network_type)(n))
def generate_network(ctx):
network_type = networkType(ctx.params["network_type"])
return postprocess_network(networkTypeSwitch.get(network_type)(ctx))
# Label the generated network with prefix
def postprocess_network(G):
G = nx.Graph(G) # prune out parallel/multi edges
G.remove_edges_from(nx.selfloop_edges(G)) # remove the self-loops
mapping = {i: f"{NODE_PREFIX}_{i}" for i in range(len(G))}
mapping = {i: f"{NODE_PREFIX}{ID_STR_SEPARATOR}{i}" for i in range(len(G))}
return nx.relabel_nodes(G, mapping) # label the nodes
def generate_subnets(G, num_subnets):
n = len(G.nodes)
if num_subnets == n: # if num_subnets == size of the network
return {f"{NODE_PREFIX}_{i}": f"{SUBNET_PREFIX}_{i}" for i in range(n)}
return {f"{NODE_PREFIX}{ID_STR_SEPARATOR}{i}": f"{SUBNET_PREFIX}_{i}" for i in range(n)}
lst = list(range(n))
random.shuffle(lst)
@@ -188,7 +221,7 @@ def generate_subnets(G, num_subnets):
for end in offsets:
l = []
for i in range(start, end + 1):
node2subnet[f"{NODE_PREFIX}_{lst[i]}"] = f"{SUBNET_PREFIX}_{subnet_id}"
node2subnet[f"{NODE_PREFIX}{ID_STR_SEPARATOR}{lst[i]}"] = f"{SUBNET_PREFIX}_{subnet_id}"
#node2subnet[lst[i]] = subnet_id
start = end
subnet_id += 1
@@ -197,15 +230,22 @@ def generate_subnets(G, num_subnets):
### file format related fns ###########################################################
# Generate per node toml configs
def generate_toml(topics, node_type=nodeType.NWAKU):
def generate_toml(topics, configuration, node_type=nodeType.NWAKU):
topics = get_random_sublist(topics)
if node_type == nodeType.GOWAKU: # comma separated list of quoted topics
topic_str = ", ".join(f"\"{t}\"" for t in topics)
topic_str = f"[{topic_str}]"
else: # space separated topics
topic_str = " ".join(topics)
topic_str = " ".join(topics)
topic_str = f"\"{topic_str}\""
return f"{nodeTypeToToml.get(node_type)}topics = {topic_str}\n"
if configuration is None:
config = nodeTypeToTomlDefault.get(node_type)
return f"{config}topics = {topic_str}\n"
return f"{configuration}topics = {topic_str}\n"
# Convert a dict to pair of arrays
@@ -226,12 +266,12 @@ def generate_node_types(node_type_distribution, G):
# Inverts a dictionary of lists
def invert_dict_of_list(d):
inv = {}
for key, val in d.items():
if val not in inv:
inv[val] = [key]
else:
inv[val].append(key)
inv = {}
for key, val in d.items():
if val not in inv:
inv[val] = [key]
else:
inv[val].append(key)
return inv
@@ -260,11 +300,11 @@ def generate_and_write_files(ctx: typer, G):
json_dump = {}
json_dump[CONTAINER_PREFIX] = {}
json_dump[EXTERNAL_NODES_PREFIX] = {}
inv = {}
for key, val in node2container.items():
if val[1] not in inv:
inv = {}
for key, val in node2container.items():
if val[1] not in inv:
inv[val[1]] = [key]
else:
else:
inv[val[1]].append(key)
for container, nodes in inv.items():
json_dump[CONTAINER_PREFIX][container] = nodes
@@ -275,7 +315,8 @@ def generate_and_write_files(ctx: typer, G):
# write the per node toml for the i^ith node of appropriate type
node_type, i = node_types_enum[i], i+1
write_toml(ctx.params["output_dir"], node, generate_toml(topics, node_type))
configuration = ctx.params.get("node_config", {}).get(node_type.value)
write_toml(ctx.params["output_dir"], node, generate_toml(topics, configuration, node_type))
json_dump[EXTERNAL_NODES_PREFIX][node] = {}
json_dump[EXTERNAL_NODES_PREFIX][node]["static_nodes"] = []
for edge in G.edges(node):
@@ -333,17 +374,20 @@ def _num_subnets_callback(ctx: typer, Context, num_subnets: int):
def main(ctx: typer.Context,
benchmark: bool = False,
container_size: int = 1, # TODO: reduce container packer memory consumption
output_dir: str = "network_data",
prng_seed: int = 3,
num_nodes: int = 4,
num_topics: int = 1,
node_type_distribution: str = typer.Argument("{\"nwaku\" : 100 }" ,callback=ast.literal_eval),
network_type: networkType = networkType.NEWMANWATTSSTROGATZ.value,
num_subnets: int = typer.Option(1, callback=_num_subnets_callback),
num_partitions: int = typer.Option(1, callback=_num_partitions_callback),
config_file: str = typer.Option("", callback=conf_callback, is_eager=True)):
benchmark: bool = typer.Option(False, help="Measure CPU/Mem usage of Gennet"),
draw: bool = typer.Option(False, help="Draw the generated network"),
container_size: int = typer.Option(1, help="Set the number of nodes per container"), # TODO: reduce container packer memory consumption
output_dir: str = typer.Option("network_data", help="Set the output directory for Gennet generated files"),
prng_seed: int = typer.Option(41, help="Set the random seed"),
num_nodes: int = typer.Option(4, help="Set the number of nodes"),
num_topics: int = typer.Option(1, help="Set the number of topics"),
fanout: int = typer.Option(3, help="Set the arity for trees & newmanwattsstrogatz"),
node_type_distribution: str = typer.Argument("{\"nwaku\" : 100 }" ,callback=ast.literal_eval, help="Set the node type distribution"),
node_config: str = typer.Argument("{}" ,callback=ast.literal_eval, help="Set the node configuration"),
network_type: networkType = typer.Option(networkType.NEWMANWATTSSTROGATZ.value, help="Set the node type"),
num_subnets: int = typer.Option(1, callback=_num_subnets_callback, help="Set the number of subnets"),
num_partitions: int = typer.Option(1, callback=_num_partitions_callback, help="Set the number of network partitions"),
config_file: str = typer.Option("", callback=conf_callback, is_eager=True, help="Set the input config file (JSON)")):
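# Example invocation (editor's illustration, not part of the diff; typer exposes
# the parameters above as kebab-case flags, and the script name gennet.py is
# assumed):
#   python gennet.py --num-nodes 3 --fanout 3 --network-type newmanwattsstrogatz \
#     --num-subnets 1 --output-dir network_data --config-file config.json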
# Benchmarking: record start time and start tracing mallocs
if benchmark :
@@ -361,7 +405,7 @@ def main(ctx: typer.Context,
print("Setting the random seed to", prng_seed)
random.seed(prng_seed)
np.random.seed(prng_seed)
# leaving it for now should any json parsing issues pops up
# Extract the node type distribution from config.json or use the default
# no cli equivalent for node type distribution (NTD)
@@ -372,18 +416,20 @@ def main(ctx: typer.Context,
# Generate the network
G = generate_network(num_nodes, networkType(network_type))
# G = generate_network(num_nodes, networkType(network_type), tree_arity)
G = generate_network(ctx)
# Do not complain if folder exists already
# Do not complain if the folder already exists
os.makedirs(output_dir, exist_ok=True)
# Generate file format specific data structs and write the files
generate_and_write_files(ctx, G)
#generate_and_write_files(output_dir, num_topics, num_subnets, node_type_distribution, G)
#draw(G, outpur_dir)
if draw :
draw_network(output_dir, G)
end = time.time()
time_took = end - start
print(f"For {num_nodes} nodes, network generation took {time_took} secs.\nThe generated network is under ./{output_dir}")
print(f"For {G.number_of_nodes()}/{num_nodes} nodes, network generation took {time_took} secs.\nThe generated network is under ./{output_dir}")
# Benchmarking. Record finish time and stop the malloc tracing
if benchmark :

View File

@@ -26,7 +26,7 @@ def generate_template_node_targets(services, port_id, key_value):
service_info = services[vars.GENNET_NODES_KEY][service_name]
service_ip = service_info[vars.IP_KEY]
service_port_number = str(service_info[vars.PORTS_KEY][port_id+"_"+service_name][0])
service_port_number = str(service_info[vars.PORTS_KEY][port_id + vars.ID_STR_SEPARATOR + service_name][0])
targets_data.append('"' + service_ip + ":" + service_port_number + '"')
data_as_string = ",".join(targets_data)

View File

@@ -74,7 +74,7 @@ def interconnect_nodes(plan, topology_information, interconnection_batch):
def _add_service_info_to_topology(plan, all_services_information, network_topology):
for node_id, node_info in network_topology[vars.GENNET_NODES_KEY].items():
node_rpc_port_id = vars.RPC_PORT_ID + "_" + node_id
node_rpc_port_id = vars.RPC_PORT_ID + vars.ID_STR_SEPARATOR + node_id
image = network_topology[vars.GENNET_NODES_KEY][node_id][vars.GENNET_IMAGE_KEY]
peer_id_getter = dispatchers.service_info_dispatcher[image]

View File

@@ -33,15 +33,15 @@ def _prepare_nomos_cmd_in_service(nomos_names, config_files):
def _prepare_nomos_ports_in_service(node_names):
prepared_ports = {}
for i in range(len(node_names)):
prepared_ports[vars.RPC_PORT_ID + "_" + node_names[i]] = \
prepared_ports[vars.RPC_PORT_ID + vars.ID_STR_SEPARATOR + node_names[i]] = \
PortSpec(number=vars.NOMOS_RPC_PORT_NUMBER + i,
transport_protocol=vars.NOMOS_RPC_PORT_PROTOCOL)
prepared_ports[vars.PROMETHEUS_PORT_ID + "_" + node_names[i]] = \
prepared_ports[vars.PROMETHEUS_PORT_ID + vars.ID_STR_SEPARATOR + node_names[i]] = \
PortSpec(number=vars.PROMETHEUS_PORT_NUMBER + i,
transport_protocol=vars.PROMETHEUS_PORT_PROTOCOL)
prepared_ports[vars.NOMOS_LIBP2P_PORT_ID + "_" + node_names[i]] = \
prepared_ports[vars.NOMOS_LIBP2P_PORT_ID + vars.ID_STR_SEPARATOR + node_names[i]] = \
PortSpec(number=vars.NOMOS_LIBP2P_PORT + i,
transport_protocol=vars.NOMOS_LIBP2P_PORT_PROTOCOL)
@@ -58,9 +58,9 @@ def _prepare_nomos_config_files_in_service(node_names, artifact_ids):
def add_nomos_ports_info_to_topology(network_topology, all_services_information, node_info, node_id):
nomos_rpc_port_id = vars.RPC_PORT_ID + "_" + node_id
libp2p_port_id = vars.NOMOS_LIBP2P_PORT_ID + "_" + node_id
prometheus_port_id = vars.PROMETHEUS_PORT_ID + "_" + node_id
nomos_rpc_port_id = vars.RPC_PORT_ID + vars.ID_STR_SEPARATOR + node_id
libp2p_port_id = vars.NOMOS_LIBP2P_PORT_ID + vars.ID_STR_SEPARATOR + node_id
prometheus_port_id = vars.PROMETHEUS_PORT_ID + vars.ID_STR_SEPARATOR + node_id
network_topology[vars.GENNET_NODES_KEY][node_id][vars.PORTS_KEY] = {}
_add_nomos_port(network_topology, all_services_information, node_id, node_info, nomos_rpc_port_id)

View File

@@ -8,15 +8,15 @@ def prepare_waku_ports_in_service(node_names, network_topology):
for node_name in node_names:
node_info = network_topology[vars.GENNET_NODES_KEY][node_name]
prepared_ports[vars.RPC_PORT_ID + "_" + node_name] = \
prepared_ports[vars.RPC_PORT_ID + vars.ID_STR_SEPARATOR + node_name] = \
PortSpec(number=vars.WAKU_RPC_PORT_NUMBER + node_info[vars.GENNET_PORT_SHIFT_KEY],
transport_protocol=vars.WAKU_RPC_PORT_PROTOCOL)
prepared_ports[vars.PROMETHEUS_PORT_ID + "_" + node_name] = \
prepared_ports[vars.PROMETHEUS_PORT_ID + vars.ID_STR_SEPARATOR + node_name] = \
PortSpec(number=vars.PROMETHEUS_PORT_NUMBER + node_info[vars.GENNET_PORT_SHIFT_KEY],
transport_protocol=vars.PROMETHEUS_PORT_PROTOCOL)
prepared_ports[vars.WAKU_LIBP2P_PORT_ID + "_" + node_name] = \
prepared_ports[vars.WAKU_LIBP2P_PORT_ID + vars.ID_STR_SEPARATOR + node_name] = \
PortSpec(number=vars.WAKU_LIBP2P_PORT + node_info[vars.GENNET_PORT_SHIFT_KEY],
transport_protocol=vars.WAKU_LIBP2P_PORT_PROTOCOL)
@@ -32,9 +32,9 @@ def prepare_waku_config_files_in_service(node_names, artifact_ids):
def add_waku_ports_info_to_topology(network_topology, all_services_information, node_info, node_id):
waku_rpc_port_id = vars.RPC_PORT_ID + "_" + node_id
libp2p_port_id = vars.WAKU_LIBP2P_PORT_ID + "_" + node_id
prometheus_port_id = vars.PROMETHEUS_PORT_ID + "_" + node_id
waku_rpc_port_id = vars.RPC_PORT_ID + vars.ID_STR_SEPARATOR + node_id
libp2p_port_id = vars.WAKU_LIBP2P_PORT_ID + vars.ID_STR_SEPARATOR + node_id
prometheus_port_id = vars.PROMETHEUS_PORT_ID + vars.ID_STR_SEPARATOR + node_id
network_topology[vars.GENNET_NODES_KEY][node_id][vars.PORTS_KEY] = {}
_add_waku_port(network_topology, all_services_information, node_id, node_info, waku_rpc_port_id)

View File

@@ -30,7 +30,7 @@ def _merge_peer_ids(peer_ids):
def connect_nomos_to_peers(plan, service_name, node_id, port_id, peer_ids):
body = _merge_peer_ids(peer_ids)
port_id = port_id + "_" + node_id
port_id = port_id + vars.ID_STR_SEPARATOR + node_id
response = call_protocols.send_http_post_req(plan, service_name, port_id, vars.NOMOS_NET_CONN_URL, body)

View File

@@ -2,6 +2,9 @@
NWAKU_IMAGE = "statusteam/nim-waku:nwaku-trace2"
GOWAKU_IMAGE = "gowaku"
# If changing this, you'll likely need to change it as well in gennet
ID_STR_SEPARATOR = "-"
RPC_PORT_ID = "rpc"
NODE_CONFIG_FILE_LOCATION = "github.com/logos-co/wakurtosis/config/topology_generated/"
@@ -43,7 +46,7 @@ NOMOS_NET_CONN_URL = "/network/conn"
# Prometheus Configuration
PROMETHEUS_IMAGE = "prom/prometheus:latest"
PROMETHEUS_SERVICE_NAME = "prometheus"
PROMETHEUS_PORT_ID = "prometheus"
PROMETHEUS_PORT_ID = "prom"
PROMETHEUS_PORT_PROTOCOL = "TCP"
PROMETHEUS_PORT_NUMBER = 8008
PROMETHEUS_CONFIGURATION_PATH = "github.com/logos-co/wakurtosis/monitoring/prometheus.yml"
@@ -63,7 +66,7 @@ GRAFANA_CUSTOMIZATION_PATH = "github.com/logos-co/wakurtosis/monitoring/configur
GRAFANA_DASHBOARD_PATH = "github.com/logos-co/wakurtosis/monitoring/configuration/dashboards/"
GRAFANA_SERVICE_NAME = "grafana"
GRAFANA_PORT_ID = "grafana_tcp"
GRAFANA_PORT_ID = "grafana" + ID_STR_SEPARATOR + "tcp"
GRAFANA_TCP_PORT = 3000
CONTAINER_CONFIGURATION_GRAFANA = "/etc/grafana/"

View File

@@ -19,7 +19,7 @@ def get_wakunode_peer_id(plan, service_name, port_id):
def create_node_multiaddress(node_id, node_information):
ip = node_information[vars.IP_KEY]
port = node_information[vars.PORTS_KEY][vars.WAKU_LIBP2P_PORT_ID + "_" + node_id][0]
port = node_information[vars.PORTS_KEY][vars.WAKU_LIBP2P_PORT_ID + vars.ID_STR_SEPARATOR + node_id][0]
waku_node_id = node_information[vars.PEER_ID_KEY]
return '"/ip4/' + str(ip) + '/tcp/' + str(port) + '/p2p/' + waku_node_id + '"'
@@ -32,7 +32,7 @@ def _merge_peer_ids(peer_ids):
def connect_wakunode_to_peers(plan, service_name, node_id, port_id, peer_ids):
method = vars.CONNECT_TO_PEER_METHOD
params = _merge_peer_ids(peer_ids)
port_id = port_id + "_" + node_id
port_id = port_id + vars.ID_STR_SEPARATOR + node_id
response = call_protocols.send_json_rpc(plan, service_name, port_id, method, params)