More refactor and PEP-8

This commit is contained in:
Alberto Soutullo 2023-03-04 18:05:38 +01:00 committed by Alberto Soutullo Rendo
parent 1236ae679b
commit 4edd198793
3 changed files with 84 additions and 56 deletions

View File

@ -37,3 +37,12 @@ def load_topology(topology_file):
G_LOGGER.info('%d topology loaded' % len(topology))
return topology
def save_messages_to_json(msgs_dict):
# Save messages for further analysis
with open('./messages.json', 'w') as f:
f.write(json.dumps(msgs_dict, indent=4))
""" We are done """
G_LOGGER.info('Ended')

View File

@ -82,4 +82,3 @@ def get_next_time_to_msg(inter_msg_type, msg_rate, simulation_time):
G_LOGGER.error('%s is not a valid inter_msg_type. Aborting.' % inter_msg_type)
sys.exit()

View File

@ -1,7 +1,6 @@
# Python Imports
import argparse
import hashlib
import json
import random
import sys
import time
@ -60,7 +59,8 @@ def get_random_emitters(topology, wls_config):
if num_emitters == 0:
G_LOGGER.error(
'The number of emitters must be greater than zero. Try increasing the fraction of emitters.')
'The number of emitters must be greater than zero. '
'Try increasing the fraction of emitters.')
sys.exit()
random_emitters = dict(random.sample(list(nodes.items()), num_emitters))
@ -69,81 +69,101 @@ def get_random_emitters(topology, wls_config):
return random_emitters
def _is_simulation_finished(start_time, wls_config, msgs_dict):
# Check end condition
elapsed_s = time.time() - start_time
if elapsed_s >= wls_config['simulation_time']:
G_LOGGER.info(f"Simulation ended. Sent {len(msgs_dict)} messages in {elapsed_s}.")
return True
return False
def _time_to_send_next_message(last_msg_time, next_time_to_msg):
# Send message
# BUG: There is a constant discrepancy.
# The average number of messages sent by time interval is slightly less than expected
msg_elapsed = time.time() - last_msg_time
if msg_elapsed <= next_time_to_msg:
return False
G_LOGGER.debug(f"Time Δ: {(msg_elapsed - next_time_to_msg) * 1000.0:6f}ms.")
return True
def _select_emitter_and_topic(random_emitters):
# Pick an emitter at random from the emitters list
random_emitter, random_emitter_info = random.choice(list(random_emitters.items()))
emitter_address = f"http://{random_emitter_info['ip_address']}:" \
f"{random_emitter_info['ports']['rpc_' + random_emitter][0]}/"
emitter_topics = random_emitter_info["topics"]
# Pick a topic at random from the topics supported by the emitter
emitter_topic = random.choice(emitter_topics)
G_LOGGER.info(f"Injecting message of topic {emitter_topic} to network "
f"through Waku node {emitter_address} ...")
return emitter_address, emitter_topic
def _inyect_message(emitter_address, emitter_topic, msgs_dict, wls_config):
payload, size = payloads.make_payload_dist(dist_type=wls_config['dist_type'].lower(),
min_size=wls_config['min_packet_size'],
max_size=wls_config['max_packet_size'])
response, elapsed, waku_msg, ts = waku_messaging.send_msg_to_node(emitter_address,
topic=emitter_topic,
payload=payload,
nonce=len(msgs_dict))
if response['result']:
msg_hash = hashlib.sha256(waku_msg.encode('utf-8')).hexdigest()
if msg_hash in msgs_dict:
G_LOGGER.error(f"Hash collision. {msg_hash} already exists in dictionary")
raise RuntimeWarning
msgs_dict[msg_hash] = {'ts': ts, 'injection_point': emitter_address,
'nonce': len(msgs_dict), 'topic': emitter_topic,
'payload': payload, 'payload_size': size}
def start_traffic_inyection(wls_config, random_emitters):
""" Start simulation """
s_time = time.time()
start_time = time.time()
last_msg_time = 0
next_time_to_msg = 0
msgs_dict = {}
G_LOGGER.info('Starting a simulation of %d seconds ...' % wls_config['simulation_time'])
G_LOGGER.info(f"Starting a simulation of {wls_config['simulation_time']} seconds...")
while True:
# Check end condition
elapsed_s = time.time() - s_time
if elapsed_s >= wls_config['simulation_time']:
G_LOGGER.info(
'Simulation ended. Sent %d messages in %ds.' % (len(msgs_dict), elapsed_s))
if _is_simulation_finished(start_time, wls_config, msgs_dict):
break
# Send message
# BUG: There is a constant discrepancy. The average number of messages sent by time interval is slightly less than expected
msg_elapsed = time.time() - last_msg_time
if msg_elapsed <= next_time_to_msg:
if not _time_to_send_next_message(last_msg_time, next_time_to_msg):
continue
G_LOGGER.debug('Time Δ: %.6f ms.' % ((msg_elapsed - next_time_to_msg) * 1000.0))
emitter_address, emitter_topic = _select_emitter_and_topic(random_emitters)
# Pick an emitter at random from the emitters list
random_emitter, random_emitter_info = random.choice(list(random_emitters.items()))
emitter_address = f"http://{random_emitter_info['ip_address']}:{random_emitter_info['ports']['rpc_' + random_emitter][0]}/"
emitter_topics = random_emitter_info["topics"]
# Pick a topic at random from the topics supported by the emitter
emitter_topic = random.choice(emitter_topics)
G_LOGGER.info('Injecting message of topic %s to network through Waku node %s ...' % (
emitter_topic, emitter_address))
payload, size = payloads.make_payload_dist(dist_type=wls_config['dist_type'].lower(),
min_size=wls_config['min_packet_size'],
max_size=wls_config['max_packet_size'])
response, elapsed, waku_msg, ts = waku_messaging.send_msg_to_node(emitter_address, topic=emitter_topic,
payload=payload, nonce=len(msgs_dict))
if response['result']:
msg_hash = hashlib.sha256(waku_msg.encode('utf-8')).hexdigest()
if msg_hash in msgs_dict:
G_LOGGER.error('Hash collision. %s already exists in dictionary' % msg_hash)
continue
msgs_dict[msg_hash] = {'ts': ts, 'injection_point': emitter_address,
'nonce': len(msgs_dict), 'topic': emitter_topic,
'payload': payload, 'payload_size': size}
try:
_inyect_message(emitter_address, emitter_topic, msgs_dict, wls_config)
except RuntimeWarning:
continue
# Compute the time to next message
next_time_to_msg = waku_messaging.get_next_time_to_msg(wls_config['inter_msg_type'],
wls_config['message_rate'],
wls_config['simulation_time'])
wls_config['message_rate'],
wls_config['simulation_time'])
G_LOGGER.debug('Next message will happen in %d ms.' % (next_time_to_msg * 1000.0))
last_msg_time = time.time()
elapsed_s = time.time() - s_time
return msgs_dict
def save_messages(msgs_dict):
# Save messages for further analysis
with open('./messages.json', 'w') as f:
f.write(json.dumps(msgs_dict, indent=4))
""" We are done """
G_LOGGER.info('Ended')
def main():
args = parse_cli()
@ -168,7 +188,7 @@ def main():
msgs_dict = start_traffic_inyection(wls_config, random_emitters)
save_messages(msgs_dict)
files.save_messages_to_json(msgs_dict)
if __name__ == "__main__":