mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-08 17:03:12 +00:00
adjust tests for go-waku
This commit is contained in:
parent
6d3875fdbd
commit
8a54f28bb1
89
src/node/store_response.py
Normal file
89
src/node/store_response.py
Normal file
@ -0,0 +1,89 @@
|
||||
class StoreResponse:
    """Normalizes a store-API response across node implementations.

    nwaku returns camelCase JSON keys (e.g. "requestId") while go-waku
    returns snake_case (e.g. "request_id"); this wrapper hides that
    difference behind a single accessor surface.  All accessors return
    ``None`` instead of raising when the response is missing, not
    dict-like, or an index is out of range.
    """

    def __init__(self, store_response, node):
        # Raw response as returned by the node's store REST API (a dict,
        # or possibly None on a failed call).
        self.response = store_response
        # Node handle; only consulted via is_nwaku() to pick the key style.
        self.node = node

    def _field(self, nwaku_key, gowaku_key):
        """Return the top-level response field under the implementation-specific key.

        Returns None when the response is absent or not dict-like, mirroring
        the forgiving behavior callers rely on.
        """
        try:
            key = nwaku_key if self.node.is_nwaku() else gowaku_key
            return self.response.get(key)
        except (AttributeError, TypeError):
            # response is None / not a dict, or node lacks is_nwaku().
            return None

    @property
    def request_id(self):
        return self._field("requestId", "request_id")

    @property
    def status_code(self):
        return self._field("statusCode", "status_code")

    @property
    def status_desc(self):
        return self._field("statusDesc", "status_desc")

    @property
    def messages(self):
        # "messages" is spelled identically by both implementations.
        try:
            return self.response.get("messages")
        except (AttributeError, TypeError):
            return None

    @property
    def pagination_cursor(self):
        return self._field("paginationCursor", "pagination_cursor")

    def message_hash(self, index):
        """Hash of the stored message at *index*, or None if unavailable."""
        try:
            if self.messages is None:
                return None
            key = "messageHash" if self.node.is_nwaku() else "message_hash"
            return self.messages[index][key]
        except IndexError:
            return None

    def message_payload(self, index):
        """Payload of the stored message at *index*, or None if unavailable."""
        try:
            if self.messages is None:
                return None
            return self.messages[index]["message"]["payload"]
        except IndexError:
            return None

    def message_at(self, index):
        """Full message dict at *index*, or None if unavailable."""
        try:
            if self.messages is None:
                return None
            return self.messages[index]["message"]
        except IndexError:
            return None

    def message_pubsub_topic(self, index):
        """Pubsub topic of the stored message at *index*, or None if unavailable."""
        try:
            if self.messages is None:
                return None
            key = "pubsubTopic" if self.node.is_nwaku() else "pubsub_topic"
            return self.messages[index][key]
        except IndexError:
            return None
