AYAHASSAN287 b40938b030
E2E_Tests_CI_FIX (#85)
* add draft test case

* Adding test test_filter_3_senders_1_receiver

* Adding test test_filter_3_senders_45_msg_1_receiver

* add test test_filter_50_subscribe_node

* Remove comments and enhance tests

* Increase node limit to 1000 connection

* making PR ready with 3 tests

* Making changes for go-waku to pass

* Adding changes to make go-waku tests pass

* Change max nodes to 20 when node_2 = go-waku

* Fix CI failure because of nwaku

* Increase wait time for filter request

* Decrease number of nodes to pass the CI

* Add more wait to pass CI

* time_limit increase  to 120 to pass CI

* add flag filter-subscription-timeout to 600 instead of default value 300

* Additional changes for CI failure

* remove filter = true from subscribed filter node

* increase nodes to 15 in test_filter_many_subscribed_nodes

* test_filter_3_senders_multiple_msg_1_receiver

increase message_num to 12

* change max-connections to 50 instead of 1000

* commenting max-connection and filter timeout for CI failure

* Revert timeout back to 20
2024-11-10 09:48:47 +02:00

539 lines
19 KiB
Python

import errno
import json
import os
import random
import re
import shutil
import string
import pytest
import requests
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed
from src.node.api_clients.rest import REST
from src.node.docker_mananger import DockerManager
from src.env_vars import DOCKER_LOG_DIR
from src.data_storage import DS
from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS
logger = get_custom_logger(__name__)
def select_private_key(prv_keys, key_id):
for key in prv_keys:
if key.endswith(key_id):
return key
raise ValueError("No matching key was found")
def sanitize_docker_flags(input_flags):
output_flags = {}
for key, value in input_flags.items():
key = key.replace("_", "-")
output_flags[key] = value
return output_flags
@retry(stop=stop_after_delay(180), wait=wait_fixed(0.5), reraise=True)
def rln_credential_store_ready(creds_file_path, single_check=False):
if os.path.exists(creds_file_path):
return True
elif not single_check:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), creds_file_path)
return False
def peer_info2multiaddr(peer, is_nwaku=True):
if is_nwaku:
return peer["multiaddr"]
else:
return peer["multiaddrs"][0]
def peer_info2id(peer, is_nwaku=True):
return peer_info2multiaddr(peer, is_nwaku).split("/")[-1]
def multiaddr2id(multiaddr):
return multiaddr.split("/")[-1]
def resolve_sharding_flags(kwargs):
if "pubsub_topic" in kwargs:
pubsub_topic = kwargs["pubsub_topic"]
if not "cluster_id" in kwargs:
try:
if isinstance(pubsub_topic, list):
pubsub_topic = pubsub_topic[0]
cluster_id = pubsub_topic.split("/")[4]
logger.debug(f"Cluster id was resolved to: {cluster_id}")
kwargs["cluster_id"] = cluster_id
except Exception as ex:
raise Exception("Could not resolve cluster_id from pubsub_topic")
return kwargs
class WakuNode:
def __init__(self, docker_image, docker_log_prefix=""):
self._image_name = docker_image
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log")
self._docker_manager = DockerManager(self._image_name)
self._container = None
logger.debug(f"WakuNode instance initialized with log path {self._log_path}")
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
def start(self, wait_for_node_sec=20, **kwargs):
logger.debug("Starting Node...")
self._docker_manager.create_network()
self._ext_ip = self._docker_manager.generate_random_ext_ip()
self._ports = self._docker_manager.generate_ports()
self._rest_port = self._ports[0]
self._tcp_port = self._ports[1]
self._websocket_port = self._ports[2]
self._discv5_port = self._ports[3]
self._metrics_port = self._ports[4]
self._api = REST(self._rest_port)
self._volumes = []
default_args = {
"listen-address": "0.0.0.0",
"rest": "true",
"rest-admin": "true",
"websocket-support": "true",
"log-level": "TRACE",
"rest-relay-cache-capacity": "100",
"websocket-port": self._websocket_port,
"rest-port": self._rest_port,
"tcp-port": self._tcp_port,
"discv5-udp-port": self._discv5_port,
"rest-address": "0.0.0.0",
"nat": f"extip:{self._ext_ip}",
"peer-exchange": "true",
"discv5-discovery": "true",
"cluster-id": DEFAULT_CLUSTER_ID,
"rln-creds-id": None,
"rln-creds-source": None,
"nodekey": self.generate_random_nodekey(),
# "max-connections": "50",
# "filter-subscription-timeout": "600",
}
if self.is_gowaku():
go_waku_args = {
"min-relay-peers-to-publish": "1",
"log-level": "DEBUG",
"rest-filter-cache-capacity": "50",
"peer-store-capacity": "10",
}
default_args.update(go_waku_args)
elif self.is_nwaku():
nwaku_args = {
"shard": "0",
"metrics-server": "true",
"metrics-server-address": "0.0.0.0",
"metrics-server-port": self._metrics_port,
"metrics-logging": "true",
}
default_args.update(nwaku_args)
else:
raise NotImplementedError("Not implemented for this node type")
if "remove_container" in kwargs:
remove_container = kwargs["remove_container"]
del kwargs["remove_container"]
else:
remove_container = True
kwargs = self.parse_peer_persistence_config(kwargs)
kwargs = resolve_sharding_flags(kwargs)
default_args.update(sanitize_docker_flags(kwargs))
rln_args, rln_creds_set, keystore_path = self.parse_rln_credentials(default_args, False)
default_args.pop("rln-creds-id", None)
default_args.pop("rln-creds-source", None)
default_args.pop("rln-keystore-prefix", None)
if rln_creds_set:
rln_credential_store_ready(keystore_path)
default_args.update(rln_args)
else:
logger.info(f"RLN credentials not set or credential store not available, starting without RLN")
logger.debug(f"Using volumes {self._volumes}")
self._container = self._docker_manager.start_container(
self._docker_manager.image,
ports=self._ports,
args=default_args,
log_path=self._log_path,
container_ip=self._ext_ip,
volumes=self._volumes,
remove_container=remove_container,
)
logger.debug(f"Started container from image {self._image_name}. REST: {self._rest_port}")
DS.waku_nodes.append(self)
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
try:
self.ensure_ready(timeout_duration=wait_for_node_sec)
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
@retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
def register_rln(self, **kwargs):
logger.debug("Registering RLN credentials...")
self._docker_manager.create_network()
self._ext_ip = self._docker_manager.generate_random_ext_ip()
self._ports = self._docker_manager.generate_ports()
self._rest_port = self._ports[0]
self._api = REST(self._rest_port)
self._volumes = []
default_args = {"rln-creds-id": None, "rln-creds-source": None, "rln-relay-user-message-limit-registration": 100}
default_args.update(sanitize_docker_flags(kwargs))
rln_args, rln_creds_set, keystore_path = self.parse_rln_credentials(default_args, True)
if rln_creds_set:
self._container = self._docker_manager.start_container(
self._docker_manager.image, self._ports, rln_args, self._log_path, self._ext_ip, self._volumes
)
logger.debug(f"Executed container from image {self._image_name}. REST: {self._rest_port} to register RLN")
logger.debug(f"Waiting for keystore {keystore_path}")
try:
rln_credential_store_ready(keystore_path)
except Exception as ex:
logger.error(f"File {keystore_path} with RLN credentials did not become available in time {ex}")
raise
else:
logger.warn("RLN credentials not set, no action performed")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
if self._container:
logger.debug(f"Stopping container with id {self._container.short_id}")
self._container.stop()
try:
self._container.remove()
except:
pass
self._container = None
logger.debug("Container stopped.")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
if self._container:
logger.debug(f"Killing container with id {self._container.short_id}")
self._container.kill()
try:
self._container.remove()
except:
pass
self._container = None
logger.debug("Container killed.")
def restart(self):
if self._container:
logger.debug(f"Restarting container with id {self._container.short_id}")
self._container.restart()
def pause(self):
if self._container:
logger.debug(f"Pausing container with id {self._container.short_id}")
self._container.pause()
def unpause(self):
if self._container:
logger.debug(f"Unpause container with id {self._container.short_id}")
self._container.unpause()
def ensure_ready(self, timeout_duration=10):
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True)
def check_healthy(node=self):
self.health_response = node.health()
if self.health_response == b"Node is healthy":
logger.info("Node is healthy !!")
return
try:
self.health_response = json.loads(self.health_response)
except Exception as ex:
raise AttributeError(f"Unknown health response format {ex}")
if self.health_response.get("nodeHealth") != "Ready":
raise AssertionError("Waiting for the node health status: Ready")
for p in self.health_response.get("protocolsHealth"):
if p.get("Rln Relay") != "Ready":
raise AssertionError("Waiting for the Rln relay status: Ready")
logger.info("Node protocols are initialized !!")
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True)
def check_ready(node=self):
node.info_response = node.info()
logger.info("REST service is ready !!")
if self.is_nwaku():
check_healthy()
check_ready()
def get_id(self):
try:
return self.info_response["listenAddresses"][0].split("/")[-1]
except Exception as ex:
raise AttributeError(f"Could not find ID in the info call because of error: {str(ex)}")
def get_enr_uri(self):
try:
return self.info_response["enrUri"]
except Exception as ex:
raise AttributeError(f"Could not find enrUri in the info call because of error: {str(ex)}")
def get_multiaddr_with_id(self):
addresses = self.info_response.get("listenAddresses", [])
ws_address = next((addr for addr in addresses if "/ws" not in addr), None)
if ws_address:
identifier = ws_address.split("/p2p/")[-1]
new_address = f"{self.get_tcp_address()}/p2p/{identifier}"
return new_address
else:
raise AttributeError("No '/ws' address found")
def get_tcp_address(self):
return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}"
def info(self):
return self._api.info()
def health(self):
return self._api.health()
def get_peers(self):
return self._api.get_peers()
def add_peers(self, peers):
return self._api.add_peers(peers)
def set_relay_subscriptions(self, pubsub_topics):
return self._api.set_relay_subscriptions(pubsub_topics)
def set_relay_auto_subscriptions(self, content_topics):
return self._api.set_relay_auto_subscriptions(content_topics)
def delete_relay_subscriptions(self, pubsub_topics):
return self._api.delete_relay_subscriptions(pubsub_topics)
def delete_relay_auto_subscriptions(self, content_topics):
return self._api.delete_relay_auto_subscriptions(content_topics)
def send_relay_message(self, message, pubsub_topic):
return self._api.send_relay_message(message, pubsub_topic)
def send_relay_auto_message(self, message):
return self._api.send_relay_auto_message(message)
def send_light_push_message(self, payload):
return self._api.send_light_push_message(payload)
def get_relay_messages(self, pubsub_topic):
return self._api.get_relay_messages(pubsub_topic)
def get_relay_auto_messages(self, content_topic):
return self._api.get_relay_auto_messages(content_topic)
def set_filter_subscriptions(self, subscription):
return self._api.set_filter_subscriptions(subscription)
def update_filter_subscriptions(self, subscription):
return self._api.update_filter_subscriptions(subscription)
def delete_filter_subscriptions(self, subscription):
return self._api.delete_filter_subscriptions(subscription)
def delete_all_filter_subscriptions(self, request_id):
return self._api.delete_all_filter_subscriptions(request_id)
def ping_filter_subscriptions(self, request_id):
return self._api.ping_filter_subscriptions(request_id)
def get_filter_messages(self, content_topic, pubsub_topic=None):
return self._api.get_filter_messages(content_topic, pubsub_topic)
def get_store_messages(
self,
peer_addr=None,
include_data=None,
pubsub_topic=None,
content_topics=None,
start_time=None,
end_time=None,
hashes=None,
cursor=None,
page_size=None,
ascending=None,
store_v="v3",
**kwargs,
):
return self._api.get_store_messages(
peer_addr=peer_addr,
include_data=include_data,
pubsub_topic=pubsub_topic,
content_topics=content_topics,
start_time=start_time,
end_time=end_time,
hashes=hashes,
cursor=cursor,
page_size=page_size,
ascending=ascending,
store_v=store_v,
**kwargs,
)
def get_metrics(self):
if self.is_nwaku():
metrics = requests.get(f"http://localhost:{self._metrics_port}/metrics")
metrics.raise_for_status()
return metrics.content.decode("utf-8")
else:
pytest.skip(f"This method doesn't exist for node {self.type()}")
@property
def image(self):
return self._image_name
def type(self):
if self.is_nwaku():
return "nwaku"
elif self.is_gowaku():
return "gowaku"
else:
raise ValueError("Unknown node type!!!")
def is_nwaku(self):
return "nwaku" in self.image
def is_gowaku(self):
return "go-waku" in self.image
def parse_rln_credentials(self, default_args, is_registration):
rln_args = {}
keystore_path = None
rln_creds_source = default_args["rln-creds-source"]
selected_id = default_args["rln-creds-id"]
if rln_creds_source is None or selected_id is None:
logger.debug(f"RLN credentials were not set")
return rln_args, False, keystore_path
imported_creds = json.loads(rln_creds_source)
if len(imported_creds) < 4 or any(value is None for value in imported_creds.values()):
logger.warn(f"One or more of required RLN credentials were not set properly")
return rln_args, False, keystore_path
eth_private_key = select_private_key(imported_creds, selected_id)
cwd = os.getcwd()
if self.is_nwaku():
if is_registration:
rln_args.update(
{
"generateRlnKeystore": None,
"--execute": None,
"rln-relay-user-message-limit": default_args["rln-relay-user-message-limit-registration"],
}
)
else:
rln_args.update(
{
"rln-relay": "true",
}
)
if not default_args.get("rln-relay-dynamic") and not is_registration:
rln_args.update(
{
"rln-relay-cred-path": "/keystore/keystore.json",
"rln-relay-cred-password": imported_creds["rln-relay-cred-password"],
}
)
else:
rln_args.update(
{
"rln-relay-cred-path": "/keystore/keystore.json",
"rln-relay-cred-password": imported_creds["rln-relay-cred-password"],
"rln-relay-eth-client-address": imported_creds["rln-relay-eth-client-address"],
"rln-relay-eth-contract-address": imported_creds["rln-relay-eth-contract-address"],
"rln-relay-eth-private-key": imported_creds[eth_private_key],
}
)
keystore_path = cwd + "/keystore_" + default_args["rln-keystore-prefix"] + "_" + selected_id + "/keystore.json"
self._volumes.extend(
[
cwd + "/rln_tree_" + default_args["rln-keystore-prefix"] + "_" + selected_id + ":/etc/rln_tree",
cwd + "/keystore_" + default_args["rln-keystore-prefix"] + "_" + selected_id + ":/keystore",
]
)
else:
raise NotImplementedError("Not implemented for type other than Nim Waku ")
return rln_args, True, keystore_path
def parse_peer_persistence_config(self, kwargs):
if kwargs.get("peer_persistence") == "true":
if self.is_gowaku():
kwargs["persist_peers"] = kwargs["peer_persistence"]
del kwargs["peer_persistence"]
cwd = os.getcwd()
# Please note, as of now, peerdb is stored directly at / which is not shareable between containers.
# Volume related code is usable after https://github.com/waku-org/nwaku/issues/2792 would be resolved.
self._volumes.extend(
[
cwd + "/peerdb" + ":/shared",
]
)
shutil.rmtree(cwd + "/peerdb")
return kwargs
@property
def container(self):
return self._container
def generate_random_nodekey(self):
# Define the set of hexadecimal characters
hex_chars = string.hexdigits.lower()
# Generate a random 64-character string from the hex characters
random_key = "".join(random.choice(hex_chars) for _ in range(64))
return random_key
def search_waku_log_for_string(self, search_pattern, use_regex=False):
return self._docker_manager.search_log_for_keywords(self._log_path, [search_pattern], use_regex)
def check_waku_log_errors(self, whitelist=None):
keywords = LOG_ERROR_KEYWORDS
# If a whitelist is provided, remove those keywords from the keywords list
if whitelist:
keywords = [keyword for keyword in keywords if keyword not in whitelist]
matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False)
assert not matches, f"Found errors {matches}"