store tests

This commit is contained in:
Florin Barbu 2024-05-17 15:49:36 +03:00
parent 61b8e505cb
commit bd5c541d3f
No known key found for this signature in database
GPG Key ID: 593D6DBC6D9E5095
9 changed files with 217 additions and 142 deletions

View File

@ -93,29 +93,29 @@ class REST(BaseClient):
return get_messages_response.json()
def get_store_messages(
self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs
self, peer_addr, include_data, pubsub_topic, content_topics, start_time, end_time, hashes, cursor, page_size, ascending, store_v, **kwargs
):
base_url = f"store/{store_v}/messages"
params = []
if peerAddr is not None:
params.append(f"peerAddr={quote(peerAddr, safe='')}")
if includeData is not None:
params.append(f"includeData={includeData}")
if pubsubTopic is not None:
params.append(f"pubsubTopic={quote(pubsubTopic, safe='')}")
if contentTopics is not None:
params.append(f"contentTopics={quote(contentTopics, safe='')}")
if startTime is not None:
params.append(f"startTime={startTime}")
if endTime is not None:
params.append(f"endTime={endTime}")
if peer_addr is not None:
params.append(f"peerAddr={quote(peer_addr, safe='')}")
if include_data is not None:
params.append(f"includeData={include_data}")
if pubsub_topic is not None:
params.append(f"pubsubTopic={quote(pubsub_topic, safe='')}")
if content_topics is not None:
params.append(f"contentTopics={quote(content_topics, safe='')}")
if start_time is not None:
params.append(f"startTime={start_time}")
if end_time is not None:
params.append(f"endTime={end_time}")
if hashes is not None:
params.append(f"hashes={quote(hashes, safe='')}")
if cursor is not None:
params.append(f"cursor={quote(cursor, safe='')}")
if pageSize is not None:
params.append(f"pageSize={pageSize}")
if page_size is not None:
params.append(f"pageSize={page_size}")
if ascending is not None:
params.append(f"ascending={ascending}")

View File