|
||||
@ -13,6 +13,7 @@ class MessageRpcResponse:
|
||||
timestamp: Optional[int]
|
||||
ephemeral: Optional[bool]
|
||||
meta: Optional[str]
|
||||
rateLimitProof: str = field(default_factory=dict)
|
||||
rate_limit_proof: Optional[dict] = field(default_factory=dict)
|
||||
|
||||
|
||||
@ -23,8 +24,8 @@ class WakuMessage:
|
||||
self.message_rpc_response_schema = class_schema(self.schema)()
|
||||
|
||||
@allure.step
|
||||
def assert_received_message(self, sent_message, index=0):
|
||||
message = self.message_rpc_response_schema.load(self.received_messages[index])
|
||||
def assert_received_message(self, sent_message):
|
||||
message = self.message_rpc_response_schema.load(self.received_messages)
|
||||
|
||||
def assert_fail_message(field_name):
|
||||
return f"Incorrect field: {field_name}. Published: {sent_message[field_name]} Received: {getattr(message, field_name)}"
|
||||
|
||||
@ -3,6 +3,7 @@ from src.libs.custom_logger import get_custom_logger
|
||||
import pytest
|
||||
import allure
|
||||
from src.libs.common import delay
|
||||
from src.node.store_response import StoreResponse
|
||||
from src.node.waku_message import WakuMessage
|
||||
from src.env_vars import (
|
||||
ADDITIONAL_NODES,
|
||||
@ -151,7 +152,7 @@ class StepsStore(StepsCommon):
|
||||
pubsub_topic = self.test_pubsub_topic
|
||||
if node.is_gowaku() and content_topics is None:
|
||||
content_topics = self.test_content_topic
|
||||
return node.get_store_messages(
|
||||
store_response = node.get_store_messages(
|
||||
peer_addr=peer_addr,
|
||||
include_data=include_data,
|
||||
pubsub_topic=pubsub_topic,
|
||||
@ -165,6 +166,11 @@ class StepsStore(StepsCommon):
|
||||
store_v=store_v,
|
||||
**kwargs,
|
||||
)
|
||||
store_response = StoreResponse(store_response, node)
|
||||
assert store_response.request_id is not None, "Request id is missing"
|
||||
assert store_response.status_code, "Status code is missing"
|
||||
assert store_response.status_desc, "Status desc is missing"
|
||||
return store_response
|
||||
|
||||
@allure.step
|
||||
def check_published_message_is_stored(
|
||||
@ -212,20 +218,17 @@ class StepsStore(StepsCommon):
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
assert "messages" in self.store_response, f"Peer {node.image} has no messages key in the reponse"
|
||||
assert self.store_response["messages"], f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response}"
|
||||
assert len(self.store_response["messages"]) >= 1, "Expected at least 1 message but got none"
|
||||
assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response}"
|
||||
assert len(self.store_response.messages) >= 1, "Expected at least 1 message but got none"
|
||||
store_message_index = -1 # we are looking for the last and most recent message in the store
|
||||
waku_message = WakuMessage(self.store_response["messages"][store_message_index:])
|
||||
waku_message = WakuMessage(self.store_response.messages[store_message_index:])
|
||||
if store_v == "v1":
|
||||
waku_message.assert_received_message(message_to_check)
|
||||
else:
|
||||
expected_hash = self.compute_message_hash(pubsub_topic, message_to_check)
|
||||
if node.is_nwaku():
|
||||
actual = self.store_response["messages"][store_message_index]["messageHash"]
|
||||
else:
|
||||
actual = self.store_response["messages"][store_message_index]["message_hash"]
|
||||
assert expected_hash == actual, f"Message hash returned by store doesn't match the computed message hash {expected_hash}"
|
||||
assert expected_hash == self.store_response.message_hash(
|
||||
store_message_index
|
||||
), f"Message hash returned by store doesn't match the computed message hash {expected_hash}"
|
||||
|
||||
@allure.step
|
||||
def check_store_returns_empty_response(self, pubsub_topic=None):
|
||||
|
||||
@ -1,15 +1,9 @@
|
||||
import pytest
|
||||
from src.env_vars import NODE_1
|
||||
from src.libs.common import to_base64
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.node.waku_message import WakuMessage
|
||||
from src.steps.store import StepsStore
|
||||
from src.test_data import SAMPLE_INPUTS
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
# TO DO test without pubsubtopic freezes
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("node_setup")
|
||||
class TestApiFlags(StepsStore):
|
||||
@ -17,32 +11,6 @@ class TestApiFlags(StepsStore):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(store_node=self.store_node1, peer_addr=self.multiaddr_list[0])
|
||||
|
||||
def test_store_with_hashes(self):
|
||||
message_hash_list = []
|
||||
for payload in SAMPLE_INPUTS:
|
||||
message = self.create_message(payload=to_base64(payload["value"]))
|
||||
self.publish_message(message=message)
|
||||
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
for message_hash in message_hash_list:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, hashes=message_hash, page_size=50, ascending="true")
|
||||
assert len(store_response["messages"]) == 1
|
||||
assert store_response["messages"][0]["messageHash"] == message_hash
|
||||
|
||||
def test_store_with_multiple_hashes(self):
|
||||
message_hash_list = []
|
||||
for payload in SAMPLE_INPUTS:
|
||||
message = self.create_message(payload=to_base64(payload["value"]))
|
||||
self.publish_message(message=message)
|
||||
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(
|
||||
pubsub_topic=self.test_pubsub_topic, hashes=f"{message_hash_list[0]},{message_hash_list[4]}", page_size=50, ascending="true"
|
||||
)
|
||||
assert len(store_response["messages"]) == 2
|
||||
assert store_response["messages"][0]["messageHash"] == message_hash_list[0], "Incorrect messaged filtered based on multiple hashes"
|
||||
assert store_response["messages"][1]["messageHash"] == message_hash_list[4], "Incorrect messaged filtered based on multiple hashes"
|
||||
|
||||
def test_store_include_data(self):
|
||||
message_list = []
|
||||
for payload in SAMPLE_INPUTS:
|
||||
@ -51,9 +19,9 @@ class TestApiFlags(StepsStore):
|
||||
message_list.append(message)
|
||||
for node in self.store_nodes:
|
||||
store_response = self.get_messages_from_store(node, include_data="true", page_size=50)
|
||||
assert len(store_response["messages"]) == len(SAMPLE_INPUTS)
|
||||
for index, message in enumerate(store_response["messages"]):
|
||||
assert message["message"]["payload"] == message_list[index]["payload"]
|
||||
assert message["pubsubTopic"] == self.test_pubsub_topic
|
||||
waku_message = WakuMessage([message["message"]])
|
||||
assert len(store_response.messages) == len(SAMPLE_INPUTS)
|
||||
for index in range(len(store_response.messages)):
|
||||
assert store_response.message_payload(index) == message_list[index]["payload"]
|
||||
assert store_response.message_pubsub_topic(index) == self.test_pubsub_topic
|
||||
waku_message = WakuMessage(store_response.message_at(index))
|
||||
waku_message.assert_received_message(message_list[index])
|
||||
|
||||
@ -1,9 +1,11 @@
|
||||
import pytest
|
||||
from src.env_vars import NODE_1
|
||||
from src.env_vars import NODE_1, NODE_2
|
||||
from src.libs.common import to_base64
|
||||
from src.node.store_response import StoreResponse
|
||||
from src.steps.store import StepsStore
|
||||
|
||||
|
||||
@pytest.mark.xfail("go_waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1109")
|
||||
@pytest.mark.usefixtures("node_setup")
|
||||
class TestCursor(StepsStore):
|
||||
# we implicitly test the reusabilty of the cursor for multiple nodes
|
||||
@ -14,13 +16,13 @@ class TestCursor(StepsStore):
|
||||
message = self.create_message(payload=to_base64(f"Message_{i}"))
|
||||
self.publish_message(message=message)
|
||||
expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
store_response = {"paginationCursor": ""}
|
||||
store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, self.store_node1)
|
||||
response_message_hash_list = []
|
||||
while "paginationCursor" in store_response:
|
||||
cursor = store_response["paginationCursor"]
|
||||
store_response = self.store_node1.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=100, ascending="true", cursor=cursor)
|
||||
for message in store_response["messages"]:
|
||||
response_message_hash_list.append(message["messageHash"])
|
||||
while store_response.pagination_cursor is not None:
|
||||
cursor = store_response.pagination_cursor
|
||||
store_response = self.get_messages_from_store(self.store_node1, page_size=100, cursor=cursor)
|
||||
for index in range(len(store_response.messages)):
|
||||
response_message_hash_list.append(store_response.message_hash(index))
|
||||
assert len(expected_message_hash_list) == len(response_message_hash_list), "Message count mismatch"
|
||||
assert expected_message_hash_list == response_message_hash_list, "Message hash mismatch"
|
||||
|
||||
@ -34,72 +36,69 @@ class TestCursor(StepsStore):
|
||||
self.publish_message(message=message)
|
||||
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=cursor_index, ascending="true")
|
||||
assert len(store_response["messages"]) == cursor_index
|
||||
cursor = store_response["paginationCursor"]
|
||||
store_response = self.get_messages_from_store(node, page_size=cursor_index)
|
||||
assert len(store_response.messages) == cursor_index
|
||||
cursor = store_response.pagination_cursor
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=100, ascending="true", cursor=cursor)
|
||||
assert len(store_response["messages"]) == message_count - cursor_index
|
||||
for index, message_hash in enumerate(store_response["messages"]):
|
||||
assert message_hash["messageHash"] == message_hash_list[cursor_index + index], f"Message hash at index {index} doesn't match"
|
||||
store_response = self.get_messages_from_store(node, page_size=100, ascending="true", cursor=cursor)
|
||||
assert len(store_response.messages) == message_count - cursor_index
|
||||
for index in range(len(store_response.messages)):
|
||||
assert store_response.message_hash(index) == message_hash_list[cursor_index + index], f"Message hash at index {index} doesn't match"
|
||||
|
||||
def test_passing_cursor_not_returned_in_paginationCursor(self):
|
||||
cursor = ""
|
||||
for i in range(10):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
store_response = self.get_messages_from_store(node, page_size=5)
|
||||
# retrieving the cursor with the message hash of the 3rd message stored
|
||||
cursor = store_response["messages"][2]["messageHash"]
|
||||
cursor = store_response.message_hash(2)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=100, ascending="true", cursor=cursor)
|
||||
assert len(store_response["messages"]) == 7, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
|
||||
assert len(store_response.messages) == 7, "Message count mismatch"
|
||||
|
||||
def test_passing_cursor_of_the_last_message_from_the_store(self):
|
||||
cursor = ""
|
||||
for i in range(10):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=10, ascending="true")
|
||||
store_response = self.get_messages_from_store(node, page_size=10)
|
||||
# retrieving the cursor with the message hash of the last message stored
|
||||
cursor = store_response["messages"][9]["messageHash"]
|
||||
cursor = store_response.message_hash(9)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=100, ascending="true", cursor=cursor)
|
||||
assert len(store_response["messages"]) == 0, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
|
||||
assert len(store_response.messages) == 0, "Message count mismatch"
|
||||
|
||||
@pytest.mark.xfail("nwaku" in NODE_1, reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
|
||||
@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
|
||||
@pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
|
||||
def test_passing_cursor_of_non_existing_message_from_the_store(self):
|
||||
for i in range(4):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=10, ascending="true")
|
||||
# creating a cursor to a message that doesn't exist
|
||||
wrong_message = self.create_message(payload=to_base64("test"))
|
||||
cursor = self.compute_message_hash(self.test_pubsub_topic, wrong_message)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=100, ascending="true", cursor=cursor)
|
||||
assert len(store_response["messages"]) == 0, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
|
||||
assert len(store_response.messages) == 0, "Message count mismatch"
|
||||
|
||||
@pytest.mark.xfail("nwaku" in NODE_1, reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
|
||||
@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
|
||||
@pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
|
||||
def test_passing_invalid_cursor(self):
|
||||
for i in range(4):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=10, ascending="true")
|
||||
# creating a invalid base64 cursor
|
||||
cursor = to_base64("test")
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=100, ascending="true", cursor=cursor)
|
||||
assert len(store_response["messages"]) == 0, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
|
||||
assert len(store_response.messages) == 0, "Message count mismatch"
|
||||
|
||||
@pytest.mark.xfail("nwaku" in NODE_1, reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
|
||||
@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
|
||||
@pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
|
||||
def test_passing_non_base64_cursor(self):
|
||||
for i in range(4):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=10, ascending="true")
|
||||
# creating a non base64 cursor
|
||||
cursor = "test"
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=100, ascending="true", cursor=cursor)
|
||||
assert len(store_response["messages"]) == 0, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
|
||||
assert len(store_response.messages) == 0, "Message count mismatch"
|
||||
|
||||
@ -19,8 +19,8 @@ class TestEphemeral(StepsStore):
|
||||
self.publish_message(message=self.create_message(ephemeral=True))
|
||||
stored = self.publish_message(message=self.create_message(ephemeral=False))
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored)
|
||||
assert len(self.store_response["messages"]) == 1
|
||||
assert len(self.store_response.messages) == 1
|
||||
stored = self.publish_message(message=self.create_message(ephemeral=False))
|
||||
self.publish_message(message=self.create_message(ephemeral=True))
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored)
|
||||
assert len(self.store_response["messages"]) == 2
|
||||
assert len(self.store_response.messages) == 2
|
||||
|
||||
@ -6,15 +6,15 @@ from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, SAMPLE_INPUTS, PUBSUB
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
# TO DO test without pubsubtopic freezes
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("node_setup")
|
||||
class TestGetMessages(StepsStore):
|
||||
# only one test for store v1, all other tests are using the new store v3
|
||||
def test_legacy_store_v1(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true", store_v="v1")
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v1")
|
||||
assert len(store_response["messages"]) == 1
|
||||
|
||||
def test_get_store_messages_with_different_payloads(self):
|
||||
failed_payloads = []
|
||||
@ -28,7 +28,7 @@ class TestGetMessages(StepsStore):
|
||||
logger.error(f'Payload {payload["description"]} failed: {str(e)}')
|
||||
failed_payloads.append(payload["description"])
|
||||
assert not failed_payloads, f"Payloads failed: {failed_payloads}"
|
||||
assert len(self.store_response["messages"]) == len(SAMPLE_INPUTS)
|
||||
assert len(self.store_response.messages) == len(SAMPLE_INPUTS)
|
||||
|
||||
def test_get_store_messages_with_different_content_topics(self):
|
||||
failed_content_topics = []
|
||||
@ -72,7 +72,7 @@ class TestGetMessages(StepsStore):
|
||||
self.publish_message(message=message)
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
# only one message is stored
|
||||
assert len(self.store_response["messages"]) == 1
|
||||
assert len(self.store_response.messages) == 1
|
||||
|
||||
def test_get_multiple_store_messages(self):
|
||||
message_hash_list = []
|
||||
@ -82,15 +82,11 @@ class TestGetMessages(StepsStore):
|
||||
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
store_response = self.get_messages_from_store(node, page_size=50)
|
||||
assert len(store_response["messages"]) == len(SAMPLE_INPUTS)
|
||||
for index, message_hash in enumerate(store_response["messages"]):
|
||||
if node.is_nwaku():
|
||||
actual = message_hash["messageHash"]
|
||||
else:
|
||||
actual = message_hash["message_hash"]
|
||||
assert actual == message_hash_list[index], f"Message hash at index {index} doesn't match"
|
||||
assert len(store_response.messages) == len(SAMPLE_INPUTS)
|
||||
for index in range(len(store_response.messages)):
|
||||
assert store_response.message_hash(index) == message_hash_list[index], f"Message hash at index {index} doesn't match"
|
||||
|
||||
def test_store_is_empty(self):
|
||||
for node in self.store_nodes:
|
||||
store_response = self.get_messages_from_store(node, page_size=50)
|
||||
assert len(store_response["messages"]) == 0
|
||||
assert not store_response.messages
|
||||
|
||||
60
tests/store/test_hashes.py
Normal file
60
tests/store/test_hashes.py
Normal file
@ -0,0 +1,60 @@
|
||||
import pytest
|
||||
from src.env_vars import NODE_2
|
||||
from src.libs.common import to_base64
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.store import StepsStore
|
||||
from src.test_data import SAMPLE_INPUTS
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.xfail("go_waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1109")
@pytest.mark.usefixtures("node_setup")
class TestHashes(StepsStore):
    """Store queries filtered by message hash."""

    def test_store_with_hashes(self):
        # Publish one message per sample payload and remember each hash.
        expected_hashes = []
        for sample in SAMPLE_INPUTS:
            msg = self.create_message(payload=to_base64(sample["value"]))
            self.publish_message(message=msg)
            expected_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg))
        # Each hash, queried on its own, must return exactly that message.
        for node in self.store_nodes:
            for expected in expected_hashes:
                response = self.get_messages_from_store(node, hashes=expected, page_size=50)
                assert len(response.messages) == 1
                assert response.message_hash(0) == expected

    def test_store_with_multiple_hashes(self):
        expected_hashes = []
        for sample in SAMPLE_INPUTS:
            msg = self.create_message(payload=to_base64(sample["value"]))
            self.publish_message(message=msg)
            expected_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg))
        # Query the 1st and 5th hashes together as one comma-separated filter.
        for node in self.store_nodes:
            response = self.get_messages_from_store(node, hashes=f"{expected_hashes[0]},{expected_hashes[4]}", page_size=50)
            assert len(response.messages) == 2
            assert response.message_hash(0) == expected_hashes[0], "Incorrect messaged filtered based on multiple hashes"
            assert response.message_hash(1) == expected_hashes[4], "Incorrect messaged filtered based on multiple hashes"

    def test_store_with_wrong_hash(self):
        for i in range(4):
            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
        # Hash of a message that was created but never published.
        unpublished_hash = self.compute_message_hash(self.test_pubsub_topic, self.create_message(payload=to_base64("test")))
        for node in self.store_nodes:
            response = self.get_messages_from_store(node, hashes=unpublished_hash, page_size=50)
            assert len(response.messages) == 0

    def test_store_with_invalid_hash(self):
        for i in range(4):
            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
        # Well-formed base64, but not the hash of any stored message.
        bogus_hash = to_base64("test")
        for node in self.store_nodes:
            response = self.get_messages_from_store(node, hashes=bogus_hash, page_size=50)
            assert len(response.messages) == 0

    def test_store_with_non_base64_hash(self):
        for i in range(4):
            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
        # Plain text that is not valid base64 must match nothing.
        plain_text_hash = "test"
        for node in self.store_nodes:
            response = self.get_messages_from_store(node, hashes=plain_text_hash, page_size=50)
            assert len(response.messages) == 0
