chore: Update rest traffic timestamp to ns (#115)

* Update rest traffic timestamp to ns and improve logging

* Improve send time waiting

* Fix typo for arguments description
This commit is contained in:
Tanya S 2025-10-23 08:43:49 +02:00 committed by GitHub
parent 479b908c42
commit 034c60e660
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,22 +1,23 @@
import requests import requests
import time import time
import datetime
import os import os
import base64 import base64
import urllib.parse import urllib.parse
import requests
import argparse import argparse
import re import re
import logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s.%(msecs)03d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
def send_waku_msg(node_address, kbytes, pubsub_topic, content_topic): def send_waku_msg(node_address, kbytes, pubsub_topic, content_topic):
# TODO dirty trick .replace("=", "") # TODO dirty trick .replace("=", "")
base64_payload = (base64.b64encode(os.urandom(kbytes*1000)).decode('ascii')).replace("=", "") base64_payload = (base64.b64encode(os.urandom(kbytes*1000)).decode('ascii')).replace("=", "")
print("size message kBytes", len(base64_payload) *(3/4)/1000, "KBytes") logging.info("size message kBytes %.3f KBytes", len(base64_payload) *(3/4)/1000)
body = { body = {
"payload": base64_payload, "payload": base64_payload,
"contentTopic": content_topic, "contentTopic": content_topic,
"version": 1, # You can adjust the version as needed "version": 1, # You can adjust the version as needed
"timestamp": int(time.time()) "timestamp": int(time.time() * 1_000_000_000) # use nanoseconds to match nwaku node time unit
} }
encoded_pubsub_topic = urllib.parse.quote(pubsub_topic, safe='') encoded_pubsub_topic = urllib.parse.quote(pubsub_topic, safe='')
@ -24,23 +25,20 @@ def send_waku_msg(node_address, kbytes, pubsub_topic, content_topic):
url = f"{node_address}/relay/v1/messages/{encoded_pubsub_topic}" url = f"{node_address}/relay/v1/messages/{encoded_pubsub_topic}"
headers = {'content-type': 'application/json'} headers = {'content-type': 'application/json'}
readable_time = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] logging.info('Waku REST API: %s PubSubTopic: %s, ContentTopic: %s', url, pubsub_topic, content_topic)
print('[%s] Waku REST API: %s PubSubTopic: %s, ContentTopic: %s' % (readable_time, url, pubsub_topic, content_topic))
s_time = time.time() s_time = time.time()
response = None response = None
readable_time = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
try: try:
print('[%s] Sending request' % readable_time) logging.info('Sending request')
response = requests.post(url, json=body, headers=headers) response = requests.post(url, json=body, headers=headers)
except Exception as e: except Exception as e:
print(f"Error sending request: {e}") logging.error("Error sending request: %s", e)
if(response != None): if(response != None):
elapsed_ms = (time.time() - s_time) * 1000 elapsed_ms = (time.time() - s_time) * 1000
readable_time = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] logging.info('Response from %s: status:%s content:%s [%.4f ms.]', node_address,
print('[%s] Response from %s: status:%s content:%s [%.4f ms.]' % (readable_time, node_address, \ response.status_code, response.text, elapsed_ms)
response.status_code, response.text, elapsed_ms))
parser = argparse.ArgumentParser(description='') parser = argparse.ArgumentParser(description='')
@ -53,13 +51,13 @@ group.add_argument('-mn', '--multiple-nodes', type=str, help='example: http://wa
parser.add_argument('-c', '--content-topic', type=str, help='content topic', default="my-ctopic") parser.add_argument('-c', '--content-topic', type=str, help='content topic', default="my-ctopic")
parser.add_argument('-p', '--pubsub-topic', type=str, help='pubsub topic', default="/waku/2/rs/66/0") parser.add_argument('-p', '--pubsub-topic', type=str, help='pubsub topic', default="/waku/2/rs/66/0")
parser.add_argument('-s', '--msg-size-kbytes', type=int, help='message size in kBytes', default=10) parser.add_argument('-s', '--msg-size-kbytes', type=int, help='message size in kBytes', default=10)
parser.add_argument('-d', '--delay-seconds', type=int, help='delay in second between messages', required=15) parser.add_argument('-d', '--delay-seconds', type=int, help='delay in seconds between messages', default=15)
args = parser.parse_args() args = parser.parse_args()
print(args) logging.info("Arguments: %s", args)
if args.single_node != None: if args.single_node != None:
print("Injecting traffic to single node REST API:", args.single_node) logging.info("Injecting traffic to single node REST API: %s", args.single_node)
# this simply converts from http://url_[1..5]:port to # this simply converts from http://url_[1..5]:port to
# [http://url_1:port or from http://url-[1..5]:port to # [http://url_1:port or from http://url-[1..5]:port to
@ -69,20 +67,21 @@ if args.multiple_nodes:
start, end = (int(x) for x in re.search(r"\[(\d+)\.\.(\d+)\]", args.multiple_nodes).groups()) start, end = (int(x) for x in re.search(r"\[(\d+)\.\.(\d+)\]", args.multiple_nodes).groups())
if start is None or end is None: if start is None or end is None:
print("Could not parse range of multiple_nodes argument") logging.error("Could not parse range of multiple_nodes argument")
exit exit
print("Injecting traffic to multiple nodes REST APIs") logging.info("Injecting traffic to multiple nodes REST APIs")
for i in range(end, start - 1, -1): for i in range(end, start - 1, -1):
nodes.append(re.sub(r"\[\d+\.\.\d+\]", str(i), args.multiple_nodes)) nodes.append(re.sub(r"\[\d+\.\.\d+\]", str(i), args.multiple_nodes))
for node in nodes: for node in nodes:
print(node) logging.info("Node: %s", node)
while True: while True:
# calls are blocking # calls are blocking
# limited by the time it takes the REST API to reply # limited by the time it takes the REST API to reply
send_start_time = time.time()
if args.single_node != None: if args.single_node != None:
send_waku_msg(args.single_node, args.msg_size_kbytes, args.pubsub_topic, args.content_topic) send_waku_msg(args.single_node, args.msg_size_kbytes, args.pubsub_topic, args.content_topic)
@ -90,6 +89,16 @@ while True:
for node in nodes: for node in nodes:
send_waku_msg(node, args.msg_size_kbytes, args.pubsub_topic, args.content_topic) send_waku_msg(node, args.msg_size_kbytes, args.pubsub_topic, args.content_topic)
readable_time = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] send_elapsed = time.time() - send_start_time
print('[%s] sleeping: %s seconds' % (readable_time, args.delay_seconds)) if args.multiple_nodes != None:
time.sleep(args.delay_seconds) logging.info("Time taken to send to all nodes: %.4f seconds", send_elapsed)
else:
logging.info("Time taken to send to single node: %.4f seconds", send_elapsed)
time_to_sleep = args.delay_seconds - send_elapsed
if time_to_sleep > 0:
logging.info("Sleeping %.4f seconds to maintain delay of %d seconds between rounds", time_to_sleep, args.delay_seconds)
else:
logging.info("No sleep needed to maintain delay of %d seconds between rounds", args.delay_seconds)
time.sleep(max(0, time_to_sleep))