Filter Subscribe (#6)

* Filter Subscribe

* set min_relay_peers_to_publish for filter tests

* address review comments
This commit is contained in:
Florin Barbu 2023-12-11 14:02:50 +02:00 committed by GitHub
parent f8db76ab20
commit 7f91bd7b95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 621 additions and 189 deletions

View File

@ -12,6 +12,13 @@ If this is a bug fix, please describe why the old behavior was problematic.
<!-- describe the new behavior --> <!-- describe the new behavior -->
## Diffs
<!--
Are there any diffs between implementations (nwaku vs gowaku)?
If yes they should be documented here: https://www.notion.so/Nwaku-vs-Gowaku-vs-Jswaku-diffs-b3e0e8f1e6cd4c6d9855b0c3c4634bc5
-->
## Notes ## Notes
<!-- Remove items that are not relevant --> <!-- Remove items that are not relevant -->

View File

@ -1,18 +0,0 @@
from dataclasses import dataclass, field
from marshmallow_dataclass import class_schema
from typing import Optional, Union
@dataclass
class MessageRpcResponse:
payload: str
contentTopic: str
version: Optional[int]
timestamp: Optional[int]
ephemeral: Optional[bool]
meta: Optional[str]
rateLimitProof: Optional[Union[dict, str]] = field(default_factory=dict)
rate_limit_proof: Optional[dict] = field(default_factory=dict)
message_rpc_response_schema = class_schema(MessageRpcResponse)()

View File

@ -28,17 +28,25 @@ class BaseClient(ABC):
pass pass
@abstractmethod @abstractmethod
def set_subscriptions(self, pubsub_topics): def set_relay_subscriptions(self, pubsub_topics):
pass pass
@abstractmethod @abstractmethod
def delete_subscriptions(self, pubsub_topics): def delete_relay_subscriptions(self, pubsub_topics):
pass pass
@abstractmethod @abstractmethod
def send_message(self, message, pubsub_topic): def send_relay_message(self, message, pubsub_topic):
pass pass
@abstractmethod @abstractmethod
def get_messages(self, pubsub_topic): def get_relay_messages(self, pubsub_topic):
pass
@abstractmethod
def set_filter_subscriptions(self, subscription):
pass
@abstractmethod
def get_filter_messages(self, content_topic):
pass pass

View File

@ -1,6 +1,5 @@
from src.libs.custom_logger import get_custom_logger from src.libs.custom_logger import get_custom_logger
import json import json
from dataclasses import asdict
from urllib.parse import quote from urllib.parse import quote
from src.node.api_clients.base_client import BaseClient from src.node.api_clients.base_client import BaseClient
@ -20,15 +19,27 @@ class REST(BaseClient):
info_response = self.rest_call("get", "debug/v1/info") info_response = self.rest_call("get", "debug/v1/info")
return info_response.json() return info_response.json()
def set_subscriptions(self, pubsub_topics): def set_relay_subscriptions(self, pubsub_topics):
return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics)) return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics))
def delete_subscriptions(self, pubsub_topics): def delete_relay_subscriptions(self, pubsub_topics):
return self.rest_call("delete", "relay/v1/subscriptions", json.dumps(pubsub_topics)) return self.rest_call("delete", "relay/v1/subscriptions", json.dumps(pubsub_topics))
def send_message(self, message, pubsub_topic): def send_relay_message(self, message, pubsub_topic):
return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(message)) return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(message))
def get_messages(self, pubsub_topic): def get_relay_messages(self, pubsub_topic):
get_messages_response = self.rest_call("get", f"relay/v1/messages/{quote(pubsub_topic, safe='')}") get_messages_response = self.rest_call("get", f"relay/v1/messages/{quote(pubsub_topic, safe='')}")
return get_messages_response.json() return get_messages_response.json()
def set_filter_subscriptions(self, subscription):
set_subscriptions_response = self.rest_call("post", "filter/v2/subscriptions", json.dumps(subscription))
return set_subscriptions_response.json()
def get_filter_messages(self, content_topic):
get_messages_response = self.rest_call("get", f"filter/v2/messages/{quote(content_topic, safe='')}")
return get_messages_response.json()
def update_filter_subscriptions(self, subscription):
update_subscriptions_response = self.rest_call("put", "filter/v2/subscriptions", json.dumps(subscription))
return update_subscriptions_response.json()

View File

@ -21,21 +21,35 @@ class RPC(BaseClient):
info_response = self.rpc_call("get_waku_v2_debug_v1_info", []) info_response = self.rpc_call("get_waku_v2_debug_v1_info", [])
return info_response.json()["result"] return info_response.json()["result"]
def set_subscriptions(self, pubsub_topics): def set_relay_subscriptions(self, pubsub_topics):
if "nwaku" in self._image_name: if "nwaku" in self._image_name:
return self.rpc_call("post_waku_v2_relay_v1_subscriptions", [pubsub_topics]) return self.rpc_call("post_waku_v2_relay_v1_subscriptions", [pubsub_topics])
else: else:
return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics]) return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics])
def delete_subscriptions(self, pubsub_topics): def delete_relay_subscriptions(self, pubsub_topics):
if "nwaku" in self._image_name: if "nwaku" in self._image_name:
return self.rpc_call("delete_waku_v2_relay_v1_subscriptions", [pubsub_topics]) return self.rpc_call("delete_waku_v2_relay_v1_subscriptions", [pubsub_topics])
else: else:
return self.rpc_call("delete_waku_v2_relay_v1_subscription", [pubsub_topics]) return self.rpc_call("delete_waku_v2_relay_v1_subscription", [pubsub_topics])
def send_message(self, message, pubsub_topic): def send_relay_message(self, message, pubsub_topic):
return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, message]) return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, message])
def get_messages(self, pubsub_topic): def get_relay_messages(self, pubsub_topic):
get_messages_response = self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic]) get_messages_response = self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic])
return get_messages_response.json()["result"] return get_messages_response.json()["result"]
def set_filter_subscriptions(self, subscription):
set_subscriptions_response = self.rpc_call(
"post_waku_v2_filter_v1_subscription",
[
subscription.get("contentFilters", []),
subscription.get("pubsubTopic", None),
],
)
return set_subscriptions_response.json()["result"]
def get_filter_messages(self, content_topic):
get_messages_response = self.rpc_call("get_waku_v2_filter_v1_messages", [content_topic])
return get_messages_response.json()["result"]

48
src/node/waku_message.py Normal file
View File

