mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-07 00:13:06 +00:00
add new multi-node test
This commit is contained in:
parent
154a7f13e8
commit
2d01046649
11
.github/workflows/test.yml
vendored
11
.github/workflows/test.yml
vendored
@ -18,11 +18,11 @@ on:
|
||||
description: "Node that usually queries for published messages. Used for all tests"
|
||||
type: string
|
||||
default: "wakuorg/go-waku:latest"
|
||||
node3:
|
||||
additional_nodes:
|
||||
required: false
|
||||
description: "Optional node used in e2e tests"
|
||||
description: "Additional optional nodes used in e2e tests, separated by ,"
|
||||
type: string
|
||||
default: "wakuorg/nwaku:latest"
|
||||
default: "wakuorg/nwaku:latest,wakuorg/go-waku:latest"
|
||||
protocol:
|
||||
description: "Protocol used to comunicate inside the network"
|
||||
required: true
|
||||
@ -36,8 +36,7 @@ env:
|
||||
FORCE_COLOR: "1"
|
||||
NODE_1: ${{ inputs.node1 }}
|
||||
NODE_2: ${{ inputs.node2 }}
|
||||
NODE_3: ${{ inputs.node3 }}
|
||||
# more nodes need to follow the NODE_X pattern
|
||||
ADDITIONAL_NODES: ${{ inputs.additional_nodes }}
|
||||
PROTOCOL: ${{ inputs.protocol || 'REST' }}
|
||||
|
||||
jobs:
|
||||
@ -91,7 +90,7 @@ jobs:
|
||||
echo "## Run Information" >> $GITHUB_STEP_SUMMARY
|
||||
echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY
|
||||
echo "- **Node2**: ${{ env.NODE_2}}" >> $GITHUB_STEP_SUMMARY
|
||||
echo "- **Node3**: ${{ env.NODE_3}}" >> $GITHUB_STEP_SUMMARY
|
||||
echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES}}" >> $GITHUB_STEP_SUMMARY
|
||||
echo "- **Protocol**: ${{ env.PROTOCOL }}" >> $GITHUB_STEP_SUMMARY
|
||||
echo "## Test Results" >> $GITHUB_STEP_SUMMARY
|
||||
echo "Allure report will be available at: https://waku-org.github.io/waku-interop-tests/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY
|
||||
@ -13,26 +13,10 @@ def get_env_var(var_name, default=None):
|
||||
return env_var
|
||||
|
||||
|
||||
def get_nodes(defaults):
|
||||
nodes = []
|
||||
# First, use the defaults provided
|
||||
for node_var_name, default_value in defaults.items():
|
||||
node = get_env_var(node_var_name, default_value)
|
||||
nodes.append(node)
|
||||
# Now check for additional NODE_X variables
|
||||
index = len(defaults) + 1
|
||||
while True:
|
||||
extra_node_var_name = f"NODE_{index}"
|
||||
extra_node = get_env_var(extra_node_var_name)
|
||||
if not extra_node: # Break the loop if an additional NODE_X is not set
|
||||
break
|
||||
nodes.append(extra_node)
|
||||
index += 1
|
||||
return nodes
|
||||
|
||||
|
||||
# Configuration constants. Need to be upercase to appear in reports
|
||||
NODE_LIST = get_nodes(defaults={"NODE_1": "wakuorg/go-waku:latest", "NODE_2": "wakuorg/nwaku:latest", "NODE_3": "wakuorg/go-waku:latest"})
|
||||
NODE_1 = get_env_var("NODE_1", "wakuorg/go-waku:latest")
|
||||
NODE_2 = get_env_var("NODE_2", "wakuorg/nwaku:latest")
|
||||
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", "wakuorg/nwaku:latest,wakuorg/go-waku:latest")
|
||||
# more nodes need to follow the NODE_X pattern
|
||||
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
|
||||
NETWORK_NAME = get_env_var("NETWORK_NAME", "waku")
|
||||
|
||||
@ -6,7 +6,7 @@ import pytest
|
||||
import allure
|
||||
from src.libs.common import to_base64, delay
|
||||
from src.data_classes import message_rpc_response_schema
|
||||
from src.env_vars import NODE_LIST, NODEKEY, RUNNING_IN_CI
|
||||
from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY, RUNNING_IN_CI
|
||||
from src.node.waku_node import WakuNode
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
|
||||
@ -21,10 +21,10 @@ class StepsRelay:
|
||||
@pytest.fixture(scope="function")
|
||||
def setup_main_relay_nodes(self, request):
|
||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||
self.node1 = WakuNode(NODE_LIST[0], f"node1_{request.cls.test_id}")
|
||||
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
|
||||
self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY)
|
||||
self.enr_uri = self.node1.info()["enrUri"]
|
||||
self.node2 = WakuNode(NODE_LIST[1], f"node1_{request.cls.test_id}")
|
||||
self.node2 = WakuNode(NODE_2, f"node1_{request.cls.test_id}")
|
||||
self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true")
|
||||
self.main_nodes = [self.node1, self.node2]
|
||||
self.optional_nodes = []
|
||||
@ -32,7 +32,11 @@ class StepsRelay:
|
||||
@pytest.fixture(scope="function")
|
||||
def setup_optional_relay_nodes(self, request):
|
||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||
for index, node in enumerate(NODE_LIST[2:]):
|
||||
if ADDITIONAL_NODES:
|
||||
nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")]
|
||||
else:
|
||||
pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
|
||||
for index, node in enumerate(nodes):
|
||||
node = WakuNode(node, f"node{index}_{request.cls.test_id}")
|
||||
node.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true")
|
||||
self.optional_nodes.append(node)
|
||||
@ -55,6 +59,8 @@ class StepsRelay:
|
||||
except Exception as ex:
|
||||
raise TimeoutError(f"WARM UP FAILED WITH: {ex}")
|
||||
|
||||
# this method should be used only for the tests that use the warm_up fixture
|
||||
# otherwise use wait_for_published_message_to_reach_peer
|
||||
@allure.step
|
||||
def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None):
|
||||
if not sender:
|
||||
@ -70,6 +76,18 @@ class StepsRelay:
|
||||
received_message = message_rpc_response_schema.load(get_messages_response[0])
|
||||
self.assert_received_message(message, received_message)
|
||||
|
||||
@allure.step
|
||||
def wait_for_published_message_to_reach_peer(
|
||||
self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, sender=None, peer_list=None
|
||||
):
|
||||
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True)
|
||||
def check_peer_connection():
|
||||
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
self.check_published_message_reaches_peer(message, sender=sender, peer_list=peer_list)
|
||||
|
||||
check_peer_connection()
|
||||
|
||||
@allure.step
|
||||
def assert_received_message(self, sent_message, received_message):
|
||||
def assert_fail_message(field_name):
|
||||
return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(received_message, field_name)}"
|
||||
@ -90,16 +108,7 @@ class StepsRelay:
|
||||
if "rateLimitProof" in sent_message:
|
||||
assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof")
|
||||
|
||||
def wait_for_published_message_to_reach_peer(
|
||||
self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, sender=None, peer_list=None
|
||||
):
|
||||
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True)
|
||||
def check_peer_connection():
|
||||
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
self.check_published_message_reaches_peer(message, sender=sender, peer_list=peer_list)
|
||||
|
||||
check_peer_connection()
|
||||
|
||||
@allure.step
|
||||
def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list):
|
||||
for node in node_list:
|
||||
node.set_subscriptions(pubsub_topic_list)
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
from time import sleep
|
||||
import pytest
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.relay import StepsRelay
|
||||
@ -5,12 +6,18 @@ from src.steps.relay import StepsRelay
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes", "subscribe_optional_relay_nodes", "relay_warm_up"
|
||||
)
|
||||
@pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes")
|
||||
class TestMultipleNodes(StepsRelay):
|
||||
def test_first_node_to_start_publishes(self):
|
||||
self.wait_for_published_message_to_reach_peer()
|
||||
def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
|
||||
self.check_published_message_reaches_peer(self.create_message())
|
||||
|
||||
def test_last_node_to_start_publishes(self):
|
||||
self.wait_for_published_message_to_reach_peer(sender=self.optional_nodes[-1])
|
||||
def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
|
||||
self.check_published_message_reaches_peer(self.create_message(), sender=self.optional_nodes[-1])
|
||||
|
||||
def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self):
|
||||
self.wait_for_published_message_to_reach_peer(peer_list=self.main_nodes)
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.create_message(), peer_list=self.optional_nodes)
|
||||
raise AssertionError("Non subscribed nodes received the message!!")
|
||||
except Exception as ex:
|
||||
assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found"
|
||||
|
||||
@ -221,13 +221,13 @@ class TestRelayPublish(StepsRelay):
|
||||
self.check_published_message_reaches_peer(self.create_message())
|
||||
self.node1.restart()
|
||||
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
||||
self.wait_for_published_message_to_reach_peer(20)
|
||||
self.wait_for_published_message_to_reach_peer()
|
||||
|
||||
def test_publish_after_node2_restarts(self):
|
||||
self.check_published_message_reaches_peer(self.create_message())
|
||||
self.node2.restart()
|
||||
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
||||
self.wait_for_published_message_to_reach_peer(20)
|
||||
self.wait_for_published_message_to_reach_peer()
|
||||
|
||||
def test_publish_and_retrieve_100_messages(self):
|
||||
num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user