Merge pull request #55 from logos-co/WSL-WakuStoreRecovery

WSL Waku store recovery
Daimakaimura committed 2023-01-30 10:32:39 +00:00 (merged via GitHub)
commit d8ac72f9a7
10 changed files with 214 additions and 44 deletions

.gitignore (vendored)

@@ -3,6 +3,7 @@
 *.iws
 *.iml
 *.ipr
+*.log

 # default network gen directory
 WakuNetwork
@@ -18,4 +19,10 @@ enclave.dump/
 wsl-module/requirements.txt
 wsl-module/__pycache__
 config/network_topology_auto/
 config/config.json
+gennet-module/topology/
+kurtosis_log.txt
+config/network_topology/
+config/topology_generated/
+config/waku_config_files/
+kurtosisrun_log.txt


@@ -11,11 +11,11 @@ rm kurtosis-cli_0.64.2_linux_amd64.tar.gz

 # Build WSL and Gennet docker image
 cd wsl-module
-docker build -t gennet .
+docker build -t wsl:0.0.1 .
 cd ..

 cd gennet-module
-docker build -t wsl:0.0.1 .
+docker build -t gennet .
 cd ..

 git clone git@github.com:waku-org/go-waku.git


@@ -1,4 +1,5 @@
 topics="test test2"
 rpc-admin=true
 keep-alive=true
 metrics-server=true
+store=true


@@ -30,7 +30,7 @@ nodeDockerImageSwitch = {
 }

 NODES = [nodeType.NWAKU, nodeType.GOWAKU]
-NODE_PROBABILITIES = (50, 50)
+NODE_PROBABILITIES = (100, 0)

 # To add a new network type, add appropriate entries to the networkType and networkTypeSwitch
 # the networkTypeSwitch is placed before generate_network(): fwd declaration mismatch with typer/python :/
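
Side note on the semantics: NODE_PROBABILITIES are, by the look of it, relative weights for drawing each generated node's implementation from NODES, so (100, 0) pins every node to nwaku for this store-recovery run. A minimal sketch of such a weighted draw (illustrative stand-ins only; gennet's actual sampling code is not part of this diff):

    import random

    # Stand-ins mirroring the constants above (gennet uses a nodeType enum)
    NODES = ["nwaku", "gowaku"]
    NODE_PROBABILITIES = (100, 0)

    def pick_node_type(rng=random):
        # random.choices takes relative weights; weight 0 means gowaku is never drawn
        return rng.choices(NODES, weights=NODE_PROBABILITIES, k=1)[0]

    assert all(pick_node_type() == "nwaku" for _ in range(1000))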


@@ -12,8 +12,8 @@ NODE_CONFIG_FILE_LOCATION = "github.com/logos-co/wakurtosis/config/topology_gene
 CONTAINER_NODE_CONFIG_FILE_LOCATION = "/node/configuration_file/"
 NODE_CONFIGURATION_FILE_EXTENSION = ".toml"
-NWAKU_ENTRYPOINT = ["/usr/bin/wakunode", "--rpc-address=0.0.0.0", "--metrics-server-address=0.0.0.0"]
-GOWAKU_ENTRYPOINT = ["/usr/bin/waku", "--rpc-address=0.0.0.0", "--metrics-server-address=0.0.0.0"]
+NWAKU_ENTRYPOINT = ["/usr/bin/wakunode", "--rpc-address=0.0.0.0", "--metrics-server-address=0.0.0.0", "--store=true", "--storenode=/dns4/node_0"]
+GOWAKU_ENTRYPOINT = ["/usr/bin/waku", "--rpc-address=0.0.0.0", "--metrics-server-address=0.0.0.0", "--store=true", "--storenode=/dns4/node_0"]

 # Prometheus Configuration
 PROMETHEUS_IMAGE = "prom/prometheus:latest"

@@ -49,6 +49,7 @@ WSL_IMAGE = "wsl:0.0.1"
 WSL_SERVICE_ID = "wsl"
 WSL_CONFIG_PATH = "/wsl/config"
 WSL_TARGETS_PATH = "/wsl/targets"
+WSL_TOMLS_PATH = "/wsl/tomls"
 CONTAINER_WSL_CONFIGURATION_FILE_NAME = "wsl.yml"
 CONTAINER_TARGETS_FILE_NAME_WSL = "targets.json"
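
A caveat on --storenode=/dns4/node_0: that is only the DNS host component of a libp2p multiaddr. A complete storenode address normally continues with transport and peer identity (/dns4/<host>/tcp/<port>/p2p/<peer-id>), so the remainder is presumably appended when the service is instantiated. A sketch of that completion (the helper and values below are hypothetical, not from this diff):

    # Hypothetical helper; the port and peer id are purely illustrative
    STORENODE_STUB = "/dns4/node_0"

    def complete_storenode(stub, port, peer_id):
        # /dns4/<host>/tcp/<port>/p2p/<peer-id> is the usual libp2p shape
        return "%s/tcp/%d/p2p/%s" % (stub, port, peer_id)

    print(complete_storenode(STORENODE_STUB, 60000, "16Uiu2HAm..."))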


@@ -46,26 +46,26 @@ def get_wsl_template():
     prng_seed : 0

     # Simulation time in seconds
-    simulation_time : 1000
+    simulation_time : {{.simulation_time}}

     # Message rate in messages per second
-    msg_rate : 10
+    msg_rate : {{.message_rate}}

     # Packet size in bytes
-    min_packet_size : 2
-    max_packet_size : 1024
+    min_packet_size : {{.min_packet_size}}
+    max_packet_size : {{.max_packet_size}}

     # Packet size distribution
     # Values: uniform and gaussian
-    dist_type : "gaussian"
+    dist_type : {{.dist_type}}

     # Fraction (of the total number of nodes) that inject traffic
     # Values: [0., 1.]
-    emitters_fraction : 0.5
+    emitters_fraction : {{.emitters_fraction}}

     # Inter-message times
     # Values: uniform and poisson
-    inter_msg_type : "uniform"
-    """
+    inter_msg_type : {{.inter_msg_type}}
+    """
     return wsl_yml_template
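
The {{.field}} placeholders follow Go text/template syntax; Kurtosis substitutes them from a data dict when it renders this template into the container's wsl.yml. A rough Python equivalent of that substitution, just to make the placeholder-to-value mapping concrete (the real rendering is done by Kurtosis, not by this sketch):

    import re

    def render_go_template(template, data):
        # Replace each {{.field}} with str(data["field"]) -- roughly what the
        # Go template engine does for these simple value fields
        return re.sub(r"\{\{\.(\w+)\}\}", lambda m: str(data[m.group(1)]), template)

    print(render_go_template("msg_rate : {{.message_rate}}", {"message_rate": 10}))
    # msg_rate : 10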


@@ -49,6 +49,11 @@ def init(plan, services, wsl_config):
     # Generate simulation config
     wsl_config = create_config(plan, wsl_config)

+    tomls_artifact = plan.upload_files(
+        src = system_variables.NODE_CONFIG_FILE_LOCATION,
+        name = "tomls_artifact",
+    )
+
     # Create targets.json
     wsl_targets = create_targets(plan, services)

@@ -60,6 +65,7 @@ def init(plan, services, wsl_config):
         files={
             system_variables.WSL_CONFIG_PATH : wsl_config,
             system_variables.WSL_TARGETS_PATH : wsl_targets,
+            system_variables.WSL_TOMLS_PATH : tomls_artifact
         },
         cmd=["python3", "wsl.py"]


@@ -1,3 +1,6 @@
 #!/bin/sh
 # pip freeze > requirements.txt
+image_id=$(docker images -q wsl:0.0.1)
+echo $image_id
+docker image rm -f $image_id
 docker image build --progress=plain -t wsl:0.0.1 ./


@@ -1,6 +1,7 @@
 asyncio==3.4.3
 certifi==2022.12.7
 charset-normalizer==2.1.1
+click==8.1.3
 contourpy==1.0.6
 cycler==0.11.0
 fonttools==4.38.0

@@ -18,5 +19,7 @@ PyYAML==6.0
 requests==2.28.1
 scipy==1.10.0
 six==1.16.0
+toml==0.10.2
+tqdm==4.64.1
 typer==0.7.0
 urllib3==1.26.13


@@ -5,7 +5,7 @@ Description: Wakurtosis load simulator
 """

 """ Dependencies """
-import sys, logging, yaml, json, time, random, os, argparse
+import sys, logging, yaml, json, time, random, os, argparse, tomllib, glob
 import requests
 import rtnorm
 # from pathlib import Path

@@ -17,7 +17,7 @@ import rtnorm
 """ Globals """
 G_APP_NAME = 'WLS'
 G_LOG_LEVEL = 'DEBUG'
-g_DEFAULT_CONFIG_FILE = './config/wsl.yml'
+G_DEFAULT_CONFIG_FILE = './config/wsl.yml'
 G_LOGGER = None

 """ Custom logging formatter """

@@ -43,8 +43,8 @@ def check_waku_node(node_address):
     data = {
         'jsonrpc': '2.0',
-        # 'method': 'get_waku_v2_debug_v1_info',
-        'method' : 'get_waku_v2_debug_v1_version',
+        'method': 'get_waku_v2_debug_v1_info',
+        # 'method' : 'get_waku_v2_debug_v1_version',
         'id': 1,
         'params' : []}
@@ -66,12 +66,68 @@ def check_waku_node(node_address):
     return True

+def get_waku_msgs(node_address, topic, cursor=None):
+    data = {
+        'jsonrpc': '2.0',
+        'method': 'get_waku_v2_store_v1_messages',
+        'id': 1,
+        'params' : [topic, None, None, None, {"pageSize": 100, "cursor": cursor, "forward": True}]
+    }
+
+    G_LOGGER.debug('Waku RPC: %s from %s' %(data['method'], node_address))
+
+    s_time = time.time()
+    response = requests.post(node_address, data=json.dumps(data), headers={'content-type': 'application/json'})
+    elapsed_ms = (time.time() - s_time) * 1000
+
+    response_obj = response.json()
+    # G_LOGGER.debug('Response from %s: %s [%.4f ms.]' %(node_address, response_obj, elapsed_ms))
+
+    return response_obj, elapsed_ms
+
+# https://rfc.vac.dev/spec/16/#get_waku_v2_relay_v1_messages
+def get_last_waku_msgs(node_address, topic):
+    data = {
+        'jsonrpc': '2.0',
+        'method': 'get_waku_v2_relay_v1_messages',
+        'id': 1,
+        'params' : [topic]}
+
+    G_LOGGER.debug('Waku RPC: %s from %s' %(data['method'], node_address))
+
+    s_time = time.time()
+    response = requests.post(node_address, data=json.dumps(data), headers={'content-type': 'application/json'})
+    elapsed_ms = (time.time() - s_time) * 1000
+
+    response_obj = response.json()
+    # G_LOGGER.debug('Response from %s: %s [%.4f ms.]' %(node_address, response_obj, elapsed_ms))
+
+    return response_obj, elapsed_ms
+
 def send_waku_msg(node_address, topic, payload, nonce=1):
-    waku_msg = {
-        'nonce' : nonce,
-        'timestamp' : time.time_ns(),
-        'payload' : payload}
+    # waku_msg = {
+    #     'nonce' : nonce,
+    #     'timestamp' : time.time_ns(),
+    #     'payload' : payload}
+    my_payload = {
+        'nonce' : nonce,
+        'timestamp' : time.time_ns(),
+        'payload' : payload
+    }
+    waku_msg = {
+        'payload' : json.dumps(my_payload).encode('utf-8').hex()
+    }

     data = {
         'jsonrpc': '2.0',
@@ -79,7 +135,7 @@ def send_waku_msg(node_address, topic, payload, nonce=1):
         'id': 1,
         'params' : [topic, waku_msg]}

-    G_LOGGER.debug('Waku RPC: %s from %s' %(data['method'], node_address))
+    G_LOGGER.debug('Waku RPC: %s from %s Topic: %s' %(data['method'], node_address, topic))

     s_time = time.time()
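
The encoding above is worth spelling out: send_waku_msg wraps its metadata in JSON and hex-encodes it, because the RPC payload field carries raw bytes; the store later returns the payload as a list of byte values, which the retrieval code chr-joins back into JSON. A self-contained round trip using only the steps that appear in this diff:

    import json, time

    # Encode, as send_waku_msg does
    my_payload = {'nonce': 1, 'timestamp': time.time_ns(), 'payload': 'xyz'}
    hex_payload = json.dumps(my_payload).encode('utf-8').hex()

    # The store RPC hands the payload back as a list of byte values
    byte_list = list(bytes.fromhex(hex_payload))

    # Decode, as get_all_messages_from_node_from_topic does below
    decoded = json.loads(''.join(map(chr, byte_list)))
    assert decoded == my_payload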
@@ -164,6 +220,45 @@ def get_next_time_to_msg(inter_msg_type, msg_rate, simulation_time):
     G_LOGGER.error('%s is not a valid inter_msg_type. Aborting.' %inter_msg_type)
     sys.exit()

+def get_all_messages_from_node_from_topic(node_address, topic):
+    page_cnt = 0
+    msg_cnt = 0
+
+    # Retrieve the first page
+    response, elapsed = get_waku_msgs(node_address, topic)
+    if 'error' in response:
+        G_LOGGER.error(response['error'])
+        return 0
+
+    messages = response['result']['messages']
+    msg_cnt += len(messages)
+    G_LOGGER.debug('Got page %d with %d messages from node %s and topic: %s' %(page_cnt, len(messages), node_address, topic))
+
+    for msg_idx, msg in enumerate(messages):
+        # Decode the payload
+        payload_obj = json.loads(''.join(map(chr, msg['payload'])))
+
+    # Retrieve further pages
+    while(response['result']['pagingOptions']):
+        page_cnt += 1
+        cursor = response['result']['pagingOptions']['cursor']
+        index = {"digest" : cursor['digest'], "receivedTime" : cursor['receiverTime']}
+        response, elapsed = get_waku_msgs(node_address, topic, cursor)
+        if 'error' in response:
+            G_LOGGER.error(response['error'])
+            break
+
+        messages = response['result']['messages']
+        msg_cnt += len(messages)
+        G_LOGGER.debug('Got page %d with %d messages from node %s and topic: %s' %(page_cnt, len(messages), node_address, topic))
+
+        for msg_idx, msg in enumerate(messages):
+            # Decode the payload
+            payload_obj = json.loads(''.join(map(chr, msg['payload'])))
+
+    return msg_cnt
+
 def main():

     global G_LOGGER
@@ -178,7 +273,7 @@ def main():
     """ Parse command line args. """
     parser = argparse.ArgumentParser()
-    parser.add_argument("-cfg", "--config_file", help="Config file", action="store_true", default=g_DEFAULT_CONFIG_FILE)
+    parser.add_argument("-cfg", "--config_file", help="Config file", action="store_true", default=G_DEFAULT_CONFIG_FILE)
     args = parser.parse_args()

     config_file = args.config_file
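
One pre-existing wrinkle this rename leaves untouched: action="store_true" turns -cfg into a boolean flag, so a path passed on the command line is never stored in config_file; the string default only applies when the flag is absent. If accepting a path were intended, the usual shape would be (a sketch, not part of this commit):

    import argparse

    G_DEFAULT_CONFIG_FILE = './config/wsl.yml'  # as defined above

    parser = argparse.ArgumentParser()
    # Without action="store_true", argparse stores the next token as the value
    parser.add_argument("-cfg", "--config_file", help="Config file",
                        default=G_DEFAULT_CONFIG_FILE)
    args = parser.parse_args()
    config_file = args.config_file  # a real path when "-cfg ./other/wsl.yml" is given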
@@ -223,18 +318,45 @@ def main():
         sys.exit(1)

     G_LOGGER.info('All %d Waku nodes are reachable.' %len(targets))

+    """ Load Topics """
+    topics = []
+    try:
+        tomls = glob.glob('./tomls/*.toml')
+        tomls.sort()
+        for toml_file in tomls:
+            with open(toml_file, mode='rb') as read_file:
+                toml_config = tomllib.load(read_file)
+                node_topics_str = toml_config['topics']
+                topics.append(list(node_topics_str.split(' ')))
+    except Exception as e:
+        G_LOGGER.error('%s: %s' % (e.__doc__, e))
+        sys.exit()
+
+    # Dictionary to count messages of every topic being sent
+    topics_msg_cnt = {}
+    for node_topics in topics:
+        for topic in node_topics:
+            topics_msg_cnt[topic] = 0
+
+    G_LOGGER.info('Loaded nodes topics from toml files: %s' %topics_msg_cnt.keys())
+
     """ Define the subset of emitters """
     num_emitters = int(len(targets) * config['general']['emitters_fraction'])

     if num_emitters == 0:
         G_LOGGER.error('The number of emitters must be greater than zero. Try increasing the fraction of emitters.')
         sys.exit()

-    emitters = random.sample(targets, num_emitters)
+    """ NOTE: Emitters will only inject topics they are subscribed to """
+    emitters_indices = random.sample(range(len(targets)), num_emitters)
+    emitters = [targets[i] for i in emitters_indices]
+    emitters_topics = [topics[i] for i in emitters_indices]
+    # emitters = random.sample(targets, num_emitters)
+
     G_LOGGER.info('Selected %d emitters out of %d total nodes' %(len(emitters), len(targets)))

     """ Start simulation """
     stats = {}
     msg_cnt = 0
+    failed_cnt = 0
+    bytes_cnt = 0
     s_time = time.time()
     last_msg_time = 0
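
For reference, the TOMLs globbed in the hunk above are the per-node configuration files mounted at /wsl/tomls by the Starlark change earlier (WSL_TOMLS_PATH), each carrying a space-separated topics string like the node config shown near the top of this PR. A minimal sketch of the parse step (tomllib is stdlib from Python 3.11; the file body below is hypothetical, mirroring that config):

    import tomllib  # stdlib as of Python 3.11

    # Hypothetical node TOML, mirroring the config file shown earlier in this PR
    toml_text = 'topics="test test2"\nrpc-admin=true\nstore=true\n'

    toml_config = tomllib.loads(toml_text)
    node_topics = toml_config['topics'].split(' ')
    print(node_topics)  # ['test', 'test2']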
@@ -258,37 +380,64 @@ def main():
             G_LOGGER.debug('Time Δ: %.6f ms.' %((msg_elapsed - next_time_to_msg) * 1000.0))

         # Reference: https://rfc.vac.dev/spec/16/#get_waku_v2_relay_v1_messages
-        node_address = 'http://%s/' %random.choice(emitters)
+        # Pick an emitter at random from the emitters list
+        emitter_idx = random.choice(emitters_indices)
+        node_address = 'http://%s/' %emitters[emitter_idx]

-        G_LOGGER.info('Injecting message to network through Waku node %s ...' %node_address)
+        emitter_topics = emitters_topics[emitter_idx]
+        # Pick a topic at random from the topics supported by the emitter
+        emitter_topic = random.choice(emitter_topics)
+
+        G_LOGGER.info('Injecting message of topic %s to network through Waku node %s ...' %(emitter_topic, node_address))

         payload = make_payload_dist(dist_type=config['general']['dist_type'].lower(), min_size=config['general']['min_packet_size'], max_size=config['general']['max_packet_size'])
-        response, elapsed = send_waku_msg(node_address, topic='test', payload=payload)
-
-        # # Keep track of basic stats
-        # if response['result']:
-        #     if node_address in stats:
-        #         stats[node_address]['msg_cnt'] += 1
-        #         stats[node_address]['msg_sent'] += 1
-        #     else:
-        #         stats[node_address] = { 'msg_cnt' = 1 }
-        #         stats[node_address]['msg_sent'] += 1
-        # else:
-        #     G_LOGGER.error('RPC Message failed to node_address')
+        response, elapsed = send_waku_msg(node_address, topic=emitter_topic, payload=payload, nonce=msg_cnt)
+
+        if response['result']:
+            msg_cnt += 1
+            topics_msg_cnt[emitter_topic] += 1
+        else:
+            G_LOGGER.info('Message failed!')
+            failed_cnt += 1

         # Compute the time to next message
         next_time_to_msg = get_next_time_to_msg(config['general']['inter_msg_type'], config['general']['msg_rate'], config['general']['simulation_time'])
         G_LOGGER.debug('Next message will happen in %d ms.' %(next_time_to_msg * 1000.0))

         last_msg_time = time.time()

-        msg_cnt += 1
-
-    # Pull messages
-    # get_waku_v2_relay_v1_messages

     elapsed_s = time.time() - s_time

+    # Retrieve messages from every node and topic
+    G_LOGGER.info('Retrieving messages from the enclave ...')
+    for node_idx, target in enumerate(targets):
+        node_address = 'http://%s/' %target
+        for topic_idx, topic in enumerate(topics[node_idx]):
+            msg_cnt = get_all_messages_from_node_from_topic(node_address, topic)
+            msg_lost = topics_msg_cnt[topic] - msg_cnt
+            G_LOGGER.info('- Retrieved %d messages on topic %s from node %s. Lost %d message(s).' %(msg_cnt, topic, node_address, msg_lost))
+
     # Output
     summary = {
         "end_ts" : time.time(),
         "params" : config['general'],
+        "topics" : list(topics_msg_cnt.keys()),
+        "topics_msg_cnt" : topics_msg_cnt,
         "simulation_time" : elapsed_s,
         "total_messages" : msg_cnt,
         "avg_latency" : 0,
         "max_latency" : 0,
         "min_latency" : 0
     }

     G_LOGGER.info('Simulation summary: %s' %summary)

     with open('./summary.json', 'w') as summary_file:
         summary_file.write(json.dumps(summary, indent=4))

     """ We are done """
     G_LOGGER.info('Ended')