Merge pull request #6 from logos-co/chore-collect-more-insight-dispersal

chore: Collect more insight for data dispersal
Roman Zajic 2025-02-28 17:17:39 +08:00 committed by GitHub
commit 84cdeed3dc
10 changed files with 79 additions and 21 deletions

View File

@@ -13,6 +13,7 @@ num_samples: 1
num_subnets: 2
old_blobs_check_interval_secs: 5
blobs_validity_duration_secs: 60
balancer_interval_secs: 1
global_params_path: "/kzgrs_test_params"
# Tracing

View File

@@ -7,12 +7,13 @@ security_param: 10
active_slot_coeff: 0.9
# DaConfig related parameters
subnetwork_size: 1024
subnetwork_size: 2
dispersal_factor: 2
num_samples: 1
num_subnets: 2
old_blobs_check_interval_secs: 5
blobs_validity_duration_secs: 60
balancer_interval_secs: 1
global_params_path: "/kzgrs_test_params"
# Tracing

View File

@@ -19,7 +19,7 @@ NOMOS = "nomos"
NOMOS_EXECUTOR = "nomos_executor"
CFGSYNC = "cfgsync"
DEFAULT_IMAGE = "ghcr.io/logos-co/nomos-node:latest"
DEFAULT_IMAGE = "ghcr.io/logos-co/nomos-node:testnet"
NODE_1 = get_env_var("NODE_1", NOMOS)
NODE_2 = get_env_var("NODE_2", NOMOS_EXECUTOR)
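
The get_env_var helper itself is not part of this diff; a minimal sketch of such a helper, assuming it simply falls back to the supplied default when the variable is unset (the body below is an assumption, not the repository's implementation):

import os

def get_env_var(name, default=None):
    # Assumed behaviour: return the environment variable if set, otherwise the default.
    return os.environ.get(name, default)

# With that, NODE_1 = get_env_var("NODE_1", NOMOS) resolves to "nomos" unless
# NODE_1 is set in the environment.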

View File

@@ -38,3 +38,7 @@ def to_app_id(n: int) -> list:
if n < 0:
raise ValueError("Input must be an unsigned integer (non-negative)")
return list(n.to_bytes(32, byteorder="big"))
def generate_random_bytes(n=31):
return os.urandom(n)
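
Together these helpers produce the dispersal metadata and payloads used in the tests below: to_app_id encodes an integer as a 32-byte big-endian list, and the new generate_random_bytes returns raw random bytes, 31 by default. A quick standalone check of their behaviour, assuming only the standard library:

import os

def to_app_id(n: int) -> list:
    if n < 0:
        raise ValueError("Input must be an unsigned integer (non-negative)")
    return list(n.to_bytes(32, byteorder="big"))

def generate_random_bytes(n=31):
    return os.urandom(n)

assert to_app_id(1) == [0] * 31 + [1]           # 32-byte big-endian encoding
assert len(generate_random_bytes()) == 31       # default payload size
assert isinstance(generate_random_bytes(), bytes)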

View File

@@ -123,6 +123,9 @@ class NomosNode:
def node_type(self):
return self._node_type
def name(self):
return self._container_name
def check_nomos_log_errors(self, whitelist=None):
keywords = LOG_ERROR_KEYWORDS
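
The new name() accessor lets tests report which container received the dispersed data (used in test_da_identify_retrieve_missing_columns below). check_nomos_log_errors is only partially visible here; a generic sketch of what such a keyword scan typically looks like, where everything beyond the two lines shown is an assumption:

LOG_ERROR_KEYWORDS = ["ERROR", "panic"]  # illustrative values only

def check_nomos_log_errors(log_lines, whitelist=None):
    # Hypothetical logic: scan captured log lines for error keywords,
    # skipping any keyword the caller explicitly whitelists.
    keywords = LOG_ERROR_KEYWORDS
    if whitelist:
        keywords = [kw for kw in keywords if kw not in whitelist]
    matches = [line for line in log_lines if any(kw in line for kw in keywords)]
    assert not matches, f"Found error keywords in log: {matches}"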

View File

@@ -5,6 +5,7 @@ import shutil
import pytest
from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
from src.node.nomos_node import NomosNode
@@ -65,6 +66,8 @@ class StepsCommon:
logger.error(f"REST service did not become ready in time: {ex}")
raise
delay(5)
@pytest.fixture(scope="function")
def setup_4_node_cluster(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
@@ -82,3 +85,5 @@ class StepsCommon:
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
delay(5)
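
Both cluster fixtures share the same shape: log the fixture name, start the requested nodes, wait for their REST services inside a try/except that logs and re-raises on timeout, then sleep five seconds before handing control to the test. A condensed sketch of that pattern; start_cluster_nodes and wait_for_rest_ready are hypothetical stand-ins for the parts of the fixture body not shown in this diff:

import inspect
import pytest
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger

logger = get_custom_logger(__name__)

class StepsCommon:
    @pytest.fixture(scope="function")
    def setup_cluster(self, request):
        logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
        self.main_nodes = start_cluster_nodes(request)  # hypothetical: create and start NomosNode containers
        try:
            wait_for_rest_ready(self.main_nodes)        # hypothetical: poll each node's REST endpoint
        except Exception as ex:
            logger.error(f"REST service did not become ready in time: {ex}")
            raise
        delay(5)  # settle time before the test body runs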

View File

@@ -2,8 +2,11 @@ import allure
from tenacity import retry, stop_after_delay, wait_fixed
from src.env_vars import NOMOS_EXECUTOR
from src.libs.custom_logger import get_custom_logger
from src.steps.common import StepsCommon
logger = get_custom_logger(__name__)
def add_padding(orig_bytes):
"""
@@ -46,10 +49,17 @@ def remove_padding(padded_bytes):
return padded_bytes[:-padding_len]
def prepare_dispersal_request(data, app_id, index):
data_bytes = data.encode("utf-8")
padded_bytes = add_padding(list(data_bytes))
dispersal_data = {"data": padded_bytes, "metadata": {"app_id": app_id, "index": index}}
def prepare_dispersal_request(data, app_id, index, utf8=True, padding=True):
if utf8:
data_bytes = data.encode("utf-8")
else:
data_bytes = bytes(data)
data_list = list(data_bytes)
if padding:
data_list = add_padding(data_list)
dispersal_data = {"data": data_list, "metadata": {"app_id": app_id, "index": index}}
return dispersal_data
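
The reworked helper now accepts raw (non-UTF-8) payloads and optional padding, which the new random-data bandwidth test below relies on. A standalone sketch of the resulting payload shapes, with add_padding stubbed out because its body is only partly visible in this diff:

def add_padding(orig_bytes):
    # Stub standing in for the repository's helper; the real padding scheme
    # is not fully shown in this hunk.
    return orig_bytes

def prepare_dispersal_request(data, app_id, index, utf8=True, padding=True):
    data_bytes = data.encode("utf-8") if utf8 else bytes(data)
    data_list = list(data_bytes)
    if padding:
        data_list = add_padding(data_list)
    return {"data": data_list, "metadata": {"app_id": app_id, "index": index}}

app_id = list((1).to_bytes(32, byteorder="big"))  # same shape as to_app_id(1)

# Default path: text is UTF-8 encoded and padded before dispersal.
prepare_dispersal_request("hello", app_id, 0)
# New path used for random payloads: bytes pass through untouched.
prepare_dispersal_request(b"\x01\x02\x03", app_id, 0, utf8=False, padding=False)
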
@@ -76,9 +86,9 @@ class StepsDataAvailability(StepsCommon):
@allure.step
@retry(stop=stop_after_delay(65), wait=wait_fixed(1), reraise=True)
def disperse_data(self, data, app_id, index):
def disperse_data(self, data, app_id, index, utf8=True, padding=True):
response = []
request = prepare_dispersal_request(data, app_id, index)
request = prepare_dispersal_request(data, app_id, index, utf8=utf8, padding=padding)
executor = self.find_executor_node()
try:
response = executor.send_dispersal_request(request)
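
The @retry decorator above keeps re-issuing the dispersal for up to 65 seconds, waiting one second between attempts, and re-raises the last error if it never succeeds. A minimal, self-contained illustration of the same tenacity pattern (flaky_call is hypothetical):

from tenacity import retry, stop_after_delay, wait_fixed

attempts = {"count": 0}

@retry(stop=stop_after_delay(65), wait=wait_fixed(1), reraise=True)
def flaky_call():
    # Fails a couple of times before succeeding, to show the retry loop.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("dispersal endpoint not ready yet")
    return "ok"

assert flaky_call() == "ok"
assert attempts["count"] == 3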

View File

@@ -17,21 +17,25 @@ class TestDataIntegrity(StepsDataAvailability):
@pytest.mark.usefixtures("setup_4_node_cluster")
def test_da_identify_retrieve_missing_columns(self):
delay(5)
self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0))
delay(5)
# Select one target node at random to get blob data for 1/2 columns
selected_node = self.main_nodes[random.randint(1, 3)]
rcv_data = self.get_data_range(selected_node, to_app_id(1), to_index(0), to_index(5))
rcv_data_json = json.dumps(rcv_data)
test_results = []
# Iterate through standard nodes to get blob data for 1/2 columns
for node in self.main_nodes[1:4]:
rcv_data = self.get_data_range(node, to_app_id(1), to_index(0), to_index(5))
rcv_data_json = json.dumps(rcv_data)
reconstructed_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json])
reconstructed_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json])
assert DATA_TO_DISPERSE[1] == reconstructed_data, "Reconstructed data are not same with original data"
if DATA_TO_DISPERSE[1] == reconstructed_data:
test_results.append(node.name())
assert len(test_results) > 0, "Dispersed data were not received by any node"
logger.info(f"Dispersed data received by: {test_results}")
@pytest.mark.usefixtures("setup_2_node_cluster")
def test_da_sampling_determines_data_presence(self):
delay(5)
self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0))
delay(5)
rcv_data = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5))

View File

@@ -1,8 +1,6 @@
import pytest
from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR
from src.libs.custom_logger import get_custom_logger
from src.node.nomos_node import NomosNode
from src.steps.common import StepsCommon
logger = get_custom_logger(__name__)

View File

@@ -1,7 +1,7 @@
import pytest
import psutil
from src.libs.common import delay, to_app_id, to_index
from src.libs.common import delay, to_app_id, to_index, generate_random_bytes
from src.libs.custom_logger import get_custom_logger
from src.steps.da import StepsDataAvailability
from src.test_data import DATA_TO_DISPERSE
@@ -12,9 +12,8 @@ logger = get_custom_logger(__name__)
class TestNetworkingPrivacy(StepsDataAvailability):
main_nodes = []
@pytest.mark.parametrize("setup_2_node_cluster", [2], indirect=True)
@pytest.mark.usefixtures("setup_2_node_cluster")
def test_consumed_bandwidth_dispersal(self, setup_2_node_cluster):
delay(5)
net_io = psutil.net_io_counters()
prev_total = net_io.bytes_sent + net_io.bytes_recv
@@ -42,3 +41,36 @@ class TestNetworkingPrivacy(StepsDataAvailability):
overhead = (consumed - data_sent) / data_sent
assert overhead < 400, "Dispersal overhead is too high"
@pytest.mark.usefixtures("setup_2_node_cluster")
def test_consumed_bandwidth_random_data_dispersal(self):
net_io = psutil.net_io_counters()
prev_total = net_io.bytes_sent + net_io.bytes_recv
data_to_disperse = generate_random_bytes()
logger.debug(f"Using random data to disperse: {list(data_to_disperse)}")
successful_dispersals = 0
for i in range(20):
try:
self.disperse_data(data_to_disperse, to_app_id(1), to_index(0), utf8=False, padding=False)
successful_dispersals += 1
except Exception as ex:
logger.warning(f"Dispersal #{i} was not successful with error {ex}")
if successful_dispersals == 10:
break
delay(0.1)
net_io = psutil.net_io_counters()
curr_total = net_io.bytes_sent + net_io.bytes_recv
consumed = curr_total - prev_total
assert successful_dispersals == 10, "Unable to finish 10 successful dispersals"
data_sent = 2 * successful_dispersals * len(data_to_disperse)
overhead = (consumed - data_sent) / data_sent
assert overhead < 400, "Dispersal overhead is too high"
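
The overhead check works the same way as in the UTF-8 test: total bytes sent and received are sampled via psutil before and after the dispersal loop, the useful traffic is taken as twice the payload size per successful dispersal, and everything beyond that counts as overhead. A worked example of the arithmetic with the default 31-byte random payload (the consumed value is a hypothetical psutil delta):

payload_len = 31                     # generate_random_bytes() default
successful_dispersals = 10

data_sent = 2 * successful_dispersals * payload_len   # 620 bytes
consumed = 150_000                                    # hypothetical bytes_sent + bytes_recv delta

overhead = (consumed - data_sent) / data_sent
assert overhead < 400                                 # the test's upper bound
print(f"overhead factor: {overhead:.1f}")             # ~240.9 in this example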