mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-10 18:03:07 +00:00
mulitple node tests
This commit is contained in:
parent
26718c38a4
commit
692213ceeb
@ -14,9 +14,11 @@ def get_env_var(var_name, default=None):
|
||||
|
||||
|
||||
# Configuration constants. Need to be upercase to appear in reports
|
||||
NODE_1 = get_env_var("NODE_1", "wakuorg/go-waku:latest")
|
||||
NODE_2 = get_env_var("NODE_2", "wakuorg/nwaku:latest")
|
||||
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", "wakuorg/nwaku:latest,wakuorg/go-waku:latest")
|
||||
DEFAULT_NWAKU = "harbor.status.im/wakuorg/nwaku:latest"
|
||||
DEFAULT_GOWAKU = "harbor.status.im/wakuorg/go-waku:latest"
|
||||
NODE_1 = get_env_var("NODE_1", DEFAULT_GOWAKU)
|
||||
NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU)
|
||||
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_GOWAKU}")
|
||||
# more nodes need to follow the NODE_X pattern
|
||||
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
|
||||
NETWORK_NAME = get_env_var("NETWORK_NAME", "waku")
|
||||
|
||||
@ -59,7 +59,7 @@ class DockerManager:
|
||||
for chunk in container.logs(stream=True):
|
||||
log_file.write(chunk)
|
||||
|
||||
def generate_ports(self, base_port=None, count=5):
|
||||
def generate_ports(self, base_port=None, count=6):
|
||||
if base_port is None:
|
||||
base_port = random.randint(1024, 65535 - count)
|
||||
ports = [base_port + i for i in range(count)]
|
||||
|
||||
@ -1,13 +1,13 @@
|
||||
import os
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from src.libs.common import delay
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
from src.node.api_clients.rpc import RPC
|
||||
from src.node.api_clients.rest import REST
|
||||
from src.node.docker_mananger import DockerManager
|
||||
from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUB_TOPIC, PROTOCOL
|
||||
from src.env_vars import DOCKER_LOG_DIR, PROTOCOL
|
||||
from src.data_storage import DS
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
@ -29,8 +29,9 @@ class WakuNode:
|
||||
self._ports = self._docker_manager.generate_ports()
|
||||
self._rest_port = self._ports[0]
|
||||
self._rpc_port = self._ports[1]
|
||||
self._websocket_port = self._ports[3]
|
||||
self._tcp_port = self._ports[2]
|
||||
self._websocket_port = self._ports[3]
|
||||
self._metrics_port = self._ports[5]
|
||||
|
||||
if PROTOCOL == "RPC":
|
||||
self._api = RPC(self._rpc_port, self._image_name)
|
||||
@ -58,15 +59,24 @@ class WakuNode:
|
||||
"nat": f"extip:{self._ext_ip}",
|
||||
"peer-exchange": "true",
|
||||
"discv5-discovery": "true",
|
||||
# "cluster-id": "2",
|
||||
}
|
||||
|
||||
if "go-waku" in self._docker_manager.image:
|
||||
if self.is_gowaku():
|
||||
go_waku_args = {
|
||||
"min-relay-peers-to-publish": "1",
|
||||
"legacy-filter": "false",
|
||||
"log-level": "DEBUG",
|
||||
}
|
||||
default_args.update(go_waku_args)
|
||||
elif self.is_nwaku():
|
||||
nwaku_args = {
|
||||
"metrics-server": "true",
|
||||
"metrics-server-address": "0.0.0.0",
|
||||
"metrics-server-port": self._metrics_port,
|
||||
"metrics-logging": "true",
|
||||
}
|
||||
default_args.update(nwaku_args)
|
||||
|
||||
for key, value in kwargs.items():
|
||||
key = key.replace("_", "-")
|
||||
@ -89,6 +99,7 @@ class WakuNode:
|
||||
if self._container:
|
||||
logger.debug(f"Stopping container with id {self._container.short_id}")
|
||||
self._container.stop()
|
||||
self._container = None
|
||||
logger.debug("Container stopped.")
|
||||
|
||||
def restart(self):
|
||||
@ -169,6 +180,14 @@ class WakuNode:
|
||||
def get_filter_messages(self, content_topic):
|
||||
return self._api.get_filter_messages(content_topic)
|
||||
|
||||
def get_metrics(self):
|
||||
if self.is_nwaku():
|
||||
metrics = requests.get(f"http://localhost:{self._metrics_port}/metrics")
|
||||
metrics.raise_for_status()
|
||||
return metrics.content.decode("utf-8")
|
||||
else:
|
||||
pytest.skip(f"This method doesn't exist for node {self.type()}")
|
||||
|
||||
@property
|
||||
def image(self):
|
||||
return self._image_name
|
||||
|
||||
@ -22,20 +22,14 @@ class StepsFilter:
|
||||
test_payload = "Filter works!!"
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def setup_relay_node(self, request):
|
||||
def setup_main_relay_node(self):
|
||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
|
||||
start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY}
|
||||
if self.node1.is_gowaku():
|
||||
start_args["min_relay_peers_to_publish"] = "0"
|
||||
self.node1.start(**start_args)
|
||||
self.enr_uri = self.node1.get_enr_uri()
|
||||
self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
|
||||
self.relay_node_start(NODE_1)
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def setup_main_filter_node(self, request):
|
||||
def setup_main_filter_node(self):
|
||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
|
||||
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
|
||||
self.node2.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
||||
self.main_nodes = [self.node2]
|
||||
self.optional_nodes = []
|
||||
@ -45,18 +39,6 @@ class StepsFilter:
|
||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def setup_optional_filter_nodes(self, request):
|
||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||
if ADDITIONAL_NODES:
|
||||
nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")]
|
||||
else:
|
||||
pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
|
||||
for index, node in enumerate(nodes):
|
||||
node = WakuNode(node, f"additional_node{index}_{request.cls.test_id}")
|
||||
node.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
||||
self.optional_nodes.append(node)
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@retry(stop=stop_after_delay(20), wait=wait_fixed(1), reraise=True)
|
||||
def filter_warm_up(self):
|
||||
@ -68,6 +50,29 @@ class StepsFilter:
|
||||
else:
|
||||
raise TimeoutError(f"WARM UP FAILED WITH: {ex}")
|
||||
|
||||
def relay_node_start(self, node):
|
||||
self.node1 = WakuNode(node, f"node1_{self.test_id}")
|
||||
start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY}
|
||||
if self.node1.is_gowaku():
|
||||
start_args["min_relay_peers_to_publish"] = "0"
|
||||
self.node1.start(**start_args)
|
||||
self.enr_uri = self.node1.get_enr_uri()
|
||||
self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
|
||||
|
||||
def setup_optional_filter_nodes(self, node_list=ADDITIONAL_NODES):
|
||||
try:
|
||||
self.optional_nodes
|
||||
except AttributeError:
|
||||
self.optional_nodes = []
|
||||
if node_list:
|
||||
nodes = [node.strip() for node in node_list.split(",") if node]
|
||||
else:
|
||||
pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test")
|
||||
for index, node in enumerate(nodes):
|
||||
node = WakuNode(node, f"additional_node{index + 1}_{self.test_id}")
|
||||
node.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
||||
self.optional_nodes.append(node)
|
||||
|
||||
@allure.step
|
||||
def check_published_message_reaches_filter_peer(
|
||||
self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None
|
||||
@ -92,9 +97,9 @@ class StepsFilter:
|
||||
waku_message.assert_received_message(message)
|
||||
|
||||
@allure.step
|
||||
def check_publish_without_filter_subscription(self, message=None, pubsub_topic=None):
|
||||
def check_publish_without_filter_subscription(self, message=None, pubsub_topic=None, peer_list=None):
|
||||
try:
|
||||
self.check_published_message_reaches_filter_peer(message=message, pubsub_topic=pubsub_topic)
|
||||
self.check_published_message_reaches_filter_peer(message=message, pubsub_topic=pubsub_topic, peer_list=peer_list)
|
||||
raise AssertionError("Publish with no subscription worked!!!")
|
||||
except Exception as ex:
|
||||
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
||||
@ -112,6 +117,13 @@ class StepsFilter:
|
||||
assert filter_sub_response["requestId"] == request_id
|
||||
assert filter_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed
|
||||
|
||||
def subscribe_optional_filter_nodes(self, content_topic_list, pubsub_topic=None):
|
||||
if pubsub_topic is None:
|
||||
pubsub_topic = self.test_pubsub_topic
|
||||
for node in self.optional_nodes:
|
||||
request_id = str(uuid4())
|
||||
self.create_filter_subscription({"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic}, node=node)
|
||||
|
||||
@allure.step
|
||||
def create_filter_subscription(self, subscription, node=None):
|
||||
if node is None:
|
||||
|
||||
33
src/steps/metrics.py
Normal file
33
src/steps/metrics.py
Normal file
@ -0,0 +1,33 @@
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
import allure
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class StepsMetrics:
|
||||
@allure.step
|
||||
def check_metric(self, node, metric_name, expected_value):
|
||||
logger.debug(f"Checking metric: {metric_name} has {expected_value}")
|
||||
response = node.get_metrics()
|
||||
lines = response.split("\n")
|
||||
actual_value = None
|
||||
for line in lines:
|
||||
if line.startswith(metric_name):
|
||||
parts = line.split(" ")
|
||||
if len(parts) >= 2:
|
||||
actual_value = float(parts[1])
|
||||
break
|
||||
if actual_value is None:
|
||||
raise AttributeError(f"Metric '{metric_name}' not found")
|
||||
logger.debug(f"Found metric: {metric_name} with value {actual_value}")
|
||||
assert actual_value == expected_value, f"Expected value for '{metric_name}' is {expected_value}, but got {actual_value}"
|
||||
|
||||
@allure.step
|
||||
def wait_for_metric(self, node, metric_name, expected_value, timeout_duration=90):
|
||||
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(1), reraise=True)
|
||||
def check_metric_with_retry():
|
||||
self.check_metric(node, metric_name, expected_value)
|
||||
|
||||
check_metric_with_retry()
|
||||
@ -37,7 +37,7 @@ class StepsRelay:
|
||||
else:
|
||||
pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
|
||||
for index, node in enumerate(nodes):
|
||||
node = WakuNode(node, f"node{index}_{request.cls.test_id}")
|
||||
node = WakuNode(node, f"additional_node{index}_{request.cls.test_id}")
|
||||
node.start(relay="true", discv5_bootstrap_node=self.enr_uri)
|
||||
self.optional_nodes.append(node)
|
||||
|
||||
|
||||
@ -67,12 +67,11 @@ INVALID_CONTENT_TOPICS = [
|
||||
|
||||
VALID_PUBSUB_TOPICS = [
|
||||
DEFAULT_PUBSUB_TOPIC,
|
||||
"/waku/2/rs/18/1",
|
||||
"/test/2/rs/18/1",
|
||||
"/waku/3/rs/18/1",
|
||||
"/waku/2/test/18/1",
|
||||
"/waku/2/rs/66/1",
|
||||
"/waku/2/rs/18/50",
|
||||
"/waku/2/rs/0/1",
|
||||
"/test/2/rs/0/1",
|
||||
"/waku/3/rs/0/1",
|
||||
"/waku/2/test/0/1",
|
||||
"/waku/2/rs/0/50",
|
||||
"/waku/18/50",
|
||||
"test",
|
||||
]
|
||||
|
||||
40
tests/filter/test_idle_subscriptions.py
Normal file
40
tests/filter/test_idle_subscriptions.py
Normal file
@ -0,0 +1,40 @@
|
||||
import pytest
|
||||
from src.libs.common import delay
|
||||
from src.env_vars import DEFAULT_NWAKU
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.filter import StepsFilter
|
||||
from src.steps.metrics import StepsMetrics
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class TestIdleSubscriptions(StepsFilter, StepsMetrics):
|
||||
@pytest.mark.timeout(60 * 10)
|
||||
def test_idle_filter_subscriptions_for_more_than_5_nodes(self):
|
||||
self.relay_node_start(DEFAULT_NWAKU)
|
||||
filter_node_list = f"{DEFAULT_NWAKU}," * 6
|
||||
self.setup_optional_filter_nodes(filter_node_list)
|
||||
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
|
||||
self.subscribe_optional_filter_nodes([self.test_content_topic])
|
||||
self.check_published_message_reaches_filter_peer(peer_list=self.optional_nodes)
|
||||
self.wait_for_metric(self.node1, "waku_filter_subscriptions", 6.0)
|
||||
delay(60 * 5) # not sure how many seconds to put here but I hardcoded 5 minutes to be sure
|
||||
# after some idle time nodes should be disconnected and we should see max 5 connections
|
||||
self.wait_for_metric(
|
||||
self.node1, "waku_filter_subscriptions", 5.0
|
||||
) # test fails now because even after 5 minutes the number of nodes will remain at 6
|
||||
|
||||
@pytest.mark.timeout(60 * 10)
|
||||
def test_idle_filter_subscriptions_after_node_disconnection(self):
|
||||
self.relay_node_start(DEFAULT_NWAKU)
|
||||
self.setup_optional_filter_nodes(DEFAULT_NWAKU)
|
||||
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
|
||||
self.subscribe_optional_filter_nodes([self.test_content_topic])
|
||||
self.check_published_message_reaches_filter_peer(peer_list=self.optional_nodes)
|
||||
self.wait_for_metric(self.node1, "waku_filter_subscriptions", 1.0)
|
||||
self.optional_nodes[0].stop()
|
||||
delay(60 * 5) # not sure how many seconds to put here but I hardcoded 5 minutes to be sure
|
||||
# after some idle time the stopped node should be disconnected and we should see 0 connections
|
||||
self.wait_for_metric(
|
||||
self.node1, "waku_filter_subscriptions", 0.0
|
||||
) # test fails now because even after 5 minutes the number of nodes will remain at 1
|
||||
21
tests/filter/test_multiple_nodes.py
Normal file
21
tests/filter/test_multiple_nodes.py
Normal file
@ -0,0 +1,21 @@
|
||||
import pytest
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.filter import StepsFilter
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node")
|
||||
class TestFilterMultipleNodes(StepsFilter):
|
||||
def test_all_nodes_subscribed_to_the_topic(self):
|
||||
self.setup_optional_filter_nodes()
|
||||
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||
self.subscribe_optional_filter_nodes([self.test_content_topic])
|
||||
self.check_published_message_reaches_filter_peer()
|
||||
|
||||
def test_optional_nodes_not_subscribed_to_same_topic(self):
|
||||
self.setup_optional_filter_nodes()
|
||||
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||
self.subscribe_optional_filter_nodes([self.second_content_topic])
|
||||
self.check_published_message_reaches_filter_peer(peer_list=self.main_nodes)
|
||||
self.check_publish_without_filter_subscription(peer_list=self.optional_nodes)
|
||||
@ -6,7 +6,7 @@ from src.steps.filter import StepsFilter
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node")
|
||||
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node")
|
||||
class TestFilterSubscribeUpdate(StepsFilter):
|
||||
def test_filter_subscribe_to_single_topics(self):
|
||||
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||
|
||||
@ -6,7 +6,7 @@ from src.steps.filter import StepsFilter
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node")
|
||||
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node")
|
||||
class TestFilterSubscribeCreate(StepsFilter):
|
||||
def test_filter_update_subscription_add_a_new_content_topic(self):
|
||||
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
|
||||
|
||||
@ -3,7 +3,7 @@ from src.test_data import SAMPLE_INPUTS
|
||||
from src.steps.filter import StepsFilter
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node", "subscribe_main_nodes")
|
||||
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "subscribe_main_nodes")
|
||||
class TestFilterUnSubscribe(StepsFilter):
|
||||
def test_filter_unsubscribe_from_single_content_topic(self):
|
||||
self.check_published_message_reaches_filter_peer()
|
||||
|
||||
@ -4,7 +4,7 @@ from src.steps.filter import StepsFilter
|
||||
from random import choice
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node", "filter_warm_up")
|
||||
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "filter_warm_up")
|
||||
class TestFilterUnSubscribeAll(StepsFilter):
|
||||
def test_filter_unsubscribe_all_from_few_content_topics(self):
|
||||
content_topics = [input["value"] for input in SAMPLE_INPUTS[:5]]
|
||||
|
||||
@ -3,7 +3,7 @@ from src.steps.relay import StepsRelay
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes")
|
||||
class TestMultipleNodes(StepsRelay):
|
||||
class TestRelayMultipleNodes(StepsRelay):
|
||||
def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
|
||||
self.check_published_message_reaches_relay_peer()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user