From f49a572d50bb2aa66db86dcf69e045ae48e5a132 Mon Sep 17 00:00:00 2001 From: Jordi Arranz Date: Thu, 2 Feb 2023 12:40:54 +0000 Subject: [PATCH] Generates a dictionary of messages sent to be used by wls_analyse.py --- wsl-module/wsl.py | 80 +++++++++++++++++++++++------------------------ 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/wsl-module/wsl.py b/wsl-module/wsl.py index 6a7a615..3bcd923 100644 --- a/wsl-module/wsl.py +++ b/wsl-module/wsl.py @@ -121,7 +121,7 @@ def send_waku_msg(node_address, topic, payload, nonce=1): my_payload = { 'nonce' : nonce, - 'timestamp' : time.time_ns(), + 'ts' : time.time_ns(), 'payload' : payload } @@ -138,8 +138,10 @@ def send_waku_msg(node_address, topic, payload, nonce=1): G_LOGGER.debug('Waku RPC: %s from %s Topic: %s' %(data['method'], node_address, topic)) s_time = time.time() + + json_data = json.dumps(data) - response = requests.post(node_address, data=json.dumps(data), headers={'content-type': 'application/json'}) + response = requests.post(node_address, data=json_data, headers={'content-type': 'application/json'}) elapsed_ms =(time.time() - s_time) * 1000 @@ -147,7 +149,7 @@ def send_waku_msg(node_address, topic, payload, nonce=1): G_LOGGER.debug('Response from %s: %s [%.4f ms.]' %(node_address, response_obj, elapsed_ms)) - return response_obj, elapsed_ms + return response_obj, elapsed_ms, hash(json_data), my_payload['ts'] # Generate a random interval using a Poisson distribution def poisson_interval(rate): @@ -163,7 +165,7 @@ def make_payload_dist(dist_type, min_size, max_size): # Check if min and max packet sizes are the same if min_size == max_size: G_LOGGER.warning('Packet size is constant: min_size=max_size=%d' %min_size) - return make_payload(min_size) + return make_payload(min_size), min_size # Payload sizes are even integers uniformly distributed in [min_size, max_size] if dist_type == 'uniform': @@ -173,7 +175,7 @@ def make_payload_dist(dist_type, min_size, max_size): while(size % 2) != 0: size = int(random.uniform(min_size, max_size)) - return make_payload(size) + return make_payload(size), size # Payload sizes are even integers ~"normally" distributed in [min_size, max_size] if dist_type == 'gaussian': @@ -185,11 +187,11 @@ def make_payload_dist(dist_type, min_size, max_size): while(size % 2) != 0: size = int(rtnorm.rtnorm(min_size, max_size, sigma=σ, mu=μ, size=1)) - return make_payload(size) + return make_payload(size), size G_LOGGER.error('Unknown distribution type %s') - return '0x00' + return '0x00', 0 def parse_targets(enclave_dump_path, waku_port=8545): @@ -354,13 +356,10 @@ def main(): G_LOGGER.info('Selected %d emitters out of %d total nodes' %(len(emitters), len(targets))) """ Start simulation """ - stats = {} - msg_cnt = 0 - failed_cnt = 0 - bytes_cnt = 0 s_time = time.time() last_msg_time = 0 next_time_to_msg = 0 + msgs_dict = {} G_LOGGER.info('Starting a simulation of %d seconds ...' %config['general']['simulation_time']) @@ -369,7 +368,7 @@ def main(): # Check end condition elapsed_s = time.time() - s_time if elapsed_s >= config['general']['simulation_time']: - G_LOGGER.info('Simulation ended. Sent %d messages (%d bytes) in %ds.' %(msg_cnt, bytes_cnt, elapsed_s)) + G_LOGGER.info('Simulation ended. Sent %d messages in %ds.' %(len(msgs_dict), elapsed_s)) break # Send message @@ -392,15 +391,14 @@ def main(): G_LOGGER.info('Injecting message of topic %s to network through Waku node %s ...' %(emitter_topic, node_address)) - payload = make_payload_dist(dist_type=config['general']['dist_type'].lower(), min_size=config['general']['min_packet_size'], max_size=config['general']['max_packet_size']) - response, elapsed = send_waku_msg(node_address, topic=emitter_topic, payload=payload, nonce=msg_cnt) - + payload, size = make_payload_dist(dist_type=config['general']['dist_type'].lower(), min_size=config['general']['min_packet_size'], max_size=config['general']['max_packet_size']) + response, elapsed, msg_hash, ts = send_waku_msg(node_address, topic=emitter_topic, payload=payload, nonce=len(msgs_dict)) + if response['result']: - msg_cnt += 1 - topics_msg_cnt[emitter_topic] += 1 - else: - G_LOGGER.info('Message failed!') - failed_cnt += 1 + if msg_hash in msgs_dict: + G_LOGGER.error('Hash collision. %s already exists in dictionary' %msg_hash) + continue + msgs_dict[msg_hash] = {'ts' : ts, 'injection_point' : node_address, 'nonce' : len(msgs_dict), 'topic' : emitter_topic, 'payload' : payload, 'payload_size' : size} # Compute the time to next message next_time_to_msg = get_next_time_to_msg(config['general']['inter_msg_type'], config['general']['msg_rate'], config['general']['simulation_time']) @@ -411,32 +409,32 @@ def main(): elapsed_s = time.time() - s_time # Retrieve messages from every node and topic - G_LOGGER.info('Retriving messages from the enclave ...') - for node_idx, target in enumerate(targets): - node_address = 'http://%s/' %target + # G_LOGGER.info('Retriving messages from the enclave ...') + # for node_idx, target in enumerate(targets): + # node_address = 'http://%s/' %target - for topic_idx, topic in enumerate(topics[node_idx]): - msg_cnt = get_all_messages_from_node_from_topic(node_address, topic) - msg_lost = topics_msg_cnt[topic] - msg_cnt - G_LOGGER.info('- Retrieved %d messages on topic %s from node %s. Lost %d message(s).' %(msg_cnt, topic, node_address, msg_lost)) + # for topic_idx, topic in enumerate(topics[node_idx]): + # msg_cnt = get_all_messages_from_node_from_topic(node_address, topic) + # msg_lost = topics_msg_cnt[topic] - msg_cnt + # G_LOGGER.info('- Retrieved %d messages on topic %s from node %s. Lost %d message(s).' %(msg_cnt, topic, node_address, msg_lost)) # Output - summary = { - "end_ts" : time.time(), - "params" : config['general'], - "topics" : list(topics_msg_cnt.keys()), - "topics_msg_cnt" : topics_msg_cnt, - "simulation_time" : elapsed_s, - "total_messages" : msg_cnt, - "avg_latency" : 0, - "max_latency" : 0, - "min_latency" : 0 - } + # summary = { + # "end_ts" : time.time(), + # "params" : config['general'], + # "topics" : list(topics_msg_cnt.keys()), + # "topics_msg_cnt" : topics_msg_cnt, + # "simulation_time" : elapsed_s, + # "total_messages" : len() + # } - G_LOGGER.info('Simulation sumnmary: %s' %summary) + # G_LOGGER.info('Simulation sumnmary: %s' %summary) - with open('./summary.json', 'w') as summary_file: - summary_file.write(json.dumps(summary, indent=4)) + # with open('./summary.json', 'w') as summary_file: + # summary_file.write(json.dumps(summary, indent=4)) + + with open('./messages.json', 'w') as f: + f.write(json.dumps(msgs_dict, indent=4)) """ We are done """ G_LOGGER.info('Ended')