@ -0,0 +1,48 @@
from dataclasses import dataclass, field
from marshmallow_dataclass import class_schema
from typing import Optional, Union
import math
import allure
@dataclass
class MessageRpcResponse:
payload: str
contentTopic: str
version: Optional[int]
timestamp: Optional[int]
ephemeral: Optional[bool]
meta: Optional[str]
rateLimitProof: Optional[Union[dict, str]] = field(default_factory=dict)
rate_limit_proof: Optional[dict] = field(default_factory=dict)
message_rpc_response_schema = class_schema(MessageRpcResponse)()
class WakuMessage:
def __init__(self, message_response):
self.received_messages = message_response
@allure.step
def assert_received_message(self, sent_message, index=0):
message = message_rpc_response_schema.load(self.received_messages[index])
def assert_fail_message(field_name):
return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(message, field_name)}"
assert message.payload == sent_message["payload"], assert_fail_message("payload")
assert message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic")
if sent_message.get("timestamp") is not None:
if isinstance(sent_message["timestamp"], float):
assert math.isclose(float(message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp")
else:
assert str(message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp")
if "version" in sent_message:
assert str(message.version) == str(sent_message["version"]), assert_fail_message("version")
if "meta" in sent_message:
assert str(message.meta) == str(sent_message["meta"]), assert_fail_message("meta")
if "ephemeral" in sent_message:
assert str(message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral")
if "rateLimitProof" in sent_message:
assert str(message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof")

View File

@ -1,4 +1,6 @@
import os import os
import pytest
from src.libs.common import delay from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed from tenacity import retry, stop_after_delay, wait_fixed
@ -27,7 +29,8 @@ class WakuNode:
self._ports = self._docker_manager.generate_ports() self._ports = self._docker_manager.generate_ports()
self._rest_port = self._ports[0] self._rest_port = self._ports[0]
self._rpc_port = self._ports[1] self._rpc_port = self._ports[1]
self._websocket_port = self._ports[2] self._websocket_port = self._ports[3]
self._tcp_port = self._ports[2]
if PROTOCOL == "RPC": if PROTOCOL == "RPC":
self._api = RPC(self._rpc_port, self._image_name) self._api = RPC(self._rpc_port, self._image_name)
@ -53,6 +56,8 @@ class WakuNode:
"rpc-address": "0.0.0.0", "rpc-address": "0.0.0.0",
"rest-address": "0.0.0.0", "rest-address": "0.0.0.0",
"nat": f"extip:{self._ext_ip}", "nat": f"extip:{self._ext_ip}",
"peer-exchange": "true",
"discv5-discovery": "true",
} }
if "go-waku" in self._docker_manager.image: if "go-waku" in self._docker_manager.image:
@ -69,7 +74,7 @@ class WakuNode:
self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip) self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip)
logger.debug( logger.debug(
f"Started container from image {self._image_name}. RPC: {self._rpc_port} REST: {self._rest_port} WebSocket: {self._websocket_port}" f"Started container from image {self._image_name}. RPC: {self._rpc_port} REST: {self._rest_port} WebSocket: {self._websocket_port} TCP: {self._tcp_port}"
) )
DS.waku_nodes.append(self) DS.waku_nodes.append(self)
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
@ -103,23 +108,51 @@ class WakuNode:
@retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True) @retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True)
def ensure_ready(self): def ensure_ready(self):
self.info() self.info_response = self.info()
logger.info(f"{PROTOCOL} service is ready !!") logger.info(f"{PROTOCOL} service is ready !!")
def get_enr_uri(self):
try:
return self.info_response["enrUri"]
except Exception as ex:
raise AttributeError(f"Could not find enrUri in the info call because of error: {str(ex)}")
def get_multiaddr_with_id(self):
addresses = self.info_response.get("listenAddresses", [])
ws_address = next((addr for addr in addresses if "/ws" not in addr), None)
if ws_address:
identifier = ws_address.split("/p2p/")[-1]
new_address = f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}/p2p/{identifier}"
return new_address
else:
raise AttributeError("No '/ws' address found")
def info(self): def info(self):
return self._api.info() return self._api.info()
def set_subscriptions(self, pubsub_topics): def set_relay_subscriptions(self, pubsub_topics):
return self._api.set_subscriptions(pubsub_topics) return self._api.set_relay_subscriptions(pubsub_topics)
def delete_subscriptions(self, pubsub_topics): def delete_relay_subscriptions(self, pubsub_topics):
return self._api.delete_subscriptions(pubsub_topics) return self._api.delete_relay_subscriptions(pubsub_topics)
def send_message(self, message, pubsub_topic): def send_relay_message(self, message, pubsub_topic):
return self._api.send_message(message, pubsub_topic) return self._api.send_relay_message(message, pubsub_topic)
def get_messages(self, pubsub_topic): def get_relay_messages(self, pubsub_topic):
return self._api.get_messages(pubsub_topic) return self._api.get_relay_messages(pubsub_topic)
def set_filter_subscriptions(self, subscription):
return self._api.set_filter_subscriptions(subscription)
def update_filter_subscriptions(self, subscription):
if PROTOCOL == "RPC":
pytest.skip("This method doesn't exist for RPC protocol")
else:
return self._api.update_filter_subscriptions(subscription)
def get_filter_messages(self, content_topic):
return self._api.get_filter_messages(content_topic)
@property @property
def image(self): def image(self):

124
src/steps/filter.py Normal file
View File

