filter push tests (#9)
* filter push tests * filter push tests * small adjustments * test commit from new PC
This commit is contained in:
parent
d7be7e9504
commit
ea6a48a80a
|
@ -1,27 +1,9 @@
|
||||||
## Problem
|
## PR Details
|
||||||
|
|
||||||
<!--
|
<!--
|
||||||
Describe in details the problem or scenario that this PR is fixing.
|
Describe in details the feature or scenario that this PR is automating tests for.
|
||||||
|
|
||||||
If this is a feature addition or change, then focus on the WHY you are making the change.
|
|
||||||
|
|
||||||
If this is a bug fix, please describe why the old behavior was problematic.
|
|
||||||
-->
|
-->
|
||||||
|
|
||||||
## Solution
|
## Issues reported:
|
||||||
|
|
||||||
<!-- describe the new behavior -->
|
<!-- Issues found while working for this PR -->
|
||||||
|
|
||||||
## Diffs
|
|
||||||
|
|
||||||
<!--
|
|
||||||
Are there any diffs between implementations (nwaku vs gowaku)?
|
|
||||||
If yes they should be documented here: https://www.notion.so/Nwaku-vs-Gowaku-vs-Jswaku-diffs-b3e0e8f1e6cd4c6d9855b0c3c4634bc5
|
|
||||||
-->
|
|
||||||
|
|
||||||
## Notes
|
|
||||||
|
|
||||||
<!-- Remove items that are not relevant -->
|
|
||||||
|
|
||||||
- Resolves <issue number>
|
|
||||||
- Related to <link to specs>
|
|
||||||
|
|
|
@ -67,6 +67,7 @@ class WakuNode:
|
||||||
"min-relay-peers-to-publish": "1",
|
"min-relay-peers-to-publish": "1",
|
||||||
"legacy-filter": "false",
|
"legacy-filter": "false",
|
||||||
"log-level": "DEBUG",
|
"log-level": "DEBUG",
|
||||||
|
"rest-filter-cache-capacity": "50",
|
||||||
}
|
}
|
||||||
default_args.update(go_waku_args)
|
default_args.update(go_waku_args)
|
||||||
elif self.is_nwaku():
|
elif self.is_nwaku():
|
||||||
|
|
|
@ -92,7 +92,7 @@ class StepsFilter:
|
||||||
delay(message_propagation_delay)
|
delay(message_propagation_delay)
|
||||||
for index, peer in enumerate(peer_list):
|
for index, peer in enumerate(peer_list):
|
||||||
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
|
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
|
||||||
get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topics=pubsub_topic, node=peer)
|
get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer)
|
||||||
assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages"
|
assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages"
|
||||||
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
|
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
|
||||||
waku_message = WakuMessage(get_messages_response)
|
waku_message = WakuMessage(get_messages_response)
|
||||||
|
@ -175,8 +175,16 @@ class StepsFilter:
|
||||||
node = self.node2
|
node = self.node2
|
||||||
ping_sub_response = node.ping_filter_subscriptions(request_id)
|
ping_sub_response = node.ping_filter_subscriptions(request_id)
|
||||||
assert ping_sub_response["requestId"] == request_id
|
assert ping_sub_response["requestId"] == request_id
|
||||||
|
assert ping_sub_response["statusCode"] == 0
|
||||||
assert ping_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed
|
assert ping_sub_response["statusDesc"] in ["OK", ""] # until https://github.com/waku-org/nwaku/issues/2286 is fixed
|
||||||
|
|
||||||
|
def ping_without_filter_subscription(self, node=None):
|
||||||
|
try:
|
||||||
|
self.ping_filter_subscriptions(str(uuid4()), node=node)
|
||||||
|
raise AssertionError("Ping without any subscription worked")
|
||||||
|
except Exception as ex:
|
||||||
|
assert "peer has no subscriptions" in str(ex)
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def add_new_relay_subscription(self, pubsub_topics, node=None):
|
def add_new_relay_subscription(self, pubsub_topics, node=None):
|
||||||
if node is None:
|
if node is None:
|
||||||
|
@ -184,11 +192,11 @@ class StepsFilter:
|
||||||
self.node1.set_relay_subscriptions(pubsub_topics)
|
self.node1.set_relay_subscriptions(pubsub_topics)
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def get_filter_messages(self, content_topic, pubsub_topics=None, node=None):
|
def get_filter_messages(self, content_topic, pubsub_topic=None, node=None):
|
||||||
if node is None:
|
if node is None:
|
||||||
node = self.node2
|
node = self.node2
|
||||||
if node.is_gowaku():
|
if node.is_gowaku():
|
||||||
return node.get_filter_messages(content_topic, pubsub_topics)
|
return node.get_filter_messages(content_topic, pubsub_topic)
|
||||||
elif node.is_nwaku():
|
elif node.is_nwaku():
|
||||||
return node.get_filter_messages(content_topic)
|
return node.get_filter_messages(content_topic)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -0,0 +1,125 @@
|
||||||
|
import pytest
|
||||||
|
from src.libs.common import delay, to_base64
|
||||||
|
from src.libs.custom_logger import get_custom_logger
|
||||||
|
from src.test_data import SAMPLE_INPUTS, SAMPLE_TIMESTAMPS
|
||||||
|
from src.steps.filter import StepsFilter
|
||||||
|
|
||||||
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# here we will also implicitly test filter push, see: https://rfc.vac.dev/spec/12/#messagepush
|
||||||
|
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "subscribe_main_nodes")
|
||||||
|
class TestFilterGetMessages(StepsFilter):
|
||||||
|
def test_filter_get_message_with_valid_payloads(self):
|
||||||
|
failed_payloads = []
|
||||||
|
for payload in SAMPLE_INPUTS:
|
||||||
|
logger.debug(f'Running test with payload {payload["description"]}')
|
||||||
|
message = self.create_message(payload=to_base64(payload["value"]))
|
||||||
|
try:
|
||||||
|
self.check_published_message_reaches_filter_peer(message)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'Payload {payload["description"]} failed: {str(e)}')
|
||||||
|
failed_payloads.append(payload["description"])
|
||||||
|
assert not failed_payloads, f"Payloads failed: {failed_payloads}"
|
||||||
|
|
||||||
|
def test_filter_get_message_with_valid_timestamps(self):
|
||||||
|
failed_timestamps = []
|
||||||
|
for timestamp in SAMPLE_TIMESTAMPS:
|
||||||
|
if self.node1.type() in timestamp["valid_for"]:
|
||||||
|
logger.debug(f'Running test with timestamp {timestamp["description"]}')
|
||||||
|
message = self.create_message(timestamp=timestamp["value"])
|
||||||
|
try:
|
||||||
|
self.check_published_message_reaches_filter_peer(message)
|
||||||
|
except Exception as ex:
|
||||||
|
logger.error(f'Timestamp {timestamp["description"]} failed: {str(ex)}')
|
||||||
|
failed_timestamps.append(timestamp)
|
||||||
|
assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}"
|
||||||
|
|
||||||
|
def test_filter_get_message_with_version(self):
|
||||||
|
self.check_published_message_reaches_filter_peer(self.create_message(version=10))
|
||||||
|
|
||||||
|
def test_filter_get_message_with_meta(self):
|
||||||
|
self.check_published_message_reaches_filter_peer(self.create_message(meta=to_base64(self.test_payload)))
|
||||||
|
|
||||||
|
def test_filter_get_message_with_ephemeral(self):
|
||||||
|
failed_ephemeral = []
|
||||||
|
for ephemeral in [True, False]:
|
||||||
|
logger.debug(f"Running test with Ephemeral {ephemeral}")
|
||||||
|
try:
|
||||||
|
self.check_published_message_reaches_filter_peer(self.create_message(ephemeral=ephemeral))
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Massage with Ephemeral {ephemeral} failed: {str(e)}")
|
||||||
|
failed_ephemeral.append(ephemeral)
|
||||||
|
assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}"
|
||||||
|
|
||||||
|
def test_filter_get_message_with_rate_limit_proof(self):
|
||||||
|
rate_limit_proof = {
|
||||||
|
"proof": to_base64("proofData"),
|
||||||
|
"epoch": to_base64("epochData"),
|
||||||
|
"nullifier": to_base64("nullifierData"),
|
||||||
|
}
|
||||||
|
self.check_published_message_reaches_filter_peer(self.create_message(rateLimitProof=rate_limit_proof))
|
||||||
|
|
||||||
|
def test_filter_get_message_with_extra_field(self):
|
||||||
|
try:
|
||||||
|
self.check_published_message_reaches_filter_peer(self.create_message(extraField="extraValue"))
|
||||||
|
if self.node1.is_nwaku():
|
||||||
|
raise AssertionError("Relay publish with extra field worked!!!")
|
||||||
|
elif self.node1.is_gowaku():
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
|
except Exception as ex:
|
||||||
|
assert "Bad Request" in str(ex)
|
||||||
|
|
||||||
|
def test_filter_get_message_duplicate_message(self):
|
||||||
|
message = self.create_message()
|
||||||
|
self.check_published_message_reaches_filter_peer(message)
|
||||||
|
try:
|
||||||
|
self.check_published_message_reaches_filter_peer(message)
|
||||||
|
raise AssertionError("Duplicate message was retrieved twice")
|
||||||
|
except Exception as ex:
|
||||||
|
assert "couldn't find any messages" in str(ex)
|
||||||
|
|
||||||
|
def test_filter_get_message_after_node_pauses_and_pauses(self):
|
||||||
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
self.node1.pause()
|
||||||
|
self.node1.unpause()
|
||||||
|
self.check_published_message_reaches_filter_peer(self.create_message(payload=to_base64("M1")))
|
||||||
|
self.node2.pause()
|
||||||
|
self.node2.unpause()
|
||||||
|
self.check_published_message_reaches_filter_peer(self.create_message(payload=to_base64("M2")))
|
||||||
|
|
||||||
|
def test_filter_get_message_after_node1_restarts(self):
|
||||||
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
self.node1.restart()
|
||||||
|
self.node1.ensure_ready()
|
||||||
|
delay(2)
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
|
||||||
|
def test_filter_get_message_after_node2_restarts(self):
|
||||||
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
self.node2.restart()
|
||||||
|
self.node2.ensure_ready()
|
||||||
|
delay(2)
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
|
||||||
|
def test_filter_get_50_messages(self):
|
||||||
|
num_messages = 50
|
||||||
|
for index in range(num_messages):
|
||||||
|
message = self.create_message(payload=to_base64(f"M_{index}"))
|
||||||
|
self.node1.send_relay_message(message, self.test_pubsub_topic)
|
||||||
|
delay(1)
|
||||||
|
filter_messages = self.get_filter_messages(content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, node=self.node2)
|
||||||
|
assert len(filter_messages) == num_messages
|
||||||
|
for index, message in enumerate(filter_messages):
|
||||||
|
assert message["payload"] == to_base64(
|
||||||
|
f"M_{index}"
|
||||||
|
), f'Incorrect payload at index: {index}. Published {to_base64(f"M_{index}")} Received {message["payload"]}'
|
||||||
|
|
||||||
|
def test_filter_get_message_with_1MB_payload(self):
|
||||||
|
payload_length = 1024 * 1023
|
||||||
|
message = self.create_message(payload=to_base64("a" * (payload_length)))
|
||||||
|
self.check_published_message_reaches_filter_peer(message, message_propagation_delay=2)
|
|
@ -19,3 +19,32 @@ class TestFilterMultipleNodes(StepsFilter):
|
||||||
self.subscribe_optional_filter_nodes([self.second_content_topic])
|
self.subscribe_optional_filter_nodes([self.second_content_topic])
|
||||||
self.check_published_message_reaches_filter_peer(peer_list=self.main_nodes)
|
self.check_published_message_reaches_filter_peer(peer_list=self.main_nodes)
|
||||||
self.check_publish_without_filter_subscription(peer_list=self.optional_nodes)
|
self.check_publish_without_filter_subscription(peer_list=self.optional_nodes)
|
||||||
|
|
||||||
|
def test_filter_get_message_while_one_peer_is_paused(self):
|
||||||
|
self.setup_optional_filter_nodes()
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
|
self.subscribe_optional_filter_nodes([self.test_content_topic])
|
||||||
|
self.check_published_message_reaches_filter_peer()
|
||||||
|
relay_message1 = self.create_message(contentTopic=self.test_content_topic)
|
||||||
|
relay_message2 = self.create_message(contentTopic=self.test_content_topic)
|
||||||
|
self.node2.pause()
|
||||||
|
self.node1.send_relay_message(relay_message1, self.test_pubsub_topic)
|
||||||
|
self.node2.unpause()
|
||||||
|
self.node1.send_relay_message(relay_message2, self.test_pubsub_topic)
|
||||||
|
filter_messages = self.get_filter_messages(content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, node=self.node2)
|
||||||
|
assert len(filter_messages) == 2, "Both messages should've been returned"
|
||||||
|
|
||||||
|
def test_filter_get_message_after_one_peer_was_stopped(self):
|
||||||
|
self.setup_optional_filter_nodes()
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
|
self.subscribe_optional_filter_nodes([self.test_content_topic])
|
||||||
|
self.check_published_message_reaches_filter_peer(peer_list=self.main_nodes + self.optional_nodes)
|
||||||
|
self.node2.stop()
|
||||||
|
self.check_published_message_reaches_filter_peer(peer_list=self.optional_nodes)
|
||||||
|
|
||||||
|
def test_ping_only_some_nodes_have_subscriptions(self):
|
||||||
|
self.setup_optional_filter_nodes()
|
||||||
|
self.wait_for_subscriptions_on_main_nodes([self.test_content_topic])
|
||||||
|
self.ping_filter_subscriptions("1", node=self.node2)
|
||||||
|
for node in self.optional_nodes:
|
||||||
|
self.ping_without_filter_subscription(node=node)
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
from uuid import uuid4
|
||||||
|
import pytest
|
||||||
|
from src.steps.filter import StepsFilter
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node")
|
||||||
|
class TestFilterPing(StepsFilter):
|
||||||
|
def test_filter_ping_on_subscribed_peer(self, subscribe_main_nodes):
|
||||||
|
self.ping_filter_subscriptions(str(uuid4()))
|
||||||
|
|
||||||
|
def test_filter_ping_on_peer_without_subscription(self):
|
||||||
|
self.ping_without_filter_subscription()
|
||||||
|
|
||||||
|
def test_filter_ping_on_unsubscribed_peer(self, subscribe_main_nodes):
|
||||||
|
self.ping_filter_subscriptions(str(uuid4()))
|
||||||
|
self.delete_filter_subscription({"requestId": "12345", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
|
||||||
|
self.ping_without_filter_subscription()
|
||||||
|
|
||||||
|
def test_filter_ping_without_request_id(self, subscribe_main_nodes):
|
||||||
|
try:
|
||||||
|
self.ping_filter_subscriptions("")
|
||||||
|
if self.node2.is_nwaku():
|
||||||
|
pass
|
||||||
|
elif self.node2.is_gowaku():
|
||||||
|
raise Exception("Ping without request id worked!!")
|
||||||
|
else:
|
||||||
|
raise NotImplementedError("Not implemented for this node type")
|
||||||
|
except Exception as ex:
|
||||||
|
assert "bad request id" in str(ex)
|
|
@ -17,3 +17,19 @@ class TestRelayMultipleNodes(StepsRelay):
|
||||||
raise AssertionError("Non subscribed nodes received the message!!")
|
raise AssertionError("Non subscribed nodes received the message!!")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found"
|
assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found"
|
||||||
|
|
||||||
|
def test_relay_get_message_while_one_peer_is_paused(self, subscribe_optional_relay_nodes, relay_warm_up):
|
||||||
|
self.check_published_message_reaches_relay_peer()
|
||||||
|
relay_message1 = self.create_message(contentTopic=self.test_content_topic)
|
||||||
|
relay_message2 = self.create_message(contentTopic=self.test_content_topic)
|
||||||
|
self.node2.pause()
|
||||||
|
self.node1.send_relay_message(relay_message1, self.test_pubsub_topic)
|
||||||
|
self.node2.unpause()
|
||||||
|
self.node1.send_relay_message(relay_message2, self.test_pubsub_topic)
|
||||||
|
messages = self.node2.get_relay_messages(self.test_pubsub_topic)
|
||||||
|
assert len(messages) == 2, "Both messages should've been returned"
|
||||||
|
|
||||||
|
def test_relay_get_message_after_one_peer_was_stopped(self, subscribe_optional_relay_nodes, relay_warm_up):
|
||||||
|
self.check_published_message_reaches_relay_peer(peer_list=self.main_nodes + self.optional_nodes)
|
||||||
|
self.node2.stop()
|
||||||
|
self.check_published_message_reaches_relay_peer(peer_list=self.optional_nodes)
|
||||||
|
|
|
@ -232,7 +232,7 @@ class TestRelayPublish(StepsRelay):
|
||||||
self.node1.ensure_ready()
|
self.node1.ensure_ready()
|
||||||
delay(2)
|
delay(2)
|
||||||
self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
self.ensure_relay_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
|
||||||
self.wait_for_published_message_to_reach_relay_peer()
|
self.check_published_message_reaches_relay_peer()
|
||||||
|
|
||||||
def test_publish_after_node2_restarts(self):
|
def test_publish_after_node2_restarts(self):
|
||||||
self.check_published_message_reaches_relay_peer()
|
self.check_published_message_reaches_relay_peer()
|
||||||
|
|
Loading…
Reference in New Issue