From 8b525f2b1ac98f07c4c3ce39a7fb1a93cf5b6c7b Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Thu, 11 Apr 2024 13:00:53 +0300 Subject: [PATCH] Light push tests (#27) * first set of tests * fix ci * new tests * finish publish tests * new tests * running nodes tests * new tests * finishing touches * new test * fixes * fixes --- scripts/README.md | 0 ...bug_1034_gowaku_fails_to_fetch_messages.sh | 0 ...8_light_push_fails_without_subscription.sh | 58 ++++ ...12_lightnodes_not_receiving_filter_push.sh | 0 ...bug_2567_light_push_fails_after_restart.sh | 73 +++++ scripts/qa_coding_challenge.sh | 90 ++++++ src/env_vars.py | 2 +- src/node/api_clients/base_client.py | 35 +-- src/node/api_clients/rest.py | 3 + src/node/waku_node.py | 3 + src/steps/light_push.py | 152 +++++++++ src/test_data.py | 7 + tests/light_push/__init__.py | 0 tests/light_push/test_multiple_nodes.py | 68 ++++ tests/light_push/test_publish.py | 292 ++++++++++++++++++ tests/light_push/test_running_nodes.py | 54 ++++ tests/relay/test_publish.py | 2 +- 17 files changed, 803 insertions(+), 36 deletions(-) mode change 100644 => 100755 scripts/README.md mode change 100644 => 100755 scripts/gowaku_bug_1034_gowaku_fails_to_fetch_messages.sh create mode 100755 scripts/gowaku_bug_1078_light_push_fails_without_subscription.sh mode change 100644 => 100755 scripts/nwaku_bug_2512_lightnodes_not_receiving_filter_push.sh create mode 100755 scripts/nwaku_bug_2567_light_push_fails_after_restart.sh create mode 100755 scripts/qa_coding_challenge.sh create mode 100644 src/steps/light_push.py create mode 100644 tests/light_push/__init__.py create mode 100644 tests/light_push/test_multiple_nodes.py create mode 100644 tests/light_push/test_publish.py create mode 100644 tests/light_push/test_running_nodes.py diff --git a/scripts/README.md b/scripts/README.md old mode 100644 new mode 100755 diff --git a/scripts/gowaku_bug_1034_gowaku_fails_to_fetch_messages.sh b/scripts/gowaku_bug_1034_gowaku_fails_to_fetch_messages.sh old mode 100644 new mode 100755 diff --git a/scripts/gowaku_bug_1078_light_push_fails_without_subscription.sh b/scripts/gowaku_bug_1078_light_push_fails_without_subscription.sh new file mode 100755 index 0000000000..6917b1e9e9 --- /dev/null +++ b/scripts/gowaku_bug_1078_light_push_fails_without_subscription.sh @@ -0,0 +1,58 @@ +#!/bin/bash +printf "\nAssuming you already have a docker network called waku\n" +# if not something like this should create it: docker network create --driver bridge --subnet 172.18.0.0/16 --gateway 172.18.0.1 waku + + +cluster_id=2 +pubsub_topic="/waku/2/rs/$cluster_id/0" +node_1=harbor.status.im/wakuorg/nwaku:latest +node_2=harbor.status.im/wakuorg/go-waku:latest +ext_ip="172.18.204.9" +tcp_port="37344" + +printf "\nStarting containers\n" + +container_id1=$(docker run -d -i -t -p 37343:37343 -p $tcp_port:$tcp_port -p 37345:37345 -p 37346:37346 -p 37347:37347 $node_1 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=37345 --rest-port=37343 --tcp-port=$tcp_port --discv5-udp-port=37346 --rest-address=0.0.0.0 --nat=extip:$ext_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=37347 --metrics-logging=true --pubsub-topic=/waku/2/rs/2/0 --lightpush=true --relay=true) +docker network connect --ip $ext_ip waku $container_id1 + +printf "\nSleeping 2 seconds\n" +sleep 2 + +response=$(curl -X GET "http://127.0.0.1:37343/debug/v1/info" -H "accept: application/json") +enrUri=$(echo $response | jq -r '.enrUri') + +# Extract the first non-WebSocket address +ws_address=$(echo $response | jq -r '.listenAddresses[] | select(contains("/ws") | not)') + +# Check if we got an address, and construct the new address with it +if [[ $ws_address != "" ]]; then + identifier=$(echo $ws_address | awk -F'/p2p/' '{print $2}') + if [[ $identifier != "" ]]; then + multiaddr_with_id="/ip4/${ext_ip}/tcp/${tcp_port}/p2p/${identifier}" + else + echo "No identifier found in the address." + exit 1 + fi +else + echo "No non-WebSocket address found." + exit 1 +fi + +container_id2=$(docker run -d -i -t -p 25908:25908 -p 25909:25909 -p 25910:25910 -p 25911:25911 -p 25912:25912 $node_2 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=DEBUG --rest-relay-cache-capacity=100 --websocket-port=25910 --rest-port=25908 --tcp-port=25909 --discv5-udp-port=25911 --rest-address=0.0.0.0 --nat=extip:172.18.141.214 --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --min-relay-peers-to-publish=1 --rest-filter-cache-capacity=50 --pubsub-topic=/waku/2/rs/2/0 --lightpush=true --relay=false --discv5-bootstrap-node=$enrUri --lightpushnode=$multiaddr_with_id) + +docker network connect --ip 172.18.141.214 waku $container_id2 + +printf "\nSleeping 10 seconds\n" +sleep 10 + +printf "\nSubscribe\n" +curl -v -X POST "http://127.0.0.1:37343/relay/v1/subscriptions" -H "Content-Type: application/json" -d '["/waku/2/rs/2/0"]' + + +printf "\nSleeping 2 seconds\n" +sleep 2 + +printf "\nLightpush message on subscribed pubusub topic\n" +curl -v -X POST "http://127.0.0.1:25908/lightpush/v1/message" -H "Content-Type: application/json" -d '{"pubsubTopic": "/waku/2/rs/2/0", "message": {"payload": "", "contentTopic": "/myapp/1/latest/proto", "timestamp": 1712149720320589312}}' +# printf "\nLightpush message on non subscribed pubusub topic\n" +# curl -v -X POST "http://127.0.0.1:25908/lightpush/v1/message" -H "Content-Type: application/json" -d '{"pubsubTopic": "/waku/2/rs/2/1", "message": {"payload": "TGlnaHQgcHVzaCB3b3JrcyEh", "contentTopic": "/myapp/1/latest/proto", "timestamp": 1712149720320589312}}' \ No newline at end of file diff --git a/scripts/nwaku_bug_2512_lightnodes_not_receiving_filter_push.sh b/scripts/nwaku_bug_2512_lightnodes_not_receiving_filter_push.sh old mode 100644 new mode 100755 diff --git a/scripts/nwaku_bug_2567_light_push_fails_after_restart.sh b/scripts/nwaku_bug_2567_light_push_fails_after_restart.sh new file mode 100755 index 0000000000..e1813cbfe1 --- /dev/null +++ b/scripts/nwaku_bug_2567_light_push_fails_after_restart.sh @@ -0,0 +1,73 @@ +#!/bin/bash +printf "\nAssuming you already have a docker network called waku\n" +# if not something like this should create it: docker network create --driver bridge --subnet 172.18.0.0/16 --gateway 172.18.0.1 waku + + +cluster_id=2 +pubsub_topic="/waku/2/rs/$cluster_id/0" +node_1=harbor.status.im/wakuorg/nwaku:latest +node_2=harbor.status.im/wakuorg/nwaku:latest +ext_ip="172.18.204.9" +tcp_port="37344" + +printf "\nStarting containers\n" + +container_id1=$(docker run -d -i -t -p 37343:37343 -p $tcp_port:$tcp_port -p 37345:37345 -p 37346:37346 -p 37347:37347 $node_1 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=37345 --rest-port=37343 --tcp-port=$tcp_port --discv5-udp-port=37346 --rest-address=0.0.0.0 --nat=extip:$ext_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=37347 --metrics-logging=true --pubsub-topic=/waku/2/rs/2/0 --lightpush=true --relay=true) +docker network connect --ip $ext_ip waku $container_id1 + +printf "\nSleeping 2 seconds\n" +sleep 2 + +response=$(curl -X GET "http://127.0.0.1:37343/debug/v1/info" -H "accept: application/json") +enrUri=$(echo $response | jq -r '.enrUri') + +# Extract the first non-WebSocket address +ws_address=$(echo $response | jq -r '.listenAddresses[] | select(contains("/ws") | not)') + +# Check if we got an address, and construct the new address with it +if [[ $ws_address != "" ]]; then + identifier=$(echo $ws_address | awk -F'/p2p/' '{print $2}') + if [[ $identifier != "" ]]; then + multiaddr_with_id="/ip4/${ext_ip}/tcp/${tcp_port}/p2p/${identifier}" + echo $multiaddr_with_id + else + echo "No identifier found in the address." + exit 1 + fi +else + echo "No non-WebSocket address found." + exit 1 +fi + +container_id2=$(docker run -d -i -t -p 25908:25908 -p 25909:25909 -p 25910:25910 -p 25911:25911 -p 25912:25912 $node_2 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=25910 --rest-port=25908 --tcp-port=25909 --discv5-udp-port=25911 --rest-address=0.0.0.0 --nat=extip:172.18.141.214 --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --pubsub-topic=/waku/2/rs/2/0 --lightpush=true --relay=false --discv5-bootstrap-node=$enrUri --lightpushnode=$multiaddr_with_id) + +docker network connect --ip 172.18.141.214 waku $container_id2 + +printf "\nSleeping 10 seconds\n" +sleep 10 + +printf "\nSubscribe\n" +curl -v -X POST "http://127.0.0.1:37343/relay/v1/subscriptions" -H "Content-Type: application/json" -d '["/waku/2/rs/2/0"]' + + +printf "\nSleeping 2 seconds\n" +sleep 2 + +printf "\nLightpush message on subscribed pubusub topic\n" +curl -v -X POST "http://127.0.0.1:25908/lightpush/v1/message" -H "Content-Type: application/json" -d '{"pubsubTopic": "/waku/2/rs/2/0", "message": {"payload": "TGlnaHQgcHVzaCB3b3JrcyEh", "contentTopic": "/myapp/1/latest/proto", "timestamp": 1712149720320589312}}' + +printf "\nRestarting NODE 1\n" +docker restart $container_id1 + +printf "\nSleeping 10 seconds\n" +sleep 10 + +printf "\nSubscribe\n" +curl -v -X POST "http://127.0.0.1:37343/relay/v1/subscriptions" -H "Content-Type: application/json" -d '["/waku/2/rs/2/0"]' + + +printf "\nSleeping 2 seconds\n" +sleep 2 + +printf "\nLightpush message on subscribed pubusub topic\n" +curl -v -X POST "http://127.0.0.1:25908/lightpush/v1/message" -H "Content-Type: application/json" -d '{"pubsubTopic": "/waku/2/rs/2/0", "message": {"payload": "TGlnaHQgcHVzaCB3b3JrcyEh", "contentTopic": "/myapp/1/latest/proto", "timestamp": 1712149720320589312}}' diff --git a/scripts/qa_coding_challenge.sh b/scripts/qa_coding_challenge.sh new file mode 100755 index 0000000000..43e59bb46f --- /dev/null +++ b/scripts/qa_coding_challenge.sh @@ -0,0 +1,90 @@ +#!/bin/bash +printf "\nAssuming you already have a docker network called waku\n" +# if not something like this should create it: docker network create --driver bridge --subnet 172.18.0.0/16 --gateway 172.18.0.1 waku + +printf "\nStarting containers\n" + +container_id1=$(docker run -d -i -t \ + -p 21161:21161 \ + -p 21162:21162 \ + -p 21163:21163 \ + -p 21164:21164 \ + -p 21165:21165 \ + wakuorg/nwaku:v0.24.0 \ + --listen-address=0.0.0.0 \ + --rest=true \ + --rest-admin=true \ + --websocket-support=true \ + --log-level=TRACE \ + --rest-relay-cache-capacity=100 \ + --websocket-port=21163 \ + --rest-port=21161 \ + --tcp-port=21162 \ + --discv5-udp-port=21164 \ + --rest-address=0.0.0.0 \ + --nat=extip:172.18.111.226 \ + --peer-exchange=true \ + --discv5-discovery=true \ + --relay=true) + +docker network connect --ip 172.18.111.226 waku $container_id1 + +printf "\nSleeping 2 seconds\n" +sleep 2 + +response=$(curl -X GET "http://127.0.0.1:21161/debug/v1/info" -H "accept: application/json") +enrUri=$(echo $response | jq -r '.enrUri') + +container_id2=$(docker run -d -i -t \ + -p 22161:22161 \ + -p 22162:22162 \ + -p 22163:22163 \ + -p 22164:22164 \ + -p 22165:22165 \ + wakuorg/nwaku:v0.24.0 \ + --listen-address=0.0.0.0 \ + --rest=true \ + --rest-admin=true \ + --websocket-support=true \ + --log-level=TRACE \ + --rest-relay-cache-capacity=100 \ + --websocket-port=22163 \ + --rest-port=22161 \ + --tcp-port=22162 \ + --discv5-udp-port=22164 \ + --rest-address=0.0.0.0 \ + --nat=extip:172.18.111.227 \ + --peer-exchange=true \ + --discv5-discovery=true \ + --discv5-bootstrap-node=$enrUri \ + --relay=true) + +docker network connect --ip 172.18.111.227 waku $container_id2 + +printf "\nSleeping 1 seconds\n" +sleep 1 + +printf "\nSubscribe\n" +curl -X POST "http://127.0.0.1:21161/relay/v1/auto/subscriptions" -H "accept: text/plain" -H "content-type: application/json" -d '["/my-app/2/chatroom-1/proto"]' +curl -X POST "http://127.0.0.1:22161/relay/v1/auto/subscriptions" -H "accept: text/plain" -H "content-type: application/json" -d '["/my-app/2/chatroom-1/proto"]' + + +printf "\nSleeping 60 seconds\n" +sleep 60 + +printf "\nRelay from NODE 1\n" +curl -X POST "http://127.0.0.1:21161/relay/v1/auto/messages" -H "content-type: application/json" -d '{"payload":"UmVsYXkgd29ya3MhIQ==","contentTopic":"/my-app/2/chatroom-1/proto","timestamp":0}' + +printf "\nSleeping 1 seconds\n" +sleep 1 + +printf "\nCheck message in NODE 2\n" +response=$(curl -X GET "http://127.0.0.1:22161/relay/v1/auto/messages/%2Fmy-app%2F2%2Fchatroom-1%2Fproto" -H "Content-Type: application/json") + +printf "\nResponse: $response\n" +if [ "$response" == "[]" ]; then + echo "Error: NODE 2 didn't find the message" + exit 1 +else + echo "Success: NODE 2 received the message" +fi \ No newline at end of file diff --git a/src/env_vars.py b/src/env_vars.py index 8cb62eda41..a588185616 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -27,7 +27,7 @@ IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24") GATEWAY = get_env_var("GATEWAY", "172.18.0.1") RUNNING_IN_CI = get_env_var("CI") NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68") -API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 10) +API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20) RLN_CREDENTIALS = get_env_var("RLN_CREDENTIALS") # example for .env file diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py index 3c7e3df8d6..b731e0f3fe 100644 --- a/src/node/api_clients/base_client.py +++ b/src/node/api_clients/base_client.py @@ -1,13 +1,12 @@ import json import requests -from abc import ABC, abstractmethod from src.env_vars import API_REQUEST_TIMEOUT from src.libs.custom_logger import get_custom_logger logger = get_custom_logger(__name__) -class BaseClient(ABC): +class BaseClient: def make_request(self, method, url, headers=None, data=None): self.log_request_as_curl(method, url, headers, data) response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT) @@ -36,35 +35,3 @@ class BaseClient(ABC): headers_str_for_log = " ".join([f'-H "{key}: {value}"' for key, value in headers.items()]) if headers else "" curl_cmd = f"curl -v -X {method.upper()} \"{url}\" {headers_str_for_log} -d '{data}'" logger.info(curl_cmd) - - @abstractmethod - def info(self): - pass - - @abstractmethod - def set_relay_subscriptions(self, pubsub_topics): - pass - - @abstractmethod - def delete_relay_subscriptions(self, pubsub_topics): - pass - - @abstractmethod - def send_relay_message(self, message, pubsub_topic): - pass - - @abstractmethod - def get_relay_messages(self, pubsub_topic): - pass - - @abstractmethod - def set_filter_subscriptions(self, subscription): - pass - - @abstractmethod - def delete_filter_subscriptions(self, subscription): - pass - - @abstractmethod - def get_filter_messages(self, content_topic): - pass diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index 8cde7fba2b..a9ec1fdc42 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -44,6 +44,9 @@ class REST(BaseClient): def send_relay_auto_message(self, message): return self.rest_call("post", "relay/v1/auto/messages", json.dumps(message)) + def send_light_push_message(self, payload): + return self.rest_call("post", "lightpush/v1/message", json.dumps(payload)) + def get_relay_messages(self, pubsub_topic): get_messages_response = self.rest_call("get", f"relay/v1/messages/{quote(pubsub_topic, safe='')}") return get_messages_response.json() diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 799e887232..70ea5c361f 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -233,6 +233,9 @@ class WakuNode: def send_relay_auto_message(self, message): return self._api.send_relay_auto_message(message) + def send_light_push_message(self, payload): + return self._api.send_light_push_message(payload) + def get_relay_messages(self, pubsub_topic): return self._api.get_relay_messages(pubsub_topic) diff --git a/src/steps/light_push.py b/src/steps/light_push.py new file mode 100644 index 0000000000..bb66eaa480 --- /dev/null +++ b/src/steps/light_push.py @@ -0,0 +1,152 @@ +import inspect +import os +from src.libs.custom_logger import get_custom_logger +from time import time +import pytest +import allure +from src.libs.common import to_base64, delay, gen_step_id +from src.node.waku_message import WakuMessage +from src.env_vars import ( + ADDITIONAL_NODES, + NODE_1, + NODE_2, +) +from src.node.waku_node import WakuNode, rln_credential_store_ready +from tenacity import retry, stop_after_delay, wait_fixed +from src.test_data import VALID_PUBSUB_TOPICS + +logger = get_custom_logger(__name__) + + +class StepsLightPush: + test_content_topic = "/myapp/1/latest/proto" + test_pubsub_topic = "/waku/2/rs/0/0" + test_payload = "Light push works!!" + + @pytest.fixture(scope="function", autouse=True) + def light_push_setup(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.main_receiving_nodes = [] + self.optional_nodes = [] + self.multiaddr_list = [] + + @allure.step + def add_node_peer(self, node): + if node.is_nwaku(): + for multiaddr in self.multiaddr_list: + node.add_peers([multiaddr]) + + @allure.step + def start_receiving_node(self, image, node_index, **kwargs): + node = WakuNode(image, f"receiving_node{node_index}_{self.test_id}") + node.start(**kwargs) + if kwargs["relay"] == "true": + self.main_receiving_nodes.extend([node]) + self.add_node_peer(node) + self.multiaddr_list.extend([node.get_multiaddr_with_id()]) + return node + + @allure.step + def setup_lightpush_node(self, image, node_index, **kwargs): + node = WakuNode(image, f"lightpush_node{node_index}_{self.test_id}") + node.start(discv5_bootstrap_node=self.enr_uri, lightpushnode=self.multiaddr_list[0], **kwargs) + if kwargs["relay"] == "true": + self.main_receiving_nodes.extend([node]) + self.add_node_peer(node) + return node + + @allure.step + def setup_first_receiving_node(self, lightpush="true", relay="true", **kwargs): + self.receiving_node1 = self.start_receiving_node(NODE_1, node_index=1, lightpush=lightpush, relay=relay, **kwargs) + self.enr_uri = self.receiving_node1.get_enr_uri() + + @allure.step + def setup_second_receiving_node(self, lightpush, relay, **kwargs): + self.receiving_node2 = self.start_receiving_node(NODE_1, node_index=2, lightpush=lightpush, relay=relay, **kwargs) + + @allure.step + def setup_additional_receiving_nodes(self, node_list=ADDITIONAL_NODES, **kwargs): + if node_list: + nodes = [node.strip() for node in node_list.split(",") if node] + else: + pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test") + for index, node in enumerate(nodes): + self.start_receiving_node(node, node_index=index + 2, lightpush="true", relay="true", **kwargs) + + @allure.step + def setup_first_lightpush_node(self, lightpush="true", relay="false", **kwargs): + self.light_push_node1 = self.setup_lightpush_node(NODE_2, node_index=1, lightpush=lightpush, relay=relay, **kwargs) + + @allure.step + def setup_second_lightpush_node(self, lightpush="true", relay="false", **kwargs): + self.light_push_node2 = self.setup_lightpush_node(NODE_2, node_index=2, lightpush=lightpush, relay=relay, **kwargs) + + @allure.step + def setup_additional_lightpush_nodes(self, node_list=ADDITIONAL_NODES, **kwargs): + if node_list: + nodes = [node.strip() for node in node_list.split(",") if node] + else: + pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test") + self.additional_lightpush_nodes = [] + for index, node in enumerate(nodes): + node = self.setup_lightpush_node(node, node_index=index + 2, lightpush="true", relay="false", **kwargs) + self.additional_lightpush_nodes.append(node) + + @allure.step + def subscribe_to_pubsub_topics_via_relay(self, node=None, pubsub_topics=None): + if pubsub_topics is None: + pubsub_topics = [self.test_pubsub_topic] + if not node: + node = self.main_receiving_nodes + if isinstance(node, list): + for node in node: + node.set_relay_subscriptions(pubsub_topics) + else: + node.set_relay_subscriptions(pubsub_topics) + + @allure.step + def subscribe_to_pubsub_topics_via_filter(self, node, pubsub_topic=None, content_topic=None): + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if content_topic is None: + content_topic = [self.test_content_topic] + subscription = {"requestId": "1", "contentFilters": content_topic, "pubsubTopic": pubsub_topic} + node.set_filter_subscriptions(subscription) + + @allure.step + def check_light_pushed_message_reaches_receiving_peer( + self, pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None, peer_list=None + ): + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if not sender: + sender = self.light_push_node1 + if not peer_list: + peer_list = self.main_receiving_nodes + self.optional_nodes + payload = self.create_payload(pubsub_topic, message) + logger.debug("Lightpushing message") + sender.send_light_push_message(payload) + delay(message_propagation_delay) + for index, peer in enumerate(peer_list): + logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the lightpushed message") + get_messages_response = peer.get_relay_messages(pubsub_topic) + assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" + assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" + waku_message = WakuMessage(get_messages_response) + waku_message.assert_received_message(payload["message"]) + + @allure.step + def create_message(self, **kwargs): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message.update(kwargs) + return message + + @allure.step + def create_payload(self, pubsub_topic=None, message=None, **kwargs): + if message is None: + message = self.create_message() + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + payload = {"pubsubTopic": pubsub_topic, "message": message} + payload.update(kwargs) + return payload diff --git a/src/test_data.py b/src/test_data.py index 5f162873bb..3cbb7da193 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -121,6 +121,13 @@ PUBSUB_TOPICS_SAME_CLUSTER = [ "/waku/2/rs/2/7", ] +PUBSUB_TOPICS_WRONG_FORMAT = [ + {"description": "A dictionary", "value": {"key": "YWFh"}}, + {"description": "An integer", "value": 1234567890}, + {"description": "A list", "value": ["YWFh"]}, + {"description": "A bool", "value": True}, +] + SAMPLE_TIMESTAMPS = [ {"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku", "gowaku"]}, { diff --git a/tests/light_push/__init__.py b/tests/light_push/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/light_push/test_multiple_nodes.py b/tests/light_push/test_multiple_nodes.py new file mode 100644 index 0000000000..b66d28049c --- /dev/null +++ b/tests/light_push/test_multiple_nodes.py @@ -0,0 +1,68 @@ +from src.libs.common import delay +from src.steps.light_push import StepsLightPush + + +class TestMultipleNodes(StepsLightPush): + def test_2_lightpush_nodes_and_1_receiving_node(self): + self.setup_first_receiving_node(lightpush="true", relay="true") + self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_second_lightpush_node(lightpush="true", relay="false") + self.subscribe_to_pubsub_topics_via_relay() + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node2) + + def test_2_receiving_nodes__relay_node1_forwards_lightpushed_message_to_relay_node2(self): + self.setup_first_receiving_node(lightpush="true", relay="true") + self.setup_second_receiving_node(lightpush="false", relay="true") + self.setup_first_lightpush_node(lightpush="true", relay="false") + self.subscribe_to_pubsub_topics_via_relay() + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) + + def test_2_receiving_nodes__relay_node1_forwards_lightpushed_message_to_filter_node2(self): + self.setup_first_receiving_node(lightpush="true", relay="true", filter="true") + self.setup_second_receiving_node(lightpush="false", relay="false", filternode=self.receiving_node1.get_multiaddr_with_id()) + self.setup_first_lightpush_node(lightpush="true", relay="false") + self.subscribe_to_pubsub_topics_via_relay(node=self.receiving_node1) + self.subscribe_to_pubsub_topics_via_filter(node=self.receiving_node2) + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) + get_messages_response = self.receiving_node2.get_filter_messages(self.test_content_topic) + assert len(get_messages_response) == 1, "Lightpushed message was not relayed to the filter node" + + def test_2_lightpush_nodes_and_2_receiving_nodes(self): + self.setup_first_receiving_node(lightpush="true", relay="true") + self.setup_second_receiving_node(lightpush="false", relay="true") + self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_second_lightpush_node(lightpush="true", relay="false") + self.subscribe_to_pubsub_topics_via_relay() + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node2) + + def test_combination_of_different_nodes(self): + self.setup_first_receiving_node(lightpush="true", relay="true", filter="true") + self.setup_second_receiving_node(lightpush="false", relay="false", filternode=self.receiving_node1.get_multiaddr_with_id()) + self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_second_lightpush_node(lightpush="true", relay="true") + self.subscribe_to_pubsub_topics_via_relay(node=self.receiving_node1) + self.subscribe_to_pubsub_topics_via_relay(node=self.light_push_node2) + self.subscribe_to_pubsub_topics_via_filter(node=self.receiving_node2) + delay(1) + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node2) + get_messages_response = self.receiving_node2.get_filter_messages(self.test_content_topic) + assert len(get_messages_response) == 2, "Lightpushed message was not relayed to the filter node" + + def test_multiple_receiving_nodes(self): + self.setup_first_receiving_node(lightpush="true", relay="true") + self.setup_additional_receiving_nodes() + self.setup_first_lightpush_node(lightpush="true", relay="false") + self.subscribe_to_pubsub_topics_via_relay() + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) + + def test_multiple_lightpush_nodes(self): + self.setup_first_receiving_node(lightpush="true", relay="true") + self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_additional_lightpush_nodes() + self.subscribe_to_pubsub_topics_via_relay() + self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) + for node in self.additional_lightpush_nodes: + self.check_light_pushed_message_reaches_receiving_peer(sender=node) diff --git a/tests/light_push/test_publish.py b/tests/light_push/test_publish.py new file mode 100644 index 0000000000..53cae00af8 --- /dev/null +++ b/tests/light_push/test_publish.py @@ -0,0 +1,292 @@ +import pytest +from src.env_vars import NODE_2 +from src.libs.custom_logger import get_custom_logger +from time import time +from src.libs.common import delay, to_base64 +from src.steps.light_push import StepsLightPush +from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, PUBSUB_TOPICS_WRONG_FORMAT, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS, VALID_PUBSUB_TOPICS + +logger = get_custom_logger(__name__) + + +class TestLightPushPublish(StepsLightPush): + @pytest.fixture(scope="function", autouse=True) + def light_push_publish_setup(self, light_push_setup): + self.setup_first_receiving_node() + self.setup_first_lightpush_node() + self.subscribe_to_pubsub_topics_via_relay() + + def test_light_push_with_valid_payloads(self): + failed_payloads = [] + for payload in SAMPLE_INPUTS: + logger.debug(f'Running test with payload {payload["description"]}') + message = self.create_message(payload=to_base64(payload["value"])) + try: + self.check_light_pushed_message_reaches_receiving_peer(message=message) + except Exception as e: + logger.error(f'Payload {payload["description"]} failed: {str(e)}') + failed_payloads.append(payload["description"]) + assert not failed_payloads, f"Payloads failed: {failed_payloads}" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1079") + def test_light_push_with_invalid_payloads(self): + success_payloads = [] + for payload in INVALID_PAYLOADS: + logger.debug(f'Running test with payload {payload["description"]}') + payload = self.create_payload(message=self.create_message(payload=payload["value"])) + try: + self.light_push_node1.send_light_push_message(payload) + success_payloads.append(payload) + except Exception as ex: + assert "Bad Request" in str(ex) + assert not success_payloads, f"Invalid Payloads that didn't failed: {success_payloads}" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1079") + def test_light_push_with_missing_payload(self): + message = {"contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + try: + self.light_push_node1.send_light_push_message(self.create_payload(message=message)) + raise AssertionError("Light push with missing payload worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_light_push_with_payload_less_than_300_kb(self): + payload_length = 1024 * 200 # after encoding to base64 this will be close to 260KB + logger.debug(f"Running test with payload length of {payload_length} bytes") + message = self.create_message(payload=to_base64("a" * (payload_length))) + self.check_light_pushed_message_reaches_receiving_peer(message=message) + + @pytest.mark.xfail("nwaku" in NODE_2, reason="https://github.com/waku-org/nwaku/issues/2565") + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1076") + def test_light_push_with_payload_around_300_kb(self): + payload_length = 1024 * 225 # after encoding to base64 this will be close to 300KB + logger.debug(f"Running test with payload length of {payload_length} bytes") + message = self.create_message(payload=to_base64("a" * (payload_length))) + try: + self.check_light_pushed_message_reaches_receiving_peer(message=message, message_propagation_delay=2) + raise AssertionError("Message with payload > 1MB was received") + except Exception as ex: + assert "couldn't find any messages" in str(ex) + + @pytest.mark.xfail("nwaku" in NODE_2, reason="https://github.com/waku-org/nwaku/issues/2565") + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1076") + def test_light_push_with_payload_more_than_1_MB(self): + payload_length = 1024 * 1024 + logger.debug(f"Running test with payload length of {payload_length} bytes") + message = self.create_message(payload=to_base64("a" * (payload_length))) + try: + self.check_light_pushed_message_reaches_receiving_peer(message=message, message_propagation_delay=2) + raise AssertionError("Message with payload > 1MB was received") + except Exception as ex: + assert "couldn't find any messages" in str(ex) + + def test_light_push_with_valid_content_topics(self): + failed_content_topics = [] + for content_topic in SAMPLE_INPUTS: + logger.debug(f'Running test with content topic {content_topic["description"]}') + message = self.create_message(contentTopic=content_topic["value"]) + try: + self.check_light_pushed_message_reaches_receiving_peer(message=message) + except Exception as e: + logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}') + failed_content_topics.append(content_topic) + assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1079") + def test_light_push_with_invalid_content_topics(self): + success_content_topics = [] + for content_topic in INVALID_CONTENT_TOPICS: + logger.debug(f'Running test with contetn topic {content_topic["description"]}') + message = self.create_message(contentTopic=content_topic["value"]) + try: + self.check_light_pushed_message_reaches_receiving_peer(message=message) + success_content_topics.append(content_topic) + except Exception as ex: + assert "Bad Request" in str(ex) + assert not success_content_topics, f"Invalid Content topics that didn't failed: {success_content_topics}" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1079") + def test_light_push_with_missing_content_topic(self): + message = {"payload": to_base64(self.test_payload), "timestamp": int(time() * 1e9)} + try: + self.light_push_node1.send_light_push_message(self.create_payload(message=message)) + raise AssertionError("Light push with missing content_topic worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_light_push_on_multiple_pubsub_topics(self): + self.subscribe_to_pubsub_topics_via_relay(pubsub_topics=VALID_PUBSUB_TOPICS) + failed_pubsub_topics = [] + for pubsub_topic in VALID_PUBSUB_TOPICS: + logger.debug(f"Running test with pubsub topic {pubsub_topic}") + try: + self.check_light_pushed_message_reaches_receiving_peer(pubsub_topic=pubsub_topic) + except Exception as e: + logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}") + failed_pubsub_topics.append(pubsub_topic) + assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}" + + def test_message_light_pushed_on_different_pubsub_topic_is_not_retrieved(self): + self.subscribe_to_pubsub_topics_via_relay(pubsub_topics=VALID_PUBSUB_TOPICS) + payload = self.create_payload(pubsub_topic=VALID_PUBSUB_TOPICS[0]) + self.light_push_node1.send_light_push_message(payload) + delay(0.1) + messages = self.receiving_node1.get_relay_messages(VALID_PUBSUB_TOPICS[1]) + assert not messages, "Message was retrieved on wrong pubsub_topic" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1078") + def test_light_push_on_non_subscribed_pubsub_topic(self): + try: + self.check_light_pushed_message_reaches_receiving_peer(pubsub_topic=VALID_PUBSUB_TOPICS[1]) + raise AssertionError("Light push on unsubscribed pubsub_topic worked!!!") + except Exception as ex: + assert "Not Found" in str(ex) or "Internal Server Error" in str(ex) + + def test_light_push_with_invalid_pubsub_topics(self): + success_content_topics = [] + for pubsub_topic in PUBSUB_TOPICS_WRONG_FORMAT: + logger.debug(f"Running test with pubsub topic {pubsub_topic}") + try: + self.check_light_pushed_message_reaches_receiving_peer(pubsub_topic=pubsub_topic["value"]) + success_content_topics.append(pubsub_topic) + except Exception as ex: + assert "Bad Request" in str(ex) + assert not success_content_topics, f"Invalid Content topics that didn't failed: {success_content_topics}" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1078") + def test_light_push_with_missing_pubsub_topics(self): + self.light_push_node1.send_light_push_message({"message": self.create_message()}) + delay(0.1) + messages = self.receiving_node1.get_relay_messages(self.test_pubsub_topic) + assert len(messages) == 1 + + def test_light_push_with_valid_timestamps(self): + failed_timestamps = [] + for timestamp in SAMPLE_TIMESTAMPS: + if self.light_push_node1.type() in timestamp["valid_for"]: + logger.debug(f'Running test with timestamp {timestamp["description"]}') + message = self.create_message(timestamp=timestamp["value"]) + try: + self.check_light_pushed_message_reaches_receiving_peer(message=message) + except Exception as ex: + logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}') + failed_timestamps.append(timestamp) + assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}" + + def test_light_push_with_invalid_timestamps(self): + success_timestamps = [] + for timestamp in SAMPLE_TIMESTAMPS: + if self.light_push_node1.type() not in timestamp["valid_for"]: + logger.debug(f'Running test with timestamp {timestamp["description"]}') + message = self.create_message(timestamp=timestamp["value"]) + try: + self.check_light_pushed_message_reaches_receiving_peer(message=message) + success_timestamps.append(timestamp) + except Exception as e: + pass + assert not success_timestamps, f"Invalid Timestamps that didn't failed: {success_timestamps}" + + def test_light_push_with_no_timestamp(self): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic} + self.check_light_pushed_message_reaches_receiving_peer(message=message) + + def test_light_push_with_valid_version(self): + self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(version=10)) + + def test_light_push_with_invalid_version(self): + try: + self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(version=2.1)) + raise AssertionError("Light push with invalid version worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_light_push_with_valid_meta(self): + self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(meta=to_base64(self.test_payload))) + + def test_light_push_with_invalid_meta(self): + try: + self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(meta=self.test_payload)) + raise AssertionError("Light push with invalid meta worked!!!") + except Exception as ex: + assert "Bad Request" in str(ex) + + @pytest.mark.xfail("go-waku" in NODE_2, reason="https://github.com/waku-org/go-waku/issues/1079") + def test_light_push_with_with_large_meta(self): + meta_l = 1024 * 1 + try: + self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(meta=to_base64("a" * (meta_l)))) + except Exception as ex: + assert '(kind: InvalidLengthField, field: "meta")' in str(ex) + + def test_light_push_with_ephemeral(self): + failed_ephemeral = [] + for ephemeral in [True, False]: + logger.debug(f"Running test with Ephemeral {ephemeral}") + try: + self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(ephemeral=ephemeral)) + except Exception as e: + logger.error(f"Light push message with Ephemeral {ephemeral} failed: {str(e)}") + failed_ephemeral.append(ephemeral) + assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" + + def test_light_push_with_extra_field(self): + try: + self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(extraField="extraValue")) + if self.light_push_node1.is_nwaku(): + raise AssertionError("Relay publish with extra field worked!!!") + elif self.light_push_node1.is_gowaku(): + pass + else: + raise NotImplementedError("Not implemented for this node type") + except Exception as ex: + assert "Bad Request" in str(ex) + + def test_light_push_and_retrieve_duplicate_message(self): + message = self.create_message() + self.check_light_pushed_message_reaches_receiving_peer(message=message) + self.check_light_pushed_message_reaches_receiving_peer(message=message) + + def test_light_push_while_peer_is_paused(self): + message = self.create_message() + self.receiving_node1.stop() + try: + self.light_push_node1.send_light_push_message(self.create_payload(message=message)) + raise AssertionError("Push with peer stopped worked!!") + except Exception as ex: + assert "timed out" in str(ex) or "failed to dial" in str(ex) + + def test_light_push_after_node_pauses_and_pauses(self): + self.check_light_pushed_message_reaches_receiving_peer() + self.light_push_node1.pause() + self.light_push_node1.unpause() + self.check_light_pushed_message_reaches_receiving_peer() + self.receiving_node1.pause() + self.receiving_node1.unpause() + self.check_light_pushed_message_reaches_receiving_peer() + + def test_light_push_after_light_push_node_restarts(self): + self.check_light_pushed_message_reaches_receiving_peer() + self.light_push_node1.restart() + self.light_push_node1.ensure_ready() + self.check_light_pushed_message_reaches_receiving_peer() + + @pytest.mark.xfail(reason="https://github.com/waku-org/nwaku/issues/2567") + def test_light_push_after_receiving_node_restarts(self): + self.check_light_pushed_message_reaches_receiving_peer() + self.receiving_node1.restart() + self.receiving_node1.ensure_ready() + self.subscribe_to_pubsub_topics_via_relay() + self.check_light_pushed_message_reaches_receiving_peer() + + def test_light_push_and_retrieve_100_messages(self): + num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag + for index in range(num_messages): + message = self.create_message(payload=to_base64(f"M_{index}")) + self.light_push_node1.send_light_push_message(self.create_payload(message=message)) + delay(1) + messages = self.receiving_node1.get_relay_messages(self.test_pubsub_topic) + assert len(messages) == num_messages + for index, message in enumerate(messages): + assert message["payload"] == to_base64( + f"M_{index}" + ), f'Incorrect payload at index: {index}. Published {to_base64(f"M_{index}")} Received {message["payload"]}' diff --git a/tests/light_push/test_running_nodes.py b/tests/light_push/test_running_nodes.py new file mode 100644 index 0000000000..2e0609ed37 --- /dev/null +++ b/tests/light_push/test_running_nodes.py @@ -0,0 +1,54 @@ +from src.libs.common import delay +from src.steps.light_push import StepsLightPush + + +class TestRunningNodes(StepsLightPush): + def test_main_node_only_lightpush__peer_only_lightpush(self): + self.setup_first_receiving_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", relay="false") + try: + self.light_push_node1.send_light_push_message(self.create_payload()) + raise AssertionError("Light push with non lightpush peer worked!!!") + except Exception as ex: + assert "no waku relay found" in str(ex) or "failed to negotiate protocol: protocols not supported" in str(ex) + + def test_main_node_only_lightpush__peer_only_filter(self): + self.setup_first_receiving_node(lightpush="false", relay="false", filter="true") + self.setup_first_lightpush_node(lightpush="true", relay="false") + try: + self.light_push_node1.send_light_push_message(self.create_payload()) + raise AssertionError("Light push with non lightpush peer worked!!!") + except Exception as ex: + assert "Failed to request a message push: dial_failure" in str(ex) or "failed to negotiate protocol: protocols not supported" in str(ex) + + def test_main_node_only_lightpush__peer_only_relay(self): + self.setup_first_receiving_node(lightpush="false", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + self.setup_first_lightpush_node(lightpush="true", relay="false") + try: + self.light_push_node1.send_light_push_message(self.create_payload()) + raise AssertionError("Light push with non lightpush peer worked!!!") + except Exception as ex: + assert "Failed to request a message push: dial_failure" in str(ex) or "failed to negotiate protocol: protocols not supported" in str(ex) + + def test_main_node_only_lightpush__peer_full(self): + self.setup_first_receiving_node(lightpush="true", relay="true", filter="true") + self.setup_first_lightpush_node(lightpush="true", relay="false") + self.subscribe_to_pubsub_topics_via_relay() + self.check_light_pushed_message_reaches_receiving_peer() + + def test_main_node_full__peer_full(self): + self.setup_first_receiving_node(lightpush="true", relay="true", filter="true") + self.setup_first_lightpush_node(lightpush="true", relay="true", filter="true") + self.subscribe_to_pubsub_topics_via_relay() + self.check_light_pushed_message_reaches_receiving_peer() + + def test_lightpush_node_with_relay_works_correctly(self): + self.test_main_node_full__peer_full() + self.light_push_node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + self.receiving_node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + delay(0.1) + response1 = self.receiving_node1.get_relay_messages(self.test_pubsub_topic) + assert len(response1) == 2 + response2 = self.light_push_node1.get_relay_messages(self.test_pubsub_topic) + assert len(response2) == 2 diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 4351a1444c..f92c6b2cba 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -44,7 +44,7 @@ class TestRelayPublish(StepsRelay): except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) - def test_publish_with_payload_less_than_one_150_kb(self): + def test_publish_with_payload_less_than_150_kb(self): payload_length = 1024 * 100 # after encoding to base64 this will be close to 150KB logger.debug(f"Running test with payload length of {payload_length} bytes") message = self.create_message(payload=to_base64("a" * (payload_length)))