Merge pull request #2 from logos-co/test-data-availability-integrity

Test/data availability and integrity
This commit is contained in:
Roman Zajic 2025-01-27 21:22:44 +08:00 committed by GitHub
commit 1ec1fa4011
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 248 additions and 31 deletions

View File

@ -0,0 +1,31 @@
port: 4400
n_hosts: 2
timeout: 30
# ConsensusConfig related parameters
security_param: 10
active_slot_coeff: 0.9
# DaConfig related parameters
subnetwork_size: 2
dispersal_factor: 2
num_samples: 1
num_subnets: 2
old_blobs_check_interval_secs: 5
blobs_validity_duration_secs: 60
global_params_path: "/kzgrs_test_params"
# Tracing
tracing_settings:
logger: Stdout
tracing: !Otlp
endpoint: http://tempo:4317/
sample_ratio: 0.5
service_name: node
filter: !EnvFilter
filters:
nomos: debug
metrics: !Otlp
endpoint: http://prometheus:9090/api/v1/otlp/v1/metrics
host_identifier: node
level: INFO

View File

@ -0,0 +1,31 @@
port: 4400
n_hosts: 5
timeout: 30
# ConsensusConfig related parameters
security_param: 10
active_slot_coeff: 0.9
# DaConfig related parameters
subnetwork_size: 2
dispersal_factor: 2
num_samples: 1
num_subnets: 2
old_blobs_check_interval_secs: 5
blobs_validity_duration_secs: 60
global_params_path: "/kzgrs_test_params"
# Tracing
tracing_settings:
logger: Stdout
tracing: !Otlp
endpoint: http://tempo:4317/
sample_ratio: 0.5
service_name: node
filter: !EnvFilter
filters:
nomos: debug
metrics: !Otlp
endpoint: http://prometheus:9090/api/v1/otlp/v1/metrics
host_identifier: node
level: INFO

View File

@ -1,5 +1,5 @@
port: 4400
n_hosts: 2
n_hosts: 5
timeout: 30
# ConsensusConfig related parameters

View File

@ -23,3 +23,10 @@ class REST(BaseClient):
def info(self):
status_response = self.rest_call("get", "cryptarchia/info")
return status_response.json()
def send_dispersal_request(self, data):
return self.rest_call("post", "disperse-data", json.dumps(data))
def send_get_range(self, query):
response = self.rest_call("post", "da/get-range", json.dumps(query))
return response.json()

View File

@ -29,6 +29,7 @@ class NomosNode:
self._internal_ports = nomos_nodes[node_type]["ports"]
self._volumes = nomos_nodes[node_type]["volumes"]
self._entrypoint = nomos_nodes[node_type]["entrypoint"]
self._node_type = node_type
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{container_name}__{self._image_name.replace('/', '_')}.log")
self._docker_manager = DockerManager(self._image_name)
@ -36,8 +37,7 @@ class NomosNode:
self._container = None
cwd = os.getcwd()
for i, volume in enumerate(self._volumes):
self._volumes[i] = cwd + "/" + volume
self._volumes = [cwd + "/" + volume for volume in self._volumes]
logger.debug(f"NomosNode instance initialized with log path {self._log_path}")
@ -136,6 +136,9 @@ class NomosNode:
def info(self):
return self._api.info()
def node_type(self):
return self._node_type
def check_nomos_log_errors(self, whitelist=None):
keywords = LOG_ERROR_KEYWORDS
@ -145,3 +148,9 @@ class NomosNode:
matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False)
assert not matches, f"Found errors {matches}"
def send_dispersal_request(self, data):
return self._api.send_dispersal_request(data)
def send_get_data_range_request(self, data):
return self._api.send_get_range(data)

View File

