chore: gowaku store fixes (#104)

fbarbu15 2025-02-05 13:55:44 +02:00 committed by GitHub
parent 54a0dc29e8
commit c217ea0343
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 116 additions and 97 deletions

View File

@@ -1,5 +1,5 @@
[pytest]
addopts = -s --instafail --tb=short --color=auto
addopts = --instafail --tb=short --color=auto
log_level = DEBUG
log_cli = True
log_file = log/test.log

View File

@@ -48,7 +48,7 @@ class StepsCommon:
return message
@allure.step
def compute_message_hash(self, pubsub_topic, msg):
def compute_message_hash(self, pubsub_topic, msg, hash_type="hex"):
ctx = hashlib.sha256()
ctx.update(pubsub_topic.encode("utf-8"))
ctx.update(base64.b64decode(msg["payload"]))
@@ -57,7 +57,10 @@ class StepsCommon:
ctx.update(base64.b64decode(msg["meta"]))
ctx.update(int(msg["timestamp"]).to_bytes(8, byteorder="big"))
hash_bytes = ctx.digest()
return "0x" + hash_bytes.hex()
if hash_type == "hex":
return "0x" + hash_bytes.hex()
else:
return base64.b64encode(hash_bytes).decode("utf-8")
def get_time_list_pass(self):
ts_pass = [

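For reference, compute_message_hash derives a single SHA-256 digest and only the encoding of that digest differs between implementations: nwaku's REST API reports hashes as 0x-prefixed hex, go-waku's as base64. A minimal standalone sketch of the two encodings (hypothetical input bytes, for illustration only):

    import base64
    import hashlib

    digest = hashlib.sha256(b"pubsub-topic-and-message-fields").digest()  # same digest either way
    hex_hash = "0x" + digest.hex()                                        # hash_type="hex"    -> nwaku style
    base64_hash = base64.b64encode(digest).decode("utf-8")                # hash_type="base64" -> go-waku style

Either string identifies the same stored message; only the representation returned by the node differs.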
View File

@@ -249,7 +249,8 @@ class StepsStore(StepsCommon):
waku_message = WakuMessage([self.store_response.messages[idx]])
waku_message.assert_received_message(message_to_check)
else:
expected_hash = self.compute_message_hash(pubsub_topic, message_to_check)
hash_type = "hex" if node.is_nwaku() else "base64"
expected_hash = self.compute_message_hash(pubsub_topic, message_to_check, hash_type=hash_type)
actual_hash = self.store_response.message_hash(idx)
assert (
expected_hash == actual_hash

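The same selection pattern recurs throughout the suite: expected hashes are computed in both encodings and the one matching the queried node is picked via node.is_nwaku() or node.type(). A hedged sketch of that pattern as a hypothetical helper (name assumed, not part of this diff):

    def expected_hash_for(self, node, pubsub_topic, message):
        # hypothetical helper: choose the encoding that matches the node's REST API
        hash_type = "hex" if node.is_nwaku() else "base64"
        return self.compute_message_hash(pubsub_topic, message, hash_type=hash_type)

The tests below inline this choice per test (a {"nwaku": [...], "gowaku": [...]} dict) rather than introducing such a helper.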
View File

@@ -12,13 +12,14 @@ class TestCursor(StepsStore):
@pytest.mark.parametrize("cursor_index, message_count", [[2, 4], [3, 20], [10, 40], [19, 20], [19, 50], [110, 120]])
def test_different_cursor_and_indexes(self, cursor_index, message_count):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
cursor = ""
cursor_index = cursor_index if cursor_index < 100 else 100
for i in range(message_count):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, page_size=cursor_index)
assert len(store_response.messages) == cursor_index
@@ -27,7 +28,9 @@ class TestCursor(StepsStore):
store_response = self.get_messages_from_store(node, page_size=100, ascending="true", cursor=cursor)
assert len(store_response.messages) == message_count - cursor_index
for index in range(len(store_response.messages)):
assert store_response.message_hash(index) == message_hash_list[cursor_index + index], f"Message hash at index {index} doesn't match"
assert (
store_response.message_hash(index) == message_hash_list[node.type()][cursor_index + index]
), f"Message hash at index {index} doesn't match"
def test_passing_cursor_not_returned_in_paginationCursor(self):
cursor = ""
@@ -53,41 +56,44 @@ class TestCursor(StepsStore):
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
assert not store_response.messages, "Messages found"
@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
@pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
def test_passing_cursor_of_non_existing_message_from_the_store(self):
for i in range(4):
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
# creating a cursor to a message that doesn't exist
wrong_message = self.create_message(payload=to_base64("test"))
cursor = self.compute_message_hash(self.test_pubsub_topic, wrong_message)
cursor = {}
cursor["nwaku"] = self.compute_message_hash(self.test_pubsub_topic, wrong_message, hash_type="hex")
cursor["gowaku"] = self.compute_message_hash(self.test_pubsub_topic, wrong_message, hash_type="base64")
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
assert not store_response.messages, "Messages found"
try:
self.get_messages_from_store(node, page_size=100, cursor=cursor[node.type()])
raise AssertionError("Store fetch with wrong cursor worked!!!")
except Exception as ex:
assert "cursor not found" in str(ex) or "Internal Server Error" in str(ex)
@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
@pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
def test_passing_invalid_cursor(self):
for i in range(4):
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
# creating an invalid hex cursor
cursor = to_hex("test")
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
assert not store_response.messages, "Messages found"
try:
self.get_messages_from_store(node, page_size=100, cursor=cursor)
raise AssertionError("Store fetch with wrong cursor worked!!!")
except Exception as ex:
assert "invalid hash length" in str(ex) or "Bad Request" in str(ex)
@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
@pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
def test_passing_non_base64_cursor(self):
for i in range(4):
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
# creating a non-base64 cursor
cursor = "test"
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
assert not store_response.messages, "Messages found"
# Additional check added to this test
try:
self.get_messages_from_store(node, page_size=100, cursor=cursor)
raise AssertionError("Store fetch with wrong cursor worked!!!")
except Exception as ex:
assert "cursor not found" in str(ex) or "Exception converting hex string to bytes" in str(ex) or "Bad Request" in str(ex)
# Ensure that when the cursor is an empty string (""), the API returns the first page of data.
def test_empty_cursor(self):
@@ -100,11 +106,9 @@ class TestCursor(StepsStore):
# Test the scenario where the cursor points near the last few messages, ensuring proper pagination.
def test_cursor_near_end(self):
message_hash_list = []
for i in range(10):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, page_size=5)
@@ -120,44 +124,38 @@ class TestCursor(StepsStore):
# Create a deleted message and compute its hash as the cursor
deleted_message = self.create_message(payload=to_base64("Deleted_Message"))
cursor = self.compute_message_hash(self.test_pubsub_topic, deleted_message)
cursor = {}
cursor["nwaku"] = self.compute_message_hash(self.test_pubsub_topic, deleted_message, hash_type="hex")
cursor["gowaku"] = self.compute_message_hash(self.test_pubsub_topic, deleted_message, hash_type="base64")
# Test the store response
for node in self.store_nodes:
store_response = self.get_store_messages_with_errors(node=node, page_size=100, cursor=cursor)
# Assert that the error code is 500 for the deleted message scenario
store_response = self.get_store_messages_with_errors(
node=node, pubsub_topic=self.test_pubsub_topic, content_topics=self.test_content_topic, page_size=100, cursor=cursor[node.type()]
)
assert store_response["status_code"] == 500, f"Expected status code 500, got {store_response['status_code']}"
# Define a partial expected error message (since the actual response includes more details)
expected_error_fragment = "error in handleSelfStoreRequest: BAD_RESPONSE: archive error: DIRVER_ERROR: cursor not found"
# Extract the actual error message and ensure it contains the expected error fragment
actual_error_message = store_response["error_message"]
assert (
expected_error_fragment in actual_error_message
), f"Expected error message fragment '{expected_error_fragment}', but got '{actual_error_message}'"
assert "cursor not found" in actual_error_message
# Test if the API returns the expected messages when the cursor points to the first message in the store.
def test_cursor_equal_to_first_message(self):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
for i in range(10):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
cursor = message_hash_list[0] # Cursor points to the first message
for node in self.store_nodes:
cursor = message_hash_list[node.type()][0] # Cursor points to the first message
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
assert len(store_response.messages) == 9, "Message count mismatch from the first cursor"
# Test behavior when the cursor points exactly at the page size boundary.
def test_cursor_at_page_size_boundary(self):
message_hash_list = []
for i in range(10):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
# Set page size to 5, checking paginationCursor after both fetches
for node in self.store_nodes:
@@ -213,6 +211,7 @@ class TestCursor(StepsStore):
assert store_response_valid.pagination_cursor is None, "There should be no pagination cursor for the last page"
# Validate the message content using the correct timestamp
hash_type = "hex" if node.is_nwaku() else "base64"
expected_message_hashes = [
self.compute_message_hash(
self.test_pubsub_topic,
@@ -221,6 +220,7 @@ class TestCursor(StepsStore):
"contentTopic": "/myapp/1/latest/proto",
"timestamp": timestamps[3], # Use the stored timestamp for Message_3
},
hash_type=hash_type,
),
self.compute_message_hash(
self.test_pubsub_topic,
@@ -229,6 +229,7 @@ class TestCursor(StepsStore):
"contentTopic": "/myapp/1/latest/proto",
"timestamp": timestamps[4], # Use the stored timestamp for Message_4
},
hash_type=hash_type,
),
]
for i, message in enumerate(store_response_valid.messages):

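The negative cursor tests above now expect the store request to fail rather than return an empty page, and they build a per-implementation cursor because nwaku and go-waku reject the wrong encoding with different errors. The same intent can be written with pytest.raises; the following is a sketch of an equivalent check inside the test class, not the literal test code:

    import pytest

    # a message that was never published, so its hash cannot resolve to a valid cursor
    wrong_message = self.create_message(payload=to_base64("test"))
    cursor = {
        "nwaku": self.compute_message_hash(self.test_pubsub_topic, wrong_message, hash_type="hex"),
        "gowaku": self.compute_message_hash(self.test_pubsub_topic, wrong_message, hash_type="base64"),
    }
    for node in self.store_nodes:
        with pytest.raises(Exception, match="cursor not found|Internal Server Error"):
            self.get_messages_from_store(node, page_size=100, cursor=cursor[node.type()])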
View File

@@ -13,11 +13,12 @@ class TestCursorManyMessages(StepsStore):
@pytest.mark.timeout(540)
@pytest.mark.store2000
def test_get_multiple_2000_store_messages(self):
expected_message_hash_list = []
expected_message_hash_list = {"nwaku": [], "gowaku": []}
for i in range(2000):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
expected_message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
expected_message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, self.store_node1)
response_message_hash_list = []
while store_response.pagination_cursor is not None:
@@ -25,5 +26,5 @@ class TestCursorManyMessages(StepsStore):
store_response = self.get_messages_from_store(self.store_node1, page_size=100, cursor=cursor)
for index in range(len(store_response.messages)):
response_message_hash_list.append(store_response.message_hash(index))
assert len(expected_message_hash_list) == len(response_message_hash_list), "Message count mismatch"
assert expected_message_hash_list == response_message_hash_list, "Message hash mismatch"
assert len(expected_message_hash_list[self.store_node1.type()]) == len(response_message_hash_list), "Message count mismatch"
assert expected_message_hash_list[self.store_node1.type()] == response_message_hash_list, "Message hash mismatch"

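The 2000-message test collects every stored hash by following pagination_cursor until the store stops returning a next page. A minimal sketch of that paging loop, assuming the get_messages_from_store and StoreResponse interfaces shown above:

    response_hashes = []
    cursor = ""  # an empty cursor requests the first page
    while cursor is not None:
        store_response = self.get_messages_from_store(self.store_node1, page_size=100, cursor=cursor)
        for index in range(len(store_response.messages)):
            response_hashes.append(store_response.message_hash(index))
        cursor = store_response.pagination_cursor  # None once the last page has been served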
View File

@@ -77,16 +77,17 @@ class TestGetMessages(StepsStore):
assert len(self.store_response.messages) == 1
def test_get_multiple_store_messages(self):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
for payload in SAMPLE_INPUTS:
message = self.create_message(payload=to_base64(payload["value"]))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, page_size=50)
assert len(store_response.messages) == len(SAMPLE_INPUTS)
for index in range(len(store_response.messages)):
assert store_response.message_hash(index) == message_hash_list[index], f"Message hash at index {index} doesn't match"
assert store_response.message_hash(index) == message_hash_list[node.type()][index], f"Message hash at index {index} doesn't match"
def test_store_is_empty(self):
for node in self.store_nodes:

View File

@@ -12,35 +12,41 @@ logger = get_custom_logger(__name__)
@pytest.mark.usefixtures("node_setup")
class TestHashes(StepsStore):
def test_store_with_hashes(self):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
for payload in SAMPLE_INPUTS:
message = self.create_message(payload=to_base64(payload["value"]))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
for node in self.store_nodes:
for message_hash in message_hash_list:
for message_hash in message_hash_list[node.type()]:
store_response = self.get_messages_from_store(node, hashes=message_hash, page_size=50)
assert len(store_response.messages) == 1
assert store_response.message_hash(0) == message_hash
def test_store_with_multiple_hashes(self):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
for payload in SAMPLE_INPUTS:
message = self.create_message(payload=to_base64(payload["value"]))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, hashes=f"{message_hash_list[0]},{message_hash_list[4]}", page_size=50)
store_response = self.get_messages_from_store(
node, hashes=f"{message_hash_list[node.type()][0]},{message_hash_list[node.type()][4]}", page_size=50
)
assert len(store_response.messages) == 2
assert store_response.message_hash(0) == message_hash_list[0], "Incorrect message filtered based on multiple hashes"
assert store_response.message_hash(1) == message_hash_list[4], "Incorrect message filtered based on multiple hashes"
assert store_response.message_hash(0) == message_hash_list[node.type()][0], "Incorrect message filtered based on multiple hashes"
assert store_response.message_hash(1) == message_hash_list[node.type()][4], "Incorrect message filtered based on multiple hashes"
def test_store_with_wrong_hash(self):
for i in range(4):
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
wrong_hash = self.compute_message_hash(self.test_pubsub_topic, self.create_message(payload=to_base64("test")))
wrong_hash = {}
wrong_hash["nwaku"] = self.compute_message_hash(self.test_pubsub_topic, self.create_message(payload=to_base64("test")), hash_type="hex")
wrong_hash["gowaku"] = self.compute_message_hash(self.test_pubsub_topic, self.create_message(payload=to_base64("test")), hash_type="base64")
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, hashes=wrong_hash, page_size=50)
store_response = self.get_messages_from_store(node, hashes=wrong_hash[node.type()], page_size=50)
assert not store_response.messages, "Messages found"
def test_store_with_invalid_hash(self):
@@ -91,17 +97,19 @@ class TestHashes(StepsStore):
# Test the behavior when you supply an empty hash alongside valid hashes.
def test_store_with_empty_and_valid_hash(self):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
for i in range(4):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
empty_hash = ""
for node in self.store_nodes:
try:
# Combining valid hash with an empty hash
store_response = self.get_messages_from_store(node, hashes=f"{message_hash_list[0]},{empty_hash}", page_size=50)
store_response = self.get_messages_from_store(node, hashes=f"{message_hash_list[node.type()][0]},{empty_hash}", page_size=50)
assert len(store_response.messages) == 1, "Message count mismatch with empty and valid hashes"
except Exception as ex:
assert "waku message hash parsing error" in str(ex), "Unexpected error for combined empty and valid hash"
@@ -121,18 +129,19 @@ class TestHashes(StepsStore):
# Test when duplicate valid hashes are provided.
def test_store_with_duplicate_hashes(self):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
for i in range(4):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
# Use the same hash twice
duplicate_hash = f"{message_hash_list[0]},{message_hash_list[0]}"
for node in self.store_nodes:
# Use the same hash twice
duplicate_hash = f"{message_hash_list[node.type()][0]},{message_hash_list[node.type()][0]}"
store_response = self.get_messages_from_store(node, hashes=duplicate_hash, page_size=50)
assert len(store_response.messages) == 1, "Expected only one message for duplicate hashes"
assert store_response.message_hash(0) == message_hash_list[0], "Incorrect message returned for duplicate hashes"
assert store_response.message_hash(0) == message_hash_list[node.type()][0], "Incorrect message returned for duplicate hashes"
# Invalid Query Parameter (hash) for Hashes
def test_invalid_hash_param(self):
@@ -145,7 +154,8 @@ class TestHashes(StepsStore):
for node in self.store_nodes:
# Step 1: Request messages with the correct 'hashes' parameter
correct_hash = self.compute_message_hash(self.test_pubsub_topic, published_messages[2])
hash_type = "hex" if node.is_nwaku() else "base64"
correct_hash = self.compute_message_hash(self.test_pubsub_topic, published_messages[2], hash_type=hash_type)
store_response_valid = self.get_messages_from_store(node, hashes=correct_hash)
assert store_response_valid.status_code == 200, "Expected 200 response with correct 'hashes' parameter"
@@ -167,6 +177,4 @@ class TestHashes(StepsStore):
expected_hashes = []
returned_hashes = []
print("expected_hashes: ", expected_hashes)
print("returned_hashes: ", returned_hashes)
assert set(returned_hashes) == set(expected_hashes), "Returned message hashes do not match the expected hashes"

View File

@@ -10,32 +10,34 @@ logger = get_custom_logger(__name__)
class TestSorting(StepsStore):
@pytest.mark.parametrize("ascending", ["true", "false"])
def test_store_sort_ascending(self, ascending):
expected_message_hash_list = []
expected_message_hash_list = {"nwaku": [], "gowaku": []}
for i in range(10):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
expected_message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
expected_message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, page_size=5, ascending=ascending)
response_message_hash_list = []
for index in range(len(store_response.messages)):
response_message_hash_list.append(store_response.message_hash(index))
if ascending == "true":
assert response_message_hash_list == expected_message_hash_list[:5], "Message hash mismatch for ascending order"
assert response_message_hash_list == expected_message_hash_list[node.type()][:5], "Message hash mismatch for ascending order"
else:
assert response_message_hash_list == expected_message_hash_list[5:], "Message hash mismatch for descending order"
assert response_message_hash_list == expected_message_hash_list[node.type()][5:], "Message hash mismatch for descending order"
def test_store_invalid_ascending(self):
expected_message_hash_list = []
expected_message_hash_list = {"nwaku": [], "gowaku": []}
ascending = "##"
for i in range(4):
message = self.create_message(payload=to_base64(f"Message_{i}"))
self.publish_message(message=message)
expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
expected_message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
expected_message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
logger.debug(f"requesting stored messages with invalid ascending ={ascending}")
for node in self.store_nodes:
store_response = self.get_messages_from_store(node, ascending=ascending, page_size=2)
response_message_hash_list = []
for index in range(len(store_response.messages)):
response_message_hash_list.append(store_response.message_hash(index))
assert response_message_hash_list == expected_message_hash_list[:2], "pages aren't forward as expected"
assert response_message_hash_list == expected_message_hash_list[node.type()][:2], "pages aren't forward as expected"

View File

@@ -39,12 +39,13 @@ class TestTimeFilter(StepsStore):
assert not success_timestamps, f"Timestamps succeeded: {success_timestamps}"
def test_time_filter_matches_one_message(self):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
ts_pass = self.get_time_list_pass()
for timestamp in ts_pass:
message = self.create_message(timestamp=timestamp["value"])
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
for node in self.store_nodes:
store_response = self.get_messages_from_store(
node,
@@ -53,15 +54,16 @@ class TestTimeFilter(StepsStore):
end_time=ts_pass[0]["value"] + 100000,
)
assert len(store_response.messages) == 1, "Message count mismatch"
assert store_response.message_hash(0) == message_hash_list[0], "Incorrect message filtered based on time"
assert store_response.message_hash(0) == message_hash_list[node.type()][0], "Incorrect message filtered based on time"
def test_time_filter_matches_multiple_messages(self):
message_hash_list = []
message_hash_list = {"nwaku": [], "gowaku": []}
ts_pass = self.get_time_list_pass()
for timestamp in ts_pass:
message = self.create_message(timestamp=timestamp["value"])
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
for node in self.store_nodes:
store_response = self.get_messages_from_store(
node,
@@ -71,15 +73,13 @@ class TestTimeFilter(StepsStore):
)
assert len(store_response.messages) == 5, "Message count mismatch"
for i in range(5):
assert store_response.message_hash(i) == message_hash_list[i], f"Incorrect message filtered based on time at index {i}"
assert store_response.message_hash(i) == message_hash_list[node.type()][i], f"Incorrect message filtered based on time at index {i}"
def test_time_filter_matches_no_message(self):
message_hash_list = []
ts_pass = self.get_time_list_pass()
for timestamp in ts_pass:
message = self.create_message(timestamp=timestamp["value"])
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
for node in self.store_nodes:
store_response = self.get_messages_from_store(
node,
@@ -90,12 +90,10 @@ class TestTimeFilter(StepsStore):
assert not store_response.messages, "Message count mismatch"
def test_time_filter_start_time_equals_end_time(self):
message_hash_list = []
ts_pass = self.get_time_list_pass()
for timestamp in ts_pass:
message = self.create_message(timestamp=timestamp["value"])
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
for node in self.store_nodes:
try:
self.get_messages_from_store(

View File

@@ -12,11 +12,12 @@ logger = get_custom_logger(__name__)
class TestTopics(StepsStore):
@pytest.fixture(scope="function", autouse=True)
def topics_setup(self, node_setup):
self.message_hash_list = []
self.message_hash_list = {"nwaku": [], "gowaku": []}
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
message = self.create_message(contentTopic=content_topic)
self.publish_message(message=message)
self.message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
self.message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
self.message_hash_list["gowaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="base64"))
def test_store_with_one_content_topic(self):
for node in self.store_nodes:
@@ -24,7 +25,7 @@ class TestTopics(StepsStore):
store_response = node.get_store_messages(content_topics=content_topic, page_size=20, ascending="true")
assert len(store_response["messages"]) == 1, "Message count mismatch"
assert (
store_response["messages"][0]["messageHash"] == self.message_hash_list[index]
store_response["messages"][0]["messageHash"] == self.message_hash_list[node.type()][index]
), "Incorrect messaged filtered based on content topic"
def test_store_with_multiple_content_topics(self):
@@ -34,10 +35,10 @@ class TestTopics(StepsStore):
)
assert len(store_response["messages"]) == 2, "Message count mismatch"
assert (
store_response["messages"][0]["messageHash"] == self.message_hash_list[0]
store_response["messages"][0]["messageHash"] == self.message_hash_list[node.type()][0]
), "Incorrect messaged filtered based on multiple content topics"
assert (
store_response["messages"][1]["messageHash"] == self.message_hash_list[4]
store_response["messages"][1]["messageHash"] == self.message_hash_list[node.type()][4]
), "Incorrect messaged filtered based on multiple content topics"
def test_store_with_unknown_content_topic(self):
@@ -58,7 +59,7 @@ class TestTopics(StepsStore):
)
assert len(store_response["messages"]) == 1, "Message count mismatch"
assert (
store_response["messages"][0]["messageHash"] == self.message_hash_list[index]
store_response["messages"][0]["messageHash"] == self.message_hash_list[node.type()][index]
), "Incorrect messaged filtered based on content topic"
def test_store_with_unknown_pubsub_topic_but_known_content_topic(self):
@@ -77,7 +78,7 @@ class TestTopics(StepsStore):
)
assert len(store_response["messages"]) == 1, "Message count mismatch"
assert (
store_response["messages"][0]["messageHash"] == self.message_hash_list[index]
store_response["messages"][0]["messageHash"] == self.message_hash_list[node.type()][index]
), "Incorrect messaged filtered based on content topic"
def test_store_without_pubsub_topic_and_content_topic(self):

View File

@@ -365,7 +365,7 @@ class TestStoreSync(StepsStore):
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
expected_message_hash_list = []
expected_message_hash_list = {"nwaku": [], "gowaku": []}
for _ in range(500): # total 1500 messages
messages = [self.create_message() for _ in range(3)]
@@ -373,7 +373,10 @@ class TestStoreSync(StepsStore):
for i, node in enumerate([self.node1, self.node2, self.node3]):
self.publish_message(sender=node, via="relay", message=messages[i], message_propagation_delay=0.01)
expected_message_hash_list.extend([self.compute_message_hash(self.test_pubsub_topic, msg) for msg in messages])
expected_message_hash_list["nwaku"].extend([self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex") for msg in messages])
expected_message_hash_list["gowaku"].extend(
[self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="base64") for msg in messages]
)
delay(5) # wait for the sync to finish
@@ -385,8 +388,8 @@ class TestStoreSync(StepsStore):
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
for index in range(len(store_response.messages)):
response_message_hash_list.append(store_response.message_hash(index))
assert len(expected_message_hash_list) == len(response_message_hash_list), "Message count mismatch"
assert expected_message_hash_list == response_message_hash_list, "Message hash mismatch"
assert len(expected_message_hash_list[node.type()]) == len(response_message_hash_list), "Message count mismatch"
assert expected_message_hash_list[node.type()] == response_message_hash_list, "Message hash mismatch"
def test_large_message_payload_sync(self):
self.node1.start(store="true", relay="true")