Relay Subscribe (#5)

* github actions report summary

* use env instead of inputs

* multiple nodes tests

* fix warm up

* fix warm up

* small fix after CI run

* small fix after CI run 2

* add new multi-node test

* self review

* relay subscribe tests

* small fix

* new tests

* fix typo
This commit is contained in:
Florin Barbu 2023-11-24 14:04:24 +02:00 committed by GitHub
parent a6a0440312
commit f8db76ab20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 172 additions and 29 deletions

View File

@ -8,11 +8,6 @@ logger = get_custom_logger(__name__)
class BaseClient(ABC): class BaseClient(ABC):
# The retry decorator is applied to handle transient errors gracefully. This is particularly
# useful when running tests in parallel, where occasional network-related errors such as
# connection drops, timeouts, or temporary unavailability of a service can occur. Retrying
# ensures that such intermittent issues don't cause the tests to fail outright.
@retry(stop=stop_after_delay(0.5), wait=wait_fixed(0.1), reraise=True)
def make_request(self, method, url, headers=None, data=None): def make_request(self, method, url, headers=None, data=None):
logger.debug(f"{method.upper()} call: {url} with payload: {data}") logger.debug(f"{method.upper()} call: {url} with payload: {data}")
response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT) response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT)
@ -20,10 +15,10 @@ class BaseClient(ABC):
response.raise_for_status() response.raise_for_status()
except requests.HTTPError as http_err: except requests.HTTPError as http_err:
logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}") logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}")
raise raise Exception(f"Error: {http_err} with response: {response.content}")
except Exception as err: except Exception as err:
logger.error(f"An error occurred: {err}. Response content: {response.content}") logger.error(f"An error occurred: {err}. Response content: {response.content}")
raise raise Exception(f"Error: {err} with response: {response.content}")
else: else:
logger.info(f"Response status code: {response.status_code}. Response content: {response.content}") logger.info(f"Response status code: {response.status_code}. Response content: {response.content}")
return response return response
@ -36,6 +31,10 @@ class BaseClient(ABC):
def set_subscriptions(self, pubsub_topics): def set_subscriptions(self, pubsub_topics):
pass pass
@abstractmethod
def delete_subscriptions(self, pubsub_topics):
pass
@abstractmethod @abstractmethod
def send_message(self, message, pubsub_topic): def send_message(self, message, pubsub_topic):
pass pass

View File

@ -23,6 +23,9 @@ class REST(BaseClient):
def set_subscriptions(self, pubsub_topics): def set_subscriptions(self, pubsub_topics):
return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics)) return self.rest_call("post", "relay/v1/subscriptions", json.dumps(pubsub_topics))
def delete_subscriptions(self, pubsub_topics):
return self.rest_call("delete", "relay/v1/subscriptions", json.dumps(pubsub_topics))
def send_message(self, message, pubsub_topic): def send_message(self, message, pubsub_topic):
return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(message)) return self.rest_call("post", f"relay/v1/messages/{quote(pubsub_topic, safe='')}", json.dumps(message))

View File

@ -27,6 +27,12 @@ class RPC(BaseClient):
else: else:
return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics]) return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics])
def delete_subscriptions(self, pubsub_topics):
if "nwaku" in self._image_name:
return self.rpc_call("delete_waku_v2_relay_v1_subscriptions", [pubsub_topics])
else:
return self.rpc_call("delete_waku_v2_relay_v1_subscription", [pubsub_topics])
def send_message(self, message, pubsub_topic): def send_message(self, message, pubsub_topic):
return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, message]) return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, message])

View File

@ -53,7 +53,6 @@ class WakuNode:
"rpc-address": "0.0.0.0", "rpc-address": "0.0.0.0",
"rest-address": "0.0.0.0", "rest-address": "0.0.0.0",
"nat": f"extip:{self._ext_ip}", "nat": f"extip:{self._ext_ip}",
"pubsub-topic": DEFAULT_PUBSUB_TOPIC,
} }
if "go-waku" in self._docker_manager.image: if "go-waku" in self._docker_manager.image:
@ -110,15 +109,16 @@ class WakuNode:
def info(self): def info(self):
return self._api.info() return self._api.info()
def set_subscriptions(self, pubsub_topics=None): def set_subscriptions(self, pubsub_topics):
if not pubsub_topics:
pubsub_topics = [DEFAULT_PUBSUB_TOPIC]
return self._api.set_subscriptions(pubsub_topics) return self._api.set_subscriptions(pubsub_topics)
def send_message(self, message, pubsub_topic=DEFAULT_PUBSUB_TOPIC): def delete_subscriptions(self, pubsub_topics):
return self._api.delete_subscriptions(pubsub_topics)
def send_message(self, message, pubsub_topic):
return self._api.send_message(message, pubsub_topic) return self._api.send_message(message, pubsub_topic)
def get_messages(self, pubsub_topic=DEFAULT_PUBSUB_TOPIC): def get_messages(self, pubsub_topic):
return self._api.get_messages(pubsub_topic) return self._api.get_messages(pubsub_topic)
@property @property