@ -0,0 +1,124 @@
import inspect
from uuid import uuid4
from src.libs.custom_logger import get_custom_logger
from time import time
import pytest
import allure
from src.libs.common import to_base64, delay
from src.node.waku_message import WakuMessage
from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY
from src.node.waku_node import WakuNode
from tenacity import retry, stop_after_delay, wait_fixed
from src.test_data import VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__)
class StepsFilter:
test_pubsub_topic = VALID_PUBSUB_TOPICS[1]
test_content_topic = "/test/1/waku-filter/proto"
second_conted_topic = "/test/2/waku-filter/proto"
test_payload = "Filter works!!"
@pytest.fixture(scope="function")
def setup_relay_node(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
start_args = {"relay": "true", "filter": "true", "nodekey": NODEKEY}
if self.node1.is_gowaku():
start_args["min_relay_peers_to_publish"] = "0"
self.node1.start(**start_args)
self.enr_uri = self.node1.get_enr_uri()
self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
@pytest.fixture(scope="function")
def setup_main_filter_node(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
self.node2.start(filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
self.main_nodes = [self.node2]
self.optional_nodes = []
@pytest.fixture(scope="function")
def subscribe_main_nodes(self):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
@pytest.fixture(scope="function")
def setup_optional_filter_nodes(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
if ADDITIONAL_NODES:
nodes = [node.strip() for node in ADDITIONAL_NODES.split(",")]
else:
pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
for index, node in enumerate(nodes):
node = WakuNode(node, f"node{index}_{request.cls.test_id}")
node.start(filter="true", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
self.optional_nodes.append(node)
@allure.step
def check_published_message_reaches_filter_peer(
self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None
):
if message is None:
message = self.create_message()
if pubsub_topic is None:
pubsub_topic = self.test_pubsub_topic
if not sender:
sender = self.node1
if not peer_list:
peer_list = self.main_nodes + self.optional_nodes
sender.send_relay_message(message, pubsub_topic)
delay(message_propagation_delay)
for index, peer in enumerate(peer_list):
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
get_messages_response = self.get_filter_messages(message["contentTopic"], node=peer)
assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages"
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
waku_message = WakuMessage(get_messages_response)
waku_message.assert_received_message(message)
@retry(stop=stop_after_delay(10), wait=wait_fixed(1), reraise=True)
@allure.step
def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None):
if pubsub_topic is None:
pubsub_topic = self.test_pubsub_topic
self.node1.set_relay_subscriptions([pubsub_topic])
request_id = str(uuid4())
filter_sub_response = self.create_filter_subscription(
{"requestId": request_id, "contentFilters": content_topic_list, "pubsubTopic": pubsub_topic}
)
assert filter_sub_response["requestId"] == request_id
assert filter_sub_response["statusCode"] == 0
assert filter_sub_response["statusDesc"] == ""
@allure.step
def create_filter_subscription(self, subscription, node=None):
if node is None:
node = self.node2
return node.set_filter_subscriptions(subscription)
@allure.step
def update_filter_subscription(self, subscription, node=None):
if node is None:
node = self.node2
return node.update_filter_subscriptions(subscription)
@allure.step
def add_new_relay_subscription(self, pubsub_topics, node=None):
if node is None:
node = self.node1
self.node1.set_relay_subscriptions(pubsub_topics)
@allure.step
def get_filter_messages(self, content_topic, node=None):
if node is None:
node = self.node2
return node.get_filter_messages(content_topic)
@allure.step
def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs)
return message

View File

@ -1,20 +1,20 @@
import inspect import inspect
from src.libs.custom_logger import get_custom_logger from src.libs.custom_logger import get_custom_logger
import math
from time import time from time import time
import pytest import pytest
import allure import allure
from src.libs.common import to_base64, delay from src.libs.common import to_base64, delay
from src.data_classes import message_rpc_response_schema from src.node.waku_message import WakuMessage
from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY, RUNNING_IN_CI from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY, RUNNING_IN_CI
from src.node.waku_node import WakuNode from src.node.waku_node import WakuNode
from tenacity import retry, stop_after_delay, wait_fixed from tenacity import retry, stop_after_delay, wait_fixed
from src.test_data import VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
class StepsRelay: class StepsRelay:
test_pubsub_topic = "/waku/2/rs/18/1" test_pubsub_topic = VALID_PUBSUB_TOPICS[1]
test_content_topic = "/test/1/waku-relay/proto" test_content_topic = "/test/1/waku-relay/proto"
test_payload = "Relay works!!" test_payload = "Relay works!!"
@ -22,13 +22,10 @@ class StepsRelay:
def setup_main_relay_nodes(self, request): def setup_main_relay_nodes(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY) self.node1.start(relay="true", nodekey=NODEKEY)
try: self.enr_uri = self.node1.get_enr_uri()
self.enr_uri = self.node1.info()["enrUri"]
except Exception as ex:
raise AttributeError(f"Could not find enrUri in the info call because of error: {str(ex)}")
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true") self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri)
self.main_nodes = [self.node1, self.node2] self.main_nodes = [self.node1, self.node2]
self.optional_nodes = [] self.optional_nodes = []
@ -41,31 +38,31 @@ class StepsRelay:
pytest.skip("ADDITIONAL_NODES is empty, cannot run test") pytest.skip("ADDITIONAL_NODES is empty, cannot run test")
for index, node in enumerate(nodes): for index, node in enumerate(nodes):
node = WakuNode(node, f"node{index}_{request.cls.test_id}") node = WakuNode(node, f"node{index}_{request.cls.test_id}")
node.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true") node.start(relay="true", discv5_bootstrap_node=self.enr_uri)
self.optional_nodes.append(node) self.optional_nodes.append(node)
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def subscribe_main_relay_nodes(self): def subscribe_main_relay_nodes(self):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def subscribe_optional_relay_nodes(self): def subscribe_optional_relay_nodes(self):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.ensure_subscriptions_on_nodes(self.optional_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.optional_nodes, [self.test_pubsub_topic])
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def relay_warm_up(self): def relay_warm_up(self):
try: try:
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
logger.info("WARM UP successful!!") logger.info("WARM UP successful!!")
except Exception as ex: except Exception as ex:
raise TimeoutError(f"WARM UP FAILED WITH: {ex}") raise TimeoutError(f"WARM UP FAILED WITH: {ex}")
# this method should be used only for the tests that use the warm_up fixture # this method should be used only for the tests that use the relay_warm_up fixture
# otherwise use wait_for_published_message_to_reach_peer # otherwise use wait_for_published_message_to_reach_relay_peer
@allure.step @allure.step
def check_published_message_reaches_peer(self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None): def check_published_message_reaches_relay_peer(self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None):
if message is None: if message is None:
message = self.create_message() message = self.create_message()
if pubsub_topic is None: if pubsub_topic is None:
@ -75,66 +72,47 @@ class StepsRelay:
if not peer_list: if not peer_list:
peer_list = self.main_nodes + self.optional_nodes peer_list = self.main_nodes + self.optional_nodes
sender.send_message(message, pubsub_topic) sender.send_relay_message(message, pubsub_topic)
delay(message_propagation_delay) delay(message_propagation_delay)
for index, peer in enumerate(peer_list): for index, peer in enumerate(peer_list):
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
get_messages_response = peer.get_messages(pubsub_topic) get_messages_response = peer.get_relay_messages(pubsub_topic)
assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages" assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages"
received_message = message_rpc_response_schema.load(get_messages_response[0]) assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
self.assert_received_message(message, received_message) waku_message = WakuMessage(get_messages_response)
waku_message.assert_received_message(message)
@allure.step @allure.step
def check_publish_without_subscription(self, pubsub_topic): def check_publish_without_relay_subscription(self, pubsub_topic):
try: try:
self.node1.send_message(self.create_message(), pubsub_topic) self.node1.send_relay_message(self.create_message(), pubsub_topic)
raise AssertionError("Publish with no subscription worked!!!") raise AssertionError("Publish with no subscription worked!!!")
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
# we need much bigger timeout in CI because we run tests in parallel there and the machine itself is slower # we need much bigger timeout in CI because we run tests in parallel there and the machine itself is slower
@allure.step @allure.step
def wait_for_published_message_to_reach_peer( def wait_for_published_message_to_reach_relay_peer(
self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, pubsub_topic=None, sender=None, peer_list=None self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, pubsub_topic=None, sender=None, peer_list=None
): ):
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True) @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True)
def check_peer_connection(): def check_peer_connection():
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
self.check_published_message_reaches_peer(message, pubsub_topic=pubsub_topic, sender=sender, peer_list=peer_list) self.check_published_message_reaches_relay_peer(message, pubsub_topic=pubsub_topic, sender=sender, peer_list=peer_list)
check_peer_connection() check_peer_connection()
@allure.step @allure.step
def assert_received_message(self, sent_message, received_message): def ensure_relay_subscriptions_on_nodes(self, node_list, pubsub_topic_list):
def assert_fail_message(field_name): for node in node_list:
return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(received_message, field_name)}" node.set_relay_subscriptions(pubsub_topic_list)
assert received_message.payload == sent_message["payload"], assert_fail_message("payload")
assert received_message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic")
if sent_message.get("timestamp") is not None:
if isinstance(sent_message["timestamp"], float):
assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp")
else:
assert str(received_message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp")
if "version" in sent_message:
assert str(received_message.version) == str(sent_message["version"]), assert_fail_message("version")
if "meta" in sent_message:
assert str(received_message.meta) == str(sent_message["meta"]), assert_fail_message("meta")
if "ephemeral" in sent_message:
assert str(received_message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral")
if "rateLimitProof" in sent_message:
assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof")
@allure.step @allure.step
def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list): def delete_relay_subscriptions_on_nodes(self, node_list, pubsub_topic_list):
for node in node_list: for node in node_list:
node.set_subscriptions(pubsub_topic_list) node.delete_relay_subscriptions(pubsub_topic_list)
@allure.step @allure.step
def delete_subscriptions_on_nodes(self, node_list, pubsub_topic_list):
for node in node_list:
node.delete_subscriptions(pubsub_topic_list)
def create_message(self, **kwargs): def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs) message.update(kwargs)

