mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-07 00:13:06 +00:00
new tests
This commit is contained in:
parent
d50b612ece
commit
49e5f8b407
@ -1,9 +1,10 @@
|
||||
import logging
|
||||
from time import sleep
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
import os
|
||||
import base64
|
||||
import allure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
def bytes_to_hex(byte_array):
|
||||
@ -26,3 +27,8 @@ def to_base64(input_data):
|
||||
def attach_allure_file(file):
|
||||
logger.debug("Attaching file %s", file)
|
||||
allure.attach.file(file, name=os.path.basename(file), attachment_type=allure.attachment_type.TEXT)
|
||||
|
||||
|
||||
def delay(num_seconds):
|
||||
logger.debug("Sleeping for %s seconds", num_seconds)
|
||||
sleep(num_seconds)
|
||||
|
||||
24
src/libs/custom_logger.py
Normal file
24
src/libs/custom_logger.py
Normal file
@ -0,0 +1,24 @@
|
||||
import logging
|
||||
|
||||
max_log_line_length = 5000
|
||||
|
||||
|
||||
def log_length_filter(max_length):
|
||||
class logLengthFilter(logging.Filter):
|
||||
def filter(self, record):
|
||||
if len(record.getMessage()) > max_length:
|
||||
logging.getLogger(record.name).log(
|
||||
record.levelno, f"Log line was discarded because it's longer than max_log_line_length={max_log_line_length}"
|
||||
)
|
||||
return False
|
||||
return True
|
||||
|
||||
return logLengthFilter()
|
||||
|
||||
|
||||
def get_custom_logger(name):
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
logging.getLogger("docker").setLevel(logging.WARNING)
|
||||
logger = logging.getLogger(name)
|
||||
logger.addFilter(log_length_filter(max_log_line_length))
|
||||
return logger
|
||||
@ -1,9 +1,9 @@
|
||||
import logging
|
||||
import requests
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
from abc import ABC, abstractmethod
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class BaseClient(ABC):
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
import logging
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
import json
|
||||
from dataclasses import asdict
|
||||
from urllib.parse import quote
|
||||
from src.node.api_clients.base_client import BaseClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class REST(BaseClient):
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import logging
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
import json
|
||||
from dataclasses import asdict
|
||||
from src.node.api_clients.base_client import BaseClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class RPC(BaseClient):
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import os
|
||||
import logging
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
import random
|
||||
import threading
|
||||
import docker
|
||||
@ -7,7 +7,7 @@ from src.env_vars import NETWORK_NAME, SUBNET, IP_RANGE, GATEWAY
|
||||
from docker.types import IPAMConfig, IPAMPool
|
||||
from docker.errors import NotFound
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class DockerManager:
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import os
|
||||
import logging
|
||||
from time import sleep
|
||||
from src.libs.common import delay
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
from src.node.api_clients.rpc import RPC
|
||||
from src.node.api_clients.rest import REST
|
||||
@ -8,7 +8,7 @@ from src.node.docker_mananger import DockerManager
|
||||
from src.env_vars import LOG_DIR, DEFAULT_PUBSUBTOPIC, PROTOCOL
|
||||
from src.data_storage import DS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class WakuNode:
|
||||
@ -72,7 +72,7 @@ class WakuNode:
|
||||
"Started container from image %s. RPC: %s REST: %s WebSocket: %s", self._image_name, self._rpc_port, self._rest_port, self._websocket_port
|
||||
)
|
||||
DS.waku_nodes.append(self)
|
||||
sleep(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
|
||||
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
|
||||
try:
|
||||
self.ensure_ready()
|
||||
except Exception as e:
|
||||
|
||||
@ -1,15 +1,15 @@
|
||||
import logging
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
import math
|
||||
from time import sleep, time
|
||||
from time import time
|
||||
import pytest
|
||||
import allure
|
||||
from src.libs.common import to_base64
|
||||
from src.libs.common import to_base64, delay
|
||||
from src.data_classes import message_rpc_response_schema
|
||||
from src.env_vars import NODE_1, NODE_2
|
||||
from src.node.waku_node import WakuNode
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class StepsRelay:
|
||||
@ -23,6 +23,7 @@ class StepsRelay:
|
||||
self.test_pubsub_topic = "/waku/2/rs/18/1"
|
||||
self.test_content_topic = "/test/1/waku-relay/proto"
|
||||
self.test_payload = "Relay works!!"
|
||||
self.test_message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
self.node1.set_subscriptions([self.test_pubsub_topic])
|
||||
self.node2.set_subscriptions([self.test_pubsub_topic])
|
||||
|
||||
@ -36,22 +37,34 @@ class StepsRelay:
|
||||
raise Exception(f"WARM UP FAILED WITH: {ex}")
|
||||
|
||||
@allure.step
|
||||
def check_published_message_reaches_peer(self, message):
|
||||
self.node1.send_message(message, self.test_pubsub_topic)
|
||||
sleep(0.1)
|
||||
get_messages_response = self.node2.get_messages(self.test_pubsub_topic)
|
||||
def check_published_message_reaches_peer(self, message, pubsub_topic=None, message_propagation_delay=0.1):
|
||||
if not pubsub_topic:
|
||||
pubsub_topic = self.test_pubsub_topic
|
||||
self.node1.send_message(message, pubsub_topic)
|
||||
|
||||
delay(message_propagation_delay)
|
||||
get_messages_response = self.node2.get_messages(pubsub_topic)
|
||||
logger.debug("Got reponse from remote peer %s", get_messages_response)
|
||||
assert get_messages_response, "Peer node couldn't find any messages"
|
||||
received_message = message_rpc_response_schema.load(get_messages_response[0])
|
||||
self.assert_received_message(message, received_message)
|
||||
|
||||
def assert_received_message(self, sent_message, received_message):
|
||||
def assert_fail_message(field_name):
|
||||
return f"Incorrect {field_name}. Published {sent_message[field_name]} Received {getattr(received_message, field_name)}"
|
||||
|
||||
assert (
|
||||
received_message.payload == message["payload"]
|
||||
), f'Incorrect payload. Published {message["payload"]} Received {received_message.payload}'
|
||||
received_message.payload == sent_message["payload"]
|
||||
), f'Incorrect payload. Published {sent_message["payload"]} Received {received_message.payload}'
|
||||
assert (
|
||||
received_message.contentTopic == message["contentTopic"]
|
||||
), f'Incorrect contentTopic. Published {message["contentTopic"]} Received {received_message.contentTopic}'
|
||||
if "timestamp" in message and message["timestamp"]:
|
||||
assert_fail_message = f'Incorrect timestamp. Published {message["timestamp"]} Received {received_message.timestamp}'
|
||||
if isinstance(message["timestamp"], float):
|
||||
assert math.isclose(float(received_message.timestamp), message["timestamp"], rel_tol=1e-9), assert_fail_message
|
||||
received_message.contentTopic == sent_message["contentTopic"]
|
||||
), f'Incorrect contentTopic. Published {sent_message["contentTopic"]} Received {received_message.contentTopic}'
|
||||
if "timestamp" in sent_message and sent_message["timestamp"]:
|
||||
if isinstance(sent_message["timestamp"], float):
|
||||
assert math.isclose(float(received_message.timestamp), sent_message["timestamp"], rel_tol=1e-9), assert_fail_message("timestamp")
|
||||
else:
|
||||
assert str(received_message.timestamp) == str(message["timestamp"]), assert_fail_message
|
||||
assert str(received_message.timestamp) == str(sent_message["timestamp"]), assert_fail_message("timestamp")
|
||||
if "version" in sent_message and sent_message["version"]:
|
||||
assert str(received_message.version) == str(sent_message["version"]), assert_fail_message("version")
|
||||
if "meta" in sent_message and sent_message["meta"]:
|
||||
assert str(received_message.meta) == str(sent_message["meta"]), assert_fail_message("meta")
|
||||
|
||||
@ -47,6 +47,7 @@ SAMPLE_INPUTS = [
|
||||
]
|
||||
|
||||
INVALID_PAYLOADS = [
|
||||
{"description": "Empty string", "value": ""},
|
||||
{"description": "Unecoded text", "value": "Hello World!"},
|
||||
{"description": "A dictionary", "value": {"key": "YWFh"}},
|
||||
{"description": "An integer", "value": 1234567890},
|
||||
@ -55,6 +56,7 @@ INVALID_PAYLOADS = [
|
||||
]
|
||||
|
||||
INVALID_CONTENT_TOPICS = [
|
||||
{"description": "Empty string", "value": ""},
|
||||
{"description": "A dictionary", "value": {"key": "YWFh"}},
|
||||
{"description": "An integer", "value": 1234567890},
|
||||
{"description": "A list", "value": ["YWFh"]},
|
||||
|
||||
@ -1,4 +0,0 @@
|
||||
import logging
|
||||
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
logging.getLogger("docker").setLevel(logging.WARNING)
|
||||
@ -1,6 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import glob
|
||||
import logging
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
import os
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
@ -9,7 +9,7 @@ from src.libs.common import attach_allure_file
|
||||
import src.env_vars as env_vars
|
||||
from src.data_storage import DS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
# See https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures
|
||||
@ -43,6 +43,12 @@ def test_id(request):
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def test_setup(request, test_id):
|
||||
logger.debug("Running test: %s with id: %s", request.node.name, request.cls.test_id)
|
||||
yield
|
||||
for file in glob.glob(os.path.join(env_vars.LOG_DIR, "*" + request.cls.test_id + "*")):
|
||||
try:
|
||||
os.remove(file)
|
||||
except Exception:
|
||||
logger.debug("Could not remove file: %s", file)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
@ -50,7 +56,7 @@ def attach_logs_on_fail(request):
|
||||
yield
|
||||
if hasattr(request.node, "rep_call") and request.node.rep_call.failed:
|
||||
logger.debug("Test failed, attempting to attach logs to the allure reports")
|
||||
for file in glob.glob(os.path.join(env_vars.LOG_DIR, request.cls.test_id + "*")):
|
||||
for file in glob.glob(os.path.join(env_vars.LOG_DIR, "*" + request.cls.test_id + "*")):
|
||||
attach_allure_file(file)
|
||||
|
||||
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
import logging
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from time import time
|
||||
from src.libs.common import to_base64
|
||||
from src.steps.relay import StepsRelay
|
||||
from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class TestRelayPublish(StepsRelay):
|
||||
@ -45,7 +45,24 @@ class TestRelayPublish(StepsRelay):
|
||||
else:
|
||||
raise Exception("Not implemented")
|
||||
|
||||
def test_publish_with_various_content_topics(self):
|
||||
def test_publish_with_payload_less_than_one_mb(self):
|
||||
payload_length = 1024 * 1023
|
||||
logger.debug("Running test with payload length of %s bytes", payload_length)
|
||||
message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
self.check_published_message_reaches_peer(message, message_propagation_delay=2)
|
||||
|
||||
def test_publish_with_payload_equal_or_more_than_one_mb(self):
|
||||
payload_length = 1024 * 1023
|
||||
for payload_length in [1024 * 1024, 1024 * 1024 * 10]:
|
||||
logger.debug("Running test with payload length of %s bytes", payload_length)
|
||||
message = {"payload": to_base64("a" * (payload_length)), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
|
||||
try:
|
||||
self.check_published_message_reaches_peer(message, message_propagation_delay=2)
|
||||
raise AssertionError("Duplicate message was retrieved twice")
|
||||
except Exception as ex:
|
||||
assert "Peer node couldn't find any messages" in str(ex)
|
||||
|
||||
def test_publish_with_valid_content_topics(self):
|
||||
failed_content_topics = []
|
||||
for content_topic in SAMPLE_INPUTS:
|
||||
logger.debug("Running test with content topic %s", content_topic["description"])
|
||||
@ -82,6 +99,18 @@ class TestRelayPublish(StepsRelay):
|
||||
else:
|
||||
raise Exception("Not implemented")
|
||||
|
||||
def test_publish_on_unsubscribed_pubsub_topic(self):
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message, pubsub_topic="/waku/2/rs/19/1")
|
||||
raise AssertionError("Publish on unsubscribed pubsub_topic worked!!!")
|
||||
except Exception as ex:
|
||||
if self.node1.is_nwaku():
|
||||
assert "Bad Request" in str(ex)
|
||||
elif self.node1.is_gowaku():
|
||||
assert "Internal Server Error" in str(ex)
|
||||
else:
|
||||
raise Exception("Not implemented")
|
||||
|
||||
def test_publish_with_valid_timestamps(self):
|
||||
failed_timestamps = []
|
||||
for timestamp in SAMPLE_TIMESTAMPS:
|
||||
@ -111,3 +140,35 @@ class TestRelayPublish(StepsRelay):
|
||||
def test_publish_with_no_timestamp(self):
|
||||
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic}
|
||||
self.check_published_message_reaches_peer(message)
|
||||
|
||||
def test_publish_with_valid_version(self):
|
||||
self.test_message["version"] = 10
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
|
||||
def test_publish_with_invalid_version(self):
|
||||
self.test_message["version"] = 2.1
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
raise AssertionError("Publish with invalid version worked!!!")
|
||||
except Exception as ex:
|
||||
assert "Bad Request" in str(ex)
|
||||
|
||||
def test_publish_with_valid_meta(self):
|
||||
self.test_message["meta"] = to_base64(self.test_payload)
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
|
||||
def test_publish_with_invalid_meta(self):
|
||||
self.test_message["meta"] = self.test_payload
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
raise AssertionError("Publish with invalid meta worked!!!")
|
||||
except Exception as ex:
|
||||
assert "Bad Request" in str(ex)
|
||||
|
||||
def test_publish_and_retrieve_duplicate_message(self):
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
try:
|
||||
self.check_published_message_reaches_peer(self.test_message)
|
||||
raise AssertionError("Duplicate message was retrieved twice")
|
||||
except Exception as ex:
|
||||
assert "Peer node couldn't find any messages" in str(ex)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user