View File

@ -23,8 +23,11 @@ class StepsRelay:
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY) self.node1.start(relay="true", discv5_discovery="true", peer_exchange="true", nodekey=NODEKEY)
try:
self.enr_uri = self.node1.info()["enrUri"] self.enr_uri = self.node1.info()["enrUri"]
self.node2 = WakuNode(NODE_2, f"node1_{request.cls.test_id}") except Exception as ex:
raise AttributeError(f"Could not find enrUri in the info call because of error: {str(ex)}")
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true") self.node2.start(relay="true", discv5_discovery="true", discv5_bootstrap_node=self.enr_uri, peer_exchange="true")
self.main_nodes = [self.node1, self.node2] self.main_nodes = [self.node1, self.node2]
self.optional_nodes = [] self.optional_nodes = []
@ -62,29 +65,42 @@ class StepsRelay:
# this method should be used only for the tests that use the warm_up fixture # this method should be used only for the tests that use the warm_up fixture
# otherwise use wait_for_published_message_to_reach_peer # otherwise use wait_for_published_message_to_reach_peer
@allure.step @allure.step
def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None): def check_published_message_reaches_peer(self, message=None, pubsub_topic=None, message_propagation_delay=0.1, sender=None, peer_list=None):
if message is None:
message = self.create_message()
if pubsub_topic is None:
pubsub_topic = self.test_pubsub_topic
if not sender: if not sender:
sender = self.node1 sender = self.node1
if not peer_list: if not peer_list:
peer_list = self.main_nodes + self.optional_nodes peer_list = self.main_nodes + self.optional_nodes
sender.send_message(message, pubsub_topic or self.test_pubsub_topic)
sender.send_message(message, pubsub_topic)
delay(message_propagation_delay) delay(message_propagation_delay)
for index, peer in enumerate(peer_list): for index, peer in enumerate(peer_list):
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
get_messages_response = peer.get_messages(pubsub_topic or self.test_pubsub_topic) get_messages_response = peer.get_messages(pubsub_topic)
assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages" assert get_messages_response, f"Peer NODE_{index}:{peer.image} couldn't find any messages"
received_message = message_rpc_response_schema.load(get_messages_response[0]) received_message = message_rpc_response_schema.load(get_messages_response[0])
self.assert_received_message(message, received_message) self.assert_received_message(message, received_message)
@allure.step
def check_publish_without_subscription(self, pubsub_topic):
try:
self.node1.send_message(self.create_message(), pubsub_topic)
raise AssertionError("Publish with no subscription worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
# we need much bigger timeout in CI because we run tests in parallel there and the machine itself is slower # we need much bigger timeout in CI because we run tests in parallel there and the machine itself is slower
@allure.step @allure.step
def wait_for_published_message_to_reach_peer( def wait_for_published_message_to_reach_peer(
self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, sender=None, peer_list=None self, timeout_duration=120 if RUNNING_IN_CI else 20, time_between_retries=1, pubsub_topic=None, sender=None, peer_list=None
): ):
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True) @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True)
def check_peer_connection(): def check_peer_connection():
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
self.check_published_message_reaches_peer(message, sender=sender, peer_list=peer_list) self.check_published_message_reaches_peer(message, pubsub_topic=pubsub_topic, sender=sender, peer_list=peer_list)
check_peer_connection() check_peer_connection()
@ -114,6 +130,11 @@ class StepsRelay:
for node in node_list: for node in node_list:
node.set_subscriptions(pubsub_topic_list) node.set_subscriptions(pubsub_topic_list)
@allure.step
def delete_subscriptions_on_nodes(self, node_list, pubsub_topic_list):
for node in node_list:
node.delete_subscriptions(pubsub_topic_list)
def create_message(self, **kwargs): def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs) message.update(kwargs)

