diff --git a/.gitignore b/.gitignore index a8cfc79..ff4e95e 100644 --- a/.gitignore +++ b/.gitignore @@ -104,3 +104,7 @@ dmypy.json # Pyre type checker .pyre/ + +log/ +kzgrs/ +cluster_config/cfgsync.yaml \ No newline at end of file diff --git a/cluster_config/cfgsync-template.yaml b/cluster_config/cfgsync-template.yaml index cad58a2..069440f 100644 --- a/cluster_config/cfgsync-template.yaml +++ b/cluster_config/cfgsync-template.yaml @@ -8,7 +8,7 @@ active_slot_coeff: 0.9 # DaConfig related parameters subnetwork_size: {{ subnet_size }} -dispersal_factor: 2 +dispersal_factor: {{ dispersal_factor }} num_samples: 1 num_subnets: {{ subnet_size }} old_blobs_check_interval_secs: 5 diff --git a/requirements.txt b/requirements.txt index 3ae825e..e90168e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ click==8.1.7 distlib==0.3.8 docker==7.0.0 execnet==2.0.2 +Faker==37.0.0 filelock==3.13.1 identify==2.5.33 idna==3.7 diff --git a/src/api_clients/base_client.py b/src/api_clients/base_client.py index b731e0f..c0381e6 100644 --- a/src/api_clients/base_client.py +++ b/src/api_clients/base_client.py @@ -9,6 +9,7 @@ logger = get_custom_logger(__name__) class BaseClient: def make_request(self, method, url, headers=None, data=None): self.log_request_as_curl(method, url, headers, data) + self.print_request_size(data) response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT) try: response.raise_for_status() @@ -35,3 +36,8 @@ class BaseClient: headers_str_for_log = " ".join([f'-H "{key}: {value}"' for key, value in headers.items()]) if headers else "" curl_cmd = f"curl -v -X {method.upper()} \"{url}\" {headers_str_for_log} -d '{data}'" logger.info(curl_cmd) + + def print_request_size(self, data): + body_size = len(data) if data else 0 + body_kb = body_size / 1024 + logger.debug(f"Body size: {body_kb:.2f}kB") diff --git a/src/libs/common.py b/src/libs/common.py index af03e25..b089576 100644 --- a/src/libs/common.py +++ 
b/src/libs/common.py @@ -1,8 +1,11 @@ +import math import random import string import uuid from datetime import datetime from time import sleep + +from faker import Faker from src.libs.custom_logger import get_custom_logger import os import allure @@ -54,6 +57,19 @@ def generate_random_bytes(n=31): return os.urandom(n) +def generate_text_data(target_size): + faker = Faker() + text_data = faker.text(max_nb_chars=math.floor(target_size * 1.2)) # 20% more than target size + text_data = " ".join(text_data.splitlines()) # remove newlines + + while len(text_data.encode("utf-8")) > target_size: # trim to exact size + text_data = text_data[:-1] + + logger.debug(f"Raw data size: {len(text_data.encode('utf-8'))}\n\t{text_data}") + + return text_data + + def add_padding(orig_bytes): """ Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme: diff --git a/src/steps/common.py b/src/steps/common.py index b08e045..71e4a35 100644 --- a/src/steps/common.py +++ b/src/steps/common.py @@ -14,7 +14,7 @@ from jinja2 import Template logger = get_custom_logger(__name__) -def prepare_cluster_config(node_count, subnetwork_size=2): +def prepare_cluster_config(node_count, subnetwork_size=2, dispersal_factor=2): cwd = os.getcwd() config_dir = "cluster_config" @@ -22,7 +22,7 @@ def prepare_cluster_config(node_count, subnetwork_size=2): template_content = file.read() template = Template(template_content) - rendered = template.render(num_hosts=node_count, subnet_size=subnetwork_size) + rendered = template.render(num_hosts=node_count, subnet_size=subnetwork_size, dispersal_factor=dispersal_factor) with open(f"{cwd}/{config_dir}/cfgsync.yaml", "w") as outfile: outfile.write(rendered) @@ -38,6 +38,10 @@ def ensure_nodes_ready(nodes): node.ensure_ready() +def get_param_or_default(request, param_name, default_value): + return request.param.get(param_name, default_value) if hasattr(request, "param") else default_value + + class StepsCommon: @pytest.fixture(scope="function", 
autouse=True) def cluster_setup(self): @@ -49,12 +53,10 @@ class StepsCommon: def setup_2_node_cluster(self, request): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - if hasattr(request, "param"): - subnet_size = request.param - else: - subnet_size = 2 + subnet_size = get_param_or_default(request, "subnet_size", 2) + dispersal_factor = get_param_or_default(request, "dispersal_factor", 2) + prepare_cluster_config(2, subnet_size, dispersal_factor) - prepare_cluster_config(2, subnet_size) self.node1 = NomosNode(CFGSYNC, "cfgsync") self.node2 = NomosNode(NOMOS, "nomos_node_0") self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1") @@ -72,7 +74,11 @@ class StepsCommon: @pytest.fixture(scope="function") def setup_4_node_cluster(self, request): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - prepare_cluster_config(4) + + subnet_size = get_param_or_default(request, "subnet_size", 4) + dispersal_factor = get_param_or_default(request, "dispersal_factor", 1) + prepare_cluster_config(4, subnet_size, dispersal_factor) + self.node1 = NomosNode(CFGSYNC, "cfgsync") self.node2 = NomosNode(NOMOS, "nomos_node_0") self.node3 = NomosNode(NOMOS, "nomos_node_1") diff --git a/src/steps/da.py b/src/steps/da.py index cffe4a9..8fd60e5 100644 --- a/src/steps/da.py +++ b/src/steps/da.py @@ -29,6 +29,9 @@ def prepare_get_range_request(app_id, start_index, end_index): def response_contains_data(response): + if response is None: + return False + for index, blobs in response: if len(blobs) != 0: return True @@ -76,11 +79,12 @@ class StepsDataAvailability(StepsCommon): def get_data_range(self, node, app_id, start, end, client_node=None, **kwargs): timeout_duration = kwargs.get("timeout_duration", 65) + interval = kwargs.get("interval", 0.1) send_invalid = kwargs.get("send_invalid", False) query = prepare_get_range_request(app_id, start, end) - @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True) + 
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True) def get_range(): response = [] try: diff --git a/tests/dos_robustness/test_large_volume.py b/tests/dos_robustness/test_large_volume.py new file mode 100644 index 0000000..6493f64 --- /dev/null +++ b/tests/dos_robustness/test_large_volume.py @@ -0,0 +1,44 @@ +import pytest + +from src.libs.common import delay, generate_text_data, to_app_id, to_index +from src.libs.custom_logger import get_custom_logger +from src.steps.da import StepsDataAvailability + +logger = get_custom_logger(__name__) + + +class TestLargeVolume(StepsDataAvailability): + + @pytest.mark.usefixtures("setup_4_node_cluster") + @pytest.mark.parametrize( + "setup_4_node_cluster,raw_data_size", + [ + ({"subnet_size": 4, "dispersal_factor": 1}, 50), # => ~~0.5kB + ({"subnet_size": 64, "dispersal_factor": 16}, 800), # => ~~ 4kB + ({"subnet_size": 2048, "dispersal_factor": 512}, 53 * 1024), # => ~~254kB, spec limit: 256kB + ], + indirect=["setup_4_node_cluster"], + ) + def test_large_volume_dispersal(self, raw_data_size): + data = generate_text_data(raw_data_size) + + try: + response = self.disperse_data(data, to_app_id(1), to_index(0), timeout_duration=0) + except Exception as ex: + raise Exception(f"Dispersal was not successful with error {ex}") + + assert response.status_code == 200 + + delay(5) + self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5), timeout_duration=20, interval=1) + + @pytest.mark.usefixtures("setup_2_node_cluster") + @pytest.mark.parametrize( + "setup_2_node_cluster,raw_data_size", + [ + ({"subnet_size": 2, "dispersal_factor": 2}, 50), + ], + indirect=["setup_2_node_cluster"], + ) + def test_large_volume_dispersal_2node(self, raw_data_size): + self.test_large_volume_dispersal(raw_data_size)