@ -292,29 +292,29 @@ class WakuNode:
def get_store_messages(
self,
peerAddr=None,
includeData=None,
pubsubTopic=None,
contentTopics=None,
startTime=None,
endTime=None,
peer_addr=None,
include_data=None,
pubsub_topic=None,
content_topics=None,
start_time=None,
end_time=None,
hashes=None,
cursor=None,
pageSize=None,
page_size=None,
ascending=None,
store_v=None,
**kwargs,
):
return self._api.get_store_messages(
peerAddr=peerAddr,
includeData=includeData,
pubsubTopic=pubsubTopic,
contentTopics=contentTopics,
startTime=startTime,
endTime=endTime,
peer_addr=peer_addr,
include_data=include_data,
pubsub_topic=pubsub_topic,
content_topics=content_topics,
start_time=start_time,
end_time=end_time,
hashes=hashes,
cursor=cursor,
pageSize=pageSize,
page_size=page_size,
ascending=ascending,
store_v=store_v,
**kwargs,

View File

@ -29,6 +29,12 @@ class StepsStore(StepsCommon):
self.optional_nodes = []
self.multiaddr_list = []
@pytest.fixture(scope="function", autouse=False)
def node_setup(self, store_setup):
self.setup_first_publishing_node(store="true", relay="true")
self.setup_first_store_node(store="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
@allure.step
def start_publishing_node(self, image, node_index, **kwargs):
node = WakuNode(image, f"publishing_node{node_index}_{self.test_id}")
@ -110,41 +116,45 @@ class StepsStore(StepsCommon):
node.set_filter_subscriptions(subscription)
@allure.step
def publish_message_via(self, type, pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None):
def publish_message(self, via="relay", pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None):
self.message = self.create_message() if message is None else message
if pubsub_topic is None:
pubsub_topic = self.test_pubsub_topic
if not sender:
sender = self.publishing_node1
if type == "relay":
if via == "relay":
logger.debug("Relaying message")
sender.send_relay_message(self.message, pubsub_topic)
elif type == "lightpush":
elif via == "lightpush":
payload = self.create_payload(pubsub_topic, self.message)
sender.send_light_push_message(payload)
delay(message_propagation_delay)
return self.message
@allure.step
def check_published_message_is_stored(
self,
store_node=None,
peerAddr=None,
includeData=None,
pubsubTopic=None,
contentTopics=None,
startTime=None,
endTime=None,
peer_addr=None,
include_data=None,
pubsub_topic=None,
content_topics=None,
start_time=None,
end_time=None,
hashes=None,
cursor=None,
pageSize=None,
page_size=None,
ascending=None,
store_v="v3",
message_to_check=None,
**kwargs,
):
if pubsub_topic is None:
pubsub_topic = self.test_pubsub_topic
if message_to_check is None:
message_to_check = self.message
if store_node is None:
store_node = self.store_nodes
if pubsubTopic is None:
pubsubTopic = self.test_pubsub_topic
elif not isinstance(store_node, list):
store_node = [store_node]
else:
@ -152,15 +162,15 @@ class StepsStore(StepsCommon):
for node in store_node:
logger.debug(f"Checking that peer {node.image} can find the stored message")
self.store_response = node.get_store_messages(
peerAddr=peerAddr,
includeData=includeData,
pubsubTopic=pubsubTopic,
contentTopics=contentTopics,
startTime=startTime,
endTime=endTime,
peer_addr=peer_addr,
include_data=include_data,
pubsub_topic=pubsub_topic,
content_topics=content_topics,
start_time=start_time,
end_time=end_time,
hashes=hashes,
cursor=cursor,
pageSize=pageSize,
page_size=page_size,
ascending=ascending,
store_v=store_v,
**kwargs,
@ -174,9 +184,9 @@ class StepsStore(StepsCommon):
self.store_response["messages"][store_message_index:], schema=MessageRpcResponseStore if node.is_nwaku() else MessageRpcResponse
)
if store_v == "v1":
waku_message.assert_received_message(self.message)
waku_message.assert_received_message(message_to_check)
else:
expected_hash = self.compute_message_hash(pubsubTopic, self.message)
expected_hash = self.compute_message_hash(pubsub_topic, message_to_check)
assert (
expected_hash == self.store_response["messages"][store_message_index]["message_hash"]["data"]
), f"Message hash returned by store doesn't match the computed message hash {expected_hash}"
@ -186,7 +196,7 @@ class StepsStore(StepsCommon):
if not pubsub_topic:
pubsub_topic = self.test_pubsub_topic
try:
self.check_published_message_is_stored(pubsubTopic=pubsub_topic, pageSize=5, ascending="true")
self.check_published_message_is_stored(pubsubTopic=pubsub_topic, page_size=5, ascending="true")
except Exception as ex:
assert "couldn't find any messages" in str(ex)

View File

@ -0,0 +1,13 @@
import pytest
from src.libs.common import to_base64
from src.steps.store import StepsStore
@pytest.mark.usefixtures("node_setup")
class TestCursor(StepsStore):
def test_get_multiple_2000_store_messages(self):
for i in range(110):
self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")), message_propagation_delay=0.001)
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, page_size=50, ascending="true", store_v="v3")
print(len(store_response["messages"]))

View File

@ -0,0 +1,28 @@
import pytest
from src.libs.custom_logger import get_custom_logger
from src.libs.common import to_base64
from src.steps.store import StepsStore
from src.test_data import SAMPLE_INPUTS, VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__)
@pytest.mark.usefixtures("node_setup")
class TestEphemeral(StepsStore):
def test_message_with_ephemeral_true(self):
self.publish_message(message=self.create_message(ephemeral=True))
self.check_store_returns_empty_response()
def test_message_with_ephemeral_false(self):
self.publish_message(message=self.create_message(ephemeral=False))
self.check_published_message_is_stored(page_size=5, ascending="true")
def test_message_with_both_ephemeral_true_and_false(self):
self.publish_message(message=self.create_message(ephemeral=True))
stored = self.publish_message(message=self.create_message(ephemeral=False))
self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored)
assert len(self.store_response["messages"]) == 1
stored = self.publish_message(message=self.create_message(ephemeral=False))
self.publish_message(message=self.create_message(ephemeral=True))
self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored)
assert len(self.store_response["messages"]) == 2