View File

@ -77,6 +77,8 @@ VALID_PUBSUB_TOPICS = [
"test", "test",
] ]
INVALID_PUBSUB_TOPICS = ["/test/2/rs/18/1", ("/waku/2/rs/18/1"), {"pubsub_topic": "/waku/3/rs/18/1"}, True, 12345678, [["/waku/2/rs/18/1"]]]
SAMPLE_TIMESTAMPS = [ SAMPLE_TIMESTAMPS = [
{"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku", "gowaku"]}, {"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku", "gowaku"]},

View File

@ -5,15 +5,15 @@ from src.steps.relay import StepsRelay
@pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes") @pytest.mark.usefixtures("setup_main_relay_nodes", "setup_optional_relay_nodes", "subscribe_main_relay_nodes")
class TestMultipleNodes(StepsRelay): class TestMultipleNodes(StepsRelay):
def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): def test_first_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
self.check_published_message_reaches_peer(self.create_message()) self.check_published_message_reaches_peer()
def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up): def test_last_node_to_start_publishes(self, subscribe_optional_relay_nodes, relay_warm_up):
self.check_published_message_reaches_peer(self.create_message(), sender=self.optional_nodes[-1]) self.check_published_message_reaches_peer(sender=self.optional_nodes[-1])
def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self): def test_optional_nodes_not_subscribed_to_same_pubsub_topic(self):
self.wait_for_published_message_to_reach_peer(peer_list=self.main_nodes) self.wait_for_published_message_to_reach_peer(peer_list=self.main_nodes)
try: try:
self.check_published_message_reaches_peer(self.create_message(), peer_list=self.optional_nodes) self.check_published_message_reaches_peer(peer_list=self.optional_nodes)
raise AssertionError("Non subscribed nodes received the message!!") raise AssertionError("Non subscribed nodes received the message!!")
except Exception as ex: except Exception as ex:
assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found" assert "Not Found" in str(ex), "Expected 404 Not Found when the message is not found"

View File

@ -97,7 +97,7 @@ class TestRelayPublish(StepsRelay):
for pubsub_topic in VALID_PUBSUB_TOPICS: for pubsub_topic in VALID_PUBSUB_TOPICS:
logger.debug(f"Running test with pubsub topic {pubsub_topic}") logger.debug(f"Running test with pubsub topic {pubsub_topic}")
try: try:
self.check_published_message_reaches_peer(self.create_message(), pubsub_topic=pubsub_topic) self.check_published_message_reaches_peer(pubsub_topic=pubsub_topic)
except Exception as e: except Exception as e:
logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}") logger.error(f"PubusubTopic {pubsub_topic} failed: {str(e)}")
failed_pubsub_topics.append(pubsub_topic) failed_pubsub_topics.append(pubsub_topic)
@ -110,9 +110,9 @@ class TestRelayPublish(StepsRelay):
messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1]) messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1])
assert not messages, "Message was retrieved on wrong pubsub_topic" assert not messages, "Message was retrieved on wrong pubsub_topic"
def test_publish_on_unsubscribed_pubsub_topic(self): def test_publish_on_non_subscribed_pubsub_topic(self):
try: try:
self.check_published_message_reaches_peer(self.create_message(), pubsub_topic="/waku/2/rs/19/1") self.check_published_message_reaches_peer(pubsub_topic="/waku/2/rs/19/1")
raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!") raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!")
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
@ -209,7 +209,7 @@ class TestRelayPublish(StepsRelay):
self.assert_received_message(message, received_message) self.assert_received_message(message, received_message)
def test_publish_after_node_pauses_and_pauses(self): def test_publish_after_node_pauses_and_pauses(self):
self.check_published_message_reaches_peer(self.create_message()) self.check_published_message_reaches_peer()
self.node1.pause() self.node1.pause()
self.node1.unpause() self.node1.unpause()
self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1"))) self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1")))
@ -218,14 +218,16 @@ class TestRelayPublish(StepsRelay):
self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2"))) self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2")))
def test_publish_after_node1_restarts(self): def test_publish_after_node1_restarts(self):
self.check_published_message_reaches_peer(self.create_message()) self.check_published_message_reaches_peer()
self.node1.restart() self.node1.restart()
self.node1.ensure_ready()
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_peer()
def test_publish_after_node2_restarts(self): def test_publish_after_node2_restarts(self):
self.check_published_message_reaches_peer(self.create_message()) self.check_published_message_reaches_peer()
self.node2.restart() self.node2.restart()
self.node2.ensure_ready()
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic]) self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer() self.wait_for_published_message_to_reach_peer()

