From 2e58b2db975455bd34a33310afe6fe197da38d01 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Thu, 16 Nov 2023 16:43:44 +0200 Subject: [PATCH] fixes based on Alex suggestions --- src/env_vars.py | 1 + src/libs/common.py | 4 ++-- src/node/api_clients/base_client.py | 11 +++++----- src/node/docker_mananger.py | 20 ++++++++--------- src/node/waku_node.py | 24 ++++++++++---------- src/steps/relay.py | 10 ++++----- tests/conftest.py | 4 ++-- tests/relay/test_publish.py | 34 ++++++++++++++--------------- 8 files changed, 55 insertions(+), 53 deletions(-) diff --git a/src/env_vars.py b/src/env_vars.py index 92803cd1..3cf33609 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -25,3 +25,4 @@ DEFAULT_PUBSUB_TOPIC = get_env_var("DEFAULT_PUBSUB_TOPIC", "/waku/2/default-waku PROTOCOL = get_env_var("PROTOCOL", "REST") RUNNING_IN_CI = get_env_var("CI") NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68") +API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 10) diff --git a/src/libs/common.py b/src/libs/common.py index 67b37adf..58db57b8 100644 --- a/src/libs/common.py +++ b/src/libs/common.py @@ -25,10 +25,10 @@ def to_base64(input_data): def attach_allure_file(file): - logger.debug("Attaching file %s", file) + logger.debug(f"Attaching file {file}") allure.attach.file(file, name=os.path.basename(file), attachment_type=allure.attachment_type.TEXT) def delay(num_seconds): - logger.debug("Sleeping for %s seconds", num_seconds) + logger.debug(f"Sleeping for {num_seconds} seconds") sleep(num_seconds) diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 7992cb03..30725db3 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -1,6 +1,7 @@ import requests from tenacity import retry, stop_after_delay, wait_fixed from abc import ABC, abstractmethod +from src.env_vars import API_REQUEST_TIMEOUT from src.libs.custom_logger import get_custom_logger logger = 
get_custom_logger(__name__) @@ -13,18 +14,18 @@ class BaseClient(ABC): # ensures that such intermittent issues don't cause the tests to fail outright. @retry(stop=stop_after_delay(0.5), wait=wait_fixed(0.1), reraise=True) def make_request(self, method, url, headers=None, data=None): - logger.debug("%s call: %s with payload: %s", method.upper(), url, data) - response = requests.request(method.upper(), url, headers=headers, data=data) + logger.debug(f"{method.upper()} call: {url} with payload: {data}") + response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT) try: response.raise_for_status() except requests.HTTPError as http_err: - logger.error("HTTP error occurred: %s. Response content: %s", http_err, response.content) + logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}") raise except Exception as err: - logger.error("An error occurred: %s. Response content: %s", err, response.content) + logger.error(f"An error occurred: {err}. Response content: {response.content}") raise else: - logger.info("Response status code: %s. Response content: %s", response.status_code, response.content) + logger.info(f"Response status code: {response.status_code}. 
Response content: {response.content}") return response @abstractmethod diff --git a/src/node/docker_mananger.py b/src/node/docker_mananger.py index 00bd0caf..152e04df 100644 --- a/src/node/docker_mananger.py +++ b/src/node/docker_mananger.py @@ -14,13 +14,13 @@ class DockerManager: def __init__(self, image): self._image = image self._client = docker.from_env() - logger.debug("Docker client initialized with image %s", self._image) + logger.debug(f"Docker client initialized with image {self._image}") def create_network(self, network_name=NETWORK_NAME): - logger.debug("Attempting to create or retrieve network %s", network_name) + logger.debug(f"Attempting to create or retrieve network {network_name}") networks = self._client.networks.list(names=[network_name]) if networks: - logger.debug("Network %s already exists", network_name) + logger.debug(f"Network {network_name} already exists") return networks[0] network = self._client.networks.create( @@ -28,7 +28,7 @@ class DockerManager: driver="bridge", ipam=IPAMConfig(driver="default", pool_configs=[IPAMPool(subnet=SUBNET, iprange=IP_RANGE, gateway=GATEWAY)]), ) - logger.debug("Network %s created", network_name) + logger.debug(f"Network {network_name} created") return network def start_container(self, image_name, ports, args, log_path, container_ip): @@ -39,14 +39,14 @@ class DockerManager: else: cli_args.append(f"--{key}={value}") # Add a single command port_bindings = {f"{port}/tcp": ("", port) for port in ports} - logger.debug("Starting container with image %s", image_name) - logger.debug("Using args %s", cli_args) + logger.debug(f"Starting container with image {image_name}") + logger.debug(f"Using args {cli_args}") container = self._client.containers.run(image_name, command=cli_args, ports=port_bindings, detach=True, remove=True, auto_remove=True) network = self._client.networks.get(NETWORK_NAME) network.connect(container, ipv4_address=container_ip) - logger.debug("Container started with ID %s. 
Setting up logs at %s", container.short_id, log_path) + logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}") log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path)) log_thread.daemon = True log_thread.start() @@ -63,14 +63,14 @@ class DockerManager: if base_port is None: base_port = random.randint(1024, 65535 - count) ports = [base_port + i for i in range(count)] - logger.debug("Generated ports %s", ports) + logger.debug(f"Generated ports {ports}") return ports @staticmethod def generate_random_ext_ip(): base_ip_fragments = ["172", "18"] ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)]) - logger.debug("Generated random external IP %s", ext_ip) + logger.debug(f"Generated random external IP {ext_ip}") return ext_ip def is_container_running(self, container): @@ -78,7 +78,7 @@ class DockerManager: refreshed_container = self._client.containers.get(container.id) return refreshed_container.status == "running" except NotFound: - logger.error("Container with ID %s not found", container.id) + logger.error(f"Container with ID {container.id} not found") return False @property diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 9fa4ba28..f701b14a 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -17,7 +17,7 @@ class WakuNode: self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") self._docker_manager = DockerManager(self._image_name) self._container = None - logger.debug("WakuNode instance initialized with log path %s", self._log_path) + logger.debug(f"WakuNode instance initialized with log path {self._log_path}") @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def start(self, **kwargs): @@ -70,47 +70,49 @@ class WakuNode: self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, 
self._ext_ip) logger.debug( - "Started container from image %s. RPC: %s REST: %s WebSocket: %s", self._image_name, self._rpc_port, self._rest_port, self._websocket_port + f"Started container from image {self._image_name}. RPC: {self._rpc_port} REST: {self._rest_port} WebSocket: {self._websocket_port}" ) DS.waku_nodes.append(self) delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly try: self.ensure_ready() - except Exception as e: - logger.error("%s service did not become ready in time: %s", PROTOCOL, e) + except Exception as ex: + logger.error(f"{PROTOCOL} service did not become ready in time: {ex}") raise @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): if self._container: - logger.debug("Stopping container with id %s", self._container.short_id) + logger.debug(f"Stopping container with id {self._container.short_id}") self._container.stop() logger.debug("Container stopped.") def restart(self): if self._container: - logger.debug("Restarting container with id %s", self._container.short_id) + logger.debug(f"Restarting container with id {self._container.short_id}") self._container.restart() def pause(self): if self._container: - logger.debug("Pausing container with id %s", self._container.short_id) + logger.debug(f"Pausing container with id {self._container.short_id}") self._container.pause() def unpause(self): if self._container: - logger.debug("Unpause container with id %s", self._container.short_id) + logger.debug(f"Unpause container with id {self._container.short_id}") self._container.unpause() @retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True) def ensure_ready(self): self.info() - logger.info("%s service is ready !!", PROTOCOL) + logger.info(f"{PROTOCOL} service is ready !!") def info(self): return self._api.info() - def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUB_TOPIC]): + def set_subscriptions(self, pubsub_topics=None): + if not pubsub_topics: + 
pubsub_topics = [DEFAULT_PUBSUB_TOPIC] return self._api.set_subscriptions(pubsub_topics) def send_message(self, message, pubsub_topic=DEFAULT_PUBSUB_TOPIC): @@ -129,7 +131,7 @@ class WakuNode: elif self.is_gowaku(): return "gowaku" else: - raise Exception("Unknown node type!!!") + raise ValueError("Unknown node type!!!") def is_nwaku(self): return "nwaku" in self.image diff --git a/src/steps/relay.py b/src/steps/relay.py index 84174ce2..720232d2 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -32,15 +32,13 @@ class StepsRelay: self.wait_for_published_message_to_reach_peer(120) logger.info("WARM UP successful !!") except Exception as ex: - raise Exception(f"WARM UP FAILED WITH: {ex}") + raise TimeoutError(f"WARM UP FAILED WITH: {ex}") @allure.step def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1): - if not pubsub_topic: - pubsub_topic = self.test_pubsub_topic - self.node1.send_message(message, pubsub_topic) + self.node1.send_message(message, pubsub_topic or self.test_pubsub_topic) delay(message_propagation_delay) - get_messages_response = self.node2.get_messages(pubsub_topic) + get_messages_response = self.node2.get_messages(pubsub_topic or self.test_pubsub_topic) assert get_messages_response, "Peer node couldn't find any messages" received_message = message_rpc_response_schema.load(get_messages_response[0]) self.assert_received_message(message, received_message) @@ -51,7 +49,7 @@ class StepsRelay: assert received_message.payload == sent_message["payload"], assert_fail_message("payload") assert received_message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic") - if "timestamp" in sent_message and sent_message["timestamp"] is not None: + if sent_message.get("timestamp") is not None: if isinstance(sent_message["timestamp"], float): assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp") else: diff 
--git a/tests/conftest.py b/tests/conftest.py index 6eedd099..b412a297 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -43,7 +43,7 @@ def test_id(request): @pytest.fixture(scope="function", autouse=True) def test_setup(request, test_id): - logger.debug("Running test: %s with id: %s", request.node.name, request.cls.test_id) + logger.debug(f"Running test: {request.node.name} with id: {request.cls.test_id}") yield for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*")): if os.path.getmtime(file) < time() - 3600: @@ -74,5 +74,5 @@ def close_open_nodes(attach_logs_on_fail): except Exception as ex: if "No such container" in str(ex): crashed_containers.append(node.image) - logger.error("Failed to stop container because of error %s", ex) + logger.error(f"Failed to stop container because of error {ex}") assert not crashed_containers, f"Containers {crashed_containers} crashed during the test!!!" diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index ca5eec70..eaed41c5 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -11,19 +11,19 @@ class TestRelayPublish(StepsRelay): def test_publish_with_valid_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: - logger.debug("Running test with payload %s", payload["description"]) + logger.debug(f'Running test with payload {payload["description"]}') message = self.create_message(payload=to_base64(payload["value"])) try: self.check_published_message_reaches_peer(message) except Exception as e: - logger.error("Payload %s failed: %s", payload["description"], str(e)) + logger.error(f'Payload {payload["description"]} failed: {str(e)}') failed_payloads.append(payload["description"]) assert not failed_payloads, f"Payloads failed: {failed_payloads}" def test_publish_with_invalid_payloads(self): success_payloads = [] for payload in INVALID_PAYLOADS: - logger.debug("Running test with payload %s", payload["description"]) + logger.debug(f'Running test with payload 
{payload["description"]}') message = self.create_message(payload=payload["value"]) try: self.node1.send_message(message, self.test_pubsub_topic) @@ -42,14 +42,13 @@ class TestRelayPublish(StepsRelay): def test_publish_with_payload_less_than_one_mb(self): payload_length = 1024 * 1023 - logger.debug("Running test with payload length of %s bytes", payload_length) + logger.debug(f"Running test with payload length of {payload_length} bytes") message = self.create_message(payload=to_base64("a" * (payload_length))) self.check_published_message_reaches_peer(message, message_propagation_delay=2) def test_publish_with_payload_equal_or_more_than_one_mb(self): - payload_length = 1024 * 1023 for payload_length in [1024 * 1024, 1024 * 1024 * 10]: - logger.debug("Running test with payload length of %s bytes", payload_length) + logger.debug(f"Running test with payload length of {payload_length} bytes") message = self.create_message(payload=to_base64("a" * (payload_length))) try: self.check_published_message_reaches_peer(message, message_propagation_delay=2) @@ -60,19 +59,19 @@ class TestRelayPublish(StepsRelay): def test_publish_with_valid_content_topics(self): failed_content_topics = [] for content_topic in SAMPLE_INPUTS: - logger.debug("Running test with content topic %s", content_topic["description"]) + logger.debug(f'Running test with content topic {content_topic["description"]}') message = self.create_message(contentTopic=content_topic["value"]) try: self.check_published_message_reaches_peer(message) except Exception as e: - logger.error("ContentTopic %s failed: %s", content_topic["description"], str(e)) + logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}') failed_content_topics.append(content_topic) assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" def test_publish_with_invalid_content_topics(self): success_content_topics = [] for content_topic in INVALID_CONTENT_TOPICS: - logger.debug("Running test with contetn 
topic %s", content_topic["description"]) + logger.debug(f'Running test with content topic {content_topic["description"]}') message = self.create_message(contentTopic=content_topic["value"]) try: self.node1.send_message(message, self.test_pubsub_topic) @@ -93,11 +92,11 @@ class TestRelayPublish(StepsRelay): self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS) failed_pubsub_topics = [] for pubsub_topic in VALID_PUBSUB_TOPICS: - logger.debug("Running test with pubsub topic %s", pubsub_topic) + logger.debug(f"Running test with pubsub topic {pubsub_topic}") try: self.check_published_message_reaches_peer(self.create_message(), pubsub_topic=pubsub_topic) except Exception as e: - logger.error("PubusubTopic %s failed: %s", pubsub_topic, str(e)) + logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}") failed_pubsub_topics.append(pubsub_topic) assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" @@ -119,12 +118,12 @@ class TestRelayPublish(StepsRelay): failed_timestamps = [] for timestamp in SAMPLE_TIMESTAMPS: if self.node1.type() in timestamp["valid_for"]: - logger.debug("Running test with timestamp %s", timestamp["description"]) + logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) try: self.check_published_message_reaches_peer(message) except Exception as ex: - logger.error("Timestamp %s failed: %s", timestamp["description"], str(ex)) + logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}') failed_timestamps.append(timestamp) assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}" @@ -132,7 +131,7 @@ class TestRelayPublish(StepsRelay): success_timestamps = [] for timestamp in SAMPLE_TIMESTAMPS: if self.node1.type() not in timestamp["valid_for"]: - logger.debug("Running test with timestamp %s", timestamp["description"]) + logger.debug(f'Running test with timestamp {timestamp["description"]}') message 
= self.create_message(timestamp=timestamp["value"]) try: self.check_published_message_reaches_peer(message) @@ -168,11 +167,11 @@ class TestRelayPublish(StepsRelay): def test_publish_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: - logger.debug("Running test with Ephemeral %s", ephemeral) + logger.debug(f"Running test with Ephemeral {ephemeral}") try: self.check_published_message_reaches_peer(self.create_message(ephemeral=ephemeral)) except Exception as e: - logger.error("Massage with Ephemeral %s failed: %s", ephemeral, str(e)) + logger.error(f"Message with Ephemeral {ephemeral} failed: {str(e)}") failed_ephemeral.append(ephemeral) assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" @@ -196,12 +195,13 @@ class TestRelayPublish(StepsRelay): except Exception as ex: assert "Peer node couldn't find any messages" in str(ex) - def test_publish_after_node_pauses(self): + def test_publish_after_node_pauses_and_unpauses(self): self.check_published_message_reaches_peer(self.create_message()) self.node1.pause() self.node1.unpause() self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) self.node2.pause() + self.check_published_message_reaches_peer(self.create_message()) self.node2.unpause() self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2")))