diff --git a/scripts/store_v3.sh b/scripts/store_v3.sh new file mode 100755 index 0000000000..cff65acf13 --- /dev/null +++ b/scripts/store_v3.sh @@ -0,0 +1,111 @@ +#!/bin/bash +printf "\nAssuming you already have a docker network called waku\n" +# if not something like this should create it: docker network create --driver bridge --subnet 172.18.0.0/16 --gateway 172.18.0.1 waku + + +cluster_id=0 +pubsub_topic="/waku/2/rs/$cluster_id/0" +encoded_pubsub_topic=$(echo "$pubsub_topic" | sed 's:/:%2F:g') +content_topic="/test/1/store/proto" +encoded_content_topic=$(echo "$content_topic" | sed 's:/:%2F:g') +node_1=harbor.status.im/wakuorg/nwaku:latest +node_1_ip=172.18.64.13 +node_1_rest=32261 +node_1_tcp=32262 +node_2=harbor.status.im/wakuorg/nwaku:latest +node_2_ip=172.18.64.14 +node_2_rest=4588 + + +printf "\nStarting containers\n" + +container_id1=$(docker run -d -i -t -p $node_1_rest:$node_1_rest -p $node_1_tcp:$node_1_tcp -p 32263:32263 -p 32264:32264 -p 32265:32265 $node_1 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=32263 --rest-port=$node_1_rest --tcp-port=$node_1_tcp --discv5-udp-port=32264 --rest-address=0.0.0.0 --nat=extip:$node_1_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=32265 --metrics-logging=true --store=false --relay=true) +docker network connect --ip $node_1_ip waku $container_id1 + +printf "\nSleeping 2 seconds\n" +sleep 2 + +response=$(curl -X GET "http://127.0.0.1:$node_1_rest/debug/v1/info" -H "accept: application/json") +enrUri=$(echo $response | jq -r '.enrUri') + +# Extract the first non-WebSocket address +ws_address=$(echo $response | jq -r '.listenAddresses[] | select(contains("/ws") | not)') + +# Check if we got an address, and construct the new address with it +if [[ $ws_address != "" ]]; then + identifier=$(echo $ws_address | awk -F'/p2p/' '{print $2}') + if [[ $identifier != "" ]]; then + multiaddr_with_id="/ip4/${node_1_ip}/tcp/${node_1_tcp}/p2p/${identifier}" + echo $multiaddr_with_id + else + echo "No identifier found in the address." + exit 1 + fi +else + echo "No non-WebSocket address found." + exit 1 +fi + +container_id2=$(docker run -d -i -t -p $node_2_rest:$node_2_rest -p 4589:4589 -p 4590:4590 -p 4591:4591 -p 4592:4592 $node_2 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=4590 --rest-port=$node_2_rest --tcp-port=4589 --discv5-udp-port=4591 --rest-address=0.0.0.0 --nat=extip:$node_2_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=4592 --metrics-logging=true --discv5-bootstrap-node=$enrUri --storenode=$multiaddr_with_id --store=true --relay=true) + +docker network connect --ip $node_2_ip waku $container_id2 + +printf "\nSleeping 1 seconds\n" +sleep 1 + +printf "\nConnect peers\n" +curl -X POST "http://127.0.0.1:$node_2_rest/admin/v1/peers" -H "Content-Type: application/json" -d "[\"$multiaddr_with_id\"]" + +printf "\nSubscribe\n" +curl -X POST "http://127.0.0.1:$node_1_rest/relay/v1/subscriptions" -H "Content-Type: application/json" -d "[\"$pubsub_topic\"]" +curl -X POST "http://127.0.0.1:$node_2_rest/relay/v1/subscriptions" -H "Content-Type: application/json" -d "[\"$pubsub_topic\"]" + + +printf "\nSleeping 1 seconds\n" +sleep 1 + +printf "\nRelay from NODE 1\n" +curl -X POST "http://127.0.0.1:$node_1_rest/relay/v1/messages/$encoded_pubsub_topic" \ +-H "Content-Type: application/json" \ +-d '{"payload": "UmVsYXkgd29ya3MhIQ==", "contentTopic": "'"$content_topic"'", "timestamp": '$(date +%s%N)'}' + + +printf "\nSleeping 1 seconds\n" +sleep 1 + +printf "\nCheck message in NODE 2\n" +response=$(curl -X GET "http://127.0.0.1:$node_2_rest/relay/v1/messages/$encoded_pubsub_topic" -H "Content-Type: application/json") + +printf "\nResponse: $response\n" + +if [ "$response" == "[]" ]; then + echo "Error: NODE 2 didn't find the message" + exit 1 +else + echo "Success: NODE 2 received the message" +fi + + +printf "\nCheck message was stored in NODE 2 with v1 API\n" +response=$(curl -v -X GET "http://127.0.0.1:$node_2_rest/store/v1/messages?contentTopics=$encoded_content_topic&pageSize=5&ascending=true") + +printf "\nResponse: $response\n" + +if [ "$response" == "[]" ] || [ -z "$response" ]; then + echo "Error: NODE 2 didn't store the message with v1 API" + exit 1 +else + echo "Success: NODE 2 stored the message with v1 API" +fi + +printf "\nCheck message was stored in NODE 2 with v3 API\n" +response=$(curl -v -X GET "http://127.0.0.1:$node_2_rest/store/v3/messages?peerAddr=$multiaddr_with_id&contentTopics=$encoded_content_topic&pageSize=5&ascending=true") + +printf "\nResponse: $response\n" + +if [ "$response" == "[]" ] || [ -z "$response" ]; then + echo "Error: NODE 2 didn't stored the message with v3 API" + exit 1 +else + echo "Success: NODE 2 stored the message with v3 API" +fi \ No newline at end of file diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index a9ec1fdc42..bfb6bea0a4 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -82,3 +82,41 @@ class REST(BaseClient): endpoint = f"filter/v2/messages/{quote(content_topic, safe='')}" get_messages_response = self.rest_call("get", endpoint) return get_messages_response.json() + + def get_store_messages( + self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs + ): + base_url = f"store/{store_v}/messages" + params = [] + + if peerAddr is not None: + params.append(f"peerAddr={quote(peerAddr, safe='')}") + if includeData is not None: + params.append(f"includeData={includeData}") + if pubsubTopic is not None: + params.append(f"pubsubTopic={quote(pubsubTopic, safe='')}") + if contentTopics is not None: + params.append(f"contentTopics={quote(contentTopics, safe='')}") + if startTime is not None: + params.append(f"startTime={startTime}") + if endTime is not None: + params.append(f"endTime={endTime}") + if hashes is not None: + params.append(f"hashes={quote(hashes, safe='')}") + if cursor is not None: + params.append(f"cursor={quote(cursor, safe='')}") + if pageSize is not None: + params.append(f"pageSize={pageSize}") + if ascending is not None: + params.append(f"ascending={ascending}") + + # Append any additional keyword arguments to the parameters list + for key, value in kwargs.items(): + if value is not None: + params.append(f"{key}={quote(str(value), safe='')}") + + if params: + base_url += "?" + "&".join(params) + + get_messages_response = self.rest_call("get", base_url) + return get_messages_response.json() diff --git a/src/node/waku_message.py b/src/node/waku_message.py index 76b29ae87a..fb1f2f5496 100644 --- a/src/node/waku_message.py +++ b/src/node/waku_message.py @@ -17,16 +17,15 @@ class MessageRpcResponse: rate_limit_proof: Optional[dict] = field(default_factory=dict) -message_rpc_response_schema = class_schema(MessageRpcResponse)() - - class WakuMessage: - def __init__(self, message_response): + def __init__(self, message_response, schema=MessageRpcResponse): + self.schema = schema self.received_messages = message_response + self.message_rpc_response_schema = class_schema(self.schema)() @allure.step def assert_received_message(self, sent_message, index=0): - message = message_rpc_response_schema.load(self.received_messages[index]) + message = self.message_rpc_response_schema.load(self.received_messages[index]) def assert_fail_message(field_name): return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(message, field_name)}" diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 70ea5c361f..a4ba7d1aad 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -260,6 +260,24 @@ class WakuNode: def get_filter_messages(self, content_topic, pubsub_topic=None): return self._api.get_filter_messages(content_topic, pubsub_topic) + def get_store_messages( + self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs + ): + return self._api.get_store_messages( + peerAddr=peerAddr, + includeData=includeData, + pubsubTopic=pubsubTopic, + contentTopics=contentTopics, + startTime=startTime, + endTime=endTime, + hashes=hashes, + cursor=cursor, + pageSize=pageSize, + ascending=ascending, + store_v=store_v, + **kwargs, + ) + def get_metrics(self): if self.is_nwaku(): metrics = requests.get(f"http://localhost:{self._metrics_port}/metrics") diff --git a/src/steps/store.py b/src/steps/store.py new file mode 100644 index 0000000000..8777cc1279 --- /dev/null +++ b/src/steps/store.py @@ -0,0 +1,204 @@ +import inspect +import os +from src.libs.custom_logger import get_custom_logger +from time import time +import pytest +import allure +from src.libs.common import to_base64, delay, gen_step_id +from src.node.waku_message import WakuMessage +from src.env_vars import ( + ADDITIONAL_NODES, + NODE_1, + NODE_2, +) +from src.node.waku_node import WakuNode, rln_credential_store_ready +from tenacity import retry, stop_after_delay, wait_fixed +from src.test_data import VALID_PUBSUB_TOPICS + +logger = get_custom_logger(__name__) + + +class StepsStore: + test_content_topic = "/myapp/1/latest/proto" + test_pubsub_topic = "/waku/2/rs/0/0" + test_payload = "Store works!!" + + @pytest.fixture(scope="function", autouse=True) + def store_setup(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.main_publishing_nodes = [] + self.store_nodes = [] + self.optional_nodes = [] + self.multiaddr_list = [] + + @allure.step + def add_node_peer(self, node): + if node.is_nwaku(): + for multiaddr in self.multiaddr_list: + node.add_peers([multiaddr]) + + @allure.step + def start_publishing_node(self, image, node_index, **kwargs): + node = WakuNode(image, f"publishing_node{node_index}_{self.test_id}") + node.start(**kwargs) + if kwargs["relay"] == "true": + self.main_publishing_nodes.extend([node]) + if kwargs["store"] == "true": + self.store_nodes.extend([node]) + self.add_node_peer(node) + self.multiaddr_list.extend([node.get_multiaddr_with_id()]) + return node + + @allure.step + def setup_store_node(self, image, node_index, **kwargs): + node = WakuNode(image, f"store_node{node_index}_{self.test_id}") + node.start(discv5_bootstrap_node=self.enr_uri, storenode=self.multiaddr_list[0], **kwargs) + if kwargs["relay"] == "true": + self.main_publishing_nodes.extend([node]) + self.store_nodes.extend([node]) + self.add_node_peer(node) + return node + + @allure.step + def setup_first_publishing_node(self, store="true", relay="true", **kwargs): + self.publishing_node1 = self.start_publishing_node(NODE_1, node_index=1, store=store, relay=relay, **kwargs) + self.enr_uri = self.publishing_node1.get_enr_uri() + + @allure.step + def setup_second_publishing_node(self, store, relay, **kwargs): + self.publishing_node2 = self.start_publishing_node(NODE_1, node_index=2, store=store, relay=relay, **kwargs) + + @allure.step + def setup_additional_publishing_nodes(self, node_list=ADDITIONAL_NODES, **kwargs): + if node_list: + nodes = [node.strip() for node in node_list.split(",") if node] + else: + pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test") + for index, node in enumerate(nodes): + self.start_publishing_node(node, node_index=index + 2, store="true", relay="true", **kwargs) + + @allure.step + def setup_first_store_node(self, store="true", relay="true", **kwargs): + self.store_node1 = self.setup_store_node(NODE_2, node_index=1, store=store, relay=relay, **kwargs) + + @allure.step + def setup_second_store_node(self, store="true", relay="false", **kwargs): + self.store_node2 = self.setup_store_node(NODE_2, node_index=2, store=store, relay=relay, **kwargs) + + @allure.step + def setup_additional_store_nodes(self, node_list=ADDITIONAL_NODES, **kwargs): + if node_list: + nodes = [node.strip() for node in node_list.split(",") if node] + else: + pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test") + self.additional_store_nodes = [] + for index, node in enumerate(nodes): + node = self.setup_store_node(node, node_index=index + 2, store="true", relay="false", **kwargs) + self.additional_store_nodes.append(node) + + @allure.step + def subscribe_to_pubsub_topics_via_relay(self, node=None, pubsub_topics=None): + if pubsub_topics is None: + pubsub_topics = [self.test_pubsub_topic] + if not node: + node = self.main_publishing_nodes + if isinstance(node, list): + for node in node: + node.set_relay_subscriptions(pubsub_topics) + else: + node.set_relay_subscriptions(pubsub_topics) + + @allure.step + def subscribe_to_pubsub_topics_via_filter(self, node, pubsub_topic=None, content_topic=None): + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if content_topic is None: + content_topic = [self.test_content_topic] + subscription = {"requestId": "1", "contentFilters": content_topic, "pubsubTopic": pubsub_topic} + node.set_filter_subscriptions(subscription) + + @allure.step + def publish_message_via(self, type, pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None): + self.message = self.create_message() if message is None else message + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if not sender: + sender = self.publishing_node1 + if type == "relay": + logger.debug("Relaying message") + sender.send_relay_message(self.message, pubsub_topic) + elif type == "lightpush": + payload = self.create_payload(pubsub_topic, self.message) + sender.send_light_push_message(payload) + delay(message_propagation_delay) + + @allure.step + def check_published_message_is_stored( + self, + store_node=None, + peerAddr=None, + includeData=None, + pubsubTopic=None, + contentTopics=None, + startTime=None, + endTime=None, + hashes=None, + cursor=None, + pageSize=None, + ascending=None, + store_v="v1", + **kwargs, + ): + if store_node is None: + store_node = self.store_nodes + elif not isinstance(store_node, list): + store_node = [store_node] + else: + store_node = store_node + for node in store_node: + logger.debug(f"Checking that peer {node.image} can find the stored message") + self.store_response = node.get_store_messages( + peerAddr=peerAddr, + includeData=includeData, + pubsubTopic=pubsubTopic, + contentTopics=contentTopics, + startTime=startTime, + endTime=endTime, + hashes=hashes, + cursor=cursor, + pageSize=pageSize, + ascending=ascending, + store_v=store_v, + **kwargs, + ) + + assert "messages" in self.store_response, f"Peer {node.image} has no messages key in the reponse" + assert self.store_response["messages"], f"Peer {node.image} couldn't find any messages" + assert len(self.store_response["messages"]) >= 1, "Expected at least 1 message but got none" + waku_message = WakuMessage(self.store_response["messages"][-1:]) + waku_message.assert_received_message(self.message) + + @allure.step + def check_store_returns_empty_response(self, pubsub_topic=None): + if not pubsub_topic: + pubsub_topic = self.test_pubsub_topic + try: + self.check_published_message_is_stored(pubsubTopic=pubsub_topic, pageSize=5, ascending="true") + except Exception as ex: + assert "couldn't find any messages" in str(ex) + + @allure.step + def create_message(self, **kwargs): + message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} + message.update(kwargs) + return message + + @allure.step + def create_payload(self, pubsub_topic=None, message=None, **kwargs): + if message is None: + message = self.create_message() + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + payload = {"pubsubTopic": pubsub_topic, "message": message} + payload.update(kwargs) + return payload diff --git a/tests/store/__init__.py b/tests/store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/store/test_get_messages.py b/tests/store/test_get_messages.py new file mode 100644 index 0000000000..53e3bcc9ea --- /dev/null +++ b/tests/store/test_get_messages.py @@ -0,0 +1,30 @@ +import pytest +from src.libs.custom_logger import get_custom_logger +from src.libs.common import to_base64 +from src.steps.store import StepsStore +from src.test_data import SAMPLE_INPUTS + +logger = get_custom_logger(__name__) + +# TO DO test without pubsubtopic freezes + + +class TestGetMessages(StepsStore): + @pytest.fixture(scope="function", autouse=True) + def store_functional_setup(self, store_setup): + self.setup_first_publishing_node(store="true", relay="true") + self.setup_first_store_node(store="true", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + + def test_store_messages_with_valid_payloads(self): + failed_payloads = [] + for payload in SAMPLE_INPUTS: + logger.debug(f'Running test with payload {payload["description"]}') + message = self.create_message(payload=to_base64(payload["value"])) + try: + self.publish_message_via("relay", message=message) + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true") + except Exception as e: + logger.error(f'Payload {payload["description"]} failed: {str(e)}') + failed_payloads.append(payload["description"]) + assert not failed_payloads, f"Payloads failed: {failed_payloads}" diff --git a/tests/store/test_running_nodes.py b/tests/store/test_running_nodes.py new file mode 100644 index 0000000000..9e6673bd10 --- /dev/null +++ b/tests/store/test_running_nodes.py @@ -0,0 +1,81 @@ +import pytest +from src.env_vars import NODE_2 +from src.steps.store import StepsStore + + +class TestRunningNodes(StepsStore): + def test_main_node_relay_and_store__peer_relay_and_store(self): + self.setup_first_publishing_node(store="true", relay="true") + self.setup_first_store_node(store="true", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("relay") + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + + def test_main_node_relay_and_store__peer_only_store(self): + self.setup_first_publishing_node(store="true", relay="true") + self.setup_first_store_node(store="true", relay="false") + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("relay") + if self.store_node1.is_gowaku(): + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + elif self.store_node1.is_nwaku(): + self.check_store_returns_empty_response() + + def test_main_node_relay_and_store__peer_only_relay(self): + self.setup_first_publishing_node(store="true", relay="true") + self.setup_first_store_node(store="false", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("relay") + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + + def test_main_node_relay_and_store__peer_neither_relay_nor_store(self): + self.setup_first_publishing_node(store="true", relay="true") + self.setup_first_store_node(store="false", relay="false") + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("relay") + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1087") + def test_main_node_only_relay__peer_relay_and_store(self): + self.setup_first_publishing_node(store="false", relay="true") + self.setup_first_store_node(store="true", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("relay") + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + + def test_main_node_only_relay__peer_only_store(self): + self.setup_first_publishing_node(store="false", relay="true") + self.setup_first_store_node(store="true", relay="false") + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("relay") + if self.store_node1.is_gowaku(): + try: + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + except Exception as ex: + assert "failed to negotiate protocol: protocols not supported" in str(ex) + elif self.store_node1.is_nwaku(): + self.check_store_returns_empty_response() + + def test_main_node_only_relay__peer_only_relay(self): + self.setup_first_publishing_node(store="false", relay="true") + self.setup_first_store_node(store="false", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("relay") + try: + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + except Exception as ex: + assert "failed to negotiate protocol: protocols not supported" in str(ex) or "PEER_DIAL_FAILURE" in str(ex) + + def test_store_lightpushed_message(self): + self.setup_first_publishing_node(store="true", relay="true", lightpush="true") + self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0]) + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("lightpush", sender=self.store_node1) + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + + def test_store_with_filter(self): + self.setup_first_publishing_node(store="true", relay="true", filter="true") + self.setup_first_store_node(store="false", relay="false", filter="true") + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message_via("relay") + self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")