diff --git a/cluster_config/cfgsync-2node.yaml b/cluster_config/cfgsync-2node.yaml new file mode 100644 index 0000000..10840a5 --- /dev/null +++ b/cluster_config/cfgsync-2node.yaml @@ -0,0 +1,31 @@ +port: 4400 +n_hosts: 2 +timeout: 30 + +# ConsensusConfig related parameters +security_param: 10 +active_slot_coeff: 0.9 + +# DaConfig related parameters +subnetwork_size: 2 +dispersal_factor: 2 +num_samples: 1 +num_subnets: 2 +old_blobs_check_interval_secs: 5 +blobs_validity_duration_secs: 60 +global_params_path: "/kzgrs_test_params" + +# Tracing +tracing_settings: + logger: Stdout + tracing: !Otlp + endpoint: http://tempo:4317/ + sample_ratio: 0.5 + service_name: node + filter: !EnvFilter + filters: + nomos: debug + metrics: !Otlp + endpoint: http://prometheus:9090/api/v1/otlp/v1/metrics + host_identifier: node + level: INFO \ No newline at end of file diff --git a/cluster_config/cfgsync-5node.yaml b/cluster_config/cfgsync-5node.yaml new file mode 100644 index 0000000..dde1f71 --- /dev/null +++ b/cluster_config/cfgsync-5node.yaml @@ -0,0 +1,31 @@ +port: 4400 +n_hosts: 5 +timeout: 30 + +# ConsensusConfig related parameters +security_param: 10 +active_slot_coeff: 0.9 + +# DaConfig related parameters +subnetwork_size: 2 +dispersal_factor: 2 +num_samples: 1 +num_subnets: 2 +old_blobs_check_interval_secs: 5 +blobs_validity_duration_secs: 60 +global_params_path: "/kzgrs_test_params" + +# Tracing +tracing_settings: + logger: Stdout + tracing: !Otlp + endpoint: http://tempo:4317/ + sample_ratio: 0.5 + service_name: node + filter: !EnvFilter + filters: + nomos: debug + metrics: !Otlp + endpoint: http://prometheus:9090/api/v1/otlp/v1/metrics + host_identifier: node + level: INFO \ No newline at end of file diff --git a/cluster_config/cfgsync.yaml b/cluster_config/cfgsync.yaml index 10840a5..dde1f71 100644 --- a/cluster_config/cfgsync.yaml +++ b/cluster_config/cfgsync.yaml @@ -1,5 +1,5 @@ port: 4400 -n_hosts: 2 +n_hosts: 5 timeout: 30 # ConsensusConfig related parameters diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index a899d83..6993e30 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -23,3 +23,10 @@ class REST(BaseClient): def info(self): status_response = self.rest_call("get", "cryptarchia/info") return status_response.json() + + def send_dispersal_request(self, data): + return self.rest_call("post", "disperse-data", json.dumps(data)) + + def send_get_range(self, query): + response = self.rest_call("post", "da/get-range", json.dumps(query)) + return response.json() diff --git a/src/node/nomos_node.py b/src/node/nomos_node.py index cd3f079..d44cdef 100644 --- a/src/node/nomos_node.py +++ b/src/node/nomos_node.py @@ -29,6 +29,7 @@ class NomosNode: self._internal_ports = nomos_nodes[node_type]["ports"] self._volumes = nomos_nodes[node_type]["volumes"] self._entrypoint = nomos_nodes[node_type]["entrypoint"] + self._node_type = node_type self._log_path = os.path.join(DOCKER_LOG_DIR, f"{container_name}__{self._image_name.replace('/', '_')}.log") self._docker_manager = DockerManager(self._image_name) @@ -36,8 +37,7 @@ class NomosNode: self._container = None cwd = os.getcwd() - for i, volume in enumerate(self._volumes): - self._volumes[i] = cwd + "/" + volume + self._volumes = [cwd + "/" + volume for volume in self._volumes] logger.debug(f"NomosNode instance initialized with log path {self._log_path}") @@ -136,6 +136,9 @@ class NomosNode: def info(self): return self._api.info() + def node_type(self): + return self._node_type + def check_nomos_log_errors(self, whitelist=None): keywords = LOG_ERROR_KEYWORDS @@ -145,3 +148,9 @@ class NomosNode: matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False) assert not matches, f"Found errors {matches}" + + def send_dispersal_request(self, data): + return self._api.send_dispersal_request(data) + + def send_get_data_range_request(self, data): + return self._api.send_get_range(data) diff --git a/src/steps/common.py b/src/steps/common.py index 91fdea5..90e734f 100644 --- a/src/steps/common.py +++ b/src/steps/common.py @@ -1,20 +1,71 @@ import inspect +import os +import shutil import pytest -from src.env_vars import NODE_1, NODE_2 +from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR from src.libs.custom_logger import get_custom_logger from src.node.nomos_node import NomosNode logger = get_custom_logger(__name__) +def prepare_cluster_config(node_count): + cwd = os.getcwd() + config_dir = "cluster_config" + src = f"{cwd}/{config_dir}/cfgsync-{node_count}node.yaml" + dst = f"{cwd}/{config_dir}/cfgsync.yaml" + shutil.copyfile(src, dst) + + +def start_nodes(nodes): + for node in nodes: + node.start() + + +def ensure_nodes_ready(nodes): + for node in nodes: + node.ensure_ready() + + class StepsCommon: - @pytest.fixture(scope="function") - def setup_main_nodes(self, request): + @pytest.fixture(scope="function", autouse=True) + def cluster_setup(self): logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") - self.node1 = NomosNode(NODE_1, f"node1_{request.cls.test_id}") - self.node1.start() - self.node2 = NomosNode(NODE_2, f"node2_{request.cls.test_id}") - self.node2.start() - self.main_nodes.extend([self.node1, self.node2]) + self.main_nodes = [] + + @pytest.fixture(scope="function") + def setup_2_node_cluster(self, request): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + prepare_cluster_config(2) + self.node1 = NomosNode(CFGSYNC, "cfgsync") + self.node2 = NomosNode(NOMOS, "nomos_node_0") + self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1") + self.main_nodes.extend([self.node1, self.node2, self.node3]) + start_nodes(self.main_nodes) + + try: + ensure_nodes_ready(self.main_nodes[2:]) + except Exception as ex: + logger.error(f"REST service did not become ready in time: {ex}") + raise + + @pytest.fixture(scope="function") + def setup_5_node_cluster(self, request): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + prepare_cluster_config(5) + self.node1 = NomosNode(CFGSYNC, "cfgsync") + self.node2 = NomosNode(NOMOS, "nomos_node_0") + self.node3 = NomosNode(NOMOS, "nomos_node_1") + self.node4 = NomosNode(NOMOS, "nomos_node_2") + self.node5 = NomosNode(NOMOS, "nomos_node_3") + self.node6 = NomosNode(NOMOS_EXECUTOR, "nomos_node_4") + self.main_nodes.extend([self.node1, self.node2, self.node3, self.node4, self.node5, self.node6]) + start_nodes(self.main_nodes) + + try: + ensure_nodes_ready(self.main_nodes[2:]) + except Exception as ex: + logger.error(f"REST service did not become ready in time: {ex}") + raise diff --git a/src/steps/da.py b/src/steps/da.py new file mode 100644 index 0000000..c534e75 --- /dev/null +++ b/src/steps/da.py @@ -0,0 +1,65 @@ +import allure + +from src.env_vars import NOMOS_EXECUTOR +from src.steps.common import StepsCommon + + +def add_padding(orig_bytes): + block_size = 31 + """ + Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme: + - The value of each padded byte is the number of bytes padded. + - If the original data is already a multiple of the block size, + an additional full block of bytes (each the block size) is added. + """ + original_len = len(orig_bytes) + padding_needed = block_size - (original_len % block_size) + # If the data is already a multiple of block_size, add a full block of padding + if padding_needed == 0: + padding_needed = block_size + + # Each padded byte will be equal to padding_needed + padded_bytes = orig_bytes + [padding_needed] * padding_needed + return padded_bytes + + +def prepare_dispersal_request(data, app_id, index): + data_bytes = data.encode("utf-8") + padded_bytes = add_padding(list(data_bytes)) + dispersal_data = {"data": padded_bytes, "metadata": {"app_id": app_id, "index": index}} + return dispersal_data + + +def prepare_get_range_request(app_id, start_index, end_index): + query_data = {"app_id": app_id, "range": {"start": start_index, "end": end_index}} + return query_data + + +class StepsDataAvailability(StepsCommon): + + def find_executor_node(self): + executor = {} + for node in self.main_nodes: + if node.node_type() == NOMOS_EXECUTOR: + executor = node + return executor + + @allure.step + def disperse_data(self, data, app_id, index): + request = prepare_dispersal_request(data, app_id, index) + executor = self.find_executor_node() + try: + executor.send_dispersal_request(request) + except Exception as ex: + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + + @allure.step + def get_data_range(self, node, app_id, start, end): + response = [] + query = prepare_get_range_request(app_id, start, end) + try: + response = node.send_get_data_range_request(query) + except Exception as ex: + assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + + return response diff --git a/src/test_data.py b/src/test_data.py index e4d7665..31cdf31 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -24,3 +24,13 @@ LOG_ERROR_KEYWORDS = [ "race condition", "double free", ] + +DATA_TO_DISPERSE = [ + "Hello World!", + "1234567890", + '{"key": "value"}', + "这是一些中文", + "🚀🌟✨", + "Lorem ipsum dolor sit amet", + "Hello", +] diff --git a/tests/data_integrity/test_data_integrity.py b/tests/data_integrity/test_data_integrity.py index 635559e..6afc39a 100644 --- a/tests/data_integrity/test_data_integrity.py +++ b/tests/data_integrity/test_data_integrity.py @@ -1,6 +1,29 @@ -class TestDataIntegrity: +import pytest + +from src.steps.da import StepsDataAvailability +from src.test_data import DATA_TO_DISPERSE + + +class TestDataIntegrity(StepsDataAvailability): main_nodes = [] - def test_cluster_start(self): - for node in self.main_nodes: - print(node) + @pytest.mark.skip(reason="Waiting for PR https://github.com/logos-co/nomos-node/pull/994") + @pytest.mark.usefixtures("setup_5_node_cluster") + def test_da_identify_retrieve_missing_columns(self): + self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8) + received_data = [] + # Get data only from half of nodes + for node in self.main_nodes[2:4]: + received_data.append(self.get_data_range(node, [0] * 31 + [1], [0] * 8, [0] * 7 + [3])) + + # Use received blob data to reconstruct the original data + # nomos-cli reconstruct command required + reconstructed_data = [] + assert DATA_TO_DISPERSE[0] == bytes(reconstructed_data).decode("utf-8") + + @pytest.mark.skip(reason="Waiting for Nomos testnet images could evolve blockchain") + @pytest.mark.usefixtures("setup_2_node_cluster") + def test_da_sampling_determines_data_presence(self): + self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8) + received_data = self.get_data_range(self.node2, [0] * 31 + [1], [0] * 8, [0] * 7 + [5]) + assert DATA_TO_DISPERSE[0] == bytes(received_data[0][1]).decode("utf-8") diff --git a/tests/e2e/test_2node_alive.py b/tests/e2e/test_2node_alive.py index 9553332..fbf87d9 100644 --- a/tests/e2e/test_2node_alive.py +++ b/tests/e2e/test_2node_alive.py @@ -1,24 +1,14 @@ +import pytest + from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR from src.libs.custom_logger import get_custom_logger from src.node.nomos_node import NomosNode +from src.steps.common import StepsCommon logger = get_custom_logger(__name__) -class Test2NodeClAlive: +class Test2NodeClAlive(StepsCommon): + @pytest.mark.usefixtures("setup_2_node_cluster") def test_cluster_start(self): - - self.node1 = NomosNode(CFGSYNC, "cfgsync") - self.node2 = NomosNode(NOMOS, "nomos_node_0") - self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1") - - self.node1.start() - self.node2.start() - self.node3.start() - - try: - self.node2.ensure_ready() - self.node3.ensure_ready() - except Exception as ex: - logger.error(f"REST service did not become ready in time: {ex}") - raise + logger.debug("Two node cluster started successfully!")