mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-07 00:13:06 +00:00
finishing touches
This commit is contained in:
parent
5b803ba0c3
commit
ca94c2b44e
@ -21,7 +21,7 @@ NETWORK_NAME = get_env_var("NETWORK_NAME", "waku")
|
||||
SUBNET = get_env_var("SUBNET", "172.18.0.0/16")
|
||||
IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24")
|
||||
GATEWAY = get_env_var("GATEWAY", "172.18.0.1")
|
||||
DEFAULT_PUBSUBTOPIC = get_env_var("DEFAULT_PUBSUBTOPIC", "/waku/2/default-waku/proto")
|
||||
DEFAULT_PUBSUB_TOPIC = get_env_var("DEFAULT_PUBSUB_TOPIC", "/waku/2/default-waku/proto")
|
||||
PROTOCOL = get_env_var("PROTOCOL", "REST")
|
||||
RUNNING_IN_CI = get_env_var("CI")
|
||||
NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68")
|
||||
|
||||
@ -5,7 +5,7 @@ from tenacity import retry, stop_after_delay, wait_fixed
|
||||
from src.node.api_clients.rpc import RPC
|
||||
from src.node.api_clients.rest import REST
|
||||
from src.node.docker_mananger import DockerManager
|
||||
from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL
|
||||
from src.env_vars import DOCKER_LOG_DIR, DEFAULT_PUBSUB_TOPIC, PROTOCOL
|
||||
from src.data_storage import DS
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
@ -53,7 +53,7 @@ class WakuNode:
|
||||
"rpc-address": "0.0.0.0",
|
||||
"rest-address": "0.0.0.0",
|
||||
"nat": f"extip:{self._ext_ip}",
|
||||
"pubsub-topic": DEFAULT_PUBSUBTOPIC,
|
||||
"pubsub-topic": DEFAULT_PUBSUB_TOPIC,
|
||||
}
|
||||
|
||||
if "go-waku" in self._docker_manager.image:
|
||||
@ -110,13 +110,13 @@ class WakuNode:
|
||||
def info(self):
|
||||
return self._api.info()
|
||||
|
||||
def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUBTOPIC]):
|
||||
def set_subscriptions(self, pubsub_topics=[DEFAULT_PUBSUB_TOPIC]):
|
||||
return self._api.set_subscriptions(pubsub_topics)
|
||||
|
||||
def send_message(self, message, pubsub_topic=DEFAULT_PUBSUBTOPIC):
|
||||
def send_message(self, message, pubsub_topic=DEFAULT_PUBSUB_TOPIC):
|
||||
return self._api.send_message(message, pubsub_topic)
|
||||
|
||||
def get_messages(self, pubsub_topic=DEFAULT_PUBSUBTOPIC):
|
||||
def get_messages(self, pubsub_topic=DEFAULT_PUBSUB_TOPIC):
|
||||
return self._api.get_messages(pubsub_topic)
|
||||
|
||||
@property
|
||||
|
||||
@ -23,7 +23,6 @@ class StepsRelay:
|
||||
self.test_pubsub_topic = "/waku/2/rs/18/1"
|
||||
self.test_content_topic = "/test/1/waku-relay/proto"
|
||||
self.test_payload = "Relay works!!"
|
||||
self.test_message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
self.node1.set_subscriptions([self.test_pubsub_topic])
|
||||
self.node2.set_subscriptions([self.test_pubsub_topic])
|
||||
|
||||
@ -40,10 +39,8 @@ class StepsRelay:
|
||||
if not pubsub_topic:
|
||||
pubsub_topic = self.test_pubsub_topic
|
||||
self.node1.send_message(message, pubsub_topic)
|
||||
|
||||
delay(message_propagation_delay)
|
||||
get_messages_response = self.node2.get_messages(pubsub_topic)
|
||||
logger.debug("Got reponse from remote peer %s", get_messages_response)
|
||||
assert get_messages_response, "Peer node couldn't find any messages"
|
||||
received_message = message_rpc_response_schema.load(get_messages_response[0])
|
||||
self.assert_received_message(message, received_message)
|
||||
@ -54,18 +51,18 @@ class StepsRelay:
|
||||
|
||||
assert received_message.payload == sent_message["payload"], assert_fail_message("payload")
|
||||
assert received_message.contentTopic == sent_message["contentTopic"], assert_fail_message("contentTopic")
|
||||
if "timestamp" in sent_message and sent_message["timestamp"]:
|
||||
if "timestamp" in sent_message and sent_message["timestamp"] is not None:
|
||||
if isinstance(sent_message["timestamp"], float):
|
||||
assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp")
|
||||
else:
|
||||
assert str(received_message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp")
|
||||
if "version" in sent_message and sent_message["version"]:
|
||||
if "version" in sent_message:
|
||||
assert str(received_message.version) == str(sent_message["version"]), assert_fail_message("version")
|
||||
if "meta" in sent_message and sent_message["meta"]:
|
||||
if "meta" in sent_message:
|
||||
assert str(received_message.meta) == str(sent_message["meta"]), assert_fail_message("meta")
|
||||
if "ephemeral" in sent_message and sent_message["ephemeral"]:
|
||||
if "ephemeral" in sent_message:
|
||||
assert str(received_message.ephemeral) == str(sent_message["ephemeral"]), assert_fail_message("ephemeral")
|
||||
if "rateLimitProof" in sent_message and sent_message["rateLimitProof"]:
|
||||
if "rateLimitProof" in sent_message:
|
||||
assert str(received_message.rateLimitProof) == str(sent_message["rateLimitProof"]), assert_fail_message("rateLimitProof")
|
||||
|
||||
def wait_for_published_message_to_reach_peer(self, timeout_duration, time_between_retries=1):
|
||||
@ -75,3 +72,12 @@ class StepsRelay:
|
||||
self.check_published_message_reaches_peer(message)
|
||||
|
||||
check_peer_connection()
|
||||
|
||||
def ensure_subscriptions_on_nodes(self, node_list, pubsub_topic_list):
|
||||
for node in node_list:
|
||||
node.set_subscriptions(pubsub_topic_list)
|
||||
|
||||
def create_message(self, **kwargs):
|
||||
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
message.update(kwargs)
|
||||
return message
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
from time import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from src.env_vars import DEFAULT_PUBSUBTOPIC
|
||||
from src.env_vars import DEFAULT_PUBSUB_TOPIC
|
||||
|
||||
NOW = datetime.now()
|
||||
|
||||
@ -66,7 +66,7 @@ INVALID_CONTENT_TOPICS = [
|
||||
]
|
||||
|
||||
VALID_PUBSUB_TOPICS = [
|
||||
DEFAULT_PUBSUBTOPIC,
|
||||
DEFAULT_PUBSUB_TOPIC,
|
||||
"/waku/2/rs/18/1",
|
||||
"/test/2/rs/18/1",
|
||||
"/waku/3/rs/18/1",
|
||||
|
||||
@ -64,7 +64,7 @@ def attach_logs_on_fail(request):
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def close_open_nodes():
|
||||
def close_open_nodes(attach_logs_on_fail):
|
||||
DS.waku_nodes = []
|
||||
yield
|
||||
crashed_containers = []
|
||||
|
||||
@ -12,7 +12,7 @@ class TestRelayPublish(StepsRelay):
|
||||
failed_payloads = []
|
||||
for payload in SAMPLE_INPUTS:
|
||||
logger.debug("Running test with payload %s", payload["description"])
|
||||
message = {"payload": to_base64(payload["value"]), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
message = self.create_message(payload=to_base64(payload["value"]))
|
||||
try:
|
||||
self.check_published_message_reaches_peer(message)
|
||||
except Exception as e:
|
||||
@ -24,7 +24,7 @@ class TestRelayPublish(StepsRelay):
|
||||
success_payloads = []
|
||||
for payload in INVALID_PAYLOADS:
|
||||
logger.debug("Running test with payload %s", payload["description"])
|
||||
message = {"payload": payload["value"], "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
message = self.create_message(payload=payload["value"])
|
||||
try:
|
||||
self.node1.send_message(message, self.test_pubsub_topic)
|
||||
success_payloads.append(payload)
|
||||
@ -43,14 +43,14 @@ class TestRelayPublish(StepsRelay):
|
||||
def test_publish_with_payload_less_than_one_mb(self):
|
||||
payload_length = 1024 * 1023
|
||||
logger.debug("Running test with payload length of %s bytes", payload_length)
|
||||
message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
message = self.create_message(payload=to_base64("a" * (payload_length)))
|
||||
self.check_published_message_reaches_peer(message, message_propagation_delay=2)
|
||||
|
||||
def test_publish_with_payload_equal_or_more_than_one_mb(self):
|
||||
payload_length = 1024 * 1023
|
||||
for payload_length in [1024 * 1024, 1024 * 1024 * 10]:
|
||||
logger.debug("Running test with payload length of %s bytes", payload_length)
|
||||
message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
message = self.create_message(payload=to_base64("a" * (payload_length)))
|
||||
try:
|
||||
self.check_published_message_reaches_peer(message, message_propagation_delay=2)
|
||||
raise AssertionError("Duplicate message was retrieved twice")
|
||||
@ -61,7 +61,7 @@ class TestRelayPublish(StepsRelay):
|
||||
failed_content_topics = []
|
||||
for content_topic in SAMPLE_INPUTS:
|
||||
logger.debug("Running test with content topic %s", content_topic["description"])
|
||||
message = {"payload": to_base64(self.test_payload), "contentTopic": content_topic["value"], "timestamp": int(time() * 1e9)}
|
||||
message = self.create_message(contentTopic=content_topic["value"])
|
||||
try:
|
||||
self.check_published_message_reaches_peer(message)
|
||||
except Exception as e:
|
||||
@ -73,7 +73,7 @@ class TestRelayPublish(StepsRelay):
|
||||
success_content_topics = []
|
||||
for content_topic in INVALID_CONTENT_TOPICS:
|
||||
logger.debug("Running test with contetn topic %s", content_topic["description"])
|
||||
message = {"payload": to_base64(self.test_payload), "contentTopic": content_topic["value"], "timestamp": int(time() * 1e9)}
|
||||
message = self.create_message(contentTopic=content_topic["value"])
|
||||
try:
|
||||
self.node1.send_message(message, self.test_pubsub_topic)
|
||||
success_content_topics.append(content_topic)
|
||||
@ -90,22 +90,27 @@ class TestRelayPublish(StepsRelay):
|
||||
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
||||
|
||||
def test_publish_on_multiple_pubsub_topics(self):
|
||||
self.node1.set_subscriptions(VALID_PUBSUB_TOPICS)
|
||||
self.node2.set_subscriptions(VALID_PUBSUB_TOPICS)
|
||||
self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS)
|
||||
failed_pubsub_topics = []
|
||||
for pubsub_topic in VALID_PUBSUB_TOPICS:
|
||||
logger.debug("Running test with pubsub topic %s", pubsub_topic)
|
||||
first_message = {"payload": to_base64("M1"), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
try:
|
||||
self.check_published_message_reaches_peer(first_message, pubsub_topic=pubsub_topic)
|
||||
self.check_published_message_reaches_peer(self.create_message(), pubsub_topic=pubsub_topic)
|
||||
except Exception as e:
|
||||
logger.error("PubusubTopic %s failed: %s", pubsub_topic, str(e))
|
||||
failed_pubsub_topics.append(pubsub_topic)
|
||||
assert not failed_pubsub_topics, f"PubusubTopic failed: {failed_pubsub_topics}"
|
||||
|
||||
def test_message_published_on_different_pubsub_topic_is_not_retrieved(self):
|
||||
self.ensure_subscriptions_on_nodes([self.node1, self.node2], VALID_PUBSUB_TOPICS)
|
||||
self.node1.send_message(self.create_message(), VALID_PUBSUB_TOPICS[0])
|
||||
delay(0.1)
|
||||
messages = self.node2.get_messages(VALID_PUBSUB_TOPICS[1])
|
||||
assert not messages, "Message was retrieved on wrong pubsub_topic"
|
||||
|
||||
def test_publish_on_unsubscribed_pubsub_topic(self):
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message, pubsub_topic="/waku/2/rs/19/1")
|
||||
self.check_published_message_reaches_peer(self.create_message(), pubsub_topic="/waku/2/rs/19/1")
|
||||
raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!")
|
||||
except Exception as ex:
|
||||
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
|
||||
@ -115,7 +120,7 @@ class TestRelayPublish(StepsRelay):
|
||||
for timestamp in SAMPLE_TIMESTAMPS:
|
||||
if self.node1.type() in timestamp["valid_for"]:
|
||||
logger.debug("Running test with timestamp %s", timestamp["description"])
|
||||
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": timestamp["value"]}
|
||||
message = self.create_message(timestamp=timestamp["value"])
|
||||
try:
|
||||
self.check_published_message_reaches_peer(message)
|
||||
except Exception as ex:
|
||||
@ -128,7 +133,7 @@ class TestRelayPublish(StepsRelay):
|
||||
for timestamp in SAMPLE_TIMESTAMPS:
|
||||
if self.node1.type() not in timestamp["valid_for"]:
|
||||
logger.debug("Running test with timestamp %s", timestamp["description"])
|
||||
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": timestamp["value"]}
|
||||
message = self.create_message(timestamp=timestamp["value"])
|
||||
try:
|
||||
self.check_published_message_reaches_peer(message)
|
||||
success_timestamps.append(timestamp)
|
||||
@ -141,25 +146,21 @@ class TestRelayPublish(StepsRelay):
|
||||
self.check_published_message_reaches_peer(message)
|
||||
|
||||
def test_publish_with_valid_version(self):
|
||||
self.test_message["version"] = 10
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(version=10))
|
||||
|
||||
def test_publish_with_invalid_version(self):
|
||||
self.test_message["version"] = 2.1
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(version=2.1))
|
||||
raise AssertionError("Publish with invalid version worked!!!")
|
||||
except Exception as ex:
|
||||
assert "Bad Request" in str(ex)
|
||||
|
||||
def test_publish_with_valid_meta(self):
|
||||
self.test_message["meta"] = to_base64(self.test_payload)
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(meta=to_base64(self.test_payload)))
|
||||
|
||||
def test_publish_with_invalid_meta(self):
|
||||
self.test_message["meta"] = self.test_payload
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(meta=self.test_payload))
|
||||
raise AssertionError("Publish with invalid meta worked!!!")
|
||||
except Exception as ex:
|
||||
assert "Bad Request" in str(ex)
|
||||
@ -168,63 +169,58 @@ class TestRelayPublish(StepsRelay):
|
||||
failed_ephemeral = []
|
||||
for ephemeral in [True, False]:
|
||||
logger.debug("Running test with Ephemeral %s", ephemeral)
|
||||
self.test_message["ephemeral"] = ephemeral
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(ephemeral=ephemeral))
|
||||
except Exception as e:
|
||||
logger.error("Massage with Ephemeral %s failed: %s", ephemeral, str(e))
|
||||
failed_ephemeral.append(ephemeral)
|
||||
assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}"
|
||||
|
||||
def test_publish_with_rate_limit_proof(self):
|
||||
self.test_message["rateLimitProof"] = {
|
||||
rate_limit_proof = {
|
||||
"proof": to_base64("proofData"),
|
||||
"epoch": to_base64("epochData"),
|
||||
"nullifier": to_base64("nullifierData"),
|
||||
}
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(rateLimitProof=rate_limit_proof))
|
||||
|
||||
def test_publish_with_extra_field(self):
|
||||
self.test_message["extraField"] = "extraValue"
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(extraField="extraValue"))
|
||||
|
||||
def test_publish_and_retrieve_duplicate_message(self):
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
message = self.create_message()
|
||||
self.check_published_message_reaches_peer(message)
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(message)
|
||||
raise AssertionError("Duplicate message was retrieved twice")
|
||||
except Exception as ex:
|
||||
assert "Peer node couldn't find any messages" in str(ex)
|
||||
|
||||
def test_publish_after_node_pauses(self):
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message())
|
||||
self.node1.pause()
|
||||
self.node1.unpause()
|
||||
self.test_message["payload"] = to_base64("new payload 1")
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M1")))
|
||||
self.node2.pause()
|
||||
self.node2.unpause()
|
||||
self.test_message["payload"] = to_base64("new payload 2")
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message(payload=to_base64("M2")))
|
||||
|
||||
def test_publish_after_node1_restarts(self):
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message())
|
||||
self.node1.restart()
|
||||
self.node1.set_subscriptions([self.test_pubsub_topic])
|
||||
self.node2.set_subscriptions([self.test_pubsub_topic])
|
||||
self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic])
|
||||
self.wait_for_published_message_to_reach_peer(20)
|
||||
|
||||
def test_publish_after_node2_restarts(self):
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
self.check_published_message_reaches_peer(self.create_message())
|
||||
self.node2.restart()
|
||||
self.node1.set_subscriptions([self.test_pubsub_topic])
|
||||
self.node2.set_subscriptions([self.test_pubsub_topic])
|
||||
self.ensure_subscriptions_on_nodes([self.node1, self.node2], [self.test_pubsub_topic])
|
||||
self.wait_for_published_message_to_reach_peer(20)
|
||||
|
||||
def test_publish_and_retrieve_100_messages(self):
|
||||
num_messages = 100 # if increase this number make sure to also increase rest-relay-cache-capacity flag
|
||||
for index in range(num_messages):
|
||||
message = {"payload": to_base64(f"M_{index}"), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
message = self.create_message(payload=to_base64(f"M_{index}"))
|
||||
self.node1.send_message(message, self.test_pubsub_topic)
|
||||
delay(1)
|
||||
messages = self.node2.get_messages(self.test_pubsub_topic)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user