Mirror of https://github.com/logos-blockchain/logos-blockchain-e2e-tests.git, synced 2026-01-02 13:13:08 +00:00

Merge pull request #7 from logos-co/test-dos-robustness-high-load: Test/high load denial of service
Commit 90a30b635f
@@ -13,9 +13,14 @@ pip install -r requirements.txt
mkdir -p kzgrs
wget https://raw.githubusercontent.com/logos-co/nomos-node/master/tests/kzgrs/kzgrs_test_params -O kzgrs/kzgrs_test_params
pre-commit install
(optional) Overwrite default vars from src/env_vars.py via cli env vars or by adding a .env file
(optional) Overwrite default vars from src/env_vars.py via env vars or by adding a .env file
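For example, a `.env` file at the repository root could override a few of the defaults from src/env_vars.py (the values below are only illustrative):

```shell
# .env - optional overrides picked up instead of the defaults in src/env_vars.py
NOMOS_IMAGE=ghcr.io/logos-co/nomos-node:testnet
ADDITIONAL_NODES=nomos,nomos
CHECK_LOG_ERRORS=True
```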
pytest
```

Set an optional environment variable to search logs for errors after each test:

```shell
export CHECK_LOG_ERRORS=True
```


## License
src/api_clients/invalid_rest.py (new file, 72 lines)
@@ -0,0 +1,72 @@
import random
import string

from src.api_clients.rest import REST
from src.libs.common import generate_random_bytes
from src.libs.custom_logger import get_custom_logger
import json

logger = get_custom_logger(__name__)


def alter_dispersal_data(data):

    # Add random bytes to data and break padding
    def alter_data_content():
        random_n = random.randint(1, 31)
        data["data"].extend(list(generate_random_bytes(random_n)))

    # Change structure and content for metadata
    def alter_metadata():
        random_n = random.randint(7, 32)
        data["metadata"] = list(generate_random_bytes(random_n))

    # Add random property to the data object with random list content
    def add_random_property():
        random_k = random.randint(1, 16)
        random_n = random.randint(7, 64)
        random_str = "".join(random.choices(string.printable, k=random_k))
        data[random_str] = list(generate_random_bytes(random_n))

    choice = random.choice([alter_data_content, alter_metadata, add_random_property])
    logger.debug(f"Data for dispersal request has been altered with: {choice.__name__}")

    choice()

    return data


def alter_get_range_query(query):

    # Swap range high with range low
    def swap_range():
        end = query["range"]["end"]
        query["range"]["end"] = query["range"]["start"]
        query["range"]["start"] = end

    # Change app id
    def alter_app_id():
        random_n = random.randint(8, 33)
        query["app_id"] = list(generate_random_bytes(random_n))

    choice = random.choice([swap_range, alter_app_id])
    logger.debug(f"Get-range query has been altered with: {choice.__name__}")

    choice()

    return query


class InvalidRest(REST):
    def __init__(self, rest_port):
        super().__init__(rest_port)

    def send_dispersal_request(self, data):
        data = alter_dispersal_data(data)
        response = self.rest_call("post", "disperse-data", json.dumps(data))
        return response

    def send_get_range(self, query):
        query = alter_get_range_query(query)
        response = self.rest_call("post", "da/get-range", json.dumps(query))
        return response.json()
@@ -10,21 +10,22 @@ class REST(BaseClient):
        self._rest_port = rest_port

    def rest_call(self, method, endpoint, payload=None):
        url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
        url = f"http://localhost:{self._rest_port}/{endpoint}"
        headers = {"Content-Type": "application/json", "Connection": "close"}
        return self.make_request(method, url, headers=headers, data=payload)

    def rest_call_text(self, method, endpoint, payload=None):
        url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
        url = f"http://localhost:{self._rest_port}/{endpoint}"
        headers = {"accept": "text/plain", "Connection": "close"}
        return self.make_request(method, url, headers=headers, data=payload)

    def info(self):
        status_response = self.rest_call("get", "cryptarchia/info")
        return status_response.json()
        response = self.rest_call("get", "cryptarchia/info")
        return response.json()

    def send_dispersal_request(self, data):
        return self.rest_call("post", "disperse-data", json.dumps(data))
        response = self.rest_call("post", "disperse-data", json.dumps(data))
        return response

    def send_get_range(self, query):
        response = self.rest_call("post", "da/get-range", json.dumps(query))
@@ -1,11 +0,0 @@
from src.env_vars import NOMOS_IMAGE

nomos_cli = {
    "reconstruct": {
        "image": NOMOS_IMAGE,
        "flags": [{"--app-blobs": [0]}],  # Value [] is a list of indexes into list of values required for the flag
        "volumes": [],
        "ports": [],
        "entrypoint": "",
    },
}
src/client/client_vars.py (new file, 21 lines)
@@ -0,0 +1,21 @@
from src.env_vars import NOMOS_IMAGE, HTTP_PROXY_IMAGE

nomos_cli = {
    "reconstruct": {
        "image": NOMOS_IMAGE,
        "flags": [{"--app-blobs": [0]}],  # Value [] is a list of indexes into list of values required for the flag
        "volumes": [],
        "ports": [],
        "entrypoint": "",
    },
}

http_proxy = {
    "configurable-http-proxy": {
        "image": HTTP_PROXY_IMAGE,
        "flags": [{"--default-target": [0]}],  # Value [] is a list of indexes into list of values required for the flag
        "volumes": [],
        "ports": ["8000/tcp"],
        "entrypoint": "",
    }
}
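The `flags` entries above map a CLI flag to index positions in the `input_values` list that callers pass to `run()`. A minimal sketch of that expansion, mirroring the loop in `proxy_client.py` (the target URL is illustrative):

```python
# Sketch of the flag expansion used by ProxyClient.run() / NomosCli.run().
# The node name and port in the URL are illustrative.
flags = [{"--default-target": [0]}]
input_values = ["http://nomos_executor:18080"]

cmd = ["configurable-http-proxy"]
for flag in flags:
    for name, indexes in flag.items():
        cmd.append(name)
        for j in indexes:
            cmd.append(input_values[j])

# cmd == ['configurable-http-proxy', '--default-target', 'http://nomos_executor:18080']
```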
@@ -3,14 +3,13 @@ import os
import re

from src.data_storage import DS
from src.libs.common import generate_log_prefix
from src.libs.common import generate_log_prefix, delay, remove_padding
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed

from src.cli.cli_vars import nomos_cli
from src.client.client_vars import nomos_cli
from src.docker_manager import DockerManager, stop, kill
from src.env_vars import DOCKER_LOG_DIR, NOMOS_CLI
from src.steps.da import remove_padding

logger = get_custom_logger(__name__)

@@ -24,7 +23,7 @@ class NomosCli:
        if command not in nomos_cli:
            raise ValueError("Unknown command provided")

        logger.debug(f"Cli is going to be initialized with this config {nomos_cli[command]}")
        logger.debug(f"NomosCli is going to be initialized with this config {nomos_cli[command]}")
        self._command = command
        self._image_name = nomos_cli[command]["image"]
        self._internal_ports = nomos_cli[command]["ports"]
@@ -66,25 +65,29 @@ class NomosCli:
            command=cmd,
        )

        DS.nomos_nodes.append(self)
        logger.info(f"Started container {self._container_name} from image {self._image_name}.")
        DS.client_nodes.append(self)

        match self._command:
            case "reconstruct":
                decode_only = kwargs.get("decode_only", False)
                return self.reconstruct(input_values=input_values, decode_only=decode_only)
                return self.reconstruct(decode_only=decode_only)
            case _:
                return

    def reconstruct(self, input_values=None, decode_only=False):
        keywords = ["Reconstructed data"]
    def reconstruct(self, decode_only=False):
        keyword = "Reconstructed data"
        keywords = [keyword]

        log_stream = self._container.logs(stream=True)

        matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False, log_stream)
        assert len(matches) > 0, f"Reconstructed data not found {matches}"
        assert len(matches[keyword]) > 0, f"Reconstructed data not found {matches[keyword]}"

        logger.debug(f"Reconstructed data match found {matches[keyword]}")

        # Use regular expression that captures the byte list after "Reconstructed data"
        result = re.sub(r".*Reconstructed data\s*(\[[^\]]+\]).*", r"\1", matches[keywords[0]][0])
        result = re.sub(r".*Reconstructed data\s*(\[[^\]]+\]).*", r"\1", matches[keyword][0])

        result_bytes = []
        try:
@@ -98,7 +101,7 @@ class NomosCli:
            result_bytes = remove_padding(result_bytes)
            result = bytes(result_bytes).decode("utf-8")

        DS.nomos_nodes.remove(self)
        DS.client_nodes.remove(self)

        return result

@@ -109,3 +112,6 @@ class NomosCli:
    @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
    def kill(self):
        self._container = kill(self._container)

    def name(self):
        return self._container_name
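The `re.sub` above keeps only the bracketed byte list that follows "Reconstructed data" in the matched log line. A standalone sketch of that extraction (the log line is illustrative, and the int parsing shown is just one way to do it; the actual parsing lives in the try block elided from this hunk):

```python
import re

# Illustrative log line; the real one comes from the nomos-cli container logs.
line = "nomos_cli: Reconstructed data [1, 2, 3, 4, 5]"

# Keep only the bracketed byte list that follows "Reconstructed data".
result = re.sub(r".*Reconstructed data\s*(\[[^\]]+\]).*", r"\1", line)
assert result == "[1, 2, 3, 4, 5]"

# One way to turn it into a list of ints before remove_padding() and UTF-8 decoding.
result_bytes = [int(b) for b in result.strip("[]").split(",")]
```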
src/client/proxy_client.py (new file, 101 lines)
@@ -0,0 +1,101 @@
import os

from src.api_clients.invalid_rest import InvalidRest
from src.api_clients.rest import REST
from src.data_storage import DS
from src.libs.common import generate_log_prefix, delay, remove_padding
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed

from src.client.client_vars import http_proxy
from src.docker_manager import DockerManager, stop, kill
from src.env_vars import DOCKER_LOG_DIR, NOMOS_CLI

logger = get_custom_logger(__name__)


class ProxyClient:
    def __init__(self):
        command = "configurable-http-proxy"

        logger.debug(f"ProxyClient is going to be initialized with this config {http_proxy[command]}")
        self._command = command
        self._image_name = http_proxy[command]["image"]
        self._internal_ports = http_proxy[command]["ports"]
        self._volumes = http_proxy[command]["volumes"]
        self._entrypoint = http_proxy[command]["entrypoint"]

        container_name = "proxy-client-" + generate_log_prefix()
        self._log_path = os.path.join(DOCKER_LOG_DIR, f"{container_name}__{self._image_name.replace('/', '_')}.log")
        self._docker_manager = DockerManager(self._image_name)
        self._container_name = container_name
        self._container = None
        self._api = None
        self._invalid_api = None

        cwd = os.getcwd()
        self._volumes = [cwd + "/" + volume for volume in self._volumes]

    def run(self, input_values=None, **kwargs):
        logger.debug(f"ProxyClient starting with log path {self._log_path}")

        self._port_map = {}
        self._external_ports = self._docker_manager.generate_ports(count=1)
        self._tcp_port = self._external_ports[0]
        self._api = REST(self._tcp_port)
        self._invalid_api = InvalidRest(self._tcp_port)

        logger.debug(f"Internal ports {self._internal_ports}")

        for i, port in enumerate(self._internal_ports):
            self._port_map[port] = int(self._external_ports[i])

        logger.debug(f"Port map {self._port_map}")

        cmd = [self._command]

        for flag in http_proxy[self._command]["flags"]:
            for f, indexes in flag.items():
                cmd.append(f)
                for j in indexes:
                    cmd.append(input_values[j])

        logger.debug(f"ProxyClient command to run {cmd}")

        self._container = self._docker_manager.start_container(
            self._docker_manager.image,
            port_bindings=self._port_map,
            args=None,
            log_path=self._log_path,
            volumes=self._volumes,
            entrypoint=self._entrypoint,
            remove_container=True,
            name=self._container_name,
            command=cmd,
        )

        logger.info(f"Started container {self._container_name} from image {self._image_name}.")
        DS.client_nodes.append(self)

    @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
    def stop(self):
        self._container = stop(self._container)

    @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
    def kill(self):
        self._container = kill(self._container)

    def name(self):
        return self._container_name

    def send_dispersal_request(self, data, send_invalid=False):
        if send_invalid:
            return self._invalid_api.send_dispersal_request(data)

        return self._api.send_dispersal_request(data)

    def send_get_data_range_request(self, data, send_invalid=False):
        if send_invalid:
            return self._invalid_api.send_get_range(data)

        return self._api.send_get_range(data)
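A minimal usage sketch, following how `setup_proxy_clients` wires a client to a running node (the target URL is illustrative, and the dispersal payload is built with `prepare_dispersal_request()` from `src/steps/da.py`):

```python
from src.client.proxy_client import ProxyClient
from src.libs.common import to_app_id, to_index
from src.steps.da import prepare_dispersal_request

proxy = ProxyClient()
proxy.run(input_values=["http://nomos_executor:18080"])  # illustrative --default-target

request = prepare_dispersal_request("hello world", to_app_id(1), to_index(0))

# send_invalid=False sends the payload unchanged through REST; send_invalid=True
# routes it through InvalidRest, which mutates it before sending.
response = proxy.send_dispersal_request(request, send_invalid=False)

proxy.stop()
```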
@@ -1,3 +1,4 @@
# We use this class for global variables
class DS:
    nomos_nodes = []
    client_nodes = []
@@ -142,35 +142,31 @@ class DockerManager:
        for keyword in keywords:
            if use_regex:
                if re.search(keyword, line, re.IGNORECASE):
                    matches[keyword].append(line.strip())
                    matches[keyword].append(line)
            else:
                if keyword.lower() in line.lower():
                    matches[keyword].append(line.strip())
                    matches[keyword].append(line)

        return matches

    def search_log_for_keywords(self, log_path, keywords, use_regex=False, log_stream=None):
        matches = {}
        matches = {keyword: [] for keyword in keywords}

        # Read from stream
        if log_stream is not None:
            for line in log_stream:
                matches = self.find_keywords_in_line(keywords, line.decode("utf-8"), use_regex=use_regex)
        if log_stream is None:
            log_stream = open(log_path, "r")

        else:
            # Open the log file and search line by line
            with open(log_path, "r") as log_file:
                for line in log_file:
                    matches = self.find_keywords_in_line(keywords, line, use_regex=use_regex)
        for line in log_stream:
            # Decode line if it is a byte object not str
            if hasattr(line, "decode"):
                line = line.decode("utf-8")
            line_matches = self.find_keywords_in_line(keywords, line, use_regex=use_regex)
            for keyword, result in line_matches.items():
                matches[keyword].extend(result)

        # Check if there were any matches
        if any(matches[keyword] for keyword in keywords):
            for keyword, lines in matches.items():
                if lines:
                    logger.debug(f"Found matches for keyword '{keyword}': {lines}")
        if any(matches_list for matches_list in matches.values()):
            return matches
        else:
            logger.debug("No keywords found in the nomos logs.")
            return None
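With this change `search_log_for_keywords` reads either a passed-in stream or the log file itself, and returns a dict keyed by keyword (or `None` when nothing matched). A small usage sketch (the log path is illustrative):

```python
from src.docker_manager import DockerManager
from src.env_vars import NOMOS_IMAGE

dm = DockerManager(NOMOS_IMAGE)

# Search a node log file for a couple of keywords; no log_stream, so the file is opened.
matches = dm.search_log_for_keywords("./log/nomos-node.log", ["error", "panic"])
if matches:
    for keyword, lines in matches.items():
        print(f"{keyword}: {len(lines)} matching line(s)")
```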
@@ -15,18 +15,20 @@ def get_env_var(var_name, default=None):


# Configuration constants. Need to be uppercase to appear in reports
DEFAULT_NOMOS_IMAGE = "ghcr.io/logos-co/nomos-node:testnet"
NOMOS_IMAGE = get_env_var("NOMOS_IMAGE", DEFAULT_NOMOS_IMAGE)

DEFAULT_PROXY_IMAGE = "bitnami/configurable-http-proxy:latest"
HTTP_PROXY_IMAGE = get_env_var("HTTP_PROXY_IMAGE", DEFAULT_PROXY_IMAGE)

NOMOS = "nomos"
NOMOS_EXECUTOR = "nomos_executor"
CFGSYNC = "cfgsync"

DEFAULT_IMAGE = "ghcr.io/logos-co/nomos-node:testnet"

NODE_1 = get_env_var("NODE_1", NOMOS)
NODE_2 = get_env_var("NODE_2", NOMOS_EXECUTOR)
NODE_3 = get_env_var("NODE_3", CFGSYNC)

NOMOS_IMAGE = get_env_var("NOMOS_IMAGE", DEFAULT_IMAGE)

NOMOS_CLI = "/usr/bin/nomos-cli"

ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{NOMOS},{NOMOS}")
@@ -38,3 +40,4 @@ IP_RANGE = get_env_var("IP_RANGE", "172.19.0.0/24")
GATEWAY = get_env_var("GATEWAY", "172.19.0.1")
RUNNING_IN_CI = get_env_var("CI")
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20)
CHECK_LOG_ERRORS = get_env_var("CHECK_LOG_ERRORS", False)
@@ -52,3 +52,44 @@ def generate_random_bytes(n=31):
    if n < 0:
        raise ValueError("Input must be an unsigned integer (non-negative)")
    return os.urandom(n)


def add_padding(orig_bytes):
    """
    Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme:
    - The value of each padded byte is the number of bytes padded.
    - If the original data is already a multiple of the block size,
      an additional full block of bytes (each the block size) is added.
    """
    block_size = 31
    original_len = len(orig_bytes)
    padding_needed = block_size - (original_len % block_size)
    # If the data is already a multiple of block_size, add a full block of padding
    if padding_needed == 0:
        padding_needed = block_size

    # Each padded byte will be equal to padding_needed
    padded_bytes = orig_bytes + [padding_needed] * padding_needed
    return padded_bytes


def remove_padding(padded_bytes):
    """
    Removes PKCS#7-like padding from a list of bytes.
    Raises:
        ValueError: If the padding is incorrect.
    Returns:
        The original list of bytes without padding.
    """
    if not padded_bytes:
        raise ValueError("The input is empty, cannot remove padding.")

    padding_len = padded_bytes[-1]

    if padding_len < 1 or padding_len > 31:
        raise ValueError("Invalid padding length.")

    if padded_bytes[-padding_len:] != [padding_len] * padding_len:
        raise ValueError("Invalid padding bytes.")

    return padded_bytes[:-padding_len]
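A quick round trip through the helpers above (the payload is illustrative):

```python
from src.libs.common import add_padding, remove_padding

data = list("Nomos DA test".encode("utf-8"))  # 13 bytes

padded = add_padding(data)
assert len(padded) % 31 == 0   # padded out to the 31-byte block size
assert padded[-1] == 18        # each pad byte equals the number of bytes added (31 - 13)

assert remove_padding(padded) == data
```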
@@ -78,8 +78,7 @@ class NomosNode:
            name=self._container_name,
        )

        logger.debug(f"Started container from image {self._image_name}. " f"REST: {getattr(self, '_tcp_port', 'N/A')}")

        logger.info(f"Started container {self._container_name} from image {self._image_name}. " f"REST: {getattr(self, '_tcp_port', 'N/A')}")
        DS.nomos_nodes.append(self)

    @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
@@ -126,6 +125,15 @@ class NomosNode:
    def name(self):
        return self._container_name

    def api_port(self):
        return self._tcp_port

    def api_port_internal(self):
        for internal_port, external_port in self._port_map.items():
            if str(external_port).replace("/tcp", "") == self._tcp_port:
                return internal_port.replace("/tcp", "")
        return None

    def check_nomos_log_errors(self, whitelist=None):
        keywords = LOG_ERROR_KEYWORDS

@@ -133,8 +141,15 @@ class NomosNode:
        if whitelist:
            keywords = [keyword for keyword in keywords if keyword not in whitelist]

        matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False)
        assert not matches, f"Found errors {matches}"
        matches_found = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False)

        logger.info(f"Printing log matches for {self.name()}")
        if matches_found:
            for keyword, log_lines in matches_found.items():
                for line in log_lines:
                    logger.debug(f"Log line matching keyword '{keyword}': {line}")
        else:
            logger.debug("No keyword matches found in the logs.")

    def send_dispersal_request(self, data):
        return self._api.send_dispersal_request(data)
@@ -1,9 +1,9 @@
import inspect
import os
import shutil

import pytest

from src.client.proxy_client import ProxyClient
from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
@@ -43,6 +43,7 @@ class StepsCommon:
    def cluster_setup(self):
        logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
        self.main_nodes = []
        self.client_nodes = []

    @pytest.fixture(scope="function")
    def setup_2_node_cluster(self, request):
@@ -87,3 +88,23 @@ class StepsCommon:
            raise

        delay(5)

    @pytest.fixture(scope="function")
    def setup_proxy_clients(self, request):
        logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")

        assert len(self.main_nodes) == 3, "Expected cfgsync plus two Nomos nodes to be running already"

        if hasattr(request, "param"):
            num_clients = request.param
        else:
            num_clients = 10

        assert num_clients % 2 == 0, "num_clients must be an even number"

        # Every even proxy client for get-range, every odd for dispersal
        for i in range(num_clients):
            proxy_client = ProxyClient()
            default_target = [f"http://{self.main_nodes[1 + i % 2].name()}:18080"]
            proxy_client.run(input_values=default_target)
            self.client_nodes.append(proxy_client)
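The fixture defaults to ten proxy clients; because it reads `request.param`, a test can ask for a different (even) number through pytest's indirect parametrization. A hedged sketch of one way a test could do that (the test class and assertion are illustrative):

```python
import pytest

from src.steps.da import StepsDataAvailability


class TestWithFewerClients(StepsDataAvailability):
    # indirect=True hands the value to the fixture via request.param,
    # so setup_proxy_clients starts 4 proxy clients instead of 10.
    @pytest.mark.parametrize("setup_proxy_clients", [4], indirect=True)
    @pytest.mark.usefixtures("setup_2_node_cluster", "setup_proxy_clients")
    def test_with_four_proxy_clients(self):
        assert len(self.client_nodes) == 4
```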
@@ -2,53 +2,13 @@ import allure
from tenacity import retry, stop_after_delay, wait_fixed

from src.env_vars import NOMOS_EXECUTOR
from src.libs.common import add_padding
from src.libs.custom_logger import get_custom_logger
from src.steps.common import StepsCommon

logger = get_custom_logger(__name__)


def add_padding(orig_bytes):
    """
    Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme:
    - The value of each padded byte is the number of bytes padded.
    - If the original data is already a multiple of the block size,
      an additional full block of bytes (each the block size) is added.
    """
    block_size = 31
    original_len = len(orig_bytes)
    padding_needed = block_size - (original_len % block_size)
    # If the data is already a multiple of block_size, add a full block of padding
    if padding_needed == 0:
        padding_needed = block_size

    # Each padded byte will be equal to padding_needed
    padded_bytes = orig_bytes + [padding_needed] * padding_needed
    return padded_bytes


def remove_padding(padded_bytes):
    """
    Removes PKCS#7-like padding from a list of bytes.
    Raises:
        ValueError: If the padding is incorrect.
    Returns:
        The original list of bytes without padding.
    """
    if not padded_bytes:
        raise ValueError("The input is empty, cannot remove padding.")

    padding_len = padded_bytes[-1]

    if padding_len < 1 or padding_len > 31:
        raise ValueError("Invalid padding length.")

    if padded_bytes[-padding_len:] != [padding_len] * padding_len:
        raise ValueError("Invalid padding bytes.")

    return padded_bytes[:-padding_len]


def prepare_dispersal_request(data, app_id, index, utf8=True, padding=True):
    if utf8:
        data_bytes = data.encode("utf-8")
@@ -85,32 +45,49 @@ class StepsDataAvailability(StepsCommon):
        return executor

    @allure.step
    def disperse_data(self, data, app_id, index, timeout_duration=65, utf8=True, padding=True):
        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(1), reraise=True)
    def disperse_data(self, data, app_id, index, client_node=None, **kwargs):

        timeout_duration = kwargs.get("timeout_duration", 65)
        utf8 = kwargs.get("utf8", True)
        padding = kwargs.get("padding", True)
        send_invalid = kwargs.get("send_invalid", False)

        request = prepare_dispersal_request(data, app_id, index, utf8=utf8, padding=padding)

        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True)
        def disperse(my_self=self):
            response = []
            request = prepare_dispersal_request(data, app_id, index, utf8=utf8, padding=padding)
            executor = my_self.find_executor_node()
            try:
                response = executor.send_dispersal_request(request)
                if client_node is None:
                    executor = my_self.find_executor_node()
                    response = executor.send_dispersal_request(request)
                else:
                    response = client_node.send_dispersal_request(request, send_invalid=send_invalid)
            except Exception as ex:
                assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)

            assert hasattr(response, "status_code"), "Missing status_code"
            assert response.status_code in (200, 429), "Unexpected status code"

            return response

        return disperse()

    @allure.step
    def get_data_range(self, node, app_id, start, end, timeout_duration=45):
        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(1), reraise=True)
    def get_data_range(self, node, app_id, start, end, client_node=None, **kwargs):

        timeout_duration = kwargs.get("timeout_duration", 65)
        send_invalid = kwargs.get("send_invalid", False)

        query = prepare_get_range_request(app_id, start, end)

        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True)
        def get_range():
            response = []
            query = prepare_get_range_request(app_id, start, end)
            try:
                response = node.send_get_data_range_request(query)
                if client_node is None:
                    response = node.send_get_data_range_request(query)
                else:
                    response = client_node.send_get_data_range_request(query, send_invalid=send_invalid)
            except Exception as ex:
                assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
@@ -23,6 +23,8 @@ LOG_ERROR_KEYWORDS = [
    "goexit",
    "race condition",
    "double free",
    "error",
    "warn",
]

DATA_TO_DISPERSE = [
@@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
import inspect
import glob
from concurrent.futures import ThreadPoolExecutor, as_completed

from src.env_vars import CHECK_LOG_ERRORS
from src.libs.custom_logger import get_custom_logger
import os
import pytest
@@ -68,25 +71,36 @@ def attach_logs_on_fail(request):
            attach_allure_file(file)


def stop_node(node):
    try:
        node.stop()
    except Exception as ex:
        if "No such container" in str(ex):
            logger.error(f"Failed to stop container {node.name()} because of error {ex}")


@pytest.fixture(scope="function", autouse=True)
def close_open_nodes(attach_logs_on_fail):
    DS.nomos_nodes = []
    DS.client_nodes = []
    yield
    logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
    crashed_containers = []
    for node in DS.nomos_nodes:
        try:
            node.stop()
        except Exception as ex:
            if "No such container" in str(ex):
                crashed_containers.append(node.image)
                logger.error(f"Failed to stop container because of error {ex}")
    assert not crashed_containers, f"Containers {crashed_containers} crashed during the test!!!"
    failed_cleanups = []
    with ThreadPoolExecutor(max_workers=30) as executor:
        node_cleanups = [executor.submit(stop_node, node) for node in DS.nomos_nodes + DS.client_nodes]
        for cleanup in as_completed(node_cleanups):
            try:
                cleanup.result()
            except Exception as ex:
                failed_cleanups.append(ex)

    assert not failed_cleanups, f"Container cleanup failed with {failed_cleanups} !!!"


@pytest.fixture(scope="function", autouse=True)
def check_nomos_log_errors():
def check_nomos_log_errors(request):
    yield
    logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
    for node in DS.nomos_nodes:
        node.check_nomos_log_errors()
    if CHECK_LOG_ERRORS:
        logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
        for node in DS.nomos_nodes:
            node.check_nomos_log_errors()
@@ -3,7 +3,7 @@ import random

import pytest

from src.cli.nomos_cli import NomosCli
from src.client.nomos_cli import NomosCli
from src.libs.common import delay, to_app_id, to_index
from src.libs.custom_logger import get_custom_logger
from src.steps.da import StepsDataAvailability
tests/dos_robustness/test_high_load_dos.py (new file, 185 lines)
@@ -0,0 +1,185 @@
import random
import time

import pytest

from src.libs.common import to_app_id, to_index, delay
from src.steps.da import StepsDataAvailability, logger
from src.test_data import DATA_TO_DISPERSE


@pytest.mark.usefixtures("setup_2_node_cluster")
class TestHighLoadDos(StepsDataAvailability):
    main_nodes = []
    client_nodes = []

    def test_sustained_high_rate_upload(self):
        timeout = 60
        start_time = time.time()
        successful_dispersals = 0
        unsuccessful_dispersals = 0

        while time.time() - start_time < timeout:
            delay(0.01)
            try:
                response = self.disperse_data(DATA_TO_DISPERSE[7], to_app_id(1), to_index(0), timeout_duration=0)
                assert response.status_code == 200, f"Dispersal failed with status code {response.status_code}"
                successful_dispersals += 1
            except AssertionError:
                unsuccessful_dispersals += 1

        assert successful_dispersals > 0, "No successful dispersal"

        failure_ratio = unsuccessful_dispersals / successful_dispersals
        logger.info(f"Unsuccessful dispersals ratio: {failure_ratio}")

        assert failure_ratio < 0.20, f"Dispersal failure ratio {failure_ratio} too high"

    def test_sustained_high_rate_download(self):
        timeout = 60
        successful_downloads = 0
        unsuccessful_downloads = 0

        response = self.disperse_data(DATA_TO_DISPERSE[7], to_app_id(1), to_index(0))
        assert response.status_code == 200, "Initial dispersal was not successful"

        delay(5)
        start_time = time.time()

        while time.time() - start_time < timeout:
            delay(0.01)
            try:
                self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5), timeout_duration=0)
                successful_downloads += 1
            except Exception:
                unsuccessful_downloads += 1

        assert successful_downloads > 0, "No successful data downloads"

        failure_ratio = unsuccessful_downloads / successful_downloads
        logger.info(f"Unsuccessful download ratio: {failure_ratio}")

        assert failure_ratio < 0.20, f"Data download failure ratio {failure_ratio} too high"

    def test_sustained_high_rate_mixed(self):
        timeout = 60
        start_time = time.time()
        successful_dispersals = 0
        unsuccessful_dispersals = 0
        successful_downloads = 0
        unsuccessful_downloads = 0

        while time.time() - start_time < timeout:
            delay(0.01)
            try:
                response = self.disperse_data(DATA_TO_DISPERSE[6], to_app_id(1), to_index(0), timeout_duration=0)
                assert response.status_code == 200, f"Dispersal failed with status code {response.status_code}"
                successful_dispersals += 1
            except AssertionError:
                unsuccessful_dispersals += 1

            try:
                self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5), timeout_duration=0)
                successful_downloads += 1
            except Exception:
                unsuccessful_downloads += 1

        assert successful_dispersals > 0, "No successful dispersal"
        assert successful_downloads > 0, "No successful download"

        failure_ratio_w = unsuccessful_dispersals / successful_dispersals
        failure_ratio_r = unsuccessful_downloads / successful_downloads

        logger.info(f"Unsuccessful dispersals ratio: {failure_ratio_w}")
        logger.info(f"Unsuccessful download ratio: {failure_ratio_r}")

        assert failure_ratio_w < 0.20, f"Dispersal failure ratio {failure_ratio_w} too high"
        assert failure_ratio_r < 0.20, f"Data download failure ratio {failure_ratio_r} too high"

    @pytest.mark.usefixtures("setup_2_node_cluster", "setup_proxy_clients")
    def test_sustained_high_rate_multiple_clients(self):
        timeout = 60
        start_time = time.time()
        successful_dispersals = 0
        unsuccessful_dispersals = 0
        successful_downloads = 0
        unsuccessful_downloads = 0

        while time.time() - start_time < timeout:
            dispersal_cl, download_cl = random.choice(self.client_nodes[1::2]), random.choice(self.client_nodes[::2])

            delay(0.01)
            try:
                response = self.disperse_data(DATA_TO_DISPERSE[6], to_app_id(1), to_index(0), client_node=dispersal_cl, timeout_duration=0)
                assert response.status_code == 200, f"Dispersal failed with status code {response.status_code}"
                successful_dispersals += 1
            except AssertionError:
                unsuccessful_dispersals += 1

            try:
                self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5), client_node=download_cl, timeout_duration=0)
                successful_downloads += 1
            except Exception:
                unsuccessful_downloads += 1

        assert successful_dispersals > 0, "No successful dispersal"
        assert successful_downloads > 0, "No successful download"

        failure_ratio_w = unsuccessful_dispersals / successful_dispersals
        failure_ratio_r = unsuccessful_downloads / successful_downloads

        logger.info(f"Unsuccessful dispersals ratio: {failure_ratio_w}")
        logger.info(f"Unsuccessful download ratio: {failure_ratio_r}")

        assert failure_ratio_w < 0.20, f"Dispersal failure ratio {failure_ratio_w} too high"
        assert failure_ratio_r < 0.20, f"Data download failure ratio {failure_ratio_r} too high"

    @pytest.mark.usefixtures("setup_2_node_cluster", "setup_proxy_clients")
    def test_sustained_high_rate_with_invalid_requests(self):
        timeout = 60
        start_time = time.time()
        successful_dispersals = 0
        unsuccessful_dispersals = 0
        successful_downloads = 0
        unsuccessful_downloads = 0

        while time.time() - start_time < timeout:
            dispersal_cl, download_cl = random.choice(self.client_nodes[1::2]), random.choice(self.client_nodes[::2])
            invalid = random.choice([False, True])

            delay(0.01)
            try:
                response = self.disperse_data(
                    DATA_TO_DISPERSE[6], to_app_id(1), to_index(0), client_node=dispersal_cl, timeout_duration=0, send_invalid=invalid
                )
                assert response.status_code == 200, f"Dispersal failed with status code {response.status_code}"
                successful_dispersals += 1
            except AssertionError:
                if not invalid:
                    unsuccessful_dispersals += 1

            try:
                self.get_data_range(
                    self.node2, to_app_id(1), to_index(0), to_index(5), client_node=download_cl, timeout_duration=0, send_invalid=invalid
                )
                successful_downloads += 1
            except Exception:
                if not invalid:
                    unsuccessful_downloads += 1

        assert successful_dispersals > 0, "No successful dispersal"
        assert successful_downloads > 0, "No successful download"

        failure_ratio_w = unsuccessful_dispersals / successful_dispersals
        failure_ratio_r = unsuccessful_downloads / successful_downloads

        logger.info(f"Unsuccessful dispersals ratio: {failure_ratio_w}")
        logger.info(f"Unsuccessful download ratio: {failure_ratio_r}")

        assert failure_ratio_w < 0.20, f"Dispersal failure ratio {failure_ratio_w} too high"
        assert failure_ratio_r < 0.20, f"Data download failure ratio {failure_ratio_r} too high"