mirror of
https://github.com/waku-org/waku-interop-tests.git
synced 2025-01-14 17:34:44 +00:00
Relay Publish: multiple nodes (#4)
* github actions report summary * use env instead of inputs * multiple nodes tests * fix warm up * fix warm up * small fix after CI run * small fix after CI run 2 * add new multi-node test * self review
This commit is contained in:
parent
589368f434
commit
a6a0440312
23
.github/workflows/test.yml
vendored
23
.github/workflows/test.yml
vendored
@ -9,13 +9,20 @@ on:
|
|||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
inputs:
|
inputs:
|
||||||
node1:
|
node1:
|
||||||
required: false
|
required: true
|
||||||
|
description: "Node that usually publishes messages. Used for all tests"
|
||||||
type: string
|
type: string
|
||||||
default: "wakuorg/nwaku:latest"
|
default: "wakuorg/nwaku:latest"
|
||||||
node2:
|
node2:
|
||||||
required: false
|
required: true
|
||||||
|
description: "Node that usually queries for published messages. Used for all tests"
|
||||||
type: string
|
type: string
|
||||||
default: "wakuorg/go-waku:latest"
|
default: "wakuorg/go-waku:latest"
|
||||||
|
additional_nodes:
|
||||||
|
required: false
|
||||||
|
description: "Additional optional nodes used in e2e tests, separated by ,"
|
||||||
|
type: string
|
||||||
|
default: "wakuorg/nwaku:latest,wakuorg/go-waku:latest"
|
||||||
protocol:
|
protocol:
|
||||||
description: "Protocol used to comunicate inside the network"
|
description: "Protocol used to comunicate inside the network"
|
||||||
required: true
|
required: true
|
||||||
@ -29,6 +36,7 @@ env:
|
|||||||
FORCE_COLOR: "1"
|
FORCE_COLOR: "1"
|
||||||
NODE_1: ${{ inputs.node1 }}
|
NODE_1: ${{ inputs.node1 }}
|
||||||
NODE_2: ${{ inputs.node2 }}
|
NODE_2: ${{ inputs.node2 }}
|
||||||
|
ADDITIONAL_NODES: ${{ inputs.additional_nodes }}
|
||||||
PROTOCOL: ${{ inputs.protocol || 'REST' }}
|
PROTOCOL: ${{ inputs.protocol || 'REST' }}
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
@ -75,3 +83,14 @@ jobs:
|
|||||||
github_token: ${{ secrets.GITHUB_TOKEN }}
|
github_token: ${{ secrets.GITHUB_TOKEN }}
|
||||||
publish_branch: gh-pages
|
publish_branch: gh-pages
|
||||||
publish_dir: allure-history
|
publish_dir: allure-history
|
||||||
|
|
||||||
|
- name: Create job summary
|
||||||
|
if: always()
|
||||||
|
run: |
|
||||||
|
echo "## Run Information" >> $GITHUB_STEP_SUMMARY
|
||||||
|
echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY
|
||||||
|
echo "- **Node2**: ${{ env.NODE_2}}" >> $GITHUB_STEP_SUMMARY
|
||||||
|
echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES}}" >> $GITHUB_STEP_SUMMARY
|
||||||
|
echo "- **Protocol**: ${{ env.PROTOCOL }}" >> $GITHUB_STEP_SUMMARY
|
||||||
|
echo "## Test Results" >> $GITHUB_STEP_SUMMARY
|
||||||
|
echo "Allure report will be available at: https://waku-org.github.io/waku-interop-tests/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY
|
||||||
|
@ -16,6 +16,8 @@ def get_env_var(var_name, default=None):
|
|||||||
# Configuration constants. Need to be upercase to appear in reports
|
# Configuration constants. Need to be upercase to appear in reports
|
||||||
NODE_1 = get_env_var("NODE_1", "wakuorg/go-waku:latest")
|
NODE_1 = get_env_var("NODE_1", "wakuorg/go-waku:latest")
|
||||||
NODE_2 = get_env_var("NODE_2", "wakuorg/nwaku:latest")
|
NODE_2 = get_env_var("NODE_2", "wakuorg/nwaku:latest")
|
||||||
|
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", "wakuorg/nwaku:latest,wakuorg/go-waku:latest")
|
||||||
|
# more nodes need to follow the NODE_X pattern
|
||||||
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
|
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
|
||||||
NETWORK_NAME = get_env_var("NETWORK_NAME", "waku")
|
NETWORK_NAME = get_env_var("NETWORK_NAME", "waku")
|
||||||
SUBNET = get_env_var("SUBNET", "172.18.0.0/16")
|
SUBNET = get_env_var("SUBNET", "172.18.0.0/16")
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
import inspect
|
||||||
from src.libs.custom_logger import get_custom_logger
|
from src.libs.custom_logger import get_custom_logger
|
||||||
import math
|
import math
|
||||||
from time import time
|
from time import time
|
||||||
@ -5,7 +6,7 @@ import pytest
|
|||||||
import allure
|
import allure
|
||||||
from src.libs.common import to_base64, delay
|
from src.libs.common import to_base64, delay
|
||||||
from src.data_classes import message_rpc_response_schema
|
from src.data_classes import message_rpc_response_schema
|
||||||
from src.env_vars import NODE_1, NODE_2, NODEKEY
|
from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY, RUNNING_IN_CI
|
||||||
from src.node.waku_node import WakuNode
|
from src.node.waku_node import WakuNode
|
||||||
from tenacity import retry, stop_after_delay, wait_fixed
|
from tenacity import retry, stop_after_delay, wait_fixed
|
||||||
|
|
||||||
@ -13,36 +14,81 @@ logger = get_custom_logger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
class StepsRelay:
|
class StepsRelay:
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
test_pubsub_topic = "/waku/2/rs/18/1"
|
||||||
def setup_nodes(self, request):
|
test_content_topic = "/test/1/waku-relay/proto"
|
||||||
self.node1 = WakuNode(NODE_1, "node1_" + request.cls.test_id)
|
test_payload = "Relay works!!"
|
||||||
self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY)
|
|
||||||
enr_uri = self.node1.info()["enrUri"]
|
|
||||||
self.node2 = WakuNode(NODE_2, "node2_" + request.cls.test_id)
|
|
||||||
self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=enr_uri, peer_exchange="true")
|
|
||||||
self.test_pubsub_topic = "/waku/2/rs/18/1"
|
|
||||||
self.test_content_topic = "/test/1/waku-relay/proto"
|
|
||||||
self.test_payload = "Relay works!!"
|
|
||||||
self.node1.set_subscriptions([self.test_pubsub_topic])
|
|
||||||
self.node2.set_subscriptions([self.test_pubsub_topic])
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function")
|
||||||
def network_warm_up(self, setup_nodes):
|
def setup_main_relay_nodes(self, request):
|
||||||
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
|
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
|
||||||
|
self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY)
|
||||||
|
self.enr_uri = self.node1.info()["enrUri"]
|
||||||
|
self.node2 = WakuNode(NODE_2, f"node1_{request.cls.test_id}")
|
||||||
|
self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true")
|
||||||
|
self.main_nodes = [self.node1, self.node2]
|
||||||
|
self.optional_nodes = []
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def setup_optional_relay_nodes(self, request):
|
||||||
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
|
if ADDITIONAL_NODES:
|
||||||
|
nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")]
|
||||||
|
else:
|
||||||
|
pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
|
||||||
|
for index, node in enumerate(nodes):
|
||||||
|
node = WakuNode(node, f"node{index}_{request.cls.test_id}")
|
||||||
|
node.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true")
|
||||||
|
self.optional_nodes.append(node)
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def subscribe_main_relay_nodes(self):
|
||||||
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
|
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def subscribe_optional_relay_nodes(self):
|
||||||
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
|
self.ensure_subscriptions_on_nodes(self.optional_nodes, [self.test_pubsub_topic])
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def relay_warm_up(self):
|
||||||
try:
|
try:
|
||||||
self.wait_for_published_message_to_reach_peer(120)
|
self.wait_for_published_message_to_reach_peer()
|
||||||
logger.info("WARM UP successful !!")
|
logger.info("WARM UP successful!!")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
raise TimeoutError(f"WARM UP FAILED WITH: {ex}")
|
raise TimeoutError(f"WARM UP FAILED WITH: {ex}")
|
||||||
|
|
||||||
|
# this method should be used only for the tests that use the warm_up fixture
|
||||||
|
# otherwise use wait_for_published_message_to_reach_peer
|
||||||
@allure.step
|
@allure.step
|
||||||
def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1):
|
def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None):
|
||||||
self.node1.send_message(message, pubsub_topic or self.test_pubsub_topic)
|
if not sender:
|
||||||
|
sender = self.node1
|
||||||
|
if not peer_list:
|
||||||
|
peer_list = self.main_nodes + self.optional_nodes
|
||||||
|
sender.send_message(message, pubsub_topic or self.test_pubsub_topic)
|
||||||
delay(message_propagation_delay)
|
delay(message_propagation_delay)
|
||||||
get_messages_response = self.node2.get_messages(pubsub_topic or self.test_pubsub_topic)
|
for index, peer in enumerate(peer_list):
|
||||||
assert get_messages_response, "Peer node couldn't find any messages"
|
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
|
||||||
received_message = message_rpc_response_schema.load(get_messages_response[0])
|
get_messages_response = peer.get_messages(pubsub_topic or self.test_pubsub_topic)
|
||||||
self.assert_received_message(message, received_message)
|
assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages"
|
||||||
|
received_message = message_rpc_response_schema.load(get_messages_response[0])
|
||||||
|
self.assert_received_message(message, received_message)
|
||||||
|
|
||||||
|
# we need much bigger timeout in CI because we run tests in parallel there and the machine itself is slower
|
||||||
|
@allure.step
|
||||||
|
def wait_for_published_message_to_reach_peer(
|
||||||
|
self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, sender=None, peer_list=None
|
||||||
|
):
|
||||||
|
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True)
|
||||||
|
def check_peer_connection():
|
||||||
|
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||||
|
self.check_published_message_reaches_peer(message, sender=sender, peer_list=peer_list)
|
||||||
|
|
||||||
|
check_peer_connection()
|
||||||
|
|
||||||
|
@allure.step
|
||||||
def assert_received_message(self, sent_message, received_message):
|
def assert_received_message(self, sent_message, received_message):
|
||||||
def assert_fail_message(field_name):
|
def assert_fail_message(field_name):
|
||||||
return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(received_message, field_name)}"
|
return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(received_message, field_name)}"
|
||||||
@ -63,14 +109,7 @@ class StepsRelay:
|
|||||||
if "rateLimitProof" in sent_message:
|
if "rateLimitProof" in sent_message:
|
||||||
assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof")
|
assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof")
|
||||||
|
|
||||||
def wait_for_published_message_to_reach_peer(self, timeout_duration, time_between_retries=1):
|
@allure.step
|
||||||
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True)
|
|
||||||
def check_peer_connection():
|
|
||||||
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
|
||||||
self.check_published_message_reaches_peer(message)
|
|
||||||
|
|
||||||
check_peer_connection()
|
|
||||||
|
|
||||||
def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list):
|
def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list):
|
||||||
for node in node_list:
|
for node in node_list:
|
||||||
node.set_subscriptions(pubsub_topic_list)
|
node.set_subscriptions(pubsub_topic_list)
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
import inspect
|
||||||
import glob
|
import glob
|
||||||
from src.libs.custom_logger import get_custom_logger
|
from src.libs.custom_logger import get_custom_logger
|
||||||
import os
|
import os
|
||||||
@ -28,6 +29,7 @@ def pytest_runtest_makereport(item):
|
|||||||
def set_allure_env_variables():
|
def set_allure_env_variables():
|
||||||
yield
|
yield
|
||||||
if os.path.isdir("allure-results") and not os.path.isfile(os.path.join("allure-results", "environment.properties")):
|
if os.path.isdir("allure-results") and not os.path.isfile(os.path.join("allure-results", "environment.properties")):
|
||||||
|
logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
|
||||||
with open(os.path.join("allure-results", "environment.properties"), "w") as outfile:
|
with open(os.path.join("allure-results", "environment.properties"), "w") as outfile:
|
||||||
for attribute_name in dir(env_vars):
|
for attribute_name in dir(env_vars):
|
||||||
if attribute_name.isupper():
|
if attribute_name.isupper():
|
||||||
@ -38,6 +40,7 @@ def set_allure_env_variables():
|
|||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
def test_id(request):
|
def test_id(request):
|
||||||
# setting up an unique test id to be used where needed
|
# setting up an unique test id to be used where needed
|
||||||
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
request.cls.test_id = f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}__{str(uuid4())}"
|
request.cls.test_id = f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}__{str(uuid4())}"
|
||||||
|
|
||||||
|
|
||||||
@ -45,6 +48,7 @@ def test_id(request):
|
|||||||
def test_setup(request, test_id):
|
def test_setup(request, test_id):
|
||||||
logger.debug(f"Running test: {request.node.name} with id: {request.cls.test_id}")
|
logger.debug(f"Running test: {request.node.name} with id: {request.cls.test_id}")
|
||||||
yield
|
yield
|
||||||
|
logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
|
||||||
for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*")):
|
for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*")):
|
||||||
if os.path.getmtime(file) < time() - 3600:
|
if os.path.getmtime(file) < time() - 3600:
|
||||||
logger.debug(f"Deleting old log file: {file}")
|
logger.debug(f"Deleting old log file: {file}")
|
||||||
@ -58,6 +62,7 @@ def test_setup(request, test_id):
|
|||||||
def attach_logs_on_fail(request):
|
def attach_logs_on_fail(request):
|
||||||
yield
|
yield
|
||||||
if env_vars.RUNNING_IN_CI and hasattr(request.node, "rep_call") and request.node.rep_call.failed:
|
if env_vars.RUNNING_IN_CI and hasattr(request.node, "rep_call") and request.node.rep_call.failed:
|
||||||
|
logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
|
||||||
logger.debug("Test failed, attempting to attach logs to the allure reports")
|
logger.debug("Test failed, attempting to attach logs to the allure reports")
|
||||||
for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*" + request.cls.test_id + "*")):
|
for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*" + request.cls.test_id + "*")):
|
||||||
attach_allure_file(file)
|
attach_allure_file(file)
|
||||||
@ -67,6 +72,7 @@ def attach_logs_on_fail(request):
|
|||||||
def close_open_nodes(attach_logs_on_fail):
|
def close_open_nodes(attach_logs_on_fail):
|
||||||
DS.waku_nodes = []
|
DS.waku_nodes = []
|
||||||
yield
|
yield
|
||||||
|
logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
|
||||||
crashed_containers = []
|
crashed_containers = []
|
||||||
for node in DS.waku_nodes:
|
for node in DS.waku_nodes:
|
||||||
try:
|
try:
|
||||||
|
19
tests/relay/test_multiple_nodes.py
Normal file
19
tests/relay/test_multiple_nodes.py
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
import pytest
|
||||||
|
from src.steps.relay import StepsRelay
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes")
|
||||||
|
class TestMultipleNodes(StepsRelay):
|
||||||
|
def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
|
||||||
|
self.check_published_message_reaches_peer(self.create_message())
|
||||||
|
|
||||||
|
def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
|
||||||
|
self.check_published_message_reaches_peer(self.create_message(), sender=self.optional_nodes[-1])
|
||||||
|
|
||||||
|
def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self):
|
||||||
|
self.wait_for_published_message_to_reach_peer(peer_list=self.main_nodes)
|
||||||
|
try:
|
||||||
|
self.check_published_message_reaches_peer(self.create_message(), peer_list=self.optional_nodes)
|
||||||
|
raise AssertionError("Non subscribed nodes received the message!!")
|
||||||
|
except Exception as ex:
|
||||||
|
assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found"
|
@ -1,3 +1,4 @@
|
|||||||
|
import pytest
|
||||||
from src.libs.custom_logger import get_custom_logger
|
from src.libs.custom_logger import get_custom_logger
|
||||||
from time import time
|
from time import time
|
||||||
from src.libs.common import delay, to_base64
|
from src.libs.common import delay, to_base64
|
||||||
@ -8,6 +9,7 @@ from src.data_classes import message_rpc_response_schema
|
|||||||
logger = get_custom_logger(__name__)
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("setup_main_relay_nodes", "subscribe_main_relay_nodes", "relay_warm_up")
|
||||||
class TestRelayPublish(StepsRelay):
|
class TestRelayPublish(StepsRelay):
|
||||||
def test_publish_with_valid_payloads(self):
|
def test_publish_with_valid_payloads(self):
|
||||||
failed_payloads = []
|
failed_payloads = []
|
||||||
@ -55,7 +57,7 @@ class TestRelayPublish(StepsRelay):
|
|||||||
self.check_published_message_reaches_peer(message, message_propagation_delay=2)
|
self.check_published_message_reaches_peer(message, message_propagation_delay=2)
|
||||||
raise AssertionError("Duplicate message was retrieved twice")
|
raise AssertionError("Duplicate message was retrieved twice")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Peer node couldn't find any messages" in str(ex)
|
assert "couldn't find any messages" in str(ex)
|
||||||
|
|
||||||
def test_publish_with_valid_content_topics(self):
|
def test_publish_with_valid_content_topics(self):
|
||||||
failed_content_topics = []
|
failed_content_topics = []
|
||||||
@ -90,7 +92,7 @@ class TestRelayPublish(StepsRelay):
|
|||||||
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
||||||
|
|
||||||
def test_publish_on_multiple_pubsub_topics(self):
|
def test_publish_on_multiple_pubsub_topics(self):
|
||||||
self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS)
|
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
|
||||||
failed_pubsub_topics = []
|
failed_pubsub_topics = []
|
||||||
for pubsub_topic in VALID_PUBSUB_TOPICS:
|
for pubsub_topic in VALID_PUBSUB_TOPICS:
|
||||||
logger.debug(f"Running test with pubsub topic {pubsub_topic}")
|
logger.debug(f"Running test with pubsub topic {pubsub_topic}")
|
||||||
@ -102,7 +104,7 @@ class TestRelayPublish(StepsRelay):
|
|||||||
assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}"
|
assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}"
|
||||||
|
|
||||||
def test_message_published_on_different_pubsub_topic_is_not_retrieved(self):
|
def test_message_published_on_different_pubsub_topic_is_not_retrieved(self):
|
||||||
self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS)
|
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
|
||||||
self.node1.send_message(self.create_message(), VALID_PUBSUB_TOPICS[0])
|
self.node1.send_message(self.create_message(), VALID_PUBSUB_TOPICS[0])
|
||||||
delay(0.1)
|
delay(0.1)
|
||||||
messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1])
|
messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1])
|
||||||
@ -194,7 +196,7 @@ class TestRelayPublish(StepsRelay):
|
|||||||
self.check_published_message_reaches_peer(message)
|
self.check_published_message_reaches_peer(message)
|
||||||
raise AssertionError("Duplicate message was retrieved twice")
|
raise AssertionError("Duplicate message was retrieved twice")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Peer node couldn't find any messages" in str(ex)
|
assert "couldn't find any messages" in str(ex)
|
||||||
|
|
||||||
def test_publish_while_peer_is_paused(self):
|
def test_publish_while_peer_is_paused(self):
|
||||||
message = self.create_message()
|
message = self.create_message()
|
||||||
@ -218,14 +220,14 @@ class TestRelayPublish(StepsRelay):
|
|||||||
def test_publish_after_node1_restarts(self):
|
def test_publish_after_node1_restarts(self):
|
||||||
self.check_published_message_reaches_peer(self.create_message())
|
self.check_published_message_reaches_peer(self.create_message())
|
||||||
self.node1.restart()
|
self.node1.restart()
|
||||||
self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic])
|
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
||||||
self.wait_for_published_message_to_reach_peer(20)
|
self.wait_for_published_message_to_reach_peer()
|
||||||
|
|
||||||
def test_publish_after_node2_restarts(self):
|
def test_publish_after_node2_restarts(self):
|
||||||
self.check_published_message_reaches_peer(self.create_message())
|
self.check_published_message_reaches_peer(self.create_message())
|
||||||
self.node2.restart()
|
self.node2.restart()
|
||||||
self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic])
|
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
||||||
self.wait_for_published_message_to_reach_peer(20)
|
self.wait_for_published_message_to_reach_peer()
|
||||||
|
|
||||||
def test_publish_and_retrieve_100_messages(self):
|
def test_publish_and_retrieve_100_messages(self):
|
||||||
num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag
|
num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag
|
||||||
|
Loading…
x
Reference in New Issue
Block a user