mirror of https://github.com/waku-org/nwaku.git
including UTC time in logs and logging timestamp (#2926)
This commit is contained in:
parent
a29eca77a0
commit
c1cdfffaf1
|
@ -7,6 +7,7 @@ import sys
|
|||
import urllib.parse
|
||||
import requests
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
from prometheus_client import Counter, Gauge, start_http_server
|
||||
|
||||
# Content topic where Sona messages are going to be sent
|
||||
|
@ -32,6 +33,12 @@ parser.add_argument('-t', '--health-threshold', type=int, help='consecutive succ
|
|||
args = parser.parse_args()
|
||||
|
||||
|
||||
# Logs message including current UTC time
|
||||
def log_with_utc(message):
|
||||
utc_time = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{utc_time} UTC] {message}")
|
||||
|
||||
|
||||
# Sends Sonda message. Returns True if successful, False otherwise
|
||||
def send_sonda_msg(rest_address, pubsub_topic, content_topic, timestamp):
|
||||
message = "Hi, I'm Sonda"
|
||||
|
@ -47,14 +54,14 @@ def send_sonda_msg(rest_address, pubsub_topic, content_topic, timestamp):
|
|||
url = f'{rest_address}/relay/v1/messages/{encoded_pubsub_topic}'
|
||||
headers = {'content-type': 'application/json'}
|
||||
|
||||
print(f'Waku REST API: {url} PubSubTopic: {pubsub_topic}, ContentTopic: {content_topic}')
|
||||
log_with_utc(f'Sending Sonda message via REST: {url} PubSubTopic: {pubsub_topic}, ContentTopic: {content_topic}, timestamp: {timestamp}')
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
response = requests.post(url, json=body, headers=headers, timeout=10)
|
||||
elapsed_seconds = time.time() - start_time
|
||||
|
||||
print(f'Response from {rest_address}: status:{response.status_code} content:{response.text} [{elapsed_seconds:.4f} s.]')
|
||||
log_with_utc(f'Response from {rest_address}: status:{response.status_code} content:{response.text} [{elapsed_seconds:.4f} s.]')
|
||||
|
||||
if response.status_code == 200:
|
||||
successful_sonda_msgs.inc()
|
||||
|
@ -62,7 +69,7 @@ def send_sonda_msg(rest_address, pubsub_topic, content_topic, timestamp):
|
|||
else:
|
||||
response.raise_for_status()
|
||||
except requests.RequestException as e:
|
||||
print(f'Error sending request: {e}')
|
||||
log_with_utc(f'Error sending request: {e}')
|
||||
|
||||
failed_sonda_msgs.inc()
|
||||
return False
|
||||
|
@ -74,7 +81,7 @@ def check_store_response(json_response, store_node, timestamp):
|
|||
# Check for the store node status code
|
||||
if json_response.get('statusCode') != 200:
|
||||
error = f"{json_response.get('statusCode')} {json_response.get('statusDesc')}"
|
||||
print(f'Failed performing store query {error}')
|
||||
log_with_utc(f'Failed performing store query {error}')
|
||||
failed_store_queries.labels(node=store_node, error=error).inc()
|
||||
consecutive_successful_responses.labels(node=store_node).set(0)
|
||||
|
||||
|
@ -83,7 +90,7 @@ def check_store_response(json_response, store_node, timestamp):
|
|||
messages = json_response.get('messages')
|
||||
# If there's no message in the response, increase counters and return
|
||||
if not messages:
|
||||
print("No messages in store response")
|
||||
log_with_utc("No messages in store response")
|
||||
empty_store_responses.labels(node=store_node).inc()
|
||||
consecutive_successful_responses.labels(node=store_node).set(0)
|
||||
return True
|
||||
|
@ -92,12 +99,12 @@ def check_store_response(json_response, store_node, timestamp):
|
|||
for message in messages:
|
||||
# If message field is missing in current message, continue
|
||||
if not message.get("message"):
|
||||
print("Could not retrieve message")
|
||||
log_with_utc("Could not retrieve message")
|
||||
continue
|
||||
|
||||
# If a message is found with the same timestamp as sonda message, increase counters and return
|
||||
if timestamp == message.get('message').get('timestamp'):
|
||||
print(f'Found Sonda message in store response node={store_node}')
|
||||
log_with_utc(f'Found Sonda message in store response node={store_node}')
|
||||
successful_store_queries.labels(node=store_node).inc()
|
||||
consecutive_successful_responses.labels(node=store_node).inc()
|
||||
return True
|
||||
|
@ -121,16 +128,16 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con
|
|||
s_time = time.time()
|
||||
|
||||
try:
|
||||
print(f'Sending store request to {store_node}')
|
||||
log_with_utc(f'Sending store request to {store_node}')
|
||||
response = requests.get(url, params=params)
|
||||
except Exception as e:
|
||||
print(f'Error sending request: {e}')
|
||||
log_with_utc(f'Error sending request: {e}')
|
||||
failed_store_queries.labels(node=store_node, error=str(e)).inc()
|
||||
consecutive_successful_responses.labels(node=store_node).set(0)
|
||||
return False
|
||||
|
||||
elapsed_seconds = time.time() - s_time
|
||||
print(f'Response from {rest_address}: status:{response.status_code} [{elapsed_seconds:.4f} s.]')
|
||||
log_with_utc(f'Response from {rest_address}: status:{response.status_code} [{elapsed_seconds:.4f} s.]')
|
||||
|
||||
if response.status_code != 200:
|
||||
failed_store_queries.labels(node=store_node, error=f'{response.status_code} {response.content}').inc()
|
||||
|
@ -141,7 +148,7 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con
|
|||
try:
|
||||
json_response = response.json()
|
||||
except Exception as e:
|
||||
print(f'Error parsing response JSON: {e}')
|
||||
log_with_utc(f'Error parsing response JSON: {e}')
|
||||
failed_store_queries.labels(node=store_node, error="JSON parse error").inc()
|
||||
consecutive_successful_responses.labels(node=store_node).set(0)
|
||||
return False
|
||||
|
@ -155,7 +162,7 @@ def send_store_query(rest_address, store_node, encoded_pubsub_topic, encoded_con
|
|||
|
||||
|
||||
def send_store_queries(rest_address, store_nodes, pubsub_topic, content_topic, timestamp):
|
||||
print(f'Sending store queries. nodes = {store_nodes}')
|
||||
log_with_utc(f'Sending store queries. nodes = {store_nodes} timestamp = {timestamp}')
|
||||
encoded_pubsub_topic = urllib.parse.quote(pubsub_topic, safe='')
|
||||
encoded_content_topic = urllib.parse.quote(content_topic, safe='')
|
||||
|
||||
|
@ -164,12 +171,12 @@ def send_store_queries(rest_address, store_nodes, pubsub_topic, content_topic, t
|
|||
|
||||
|
||||
def main():
|
||||
print(f'Running Sonda with args={args}')
|
||||
log_with_utc(f'Running Sonda with args={args}')
|
||||
|
||||
store_nodes = []
|
||||
if args.store_nodes is not None:
|
||||
store_nodes = [s.strip() for s in args.store_nodes.split(",")]
|
||||
print(f'Store nodes to query: {store_nodes}')
|
||||
log_with_utc(f'Store nodes to query: {store_nodes}')
|
||||
|
||||
# Start Prometheus HTTP server at port 8004
|
||||
start_http_server(8004)
|
||||
|
@ -181,7 +188,7 @@ def main():
|
|||
# Send Sonda message
|
||||
res = send_sonda_msg(node_rest_address, args.pubsub_topic, SONDA_CONTENT_TOPIC, timestamp)
|
||||
|
||||
print(f'sleeping: {args.delay_seconds} seconds')
|
||||
log_with_utc(f'sleeping: {args.delay_seconds} seconds')
|
||||
time.sleep(args.delay_seconds)
|
||||
|
||||
# Only send store query if message was successfully published
|
||||
|
|
Loading…
Reference in New Issue