View File

@ -6,20 +6,15 @@ from src.test_data import SAMPLE_INPUTS, VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__)
# TO DO test without pubsubtopic freezes
# TO DO test without pubsubtopic freezes
@pytest.mark.usefixtures("node_setup")
class TestGetMessages(StepsStore):
@pytest.fixture(scope="function", autouse=True)
def node_setup(self, store_setup):
self.setup_first_publishing_node(store="true", relay="true")
self.setup_first_store_node(store="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
# only one test for store v1, all other tests are using the new store v3
def test_legacy_store_v1(self):
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true", store_v="v1")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true", store_v="v1")
def test_get_store_messages_with_different_payloads(self):
failed_payloads = []
@ -27,24 +22,13 @@ class TestGetMessages(StepsStore):
logger.debug(f'Running test with payload {payload["description"]}')
message = self.create_message(payload=to_base64(payload["value"]))
try:
self.publish_message_via("relay", message=message)
self.check_published_message_is_stored(pageSize=50, ascending="true")
self.publish_message(message=message)
self.check_published_message_is_stored(page_size=50, ascending="true")
except Exception as e:
logger.error(f'Payload {payload["description"]} failed: {str(e)}')
failed_payloads.append(payload["description"])
assert not failed_payloads, f"Payloads failed: {failed_payloads}"
def test_get_multiple_store_messages(self):
message_hash_list = []
for payload in SAMPLE_INPUTS:
message = self.create_message(payload=to_base64(payload["value"]))
self.publish_message_via("relay", message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true", store_v="v3")
assert len(store_response["messages"]) == len(SAMPLE_INPUTS)
for index, message_hash in enumerate(store_response["messages"]):
assert message_hash["message_hash"]["data"] == message_hash_list[index], f"Message hash at index {index} doesn't match"
assert len(self.store_response["messages"]) == len(SAMPLE_INPUTS)
def test_get_store_messages_with_different_content_topics(self):
failed_content_topics = []
@ -52,12 +36,13 @@ class TestGetMessages(StepsStore):
logger.debug(f'Running test with content topic {content_topic["description"]}')
message = self.create_message(contentTopic=content_topic["value"])
try:
self.publish_message_via("relay", message=message)
self.check_published_message_is_stored(pageSize=50, ascending="true")
self.publish_message(message=message)
self.check_published_message_is_stored(page_size=50, ascending="true")
except Exception as e:
logger.error(f'ContentTopic {content_topic["description"]} failed: {str(e)}')
failed_content_topics.append(content_topic)
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
assert len(self.store_response["messages"]) == len(SAMPLE_INPUTS)
def test_get_store_messages_with_different_pubsub_topics(self):
self.subscribe_to_pubsub_topics_via_relay(pubsub_topics=VALID_PUBSUB_TOPICS)
@ -65,8 +50,8 @@ class TestGetMessages(StepsStore):
for pubsub_topic in VALID_PUBSUB_TOPICS:
logger.debug(f"Running test with pubsub topic {pubsub_topic}")
try:
self.publish_message_via("relay", pubsub_topic=pubsub_topic)
self.check_published_message_is_stored(pubsubTopic=pubsub_topic, pageSize=50, ascending="true")
self.publish_message(pubsub_topic=pubsub_topic)
self.check_published_message_is_stored(pubsub_topic=pubsub_topic, page_size=50, ascending="true")
except Exception as e:
logger.error(f"PubsubTopic pubsub_topic failed: {str(e)}")
failed_pubsub_topics.append(pubsub_topic)
@ -74,13 +59,30 @@ class TestGetMessages(StepsStore):
def test_get_store_message_with_meta(self):
message = self.create_message(meta=to_base64(self.test_payload))
self.publish_message_via("relay", message=message)
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message(message=message)
self.check_published_message_is_stored(page_size=5, ascending="true")
def test_get_store_message_with_version(self):
message = self.create_message(version=10)
self.publish_message(message=message)
self.check_published_message_is_stored(page_size=5, ascending="true")
def test_get_store_duplicate_messages(self):
message = self.create_message()
self.publish_message_via("relay", message=message)
self.publish_message_via("relay", message=message)
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message(message=message)
self.publish_message(message=message)
self.check_published_message_is_stored(page_size=5, ascending="true")
# only one message is stored
assert len(self.store_response["messages"]) == 1
def test_get_multiple_store_messages(self):
message_hash_list = []
for payload in SAMPLE_INPUTS:
message = self.create_message(payload=to_base64(payload["value"]))
self.publish_message(message=message)
message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, page_size=50, ascending="true", store_v="v3")
assert len(store_response["messages"]) == len(SAMPLE_INPUTS)
for index, message_hash in enumerate(store_response["messages"]):
assert message_hash["message_hash"]["data"] == message_hash_list[index], f"Message hash at index {index} doesn't match"