|
||||
@ -9,27 +9,27 @@ class TestPageSize(StepsStore):
|
||||
for i in range(30):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, ascending="true")
|
||||
assert len(store_response["messages"]) == 20, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node)
|
||||
assert len(store_response.messages) == 20, "Message count mismatch"
|
||||
|
||||
def test_page_size_0_defaults_to_20(self):
|
||||
for i in range(30):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=0, ascending="true")
|
||||
assert len(store_response["messages"]) == 20, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node, page_size=0)
|
||||
assert len(store_response.messages) == 20, "Message count mismatch"
|
||||
|
||||
def test_max_page_size(self):
|
||||
for i in range(200):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=200, ascending="true")
|
||||
assert len(store_response["messages"]) == 100, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node, page_size=200)
|
||||
assert len(store_response.messages) == 100, "Message count mismatch"
|
||||
|
||||
@pytest.mark.parametrize("page_size", [1, 11, 39, 81, 99])
|
||||
def test_different_page_size(self, page_size):
|
||||
for i in range(page_size + 1):
|
||||
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=page_size, ascending="true")
|
||||
assert len(store_response["messages"]) == page_size, "Message count mismatch"
|
||||
store_response = self.get_messages_from_store(node, page_size=page_size)
|
||||
assert len(store_response.messages) == page_size, "Message count mismatch"
|
||||
|
||||
@ -10,77 +10,84 @@ logger = get_custom_logger(__name__)
|
||||
class TestReliability(StepsStore):
|
||||
def test_publishing_node_is_stopped(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
self.publishing_node1.stop()
|
||||
store_response = self.store_node1.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
assert len(store_response["messages"]) == 1
|
||||
try:
|
||||
store_response = self.get_messages_from_store(self.store_node1, page_size=5)
|
||||
assert len(store_response.messages) == 1
|
||||
except Exception as ex:
|
||||
if self.store_node1.is_gowaku():
|
||||
assert "failed to dial: context deadline exceeded" in str(ex)
|
||||
else:
|
||||
raise AssertionError(f"Nwaku failed with {ex}")
|
||||
|
||||
def test_publishing_node_restarts(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
self.publishing_node1.restart()
|
||||
self.publishing_node1.ensure_ready()
|
||||
self.add_node_peer(self.store_node1, self.multiaddr_list)
|
||||
self.subscribe_to_pubsub_topics_via_relay(node=self.publishing_node1)
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
assert len(store_response["messages"]) == 2
|
||||
store_response = self.get_messages_from_store(node, page_size=5)
|
||||
assert len(store_response.messages) == 2
|
||||
|
||||
def test_store_node_restarts(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
self.store_node1.restart()
|
||||
self.store_node1.ensure_ready()
|
||||
self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1)
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
assert len(store_response["messages"]) == 2
|
||||
store_response = self.get_messages_from_store(node, page_size=5)
|
||||
assert len(store_response.messages) == 2
|
||||
|
||||
def test_publishing_node_paused_and_unpaused(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
self.publishing_node1.pause()
|
||||
delay(1)
|
||||
self.publishing_node1.unpause()
|
||||
self.publishing_node1.ensure_ready()
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
assert len(store_response["messages"]) == 2
|
||||
store_response = self.get_messages_from_store(node, page_size=5)
|
||||
assert len(store_response.messages) == 2
|
||||
|
||||
def test_store_node_paused_and_unpaused(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
self.store_node1.pause()
|
||||
delay(1)
|
||||
self.store_node1.unpause()
|
||||
self.store_node1.ensure_ready()
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
assert len(store_response["messages"]) == 2
|
||||
store_response = self.get_messages_from_store(node, page_size=5)
|
||||
assert len(store_response.messages) == 2
|
||||
|
||||
def test_message_relayed_while_store_node_is_paused(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
self.store_node1.pause()
|
||||
self.publish_message()
|
||||
self.store_node1.unpause()
|
||||
self.store_node1.ensure_ready()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
assert len(store_response["messages"]) == 2
|
||||
store_response = self.get_messages_from_store(node, page_size=5)
|
||||
assert len(store_response.messages) == 2
|
||||
|
||||
## I THINK WE HAVE A BUG FOR GOWAKU - NEEDS REPORTING!!!!
|
||||
def test_message_relayed_while_store_node_is_stopped(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
self.store_node1.stop()
|
||||
self.publish_message()
|
||||
self.store_node1.start()
|
||||
@ -89,15 +96,16 @@ class TestReliability(StepsStore):
|
||||
self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1)
|
||||
delay(1)
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
assert len(store_response["messages"]) == 2
|
||||
store_response = self.get_messages_from_store(node, page_size=5)
|
||||
assert len(store_response.messages) == 2
|
||||
|
||||
## I THINK WE HAVE A BUG FOR NWAKU - NEEDS REPORTING!!!!
|
||||
def test_message_relayed_before_store_node_is_started(self):
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
self.setup_second_store_node(store="true", relay="true")
|
||||
self.subscribe_to_pubsub_topics_via_relay()
|
||||
store_response = self.store_node2.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true")
|
||||
assert len(store_response["messages"]) == 0
|
||||
store_response = self.get_messages_from_store(self.store_node2, page_size=5)
|
||||
assert len(store_response.messages) == 1
|
||||
self.publish_message()
|
||||
self.check_published_message_is_stored(page_size=5, ascending="true")
|
||||
self.check_published_message_is_stored(page_size=5)
|
||||
|
||||
@ -13,10 +13,10 @@ class TestSorting(StepsStore):
|
||||
self.publish_message(message=message)
|
||||
expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending=ascending)
|
||||
store_response = self.get_messages_from_store(node, page_size=5, ascending=ascending)
|
||||
response_message_hash_list = []
|
||||
for message in store_response["messages"]:
|
||||
response_message_hash_list.append(message["messageHash"])
|
||||
for index in range(len(store_response.messages)):
|
||||
response_message_hash_list.append(store_response.message_hash(index))
|
||||
if ascending == "true":
|
||||
assert response_message_hash_list == expected_message_hash_list[:5], "Message hash mismatch for acending order"
|
||||
else:
|
||||
|
||||
@ -56,15 +56,14 @@ class TestTimeFilter(StepsStore):
|
||||
self.publish_message(message=message)
|
||||
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(
|
||||
pubsub_topic=self.test_pubsub_topic,
|
||||
store_response = self.get_messages_from_store(
|
||||
node,
|
||||
page_size=20,
|
||||
ascending="true",
|
||||
start_time=self.ts_pass[0]["value"] - 100000,
|
||||
end_time=self.ts_pass[0]["value"] + 100000,
|
||||
)
|
||||
assert len(store_response["messages"]) == 1, "Message count mismatch"
|
||||
assert store_response["messages"][0]["messageHash"] == message_hash_list[0], "Incorrect messaged filtered based on time"
|
||||
assert len(store_response.messages) == 1, "Message count mismatch"
|
||||
assert store_response.message_hash(0) == message_hash_list[0], "Incorrect messaged filtered based on time"
|
||||
|
||||
def test_time_filter_matches_multiple_messages(self):
|
||||
message_hash_list = []
|
||||
@ -73,16 +72,15 @@ class TestTimeFilter(StepsStore):
|
||||
self.publish_message(message=message)
|
||||
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(
|
||||
pubsub_topic=self.test_pubsub_topic,
|
||||
store_response = self.get_messages_from_store(
|
||||
node,
|
||||
page_size=20,
|
||||
ascending="true",
|
||||
start_time=self.ts_pass[0]["value"] - 100000,
|
||||
end_time=self.ts_pass[4]["value"] + 100000,
|
||||
)
|
||||
assert len(store_response["messages"]) == 5, "Message count mismatch"
|
||||
assert len(store_response.messages) == 5, "Message count mismatch"
|
||||
for i in range(5):
|
||||
assert store_response["messages"][i]["messageHash"] == message_hash_list[i], f"Incorrect messaged filtered based on time at index {i}"
|
||||
assert store_response.message_hash(i) == message_hash_list[i], f"Incorrect messaged filtered based on time at index {i}"
|
||||
|
||||
def test_time_filter_matches_no_message(self):
|
||||
message_hash_list = []
|
||||
@ -91,14 +89,13 @@ class TestTimeFilter(StepsStore):
|
||||
self.publish_message(message=message)
|
||||
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(
|
||||
pubsub_topic=self.test_pubsub_topic,
|
||||
store_response = self.get_messages_from_store(
|
||||
node,
|
||||
page_size=20,
|
||||
ascending="true",
|
||||
start_time=self.ts_pass[0]["value"] - 100000,
|
||||
end_time=self.ts_pass[0]["value"] - 100,
|
||||
)
|
||||
assert len(store_response["messages"]) == 0, "Message count mismatch"
|
||||
assert not store_response.messages, "Message count mismatch"
|
||||
|
||||
def test_time_filter_start_time_equals_end_time(self):
|
||||
message_hash_list = []
|
||||
@ -107,12 +104,11 @@ class TestTimeFilter(StepsStore):
|
||||
self.publish_message(message=message)
|
||||
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
|
||||
for node in self.store_nodes:
|
||||
store_response = node.get_store_messages(
|
||||
pubsub_topic=self.test_pubsub_topic,
|
||||
store_response = self.get_messages_from_store(
|
||||
node,
|
||||
page_size=20,
|
||||
ascending="true",
|
||||
start_time=self.ts_pass[0]["value"],
|
||||
end_time=self.ts_pass[0]["value"],
|
||||
)
|
||||
assert len(store_response["messages"]) == 1, "Message count mismatch"
|
||||
assert store_response["messages"][0]["messageHash"] == message_hash_list[0], "Incorrect messaged filtered based on time"
|
||||
assert len(store_response.messages) == 1, "Message count mismatch"
|
||||
assert store_response.message_hash(0) == message_hash_list[0], "Incorrect messaged filtered based on time"
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
import pytest
|
||||
from src.env_vars import NODE_2
|
||||
from src.steps.store import StepsStore
|
||||
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS
|
||||
|
||||
|
||||
@pytest.mark.xfail("go_waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1108")
|
||||
class TestTopics(StepsStore):
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def topics_setup(self, node_setup):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user