diff --git a/cluster_config/cfgsync-template.yaml b/cluster_config/cfgsync-template.yaml index 308c315..59e93e6 100644 --- a/cluster_config/cfgsync-template.yaml +++ b/cluster_config/cfgsync-template.yaml @@ -13,6 +13,7 @@ num_samples: 1 num_subnets: 2 old_blobs_check_interval_secs: 5 blobs_validity_duration_secs: 60 +balancer_interval_secs: 1 global_params_path: "/kzgrs_test_params" # Tracing diff --git a/cluster_config/cfgsync.yaml b/cluster_config/cfgsync.yaml index 64b1df0..e1a0fad 100644 --- a/cluster_config/cfgsync.yaml +++ b/cluster_config/cfgsync.yaml @@ -7,12 +7,13 @@ security_param: 10 active_slot_coeff: 0.9 # DaConfig related parameters -subnetwork_size: 1024 +subnetwork_size: 2 dispersal_factor: 2 num_samples: 1 num_subnets: 2 old_blobs_check_interval_secs: 5 blobs_validity_duration_secs: 60 +balancer_interval_secs: 1 global_params_path: "/kzgrs_test_params" # Tracing diff --git a/src/env_vars.py b/src/env_vars.py index 1c05782..01706e0 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -19,7 +19,7 @@ NOMOS = "nomos" NOMOS_EXECUTOR = "nomos_executor" CFGSYNC = "cfgsync" -DEFAULT_IMAGE = "ghcr.io/logos-co/nomos-node:latest" +DEFAULT_IMAGE = "ghcr.io/logos-co/nomos-node:testnet" NODE_1 = get_env_var("NODE_1", NOMOS) NODE_2 = get_env_var("NODE_2", NOMOS_EXECUTOR) diff --git a/src/libs/common.py b/src/libs/common.py index 8f2961c..d9209de 100644 --- a/src/libs/common.py +++ b/src/libs/common.py @@ -38,3 +38,7 @@ def to_app_id(n: int) -> list: if n < 0: raise ValueError("Input must be an unsigned integer (non-negative)") return list(n.to_bytes(32, byteorder="big")) + + +def generate_random_bytes(n=31): + return os.urandom(n) diff --git a/src/node/nomos_node.py b/src/node/nomos_node.py index a46ce26..86c5658 100644 --- a/src/node/nomos_node.py +++ b/src/node/nomos_node.py @@ -123,6 +123,9 @@ class NomosNode: def node_type(self): return self._node_type + def name(self): + return self._container_name + def check_nomos_log_errors(self, whitelist=None): keywords = LOG_ERROR_KEYWORDS diff --git a/src/steps/common.py b/src/steps/common.py index 2ba7c8a..dcd593a 100644 --- a/src/steps/common.py +++ b/src/steps/common.py @@ -5,6 +5,7 @@ import shutil import pytest from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR +from src.libs.common import delay from src.libs.custom_logger import get_custom_logger from src.node.nomos_node import NomosNode @@ -65,6 +66,8 @@ class StepsCommon: logger.error(f"REST service did not become ready in time: {ex}") raise + delay(5) + @pytest.fixture(scope="function") def setup_4_node_cluster(self, request): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") @@ -82,3 +85,5 @@ class StepsCommon: except Exception as ex: logger.error(f"REST service did not become ready in time: {ex}") raise + + delay(5) diff --git a/src/steps/da.py b/src/steps/da.py index 779742b..5aaaa98 100644 --- a/src/steps/da.py +++ b/src/steps/da.py @@ -2,8 +2,11 @@ import allure from tenacity import retry, stop_after_delay, wait_fixed from src.env_vars import NOMOS_EXECUTOR +from src.libs.custom_logger import get_custom_logger from src.steps.common import StepsCommon +logger = get_custom_logger(__name__) + def add_padding(orig_bytes): """ @@ -46,10 +49,17 @@ def remove_padding(padded_bytes): return padded_bytes[:-padding_len] -def prepare_dispersal_request(data, app_id, index): - data_bytes = data.encode("utf-8") - padded_bytes = add_padding(list(data_bytes)) - dispersal_data = {"data": padded_bytes, "metadata": {"app_id": app_id, "index": index}} +def prepare_dispersal_request(data, app_id, index, utf8=True, padding=True): + if utf8: + data_bytes = data.encode("utf-8") + else: + data_bytes = bytes(data) + + data_list = list(data_bytes) + if padding: + data_list = add_padding(data_list) + + dispersal_data = {"data": data_list, "metadata": {"app_id": app_id, "index": index}} return dispersal_data @@ -76,9 +86,9 @@ class StepsDataAvailability(StepsCommon): @allure.step @retry(stop=stop_after_delay(65), wait=wait_fixed(1), reraise=True) - def disperse_data(self, data, app_id, index): + def disperse_data(self, data, app_id, index, utf8=True, padding=True): response = [] - request = prepare_dispersal_request(data, app_id, index) + request = prepare_dispersal_request(data, app_id, index, utf8=utf8, padding=padding) executor = self.find_executor_node() try: response = executor.send_dispersal_request(request) diff --git a/tests/data_integrity/test_data_integrity.py b/tests/data_integrity/test_data_integrity.py index 9e5ed9c..40eae4a 100644 --- a/tests/data_integrity/test_data_integrity.py +++ b/tests/data_integrity/test_data_integrity.py @@ -17,21 +17,25 @@ class TestDataIntegrity(StepsDataAvailability): @pytest.mark.usefixtures("setup_4_node_cluster") def test_da_identify_retrieve_missing_columns(self): - delay(5) self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0)) delay(5) - # Select one target node at random to get blob data for 1/2 columns - selected_node = self.main_nodes[random.randint(1, 3)] - rcv_data = self.get_data_range(selected_node, to_app_id(1), to_index(0), to_index(5)) - rcv_data_json = json.dumps(rcv_data) + test_results = [] + # Iterate through standard nodes to get blob data for 1/2 columns + for node in self.main_nodes[1:4]: + rcv_data = self.get_data_range(node, to_app_id(1), to_index(0), to_index(5)) + rcv_data_json = json.dumps(rcv_data) - reconstructed_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json]) + reconstructed_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json]) - assert DATA_TO_DISPERSE[1] == reconstructed_data, "Reconstructed data are not same with original data" + if DATA_TO_DISPERSE[1] == reconstructed_data: + test_results.append(node.name()) + + assert len(test_results) > 0, "Dispersed data were not received by any node" + + logger.info(f"Dispersed data received by : {test_results}") @pytest.mark.usefixtures("setup_2_node_cluster") def test_da_sampling_determines_data_presence(self): - delay(5) self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0)) delay(5) rcv_data = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5)) diff --git a/tests/e2e/test_2node_alive.py b/tests/e2e/test_2node_alive.py index fbf87d9..e6137e4 100644 --- a/tests/e2e/test_2node_alive.py +++ b/tests/e2e/test_2node_alive.py @@ -1,8 +1,6 @@ import pytest -from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR from src.libs.custom_logger import get_custom_logger -from src.node.nomos_node import NomosNode from src.steps.common import StepsCommon logger = get_custom_logger(__name__) diff --git a/tests/networking_privacy/test_networking_privacy.py b/tests/networking_privacy/test_networking_privacy.py index 30ae389..2ae100e 100644 --- a/tests/networking_privacy/test_networking_privacy.py +++ b/tests/networking_privacy/test_networking_privacy.py @@ -1,7 +1,7 @@ import pytest import psutil -from src.libs.common import delay, to_app_id, to_index +from src.libs.common import delay, to_app_id, to_index, generate_random_bytes from src.libs.custom_logger import get_custom_logger from src.steps.da import StepsDataAvailability from src.test_data import DATA_TO_DISPERSE @@ -12,9 +12,8 @@ logger = get_custom_logger(__name__) class TestNetworkingPrivacy(StepsDataAvailability): main_nodes = [] - @pytest.mark.parametrize("setup_2_node_cluster", [2], indirect=True) + @pytest.mark.usefixtures("setup_2_node_cluster") def test_consumed_bandwidth_dispersal(self, setup_2_node_cluster): - delay(5) net_io = psutil.net_io_counters() prev_total = net_io.bytes_sent + net_io.bytes_recv @@ -42,3 +41,36 @@ class TestNetworkingPrivacy(StepsDataAvailability): overhead = (consumed - data_sent) / data_sent assert overhead < 400, "Dispersal overhead is too high" + + @pytest.mark.usefixtures("setup_2_node_cluster") + def test_consumed_bandwidth_random_data_dispersal(self): + net_io = psutil.net_io_counters() + prev_total = net_io.bytes_sent + net_io.bytes_recv + + data_to_disperse = generate_random_bytes() + logger.debug(f"Using random data to disperse: {list(data_to_disperse)}") + + successful_dispersals = 0 + for i in range(20): + try: + self.disperse_data(data_to_disperse, to_app_id(1), to_index(0), utf8=False, padding=False) + successful_dispersals += 1 + except Exception as ex: + logger.warning(f"Dispersal #{i} was not successful with error {ex}") + + if successful_dispersals == 10: + break + + delay(0.1) + + net_io = psutil.net_io_counters() + curr_total = net_io.bytes_sent + net_io.bytes_recv + + consumed = curr_total - prev_total + + assert successful_dispersals == 10, "Unable to finish 10 successful dispersals" + + data_sent = 2 * successful_dispersals * len(data_to_disperse) + overhead = (consumed - data_sent) / data_sent + + assert overhead < 400, "Dispersal overhead is too high"