From ab9825ca0ad60f32ff4f198b7e3f7cbc9d7f38ad Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 28 Feb 2025 01:18:14 +0000 Subject: [PATCH 1/6] test: consumed bandwidth random data dispersal --- cluster_config/cfgsync.yaml | 2 +- src/libs/common.py | 5 +++ src/steps/da.py | 14 +++++--- .../test_networking_privacy.py | 36 ++++++++++++++++++- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/cluster_config/cfgsync.yaml b/cluster_config/cfgsync.yaml index 64b1df0..10840a5 100644 --- a/cluster_config/cfgsync.yaml +++ b/cluster_config/cfgsync.yaml @@ -7,7 +7,7 @@ security_param: 10 active_slot_coeff: 0.9 # DaConfig related parameters -subnetwork_size: 1024 +subnetwork_size: 2 dispersal_factor: 2 num_samples: 1 num_subnets: 2 diff --git a/src/libs/common.py b/src/libs/common.py index 8f2961c..20bfeb7 100644 --- a/src/libs/common.py +++ b/src/libs/common.py @@ -38,3 +38,8 @@ def to_app_id(n: int) -> list: if n < 0: raise ValueError("Input must be an unsigned integer (non-negative)") return list(n.to_bytes(32, byteorder="big")) + + +def generate_random_bytes(n_max=31): + random_n = random.randint(1, n_max) + return os.urandom(random_n) diff --git a/src/steps/da.py b/src/steps/da.py index 779742b..b613bb7 100644 --- a/src/steps/da.py +++ b/src/steps/da.py @@ -46,9 +46,13 @@ def remove_padding(padded_bytes): return padded_bytes[:-padding_len] -def prepare_dispersal_request(data, app_id, index): - data_bytes = data.encode("utf-8") - padded_bytes = add_padding(list(data_bytes)) +def prepare_dispersal_request(data, app_id, index, with_utf8_padding=True): + if with_utf8_padding: + data_bytes = data.encode("utf-8") + padded_bytes = add_padding(list(data_bytes)) + else: + padded_bytes = list(data) + dispersal_data = {"data": padded_bytes, "metadata": {"app_id": app_id, "index": index}} return dispersal_data @@ -76,9 +80,9 @@ class StepsDataAvailability(StepsCommon): @allure.step @retry(stop=stop_after_delay(65), wait=wait_fixed(1), reraise=True) - def disperse_data(self, data, app_id, index): + def disperse_data(self, data, app_id, index, with_utf8_padding=True): response = [] - request = prepare_dispersal_request(data, app_id, index) + request = prepare_dispersal_request(data, app_id, index, with_utf8_padding) executor = self.find_executor_node() try: response = executor.send_dispersal_request(request) diff --git a/tests/networking_privacy/test_networking_privacy.py b/tests/networking_privacy/test_networking_privacy.py index 30ae389..a17cdae 100644 --- a/tests/networking_privacy/test_networking_privacy.py +++ b/tests/networking_privacy/test_networking_privacy.py @@ -1,7 +1,7 @@ import pytest import psutil -from src.libs.common import delay, to_app_id, to_index +from src.libs.common import delay, to_app_id, to_index, generate_random_bytes from src.libs.custom_logger import get_custom_logger from src.steps.da import StepsDataAvailability from src.test_data import DATA_TO_DISPERSE @@ -42,3 +42,37 @@ class TestNetworkingPrivacy(StepsDataAvailability): overhead = (consumed - data_sent) / data_sent assert overhead < 400, "Dispersal overhead is too high" + + @pytest.mark.usefixtures("setup_2_node_cluster") + def test_consumed_bandwidth_random_data_dispersal(self): + delay(5) + net_io = psutil.net_io_counters() + prev_total = net_io.bytes_sent + net_io.bytes_recv + + data_to_disperse = generate_random_bytes() + logger.debug(f"Using random data to disperse: {list(data_to_disperse)}") + + successful_dispersals = 0 + for i in range(20): + try: + self.disperse_data(data_to_disperse, to_app_id(1), to_index(0), with_utf8_padding=False) + successful_dispersals += 1 + except Exception as ex: + logger.warning(f"Dispersal #{i} was not successful with error {ex}") + + if successful_dispersals == 10: + break + + delay(0.1) + + net_io = psutil.net_io_counters() + curr_total = net_io.bytes_sent + net_io.bytes_recv + + consumed = curr_total - prev_total + + assert successful_dispersals == 10, "Unable to finish 10 successful dispersals" + + data_sent = 2 * successful_dispersals * len(data_to_disperse) + overhead = (consumed - data_sent) / data_sent + + assert overhead < 400, "Dispersal overhead is too high" From 1c227a7eca0c89786bb9eb830af6952349ec9e97 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 28 Feb 2025 01:27:08 +0000 Subject: [PATCH 2/6] fix: switch to testnet image - limit random bytes to fixed length - add balancer_interval_secs property --- cluster_config/cfgsync-template.yaml | 1 + cluster_config/cfgsync.yaml | 1 + src/env_vars.py | 2 +- src/libs/common.py | 5 ++--- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cluster_config/cfgsync-template.yaml b/cluster_config/cfgsync-template.yaml index 308c315..59e93e6 100644 --- a/cluster_config/cfgsync-template.yaml +++ b/cluster_config/cfgsync-template.yaml @@ -13,6 +13,7 @@ num_samples: 1 num_subnets: 2 old_blobs_check_interval_secs: 5 blobs_validity_duration_secs: 60 +balancer_interval_secs: 1 global_params_path: "/kzgrs_test_params" # Tracing diff --git a/cluster_config/cfgsync.yaml b/cluster_config/cfgsync.yaml index 10840a5..e1a0fad 100644 --- a/cluster_config/cfgsync.yaml +++ b/cluster_config/cfgsync.yaml @@ -13,6 +13,7 @@ num_samples: 1 num_subnets: 2 old_blobs_check_interval_secs: 5 blobs_validity_duration_secs: 60 +balancer_interval_secs: 1 global_params_path: "/kzgrs_test_params" # Tracing diff --git a/src/env_vars.py b/src/env_vars.py index 1c05782..01706e0 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -19,7 +19,7 @@ NOMOS = "nomos" NOMOS_EXECUTOR = "nomos_executor" CFGSYNC = "cfgsync" -DEFAULT_IMAGE = "ghcr.io/logos-co/nomos-node:latest" +DEFAULT_IMAGE = "ghcr.io/logos-co/nomos-node:testnet" NODE_1 = get_env_var("NODE_1", NOMOS) NODE_2 = get_env_var("NODE_2", NOMOS_EXECUTOR) diff --git a/src/libs/common.py b/src/libs/common.py index 20bfeb7..d9209de 100644 --- a/src/libs/common.py +++ b/src/libs/common.py @@ -40,6 +40,5 @@ def to_app_id(n: int) -> list: return list(n.to_bytes(32, byteorder="big")) -def generate_random_bytes(n_max=31): - random_n = random.randint(1, n_max) - return os.urandom(random_n) +def generate_random_bytes(n=31): + return os.urandom(n) From 0b0c59ea6bb961bc315b1b471a20c737a93a955a Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 28 Feb 2025 01:29:08 +0000 Subject: [PATCH 3/6] fix: use fixture without params --- tests/networking_privacy/test_networking_privacy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/networking_privacy/test_networking_privacy.py b/tests/networking_privacy/test_networking_privacy.py index a17cdae..396a69a 100644 --- a/tests/networking_privacy/test_networking_privacy.py +++ b/tests/networking_privacy/test_networking_privacy.py @@ -12,7 +12,7 @@ logger = get_custom_logger(__name__) class TestNetworkingPrivacy(StepsDataAvailability): main_nodes = [] - @pytest.mark.parametrize("setup_2_node_cluster", [2], indirect=True) + @pytest.mark.usefixtures("setup_2_node_cluster") def test_consumed_bandwidth_dispersal(self, setup_2_node_cluster): delay(5) net_io = psutil.net_io_counters() From 442a94551d7eb45fa6e015d0bb59f4bbc8108711 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 28 Feb 2025 01:34:53 +0000 Subject: [PATCH 4/6] fix: reduce delay(5) occurrence --- src/steps/common.py | 5 +++++ tests/data_integrity/test_data_integrity.py | 2 -- tests/e2e/test_2node_alive.py | 2 -- tests/networking_privacy/test_networking_privacy.py | 2 -- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/steps/common.py b/src/steps/common.py index 2ba7c8a..dcd593a 100644 --- a/src/steps/common.py +++ b/src/steps/common.py @@ -5,6 +5,7 @@ import shutil import pytest from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR +from src.libs.common import delay from src.libs.custom_logger import get_custom_logger from src.node.nomos_node import NomosNode @@ -65,6 +66,8 @@ class StepsCommon: logger.error(f"REST service did not become ready in time: {ex}") raise + delay(5) + @pytest.fixture(scope="function") def setup_4_node_cluster(self, request): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") @@ -82,3 +85,5 @@ class StepsCommon: except Exception as ex: logger.error(f"REST service did not become ready in time: {ex}") raise + + delay(5) diff --git a/tests/data_integrity/test_data_integrity.py b/tests/data_integrity/test_data_integrity.py index 9e5ed9c..e68544d 100644 --- a/tests/data_integrity/test_data_integrity.py +++ b/tests/data_integrity/test_data_integrity.py @@ -17,7 +17,6 @@ class TestDataIntegrity(StepsDataAvailability): @pytest.mark.usefixtures("setup_4_node_cluster") def test_da_identify_retrieve_missing_columns(self): - delay(5) self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0)) delay(5) # Select one target node at random to get blob data for 1/2 columns @@ -31,7 +30,6 @@ class TestDataIntegrity(StepsDataAvailability): @pytest.mark.usefixtures("setup_2_node_cluster") def test_da_sampling_determines_data_presence(self): - delay(5) self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0)) delay(5) rcv_data = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5)) diff --git a/tests/e2e/test_2node_alive.py b/tests/e2e/test_2node_alive.py index fbf87d9..e6137e4 100644 --- a/tests/e2e/test_2node_alive.py +++ b/tests/e2e/test_2node_alive.py @@ -1,8 +1,6 @@ import pytest -from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR from src.libs.custom_logger import get_custom_logger -from src.node.nomos_node import NomosNode from src.steps.common import StepsCommon logger = get_custom_logger(__name__) diff --git a/tests/networking_privacy/test_networking_privacy.py b/tests/networking_privacy/test_networking_privacy.py index 396a69a..9b85ee7 100644 --- a/tests/networking_privacy/test_networking_privacy.py +++ b/tests/networking_privacy/test_networking_privacy.py @@ -14,7 +14,6 @@ class TestNetworkingPrivacy(StepsDataAvailability): @pytest.mark.usefixtures("setup_2_node_cluster") def test_consumed_bandwidth_dispersal(self, setup_2_node_cluster): - delay(5) net_io = psutil.net_io_counters() prev_total = net_io.bytes_sent + net_io.bytes_recv @@ -45,7 +44,6 @@ class TestNetworkingPrivacy(StepsDataAvailability): @pytest.mark.usefixtures("setup_2_node_cluster") def test_consumed_bandwidth_random_data_dispersal(self): - delay(5) net_io = psutil.net_io_counters() prev_total = net_io.bytes_sent + net_io.bytes_recv From b455b57bf93225029b7a86e6c034c2219f4d3298 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 28 Feb 2025 02:53:34 +0000 Subject: [PATCH 5/6] fix: update da_identify_retrieve_missing_columns to cycle through all nodes --- src/node/nomos_node.py | 3 +++ tests/data_integrity/test_data_integrity.py | 18 ++++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/node/nomos_node.py b/src/node/nomos_node.py index a46ce26..86c5658 100644 --- a/src/node/nomos_node.py +++ b/src/node/nomos_node.py @@ -123,6 +123,9 @@ class NomosNode: def node_type(self): return self._node_type + def name(self): + return self._container_name + def check_nomos_log_errors(self, whitelist=None): keywords = LOG_ERROR_KEYWORDS diff --git a/tests/data_integrity/test_data_integrity.py b/tests/data_integrity/test_data_integrity.py index e68544d..40eae4a 100644 --- a/tests/data_integrity/test_data_integrity.py +++ b/tests/data_integrity/test_data_integrity.py @@ -19,14 +19,20 @@ class TestDataIntegrity(StepsDataAvailability): def test_da_identify_retrieve_missing_columns(self): self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0)) delay(5) - # Select one target node at random to get blob data for 1/2 columns - selected_node = self.main_nodes[random.randint(1, 3)] - rcv_data = self.get_data_range(selected_node, to_app_id(1), to_index(0), to_index(5)) - rcv_data_json = json.dumps(rcv_data) + test_results = [] + # Iterate through standard nodes to get blob data for 1/2 columns + for node in self.main_nodes[1:4]: + rcv_data = self.get_data_range(node, to_app_id(1), to_index(0), to_index(5)) + rcv_data_json = json.dumps(rcv_data) - reconstructed_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json]) + reconstructed_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json]) - assert DATA_TO_DISPERSE[1] == reconstructed_data, "Reconstructed data are not same with original data" + if DATA_TO_DISPERSE[1] == reconstructed_data: + test_results.append(node.name()) + + assert len(test_results) > 0, "Dispersed data were not received by any node" + + logger.info(f"Dispersed data received by : {test_results}") @pytest.mark.usefixtures("setup_2_node_cluster") def test_da_sampling_determines_data_presence(self): From bf236107bb032610f87951c251173d3eefbaddbf Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 28 Feb 2025 03:40:25 +0000 Subject: [PATCH 6/6] fix: use utf8 and padding separately --- src/steps/da.py | 20 ++++++++++++------- .../test_networking_privacy.py | 2 +- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/steps/da.py b/src/steps/da.py index b613bb7..5aaaa98 100644 --- a/src/steps/da.py +++ b/src/steps/da.py @@ -2,8 +2,11 @@ import allure from tenacity import retry, stop_after_delay, wait_fixed from src.env_vars import NOMOS_EXECUTOR +from src.libs.custom_logger import get_custom_logger from src.steps.common import StepsCommon +logger = get_custom_logger(__name__) + def add_padding(orig_bytes): """ @@ -46,14 +49,17 @@ def remove_padding(padded_bytes): return padded_bytes[:-padding_len] -def prepare_dispersal_request(data, app_id, index, with_utf8_padding=True): - if with_utf8_padding: +def prepare_dispersal_request(data, app_id, index, utf8=True, padding=True): + if utf8: data_bytes = data.encode("utf-8") - padded_bytes = add_padding(list(data_bytes)) else: - padded_bytes = list(data) + data_bytes = bytes(data) - dispersal_data = {"data": padded_bytes, "metadata": {"app_id": app_id, "index": index}} + data_list = list(data_bytes) + if padding: + data_list = add_padding(data_list) + + dispersal_data = {"data": data_list, "metadata": {"app_id": app_id, "index": index}} return dispersal_data @@ -80,9 +86,9 @@ class StepsDataAvailability(StepsCommon): @allure.step @retry(stop=stop_after_delay(65), wait=wait_fixed(1), reraise=True) - def disperse_data(self, data, app_id, index, with_utf8_padding=True): + def disperse_data(self, data, app_id, index, utf8=True, padding=True): response = [] - request = prepare_dispersal_request(data, app_id, index, with_utf8_padding) + request = prepare_dispersal_request(data, app_id, index, utf8=utf8, padding=padding) executor = self.find_executor_node() try: response = executor.send_dispersal_request(request) diff --git a/tests/networking_privacy/test_networking_privacy.py b/tests/networking_privacy/test_networking_privacy.py index 9b85ee7..2ae100e 100644 --- a/tests/networking_privacy/test_networking_privacy.py +++ b/tests/networking_privacy/test_networking_privacy.py @@ -53,7 +53,7 @@ class TestNetworkingPrivacy(StepsDataAvailability): successful_dispersals = 0 for i in range(20): try: - self.disperse_data(data_to_disperse, to_app_id(1), to_index(0), with_utf8_padding=False) + self.disperse_data(data_to_disperse, to_app_id(1), to_index(0), utf8=False, padding=False) successful_dispersals += 1 except Exception as ex: logger.warning(f"Dispersal #{i} was not successful with error {ex}")