Generates a dictionary of sent messages for use by wls_analyse.py

Jordi Arranz 2023-02-02 12:40:54 +00:00
parent 677898381c
commit f49a572d50
1 changed file with 39 additions and 41 deletions

@@ -121,7 +121,7 @@ def send_waku_msg(node_address, topic, payload, nonce=1):
     my_payload = {
         'nonce' : nonce,
-        'timestamp' : time.time_ns(),
+        'ts' : time.time_ns(),
         'payload' : payload
     }
@@ -139,7 +139,9 @@ def send_waku_msg(node_address, topic, payload, nonce=1):
     s_time = time.time()
-    response = requests.post(node_address, data=json.dumps(data), headers={'content-type': 'application/json'})
+    json_data = json.dumps(data)
+    response = requests.post(node_address, data=json_data, headers={'content-type': 'application/json'})
     elapsed_ms =(time.time() - s_time) * 1000
@@ -147,7 +149,7 @@ def send_waku_msg(node_address, topic, payload, nonce=1):
     G_LOGGER.debug('Response from %s: %s [%.4f ms.]' %(node_address, response_obj, elapsed_ms))
-    return response_obj, elapsed_ms
+    return response_obj, elapsed_ms, hash(json_data), my_payload['ts']

 # Generate a random interval using a Poisson distribution
 def poisson_interval(rate):
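Note on the new return value: send_waku_msg now also returns hash(json_data), and Python's built-in hash() on strings is salted per process (see PYTHONHASHSEED). These hashes are therefore only unique within a single run and cannot be recomputed later from messages.json. A minimal sketch of a run-stable alternative, using only the standard library (stable_msg_hash is a hypothetical helper, not part of this commit):

    import hashlib

    def stable_msg_hash(json_data: str) -> str:
        # SHA-256 of the serialized message; reproducible across runs,
        # unlike the per-process salted built-in hash() on str
        return hashlib.sha256(json_data.encode('utf-8')).hexdigest()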
@@ -163,7 +165,7 @@ def make_payload_dist(dist_type, min_size, max_size):
     # Check if min and max packet sizes are the same
     if min_size == max_size:
         G_LOGGER.warning('Packet size is constant: min_size=max_size=%d' %min_size)
-        return make_payload(min_size)
+        return make_payload(min_size), min_size

     # Payload sizes are even integers uniformly distributed in [min_size, max_size]
     if dist_type == 'uniform':
@@ -173,7 +175,7 @@ def make_payload_dist(dist_type, min_size, max_size):
         while(size % 2) != 0:
             size = int(random.uniform(min_size, max_size))
-        return make_payload(size)
+        return make_payload(size), size

     # Payload sizes are even integers ~"normally" distributed in [min_size, max_size]
     if dist_type == 'gaussian':
@@ -185,11 +187,11 @@ def make_payload_dist(dist_type, min_size, max_size):
         while(size % 2) != 0:
             size = int(rtnorm.rtnorm(min_size, max_size, sigma=σ, mu=μ, size=1))
-        return make_payload(size)
+        return make_payload(size), size

     G_LOGGER.error('Unknown distribution type %s')
-    return '0x00'
+    return '0x00', 0

 def parse_targets(enclave_dump_path, waku_port=8545):
@@ -354,13 +356,10 @@ def main():
     G_LOGGER.info('Selected %d emitters out of %d total nodes' %(len(emitters), len(targets)))

     """ Start simulation """
-    stats = {}
-    msg_cnt = 0
-    failed_cnt = 0
-    bytes_cnt = 0
     s_time = time.time()
     last_msg_time = 0
     next_time_to_msg = 0
+    msgs_dict = {}

     G_LOGGER.info('Starting a simulation of %d seconds ...' %config['general']['simulation_time'])
@@ -369,7 +368,7 @@ def main():
         # Check end condition
         elapsed_s = time.time() - s_time
         if elapsed_s >= config['general']['simulation_time']:
-            G_LOGGER.info('Simulation ended. Sent %d messages (%d bytes) in %ds.' %(msg_cnt, bytes_cnt, elapsed_s))
+            G_LOGGER.info('Simulation ended. Sent %d messages in %ds.' %(len(msgs_dict), elapsed_s))
             break

         # Send message
@@ -392,15 +391,14 @@ def main():
             G_LOGGER.info('Injecting message of topic %s to network through Waku node %s ...' %(emitter_topic, node_address))
-            payload = make_payload_dist(dist_type=config['general']['dist_type'].lower(), min_size=config['general']['min_packet_size'], max_size=config['general']['max_packet_size'])
-            response, elapsed = send_waku_msg(node_address, topic=emitter_topic, payload=payload, nonce=msg_cnt)
+            payload, size = make_payload_dist(dist_type=config['general']['dist_type'].lower(), min_size=config['general']['min_packet_size'], max_size=config['general']['max_packet_size'])
+            response, elapsed, msg_hash, ts = send_waku_msg(node_address, topic=emitter_topic, payload=payload, nonce=len(msgs_dict))
             if response['result']:
-                msg_cnt += 1
-                topics_msg_cnt[emitter_topic] += 1
-            else:
-                G_LOGGER.info('Message failed!')
-                failed_cnt += 1
+                if msg_hash in msgs_dict:
+                    G_LOGGER.error('Hash collision. %s already exists in dictionary' %msg_hash)
+                    continue
+                msgs_dict[msg_hash] = {'ts' : ts, 'injection_point' : node_address, 'nonce' : len(msgs_dict), 'topic' : emitter_topic, 'payload' : payload, 'payload_size' : size}

             # Compute the time to next message
             next_time_to_msg = get_next_time_to_msg(config['general']['inter_msg_type'], config['general']['msg_rate'], config['general']['simulation_time'])
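For reference, each record stored in msgs_dict (and later dumped to messages.json) would look roughly as follows; all field values here are illustrative placeholders, not taken from a real run:

    # illustrative msgs_dict entry, keyed by the message hash
    msgs_dict[msg_hash] = {
        'ts' : 1675341654000000000,          # injection time from time.time_ns()
        'injection_point' : 'http://<node_ip>:8545/',
        'nonce' : 0,                         # len(msgs_dict) at injection time
        'topic' : '<emitter_topic>',
        'payload' : '0x...',                 # payload from make_payload_dist
        'payload_size' : 512
    }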
@@ -411,32 +409,32 @@ def main():
     elapsed_s = time.time() - s_time

     # Retrieve messages from every node and topic
-    G_LOGGER.info('Retriving messages from the enclave ...')
-    for node_idx, target in enumerate(targets):
-        node_address = 'http://%s/' %target
-        for topic_idx, topic in enumerate(topics[node_idx]):
-            msg_cnt = get_all_messages_from_node_from_topic(node_address, topic)
-            msg_lost = topics_msg_cnt[topic] - msg_cnt
-            G_LOGGER.info('- Retrieved %d messages on topic %s from node %s. Lost %d message(s).' %(msg_cnt, topic, node_address, msg_lost))
+    # G_LOGGER.info('Retriving messages from the enclave ...')
+    # for node_idx, target in enumerate(targets):
+    #     node_address = 'http://%s/' %target
+    #     for topic_idx, topic in enumerate(topics[node_idx]):
+    #         msg_cnt = get_all_messages_from_node_from_topic(node_address, topic)
+    #         msg_lost = topics_msg_cnt[topic] - msg_cnt
+    #         G_LOGGER.info('- Retrieved %d messages on topic %s from node %s. Lost %d message(s).' %(msg_cnt, topic, node_address, msg_lost))

     # Output
-    summary = {
-        "end_ts" : time.time(),
-        "params" : config['general'],
-        "topics" : list(topics_msg_cnt.keys()),
-        "topics_msg_cnt" : topics_msg_cnt,
-        "simulation_time" : elapsed_s,
-        "total_messages" : msg_cnt,
-        "avg_latency" : 0,
-        "max_latency" : 0,
-        "min_latency" : 0
-    }
+    # summary = {
+    #     "end_ts" : time.time(),
+    #     "params" : config['general'],
+    #     "topics" : list(topics_msg_cnt.keys()),
+    #     "topics_msg_cnt" : topics_msg_cnt,
+    #     "simulation_time" : elapsed_s,
+    #     "total_messages" : len()
+    # }

-    G_LOGGER.info('Simulation sumnmary: %s' %summary)
+    # G_LOGGER.info('Simulation sumnmary: %s' %summary)

-    with open('./summary.json', 'w') as summary_file:
-        summary_file.write(json.dumps(summary, indent=4))
+    # with open('./summary.json', 'w') as summary_file:
+    #     summary_file.write(json.dumps(summary, indent=4))
+
+    with open('./messages.json', 'w') as f:
+        f.write(json.dumps(msgs_dict, indent=4))

     """ We are done """
     G_LOGGER.info('Ended')
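Since this diff does not show wls_analyse.py itself, here is a minimal sketch of how the new messages.json could be consumed on the analysis side (load_messages and the printed summary are assumptions, not part of this commit):

    import json

    def load_messages(path='./messages.json'):
        # the file maps msg_hash -> record, as written by the script above
        with open(path) as f:
            msgs = json.load(f)
        total_bytes = sum(m['payload_size'] for m in msgs.values())
        print('Loaded %d messages (%d payload bytes)' % (len(msgs), total_bytes))
        return msgs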