From a6a04403128f8ae6bf3189a3180d8d9379e57b3d Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Tue, 21 Nov 2023 09:29:48 +0200 Subject: [PATCH] Relay Publish: multiple nodes (#4) * github actions report summary * use env instead of inputs * multiple nodes tests * fix warm up * fix warm up * small fix after CI run * small fix after CI run 2 * add new multi-node test * self review --- .github/workflows/test.yml | 23 ++++++- src/env_vars.py | 2 + src/steps/relay.py | 101 ++++++++++++++++++++--------- tests/conftest.py | 6 ++ tests/relay/test_multiple_nodes.py | 19 ++++++ tests/relay/test_publish.py | 18 ++--- 6 files changed, 128 insertions(+), 41 deletions(-) create mode 100644 tests/relay/test_multiple_nodes.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 16d8b78ad..64032efe9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,13 +9,20 @@ on: workflow_dispatch: inputs: node1: - required: false + required: true + description: "Node that usually publishes messages. Used for all tests" type: string default: "wakuorg/nwaku:latest" node2: - required: false + required: true + description: "Node that usually queries for published messages. Used for all tests" type: string default: "wakuorg/go-waku:latest" + additional_nodes: + required: false + description: "Additional optional nodes used in e2e tests, separated by ," + type: string + default: "wakuorg/nwaku:latest,wakuorg/go-waku:latest" protocol: description: "Protocol used to comunicate inside the network" required: true @@ -29,6 +36,7 @@ env: FORCE_COLOR: "1" NODE_1: ${{ inputs.node1 }} NODE_2: ${{ inputs.node2 }} + ADDITIONAL_NODES: ${{ inputs.additional_nodes }} PROTOCOL: ${{ inputs.protocol || 'REST' }} jobs: @@ -75,3 +83,14 @@ jobs: github_token: ${{ secrets.GITHUB_TOKEN }} publish_branch: gh-pages publish_dir: allure-history + + - name: Create job summary + if: always() + run: | + echo "## Run Information" >> $GITHUB_STEP_SUMMARY + echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY + echo "- **Node2**: ${{ env.NODE_2}}" >> $GITHUB_STEP_SUMMARY + echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES}}" >> $GITHUB_STEP_SUMMARY + echo "- **Protocol**: ${{ env.PROTOCOL }}" >> $GITHUB_STEP_SUMMARY + echo "## Test Results" >> $GITHUB_STEP_SUMMARY + echo "Allure report will be available at: https://waku-org.github.io/waku-interop-tests/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY diff --git a/src/env_vars.py b/src/env_vars.py index 3cf33609a..a996661cc 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -16,6 +16,8 @@ def get_env_var(var_name, default=None): # Configuration constants. Need to be upercase to appear in reports NODE_1 = get_env_var("NODE_1", "wakuorg/go-waku:latest") NODE_2 = get_env_var("NODE_2", "wakuorg/nwaku:latest") +ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", "wakuorg/nwaku:latest,wakuorg/go-waku:latest") +# more nodes need to follow the NODE_X pattern DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker") NETWORK_NAME = get_env_var("NETWORK_NAME", "waku") SUBNET = get_env_var("SUBNET", "172.18.0.0/16") diff --git a/src/steps/relay.py b/src/steps/relay.py index 720232d24..d9bafdbad 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -1,3 +1,4 @@ +import inspect from src.libs.custom_logger import get_custom_logger import math from time import time @@ -5,7 +6,7 @@ import pytest import allure from src.libs.common import to_base64, delay from src.data_classes import message_rpc_response_schema -from src.env_vars import NODE_1, NODE_2, NODEKEY +from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY, RUNNING_IN_CI from src.node.waku_node import WakuNode from tenacity import retry, stop_after_delay, wait_fixed @@ -13,36 +14,81 @@ logger = get_custom_logger(__name__) class StepsRelay: - @pytest.fixture(scope="function", autouse=True) - def setup_nodes(self, request): - self.node1 = WakuNode(NODE_1, "node1_" + request.cls.test_id) - self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY) - enr_uri = self.node1.info()["enrUri"] - self.node2 = WakuNode(NODE_2, "node2_" + request.cls.test_id) - self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=enr_uri, peer_exchange="true") - self.test_pubsub_topic = "/waku/2/rs/18/1" - self.test_content_topic = "/test/1/waku-relay/proto" - self.test_payload = "Relay works!!" - self.node1.set_subscriptions([self.test_pubsub_topic]) - self.node2.set_subscriptions([self.test_pubsub_topic]) + test_pubsub_topic = "/waku/2/rs/18/1" + test_content_topic = "/test/1/waku-relay/proto" + test_payload = "Relay works!!" - @pytest.fixture(scope="function", autouse=True) - def network_warm_up(self, setup_nodes): + @pytest.fixture(scope="function") + def setup_main_relay_nodes(self, request): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") + self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY) + self.enr_uri = self.node1.info()["enrUri"] + self.node2 = WakuNode(NODE_2, f"node1_{request.cls.test_id}") + self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true") + self.main_nodes = [self.node1, self.node2] + self.optional_nodes = [] + + @pytest.fixture(scope="function") + def setup_optional_relay_nodes(self, request): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + if ADDITIONAL_NODES: + nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")] + else: + pytest.skip("ADDITIONAL_NODES is empty, cannot run test") + for index, node in enumerate(nodes): + node = WakuNode(node, f"node{index}_{request.cls.test_id}") + node.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true") + self.optional_nodes.append(node) + + @pytest.fixture(scope="function") + def subscribe_main_relay_nodes(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + + @pytest.fixture(scope="function") + def subscribe_optional_relay_nodes(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.ensure_subscriptions_on_nodes(self.optional_nodes, [self.test_pubsub_topic]) + + @pytest.fixture(scope="function") + def relay_warm_up(self): try: - self.wait_for_published_message_to_reach_peer(120) - logger.info("WARM UP successful !!") + self.wait_for_published_message_to_reach_peer() + logger.info("WARM UP successful!!") except Exception as ex: raise TimeoutError(f"WARM UP FAILED WITH: {ex}") + # this method should be used only for the tests that use the warm_up fixture + # otherwise use wait_for_published_message_to_reach_peer @allure.step - def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1): - self.node1.send_message(message, pubsub_topic or self.test_pubsub_topic) + def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None): + if not sender: + sender = self.node1 + if not peer_list: + peer_list = self.main_nodes + self.optional_nodes + sender.send_message(message, pubsub_topic or self.test_pubsub_topic) delay(message_propagation_delay) - get_messages_response = self.node2.get_messages(pubsub_topic or self.test_pubsub_topic) - assert get_messages_response, "Peer node couldn't find any messages" - received_message = message_rpc_response_schema.load(get_messages_response[0]) - self.assert_received_message(message, received_message) + for index, peer in enumerate(peer_list): + logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") + get_messages_response = peer.get_messages(pubsub_topic or self.test_pubsub_topic) + assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages" + received_message = message_rpc_response_schema.load(get_messages_response[0]) + self.assert_received_message(message, received_message) + # we need much bigger timeout in CI because we run tests in parallel there and the machine itself is slower + @allure.step + def wait_for_published_message_to_reach_peer( + self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, sender=None, peer_list=None + ): + @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True) + def check_peer_connection(): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + self.check_published_message_reaches_peer(message, sender=sender, peer_list=peer_list) + + check_peer_connection() + + @allure.step def assert_received_message(self, sent_message, received_message): def assert_fail_message(field_name): return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(received_message, field_name)}" @@ -63,14 +109,7 @@ class StepsRelay: if "rateLimitProof" in sent_message: assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof") - def wait_for_published_message_to_reach_peer(self, timeout_duration, time_between_retries=1): - @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True) - def check_peer_connection(): - message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} - self.check_published_message_reaches_peer(message) - - check_peer_connection() - + @allure.step def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list): for node in node_list: node.set_subscriptions(pubsub_topic_list) diff --git a/tests/conftest.py b/tests/conftest.py index b412a2971..2e26b97cd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import inspect import glob from src.libs.custom_logger import get_custom_logger import os @@ -28,6 +29,7 @@ def pytest_runtest_makereport(item): def set_allure_env_variables(): yield if os.path.isdir("allure-results") and not os.path.isfile(os.path.join("allure-results", "environment.properties")): + logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}") with open(os.path.join("allure-results", "environment.properties"), "w") as outfile: for attribute_name in dir(env_vars): if attribute_name.isupper(): @@ -38,6 +40,7 @@ def set_allure_env_variables(): @pytest.fixture(scope="function", autouse=True) def test_id(request): # setting up an unique test id to be used where needed + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") request.cls.test_id = f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}__{str(uuid4())}" @@ -45,6 +48,7 @@ def test_id(request): def test_setup(request, test_id): logger.debug(f"Running test: {request.node.name} with id: {request.cls.test_id}") yield + logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}") for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*")): if os.path.getmtime(file) < time() - 3600: logger.debug(f"Deleting old log file: {file}") @@ -58,6 +62,7 @@ def test_setup(request, test_id): def attach_logs_on_fail(request): yield if env_vars.RUNNING_IN_CI and hasattr(request.node, "rep_call") and request.node.rep_call.failed: + logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}") logger.debug("Test failed, attempting to attach logs to the allure reports") for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*" + request.cls.test_id + "*")): attach_allure_file(file) @@ -67,6 +72,7 @@ def attach_logs_on_fail(request): def close_open_nodes(attach_logs_on_fail): DS.waku_nodes = [] yield + logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}") crashed_containers = [] for node in DS.waku_nodes: try: diff --git a/tests/relay/test_multiple_nodes.py b/tests/relay/test_multiple_nodes.py new file mode 100644 index 000000000..aa7cc121e --- /dev/null +++ b/tests/relay/test_multiple_nodes.py @@ -0,0 +1,19 @@ +import pytest +from src.steps.relay import StepsRelay + + +@pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes") +class TestMultipleNodes(StepsRelay): + def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): + self.check_published_message_reaches_peer(self.create_message()) + + def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): + self.check_published_message_reaches_peer(self.create_message(), sender=self.optional_nodes[-1]) + + def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self): + self.wait_for_published_message_to_reach_peer(peer_list=self.main_nodes) + try: + self.check_published_message_reaches_peer(self.create_message(), peer_list=self.optional_nodes) + raise AssertionError("Non subscribed nodes received the message!!") + except Exception as ex: + assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found" diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 3d49d1993..0f45ff981 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -1,3 +1,4 @@ +import pytest from src.libs.custom_logger import get_custom_logger from time import time from src.libs.common import delay, to_base64 @@ -8,6 +9,7 @@ from src.data_classes import message_rpc_response_schema logger = get_custom_logger(__name__) +@pytest.mark.usefixtures("setup_main_relay_nodes", "subscribe_main_relay_nodes", "relay_warm_up") class TestRelayPublish(StepsRelay): def test_publish_with_valid_payloads(self): failed_payloads = [] @@ -55,7 +57,7 @@ class TestRelayPublish(StepsRelay): self.check_published_message_reaches_peer(message, message_propagation_delay=2) raise AssertionError("Duplicate message was retrieved twice") except Exception as ex: - assert "Peer node couldn't find any messages" in str(ex) + assert "couldn't find any messages" in str(ex) def test_publish_with_valid_content_topics(self): failed_content_topics = [] @@ -90,7 +92,7 @@ class TestRelayPublish(StepsRelay): assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) def test_publish_on_multiple_pubsub_topics(self): - self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS) + self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) failed_pubsub_topics = [] for pubsub_topic in VALID_PUBSUB_TOPICS: logger.debug(f"Running test with pubsub topic {pubsub_topic}") @@ -102,7 +104,7 @@ class TestRelayPublish(StepsRelay): assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" def test_message_published_on_different_pubsub_topic_is_not_retrieved(self): - self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS) + self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) self.node1.send_message(self.create_message(), VALID_PUBSUB_TOPICS[0]) delay(0.1) messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1]) @@ -194,7 +196,7 @@ class TestRelayPublish(StepsRelay): self.check_published_message_reaches_peer(message) raise AssertionError("Duplicate message was retrieved twice") except Exception as ex: - assert "Peer node couldn't find any messages" in str(ex) + assert "couldn't find any messages" in str(ex) def test_publish_while_peer_is_paused(self): message = self.create_message() @@ -218,14 +220,14 @@ class TestRelayPublish(StepsRelay): def test_publish_after_node1_restarts(self): self.check_published_message_reaches_peer(self.create_message()) self.node1.restart() - self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer(20) + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer() def test_publish_after_node2_restarts(self): self.check_published_message_reaches_peer(self.create_message()) self.node2.restart() - self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic]) - self.wait_for_published_message_to_reach_peer(20) + self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) + self.wait_for_published_message_to_reach_peer() def test_publish_and_retrieve_100_messages(self): num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag