diff --git a/cluster_config/cfgsync-template.yaml b/cluster_config/cfgsync-template.yaml index 7097f80..df3bf29 100644 --- a/cluster_config/cfgsync-template.yaml +++ b/cluster_config/cfgsync-template.yaml @@ -11,26 +11,22 @@ subnetwork_size: {{ subnet_size }} dispersal_factor: {{ dispersal_factor }} num_samples: 1 num_subnets: {{ subnet_size }} -old_blobs_check_interval_secs: 5 -blobs_validity_duration_secs: 60 +old_blobs_check_interval: "5.0" +blobs_validity_duration: "60.0" global_params_path: "/kzgrs_test_params" -min_dispersal_peers: 1 +min_dispersal_peers: {{ min_dispersal_peers }} min_replication_peers: 1 -monitor_failure_time_window_secs: 5 -balancer_interval_secs: 5 +monitor_failure_time_window: "5.0" +balancer_interval: "5.0" # Dispersal mempool publish strategy mempool_publish_strategy: !SampleSubnetworks sample_threshold: {{ subnet_size }} - timeout: - secs: 2 - nanos: 0 - cooldown: - secs: 0 - nanos: 100000000 + timeout: "2.0" + cooldown: "0.0001" replication_settings: seen_message_cache_size: 204800 - seen_message_ttl_secs: "900.0" + seen_message_ttl: "900.0" # Tracing tracing_settings: diff --git a/src/api_clients/base_client.py b/src/api_clients/base_client.py index ddb68ee..2d6e257 100644 --- a/src/api_clients/base_client.py +++ b/src/api_clients/base_client.py @@ -40,4 +40,5 @@ class BaseClient: def print_request_size(self, data): body_size = len(data) if data else 0 body_kb = body_size / 1024 - logger.debug(f"Request body size: {body_kb:.2f}kB") + if body_size > 0: + logger.debug(f"Request body size: {body_kb:.2f}kB") diff --git a/src/api_clients/rest.py b/src/api_clients/rest.py index 280ebb9..c231def 100644 --- a/src/api_clients/rest.py +++ b/src/api_clients/rest.py @@ -33,8 +33,9 @@ class REST(BaseClient): response = self.rest_call("get", "cryptarchia/info") return response.json() - def cryptarchia_headers(self, from_header_id, to_header_id): - response = self.rest_call("get", f"cryptarchia/headers?from={quote(from_header_id, safe='')}" f"&to={quote(to_header_id, safe='')}") + def cryptarchia_headers(self, query): + path = f"cryptarchia/headers{'?' + query if query else ''}" + response = self.rest_call("get", path) return response.json() def da_add_share(self, data): @@ -74,7 +75,7 @@ class REST(BaseClient): return response.json() def storage_block(self, query): - response = self.rest_call("get", "storage/block", json.dumps(query)) + response = self.rest_call("post", f"storage/block", json.dumps(query)) return response.json() def mempool_add_tx(self, data): diff --git a/src/libs/common.py b/src/libs/common.py index b089576..d0fcdaf 100644 --- a/src/libs/common.py +++ b/src/libs/common.py @@ -32,15 +32,31 @@ def generate_log_prefix(): def to_index(n: int) -> list: - if n < 0: - raise ValueError("Input must be an unsigned integer (non-negative)") - return list(n.to_bytes(8, byteorder="big")) + return to_byte_list(n, 8) def to_app_id(n: int) -> list: + return to_byte_list(n, 32) + + +def to_blob_id(n: int) -> list: + return to_byte_list(n, 32) + + +def to_header_id(n: int): if n < 0: raise ValueError("Input must be an unsigned integer (non-negative)") - return list(n.to_bytes(32, byteorder="big")) + + return n.to_bytes(32, byteorder="big").hex() + + +def to_byte_list(n: int, l: int) -> list: + if n < 0: + raise ValueError("Input must be an unsigned integer (non-negative)") + if l < 1: + raise ValueError("Length must be an unsigned integer greater than 0") + + return list(n.to_bytes(l, byteorder="big")) def random_divide_k(n, k): diff --git a/src/libs/custom_logger.py b/src/libs/custom_logger.py index 989548c..ec2f8e5 100644 --- a/src/libs/custom_logger.py +++ b/src/libs/custom_logger.py @@ -1,6 +1,6 @@ import logging -max_log_line_length = 5000 +max_log_line_length = 10000 def log_length_filter(max_length): diff --git a/src/node/nomos_node.py b/src/node/nomos_node.py index ce894b1..d3f72ea 100644 --- a/src/node/nomos_node.py +++ b/src/node/nomos_node.py @@ -157,3 +157,12 @@ class NomosNode: def send_get_data_range_request(self, data): return self._api.da_get_range(data) + + def send_get_commitments_request(self, data): + return self._api.da_get_commitments(data) + + def send_get_storage_block_request(self, data): + return self._api.storage_block(data) + + def send_get_cryptarchia_headers_request(self, data): + return self._api.cryptarchia_headers(data) diff --git a/src/steps/common.py b/src/steps/common.py index 71e4a35..d110e12 100644 --- a/src/steps/common.py +++ b/src/steps/common.py @@ -14,7 +14,7 @@ from jinja2 import Template logger = get_custom_logger(__name__) -def prepare_cluster_config(node_count, subnetwork_size=2, dispersal_factor=2): +def prepare_cluster_config(node_count, subnetwork_size=2, dispersal_factor=2, min_dispersal_peers=1): cwd = os.getcwd() config_dir = "cluster_config" @@ -22,7 +22,9 @@ def prepare_cluster_config(node_count, subnetwork_size=2, dispersal_factor=2): template_content = file.read() template = Template(template_content) - rendered = template.render(num_hosts=node_count, subnet_size=subnetwork_size, dispersal_factor=dispersal_factor) + rendered = template.render( + num_hosts=node_count, subnet_size=subnetwork_size, dispersal_factor=dispersal_factor, min_dispersal_peers=min_dispersal_peers + ) with open(f"{cwd}/{config_dir}/cfgsync.yaml", "w") as outfile: outfile.write(rendered) @@ -55,7 +57,8 @@ class StepsCommon: subnet_size = get_param_or_default(request, "subnet_size", 2) dispersal_factor = get_param_or_default(request, "dispersal_factor", 2) - prepare_cluster_config(2, subnet_size, dispersal_factor) + min_dispersal_peers = get_param_or_default(request, "min_dispersal_peers", 1) + prepare_cluster_config(2, subnet_size, dispersal_factor, min_dispersal_peers) self.node1 = NomosNode(CFGSYNC, "cfgsync") self.node2 = NomosNode(NOMOS, "nomos_node_0") @@ -76,8 +79,9 @@ class StepsCommon: logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") subnet_size = get_param_or_default(request, "subnet_size", 4) - dispersal_factor = get_param_or_default(request, "dispersal_factor", 1) - prepare_cluster_config(4, subnet_size, dispersal_factor) + dispersal_factor = get_param_or_default(request, "dispersal_factor", 2) + min_dispersal_peers = get_param_or_default(request, "min_dispersal_peers", 4) + prepare_cluster_config(4, subnet_size, dispersal_factor, min_dispersal_peers) self.node1 = NomosNode(CFGSYNC, "cfgsync") self.node2 = NomosNode(NOMOS, "nomos_node_0") diff --git a/src/steps/consensus.py b/src/steps/consensus.py new file mode 100644 index 0000000..349666e --- /dev/null +++ b/src/steps/consensus.py @@ -0,0 +1,43 @@ +from urllib.parse import quote + +import allure +from tenacity import retry, stop_after_delay, wait_fixed + +from src.libs.custom_logger import get_custom_logger +from src.steps.common import StepsCommon + +logger = get_custom_logger(__name__) + + +def prepare_get_cryptarchia_headers_request(from_header_id, to_header_id): + query_parts = [] + + if from_header_id is not None: + query_parts.append(f"from={quote(from_header_id, safe='')}") + + if to_header_id is not None: + query_parts.append(f"to={quote(to_header_id, safe='')}") + + return "&".join(query_parts) + + +class StepsConsensus(StepsCommon): + @allure.step + def get_cryptarchia_headers(self, node, from_header_id=None, to_header_id=None, **kwargs): + + timeout_duration = kwargs.get("timeout_duration", 65) + interval = kwargs.get("interval", 0.1) + + query = prepare_get_cryptarchia_headers_request(from_header_id, to_header_id) + + @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True) + def get_headers(): + try: + response = node.send_get_cryptarchia_headers_request(query) + except Exception as ex: + logger.error(f"Exception while retrieving cryptarchia headers: {ex}") + raise + + return response + + return get_headers() diff --git a/src/steps/da.py b/src/steps/da.py index 95509b6..fbf457a 100644 --- a/src/steps/da.py +++ b/src/steps/da.py @@ -28,6 +28,11 @@ def prepare_get_range_request(app_id, start_index, end_index): return query_data +def prepare_get_shares_commitments_request(blob_id): + query_data = {"blob_id": blob_id} + return query_data + + def response_contains_data(response): if response is None: return False @@ -101,3 +106,23 @@ class StepsDataAvailability(StepsCommon): return response return get_range() + + @allure.step + def get_shares_commitments(self, node, blob_id, **kwargs): + + timeout_duration = kwargs.get("timeout_duration", 65) + interval = kwargs.get("interval", 0.1) + + query = prepare_get_shares_commitments_request(blob_id) + + @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True) + def get_commitments(): + try: + response = node.send_get_commitments_request(query) + except Exception as ex: + logger.error(f"Exception while retrieving commitments: {ex}") + raise + + return response + + return get_commitments() diff --git a/src/steps/storage.py b/src/steps/storage.py new file mode 100644 index 0000000..bf81580 --- /dev/null +++ b/src/steps/storage.py @@ -0,0 +1,36 @@ +from urllib.parse import quote + +import allure +from tenacity import retry, stop_after_delay, wait_fixed + +from src.libs.custom_logger import get_custom_logger +from src.steps.common import StepsCommon + +logger = get_custom_logger(__name__) + + +def prepare_get_storage_block_request(header_id): + query_data = f"{header_id}" + return query_data + + +class StepsStorage(StepsCommon): + @allure.step + def get_storage_block(self, node, header_id, **kwargs): + + timeout_duration = kwargs.get("timeout_duration", 65) + interval = kwargs.get("interval", 0.1) + + query = prepare_get_storage_block_request(header_id) + + @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True) + def get_block(): + try: + response = node.send_get_storage_block_request(query) + except Exception as ex: + logger.error(f"Exception while retrieving storage block: {ex}") + raise + + return response + + return get_block() diff --git a/tests/protocol_compatibility/__init__.py b/tests/protocol_compatibility/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/protocol_compatibility/test_api_compatibility.py b/tests/protocol_compatibility/test_api_compatibility.py new file mode 100644 index 0000000..bc05181 --- /dev/null +++ b/tests/protocol_compatibility/test_api_compatibility.py @@ -0,0 +1,101 @@ +import pytest + +from src.libs.common import to_app_id, to_index, delay +from src.libs.custom_logger import get_custom_logger +from src.steps.consensus import StepsConsensus +from src.steps.da import StepsDataAvailability +from src.steps.storage import StepsStorage +from src.test_data import DATA_TO_DISPERSE + +logger = get_custom_logger(__name__) + + +# Extract commitments from indexed shares +def extract_commitments(index_shares): + aggregated_column_commitments = [] + rows_commitments = [] + + for index, shares in index_shares: + for share in shares: + a_c_c = share["aggregated_column_commitment"] + if a_c_c not in aggregated_column_commitments: + aggregated_column_commitments.append(a_c_c) + + r_c = share["rows_commitments"] + for commitment in r_c: + if commitment not in rows_commitments: + rows_commitments.append(commitment) + + return aggregated_column_commitments, rows_commitments + + +# Parse commitments received by get_shares_commitments +def parse_commitments(commitments): + aggregated_column_commitments = [] + rows_commitments = [] + for commitment in commitments: + aggregated_column_commitments.append(commitment["aggregated_column_commitment"]) + for rows_commitment in commitment["rows_commitments"]: + rows_commitments.append(rows_commitment) + + return aggregated_column_commitments, rows_commitments + + +class TestApiCompatibility(StepsDataAvailability, StepsConsensus, StepsStorage): + main_nodes = [] + + @pytest.mark.usefixtures("setup_2_node_cluster") + def test_da_consensus_compatibility(self): + self.disperse_data(DATA_TO_DISPERSE[2], to_app_id(1), to_index(0)) + delay(5) + index_shares = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5)) + column_commitments, rows_commitments = extract_commitments(index_shares) + + # Get consensus headers + headers = self.get_cryptarchia_headers(self.node2) + + # Get storage blocks for received headers and extract blob ids + blob_ids = [ + blob["id"] + for header in headers + for block in [self.get_storage_block(self.node2, header)] + if block is not None and "bl_blobs" in block + for blob in block["bl_blobs"] + ] + + # Get commitments for blob ids + commitments = [self.get_shares_commitments(self.node2, blob_id) for blob_id in blob_ids] + + rcv_column_commitments, rcv_rows_commitments = parse_commitments(commitments) + + # Check commitments from shares match commitments received based on consensus data + assert all(c in rcv_column_commitments for c in column_commitments), "Not all aggregated column commitments are present" + assert all(r in rcv_rows_commitments for r in rows_commitments), "Not all rows commitments are present" + + @pytest.mark.usefixtures("setup_4_node_cluster") + def test_da_cross_nodes_consensus_compatibility(self): + self.disperse_data(DATA_TO_DISPERSE[2], to_app_id(1), to_index(0)) + delay(5) + index_shares = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5)) + column_commitments, rows_commitments = extract_commitments(index_shares) + + # Get consensus headers + headers = self.get_cryptarchia_headers(self.node3) + + # Get storage blocks for received headers and extract blob ids + blob_ids = [ + blob["id"] + for header in headers + for block in [self.get_storage_block(self.node3, header)] + if block is not None and "bl_blobs" in block + for blob in block["bl_blobs"] + ] + + # Get commitments for blob ids + commitments = [self.get_shares_commitments(self.node3, blob_id) for blob_id in blob_ids] + + rcv_column_commitments, rcv_rows_commitments = parse_commitments(commitments) + + # Check commitments from shares match commitments received based on consensus data + assert all(c in rcv_column_commitments for c in column_commitments), "Not all aggregated column commitments are present" + assert all(r in rcv_rows_commitments for r in rows_commitments), "Not all rows commitments are present"