test: Disperse large volume data [TC8.3] (#9)

* test large volume

* refactor prepare_cluster_config

* refactor test case

* add more cases

* update cases

* update cases

* use faker to generate text

* handle none response and remove assertion
Radosław Kamiński 2025-03-13 10:18:02 +00:00 committed by GitHub
parent 8d298829ea
commit a2c33d6404
8 changed files with 91 additions and 10 deletions

.gitignore (4 additions)

@@ -104,3 +104,7 @@ dmypy.json
 # Pyre type checker
 .pyre/
+log/
+kzgrs/
+cluster_config/cfgsync.yaml

cfgsync.yaml template

@@ -8,7 +8,7 @@ active_slot_coeff: 0.9
 # DaConfig related parameters
 subnetwork_size: {{ subnet_size }}
-dispersal_factor: 2
+dispersal_factor: {{ dispersal_factor }}
 num_samples: 1
 num_subnets: {{ subnet_size }}
 old_blobs_check_interval_secs: 5

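For reference, the new placeholder resolves when `prepare_cluster_config` (updated below) renders the template; a minimal Jinja2 sketch with hypothetical values:

    from jinja2 import Template

    # Hypothetical two-line excerpt of the template above
    tmpl = Template("dispersal_factor: {{ dispersal_factor }}\nnum_subnets: {{ subnet_size }}")
    print(tmpl.render(subnet_size=64, dispersal_factor=16))
    # dispersal_factor: 16
    # num_subnets: 64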
requirements.txt

@@ -9,6 +9,7 @@ click==8.1.7
 distlib==0.3.8
 docker==7.0.0
 execnet==2.0.2
+Faker==37.0.0
 filelock==3.13.1
 identify==2.5.33
 idna==3.7

BaseClient (shared API client)

@@ -9,6 +9,7 @@ logger = get_custom_logger(__name__)
 class BaseClient:
     def make_request(self, method, url, headers=None, data=None):
         self.log_request_as_curl(method, url, headers, data)
+        self.print_request_size(data)
         response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT)
         try:
             response.raise_for_status()
@@ -35,3 +36,8 @@ class BaseClient:
         headers_str_for_log = " ".join([f'-H "{key}: {value}"' for key, value in headers.items()]) if headers else ""
         curl_cmd = f"curl -v -X {method.upper()} \"{url}\" {headers_str_for_log} -d '{data}'"
         logger.info(curl_cmd)
+
+    def print_request_size(self, data):
+        body_size = len(data) if data else 0
+        body_kb = body_size / 1024
+        logger.debug(f"Body size: {body_kb:.2f}kB")

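A sanity check on the helper's arithmetic (standalone sketch, not part of the diff). Note that `len(data)` is a byte count only when `data` is `bytes`; for a `str` body it counts characters, so the logged size is approximate for text payloads:

    data = b"x" * 53 * 1024                        # 53 KiB payload, the largest case in the new test
    print(f"Body size: {len(data) / 1024:.2f}kB")  # -> Body size: 53.00kB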
src/libs/common.py

@@ -1,8 +1,11 @@
+import math
 import random
 import string
 import uuid
 from datetime import datetime
 from time import sleep
+
+from faker import Faker
 from src.libs.custom_logger import get_custom_logger
 import os
 import allure
@@ -54,6 +57,19 @@ def generate_random_bytes(n=31):
     return os.urandom(n)
+
+
+def generate_text_data(target_size):
+    faker = Faker()
+    text_data = faker.text(max_nb_chars=math.floor(target_size * 1.2))  # 20% more than target size
+    text_data = " ".join(text_data.splitlines())  # remove newlines
+    while len(text_data.encode("utf-8")) > target_size:  # trim to exact size
+        text_data = text_data[:-1]
+    logger.debug(f"Raw data size: {len(text_data.encode('utf-8'))}\n\t{text_data}")
+    return text_data
+
+
 def add_padding(orig_bytes):
     """
     Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme:

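`generate_text_data` over-generates by 20% and then trims one character at a time, so the UTF-8 byte length never exceeds the target (slicing a `str` cannot split a code point, which makes the trim multi-byte safe). A usage sketch:

    text = generate_text_data(800)
    assert len(text.encode("utf-8")) <= 800  # invariant guaranteed by the trim loop
    assert "\n" not in text                  # newlines were joined with spaces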
StepsCommon (cluster setup fixtures)

@@ -14,7 +14,7 @@ from jinja2 import Template
 logger = get_custom_logger(__name__)
 
-def prepare_cluster_config(node_count, subnetwork_size=2):
+def prepare_cluster_config(node_count, subnetwork_size=2, dispersal_factor=2):
     cwd = os.getcwd()
     config_dir = "cluster_config"
@@ -22,7 +22,7 @@ def prepare_cluster_config(node_count, subnetwork_size=2):
         template_content = file.read()
 
     template = Template(template_content)
-    rendered = template.render(num_hosts=node_count, subnet_size=subnetwork_size)
+    rendered = template.render(num_hosts=node_count, subnet_size=subnetwork_size, dispersal_factor=dispersal_factor)
     with open(f"{cwd}/{config_dir}/cfgsync.yaml", "w") as outfile:
         outfile.write(rendered)
@@ -38,6 +38,10 @@ def ensure_nodes_ready(nodes):
         node.ensure_ready()
 
+
+def get_param_or_default(request, param_name, default_value):
+    return request.param.get(param_name, default_value) if hasattr(request, "param") else default_value
+
+
 class StepsCommon:
     @pytest.fixture(scope="function", autouse=True)
     def cluster_setup(self):
@@ -49,12 +53,10 @@ class StepsCommon:
     def setup_2_node_cluster(self, request):
         logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
-        if hasattr(request, "param"):
-            subnet_size = request.param
-        else:
-            subnet_size = 2
-        prepare_cluster_config(2, subnet_size)
+        subnet_size = get_param_or_default(request, "subnet_size", 2)
+        dispersal_factor = get_param_or_default(request, "dispersal_factor", 2)
+        prepare_cluster_config(2, subnet_size, dispersal_factor)
         self.node1 = NomosNode(CFGSYNC, "cfgsync")
         self.node2 = NomosNode(NOMOS, "nomos_node_0")
         self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1")
@@ -72,7 +74,11 @@ class StepsCommon:
     @pytest.fixture(scope="function")
     def setup_4_node_cluster(self, request):
         logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
-        prepare_cluster_config(4)
+        subnet_size = get_param_or_default(request, "subnet_size", 4)
+        dispersal_factor = get_param_or_default(request, "dispersal_factor", 1)
+        prepare_cluster_config(4, subnet_size, dispersal_factor)
         self.node1 = NomosNode(CFGSYNC, "cfgsync")
         self.node2 = NomosNode(NOMOS, "nomos_node_0")
         self.node3 = NomosNode(NOMOS, "nomos_node_1")

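`get_param_or_default` lets a fixture accept an optional dict through pytest's indirect parametrization while keeping defaults when a test passes nothing. A self-contained illustration (hypothetical fixture name `subnet`):

    import pytest

    def get_param_or_default(request, param_name, default_value):
        return request.param.get(param_name, default_value) if hasattr(request, "param") else default_value

    @pytest.fixture
    def subnet(request):
        return get_param_or_default(request, "subnet_size", 2)

    @pytest.mark.parametrize("subnet", [{"subnet_size": 64}], indirect=True)
    def test_parametrized(subnet):
        assert subnet == 64  # value comes from request.param

    def test_default(subnet):
        assert subnet == 2   # request has no .param attribute, default applies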
src/steps/da.py

@@ -29,6 +29,9 @@ def prepare_get_range_request(app_id, start_index, end_index):
 
 def response_contains_data(response):
+    if response is None:
+        return False
     for index, blobs in response:
         if len(blobs) != 0:
             return True
@@ -76,11 +79,12 @@ class StepsDataAvailability(StepsCommon):
     def get_data_range(self, node, app_id, start, end, client_node=None, **kwargs):
         timeout_duration = kwargs.get("timeout_duration", 65)
+        interval = kwargs.get("interval", 0.1)
         send_invalid = kwargs.get("send_invalid", False)
         query = prepare_get_range_request(app_id, start, end)
 
-        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True)
+        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True)
         def get_range():
             response = []
             try:

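The new `interval` kwarg makes the tenacity polling rate tunable per call; the large-volume test below passes `timeout_duration=20, interval=1`, which is equivalent to decorating the inner poll like this (sketch):

    from tenacity import retry, stop_after_delay, wait_fixed

    @retry(stop=stop_after_delay(20), wait=wait_fixed(1), reraise=True)
    def get_range():
        ...  # retried every 1s for up to 20s, then the last exception is re-raised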
New test module: TestLargeVolume

@@ -0,0 +1,44 @@
+import pytest
+
+from src.libs.common import delay, generate_text_data, to_app_id, to_index
+from src.libs.custom_logger import get_custom_logger
+from src.steps.da import StepsDataAvailability
+
+logger = get_custom_logger(__name__)
+
+
+class TestLargeVolume(StepsDataAvailability):
+    @pytest.mark.usefixtures("setup_4_node_cluster")
+    @pytest.mark.parametrize(
+        "setup_4_node_cluster,raw_data_size",
+        [
+            ({"subnet_size": 4, "dispersal_factor": 1}, 50),  # => ~0.5kB
+            ({"subnet_size": 64, "dispersal_factor": 16}, 800),  # => ~4kB
+            ({"subnet_size": 2048, "dispersal_factor": 512}, 53 * 1024),  # => ~254kB, spec limit: 256kB
+        ],
+        indirect=["setup_4_node_cluster"],
+    )
+    def test_large_volume_dispersal(self, raw_data_size):
+        data = generate_text_data(raw_data_size)
+        try:
+            response = self.disperse_data(data, to_app_id(1), to_index(0), timeout_duration=0)
+        except Exception as ex:
+            raise Exception(f"Dispersal was not successful with error {ex}")
+        assert response.status_code == 200
+        delay(5)
+        self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5), timeout_duration=20, interval=1)
+
+    @pytest.mark.usefixtures("setup_2_node_cluster")
+    @pytest.mark.parametrize(
+        "setup_2_node_cluster,raw_data_size",
+        [
+            ({"subnet_size": 2, "dispersal_factor": 2}, 50),
+        ],
+        indirect=["setup_2_node_cluster"],
+    )
+    def test_large_volume_dispersal_2node(self, raw_data_size):
+        self.test_large_volume_dispersal(raw_data_size)
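The 2-node variant reuses the 4-node test body by calling it directly; this works because pytest has already built the 2-node cluster through its own fixture before the call runs. Both tests can be selected by class name, e.g.:

    pytest -k "TestLargeVolume" --log-cli-level=DEBUG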