View File

@ -1,4 +1,3 @@
import pytest
from src.libs.custom_logger import get_custom_logger
from src.libs.common import delay
from src.steps.store import StepsStore
@ -7,102 +6,98 @@ logger = get_custom_logger(__name__)
class TestReliability(StepsStore):
@pytest.fixture(scope="function", autouse=False)
def node_setup(self, store_setup):
self.setup_first_publishing_node(store="true", relay="true")
self.setup_first_store_node(store="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
def test_publishing_node_is_stopped(self, node_setup):
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
self.publishing_node1.stop()
store_response = self.store_node1.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3")
store_response = self.store_node1.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3")
assert len(store_response["messages"]) == 1
def test_publishing_node_restarts(self, node_setup):
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
self.publishing_node1.restart()
self.publishing_node1.ensure_ready()
self.add_node_peer(self.store_node1, self.multiaddr_list)
self.subscribe_to_pubsub_topics_via_relay(node=self.publishing_node1)
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3")
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3")
assert len(store_response["messages"]) == 2
def test_store_node_restarts(self, node_setup):
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
self.store_node1.restart()
self.store_node1.ensure_ready()
self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1)
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3")
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3")
assert len(store_response["messages"]) == 2
def test_publishing_node_paused_and_unpaused(self, node_setup):
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
self.publishing_node1.pause()
delay(1)
self.publishing_node1.unpause()
self.publishing_node1.ensure_ready()
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3")
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3")
assert len(store_response["messages"]) == 2
def test_store_node_paused_and_unpaused(self, node_setup):
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
self.store_node1.pause()
delay(1)
self.store_node1.unpause()
self.store_node1.ensure_ready()
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3")
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3")
assert len(store_response["messages"]) == 2
def test_message_relayed_while_store_node_is_paused(self, node_setup):
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
self.store_node1.pause()
self.publish_message_via("relay")
self.publish_message()
self.store_node1.unpause()
self.store_node1.ensure_ready()
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.check_published_message_is_stored(page_size=5, ascending="true")
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3")
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3")
assert len(store_response["messages"]) == 2
def test_message_relayed_while_store_node_is_stopped(self, node_setup):
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
self.store_node1.stop()
self.publish_message_via("relay")
self.publish_message()
self.store_node1.start()
self.store_node1.ensure_ready()
self.add_node_peer(self.store_node1, self.multiaddr_list)
self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1)
delay(1)
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.check_published_message_is_stored(page_size=5, ascending="true")
for node in self.store_nodes:
store_response = node.get_store_messages(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true", store_v="v3")
store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v3")
assert len(store_response["messages"]) == 2
def test_message_relayed_before_store_node_is_started(self):
self.setup_first_publishing_node(store="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.publish_message()
self.setup_first_store_node(store="true", relay="true")
delay(10)
self.subscribe_to_pubsub_topics_via_relay()
delay(1)
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.add_node_peer(self.store_node1, self.multiaddr_list)
delay(10)
self.check_published_message_is_stored(page_size=5, ascending="true")

View File

@ -6,51 +6,51 @@ class TestRunningNodes(StepsStore):
self.setup_first_publishing_node(store="true", relay="true")
self.setup_first_store_node(store="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
def test_main_node_relay_and_store__peer_only_store(self):
self.setup_first_publishing_node(store="true", relay="true")
self.setup_first_store_node(store="true", relay="false")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.publish_message()
self.check_store_returns_empty_response()
def test_main_node_relay_and_store__peer_only_relay(self):
self.setup_first_publishing_node(store="true", relay="true")
self.setup_first_store_node(store="false", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
def test_main_node_relay_and_store__peer_neither_relay_nor_store(self):
self.setup_first_publishing_node(store="true", relay="true")
self.setup_first_store_node(store="false", relay="false")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
def test_main_node_only_relay__peer_relay_and_store(self):
self.setup_first_publishing_node(store="false", relay="true")
self.setup_first_store_node(store="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")
def test_main_node_only_relay__peer_only_store(self):
self.setup_first_publishing_node(store="false", relay="true")
self.setup_first_store_node(store="true", relay="false")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.publish_message()
self.check_store_returns_empty_response()
def test_main_node_only_relay__peer_only_relay(self):
self.setup_first_publishing_node(store="false", relay="true")
self.setup_first_store_node(store="false", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.publish_message()
try:
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.check_published_message_is_stored(page_size=5, ascending="true")
except Exception as ex:
assert "failed to negotiate protocol: protocols not supported" in str(ex) or "PEER_DIAL_FAILURE" in str(ex)
@ -58,12 +58,12 @@ class TestRunningNodes(StepsStore):
self.setup_first_publishing_node(store="true", relay="true", lightpush="true")
self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0])
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("lightpush", sender=self.store_node1)
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message(via="lightpush", sender=self.store_node1)
self.check_published_message_is_stored(page_size=5, ascending="true")
def test_store_with_filter(self):
self.setup_first_publishing_node(store="true", relay="true", filter="true")
self.setup_first_store_node(store="false", relay="false", filter="true")
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message_via("relay")
self.check_published_message_is_stored(pageSize=5, ascending="true")
self.publish_message()
self.check_published_message_is_stored(page_size=5, ascending="true")

View File

@ -0,0 +1,27 @@
import pytest
from datetime import timedelta, datetime
from src.libs.custom_logger import get_custom_logger
from src.steps.store import StepsStore
logger = get_custom_logger(__name__)
@pytest.mark.usefixtures("node_setup")
class TestTimeFilter(StepsStore):
def test_messages_with_timestamps_close_to_now(self):
failed_timestamps = []
sample_ts = [
int((datetime.now() - timedelta(seconds=2)).timestamp() * 1e9),
int((datetime.now() + timedelta(seconds=2)).timestamp() * 1e9),
int((datetime.now() + timedelta(seconds=10)).timestamp() * 1e9),
]
for timestamp in sample_ts:
logger.debug(f"Running test with timestamp {timestamp}")
message = self.create_message(timestamp=timestamp)
try:
self.publish_message(message=message)
self.check_published_message_is_stored(page_size=5, ascending="true")
except Exception as ex:
logger.error(f"Timestamp {timestamp} failed: {str(ex)}")
failed_timestamps.append(timestamp)
assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}"