diff --git a/wls-module/wls.py b/wls-module/wls.py index fc9f0be..e515504 100644 --- a/wls-module/wls.py +++ b/wls-module/wls.py @@ -1,24 +1,28 @@ -#!/usr/bin/env python3 -""" -Description: Wakurtosis load simulator -""" - """ Dependencies """ -import sys, logging, json, time, random, os, argparse, tomllib, glob, hashlib +import argparse +import hashlib +import json +import logging +import random +import sys +import time +import tomllib import requests -import rtnorm +# Project Imports +import rtnorm """ Globals """ G_APP_NAME = 'WLS' G_LOG_LEVEL = 'DEBUG' G_DEFAULT_CONFIG_FILE = './config/config.json' G_DEFAULT_TOPOLOGY_FILE = './network_topology/network_data.json' -G_LOGGER = None +G_LOGGER = logging.getLogger(G_APP_NAME) +handler = logging.StreamHandler(sys.stdout) -""" Custom logging formatter """ + +# Custom logging formatter class CustomFormatter(logging.Formatter): - # Set different formats for every logging level time_name_stamp = "[%(asctime)s.%(msecs)03d] [" + G_APP_NAME + "]" FORMATS = { @@ -35,81 +39,8 @@ class CustomFormatter(logging.Formatter): formatter = logging.Formatter(log_fmt, '%d-%m-%Y %H:%M:%S') return formatter.format(record) -def check_waku_node(node_address): - - data = { - 'jsonrpc': '2.0', - 'method': 'get_waku_v2_debug_v1_info', - # 'method' : 'get_waku_v2_debug_v1_version', - 'id': 1, - 'params': []} - - G_LOGGER.info('Waku RPC: %s from %s' %(data['method'], node_address)) - - try: - response = requests.post(node_address, data=json.dumps(data), headers={'content-type': 'application/json'}) - except Exception as e: - G_LOGGER.debug('%s: %s' % (e.__doc__, e)) - return False - - try: - response_obj = response.json() - except Exception as e: - G_LOGGER.debug('%s: %s' % (e.__doc__, e)) - return False - - G_LOGGER.debug('Response from %s: %s' %(node_address, response_obj)) - - return True - -def get_waku_msgs(node_address, topic, cursor=None): - - data = { - 'jsonrpc': '2.0', - 'method': 'get_waku_v2_store_v1_messages', - 'id': 1, - 'params': [topic, None, None, None, {"pageSize": 100, "cursor": cursor,"forward": True}] - } - - G_LOGGER.debug('Waku RPC: %s from %s' %(data['method'], node_address)) - - s_time = time.time() - - response = requests.post(node_address, data=json.dumps(data), headers={'content-type': 'application/json'}) - - elapsed_ms =(time.time() - s_time) * 1000 - - response_obj = response.json() - - # G_LOGGER.debug('Response from %s: %s [%.4f ms.]' %(node_address, response_obj, elapsed_ms)) - - return response_obj, elapsed_ms - -# https://rfc.vac.dev/spec/16/#get_waku_v2_relay_v1_messages -def get_last_waku_msgs(node_address, topic): - - data = { - 'jsonrpc': '2.0', - 'method': 'get_waku_v2_relay_v1_messages', - 'id': 1, - 'params' : [topic]} - - G_LOGGER.debug('Waku RPC: %s from %s' %(data['method'], node_address)) - - s_time = time.time() - - response = requests.post(node_address, data=json.dumps(data), headers={'content-type': 'application/json'}) - - elapsed_ms =(time.time() - s_time) * 1000 - - response_obj = response.json() - - # G_LOGGER.debug('Response from %s: %s [%.4f ms.]' %(node_address, response_obj, elapsed_ms)) - - return response_obj, elapsed_ms def send_waku_msg(node_address, topic, payload, nonce=1): - # waku_msg = { # 'nonce' : nonce, # 'timestamp' : time.time_ns(), @@ -147,15 +78,18 @@ def send_waku_msg(node_address, topic, payload, nonce=1): return response_obj, elapsed_ms, json.dumps(waku_msg), my_payload['ts'] + # Generate a random interval using a Poisson distribution def poisson_interval(rate): return random.expovariate(rate) + def make_payload(size): payload = hex(random.getrandbits(4*size)) G_LOGGER.debug('Payload of size %d bytes: %s' %(size, payload)) return payload + def make_payload_dist(dist_type, min_size, max_size): # Check if min and max packet sizes are the same @@ -189,23 +123,6 @@ def make_payload_dist(dist_type, min_size, max_size): return '0x00', 0 -def parse_targets(enclave_dump_path, waku_port=8545): - - targets = [] - - G_LOGGER.info('Extracting Waku node addresses from Kurtosus enclance dump in %s' %enclave_dump_path) - - for path_obj in os.walk(enclave_dump_path): - if 'waku_' in path_obj[0]: - with open(path_obj[0] + '/spec.json', "r") as read_file: - spec_obj = json.load(read_file) - network_settings = spec_obj['NetworkSettings'] - waku_address = network_settings['Ports']['%d/tcp' %waku_port] - targets.append('%s:%s' %(waku_address[0]['HostIp'], waku_address[0]['HostPort'])) - - G_LOGGER.info('Parsed %d Waku nodes' %len(targets)) - - return targets def get_next_time_to_msg(inter_msg_type, msg_rate, simulation_time): @@ -218,68 +135,35 @@ def get_next_time_to_msg(inter_msg_type, msg_rate, simulation_time): G_LOGGER.error('%s is not a valid inter_msg_type. Aborting.' %inter_msg_type) sys.exit() -def get_all_messages_from_node_from_topic(node_address, topic): - page_cnt = 0 - msg_cnt = 0 - - # Retrieve the first page - response, elapsed = get_waku_msgs(node_address, topic) - if 'error' in response: - G_LOGGER.error(response['error']) - return 0 - - messages = response['result']['messages'] - msg_cnt += len(messages) - G_LOGGER.debug('Got page %d with %d messages from node %s and topic: %s' %(page_cnt, len(messages), node_address, topic)) - - for msg_idx, msg in enumerate(messages): - # Decode the payload - payload_obj = json.loads(''.join(map(chr, msg['payload']))) - - # Retrieve further pages - while(response['result']['pagingOptions']): - page_cnt += 1 - cursor = response['result']['pagingOptions']['cursor'] - index = {"digest" : cursor['digest'], "receivedTime" : cursor['receiverTime']} - response, elapsed = get_waku_msgs(node_address, topic, cursor) - if 'error' in response: - G_LOGGER.error(response['error']) - break - - messages = response['result']['messages'] - msg_cnt += len(messages) - G_LOGGER.debug('Got page %d with %d messages from node %s and topic: %s' %(page_cnt, len(messages), node_address, topic)) - - for msg_idx, msg in enumerate(messages): - # Decode the payload - payload_obj = json.loads(''.join(map(chr, msg['payload']))) - - return msg_cnt - -def main(): - - global G_LOGGER - +def innit_logging(): """ Init Logging """ - G_LOGGER = logging.getLogger(G_APP_NAME) - handler = logging.StreamHandler(sys.stdout) handler.setFormatter(CustomFormatter()) G_LOGGER.addHandler(handler) - G_LOGGER.info('Started') + +def configure_logging(wls_config, config_file): + G_LOGGER.setLevel(wls_config['debug_level']) + handler.setLevel(wls_config['debug_level']) + G_LOGGER.debug(wls_config) + G_LOGGER.info('Configuration loaded from %s' %config_file) + + +def parse_cli(): """ Parse command line args. """ parser = argparse.ArgumentParser() - parser.add_argument("-cfg", "--config_file", help="Config file", action="store_true", default=G_DEFAULT_CONFIG_FILE) + parser.add_argument("-cfg", "--config_file", help="Config file", action="store_true", + default=G_DEFAULT_CONFIG_FILE) parser.add_argument("-t", "--topology_file", help="Topology file", action="store_true", default=G_DEFAULT_TOPOLOGY_FILE) args = parser.parse_args() - config_file = args.config_file - topology_file = args.topology_file - + return args + + +def load_config_file(config_file): """ Load config file """ try: with open(config_file, 'r') as f: @@ -287,19 +171,11 @@ def main(): except Exception as e: G_LOGGER.error('%s: %s' % (e.__doc__, e)) sys.exit() - - # Set loglevel from config - wls_config = config['wls'] - G_LOGGER.setLevel(wls_config['debug_level']) - handler.setLevel(wls_config['debug_level']) + return config - G_LOGGER.debug(wls_config) - G_LOGGER.info('Configuration loaded from %s' %config_file) - - # Set RPNG seed from config - random.seed(config['general']['prng_seed']) +def load_topology(topology_file): """ Load topology """ try: with open(topology_file, 'r') as read_file: @@ -313,56 +189,64 @@ def main(): sys.exit(1) G_LOGGER.debug(topology) - G_LOGGER.info('%d topology loaded' %len(topology)) + G_LOGGER.info('%d topology loaded' % len(topology)) - # Dictionary to count messages of every topic being sent - topics_msg_cnt = {} + return topology + + +def load_topics_into_topology(topology): """ Load Topics """ nodes = topology["nodes"] for node, node_info in nodes.items(): try: - with open("tomls/"+node_info["node_config"], mode='rb') as read_file: + with open("tomls/" + node_info["node_config"], mode='rb') as read_file: toml_config = tomllib.load(read_file) if node_info["image"] == "nim-waku": topics = list(toml_config["topics"].split(" ")) elif node_info["image"] == "go-waku": topics = toml_config["topics"] - for topic in topics: - topics_msg_cnt[topic] = 0 - # Load topics into topology for easier access nodes[node]["topics"] = topics except Exception as e: G_LOGGER.error('%s: %s' % (e.__doc__, e)) sys.exit() - - G_LOGGER.info('Loaded nodes topics from toml files: %s' %topics_msg_cnt.keys()) + G_LOGGER.info('Loaded nodes topics from toml files') + + +def get_random_emitters(topology, wls_config): + nodes = topology["nodes"] """ Define the subset of emitters """ num_emitters = int(len(nodes) * wls_config["emitters_fraction"]) if num_emitters == 0: - G_LOGGER.error('The number of emitters must be greater than zero. Try increasing the fraction of emitters.') + G_LOGGER.error( + 'The number of emitters must be greater than zero. Try increasing the fraction of emitters.') sys.exit() random_emitters = dict(random.sample(list(nodes.items()), num_emitters)) G_LOGGER.info('Selected %d emitters out of %d total nodes' % (len(random_emitters), len(nodes))) + return random_emitters + + +def start_traffic_inyection(wls_config, random_emitters): """ Start simulation """ s_time = time.time() last_msg_time = 0 next_time_to_msg = 0 msgs_dict = {} - G_LOGGER.info('Starting a simulation of %d seconds ...' %wls_config['simulation_time']) + G_LOGGER.info('Starting a simulation of %d seconds ...' % wls_config['simulation_time']) while True: # Check end condition elapsed_s = time.time() - s_time - if elapsed_s >= wls_config['simulation_time']: - G_LOGGER.info('Simulation ended. Sent %d messages in %ds.' %(len(msgs_dict), elapsed_s)) + if elapsed_s >= wls_config['simulation_time']: + G_LOGGER.info( + 'Simulation ended. Sent %d messages in %ds.' % (len(msgs_dict), elapsed_s)) break # Send message @@ -371,38 +255,49 @@ def main(): if msg_elapsed <= next_time_to_msg: continue - G_LOGGER.debug('Time Δ: %.6f ms.' %((msg_elapsed - next_time_to_msg) * 1000.0)) - + G_LOGGER.debug('Time Δ: %.6f ms.' % ((msg_elapsed - next_time_to_msg) * 1000.0)) + # Pick an emitter at random from the emitters list - # emitter_idx = random.choice(emitters_indices) random_emitter, random_emitter_info = random.choice(list(random_emitters.items())) - - emitter_address = f"http://{random_emitter_info['ip_address']}:{random_emitter_info['ports']['rpc_'+random_emitter][0]}/" + + emitter_address = f"http://{random_emitter_info['ip_address']}:{random_emitter_info['ports']['rpc_' + random_emitter][0]}/" emitter_topics = random_emitter_info["topics"] # Pick a topic at random from the topics supported by the emitter emitter_topic = random.choice(emitter_topics) - G_LOGGER.info('Injecting message of topic %s to network through Waku node %s ...' %(emitter_topic, emitter_address)) + G_LOGGER.info('Injecting message of topic %s to network through Waku node %s ...' % ( + emitter_topic, emitter_address)) - payload, size = make_payload_dist(dist_type=wls_config['dist_type'].lower(), min_size=wls_config['min_packet_size'], max_size=wls_config['max_packet_size']) - response, elapsed, waku_msg, ts = send_waku_msg(emitter_address, topic=emitter_topic, payload=payload, nonce=len(msgs_dict)) + payload, size = make_payload_dist(dist_type=wls_config['dist_type'].lower(), + min_size=wls_config['min_packet_size'], + max_size=wls_config['max_packet_size']) + response, elapsed, waku_msg, ts = send_waku_msg(emitter_address, topic=emitter_topic, + payload=payload, nonce=len(msgs_dict)) if response['result']: msg_hash = hashlib.sha256(waku_msg.encode('utf-8')).hexdigest() if msg_hash in msgs_dict: - G_LOGGER.error('Hash collision. %s already exists in dictionary' %msg_hash) + G_LOGGER.error('Hash collision. %s already exists in dictionary' % msg_hash) continue - msgs_dict[msg_hash] = {'ts' : ts, 'injection_point' : emitter_address, 'nonce' : len(msgs_dict), 'topic' : emitter_topic, 'payload' : payload, 'payload_size' : size} - + msgs_dict[msg_hash] = {'ts': ts, 'injection_point': emitter_address, + 'nonce': len(msgs_dict), 'topic': emitter_topic, + 'payload': payload, 'payload_size': size} + # Compute the time to next message - next_time_to_msg = get_next_time_to_msg(wls_config['inter_msg_type'], wls_config['message_rate'], wls_config['simulation_time']) - G_LOGGER.debug('Next message will happen in %d ms.' %(next_time_to_msg * 1000.0)) - + next_time_to_msg = get_next_time_to_msg(wls_config['inter_msg_type'], + wls_config['message_rate'], + wls_config['simulation_time']) + G_LOGGER.debug('Next message will happen in %d ms.' % (next_time_to_msg * 1000.0)) + last_msg_time = time.time() - + elapsed_s = time.time() - s_time + return msgs_dict + + +def save_messages(msgs_dict): # Save messages for further analysis with open('./messages.json', 'w') as f: f.write(json.dumps(msgs_dict, indent=4)) @@ -411,6 +306,34 @@ def main(): G_LOGGER.info('Ended') -if __name__ == "__main__": +def main(): + innit_logging() + + args = parse_cli() + + config_file = args.config_file + topology_file = args.topology_file + + config = load_config_file(config_file) + # Set loglevel from config + wls_config = config['wls'] + + configure_logging(wls_config, config_file) + + # Set RPNG seed from config + random.seed(config['general']['prng_seed']) + + topology = load_topology(topology_file) + + load_topics_into_topology(topology) + + random_emitters = get_random_emitters(topology, wls_config) + + msgs_dict = start_traffic_inyection(wls_config, random_emitters) + + save_messages(msgs_dict) + + +if __name__ == "__main__": main()