@ -1,20 +1,71 @@
import inspect
import os
import shutil
import pytest
from src.env_vars import NODE_1, NODE_2
from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR
from src.libs.custom_logger import get_custom_logger
from src.node.nomos_node import NomosNode
logger = get_custom_logger(__name__)
def prepare_cluster_config(node_count):
cwd = os.getcwd()
config_dir = "cluster_config"
src = f"{cwd}/{config_dir}/cfgsync-{node_count}node.yaml"
dst = f"{cwd}/{config_dir}/cfgsync.yaml"
shutil.copyfile(src, dst)
def start_nodes(nodes):
for node in nodes:
node.start()
def ensure_nodes_ready(nodes):
for node in nodes:
node.ensure_ready()
class StepsCommon:
@pytest.fixture(scope="function")
def setup_main_nodes(self, request):
@pytest.fixture(scope="function", autouse=True)
def cluster_setup(self):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.node1 = NomosNode(NODE_1, f"node1_{request.cls.test_id}")
self.node1.start()
self.node2 = NomosNode(NODE_2, f"node2_{request.cls.test_id}")
self.node2.start()
self.main_nodes.extend([self.node1, self.node2])
self.main_nodes = []
@pytest.fixture(scope="function")
def setup_2_node_cluster(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
prepare_cluster_config(2)
self.node1 = NomosNode(CFGSYNC, "cfgsync")
self.node2 = NomosNode(NOMOS, "nomos_node_0")
self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1")
self.main_nodes.extend([self.node1, self.node2, self.node3])
start_nodes(self.main_nodes)
try:
ensure_nodes_ready(self.main_nodes[2:])
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
@pytest.fixture(scope="function")
def setup_5_node_cluster(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
prepare_cluster_config(5)
self.node1 = NomosNode(CFGSYNC, "cfgsync")
self.node2 = NomosNode(NOMOS, "nomos_node_0")
self.node3 = NomosNode(NOMOS, "nomos_node_1")
self.node4 = NomosNode(NOMOS, "nomos_node_2")
self.node5 = NomosNode(NOMOS, "nomos_node_3")
self.node6 = NomosNode(NOMOS_EXECUTOR, "nomos_node_4")
self.main_nodes.extend([self.node1, self.node2, self.node3, self.node4, self.node5, self.node6])
start_nodes(self.main_nodes)
try:
ensure_nodes_ready(self.main_nodes[2:])
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise

65
src/steps/da.py Normal file
View File

@ -0,0 +1,65 @@
import allure
from src.env_vars import NOMOS_EXECUTOR
from src.steps.common import StepsCommon
def add_padding(orig_bytes):
block_size = 31
"""
Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme:
- The value of each padded byte is the number of bytes padded.
- If the original data is already a multiple of the block size,
an additional full block of bytes (each the block size) is added.
"""
original_len = len(orig_bytes)
padding_needed = block_size - (original_len % block_size)
# If the data is already a multiple of block_size, add a full block of padding
if padding_needed == 0:
padding_needed = block_size
# Each padded byte will be equal to padding_needed
padded_bytes = orig_bytes + [padding_needed] * padding_needed
return padded_bytes
def prepare_dispersal_request(data, app_id, index):
data_bytes = data.encode("utf-8")
padded_bytes = add_padding(list(data_bytes))
dispersal_data = {"data": padded_bytes, "metadata": {"app_id": app_id, "index": index}}
return dispersal_data
def prepare_get_range_request(app_id, start_index, end_index):
query_data = {"app_id": app_id, "range": {"start": start_index, "end": end_index}}
return query_data
class StepsDataAvailability(StepsCommon):
def find_executor_node(self):
executor = {}
for node in self.main_nodes:
if node.node_type() == NOMOS_EXECUTOR:
executor = node
return executor
@allure.step
def disperse_data(self, data, app_id, index):
request = prepare_dispersal_request(data, app_id, index)
executor = self.find_executor_node()
try:
executor.send_dispersal_request(request)
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
@allure.step
def get_data_range(self, node, app_id, start, end):
response = []
query = prepare_get_range_request(app_id, start, end)
try:
response = node.send_get_data_range_request(query)
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
return response

View File

@ -24,3 +24,13 @@ LOG_ERROR_KEYWORDS = [
"race condition",
"double free",
]
DATA_TO_DISPERSE = [
"Hello World!",
"1234567890",
'{"key": "value"}',
"这是一些中文",
"🚀🌟✨",
"Lorem ipsum dolor sit amet",
"<html><body>Hello</body></html>",
]

View File

@ -1,6 +1,29 @@
class TestDataIntegrity:
import pytest
from src.steps.da import StepsDataAvailability
from src.test_data import DATA_TO_DISPERSE
class TestDataIntegrity(StepsDataAvailability):
main_nodes = []
def test_cluster_start(self):
for node in self.main_nodes:
print(node)
@pytest.mark.skip(reason="Waiting for PR https://github.com/logos-co/nomos-node/pull/994")
@pytest.mark.usefixtures("setup_5_node_cluster")
def test_da_identify_retrieve_missing_columns(self):
self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
received_data = []
# Get data only from half of nodes
for node in self.main_nodes[2:4]:
received_data.append(self.get_data_range(node, [0] * 31 + [1], [0] * 8, [0] * 7 + [3]))
# Use received blob data to reconstruct the original data
# nomos-cli reconstruct command required
reconstructed_data = []
assert DATA_TO_DISPERSE[0] == bytes(reconstructed_data).decode("utf-8")
@pytest.mark.skip(reason="Waiting for Nomos testnet images could evolve blockchain")
@pytest.mark.usefixtures("setup_2_node_cluster")
def test_da_sampling_determines_data_presence(self):
self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
received_data = self.get_data_range(self.node2, [0] * 31 + [1], [0] * 8, [0] * 7 + [5])
assert DATA_TO_DISPERSE[0] == bytes(received_data[0][1]).decode("utf-8")

View File

@ -1,24 +1,14 @@
import pytest
from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR
from src.libs.custom_logger import get_custom_logger
from src.node.nomos_node import NomosNode
from src.steps.common import StepsCommon
logger = get_custom_logger(__name__)
class Test2NodeClAlive:
class Test2NodeClAlive(StepsCommon):
@pytest.mark.usefixtures("setup_2_node_cluster")
def test_cluster_start(self):
self.node1 = NomosNode(CFGSYNC, "cfgsync")
self.node2 = NomosNode(NOMOS, "nomos_node_0")
self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1")
self.node1.start()
self.node2.start()
self.node3.start()
try:
self.node2.ensure_ready()
self.node3.ensure_ready()
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
logger.debug("Two node cluster started successfully!")