View File

@ -0,0 +1,110 @@
import pytest
from src.env_vars import DEFAULT_PUBSUB_TOPIC
from src.libs.custom_logger import get_custom_logger
from src.steps.relay import StepsRelay
from src.test_data import INVALID_PUBSUB_TOPICS, VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__)
@pytest.mark.usefixtures("setup_main_relay_nodes")
class TestRelaySubscribe(StepsRelay):
def test_no_subscription(self):
self.check_publish_without_subscription(self.test_pubsub_topic)
def test_subscribe_to_single_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer()
def test_subscribe_to_already_existing_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer()
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.check_published_message_reaches_peer()
def test_subscribe_with_multiple_overlapping_pubsub_topics(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3])
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[1:4])
for pubsub_topic in VALID_PUBSUB_TOPICS[:4]:
self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic)
def test_subscribe_with_empty_pubsub_topic_list(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [])
def test_subscribe_with_invalid_pubsub_topic_format(self):
success_pubsub_topics = []
for pubsub_topic in INVALID_PUBSUB_TOPICS:
logger.debug(f"Running test with payload {pubsub_topic}")
try:
self.ensure_subscriptions_on_nodes(self.main_nodes, pubsub_topic)
success_pubsub_topics.append(pubsub_topic)
except Exception as ex:
assert "Bad Request" in str(ex)
assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}"
def test_unsubscribe_from_single_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer()
self.delete_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.check_publish_without_subscription(self.test_pubsub_topic)
def test_unsubscribe_from_all_pubsub_topics(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
for pubsub_topic in VALID_PUBSUB_TOPICS:
self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic)
self.delete_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
for pubsub_topic in VALID_PUBSUB_TOPICS:
self.check_publish_without_subscription(pubsub_topic)
def test_unsubscribe_from_some_pubsub_topics(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS)
for pubsub_topic in VALID_PUBSUB_TOPICS:
self.wait_for_published_message_to_reach_peer(pubsub_topic=pubsub_topic)
self.delete_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS[:3])
for pubsub_topic in VALID_PUBSUB_TOPICS[:3]:
self.check_publish_without_subscription(pubsub_topic)
for pubsub_topic in VALID_PUBSUB_TOPICS[3:]:
self.check_published_message_reaches_peer(pubsub_topic=pubsub_topic)
def test_unsubscribe_from_non_existing_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer()
try:
self.delete_subscriptions_on_nodes(self.main_nodes, ["/waku/2/rs/999/99"])
if self.node1.is_nwaku():
pass # nwaku doesn't fail in this case
elif self.node1.is_gowaku():
raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!")
else:
raise Exception("Not implemented")
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
self.check_published_message_reaches_peer()
def test_unsubscribe_with_invalid_pubsub_topic_format(self):
success_pubsub_topics = []
for pubsub_topic in INVALID_PUBSUB_TOPICS:
logger.debug(f"Running test with payload {pubsub_topic}")
try:
self.delete_subscriptions_on_nodes(self.main_nodes, pubsub_topic)
success_pubsub_topics.append(pubsub_topic)
except Exception as ex:
assert "Bad Request" in str(ex)
assert not success_pubsub_topics, f"Invalid Pubsub Topics that didn't failed: {success_pubsub_topics}"
def test_resubscribe_to_unsubscribed_pubsub_topic(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer()
self.delete_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.check_publish_without_subscription(self.test_pubsub_topic)
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.check_published_message_reaches_peer()
def test_publish_on_default_pubsub_topic_without_beeing_subscribed_to_it(self):
self.ensure_subscriptions_on_nodes(self.main_nodes, [self.test_pubsub_topic])
self.wait_for_published_message_to_reach_peer()
try:
self.check_published_message_reaches_peer(pubsub_topic=DEFAULT_PUBSUB_TOPIC)
raise AssertionError(f"Publish on {DEFAULT_PUBSUB_TOPIC} with beeing subscribed to it worked!!!")
except Exception as ex:
assert "Not Found" in str(ex)