Test/store v3 (#38)
* first commit
* get message tests 1
* store tests with node restarts
* reliability tests
* store tests
* fixes
* more tests
* fixes
* new store tests
* new store tests
* adjustments
* adjust tests for go-waku
* postgres test
* postgres test
* fixes
* small fixes
* reliability updates
* postgres test
* adjustments for go-waku
parent 39a6d13276
commit 03973b8897
@@ -2,6 +2,7 @@
 log/
 .vscode
 allure-results/
+postgresql

 # Byte-compiled / optimized / DLL files
 __pycache__/
@@ -29,6 +29,7 @@ pytest-rerunfailures==13.0
 pytest-timeout==2.2.0
 pytest-xdist==3.5.0
 python-dotenv==1.0.1
+pytest-dependency==0.6.0
 PyYAML==6.0.1
 requests==2.31.0
 setuptools==69.0.3
@ -29,6 +29,8 @@ RUNNING_IN_CI = get_env_var("CI")
|
||||||
NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68")
|
NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68")
|
||||||
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20)
|
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20)
|
||||||
RLN_CREDENTIALS = get_env_var("RLN_CREDENTIALS")
|
RLN_CREDENTIALS = get_env_var("RLN_CREDENTIALS")
|
||||||
|
PG_USER = get_env_var("POSTGRES_USER", "postgres")
|
||||||
|
PG_PASS = get_env_var("POSTGRES_PASSWORD", "test123")
|
||||||
|
|
||||||
# example for .env file
|
# example for .env file
|
||||||
# RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "wss://sepolia.infura.io/ws/v3/api_key", "rln-relay-eth-contract-address": "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4", "rln-relay-eth-private-key-1": "1111111111111111111111111111111111111111111111111111111111111111", "rln-relay-eth-private-key-2": "1111111111111111111111111111111111111111111111111111111111111111"}
|
# RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "wss://sepolia.infura.io/ws/v3/api_key", "rln-relay-eth-contract-address": "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4", "rln-relay-eth-private-key-1": "1111111111111111111111111111111111111111111111111111111111111111", "rln-relay-eth-private-key-2": "1111111111111111111111111111111111111111111111111111111111111111"}
|
||||||
|
|
|
@@ -93,29 +93,29 @@ class REST(BaseClient):
         return get_messages_response.json()

     def get_store_messages(
-        self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs
+        self, peer_addr, include_data, pubsub_topic, content_topics, start_time, end_time, hashes, cursor, page_size, ascending, store_v, **kwargs
     ):
         base_url = f"store/{store_v}/messages"
         params = []

-        if peerAddr is not None:
-            params.append(f"peerAddr={quote(peerAddr, safe='')}")
-        if includeData is not None:
-            params.append(f"includeData={includeData}")
-        if pubsubTopic is not None:
-            params.append(f"pubsubTopic={quote(pubsubTopic, safe='')}")
-        if contentTopics is not None:
-            params.append(f"contentTopics={quote(contentTopics, safe='')}")
-        if startTime is not None:
-            params.append(f"startTime={startTime}")
-        if endTime is not None:
-            params.append(f"endTime={endTime}")
+        if peer_addr is not None:
+            params.append(f"peerAddr={quote(peer_addr, safe='')}")
+        if include_data is not None:
+            params.append(f"includeData={include_data}")
+        if pubsub_topic is not None:
+            params.append(f"pubsubTopic={quote(pubsub_topic, safe='')}")
+        if content_topics is not None:
+            params.append(f"contentTopics={quote(content_topics, safe='')}")
+        if start_time is not None:
+            params.append(f"startTime={start_time}")
+        if end_time is not None:
+            params.append(f"endTime={end_time}")
         if hashes is not None:
             params.append(f"hashes={quote(hashes, safe='')}")
         if cursor is not None:
             params.append(f"cursor={quote(cursor, safe='')}")
-        if pageSize is not None:
-            params.append(f"pageSize={pageSize}")
+        if page_size is not None:
+            params.append(f"pageSize={page_size}")
         if ascending is not None:
             params.append(f"ascending={ascending}")

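For reference, a minimal sketch (outside the diff) of the query string this builder assembles for a v3 request; the multiaddr is hypothetical:

    from urllib.parse import quote

    peer_addr = "/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAmExample"  # hypothetical peer multiaddr
    params = [f"peerAddr={quote(peer_addr, safe='')}", "pageSize=5", "ascending=true"]
    print("store/v3/messages?" + "&".join(params))
    # store/v3/messages?peerAddr=%2Fip4%2F127.0.0.1%2Ftcp%2F60000%2Fp2p%2F16Uiu2HAmExample&pageSize=5&ascending=true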
@@ -31,7 +31,7 @@ class DockerManager:
         logger.debug(f"Network {network_name} created")
         return network

-    def start_container(self, image_name, ports, args, log_path, container_ip, volumes):
+    def start_container(self, image_name, ports, args, log_path, container_ip, volumes, remove_container=True):
         cli_args = []
         for key, value in args.items():
             if isinstance(value, list):  # Check if value is a list

@@ -46,7 +46,7 @@ class DockerManager:
         cli_args_str_for_log = " ".join(cli_args)
         logger.debug(f"docker run -i -t {port_bindings_for_log} {image_name} {cli_args_str_for_log}")
         container = self._client.containers.run(
-            image_name, command=cli_args, ports=port_bindings, detach=True, remove=True, auto_remove=True, volumes=volumes
+            image_name, command=cli_args, ports=port_bindings, detach=True, remove=remove_container, auto_remove=remove_container, volumes=volumes
         )

         network = self._client.networks.get(NETWORK_NAME)
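The new remove_container flag exists because auto_remove=True tells Docker to delete a container as soon as it stops, which makes restarting it impossible. A minimal sketch of the difference, assuming the docker SDK and an alpine image are available:

    import docker

    client = docker.from_env()
    # With auto_remove=False the stopped container survives and can be started again,
    # which the reliability test that stops a store node without removing it relies on.
    container = client.containers.run("alpine:3.18", command="sleep 60", detach=True, auto_remove=False)
    container.stop()
    container.start()  # only possible because the container was not auto-removed
    container.remove(force=True)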
@@ -0,0 +1,93 @@
+class StoreResponse:
+    def __init__(self, store_response, node):
+        self.response = store_response
+        self.node = node
+
+    @property
+    def request_id(self):
+        try:
+            if self.node.is_nwaku():
+                return self.response.get("requestId")
+            else:
+                return self.response.get("request_id")
+        except:
+            return None
+
+    @property
+    def status_code(self):
+        try:
+            if self.node.is_nwaku():
+                return self.response.get("statusCode")
+            else:
+                return self.response.get("status_code")
+        except:
+            return None
+
+    @property
+    def status_desc(self):
+        try:
+            if self.node.is_nwaku():
+                return self.response.get("statusDesc")
+            else:
+                return self.response.get("status_desc")
+        except:
+            return None
+
+    @property
+    def messages(self):
+        try:
+            return self.response.get("messages")
+        except:
+            return None
+
+    @property
+    def pagination_cursor(self):
+        try:
+            if self.node.is_nwaku():
+                return self.response.get("paginationCursor")
+            else:
+                return self.response.get("pagination_cursor")
+        except:
+            return None
+
+    def message_hash(self, index):
+        if self.messages is not None:
+            if self.node.is_nwaku():
+                return self.messages[index]["messageHash"]
+            else:
+                return self.messages[index]["message_hash"]
+        else:
+            return None
+
+    def message_payload(self, index):
+        try:
+            if self.messages is not None:
+                payload = self.messages[index]["message"]["payload"]
+                return payload
+            else:
+                return None
+        except IndexError:
+            return None
+
+    def message_at(self, index):
+        try:
+            if self.messages is not None:
+                message = self.messages[index]["message"]
+                return message
+            else:
+                return None
+        except IndexError:
+            return None
+
+    def message_pubsub_topic(self, index):
+        if self.messages is not None:
+            if self.node.is_nwaku():
+                return self.messages[index]["pubsubTopic"]
+            else:
+                return self.messages[index]["pubsub_topic"]
+        else:
+            return None
+
+    @property
+    def resp_json(self):
+        return self.response
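A minimal usage sketch (outside the diff) of how StoreResponse normalizes the two REST dialects; FakeNode is a hypothetical stand-in for WakuNode:

    class FakeNode:
        def __init__(self, nwaku):
            self._nwaku = nwaku

        def is_nwaku(self):
            return self._nwaku

    nwaku_resp = {"requestId": "1", "statusCode": 200, "statusDesc": "OK", "messages": []}
    gowaku_resp = {"request_id": "1", "status_code": 200, "status_desc": "OK", "messages": []}

    assert StoreResponse(nwaku_resp, FakeNode(True)).status_code == 200
    assert StoreResponse(gowaku_resp, FakeNode(False)).status_code == 200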
@@ -13,6 +13,7 @@ class MessageRpcResponse:
     timestamp: Optional[int]
     ephemeral: Optional[bool]
     meta: Optional[str]
+    rateLimitProof: Optional[str] = field(default_factory=dict)
     rate_limit_proof: Optional[dict] = field(default_factory=dict)
@@ -100,6 +100,12 @@ class WakuNode:
         else:
             raise NotImplementedError("Not implemented for this node type")

+        if "remove_container" in kwargs:
+            remove_container = kwargs["remove_container"]
+            del kwargs["remove_container"]
+        else:
+            remove_container = True
+
         default_args.update(sanitize_docker_flags(kwargs))

         rln_args, rln_creds_set, keystore_path = self.parse_rln_credentials(default_args, False)
@@ -116,7 +122,13 @@ class WakuNode:
         logger.debug(f"Using volumes {self._volumes}")

         self._container = self._docker_manager.start_container(
-            self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip, self._volumes
+            self._docker_manager.image,
+            ports=self._ports,
+            args=default_args,
+            log_path=self._log_path,
+            container_ip=self._ext_ip,
+            volumes=self._volumes,
+            remove_container=remove_container,
         )

         logger.debug(f"Started container from image {self._image_name}. REST: {self._rest_port}")
@@ -168,6 +180,10 @@ class WakuNode:
         if self._container:
             logger.debug(f"Stopping container with id {self._container.short_id}")
             self._container.stop()
+            try:
+                self._container.remove()
+            except:
+                pass
             self._container = None
             logger.debug("Container stopped.")

@@ -291,18 +307,30 @@ class WakuNode:
         return self._api.get_filter_messages(content_topic, pubsub_topic)

     def get_store_messages(
-        self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs
+        self,
+        peer_addr=None,
+        include_data=None,
+        pubsub_topic=None,
+        content_topics=None,
+        start_time=None,
+        end_time=None,
+        hashes=None,
+        cursor=None,
+        page_size=None,
+        ascending=None,
+        store_v="v3",
+        **kwargs,
     ):
         return self._api.get_store_messages(
-            peerAddr=peerAddr,
-            includeData=includeData,
-            pubsubTopic=pubsubTopic,
-            contentTopics=contentTopics,
-            startTime=startTime,
-            endTime=endTime,
+            peer_addr=peer_addr,
+            include_data=include_data,
+            pubsub_topic=pubsub_topic,
+            content_topics=content_topics,
+            start_time=start_time,
+            end_time=end_time,
             hashes=hashes,
             cursor=cursor,
-            pageSize=pageSize,
+            page_size=page_size,
             ascending=ascending,
             store_v=store_v,
             **kwargs,
@@ -393,3 +421,7 @@ class WakuNode:
             raise NotImplementedError("Not implemented for type other than Nim Waku ")

         return rln_args, True, keystore_path
+
+    @property
+    def container(self):
+        return self._container
@@ -0,0 +1,44 @@
+import docker
+import os
+
+from src.env_vars import NETWORK_NAME, PG_PASS, PG_USER
+from src.libs.custom_logger import get_custom_logger
+
+logger = get_custom_logger(__name__)
+
+
+def start_postgres():
+    pg_env = {"POSTGRES_USER": PG_USER, "POSTGRES_PASSWORD": PG_PASS}
+
+    base_path = os.path.abspath(".")
+    volumes = {os.path.join(base_path, "postgresql"): {"bind": "/var/lib/postgresql/data", "mode": "Z"}}
+
+    client = docker.from_env()
+
+    postgres_container = client.containers.run(
+        "postgres:15.4-alpine3.18",
+        name="postgres",
+        environment=pg_env,
+        volumes=volumes,
+        command="postgres",
+        ports={"5432/tcp": ("127.0.0.1", 5432)},
+        restart_policy={"Name": "on-failure", "MaximumRetryCount": 5},
+        healthcheck={
+            "Test": ["CMD-SHELL", "pg_isready -U postgres -d postgres"],
+            "Interval": 30000000000,  # 30 seconds in nanoseconds
+            "Timeout": 60000000000,  # 60 seconds in nanoseconds
+            "Retries": 5,
+            "StartPeriod": 80000000000,  # 80 seconds in nanoseconds
+        },
+        detach=True,
+        network_mode=NETWORK_NAME,
+    )
+
+    logger.debug("Postgres container started")
+
+    return postgres_container
+
+
+def stop_postgres(postgres_container):
+    postgres_container.stop()
+    postgres_container.remove()
+    logger.debug("Postgres container stopped and removed.")
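Docker healthcheck durations are expressed in nanoseconds, so a tiny helper (a sketch, not part of the diff) avoids hand-counting zeros:

    def seconds_to_ns(seconds: int) -> int:
        return seconds * 1_000_000_000

    assert seconds_to_ns(30) == 30000000000  # matches the "Interval" value above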
@@ -1,3 +1,5 @@
+import base64
+import hashlib
 import inspect
 from time import time
 import allure

@@ -34,3 +36,15 @@ class StepsCommon:
         message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
         message.update(kwargs)
         return message
+
+    @allure.step
+    def compute_message_hash(self, pubsub_topic, msg):
+        ctx = hashlib.sha256()
+        ctx.update(pubsub_topic.encode("utf-8"))
+        ctx.update(base64.b64decode(msg["payload"]))
+        ctx.update(msg["contentTopic"].encode("utf-8"))
+        if "meta" in msg:
+            ctx.update(base64.b64decode(msg["meta"]))
+        ctx.update(int(msg["timestamp"]).to_bytes(8, byteorder="big"))
+        hash_bytes = ctx.digest()
+        return base64.b64encode(hash_bytes).decode("utf-8")
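A worked example (outside the diff) of the deterministic hash above, with hypothetical payload and timestamp; the digest is SHA-256 over pubsub topic, payload, content topic, optional meta, and the big-endian 8-byte timestamp:

    import base64
    import hashlib

    msg = {
        "payload": base64.b64encode(b"hi").decode(),  # hypothetical payload
        "contentTopic": "/myapp/1/test/proto",
        "timestamp": 1700000000000000000,
    }
    ctx = hashlib.sha256()
    ctx.update("/waku/2/rs/0/0".encode("utf-8"))
    ctx.update(base64.b64decode(msg["payload"]))
    ctx.update(msg["contentTopic"].encode("utf-8"))
    ctx.update(int(msg["timestamp"]).to_bytes(8, byteorder="big"))
    print(base64.b64encode(ctx.digest()).decode("utf-8"))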
@@ -1,14 +1,15 @@
 import inspect
 from src.libs.custom_logger import get_custom_logger
-from time import time
 import pytest
 import allure
 from src.libs.common import delay
+from src.node.store_response import StoreResponse
 from src.node.waku_message import WakuMessage
 from src.env_vars import (
     ADDITIONAL_NODES,
     NODE_1,
     NODE_2,
+    NODEKEY,
 )
 from src.node.waku_node import WakuNode
 from src.steps.common import StepsCommon

@@ -29,6 +30,12 @@ class StepsStore(StepsCommon):
         self.optional_nodes = []
         self.multiaddr_list = []

+    @pytest.fixture(scope="function", autouse=False)
+    def node_setup(self, store_setup):
+        self.setup_first_publishing_node(store="true", relay="true")
+        self.setup_first_store_node(store="true", relay="true")
+        self.subscribe_to_pubsub_topics_via_relay()
+
     @allure.step
     def start_publishing_node(self, image, node_index, **kwargs):
         node = WakuNode(image, f"publishing_node{node_index}_{self.test_id}")
@@ -53,7 +60,7 @@ class StepsStore(StepsCommon):

     @allure.step
     def setup_first_publishing_node(self, store="true", relay="true", **kwargs):
-        self.publishing_node1 = self.start_publishing_node(NODE_1, node_index=1, store=store, relay=relay, **kwargs)
+        self.publishing_node1 = self.start_publishing_node(NODE_1, node_index=1, store=store, relay=relay, nodekey=NODEKEY, **kwargs)
         self.enr_uri = self.publishing_node1.get_enr_uri()

     @allure.step
|
||||||
node.set_filter_subscriptions(subscription)
|
node.set_filter_subscriptions(subscription)
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def publish_message_via(self, type, pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None):
|
def publish_message(self, via="relay", pubsub_topic=None, message=None, message_propagation_delay=0.01, sender=None):
|
||||||
self.message = self.create_message() if message is None else message
|
self.message = self.create_message() if message is None else message
|
||||||
if pubsub_topic is None:
|
if pubsub_topic is None:
|
||||||
pubsub_topic = self.test_pubsub_topic
|
pubsub_topic = self.test_pubsub_topic
|
||||||
if not sender:
|
if not sender:
|
||||||
sender = self.publishing_node1
|
sender = self.publishing_node1
|
||||||
if type == "relay":
|
if via == "relay":
|
||||||
logger.debug("Relaying message")
|
logger.debug("Relaying message")
|
||||||
sender.send_relay_message(self.message, pubsub_topic)
|
sender.send_relay_message(self.message, pubsub_topic)
|
||||||
elif type == "lightpush":
|
elif via == "lightpush":
|
||||||
payload = self.create_payload(pubsub_topic, self.message)
|
payload = self.create_payload(pubsub_topic, self.message)
|
||||||
sender.send_light_push_message(payload)
|
sender.send_light_push_message(payload)
|
||||||
delay(message_propagation_delay)
|
delay(message_propagation_delay)
|
||||||
|
return self.message
|
||||||
|
|
||||||
|
def get_messages_from_store(
|
||||||
|
self,
|
||||||
|
node=None,
|
||||||
|
peer_addr=None,
|
||||||
|
include_data=None,
|
||||||
|
pubsub_topic=None,
|
||||||
|
content_topics=None,
|
||||||
|
start_time=None,
|
||||||
|
end_time=None,
|
||||||
|
hashes=None,
|
||||||
|
cursor=None,
|
||||||
|
page_size=None,
|
||||||
|
ascending="true",
|
||||||
|
store_v="v3",
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
if pubsub_topic is None:
|
||||||
|
pubsub_topic = self.test_pubsub_topic
|
||||||
|
if node.is_gowaku():
|
||||||
|
if content_topics is None:
|
||||||
|
content_topics = self.test_content_topic
|
||||||
|
if hashes is not None:
|
||||||
|
content_topics = None
|
||||||
|
pubsub_topic = None
|
||||||
|
peer_addr = self.multiaddr_list[0]
|
||||||
|
store_response = node.get_store_messages(
|
||||||
|
peer_addr=peer_addr,
|
||||||
|
include_data=include_data,
|
||||||
|
pubsub_topic=pubsub_topic,
|
||||||
|
content_topics=content_topics,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
hashes=hashes,
|
||||||
|
cursor=cursor,
|
||||||
|
page_size=page_size,
|
||||||
|
ascending=ascending,
|
||||||
|
store_v=store_v,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
store_response = StoreResponse(store_response, node)
|
||||||
|
assert store_response.request_id is not None, "Request id is missing"
|
||||||
|
assert store_response.status_code, "Status code is missing"
|
||||||
|
assert store_response.status_desc, "Status desc is missing"
|
||||||
|
return store_response
|
||||||
|
|
||||||
@allure.step
|
@allure.step
|
||||||
def check_published_message_is_stored(
|
def check_published_message_is_stored(
|
||||||
self,
|
self,
|
||||||
store_node=None,
|
store_node=None,
|
||||||
peerAddr=None,
|
peer_addr=None,
|
||||||
includeData=None,
|
include_data=None,
|
||||||
pubsubTopic=None,
|
pubsub_topic=None,
|
||||||
contentTopics=None,
|
content_topics=None,
|
||||||
startTime=None,
|
start_time=None,
|
||||||
endTime=None,
|
end_time=None,
|
||||||
hashes=None,
|
hashes=None,
|
||||||
cursor=None,
|
cursor=None,
|
||||||
pageSize=None,
|
page_size=None,
|
||||||
ascending=None,
|
ascending=None,
|
||||||
store_v="v1",
|
store_v="v3",
|
||||||
|
message_to_check=None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
|
if pubsub_topic is None:
|
||||||
|
pubsub_topic = self.test_pubsub_topic
|
||||||
|
if message_to_check is None:
|
||||||
|
message_to_check = self.message
|
||||||
if store_node is None:
|
if store_node is None:
|
||||||
store_node = self.store_nodes
|
store_node = self.store_nodes
|
||||||
elif not isinstance(store_node, list):
|
elif not isinstance(store_node, list):
|
||||||
|
@@ -149,33 +207,40 @@ class StepsStore(StepsCommon):
             store_node = store_node
         for node in store_node:
             logger.debug(f"Checking that peer {node.image} can find the stored message")
-            self.store_response = node.get_store_messages(
-                peerAddr=peerAddr,
-                includeData=includeData,
-                pubsubTopic=pubsubTopic,
-                contentTopics=contentTopics,
-                startTime=startTime,
-                endTime=endTime,
+            self.store_response = self.get_messages_from_store(
+                node=node,
+                peer_addr=peer_addr,
+                include_data=include_data,
+                pubsub_topic=pubsub_topic,
+                content_topics=content_topics,
+                start_time=start_time,
+                end_time=end_time,
                 hashes=hashes,
                 cursor=cursor,
-                pageSize=pageSize,
+                page_size=page_size,
                 ascending=ascending,
                 store_v=store_v,
                 **kwargs,
             )

-            assert "messages" in self.store_response, f"Peer {node.image} has no messages key in the reponse"
-            assert self.store_response["messages"], f"Peer {node.image} couldn't find any messages"
-            assert len(self.store_response["messages"]) >= 1, "Expected at least 1 message but got none"
-            waku_message = WakuMessage(self.store_response["messages"][-1:])
-            waku_message.assert_received_message(self.message)
+            assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response.resp_json}"
+            assert len(self.store_response.messages) >= 1, "Expected at least 1 message but got none"
+            store_message_index = -1  # we are looking for the last and most recent message in the store
+            waku_message = WakuMessage([self.store_response.messages[store_message_index:]])
+            if store_v == "v1":
+                waku_message.assert_received_message(message_to_check)
+            else:
+                expected_hash = self.compute_message_hash(pubsub_topic, message_to_check)
+                assert expected_hash == self.store_response.message_hash(
+                    store_message_index
+                ), f"Message hash returned by store doesn't match the computed message hash {expected_hash}"

     @allure.step
     def check_store_returns_empty_response(self, pubsub_topic=None):
         if not pubsub_topic:
             pubsub_topic = self.test_pubsub_topic
         try:
-            self.check_published_message_is_stored(pubsubTopic=pubsub_topic, pageSize=5, ascending="true")
+            self.check_published_message_is_stored(pubsubTopic=pubsub_topic, page_size=5, ascending="true")
         except Exception as ex:
             assert "couldn't find any messages" in str(ex)
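Taken together, these helpers give the tests a compact round trip; a sketch of the typical v3 flow inside a class that inherits StepsStore and uses the node_setup fixture:

    def test_round_trip(self):
        message = self.publish_message()  # relays via publishing_node1 and returns the message
        # for v3 the check compares the locally computed hash against the store's hash
        self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=message)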
@@ -98,6 +98,18 @@ CONTENT_TOPICS_SHARD_7 = [

 VALID_PUBSUB_TOPICS = ["/waku/2/rs/0/0", "/waku/2/rs/0/1", "/waku/2/rs/0/9", "/waku/2/rs/0/25", "/waku/2/rs/0/1000"]

+PUBSUB_TOPICS_STORE = [
+    "/waku/2/rs/0/0",
+    "/waku/2/rs/0/1",
+    "/waku/2/rs/0/2",
+    "/waku/2/rs/0/3",
+    "/waku/2/rs/0/4",
+    "/waku/2/rs/0/5",
+    "/waku/2/rs/0/6",
+    "/waku/2/rs/0/7",
+    "/waku/2/rs/0/8",
+]
+
 INVALID_PUBSUB_TOPICS = ["/test/2/rs/0/1", "/waku/3/rs/0/1", "/waku/2/test/0/1", "/waku/2/rs/0/b", "/waku/2/rs/0"]

 PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [
@@ -10,6 +10,7 @@ from uuid import uuid4
 from src.libs.common import attach_allure_file
 import src.env_vars as env_vars
 from src.data_storage import DS
+from src.postgres_setup import start_postgres, stop_postgres

 logger = get_custom_logger(__name__)

@@ -37,6 +38,13 @@ def set_allure_env_variables():
             outfile.write(f"{attribute_name}={attribute_value}\n")


+@pytest.fixture(scope="class", autouse=False)
+def start_postgres_container():
+    pg_container = start_postgres()
+    yield
+    stop_postgres(pg_container)
+
+
 @pytest.fixture(scope="function", autouse=True)
 def test_id(request):
     # setting up an unique test id to be used where needed
@@ -0,0 +1,27 @@
+import pytest
+from src.libs.common import to_base64
+from src.node.waku_message import WakuMessage
+from src.steps.store import StepsStore
+from src.test_data import SAMPLE_INPUTS
+
+
+@pytest.mark.usefixtures("node_setup")
+class TestApiFlags(StepsStore):
+    def test_store_with_peerAddr(self):
+        self.publish_message()
+        self.check_published_message_is_stored(store_node=self.store_node1, peer_addr=self.multiaddr_list[0])
+
+    def test_store_include_data(self):
+        message_list = []
+        for payload in SAMPLE_INPUTS:
+            message = self.create_message(payload=to_base64(payload["value"]))
+            self.publish_message(message=message)
+            message_list.append(message)
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, include_data="true", page_size=50)
+            assert len(store_response.messages) == len(SAMPLE_INPUTS)
+            for index in range(len(store_response.messages)):
+                assert store_response.message_payload(index) == message_list[index]["payload"]
+                assert store_response.message_pubsub_topic(index) == self.test_pubsub_topic
+                waku_message = WakuMessage([store_response.message_at(index)])
+                waku_message.assert_received_message(message_list[index])
@@ -0,0 +1,104 @@
+import pytest
+from src.env_vars import NODE_1, NODE_2
+from src.libs.common import to_base64
+from src.node.store_response import StoreResponse
+from src.steps.store import StepsStore
+
+
+@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1109")
+@pytest.mark.usefixtures("node_setup")
+class TestCursor(StepsStore):
+    # we implicitly test the reusability of the cursor for multiple nodes
+
+    def test_get_multiple_2000_store_messages(self):
+        expected_message_hash_list = []
+        for i in range(2000):
+            message = self.create_message(payload=to_base64(f"Message_{i}"))
+            self.publish_message(message=message)
+            expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, self.store_node1)
+        response_message_hash_list = []
+        while store_response.pagination_cursor is not None:
+            cursor = store_response.pagination_cursor
+            store_response = self.get_messages_from_store(self.store_node1, page_size=100, cursor=cursor)
+            for index in range(len(store_response.messages)):
+                response_message_hash_list.append(store_response.message_hash(index))
+        assert len(expected_message_hash_list) == len(response_message_hash_list), "Message count mismatch"
+        assert expected_message_hash_list == response_message_hash_list, "Message hash mismatch"
+
+    @pytest.mark.parametrize("cursor_index, message_count", [[2, 4], [3, 20], [10, 40], [19, 20], [19, 50], [110, 120]])
+    def test_different_cursor_and_indexes(self, cursor_index, message_count):
+        message_hash_list = []
+        cursor = ""
+        cursor_index = cursor_index if cursor_index < 100 else 100
+        for i in range(message_count):
+            message = self.create_message(payload=to_base64(f"Message_{i}"))
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=cursor_index)
+            assert len(store_response.messages) == cursor_index
+            cursor = store_response.pagination_cursor
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=100, ascending="true", cursor=cursor)
+            assert len(store_response.messages) == message_count - cursor_index
+            for index in range(len(store_response.messages)):
+                assert store_response.message_hash(index) == message_hash_list[cursor_index + index], f"Message hash at index {index} doesn't match"
+
+    def test_passing_cursor_not_returned_in_paginationCursor(self):
+        cursor = ""
+        for i in range(10):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=5)
+            # retrieving the cursor with the message hash of the 3rd message stored
+            cursor = store_response.message_hash(2)
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
+            assert len(store_response.messages) == 7, "Message count mismatch"
+
+    def test_passing_cursor_of_the_last_message_from_the_store(self):
+        cursor = ""
+        for i in range(10):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=10)
+            # retrieving the cursor with the message hash of the last message stored
+            cursor = store_response.message_hash(9)
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
+            assert not store_response.messages, "Messages found"
+
+    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
+    @pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
+    def test_passing_cursor_of_non_existing_message_from_the_store(self):
+        for i in range(4):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        # creating a cursor to a message that doesn't exist
+        wrong_message = self.create_message(payload=to_base64("test"))
+        cursor = self.compute_message_hash(self.test_pubsub_topic, wrong_message)
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
+            assert not store_response.messages, "Messages found"
+
+    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
+    @pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
+    def test_passing_invalid_cursor(self):
+        for i in range(4):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        # creating an invalid base64 cursor
+        cursor = to_base64("test")
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
+            assert not store_response.messages, "Messages found"
+
+    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110")
+    @pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716")
+    def test_passing_non_base64_cursor(self):
+        for i in range(4):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        # creating a non base64 cursor
+        cursor = "test"
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
+            assert not store_response.messages, "Messages found"
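The 2000-message test drains the store by following pagination cursors until none is returned; a sketch of that contract, with a hypothetical get_page(cursor) helper that returns (messages, next_cursor):

    def drain_store(get_page):
        cursor = ""  # an empty cursor requests the first page
        collected = []
        while cursor is not None:
            messages, cursor = get_page(cursor)  # next_cursor is None after the last page
            collected.extend(messages)
        return collected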
@@ -0,0 +1,26 @@
+import pytest
+from src.libs.custom_logger import get_custom_logger
+from src.steps.store import StepsStore
+
+logger = get_custom_logger(__name__)
+
+
+@pytest.mark.usefixtures("node_setup")
+class TestEphemeral(StepsStore):
+    def test_message_with_ephemeral_true(self):
+        self.publish_message(message=self.create_message(ephemeral=True))
+        self.check_store_returns_empty_response()
+
+    def test_message_with_ephemeral_false(self):
+        self.publish_message(message=self.create_message(ephemeral=False))
+        self.check_published_message_is_stored(page_size=5, ascending="true")
+
+    def test_message_with_both_ephemeral_true_and_false(self):
+        self.publish_message(message=self.create_message(ephemeral=True))
+        stored = self.publish_message(message=self.create_message(ephemeral=False))
+        self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored)
+        assert len(self.store_response.messages) == 1
+        stored = self.publish_message(message=self.create_message(ephemeral=False))
+        self.publish_message(message=self.create_message(ephemeral=True))
+        self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored)
+        assert len(self.store_response.messages) == 2
@@ -0,0 +1,30 @@
+import pytest
+from src.libs.custom_logger import get_custom_logger
+from src.steps.store import StepsStore
+from src.env_vars import PG_PASS, PG_USER
+
+logger = get_custom_logger(__name__)
+
+
+class TestExternalDb(StepsStore):
+    postgress_url = f"postgres://{PG_USER}:{PG_PASS}@postgres:5432/postgres"
+
+    @pytest.fixture(scope="function", autouse=True)
+    def node_postgres_setup(self, store_setup, start_postgres_container):
+        self.setup_first_publishing_node(store="true", relay="true", store_message_db_url=self.postgress_url)
+        self.setup_first_store_node(store="false", relay="true")
+        self.subscribe_to_pubsub_topics_via_relay()
+
+    @pytest.mark.dependency(name="test_on_empty_postgress_db")
+    def test_on_empty_postgress_db(self):
+        message = self.create_message()
+        self.publish_message(message=message)
+        self.check_published_message_is_stored(page_size=5, ascending="true")
+        assert len(self.store_response.messages) == 1
+
+    @pytest.mark.dependency(depends=["test_on_empty_postgress_db"])
+    def test_on_postgress_db_with_one_message(self):
+        message = self.create_message()
+        self.publish_message(message=message)
+        self.check_published_message_is_stored(page_size=5, ascending="true")
+        assert len(self.store_response.messages) == 2
@@ -2,29 +2,91 @@ import pytest
 from src.libs.custom_logger import get_custom_logger
 from src.libs.common import to_base64
 from src.steps.store import StepsStore
-from src.test_data import SAMPLE_INPUTS
+from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, SAMPLE_INPUTS, PUBSUB_TOPICS_STORE

 logger = get_custom_logger(__name__)

-# TO DO test without pubsubtopic freezes

+@pytest.mark.usefixtures("node_setup")
 class TestGetMessages(StepsStore):
-    @pytest.fixture(scope="function", autouse=True)
-    def store_functional_setup(self, store_setup):
-        self.setup_first_publishing_node(store="true", relay="true")
-        self.setup_first_store_node(store="true", relay="true")
-        self.subscribe_to_pubsub_topics_via_relay()
+    # only one test for store v1, all other tests are using the new store v3
+    def test_legacy_store_v1(self):
+        self.publish_message()
+        for node in self.store_nodes:
+            store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v1")
+            assert len(store_response["messages"]) == 1

-    def test_store_messages_with_valid_payloads(self):
+    def test_get_store_messages_with_different_payloads(self):
         failed_payloads = []
         for payload in SAMPLE_INPUTS:
             logger.debug(f'Running test with payload {payload["description"]}')
             message = self.create_message(payload=to_base64(payload["value"]))
             try:
-                self.publish_message_via("relay", message=message)
-                self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true")
+                self.publish_message(message=message)
+                self.check_published_message_is_stored(page_size=50, ascending="true")
             except Exception as e:
                 logger.error(f'Payload {payload["description"]} failed: {str(e)}')
                 failed_payloads.append(payload["description"])
         assert not failed_payloads, f"Payloads failed: {failed_payloads}"
+        assert len(self.store_response.messages) == len(SAMPLE_INPUTS)
+
+    def test_get_store_messages_with_different_content_topics(self):
+        failed_content_topics = []
+        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
+            logger.debug(f"Running test with content topic {content_topic}")
+            message = self.create_message(contentTopic=content_topic)
+            try:
+                self.publish_message(message=message)
+                self.check_published_message_is_stored(page_size=50, content_topics=content_topic, ascending="true")
+            except Exception as e:
+                logger.error(f"ContentTopic {content_topic} failed: {str(e)}")
+                failed_content_topics.append(content_topic)
+        assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
+
+    def test_get_store_messages_with_different_pubsub_topics(self):
+        self.subscribe_to_pubsub_topics_via_relay(pubsub_topics=PUBSUB_TOPICS_STORE)
+        failed_pubsub_topics = []
+        for pubsub_topic in PUBSUB_TOPICS_STORE:
+            logger.debug(f"Running test with pubsub topic {pubsub_topic}")
+            try:
+                self.publish_message(pubsub_topic=pubsub_topic)
+                self.check_published_message_is_stored(pubsub_topic=pubsub_topic, page_size=50, ascending="true")
+            except Exception as e:
+                logger.error(f"PubsubTopic {pubsub_topic} failed: {str(e)}")
+                failed_pubsub_topics.append(pubsub_topic)
+        assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}"
+
+    def test_get_store_message_with_meta(self):
+        message = self.create_message(meta=to_base64(self.test_payload))
+        self.publish_message(message=message)
+        self.check_published_message_is_stored(page_size=5, ascending="true")
+
+    def test_get_store_message_with_version(self):
+        message = self.create_message(version=10)
+        self.publish_message(message=message)
+        self.check_published_message_is_stored(page_size=5, ascending="true")
+
+    def test_get_store_duplicate_messages(self):
+        message = self.create_message()
+        self.publish_message(message=message)
+        self.publish_message(message=message)
+        self.check_published_message_is_stored(page_size=5, ascending="true")
+        # only one message is stored
+        assert len(self.store_response.messages) == 1
+
+    def test_get_multiple_store_messages(self):
+        message_hash_list = []
+        for payload in SAMPLE_INPUTS:
+            message = self.create_message(payload=to_base64(payload["value"]))
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=50)
+            assert len(store_response.messages) == len(SAMPLE_INPUTS)
+            for index in range(len(store_response.messages)):
+                assert store_response.message_hash(index) == message_hash_list[index], f"Message hash at index {index} doesn't match"
+
+    def test_store_is_empty(self):
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=50)
+            assert not store_response.messages
@@ -0,0 +1,66 @@
+import pytest
+from src.env_vars import NODE_2
+from src.libs.common import to_base64
+from src.libs.custom_logger import get_custom_logger
+from src.steps.store import StepsStore
+from src.test_data import SAMPLE_INPUTS
+
+logger = get_custom_logger(__name__)
+
+
+@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1109")
+@pytest.mark.usefixtures("node_setup")
+class TestHashes(StepsStore):
+    def test_store_with_hashes(self):
+        message_hash_list = []
+        for payload in SAMPLE_INPUTS:
+            message = self.create_message(payload=to_base64(payload["value"]))
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            for message_hash in message_hash_list:
+                store_response = self.get_messages_from_store(node, hashes=message_hash, page_size=50)
+                assert len(store_response.messages) == 1
+                assert store_response.message_hash(0) == message_hash
+
+    def test_store_with_multiple_hashes(self):
+        message_hash_list = []
+        for payload in SAMPLE_INPUTS:
+            message = self.create_message(payload=to_base64(payload["value"]))
+            self.publish_message(message=message)
+            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, hashes=f"{message_hash_list[0]},{message_hash_list[4]}", page_size=50)
+            assert len(store_response.messages) == 2
+            assert store_response.message_hash(0) == message_hash_list[0], "Incorrect message filtered based on multiple hashes"
+            assert store_response.message_hash(1) == message_hash_list[4], "Incorrect message filtered based on multiple hashes"
+
+    def test_store_with_wrong_hash(self):
+        for i in range(4):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        wrong_hash = self.compute_message_hash(self.test_pubsub_topic, self.create_message(payload=to_base64("test")))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, hashes=wrong_hash, page_size=50)
+            assert not store_response.messages, "Messages found"
+
+    def test_store_with_invalid_hash(self):
+        for i in range(4):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        invalid_hash = to_base64("test")
+        for node in self.store_nodes:
+            try:
+                store_response = self.get_messages_from_store(node, hashes=invalid_hash, page_size=50)
+                assert not store_response.messages
+            except Exception as ex:
+                assert "waku message hash parsing error: invalid hash length" in str(ex)
+
+    def test_store_with_non_base64_hash(self):
+        for i in range(4):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        non_base64_hash = "test"
+        for node in self.store_nodes:
+            try:
+                store_response = self.get_messages_from_store(node, hashes=non_base64_hash, page_size=50)
+                assert not store_response.messages
+            except Exception as ex:
+                assert "waku message hash parsing error: invalid hash length" in str(ex)
@@ -0,0 +1,35 @@
+import pytest
+from src.libs.common import to_base64
+from src.steps.store import StepsStore
+
+
+@pytest.mark.usefixtures("node_setup")
+class TestPageSize(StepsStore):
+    def test_default_page_size(self):
+        for i in range(30):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node)
+            assert len(store_response.messages) == 20, "Message count mismatch"
+
+    def test_page_size_0_defaults_to_20(self):
+        for i in range(30):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=0)
+            assert len(store_response.messages) == 20, "Message count mismatch"
+
+    def test_max_page_size(self):
+        for i in range(200):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=200)
+            assert len(store_response.messages) == 100, "Message count mismatch"
+
+    @pytest.mark.parametrize("page_size", [1, 11, 39, 81, 99])
+    def test_different_page_size(self, page_size):
+        for i in range(page_size + 1):
+            self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}")))
+        for node in self.store_nodes:
+            store_response = self.get_messages_from_store(node, page_size=page_size)
+            assert len(store_response.messages) == page_size, "Message count mismatch"
@ -0,0 +1,123 @@
|
||||||
|
import pytest
|
||||||
|
from src.libs.custom_logger import get_custom_logger
|
||||||
|
from src.libs.common import delay
|
||||||
|
from src.steps.store import StepsStore
|
||||||
|
|
||||||
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class TestReliability(StepsStore):
|
||||||
|
def test_publishing_node_is_stopped(self, node_setup):
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
self.publishing_node1.stop()
|
||||||
|
try:
|
||||||
|
store_response = self.get_messages_from_store(self.store_node1, page_size=5)
|
||||||
|
assert len(store_response.messages) == 1
|
||||||
|
except Exception as ex:
|
||||||
|
if self.store_node1.is_gowaku():
|
||||||
|
assert "failed to dial: context deadline exceeded" in str(ex)
|
||||||
|
else:
|
||||||
|
raise AssertionError(f"Nwaku failed with {ex}")
|
||||||
|
|
||||||
|
def test_publishing_node_restarts(self, node_setup):
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
self.publishing_node1.restart()
|
||||||
|
self.publishing_node1.ensure_ready()
|
||||||
|
self.add_node_peer(self.store_node1, self.multiaddr_list)
|
||||||
|
self.subscribe_to_pubsub_topics_via_relay(node=self.publishing_node1)
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
for node in self.store_nodes:
|
||||||
|
store_response = self.get_messages_from_store(node, page_size=5)
|
||||||
|
assert len(store_response.messages) == 2
|
||||||
|
|
||||||
|
def test_store_node_restarts(self, node_setup):
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
self.store_node1.restart()
|
||||||
|
self.store_node1.ensure_ready()
|
||||||
|
self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1)
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
for node in self.store_nodes:
|
||||||
|
store_response = self.get_messages_from_store(node, page_size=5)
|
||||||
|
assert len(store_response.messages) == 2
|
||||||
|
|
||||||
|
def test_publishing_node_paused_and_unpaused(self, node_setup):
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
self.publishing_node1.pause()
|
||||||
|
delay(1)
|
||||||
|
self.publishing_node1.unpause()
|
||||||
|
self.publishing_node1.ensure_ready()
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
for node in self.store_nodes:
|
||||||
|
store_response = self.get_messages_from_store(node, page_size=5)
|
||||||
|
assert len(store_response.messages) == 2
|
||||||
|
|
||||||
|
def test_store_node_paused_and_unpaused(self, node_setup):
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
self.store_node1.pause()
|
||||||
|
delay(1)
|
||||||
|
self.store_node1.unpause()
|
||||||
|
self.store_node1.ensure_ready()
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
for node in self.store_nodes:
|
||||||
|
store_response = self.get_messages_from_store(node, page_size=5)
|
||||||
|
assert len(store_response.messages) == 2
|
||||||
|
|
||||||
|
def test_message_relayed_while_store_node_is_paused(self, node_setup):
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
self.store_node1.pause()
|
||||||
|
self.publish_message()
|
||||||
|
self.store_node1.unpause()
|
||||||
|
self.store_node1.ensure_ready()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
for node in self.store_nodes:
|
||||||
|
store_response = self.get_messages_from_store(node, page_size=5)
|
||||||
|
assert len(store_response.messages) == 2
|
||||||
|
|
||||||
|
def test_message_relayed_while_store_node_is_stopped_without_removing(self):
|
||||||
|
self.setup_first_publishing_node(store="true", relay="true")
|
||||||
|
self.setup_first_store_node(store="false", relay="true", remove_container=False)
|
||||||
|
self.subscribe_to_pubsub_topics_via_relay()
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
self.store_node1.container.stop()
|
||||||
|
self.publish_message()
|
||||||
|
self.store_node1.container.start()
|
||||||
|
self.store_node1.ensure_ready()
|
||||||
|
for node in self.store_nodes:
|
||||||
|
store_response = self.get_messages_from_store(node, page_size=5)
|
||||||
|
assert len(store_response.messages) == 2
|
||||||
|
|
||||||
|
def test_message_relayed_while_store_node_is_stopped_and_removed(self, node_setup):
|
||||||
|
self.publish_message()
|
||||||
|
self.check_published_message_is_stored(page_size=5)
|
||||||
|
self.store_node1.stop()
|
||||||
|
self.store_nodes.remove(self.store_node1)
|
||||||
|
self.publish_message()
|
||||||
|
self.setup_first_store_node(store="false", relay="true")
|
||||||
|
self.store_node1.ensure_ready()
|
||||||
|
self.add_node_peer(self.store_node1, self.multiaddr_list)
|
||||||
|
self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1)
|
||||||
|
delay(1)
|
||||||
|
for node in self.store_nodes:
|
||||||
|
store_response = self.get_messages_from_store(node, page_size=5)
|
||||||
|
assert len(store_response.messages) == 2
|
||||||
|
|
||||||
|

    def test_message_relayed_before_store_node_is_started(self, node_setup):
        self.publish_message()
        self.check_published_message_is_stored(page_size=5)
        self.setup_second_store_node(store="false", relay="true")
        self.subscribe_to_pubsub_topics_via_relay()
        store_response = self.get_messages_from_store(self.store_node2, page_size=5)
        assert len(store_response.messages) == 1
        self.publish_message()
        self.check_published_message_is_stored(page_size=5)
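The `delay` helper used throughout these reliability tests is presumably just a logged sleep; a minimal sketch of such a step helper:

import logging
import time

logger = logging.getLogger(__name__)

def delay(seconds):
    # A visible sleep: the log line makes deliberate waits easy to spot
    # when reading test output, instead of an unexplained gap
    logger.debug(f"Sleeping for {seconds} seconds")
    time.sleep(seconds)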
@@ -1,3 +1,5 @@
+import pytest
+from src.env_vars import NODE_2
 from src.steps.store import StepsStore
@@ -6,51 +8,54 @@ class TestRunningNodes(StepsStore):
         self.setup_first_publishing_node(store="true", relay="true")
         self.setup_first_store_node(store="true", relay="true")
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("relay")
-        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
+        self.publish_message()
+        self.check_published_message_is_stored(page_size=5, ascending="true")

     def test_main_node_relay_and_store__peer_only_store(self):
         self.setup_first_publishing_node(store="true", relay="true")
         self.setup_first_store_node(store="true", relay="false")
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("relay")
+        self.publish_message()
         self.check_store_returns_empty_response()

     def test_main_node_relay_and_store__peer_only_relay(self):
         self.setup_first_publishing_node(store="true", relay="true")
         self.setup_first_store_node(store="false", relay="true")
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("relay")
-        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
+        self.publish_message()
+        self.check_published_message_is_stored(page_size=5, ascending="true")

     def test_main_node_relay_and_store__peer_neither_relay_nor_store(self):
         self.setup_first_publishing_node(store="true", relay="true")
         self.setup_first_store_node(store="false", relay="false")
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("relay")
-        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
+        self.publish_message()
+        self.check_published_message_is_stored(page_size=5, ascending="true")

+    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1106")
     def test_main_node_only_relay__peer_relay_and_store(self):
         self.setup_first_publishing_node(store="false", relay="true")
         self.setup_first_store_node(store="true", relay="true")
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("relay")
-        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
+        self.publish_message()
+        self.check_published_message_is_stored(page_size=5, ascending="true")

+    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1106")
     def test_main_node_only_relay__peer_only_store(self):
         self.setup_first_publishing_node(store="false", relay="true")
         self.setup_first_store_node(store="true", relay="false")
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("relay")
+        self.publish_message()
         self.check_store_returns_empty_response()

+    @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1106")
     def test_main_node_only_relay__peer_only_relay(self):
         self.setup_first_publishing_node(store="false", relay="true")
         self.setup_first_store_node(store="false", relay="true")
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("relay")
+        self.publish_message()
         try:
-            self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
+            self.check_published_message_is_stored(page_size=5, ascending="true")
         except Exception as ex:
             assert "failed to negotiate protocol: protocols not supported" in str(ex) or "PEER_DIAL_FAILURE" in str(ex)
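The `@pytest.mark.xfail` markers added above take a boolean condition as the first argument, so these tests still run against go-waku but a failure is reported as expected rather than breaking the suite while the linked bug is open. A self-contained example of the same pattern (the NODE_2 value here is a hypothetical stand-in for the env-driven one):

import pytest

NODE_2 = "go-waku:latest"  # hypothetical stand-in for the env-driven value

@pytest.mark.xfail("go-waku" in NODE_2, reason="known upstream bug")
def test_expected_to_fail_on_go_waku():
    assert False  # reported as xfail, not as a suite failure, when it trips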
@@ -58,12 +63,12 @@ class TestRunningNodes(StepsStore):
         self.setup_first_publishing_node(store="true", relay="true", lightpush="true")
         self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0])
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("lightpush", sender=self.store_node1)
-        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
+        self.publish_message(via="lightpush", sender=self.store_node1)
+        self.check_published_message_is_stored(page_size=5, ascending="true")

     def test_store_with_filter(self):
         self.setup_first_publishing_node(store="true", relay="true", filter="true")
         self.setup_first_store_node(store="false", relay="false", filter="true")
         self.subscribe_to_pubsub_topics_via_relay()
-        self.publish_message_via("relay")
-        self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true")
+        self.publish_message()
+        self.check_published_message_is_stored(page_size=5, ascending="true")
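Publishing with `via="lightpush"` routes the message through the peer's lightpush service instead of the local relay, which is why the store-only node above can act as the sender. A rough sketch of the underlying REST call, assuming an nwaku-style lightpush route (path and body shape are assumptions, not this repo's exact client):

import requests

def publish_via_lightpush(node_url, pubsub_topic, message):
    # The lightpush service forwards the message to a relay-capable peer,
    # so nodes without relay can still get messages onto the network
    body = {"pubsubTopic": pubsub_topic, "message": message}
    resp = requests.post(f"{node_url}/lightpush/v1/message", json=body, timeout=10)
    resp.raise_for_status()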
@@ -0,0 +1,23 @@
import pytest

from src.libs.common import to_base64
from src.steps.store import StepsStore


@pytest.mark.usefixtures("node_setup")
class TestSorting(StepsStore):
    @pytest.mark.parametrize("ascending", ["true", "false"])
    def test_store_sort_ascending(self, ascending):
        expected_message_hash_list = []
        for i in range(10):
            message = self.create_message(payload=to_base64(f"Message_{i}"))
            self.publish_message(message=message)
            expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
        for node in self.store_nodes:
            store_response = self.get_messages_from_store(node, page_size=5, ascending=ascending)
            response_message_hash_list = []
            for index in range(len(store_response.messages)):
                response_message_hash_list.append(store_response.message_hash(index))
            if ascending == "true":
                assert response_message_hash_list == expected_message_hash_list[:5], "Message hash mismatch for ascending order"
            else:
                assert response_message_hash_list == expected_message_hash_list[5:], "Message hash mismatch for descending order"
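The expected hashes are computed locally with `compute_message_hash`. Waku's deterministic message hashing is, per the 14/WAKU2-MESSAGE spec, SHA-256 over the concatenation of pubsub topic, payload, content topic, meta, and the 8-byte big-endian timestamp; a sketch under that assumption (the message field layout is also assumed, not this repo's exact helper):

import base64
import hashlib

def compute_message_hash(pubsub_topic, message):
    # Assumed layout: message is a dict with a base64 payload, a contentTopic
    # string, optional base64 meta, and a nanosecond integer timestamp
    h = hashlib.sha256()
    h.update(pubsub_topic.encode("utf-8"))
    h.update(base64.b64decode(message["payload"]))
    h.update(message["contentTopic"].encode("utf-8"))
    h.update(base64.b64decode(message.get("meta", "")))
    h.update(int(message["timestamp"]).to_bytes(8, "big"))
    return "0x" + h.hexdigest()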
@@ -0,0 +1,114 @@
import pytest

from datetime import timedelta, datetime

from src.libs.custom_logger import get_custom_logger
from src.steps.store import StepsStore

logger = get_custom_logger(__name__)


@pytest.mark.usefixtures("node_setup")
class TestTimeFilter(StepsStore):
    @pytest.fixture(scope="function", autouse=True)
    def setup_test_data(self):
        self.ts_pass = [
            {"description": "3 sec Past", "value": int((datetime.now() - timedelta(seconds=3)).timestamp() * 1e9)},
            {"description": "1 sec Past", "value": int((datetime.now() - timedelta(seconds=1)).timestamp() * 1e9)},
            {"description": "0.1 sec Past", "value": int((datetime.now() - timedelta(seconds=0.1)).timestamp() * 1e9)},
            {"description": "0.1 sec Future", "value": int((datetime.now() + timedelta(seconds=0.1)).timestamp() * 1e9)},
            {"description": "2 sec Future", "value": int((datetime.now() + timedelta(seconds=2)).timestamp() * 1e9)},
            {"description": "10 sec Future", "value": int((datetime.now() + timedelta(seconds=10)).timestamp() * 1e9)},
        ]
        self.ts_fail = [
            {"description": "20 sec Past", "value": int((datetime.now() - timedelta(seconds=20)).timestamp() * 1e9)},
            {"description": "40 sec Future", "value": int((datetime.now() + timedelta(seconds=40)).timestamp() * 1e9)},
        ]

    def test_messages_with_timestamps_close_to_now(self):
        failed_timestamps = []
        for timestamp in self.ts_pass:
            logger.debug(f'Running test with payload {timestamp["description"]}')
            message = self.create_message(timestamp=timestamp["value"])
            try:
                self.publish_message(message=message)
                self.check_published_message_is_stored(page_size=20, ascending="true")
            except Exception as ex:
                logger.error(f'Payload {timestamp["description"]} failed: {str(ex)}')
                failed_timestamps.append(timestamp["description"])
        assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}"

    def test_messages_with_timestamps_far_from_now(self):
        success_timestamps = []
        for timestamp in self.ts_fail:
            logger.debug(f'Running test with payload {timestamp["description"]}')
            message = self.create_message(timestamp=timestamp["value"])
            try:
                self.publish_message(message=message)
                self.check_store_returns_empty_response()
            except Exception as ex:
                logger.error(f'Payload {timestamp["description"]} succeeded where it should have failed: {str(ex)}')
                success_timestamps.append(timestamp["description"])
        assert not success_timestamps, f"Timestamps succeeded: {success_timestamps}"

    def test_time_filter_matches_one_message(self):
        message_hash_list = []
        for timestamp in self.ts_pass:
            message = self.create_message(timestamp=timestamp["value"])
            self.publish_message(message=message)
            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
        for node in self.store_nodes:
            store_response = self.get_messages_from_store(
                node,
                page_size=20,
                start_time=self.ts_pass[0]["value"] - 100000,
                end_time=self.ts_pass[0]["value"] + 100000,
            )
            assert len(store_response.messages) == 1, "Message count mismatch"
            assert store_response.message_hash(0) == message_hash_list[0], "Incorrect message filtered based on time"

    def test_time_filter_matches_multiple_messages(self):
        message_hash_list = []
        for timestamp in self.ts_pass:
            message = self.create_message(timestamp=timestamp["value"])
            self.publish_message(message=message)
            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
        for node in self.store_nodes:
            store_response = self.get_messages_from_store(
                node,
                page_size=20,
                start_time=self.ts_pass[0]["value"] - 100000,
                end_time=self.ts_pass[4]["value"] + 100000,
            )
            assert len(store_response.messages) == 5, "Message count mismatch"
            for i in range(5):
                assert store_response.message_hash(i) == message_hash_list[i], f"Incorrect message filtered based on time at index {i}"

    def test_time_filter_matches_no_message(self):
        message_hash_list = []
        for timestamp in self.ts_pass:
            message = self.create_message(timestamp=timestamp["value"])
            self.publish_message(message=message)
            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
        for node in self.store_nodes:
            store_response = self.get_messages_from_store(
                node,
                page_size=20,
                start_time=self.ts_pass[0]["value"] - 100000,
                end_time=self.ts_pass[0]["value"] - 100,
            )
            assert not store_response.messages, "Message count mismatch"

    def test_time_filter_start_time_equals_end_time(self):
        message_hash_list = []
        for timestamp in self.ts_pass:
            message = self.create_message(timestamp=timestamp["value"])
            self.publish_message(message=message)
            message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))
        for node in self.store_nodes:
            store_response = self.get_messages_from_store(
                node,
                page_size=20,
                start_time=self.ts_pass[0]["value"],
                end_time=self.ts_pass[0]["value"],
            )
            assert len(store_response.messages) == 1, "Message count mismatch"
            assert store_response.message_hash(0) == message_hash_list[0], "Incorrect message filtered based on time"
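All timestamps above are Unix time in nanoseconds, built by scaling a float of seconds by 1e9; the ±100000 query windows are therefore only 0.1 ms wide. A quick worked example of the arithmetic (note that the float multiplication can lose sub-microsecond precision; `time.time_ns()` avoids that round-trip when no offset is needed):

import time
from datetime import datetime, timedelta

three_sec_past = int((datetime.now() - timedelta(seconds=3)).timestamp() * 1e9)
# e.g. 1718000000.123456 s * 1e9 -> 1718000000123456000 ns (approximately)

now_ns = time.time_ns()  # exact integer nanoseconds, no float round-trip
assert three_sec_past < now_ns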
@@ -0,0 +1,81 @@
import pytest

from src.env_vars import NODE_2
from src.steps.store import StepsStore
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS


@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1108")
class TestTopics(StepsStore):
    @pytest.fixture(scope="function", autouse=True)
    def topics_setup(self, node_setup):
        self.message_hash_list = []
        for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
            message = self.create_message(contentTopic=content_topic)
            self.publish_message(message=message)
            self.message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message))

    def test_store_with_one_content_topic(self):
        for node in self.store_nodes:
            for index, content_topic in enumerate(CONTENT_TOPICS_DIFFERENT_SHARDS):
                store_response = node.get_store_messages(content_topics=content_topic, page_size=20, ascending="true")
                assert len(store_response["messages"]) == 1, "Message count mismatch"
                assert (
                    store_response["messages"][0]["messageHash"] == self.message_hash_list[index]
                ), "Incorrect message filtered based on content topic"

    def test_store_with_multiple_content_topics(self):
        for node in self.store_nodes:
            store_response = node.get_store_messages(
                content_topics=f"{CONTENT_TOPICS_DIFFERENT_SHARDS[0]},{CONTENT_TOPICS_DIFFERENT_SHARDS[4]}", page_size=20, ascending="true"
            )
            assert len(store_response["messages"]) == 2, "Message count mismatch"
            assert (
                store_response["messages"][0]["messageHash"] == self.message_hash_list[0]
            ), "Incorrect message filtered based on multiple content topics"
            assert (
                store_response["messages"][1]["messageHash"] == self.message_hash_list[4]
            ), "Incorrect message filtered based on multiple content topics"

    def test_store_with_unknown_content_topic(self):
        for node in self.store_nodes:
            store_response = node.get_store_messages(content_topics="test", page_size=20, ascending="true")
            assert len(store_response["messages"]) == 0, "Message count mismatch"

    def test_store_with_unknown_pubsub_topic(self):
        for node in self.store_nodes:
            store_response = node.get_store_messages(pubsub_topic="test", page_size=20, ascending="true")
            assert len(store_response["messages"]) == 0, "Message count mismatch"

    def test_store_with_both_pubsub_topic_and_content_topic(self):
        for node in self.store_nodes:
            for index, content_topic in enumerate(CONTENT_TOPICS_DIFFERENT_SHARDS):
                store_response = node.get_store_messages(
                    pubsub_topic=self.test_pubsub_topic, content_topics=content_topic, page_size=20, ascending="true"
                )
                assert len(store_response["messages"]) == 1, "Message count mismatch"
                assert (
                    store_response["messages"][0]["messageHash"] == self.message_hash_list[index]
                ), "Incorrect message filtered based on content topic"

    def test_store_with_unknown_pubsub_topic_but_known_content_topic(self):
        for node in self.store_nodes:
            store_response = node.get_store_messages(
                pubsub_topic="test", content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS[0], page_size=20, ascending="true"
            )
            assert len(store_response["messages"]) == 0, "Message count mismatch"

    def test_store_without_pubsub_topic_and_content_topic(self):
        for node in self.store_nodes:
            store_response = node.get_store_messages(page_size=20, ascending="true")
            assert len(store_response["messages"]) == len(CONTENT_TOPICS_DIFFERENT_SHARDS), "Message count mismatch"
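For orientation, the raw `node.get_store_messages` calls in this file boil down to a GET against the store REST API with the filters passed as query parameters, and unset filters simply omitted. A sketch of such a query, assuming a store v3 style route (path and parameter names are assumptions, not this repo's exact client):

import requests

def query_store(node_url, pubsub_topic=None, content_topics=None, page_size=20, ascending="true"):
    # Filters left as None are omitted, so the store returns everything it has
    params = {"pageSize": page_size, "ascending": ascending}
    if pubsub_topic is not None:
        params["pubsubTopic"] = pubsub_topic
    if content_topics is not None:
        params["contentTopics"] = content_topics
    resp = requests.get(f"{node_url}/store/v3/messages", params=params, timeout=10)
    resp.raise_for_status()
    return resp.json()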