0
tests/filter/__init__.py Normal file
View File

View File

@ -0,0 +1,120 @@
import pytest
from src.libs.custom_logger import get_custom_logger
from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS
from src.steps.filter import StepsFilter
logger = get_custom_logger(__name__)
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node")
class TestFilterSubscribeUpdate(StepsFilter):
def test_filter_subscribe_to_single_topics(self):
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
self.check_published_message_reaches_filter_peer()
def test_filter_subscribe_to_multiple_pubsub_topic(self):
failed_pubsub_topics = []
for pubsub_topic in VALID_PUBSUB_TOPICS:
content_topic = pubsub_topic
logger.debug(f"Running test with pubsub topic: {pubsub_topic}")
try:
self.wait_for_subscriptions_on_main_nodes([content_topic], pubsub_topic)
message = self.create_message(contentTopic=content_topic)
self.check_published_message_reaches_filter_peer(message, pubsub_topic=pubsub_topic)
except Exception as ex:
logger.error(f"PubsubTopic {pubsub_topic} failed: {str(ex)}")
failed_pubsub_topics.append(pubsub_topic)
assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}"
def test_filter_subscribe_to_30_content_topics_in_one_call(self):
failed_content_topics = []
self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[:30]])
for content_topic in SAMPLE_INPUTS[:30]:
logger.debug(f'Running test with content topic {content_topic["description"]}')
message = self.create_message(contentTopic=content_topic["value"])
try:
self.check_published_message_reaches_filter_peer(message)
except Exception as ex:
logger.error(f'ContentTopic {content_topic["description"]} failed: {str(ex)}')
failed_content_topics.append(content_topic)
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
def test_filter_subscribe_to_30_content_topics_in_separate_calls(self, subscribe_main_nodes):
for content_topic in SAMPLE_INPUTS[:30]:
self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic["value"]], "pubsubTopic": self.test_pubsub_topic})
failed_content_topics = []
for content_topic in SAMPLE_INPUTS[:30]:
logger.debug(f'Running test with content topic {content_topic["description"]}')
message = self.create_message(contentTopic=content_topic["value"])
try:
self.check_published_message_reaches_filter_peer(message)
except Exception as ex:
logger.error(f'ContentTopic {content_topic["description"]} failed: {str(ex)}')
failed_content_topics.append(content_topic)
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
def test_filter_subscribe_to_31_content_topics(self, subscribe_main_nodes):
try:
_31_content_topics = [input["value"] for input in SAMPLE_INPUTS[:31]]
self.create_filter_subscription({"requestId": "1", "contentFilters": _31_content_topics, "pubsubTopic": self.test_pubsub_topic})
raise AssertionError("Subscribe with more than 30 content topics worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_subscribe_refresh(self):
for _ in range(2):
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
self.check_published_message_reaches_filter_peer()
def test_filter_subscribe_with_multiple_overlapping_content_topics(self):
self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[:3]])
self.wait_for_subscriptions_on_main_nodes([input["value"] for input in SAMPLE_INPUTS[1:4]])
for content_topic in SAMPLE_INPUTS[:4]:
message = self.create_message(contentTopic=content_topic["value"])
self.check_published_message_reaches_filter_peer(message)
def test_filter_subscribe_with_no_pubsub_topic(self, subscribe_main_nodes):
try:
self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]})
raise AssertionError(f"Subscribe with no pubusub topics worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_subscribe_with_invalid_pubsub_topic_format(self, subscribe_main_nodes):
try:
self.create_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": [self.test_pubsub_topic]})
raise AssertionError(f"Subscribe with invalid pubusub topics worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_subscribe_with_no_content_topic(self, subscribe_main_nodes):
try:
self.create_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic})
raise AssertionError(f"Subscribe with no content topics worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_subscribe_with_invalid_content_topic_format(self, subscribe_main_nodes):
success_content_topics = []
for content_topic in INVALID_CONTENT_TOPICS:
logger.debug(f'Running test with contetn topic {content_topic["description"]}')
try:
self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic})
success_content_topics.append(content_topic)
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
assert not success_content_topics, f"Invalid Content topics that didn't failed: {success_content_topics}"
def test_filter_subscribe_with_no_request_id(self, subscribe_main_nodes):
try:
self.create_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
raise AssertionError(f"Subscribe with no request id worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_subscribe_with_invalid_request_id(self, subscribe_main_nodes):
try:
self.create_filter_subscription({"requestId": 1, "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
raise AssertionError(f"Subscribe with invalid request id worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)

View File

@ -0,0 +1,107 @@
import pytest
from src.libs.custom_logger import get_custom_logger
from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS
from src.steps.filter import StepsFilter
logger = get_custom_logger(__name__)
@pytest.mark.usefixtures("setup_relay_node", "setup_main_filter_node")
class TestFilterSubscribeCreate(StepsFilter):
def test_filter_update_subscription_add_a_new_content_topic(self):
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_conted_topic], "pubsubTopic": self.test_pubsub_topic})
self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.test_content_topic))
self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.second_conted_topic))
def test_filter_update_subscription_add_30_new_content_topics(self):
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
self.update_filter_subscription(
{"requestId": "1", "contentFilters": [input["value"] for input in SAMPLE_INPUTS[:30]], "pubsubTopic": self.test_pubsub_topic}
)
self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.test_content_topic))
failed_content_topics = []
for content_topic in SAMPLE_INPUTS[:30]:
logger.debug(f'Running test with content topic {content_topic["description"]}')
message = self.create_message(contentTopic=content_topic["value"])
try:
self.check_published_message_reaches_filter_peer(message)
except Exception as ex:
logger.error(f'ContentTopic {content_topic["description"]} failed: {str(ex)}')
failed_content_topics.append(content_topic)
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
def test_filter_update_subscription_add_31_new_content_topics(self, subscribe_main_nodes):
try:
_31_content_topics = [input["value"] for input in SAMPLE_INPUTS[:31]]
self.update_filter_subscription({"requestId": "1", "contentFilters": _31_content_topics, "pubsubTopic": self.test_pubsub_topic})
raise AssertionError("Subscribe with more than 30 content topics worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_update_subscription_refresh_existing(self):
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=self.test_content_topic))
def test_filter_update_subscription_add_a_new_pubsub_topic(self):
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
self.update_filter_subscription(
{"requestId": "1", "contentFilters": [self.test_content_topic, self.second_conted_topic], "pubsubTopic": VALID_PUBSUB_TOPICS[4]}
)
self.add_new_relay_subscription(VALID_PUBSUB_TOPICS[4:5])
self.check_published_message_reaches_filter_peer(
self.create_message(contentTopic=self.test_content_topic), pubsub_topic=self.test_pubsub_topic
)
self.check_published_message_reaches_filter_peer(
self.create_message(contentTopic=self.test_content_topic), pubsub_topic=VALID_PUBSUB_TOPICS[4]
)
self.check_published_message_reaches_filter_peer(
self.create_message(contentTopic=self.second_conted_topic), pubsub_topic=VALID_PUBSUB_TOPICS[4]
)
def test_filter_update_subscription_with_no_pubsub_topic(self, subscribe_main_nodes):
try:
self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic]})
raise AssertionError(f"Subscribe with no pubusub topics worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_update_subscription_with_pubsub_topic_list_instead_of_string(self, subscribe_main_nodes):
try:
self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": [self.test_pubsub_topic]})
raise AssertionError(f"Subscribe with invalid pubusub topics worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_update_subscription_with_no_content_topic(self, subscribe_main_nodes):
try:
self.update_filter_subscription({"requestId": "1", "pubsubTopic": self.test_pubsub_topic})
raise AssertionError(f"Subscribe with no content topics worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_update_subscription_with_invalid_content_topic_format(self, subscribe_main_nodes):
success_content_topics = []
for content_topic in INVALID_CONTENT_TOPICS:
logger.debug(f'Running test with contetn topic {content_topic["description"]}')
try:
self.update_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic})
success_content_topics.append(content_topic)
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
assert not success_content_topics, f"Invalid Content topics that didn't failed: {success_content_topics}"
def test_filter_update_subscription_with_no_request_id(self, subscribe_main_nodes):
try:
self.update_filter_subscription({"contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
raise AssertionError(f"Subscribe with no request id worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)
def test_filter_update_subscription_with_invalid_request_id(self, subscribe_main_nodes):
try:
self.update_filter_subscription({"requestId": 1, "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
raise AssertionError(f"Subscribe with invalid request id worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex)

View File

@ -5,15 +5,15 @@ from src.steps.relay import StepsRelay
@pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes") @pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes")
class TestMultipleNodes(StepsRelay): class TestMultipleNodes(StepsRelay):
def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
self.check_published_message_reaches_peer() self.check_published_message_reaches_relay_peer()
def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
self.check_published_message_reaches_peer(sender=self.optional_nodes[-1]) self.check_published_message_reaches_relay_peer(sender=self.optional_nodes[-1])
def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self): def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self):
self.wait_for_published_message_to_reach_peer(peer_list=self.main_nodes) self.wait_for_published_message_to_reach_relay_peer(peer_list=self.main_nodes)
try: try:
self.check_published_message_reaches_peer(peer_list=self.optional_nodes) self.check_published_message_reaches_relay_peer(peer_list=self.optional_nodes)
raise AssertionError("Non subscribed nodes received the message!!") raise AssertionError("Non subscribed nodes received the message!!")
except Exception as ex: except Exception as ex:
assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found" assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found"

View File

@ -4,7 +4,7 @@ from time import time
from src.libs.common import delay, to_base64 from src.libs.common import delay, to_base64
from src.steps.relay import StepsRelay from src.steps.relay import StepsRelay
from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS, VALID_PUBSUB_TOPICS from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS, VALID_PUBSUB_TOPICS
from src.data_classes import message_rpc_response_schema from src.node.waku_message import WakuMessage
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
@ -17,7 +17,7 @@ class TestRelayPublish(StepsRelay):
logger.debug(f'Running test with payload {payload["description"]}') logger.debug(f'Running test with payload {payload["description"]}')
message = self.create_message(payload=to_base64(payload["value"])) message = self.create_message(payload=to_base64(payload["value"]))
try: try:
self.check_published_message_reaches_peer(message) self.check_published_message_reaches_relay_peer(message)
except Exception as e: except Exception as e:
logger.error(f'Payload {payload["description"]} failed: {str(e)}') logger.error(f'Payload {payload["description"]} failed: {str(e)}')
failed_payloads.append(payload["description"]) failed_payloads.append(payload["description"])
@ -29,7 +29,7 @@ class TestRelayPublish(StepsRelay):
logger.debug(f'Running test with payload {payload["description"]}') logger.debug(f'Running test with payload {payload["description"]}')
message = self.create_message(payload=payload["value"]) message = self.create_message(payload=payload["value"])
try: try:
self.node1.send_message(message, self.test_pubsub_topic) self.node1.send_relay_message(message, self.test_pubsub_topic)
success_payloads.append(payload) success_payloads.append(payload)
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
@ -38,7 +38,7 @@ class TestRelayPublish(StepsRelay):
def test_publish_with_missing_payload(self): def test_publish_with_missing_payload(self):
message = {"contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message = {"contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
try: try:
self.node1.send_message(message, self.test_pubsub_topic) self.node1.send_relay_message(message, self.test_pubsub_topic)
raise AssertionError("Publish with missing payload worked!!!") raise AssertionError("Publish with missing payload worked!!!")
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
@ -47,14 +47,14 @@ class TestRelayPublish(StepsRelay):
payload_length = 1024 * 1023 payload_length = 1024 * 1023
logger.debug(f"Running test with payload length of {payload_length} bytes") logger.debug(f"Running test with payload length of {payload_length} bytes")
message = self.create_message(payload=to_base64("a" * (payload_length))) message = self.create_message(payload=to_base64("a" * (payload_length)))
self.check_published_message_reaches_peer(message, message_propagation_delay=2) self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2)
def test_publish_with_payload_equal_or_more_than_one_mb(self): def test_publish_with_payload_equal_or_more_than_one_mb(self):
for payload_length in [1024 * 1024, 1024 * 1024 * 10]: for payload_length in [1024 * 1024, 1024 * 1024 * 10]:
logger.debug(f"Running test with payload length of {payload_length} bytes") logger.debug(f"Running test with payload length of {payload_length} bytes")
message = self.create_message(payload=to_base64("a" * (payload_length))) message = self.create_message(payload=to_base64("a" * (payload_length)))
try: try:
self.check_published_message_reaches_peer(message, message_propagation_delay=2) self.check_published_message_reaches_relay_peer(message, message_propagation_delay=2)
raise AssertionError("Duplicate message was retrieved twice") raise AssertionError("Duplicate message was retrieved twice")
except Exception as ex: except Exception as ex:
assert "couldn't find any messages" in str(ex) assert "couldn't find any messages" in str(ex)
@ -65,7 +65,7 @@ class TestRelayPublish(StepsRelay):
logger.debug(f'Running test with content topic {content_topic["description"]}') logger.debug(f'Running test with content topic {content_topic["description"]}')
message = self.create_message(contentTopic=content_topic["value"]) message = self.create_message(contentTopic=content_topic["value"])
try: try:
self.check_published_message_reaches_peer(message) self.check_published_message_reaches_relay_peer(message)
except Exception as e: except Exception as e:
logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}') logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}')
failed_content_topics.append(content_topic) failed_content_topics.append(content_topic)
@ -77,7 +77,7 @@ class TestRelayPublish(StepsRelay):
logger.debug(f'Running test with contetn topic {content_topic["description"]}') logger.debug(f'Running test with contetn topic {content_topic["description"]}')
message = self.create_message(contentTopic=content_topic["value"]) message = self.create_message(contentTopic=content_topic["value"])
try: try:
self.node1.send_message(message, self.test_pubsub_topic) self.node1.send_relay_message(message, self.test_pubsub_topic)
success_content_topics.append(content_topic) success_content_topics.append(content_topic)
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
@ -86,33 +86,33 @@ class TestRelayPublish(StepsRelay):
def test_publish_with_missing_content_topic(self): def test_publish_with_missing_content_topic(self):
message = {"payload": to_base64(self.test_payload), "timestamp": int(time() * 1e9)} message = {"payload": to_base64(self.test_payload), "timestamp": int(time() * 1e9)}
try: try:
self.node1.send_message(message, self.test_pubsub_topic) self.node1.send_relay_message(message, self.test_pubsub_topic)
raise AssertionError("Publish with missing content_topic worked!!!") raise AssertionError("Publish with missing content_topic worked!!!")
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
def test_publish_on_multiple_pubsub_topics(self): def test_publish_on_multiple_pubsub_topics(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
failed_pubsub_topics = [] failed_pubsub_topics = []
for pubsub_topic in VALID_PUBSUB_TOPICS: for pubsub_topic in VALID_PUBSUB_TOPICS:
logger.debug(f"Running test with pubsub topic {pubsub_topic}") logger.debug(f"Running test with pubsub topic {pubsub_topic}")
try: try:
self.check_published_message_reaches_peer(pubsub_topic=pubsub_topic) self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
except Exception as e: except Exception as e:
logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}") logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}")
failed_pubsub_topics.append(pubsub_topic) failed_pubsub_topics.append(pubsub_topic)
assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}"
def test_message_published_on_different_pubsub_topic_is_not_retrieved(self): def test_message_published_on_different_pubsub_topic_is_not_retrieved(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
self.node1.send_message(self.create_message(), VALID_PUBSUB_TOPICS[0]) self.node1.send_relay_message(self.create_message(), VALID_PUBSUB_TOPICS[0])
delay(0.1) delay(0.1)
messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1]) messages = self.node2.get_relay_messages(VALID_PUBSUB_TOPICS[1])
assert not messages, "Message was retrieved on wrong pubsub_topic" assert not messages, "Message was retrieved on wrong pubsub_topic"
def test_publish_on_non_subscribed_pubsub_topic(self): def test_publish_on_non_subscribed_pubsub_topic(self):
try: try:
self.check_published_message_reaches_peer(pubsub_topic="/waku/2/rs/19/1") self.check_published_message_reaches_relay_peer(pubsub_topic=VALID_PUBSUB_TOPICS[4])
raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!") raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!")
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
@ -124,7 +124,7 @@ class TestRelayPublish(StepsRelay):
logger.debug(f'Running test with timestamp {timestamp["description"]}') logger.debug(f'Running test with timestamp {timestamp["description"]}')
message = self.create_message(timestamp=timestamp["value"]) message = self.create_message(timestamp=timestamp["value"])
try: try:
self.check_published_message_reaches_peer(message) self.check_published_message_reaches_relay_peer(message)
except Exception as ex: except Exception as ex:
logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}') logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}')
failed_timestamps.append(timestamp) failed_timestamps.append(timestamp)
@ -137,7 +137,7 @@ class TestRelayPublish(StepsRelay):
logger.debug(f'Running test with timestamp {timestamp["description"]}') logger.debug(f'Running test with timestamp {timestamp["description"]}')
message = self.create_message(timestamp=timestamp["value"]) message = self.create_message(timestamp=timestamp["value"])
try: try:
self.check_published_message_reaches_peer(message) self.check_published_message_reaches_relay_peer(message)
success_timestamps.append(timestamp) success_timestamps.append(timestamp)
except Exception as e: except Exception as e:
pass pass
@ -145,24 +145,24 @@ class TestRelayPublish(StepsRelay):
def test_publish_with_no_timestamp(self): def test_publish_with_no_timestamp(self):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic} message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic}
self.check_published_message_reaches_peer(message) self.check_published_message_reaches_relay_peer(message)
def test_publish_with_valid_version(self): def test_publish_with_valid_version(self):
self.check_published_message_reaches_peer(self.create_message(version=10)) self.check_published_message_reaches_relay_peer(self.create_message(version=10))
def test_publish_with_invalid_version(self): def test_publish_with_invalid_version(self):
try: try:
self.check_published_message_reaches_peer(self.create_message(version=2.1)) self.check_published_message_reaches_relay_peer(self.create_message(version=2.1))
raise AssertionError("Publish with invalid version worked!!!") raise AssertionError("Publish with invalid version worked!!!")
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) assert "Bad Request" in str(ex)
def test_publish_with_valid_meta(self): def test_publish_with_valid_meta(self):
self.check_published_message_reaches_peer(self.create_message(meta=to_base64(self.test_payload))) self.check_published_message_reaches_relay_peer(self.create_message(meta=to_base64(self.test_payload)))
def test_publish_with_invalid_meta(self): def test_publish_with_invalid_meta(self):
try: try:
self.check_published_message_reaches_peer(self.create_message(meta=self.test_payload)) self.check_published_message_reaches_relay_peer(self.create_message(meta=self.test_payload))
raise AssertionError("Publish with invalid meta worked!!!") raise AssertionError("Publish with invalid meta worked!!!")
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) assert "Bad Request" in str(ex)
@ -172,7 +172,7 @@ class TestRelayPublish(StepsRelay):
for ephemeral in [True, False]: for ephemeral in [True, False]:
logger.debug(f"Running test with Ephemeral {ephemeral}") logger.debug(f"Running test with Ephemeral {ephemeral}")
try: try:
self.check_published_message_reaches_peer(self.create_message(ephemeral=ephemeral)) self.check_published_message_reaches_relay_peer(self.create_message(ephemeral=ephemeral))
except Exception as e: except Exception as e:
logger.error(f"Massage with Ephemeral {ephemeral} failed: {str(e)}") logger.error(f"Massage with Ephemeral {ephemeral} failed: {str(e)}")
failed_ephemeral.append(ephemeral) failed_ephemeral.append(ephemeral)
@ -184,16 +184,16 @@ class TestRelayPublish(StepsRelay):
"epoch": to_base64("epochData"), "epoch": to_base64("epochData"),
"nullifier": to_base64("nullifierData"), "nullifier": to_base64("nullifierData"),
} }
self.check_published_message_reaches_peer(self.create_message(rateLimitProof=rate_limit_proof)) self.check_published_message_reaches_relay_peer(self.create_message(rateLimitProof=rate_limit_proof))
def test_publish_with_extra_field(self): def test_publish_with_extra_field(self):
self.check_published_message_reaches_peer(self.create_message(extraField="extraValue")) self.check_published_message_reaches_relay_peer(self.create_message(extraField="extraValue"))
def test_publish_and_retrieve_duplicate_message(self): def test_publish_and_retrieve_duplicate_message(self):
message = self.create_message() message = self.create_message()
self.check_published_message_reaches_peer(message) self.check_published_message_reaches_relay_peer(message)
try: try:
self.check_published_message_reaches_peer(message) self.check_published_message_reaches_relay_peer(message)
raise AssertionError("Duplicate message was retrieved twice") raise AssertionError("Duplicate message was retrieved twice")
except Exception as ex: except Exception as ex:
assert "couldn't find any messages" in str(ex) assert "couldn't find any messages" in str(ex)
@ -201,43 +201,43 @@ class TestRelayPublish(StepsRelay):
def test_publish_while_peer_is_paused(self): def test_publish_while_peer_is_paused(self):
message = self.create_message() message = self.create_message()
self.node2.pause() self.node2.pause()
self.node1.send_message(message, self.test_pubsub_topic) self.node1.send_relay_message(message, self.test_pubsub_topic)
self.node2.unpause() self.node2.unpause()
get_messages_response = self.node2.get_messages(self.test_pubsub_topic) get_messages_response = self.node2.get_relay_messages(self.test_pubsub_topic)
assert get_messages_response, "Peer node couldn't find any messages" assert get_messages_response, "Peer node couldn't find any messages"
received_message = message_rpc_response_schema.load(get_messages_response[0]) waku_message = WakuMessage(get_messages_response)
self.assert_received_message(message, received_message) waku_message.assert_received_message(message)
def test_publish_after_node_pauses_and_pauses(self): def test_publish_after_node_pauses_and_pauses(self):
self.check_published_message_reaches_peer() self.check_published_message_reaches_relay_peer()
self.node1.pause() self.node1.pause()
self.node1.unpause() self.node1.unpause()
self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) self.check_published_message_reaches_relay_peer(self.create_message(payload=to_base64("M1")))
self.node2.pause() self.node2.pause()
self.node2.unpause() self.node2.unpause()
self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2"))) self.check_published_message_reaches_relay_peer(self.create_message(payload=to_base64("M2")))
def test_publish_after_node1_restarts(self): def test_publish_after_node1_restarts(self):
self.check_published_message_reaches_peer() self.check_published_message_reaches_relay_peer()
self.node1.restart() self.node1.restart()
self.node1.ensure_ready() self.node1.ensure_ready()
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
def test_publish_after_node2_restarts(self): def test_publish_after_node2_restarts(self):
self.check_published_message_reaches_peer() self.check_published_message_reaches_relay_peer()
self.node2.restart() self.node2.restart()
self.node2.ensure_ready() self.node2.ensure_ready()
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
def test_publish_and_retrieve_100_messages(self): def test_publish_and_retrieve_100_messages(self):
num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag
for index in range(num_messages): for index in range(num_messages):
message = self.create_message(payload=to_base64(f"M_{index}")) message = self.create_message(payload=to_base64(f"M_{index}"))
self.node1.send_message(message, self.test_pubsub_topic) self.node1.send_relay_message(message, self.test_pubsub_topic)
delay(1) delay(1)
messages = self.node2.get_messages(self.test_pubsub_topic) messages = self.node2.get_relay_messages(self.test_pubsub_topic)
assert len(messages) == num_messages assert len(messages) == num_messages
for index, message in enumerate(messages): for index, message in enumerate(messages):
assert message["payload"] == to_base64( assert message["payload"] == to_base64(

View File

@ -9,68 +9,68 @@ logger = get_custom_logger(__name__)
@pytest.mark.usefixtures("setup_main_relay_nodes") @pytest.mark.usefixtures("setup_main_relay_nodes")
class TestRelaySubscribe(StepsRelay): class TestRelaySubscribe(StepsRelay):
def test_no_subscription(self): def test_relay_no_subscription(self):
self.check_publish_without_subscription(self.test_pubsub_topic) self.check_publish_without_relay_subscription(self.test_pubsub_topic)
def test_subscribe_to_single_pubsub_topic(self): def test_relay_subscribe_to_single_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
def test_subscribe_to_already_existing_pubsub_topic(self): def test_relay_subscribe_to_already_existing_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.check_published_message_reaches_peer() self.check_published_message_reaches_relay_peer()
def test_subscribe_with_multiple_overlapping_pubsub_topics(self): def test_relay_subscribe_with_multiple_overlapping_pubsub_topics(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3])
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[1:4]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[1:4])
for pubsub_topic in VALID_PUBSUB_TOPICS[:4]: for pubsub_topic in VALID_PUBSUB_TOPICS[:4]:
self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) self.wait_for_published_message_to_reach_relay_peer(pubsub_topic=pubsub_topic)
def test_subscribe_with_empty_pubsub_topic_list(self): def test_relay_subscribe_with_empty_pubsub_topic_list(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, []) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [])
def test_subscribe_with_invalid_pubsub_topic_format(self): def test_relay_subscribe_with_invalid_pubsub_topic_format(self):
success_pubsub_topics = [] success_pubsub_topics = []
for pubsub_topic in INVALID_PUBSUB_TOPICS: for pubsub_topic in INVALID_PUBSUB_TOPICS:
logger.debug(f"Running test with payload {pubsub_topic}") logger.debug(f"Running test with payload {pubsub_topic}")
try: try:
self.ensure_subscriptions_on_nodes(self.main_nodes, pubsub_topic) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, pubsub_topic)
success_pubsub_topics.append(pubsub_topic) success_pubsub_topics.append(pubsub_topic)
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) assert "Bad Request" in str(ex)
assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}" assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}"
def test_unsubscribe_from_single_pubsub_topic(self): def test_relay_unsubscribe_from_single_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
self.delete_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.delete_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.check_publish_without_subscription(self.test_pubsub_topic) self.check_publish_without_relay_subscription(self.test_pubsub_topic)
def test_unsubscribe_from_all_pubsub_topics(self): def test_relay_unsubscribe_from_all_pubsub_topics(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
for pubsub_topic in VALID_PUBSUB_TOPICS: for pubsub_topic in VALID_PUBSUB_TOPICS:
self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) self.wait_for_published_message_to_reach_relay_peer(pubsub_topic=pubsub_topic)
self.delete_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) self.delete_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
for pubsub_topic in VALID_PUBSUB_TOPICS: for pubsub_topic in VALID_PUBSUB_TOPICS:
self.check_publish_without_subscription(pubsub_topic) self.check_publish_without_relay_subscription(pubsub_topic)
def test_unsubscribe_from_some_pubsub_topics(self): def test_relay_unsubscribe_from_some_pubsub_topics(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
for pubsub_topic in VALID_PUBSUB_TOPICS: for pubsub_topic in VALID_PUBSUB_TOPICS:
self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic) self.wait_for_published_message_to_reach_relay_peer(pubsub_topic=pubsub_topic)
self.delete_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3]) self.delete_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3])
for pubsub_topic in VALID_PUBSUB_TOPICS[:3]: for pubsub_topic in VALID_PUBSUB_TOPICS[:3]:
self.check_publish_without_subscription(pubsub_topic) self.check_publish_without_relay_subscription(pubsub_topic)
for pubsub_topic in VALID_PUBSUB_TOPICS[3:]: for pubsub_topic in VALID_PUBSUB_TOPICS[3:]:
self.check_published_message_reaches_peer(pubsub_topic=pubsub_topic) self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
def test_unsubscribe_from_non_existing_pubsub_topic(self): def test_relay_unsubscribe_from_non_existing_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
try: try:
self.delete_subscriptions_on_nodes(self.main_nodes, ["/waku/2/rs/999/99"]) self.delete_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[4:5])
if self.node1.is_nwaku(): if self.node1.is_nwaku():
pass # nwaku doesn't fail in this case pass # nwaku doesn't fail in this case
elif self.node1.is_gowaku(): elif self.node1.is_gowaku():
@ -79,32 +79,32 @@ class TestRelaySubscribe(StepsRelay):
raise Exception("Not implemented") raise Exception("Not implemented")
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
self.check_published_message_reaches_peer() self.check_published_message_reaches_relay_peer()
def test_unsubscribe_with_invalid_pubsub_topic_format(self): def test_relay_unsubscribe_with_invalid_pubsub_topic_format(self):
success_pubsub_topics = [] success_pubsub_topics = []
for pubsub_topic in INVALID_PUBSUB_TOPICS: for pubsub_topic in INVALID_PUBSUB_TOPICS:
logger.debug(f"Running test with payload {pubsub_topic}") logger.debug(f"Running test with payload {pubsub_topic}")
try: try:
self.delete_subscriptions_on_nodes(self.main_nodes, pubsub_topic) self.delete_relay_subscriptions_on_nodes(self.main_nodes, pubsub_topic)
success_pubsub_topics.append(pubsub_topic) success_pubsub_topics.append(pubsub_topic)
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) assert "Bad Request" in str(ex)
assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}" assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}"
def test_resubscribe_to_unsubscribed_pubsub_topic(self): def test_relay_resubscribe_to_unsubscribed_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
self.delete_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.delete_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.check_publish_without_subscription(self.test_pubsub_topic) self.check_publish_without_relay_subscription(self.test_pubsub_topic)
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.check_published_message_reaches_peer() self.check_published_message_reaches_relay_peer()
def test_publish_on_default_pubsub_topic_without_beeing_subscribed_to_it(self): def test_relay_publish_on_default_pubsub_topic_without_beeing_subscribed_to_it(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_relay_peer()
try: try:
self.check_published_message_reaches_peer(pubsub_topic=DEFAULT_PUBSUB_TOPIC) self.check_published_message_reaches_relay_peer(pubsub_topic=DEFAULT_PUBSUB_TOPIC)
raise AssertionError(f"Publish on {DEFAULT_PUBSUB_TOPIC} with beeing subscribed to it worked!!!") raise AssertionError(f"Publish on {DEFAULT_PUBSUB_TOPIC} with beeing subscribed to it worked!!!")
except Exception as ex: except Exception as ex:
assert "Not Found" in str(ex) assert "Not Found" in str(ex)