mirror of
https://github.com/waku-org/waku-interop-tests.git
synced 2025-01-14 17:34:44 +00:00
multiple node tests (#8)
* mulitple node tests * fix filter get message * adjustments for gowaku as filter * small adjustments * small adjustments * adjustments after CI runs * prepare for PR * prepare for PR * prepare for PR * address review comments * use cluster id 0 as suggested by Prem * small adjustments after CI run * small adjustments after CI run * small adjustments after CI run * small adjustments after CI run
This commit is contained in:
parent
26718c38a4
commit
d7be7e9504
10
.github/workflows/test.yml
vendored
10
.github/workflows/test.yml
vendored
@ -1,5 +1,9 @@
|
|||||||
name: Interop Tests
|
name: Interop Tests
|
||||||
|
|
||||||
|
concurrency:
|
||||||
|
group: Interop-tests
|
||||||
|
cancel-in-progress: false
|
||||||
|
|
||||||
on:
|
on:
|
||||||
schedule:
|
schedule:
|
||||||
- cron: '0 3 * * *'
|
- cron: '0 3 * * *'
|
||||||
@ -12,17 +16,17 @@ on:
|
|||||||
required: true
|
required: true
|
||||||
description: "Node that usually publishes messages. Used for all tests"
|
description: "Node that usually publishes messages. Used for all tests"
|
||||||
type: string
|
type: string
|
||||||
default: "wakuorg/nwaku:latest"
|
default: "harbor.status.im/wakuorg/go-waku:latest"
|
||||||
node2:
|
node2:
|
||||||
required: true
|
required: true
|
||||||
description: "Node that usually queries for published messages. Used for all tests"
|
description: "Node that usually queries for published messages. Used for all tests"
|
||||||
type: string
|
type: string
|
||||||
default: "wakuorg/go-waku:latest"
|
default: "harbor.status.im/wakuorg/nwaku:latest"
|
||||||
additional_nodes:
|
additional_nodes:
|
||||||
required: false
|
required: false
|
||||||
description: "Additional optional nodes used in e2e tests, separated by ,"
|
description: "Additional optional nodes used in e2e tests, separated by ,"
|
||||||
type: string
|
type: string
|
||||||
default: "wakuorg/nwaku:latest,wakuorg/go-waku:latest"
|
default: "harbor.status.im/wakuorg/nwaku:latest,harbor.status.im/wakuorg/go-waku:latest,harbor.status.im/wakuorg/nwaku:latest"
|
||||||
protocol:
|
protocol:
|
||||||
description: "Protocol used to comunicate inside the network"
|
description: "Protocol used to comunicate inside the network"
|
||||||
required: true
|
required: true
|
||||||
|
@ -14,9 +14,11 @@ def get_env_var(var_name, default=None):
|
|||||||
|
|
||||||
|
|
||||||
# Configuration constants. Need to be upercase to appear in reports
|
# Configuration constants. Need to be upercase to appear in reports
|
||||||
NODE_1 = get_env_var("NODE_1", "wakuorg/go-waku:latest")
|
DEFAULT_NWAKU = "harbor.status.im/wakuorg/nwaku:latest"
|
||||||
NODE_2 = get_env_var("NODE_2", "wakuorg/nwaku:latest")
|
DEFAULT_GOWAKU = "harbor.status.im/wakuorg/go-waku:latest"
|
||||||
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", "wakuorg/nwaku:latest,wakuorg/go-waku:latest")
|
NODE_1 = get_env_var("NODE_1", DEFAULT_GOWAKU)
|
||||||
|
NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU)
|
||||||
|
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_GOWAKU},{DEFAULT_NWAKU}")
|
||||||
# more nodes need to follow the NODE_X pattern
|
# more nodes need to follow the NODE_X pattern
|
||||||
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
|
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
|
||||||
NETWORK_NAME = get_env_var("NETWORK_NAME", "waku")
|
NETWORK_NAME = get_env_var("NETWORK_NAME", "waku")
|
||||||
|
@ -8,7 +8,7 @@ logger = get_custom_logger(__name__)
|
|||||||
|
|
||||||
class BaseClient(ABC):
|
class BaseClient(ABC):
|
||||||
def make_request(self, method, url, headers=None, data=None):
|
def make_request(self, method, url, headers=None, data=None):
|
||||||
logger.debug(f"{method.upper()} call: {url} with payload: {data}")
|
logger.info(f"{method.upper()} call: {url} with payload: {data}")
|
||||||
response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT)
|
response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT)
|
||||||
try:
|
try:
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
@ -52,6 +52,10 @@ class REST(BaseClient):
|
|||||||
ping_subscriptions_response = self.rest_call("get", f"filter/v2/subscriptions/{quote(request_id, safe='')}")
|
ping_subscriptions_response = self.rest_call("get", f"filter/v2/subscriptions/{quote(request_id, safe='')}")
|
||||||
return ping_subscriptions_response.json()
|
return ping_subscriptions_response.json()
|
||||||
|
|
||||||
def get_filter_messages(self, content_topic):
|
def get_filter_messages(self, content_topic, pubsub_topic=None):
|
||||||
get_messages_response = self.rest_call("get", f"filter/v2/messages/{quote(content_topic, safe='')}")
|
if pubsub_topic is not None:
|
||||||
|
endpoint = f"filter/v2/messages/{quote(pubsub_topic, safe='')}/{quote(content_topic, safe='')}"
|
||||||
|
else:
|
||||||
|
endpoint = f"filter/v2/messages/{quote(content_topic, safe='')}"
|
||||||
|
get_messages_response = self.rest_call("get", endpoint)
|
||||||
return get_messages_response.json()
|
return get_messages_response.json()
|
||||||
|
@ -59,7 +59,7 @@ class DockerManager:
|
|||||||
for chunk in container.logs(stream=True):
|
for chunk in container.logs(stream=True):
|
||||||
log_file.write(chunk)
|
log_file.write(chunk)
|
||||||
|
|
||||||
def generate_ports(self, base_port=None, count=5):
|
def generate_ports(self, base_port=None, count=6):
|
||||||
if base_port is None:
|
if base_port is None:
|
||||||
base_port = random.randint(1024, 65535 - count)
|
base_port = random.randint(1024, 65535 - count)
|
||||||
ports = [base_port + i for i in range(count)]
|
ports = [base_port + i for i in range(count)]
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import requests
|
||||||
from src.libs.common import delay
|
from src.libs.common import delay
|
||||||
from src.libs.custom_logger import get_custom_logger
|
from src.libs.custom_logger import get_custom_logger
|
||||||
from tenacity import retry, stop_after_delay, wait_fixed
|
from tenacity import retry, stop_after_delay, wait_fixed
|
||||||
from src.node.api_clients.rpc import RPC
|
from src.node.api_clients.rpc import RPC
|
||||||
from src.node.api_clients.rest import REST
|
from src.node.api_clients.rest import REST
|
||||||
from src.node.docker_mananger import DockerManager
|
from src.node.docker_mananger import DockerManager
|
||||||
from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUB_TOPIC, PROTOCOL
|
from src.env_vars import DOCKER_LOG_DIR, PROTOCOL
|
||||||
from src.data_storage import DS
|
from src.data_storage import DS
|
||||||
|
|
||||||
logger = get_custom_logger(__name__)
|
logger = get_custom_logger(__name__)
|
||||||
@ -29,8 +29,9 @@ class WakuNode:
|
|||||||
self._ports = self._docker_manager.generate_ports()
|
self._ports = self._docker_manager.generate_ports()
|
||||||
self._rest_port = self._ports[0]
|
self._rest_port = self._ports[0]
|
||||||
self._rpc_port = self._ports[1]
|
self._rpc_port = self._ports[1]
|
||||||
self._websocket_port = self._ports[3]
|
|
||||||
self._tcp_port = self._ports[2]
|
self._tcp_port = self._ports[2]
|
||||||
|
self._websocket_port = self._ports[3]
|
||||||
|
self._metrics_port = self._ports[5]
|
||||||
|
|
||||||
if PROTOCOL == "RPC":
|
if PROTOCOL == "RPC":
|
||||||
self._api = RPC(self._rpc_port, self._image_name)
|
self._api = RPC(self._rpc_port, self._image_name)
|
||||||
@ -58,15 +59,26 @@ class WakuNode:
|
|||||||
"nat": f"extip:{self._ext_ip}",
|
"nat": f"extip:{self._ext_ip}",
|
||||||
"peer-exchange": "true",
|
"peer-exchange": "true",
|
||||||
"discv5-discovery": "true",
|
"discv5-discovery": "true",
|
||||||
|
"cluster-id": "0",
|
||||||
}
|
}
|
||||||
|
|
||||||
if "go-waku" in self._docker_manager.image:
|
if self.is_gowaku():
|
||||||
go_waku_args = {
|
go_waku_args = {
|
||||||
"min-relay-peers-to-publish": "1",
|
"min-relay-peers-to-publish": "1",
|
||||||
"legacy-filter": "false",
|
"legacy-filter": "false",
|
||||||
"log-level": "DEBUG",
|
"log-level": "DEBUG",
|
||||||
}
|
}
|
||||||
default_args.update(go_waku_args)
|
default_args.update(go_waku_args)
|
||||||
|
elif self.is_nwaku():
|
||||||
|
nwaku_args = {
|
||||||
|
"metrics-server": "true",
|
||||||
|
"metrics-server-address": "0.0.0.0",
|
||||||
|
"metrics-server-port": self._metrics_port,
|
||||||
|
"metrics-logging": "true",
|
||||||
|
}
|
||||||
|
default_args.update(nwaku_args)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
|
|
||||||
for key, value in kwargs.items():
|
for key, value in kwargs.items():
|
||||||
key = key.replace("_", "-")
|
key = key.replace("_", "-")
|
||||||
@ -89,6 +101,7 @@ class WakuNode:
|
|||||||
if self._container:
|
if self._container:
|
||||||
logger.debug(f"Stopping container with id {self._container.short_id}")
|
logger.debug(f"Stopping container with id {self._container.short_id}")
|
||||||
self._container.stop()
|
self._container.stop()
|
||||||
|
self._container = None
|
||||||
logger.debug("Container stopped.")
|
logger.debug("Container stopped.")
|
||||||
|
|
||||||
def restart(self):
|
def restart(self):
|
||||||
@ -166,8 +179,16 @@ class WakuNode:
|
|||||||
else:
|
else:
|
||||||
return self._api.ping_filter_subscriptions(request_id)
|
return self._api.ping_filter_subscriptions(request_id)
|
||||||
|
|
||||||
def get_filter_messages(self, content_topic):
|
def get_filter_messages(self, content_topic, pubsub_topic=None):
|
||||||
return self._api.get_filter_messages(content_topic)
|
return self._api.get_filter_messages(content_topic, pubsub_topic)
|
||||||
|
|
||||||
|
def get_metrics(self):
|
||||||
|
if self.is_nwaku():
|
||||||
|
metrics = requests.get(f"http://localhost:{self._metrics_port}/metrics")
|
||||||
|
metrics.raise_for_status()
|
||||||
|
return metrics.content.decode("utf-8")
|
||||||
|
else:
|
||||||
|
pytest.skip(f"This method doesn't exist for node {self.type()}")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def image(self):
|
def image(self):
|
||||||
|
@ -17,46 +17,34 @@ logger = get_custom_logger(__name__)
|
|||||||
class StepsFilter:
|
class StepsFilter:
|
||||||
test_pubsub_topic = VALID_PUBSUB_TOPICS[1]
|
test_pubsub_topic = VALID_PUBSUB_TOPICS[1]
|
||||||
second_pubsub_topic = VALID_PUBSUB_TOPICS[2]
|
second_pubsub_topic = VALID_PUBSUB_TOPICS[2]
|
||||||
|
another_cluster_pubsub_topic = "/waku/2/rs/2/2"
|
||||||
test_content_topic = "/test/1/waku-filter/proto"
|
test_content_topic = "/test/1/waku-filter/proto"
|
||||||
second_content_topic = "/test/2/waku-filter/proto"
|
second_content_topic = "/test/2/waku-filter/proto"
|
||||||
test_payload = "Filter works!!"
|
test_payload = "Filter works!!"
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
def setup_relay_node(self, request):
|
def filter_setup(self):
|
||||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
|
self.main_nodes = []
|
||||||
start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY}
|
self.optional_nodes = []
|
||||||
if self.node1.is_gowaku():
|
|
||||||
start_args["min_relay_peers_to_publish"] = "0"
|
|
||||||
self.node1.start(**start_args)
|
|
||||||
self.enr_uri = self.node1.get_enr_uri()
|
|
||||||
self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def setup_main_filter_node(self, request):
|
def setup_main_relay_node(self):
|
||||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
|
self.relay_node_start(NODE_1)
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def setup_main_filter_node(self):
|
||||||
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
|
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
|
||||||
self.node2.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
self.node2.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
||||||
self.main_nodes = [self.node2]
|
self.main_nodes.append(self.node2)
|
||||||
self.optional_nodes = []
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def subscribe_main_nodes(self):
|
def subscribe_main_nodes(self):
|
||||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
|
||||||
def setup_optional_filter_nodes(self, request):
|
|
||||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
|
||||||
if ADDITIONAL_NODES:
|
|
||||||
nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")]
|
|
||||||
else:
|
|
||||||
pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
|
|
||||||
for index, node in enumerate(nodes):
|
|
||||||
node = WakuNode(node, f"additional_node{index}_{request.cls.test_id}")
|
|
||||||
node.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
|
||||||
self.optional_nodes.append(node)
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
@retry(stop=stop_after_delay(20), wait=wait_fixed(1), reraise=True)
|
@retry(stop=stop_after_delay(20), wait=wait_fixed(1), reraise=True)
|
||||||
def filter_warm_up(self):
|
def filter_warm_up(self):
|
||||||
@ -68,6 +56,25 @@ class StepsFilter:
|
|||||||
else:
|
else:
|
||||||
raise TimeoutError(f"WARM UP FAILED WITH: {ex}")
|
raise TimeoutError(f"WARM UP FAILED WITH: {ex}")
|
||||||
|
|
||||||
|
def relay_node_start(self, node):
|
||||||
|
self.node1 = WakuNode(node, f"node1_{self.test_id}")
|
||||||
|
start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY}
|
||||||
|
if self.node1.is_gowaku():
|
||||||
|
start_args["min_relay_peers_to_publish"] = "0"
|
||||||
|
self.node1.start(**start_args)
|
||||||
|
self.enr_uri = self.node1.get_enr_uri()
|
||||||
|
self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
|
||||||
|
|
||||||
|
def setup_optional_filter_nodes(self, node_list=ADDITIONAL_NODES):
|
||||||
|
if node_list:
|
||||||
|
nodes = [node.strip() for node in node_list.split(",") if node]
|
||||||
|
else:
|
||||||
|
pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test")
|
||||||
|
for index, node in enumerate(nodes):
|
||||||
|
node = WakuNode(node, f"additional_node{index + 1}_{self.test_id}")
|
||||||
|
node.start(relay="false", filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
|
||||||
|
self.optional_nodes.append(node)
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def check_published_message_reaches_filter_peer(
|
def check_published_message_reaches_filter_peer(
|
||||||
self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None
|
self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None
|
||||||
@ -85,33 +92,47 @@ class StepsFilter:
|
|||||||
delay(message_propagation_delay)
|
delay(message_propagation_delay)
|
||||||
for index, peer in enumerate(peer_list):
|
for index, peer in enumerate(peer_list):
|
||||||
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
|
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
|
||||||
get_messages_response = self.get_filter_messages(message["contentTopic"], node=peer)
|
get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topics=pubsub_topic, node=peer)
|
||||||
assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages"
|
assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages"
|
||||||
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
|
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
|
||||||
waku_message = WakuMessage(get_messages_response)
|
waku_message = WakuMessage(get_messages_response)
|
||||||
waku_message.assert_received_message(message)
|
waku_message.assert_received_message(message)
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def check_publish_without_filter_subscription(self, message=None, pubsub_topic=None):
|
def check_publish_without_filter_subscription(self, message=None, pubsub_topic=None, peer_list=None):
|
||||||
try:
|
try:
|
||||||
self.check_published_message_reaches_filter_peer(message=message, pubsub_topic=pubsub_topic)
|
self.check_published_message_reaches_filter_peer(message=message, pubsub_topic=pubsub_topic, peer_list=peer_list)
|
||||||
raise AssertionError("Publish with no subscription worked!!!")
|
raise AssertionError("Publish with no subscription worked!!!")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
assert "Bad Request" in str(ex) or "Not Found" in str(ex) or "couldn't find any messages" in str(ex)
|
||||||
|
|
||||||
@retry(stop=stop_after_delay(30), wait=wait_fixed(1), reraise=True)
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None):
|
def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None):
|
||||||
if pubsub_topic is None:
|
if pubsub_topic is None:
|
||||||
pubsub_topic = self.test_pubsub_topic
|
pubsub_topic = self.test_pubsub_topic
|
||||||
self.node1.set_relay_subscriptions([pubsub_topic])
|
self.node1.set_relay_subscriptions([pubsub_topic])
|
||||||
request_id = str(uuid4())
|
request_id = str(uuid4())
|
||||||
filter_sub_response = self.create_filter_subscription(
|
filter_sub_response = self.create_filter_subscription_with_retry(
|
||||||
{"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic}
|
{"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic}
|
||||||
)
|
)
|
||||||
assert filter_sub_response["requestId"] == request_id
|
assert filter_sub_response["requestId"] == request_id
|
||||||
assert filter_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed
|
assert filter_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed
|
||||||
|
|
||||||
|
@allure.step
|
||||||
|
def subscribe_optional_filter_nodes(self, content_topic_list, pubsub_topic=None):
|
||||||
|
if pubsub_topic is None:
|
||||||
|
pubsub_topic = self.test_pubsub_topic
|
||||||
|
for node in self.optional_nodes:
|
||||||
|
request_id = str(uuid4())
|
||||||
|
self.create_filter_subscription_with_retry(
|
||||||
|
{"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic}, node=node
|
||||||
|
)
|
||||||
|
|
||||||
|
@retry(stop=stop_after_delay(60), wait=wait_fixed(1), reraise=True)
|
||||||
|
@allure.step
|
||||||
|
def create_filter_subscription_with_retry(self, subscription, node=None):
|
||||||
|
return self.create_filter_subscription(subscription, node)
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def create_filter_subscription(self, subscription, node=None):
|
def create_filter_subscription(self, subscription, node=None):
|
||||||
if node is None:
|
if node is None:
|
||||||
@ -122,15 +143,23 @@ class StepsFilter:
|
|||||||
def update_filter_subscription(self, subscription, node=None):
|
def update_filter_subscription(self, subscription, node=None):
|
||||||
if node is None:
|
if node is None:
|
||||||
node = self.node2
|
node = self.node2
|
||||||
|
if node.is_gowaku():
|
||||||
|
pytest.skip(f"This method doesn't exist for node {node.type()}")
|
||||||
return node.update_filter_subscriptions(subscription)
|
return node.update_filter_subscriptions(subscription)
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def delete_filter_subscription(self, subscription, node=None):
|
def delete_filter_subscription(self, subscription, status=None, node=None):
|
||||||
if node is None:
|
if node is None:
|
||||||
node = self.node2
|
node = self.node2
|
||||||
delete_sub_response = node.delete_filter_subscriptions(subscription)
|
delete_sub_response = node.delete_filter_subscriptions(subscription)
|
||||||
assert delete_sub_response["requestId"] == subscription["requestId"]
|
if node.is_gowaku() and "requestId" not in subscription:
|
||||||
assert delete_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed
|
assert delete_sub_response["requestId"] == ""
|
||||||
|
else:
|
||||||
|
assert delete_sub_response["requestId"] == subscription["requestId"]
|
||||||
|
if status is None:
|
||||||
|
assert delete_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed
|
||||||
|
else:
|
||||||
|
assert status in delete_sub_response["statusDesc"]
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def delete_all_filter_subscriptions(self, request_id, node=None):
|
def delete_all_filter_subscriptions(self, request_id, node=None):
|
||||||
@ -155,10 +184,15 @@ class StepsFilter:
|
|||||||
self.node1.set_relay_subscriptions(pubsub_topics)
|
self.node1.set_relay_subscriptions(pubsub_topics)
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def get_filter_messages(self, content_topic, node=None):
|
def get_filter_messages(self, content_topic, pubsub_topics=None, node=None):
|
||||||
if node is None:
|
if node is None:
|
||||||
node = self.node2
|
node = self.node2
|
||||||
return node.get_filter_messages(content_topic)
|
if node.is_gowaku():
|
||||||
|
return node.get_filter_messages(content_topic, pubsub_topics)
|
||||||
|
elif node.is_nwaku():
|
||||||
|
return node.get_filter_messages(content_topic)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def create_message(self, **kwargs):
|
def create_message(self, **kwargs):
|
||||||
|
33
src/steps/metrics.py
Normal file
33
src/steps/metrics.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
from src.libs.custom_logger import get_custom_logger
|
||||||
|
import allure
|
||||||
|
from tenacity import retry, stop_after_delay, wait_fixed
|
||||||
|
|
||||||
|
|
||||||
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class StepsMetrics:
|
||||||
|
@allure.step
|
||||||
|
def check_metric(self, node, metric_name, expected_value):
|
||||||
|
logger.debug(f"Checking metric: {metric_name} has {expected_value}")
|
||||||
|
response = node.get_metrics()
|
||||||
|
lines = response.split("\n")
|
||||||
|
actual_value = None
|
||||||
|
for line in lines:
|
||||||
|
if line.startswith(metric_name):
|
||||||
|
parts = line.split(" ")
|
||||||
|
if len(parts) >= 2:
|
||||||
|
actual_value = float(parts[1])
|
||||||
|
break
|
||||||
|
if actual_value is None:
|
||||||
|
raise AttributeError(f"Metric '{metric_name}' not found")
|
||||||
|
logger.debug(f"Found metric: {metric_name} with value {actual_value}")
|
||||||
|
assert actual_value == expected_value, f"Expected value for '{metric_name}' is {expected_value}, but got {actual_value}"
|
||||||
|
|
||||||
|
@allure.step
|
||||||
|
def wait_for_metric(self, node, metric_name, expected_value, timeout_duration=90):
|
||||||
|
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(1), reraise=True)
|
||||||
|
def check_metric_with_retry():
|
||||||
|
self.check_metric(node, metric_name, expected_value)
|
||||||
|
|
||||||
|
check_metric_with_retry()
|
@ -18,6 +18,12 @@ class StepsRelay:
|
|||||||
test_content_topic = "/test/1/waku-relay/proto"
|
test_content_topic = "/test/1/waku-relay/proto"
|
||||||
test_payload = "Relay works!!"
|
test_payload = "Relay works!!"
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
|
def relay_setup(self):
|
||||||
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
|
self.main_nodes = []
|
||||||
|
self.optional_nodes = []
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def setup_main_relay_nodes(self, request):
|
def setup_main_relay_nodes(self, request):
|
||||||
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
|
||||||
@ -26,8 +32,7 @@ class StepsRelay:
|
|||||||
self.enr_uri = self.node1.get_enr_uri()
|
self.enr_uri = self.node1.get_enr_uri()
|
||||||
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
|
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
|
||||||
self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri)
|
self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri)
|
||||||
self.main_nodes = [self.node1, self.node2]
|
self.main_nodes.extend([self.node1, self.node2])
|
||||||
self.optional_nodes = []
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def setup_optional_relay_nodes(self, request):
|
def setup_optional_relay_nodes(self, request):
|
||||||
@ -37,7 +42,7 @@ class StepsRelay:
|
|||||||
else:
|
else:
|
||||||
pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
|
pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
|
||||||
for index, node in enumerate(nodes):
|
for index, node in enumerate(nodes):
|
||||||
node = WakuNode(node, f"node{index}_{request.cls.test_id}")
|
node = WakuNode(node, f"additional_node{index}_{request.cls.test_id}")
|
||||||
node.start(relay="true", discv5_bootstrap_node=self.enr_uri)
|
node.start(relay="true", discv5_bootstrap_node=self.enr_uri)
|
||||||
self.optional_nodes.append(node)
|
self.optional_nodes.append(node)
|
||||||
|
|
||||||
|
@ -65,19 +65,9 @@ INVALID_CONTENT_TOPICS = [
|
|||||||
{"description": "A bool", "value": True},
|
{"description": "A bool", "value": True},
|
||||||
]
|
]
|
||||||
|
|
||||||
VALID_PUBSUB_TOPICS = [
|
VALID_PUBSUB_TOPICS = ["/waku/2/rs/0/1", "/waku/2/rs/0/0", "/waku/2/rs/0/9", "/waku/2/rs/0/25", "/waku/2/rs/0/1000", DEFAULT_PUBSUB_TOPIC]
|
||||||
DEFAULT_PUBSUB_TOPIC,
|
|
||||||
"/waku/2/rs/18/1",
|
|
||||||
"/test/2/rs/18/1",
|
|
||||||
"/waku/3/rs/18/1",
|
|
||||||
"/waku/2/test/18/1",
|
|
||||||
"/waku/2/rs/66/1",
|
|
||||||
"/waku/2/rs/18/50",
|
|
||||||
"/waku/18/50",
|
|
||||||
"test",
|
|
||||||
]
|
|
||||||
|
|
||||||
INVALID_PUBSUB_TOPICS = ["/test/2/rs/18/1", ("/waku/2/rs/18/1"), {"pubsub_topic": "/waku/3/rs/18/1"}, True, 12345678, [["/waku/2/rs/18/1"]]]
|
INVALID_PUBSUB_TOPICS = ["/test/2/rs/0/1", "/waku/3/rs/0/1", "/waku/2/test/0/1", "/waku/2/rs/0/b", "/waku/2/rs/0"]
|
||||||
|
|
||||||
|
|
||||||
SAMPLE_TIMESTAMPS = [
|
SAMPLE_TIMESTAMPS = [
|
||||||
|
39
tests/filter/test_idle_subscriptions.py
Normal file
39
tests/filter/test_idle_subscriptions.py
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import pytest
|
||||||
|
from src.libs.common import delay
|
||||||
|
from src.env_vars import DEFAULT_NWAKU
|
||||||
|
from src.libs.custom_logger import get_custom_logger
|
||||||
|
from src.steps.filter import StepsFilter
|
||||||
|
from src.steps.metrics import StepsMetrics
|
||||||
|
|
||||||
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="Skipping until https://github.com/waku-org/nwaku/issues/2293 is fixed")
|
||||||
|
class TestIdleSubscriptions(StepsFilter, StepsMetrics):
|
||||||
|
# tests will probably suffer minor adjustments after https://github.com/waku-org/nwaku/issues/2293 is fixed
|
||||||
|
|
||||||
|
@pytest.mark.timeout(60 * 10)
|
||||||
|
def test_idle_filter_subscriptions_for_more_than_5_nodes(self):
|
||||||
|
self.relay_node_start(DEFAULT_NWAKU)
|
||||||
|
filter_node_list = f"{DEFAULT_NWAKU}," * 6
|
||||||
|
self.setup_optional_filter_nodes(filter_node_list)
|
||||||
|
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
|
||||||
|
self.subscribe_optional_filter_nodes([self.test_content_topic])
|
||||||
|
self.check_published_message_reaches_filter_peer(peer_list=self.optional_nodes)
|
||||||
|
self.wait_for_metric(self.node1, "waku_filter_subscriptions", 6.0)
|
||||||
|
delay(60 * 5) # not sure how many seconds to put here so hardcoded 5 minutes to be sure
|
||||||
|
# after some idle time nodes should disconnect and we should see max 5 connections
|
||||||
|
self.wait_for_metric(self.node1, "waku_filter_subscriptions", 5.0)
|
||||||
|
|
||||||
|
@pytest.mark.timeout(60 * 10)
|
||||||
|
def test_idle_filter_subscriptions_after_node_disconnection(self):
|
||||||
|
self.relay_node_start(DEFAULT_NWAKU)
|
||||||
|
self.setup_optional_filter_nodes(DEFAULT_NWAKU)
|
||||||
|
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
|
||||||
|
self.subscribe_optional_filter_nodes([self.test_content_topic])
|
||||||
|
self.check_published_message_reaches_filter_peer(peer_list=self.optional_nodes)
|
||||||
|
self.wait_for_metric(self.node1, "waku_filter_subscriptions", 1.0)
|
||||||
|
self.optional_nodes[0].stop()
|
||||||
|
delay(60 * 5) # not sure how many seconds to put here so hardcoded 5 minutes to be sure
|
||||||
|
# after some idle time the stopped node should disconnect and we should see 0 connections
|
||||||
|
self.wait_for_metric(self.node1, "waku_filter_subscriptions", 0.0)
|
21
tests/filter/test_multiple_nodes.py
Normal file
21
tests/filter/test_multiple_nodes.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
import pytest
|
||||||
|
from src.libs.custom_logger import get_custom_logger
|
||||||
|
from src.steps.filter import StepsFilter
|
||||||
|
|
||||||
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node")
|
||||||
|
class TestFilterMultipleNodes(StepsFilter):
|
||||||
|
def test_all_nodes_subscribed_to_the_topic(self):
|
||||||
|
self.setup_optional_filter_nodes()
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
|
self.subscribe_optional_filter_nodes([self.test_content_topic])
|
||||||
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
|
||||||
|
def test_optional_nodes_not_subscribed_to_same_topic(self):
|
||||||
|
self.setup_optional_filter_nodes()
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
|
self.subscribe_optional_filter_nodes([self.second_content_topic])
|
||||||
|
self.check_published_message_reaches_filter_peer(peer_list=self.main_nodes)
|
||||||
|
self.check_publish_without_filter_subscription(peer_list=self.optional_nodes)
|
@ -6,13 +6,13 @@ from src.steps.filter import StepsFilter
|
|||||||
logger = get_custom_logger(__name__)
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node")
|
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node")
|
||||||
class TestFilterSubscribeUpdate(StepsFilter):
|
class TestFilterSubscribeCreate(StepsFilter):
|
||||||
def test_filter_subscribe_to_single_topics(self):
|
def test_filter_subscribe_to_single_topics(self):
|
||||||
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
self.check_published_message_reaches_filter_peer()
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
|
||||||
def test_filter_subscribe_to_multiple_pubsub_topic(self):
|
def test_filter_subscribe_to_multiple_pubsub_topic_from_same_cluster(self):
|
||||||
failed_pubsub_topics = []
|
failed_pubsub_topics = []
|
||||||
for pubsub_topic in VALID_PUBSUB_TOPICS:
|
for pubsub_topic in VALID_PUBSUB_TOPICS:
|
||||||
content_topic = pubsub_topic
|
content_topic = pubsub_topic
|
||||||
@ -26,6 +26,25 @@ class TestFilterSubscribeUpdate(StepsFilter):
|
|||||||
failed_pubsub_topics.append(pubsub_topic)
|
failed_pubsub_topics.append(pubsub_topic)
|
||||||
assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}"
|
assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}"
|
||||||
|
|
||||||
|
def test_filter_subscribe_to_pubsub_topic_from_another_cluster_id(self):
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.another_cluster_pubsub_topic)
|
||||||
|
self.check_published_message_reaches_filter_peer(pubsub_topic=self.another_cluster_pubsub_topic)
|
||||||
|
|
||||||
|
def test_filter_subscribe_to_pubsub_topics_from_multiple_clusters(self):
|
||||||
|
pubsub_topic_list = [self.test_pubsub_topic, self.another_cluster_pubsub_topic, self.second_pubsub_topic]
|
||||||
|
failed_pubsub_topics = []
|
||||||
|
for pubsub_topic in pubsub_topic_list:
|
||||||
|
content_topic = pubsub_topic
|
||||||
|
logger.debug(f"Running test with pubsub topic: {pubsub_topic}")
|
||||||
|
try:
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([content_topic], pubsub_topic)
|
||||||
|
message = self.create_message(contentTopic=content_topic)
|
||||||
|
self.check_published_message_reaches_filter_peer(message, pubsub_topic=pubsub_topic)
|
||||||
|
except Exception as ex:
|
||||||
|
logger.error(f"PubsubTopic {pubsub_topic} failed: {str(ex)}")
|
||||||
|
failed_pubsub_topics.append(pubsub_topic)
|
||||||
|
assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}"
|
||||||
|
|
||||||
def test_filter_subscribe_to_30_content_topics_in_one_call(self):
|
def test_filter_subscribe_to_30_content_topics_in_one_call(self):
|
||||||
failed_content_topics = []
|
failed_content_topics = []
|
||||||
self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[:30]])
|
self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[:30]])
|
||||||
@ -76,7 +95,7 @@ class TestFilterSubscribeUpdate(StepsFilter):
|
|||||||
def test_filter_subscribe_with_no_pubsub_topic(self, subscribe_main_nodes):
|
def test_filter_subscribe_with_no_pubsub_topic(self, subscribe_main_nodes):
|
||||||
try:
|
try:
|
||||||
self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]})
|
self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]})
|
||||||
raise AssertionError("Subscribe with no pubusub topics worked!!!")
|
# raise AssertionError("Subscribe with no pubusub topics worked!!!") commented until https://github.com/waku-org/nwaku/issues/2315 is fixed
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex)
|
assert "Bad Request" in str(ex)
|
||||||
|
|
||||||
@ -90,7 +109,12 @@ class TestFilterSubscribeUpdate(StepsFilter):
|
|||||||
def test_filter_subscribe_with_no_content_topic(self, subscribe_main_nodes):
|
def test_filter_subscribe_with_no_content_topic(self, subscribe_main_nodes):
|
||||||
try:
|
try:
|
||||||
self.create_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic})
|
self.create_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic})
|
||||||
raise AssertionError("Subscribe with no content topics worked!!!")
|
if self.node2.is_nwaku():
|
||||||
|
raise AssertionError("Subscribe with extra field worked!!!")
|
||||||
|
elif self.node2.is_gowaku():
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex)
|
assert "Bad Request" in str(ex)
|
||||||
|
|
||||||
@ -124,6 +148,11 @@ class TestFilterSubscribeUpdate(StepsFilter):
|
|||||||
self.create_filter_subscription(
|
self.create_filter_subscription(
|
||||||
{"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"}
|
{"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"}
|
||||||
)
|
)
|
||||||
raise AssertionError("Subscribe with extra field worked!!!")
|
if self.node2.is_nwaku():
|
||||||
|
raise AssertionError("Subscribe with extra field worked!!!")
|
||||||
|
elif self.node2.is_gowaku():
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex)
|
assert "Bad Request" in str(ex)
|
||||||
|
@ -6,8 +6,8 @@ from src.steps.filter import StepsFilter
|
|||||||
logger = get_custom_logger(__name__)
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node")
|
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node")
|
||||||
class TestFilterSubscribeCreate(StepsFilter):
|
class TestFilterSubscribeUpdate(StepsFilter):
|
||||||
def test_filter_update_subscription_add_a_new_content_topic(self):
|
def test_filter_update_subscription_add_a_new_content_topic(self):
|
||||||
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
|
||||||
self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic})
|
self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic})
|
||||||
@ -62,8 +62,8 @@ class TestFilterSubscribeCreate(StepsFilter):
|
|||||||
|
|
||||||
def test_filter_update_subscription_with_no_pubsub_topic(self, subscribe_main_nodes):
|
def test_filter_update_subscription_with_no_pubsub_topic(self, subscribe_main_nodes):
|
||||||
try:
|
try:
|
||||||
self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]})
|
self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic]})
|
||||||
raise AssertionError("Subscribe with no pubusub topics worked!!!")
|
# raise AssertionError("Subscribe with no pubusub topics worked!!!") commented until https://github.com/waku-org/nwaku/issues/2315 is fixed
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex)
|
assert "Bad Request" in str(ex)
|
||||||
|
|
||||||
|
@ -3,7 +3,7 @@ from src.test_data import SAMPLE_INPUTS
|
|||||||
from src.steps.filter import StepsFilter
|
from src.steps.filter import StepsFilter
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node", "subscribe_main_nodes")
|
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "subscribe_main_nodes")
|
||||||
class TestFilterUnSubscribe(StepsFilter):
|
class TestFilterUnSubscribe(StepsFilter):
|
||||||
def test_filter_unsubscribe_from_single_content_topic(self):
|
def test_filter_unsubscribe_from_single_content_topic(self):
|
||||||
self.check_published_message_reaches_filter_peer()
|
self.check_published_message_reaches_filter_peer()
|
||||||
@ -38,15 +38,28 @@ class TestFilterUnSubscribe(StepsFilter):
|
|||||||
self.check_publish_without_filter_subscription(self.create_message(contentTopic=self.second_content_topic), self.second_pubsub_topic)
|
self.check_publish_without_filter_subscription(self.create_message(contentTopic=self.second_content_topic), self.second_pubsub_topic)
|
||||||
|
|
||||||
def test_filter_unsubscribe_from_non_existing_content_topic(self):
|
def test_filter_unsubscribe_from_non_existing_content_topic(self):
|
||||||
self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic})
|
try:
|
||||||
|
self.delete_filter_subscription(
|
||||||
|
{"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic},
|
||||||
|
status="can't unsubscribe" if self.node2.is_gowaku() else "",
|
||||||
|
)
|
||||||
|
except Exception as ex:
|
||||||
|
assert "Not Found" in str(ex) and "peer has no subscriptions" in str(ex)
|
||||||
self.check_published_message_reaches_filter_peer()
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
|
||||||
def test_filter_unsubscribe_from_non_existing_pubsub_topic(self):
|
def test_filter_unsubscribe_from_non_existing_pubsub_topic(self):
|
||||||
try:
|
try:
|
||||||
self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.test_pubsub_topic], "pubsubTopic": self.second_pubsub_topic})
|
self.delete_filter_subscription(
|
||||||
raise AssertionError("Unsubscribe with non existing pubsub topic worked!!!")
|
{"requestId": "1", "contentFilters": [self.test_pubsub_topic], "pubsubTopic": self.second_pubsub_topic}, status="can't unsubscribe"
|
||||||
|
)
|
||||||
|
if self.node2.is_nwaku():
|
||||||
|
raise AssertionError("Unsubscribe with non existing pubsub topic worked!!!")
|
||||||
|
elif self.node2.is_gowaku():
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Not Found" and "peer has no subscriptions" in str(ex)
|
assert "Not Found" in str(ex) and "peer has no subscriptions" in str(ex)
|
||||||
self.check_published_message_reaches_filter_peer()
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
|
||||||
def test_filter_unsubscribe_from_31_content_topics(self):
|
def test_filter_unsubscribe_from_31_content_topics(self):
|
||||||
@ -56,7 +69,7 @@ class TestFilterUnSubscribe(StepsFilter):
|
|||||||
)
|
)
|
||||||
raise AssertionError("Unsubscribe from more than 30 content topics worked!!!")
|
raise AssertionError("Unsubscribe from more than 30 content topics worked!!!")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Not Found" and "exceeds maximum content topics: 30" in str(ex)
|
assert "exceeds maximum content topics: 30" in str(ex)
|
||||||
|
|
||||||
def test_filter_unsubscribe_with_no_content_topic(self):
|
def test_filter_unsubscribe_with_no_content_topic(self):
|
||||||
try:
|
try:
|
||||||
@ -99,8 +112,15 @@ class TestFilterUnSubscribe(StepsFilter):
|
|||||||
|
|
||||||
def test_filter_unsubscribe_with_no_request_id(self):
|
def test_filter_unsubscribe_with_no_request_id(self):
|
||||||
try:
|
try:
|
||||||
self.delete_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
|
self.delete_filter_subscription(
|
||||||
raise AssertionError("Unsubscribe with no request id worked!!!")
|
{"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}, status="can't unsubscribe"
|
||||||
|
)
|
||||||
|
if self.node2.is_nwaku():
|
||||||
|
raise AssertionError("Unsubscribe with no request id worked!!!")
|
||||||
|
elif self.node2.is_gowaku():
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex)
|
assert "Bad Request" in str(ex)
|
||||||
|
|
||||||
@ -116,7 +136,12 @@ class TestFilterUnSubscribe(StepsFilter):
|
|||||||
self.delete_filter_subscription(
|
self.delete_filter_subscription(
|
||||||
{"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"}
|
{"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic, "extraField": "extraValue"}
|
||||||
)
|
)
|
||||||
raise AssertionError("Unsubscribe with extra field worked!!!")
|
if self.node2.is_nwaku():
|
||||||
|
raise AssertionError("Unsubscribe with extra field worked!!!")
|
||||||
|
elif self.node2.is_gowaku():
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex)
|
assert "Bad Request" in str(ex)
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ from src.steps.filter import StepsFilter
|
|||||||
from random import choice
|
from random import choice
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node", "filter_warm_up")
|
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node")
|
||||||
class TestFilterUnSubscribeAll(StepsFilter):
|
class TestFilterUnSubscribeAll(StepsFilter):
|
||||||
def test_filter_unsubscribe_all_from_few_content_topics(self):
|
def test_filter_unsubscribe_all_from_few_content_topics(self):
|
||||||
content_topics = [input["value"] for input in SAMPLE_INPUTS[:5]]
|
content_topics = [input["value"] for input in SAMPLE_INPUTS[:5]]
|
||||||
@ -45,14 +45,12 @@ class TestFilterUnSubscribeAll(StepsFilter):
|
|||||||
self.delete_all_filter_subscriptions({"requestId": "1"})
|
self.delete_all_filter_subscriptions({"requestId": "1"})
|
||||||
raise AssertionError("Unsubscribe all on peer without subscriptions worked!!!")
|
raise AssertionError("Unsubscribe all on peer without subscriptions worked!!!")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Not Found" and "peer has no subscriptions" in str(ex)
|
if self.node2.is_nwaku():
|
||||||
|
assert "Not Found" in str(ex) and "peer has no subscriptions" in str(ex)
|
||||||
def test_filter_unsubscribe_all_with_no_request_id(self, subscribe_main_nodes):
|
elif self.node2.is_gowaku():
|
||||||
try:
|
assert "subscription not found" in str(ex)
|
||||||
self.delete_all_filter_subscriptions({})
|
else:
|
||||||
raise AssertionError("Unsubscribe all with no request id worked!!!")
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
except Exception as ex:
|
|
||||||
assert "Bad Request" in str(ex)
|
|
||||||
|
|
||||||
def test_filter_unsubscribe_all_with_invalid_request_id(self, subscribe_main_nodes):
|
def test_filter_unsubscribe_all_with_invalid_request_id(self, subscribe_main_nodes):
|
||||||
try:
|
try:
|
||||||
@ -61,7 +59,7 @@ class TestFilterUnSubscribeAll(StepsFilter):
|
|||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex)
|
assert "Bad Request" in str(ex)
|
||||||
|
|
||||||
def test_filter_unsubscribe_all_with_extra_field(self):
|
def test_filter_unsubscribe_all_with_extra_field(self, subscribe_main_nodes):
|
||||||
try:
|
try:
|
||||||
self.delete_all_filter_subscriptions({"requestId": 1, "extraField": "extraValue"})
|
self.delete_all_filter_subscriptions({"requestId": 1, "extraField": "extraValue"})
|
||||||
raise AssertionError("Unsubscribe all with extra field worked!!!")
|
raise AssertionError("Unsubscribe all with extra field worked!!!")
|
||||||
|
@ -3,7 +3,7 @@ from src.steps.relay import StepsRelay
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes")
|
@pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes")
|
||||||
class TestMultipleNodes(StepsRelay):
|
class TestRelayMultipleNodes(StepsRelay):
|
||||||
def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
|
def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
|
||||||
self.check_published_message_reaches_relay_peer()
|
self.check_published_message_reaches_relay_peer()
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ class TestRelayPublish(StepsRelay):
|
|||||||
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
||||||
|
|
||||||
def test_publish_with_payload_less_than_one_mb(self):
|
def test_publish_with_payload_less_than_one_mb(self):
|
||||||
payload_length = 1024 * 1023
|
payload_length = 1024 * 700 # after encoding to base64 this be close to 1MB
|
||||||
logger.debug(f"Running test with payload length of {payload_length} bytes")
|
logger.debug(f"Running test with payload length of {payload_length} bytes")
|
||||||
message = self.create_message(payload=to_base64("a" * (payload_length)))
|
message = self.create_message(payload=to_base64("a" * (payload_length)))
|
||||||
self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2)
|
self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2)
|
||||||
@ -57,7 +57,7 @@ class TestRelayPublish(StepsRelay):
|
|||||||
self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2)
|
self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2)
|
||||||
raise AssertionError("Message with payload > 1MB was received")
|
raise AssertionError("Message with payload > 1MB was received")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "couldn't find any messages" in str(ex)
|
assert "couldn't find any messages" in str(ex) or "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
||||||
|
|
||||||
def test_publish_with_valid_content_topics(self):
|
def test_publish_with_valid_content_topics(self):
|
||||||
failed_content_topics = []
|
failed_content_topics = []
|
||||||
@ -187,7 +187,16 @@ class TestRelayPublish(StepsRelay):
|
|||||||
self.check_published_message_reaches_relay_peer(self.create_message(rateLimitProof=rate_limit_proof))
|
self.check_published_message_reaches_relay_peer(self.create_message(rateLimitProof=rate_limit_proof))
|
||||||
|
|
||||||
def test_publish_with_extra_field(self):
|
def test_publish_with_extra_field(self):
|
||||||
self.check_published_message_reaches_relay_peer(self.create_message(extraField="extraValue"))
|
try:
|
||||||
|
self.check_published_message_reaches_relay_peer(self.create_message(extraField="extraValue"))
|
||||||
|
if self.node1.is_nwaku():
|
||||||
|
raise AssertionError("Relay publish with extra field worked!!!")
|
||||||
|
elif self.node1.is_gowaku():
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
|
except Exception as ex:
|
||||||
|
assert "Bad Request" in str(ex)
|
||||||
|
|
||||||
def test_publish_and_retrieve_duplicate_message(self):
|
def test_publish_and_retrieve_duplicate_message(self):
|
||||||
message = self.create_message()
|
message = self.create_message()
|
||||||
@ -221,6 +230,7 @@ class TestRelayPublish(StepsRelay):
|
|||||||
self.check_published_message_reaches_relay_peer()
|
self.check_published_message_reaches_relay_peer()
|
||||||
self.node1.restart()
|
self.node1.restart()
|
||||||
self.node1.ensure_ready()
|
self.node1.ensure_ready()
|
||||||
|
delay(2)
|
||||||
self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
||||||
self.wait_for_published_message_to_reach_relay_peer()
|
self.wait_for_published_message_to_reach_relay_peer()
|
||||||
|
|
||||||
|
@ -76,7 +76,7 @@ class TestRelaySubscribe(StepsRelay):
|
|||||||
elif self.node1.is_gowaku():
|
elif self.node1.is_gowaku():
|
||||||
raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!")
|
raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!")
|
||||||
else:
|
else:
|
||||||
raise Exception("Not implemented")
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
||||||
self.check_published_message_reaches_relay_peer()
|
self.check_published_message_reaches_relay_peer()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user