Merge pull request #14 from logos-co/test-protocol-api-compatibility

Test/protocol API compatibility
Roman Zajic 2025-04-18 10:48:43 +08:00 committed by GitHub
commit 4848e59624
12 changed files with 258 additions and 26 deletions


@@ -11,26 +11,22 @@ subnetwork_size: {{ subnet_size }}
 dispersal_factor: {{ dispersal_factor }}
 num_samples: 1
 num_subnets: {{ subnet_size }}
-old_blobs_check_interval_secs: 5
-blobs_validity_duration_secs: 60
+old_blobs_check_interval: "5.0"
+blobs_validity_duration: "60.0"
 global_params_path: "/kzgrs_test_params"
-min_dispersal_peers: 1
+min_dispersal_peers: {{ min_dispersal_peers }}
 min_replication_peers: 1
-monitor_failure_time_window_secs: 5
-balancer_interval_secs: 5
+monitor_failure_time_window: "5.0"
+balancer_interval: "5.0"
 # Dispersal mempool publish strategy
 mempool_publish_strategy: !SampleSubnetworks
   sample_threshold: {{ subnet_size }}
-  timeout:
-    secs: 2
-    nanos: 0
-  cooldown:
-    secs: 0
-    nanos: 100000000
+  timeout: "2.0"
+  cooldown: "0.0001"
 replication_settings:
   seen_message_cache_size: 204800
-  seen_message_ttl_secs: "900.0"
+  seen_message_ttl: "900.0"
 # Tracing
 tracing_settings:
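Note the shape change here: durations move from integer `_secs` fields (or nested secs/nanos maps) to fractional-second strings. A minimal sketch of the correspondence, using a hypothetical helper that is not part of this commit:

    def to_duration_string(secs: int, nanos: int) -> str:
        # Hypothetical helper: render a secs/nanos pair in the new
        # fractional-second string format, e.g. (2, 0) -> "2.0".
        return str(secs + nanos / 1_000_000_000)

By this reading the old cooldown (secs: 0, nanos: 100000000) would be "0.1", so the new "0.0001" also shortens the cooldown rather than merely reformatting it.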


@@ -40,4 +40,5 @@ class BaseClient:
     def print_request_size(self, data):
         body_size = len(data) if data else 0
         body_kb = body_size / 1024
-        logger.debug(f"Request body size: {body_kb:.2f}kB")
+        if body_size > 0:
+            logger.debug(f"Request body size: {body_kb:.2f}kB")


@@ -33,8 +33,9 @@ class REST(BaseClient):
         response = self.rest_call("get", "cryptarchia/info")
         return response.json()
 
-    def cryptarchia_headers(self, from_header_id, to_header_id):
-        response = self.rest_call("get", f"cryptarchia/headers?from={quote(from_header_id, safe='')}" f"&to={quote(to_header_id, safe='')}")
+    def cryptarchia_headers(self, query):
+        path = f"cryptarchia/headers{'?' + query if query else ''}"
+        response = self.rest_call("get", path)
         return response.json()
 
     def da_add_share(self, data):
@@ -74,7 +75,7 @@ class REST(BaseClient):
         return response.json()
 
     def storage_block(self, query):
-        response = self.rest_call("get", "storage/block", json.dumps(query))
+        response = self.rest_call("post", f"storage/block", json.dumps(query))
         return response.json()
 
     def mempool_add_tx(self, data):
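With these changes the client no longer URL-encodes header ids itself, and storage/block becomes a POST. A hedged usage sketch (the REST constructor is not shown in this diff, so its arguments are elided):

    from urllib.parse import quote

    rest = REST(...)  # construction elided; signature not shown in this diff
    from_id = "00" * 32  # hypothetical hex header ids
    to_id = "ff" * 32
    headers = rest.cryptarchia_headers(f"from={quote(from_id, safe='')}&to={quote(to_id, safe='')}")
    block = rest.storage_block(to_id)  # now sent as POST with a JSON-encoded body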


@@ -32,15 +32,31 @@ def generate_log_prefix():
 def to_index(n: int) -> list:
-    if n < 0:
-        raise ValueError("Input must be an unsigned integer (non-negative)")
-    return list(n.to_bytes(8, byteorder="big"))
+    return to_byte_list(n, 8)
 
 
+def to_app_id(n: int) -> list:
+    return to_byte_list(n, 32)
+
+
+def to_blob_id(n: int) -> list:
+    return to_byte_list(n, 32)
+
+
 def to_header_id(n: int):
     if n < 0:
         raise ValueError("Input must be an unsigned integer (non-negative)")
-    return list(n.to_bytes(32, byteorder="big"))
+    return n.to_bytes(32, byteorder="big").hex()
+
+
+def to_byte_list(n: int, l: int) -> list:
+    if n < 0:
+        raise ValueError("Input must be an unsigned integer (non-negative)")
+    if l < 1:
+        raise ValueError("Length must be an unsigned integer greater than 0")
+    return list(n.to_bytes(l, byteorder="big"))
 
 
 def random_divide_k(n, k):
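Worked examples for the reshaped helpers; the values follow directly from the definitions above:

    to_index(1)           # [0, 0, 0, 0, 0, 0, 0, 1] -- 8 big-endian bytes via to_byte_list
    to_app_id(1)          # 32-element byte list: [0] * 31 + [1]
    to_header_id(1)       # "0000...0001" (64 hex chars) -- now a string, no longer a byte list
    to_byte_list(256, 2)  # [1, 0]
    to_byte_list(-1, 2)   # raises ValueError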


@@ -1,6 +1,6 @@
 import logging
 
-max_log_line_length = 5000
+max_log_line_length = 10000
 
 
 def log_length_filter(max_length):


@@ -157,3 +157,12 @@ class NomosNode:
     def send_get_data_range_request(self, data):
         return self._api.da_get_range(data)
+
+    def send_get_commitments_request(self, data):
+        return self._api.da_get_commitments(data)
+
+    def send_get_storage_block_request(self, data):
+        return self._api.storage_block(data)
+
+    def send_get_cryptarchia_headers_request(self, data):
+        return self._api.cryptarchia_headers(data)


@@ -14,7 +14,7 @@ from jinja2 import Template
 logger = get_custom_logger(__name__)
 
 
-def prepare_cluster_config(node_count, subnetwork_size=2, dispersal_factor=2):
+def prepare_cluster_config(node_count, subnetwork_size=2, dispersal_factor=2, min_dispersal_peers=1):
     cwd = os.getcwd()
     config_dir = "cluster_config"
@@ -22,7 +22,9 @@ def prepare_cluster_config(node_count, subnetwork_size=2, dispersal_factor=2):
         template_content = file.read()
     template = Template(template_content)
-    rendered = template.render(num_hosts=node_count, subnet_size=subnetwork_size, dispersal_factor=dispersal_factor)
+    rendered = template.render(
+        num_hosts=node_count, subnet_size=subnetwork_size, dispersal_factor=dispersal_factor, min_dispersal_peers=min_dispersal_peers
+    )
 
     with open(f"{cwd}/{config_dir}/cfgsync.yaml", "w") as outfile:
         outfile.write(rendered)
@@ -55,7 +57,8 @@ class StepsCommon:
         subnet_size = get_param_or_default(request, "subnet_size", 2)
         dispersal_factor = get_param_or_default(request, "dispersal_factor", 2)
-        prepare_cluster_config(2, subnet_size, dispersal_factor)
+        min_dispersal_peers = get_param_or_default(request, "min_dispersal_peers", 1)
+        prepare_cluster_config(2, subnet_size, dispersal_factor, min_dispersal_peers)
 
         self.node1 = NomosNode(CFGSYNC, "cfgsync")
         self.node2 = NomosNode(NOMOS, "nomos_node_0")
@@ -76,8 +79,9 @@ class StepsCommon:
         logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
         subnet_size = get_param_or_default(request, "subnet_size", 4)
-        dispersal_factor = get_param_or_default(request, "dispersal_factor", 1)
-        prepare_cluster_config(4, subnet_size, dispersal_factor)
+        dispersal_factor = get_param_or_default(request, "dispersal_factor", 2)
+        min_dispersal_peers = get_param_or_default(request, "min_dispersal_peers", 4)
+        prepare_cluster_config(4, subnet_size, dispersal_factor, min_dispersal_peers)
 
         self.node1 = NomosNode(CFGSYNC, "cfgsync")
         self.node2 = NomosNode(NOMOS, "nomos_node_0")
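Direct use of the extended helper looks like this; min_dispersal_peers presumably sets the minimum number of dispersal peers a node must see before dispersing, matching the templated cfgsync.yaml field above:

    # 4-node cluster mirroring the setup_4_node_cluster fixture defaults
    prepare_cluster_config(4, subnetwork_size=4, dispersal_factor=2, min_dispersal_peers=4)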

src/steps/consensus.py (new file)

@@ -0,0 +1,43 @@
+from urllib.parse import quote
+
+import allure
+from tenacity import retry, stop_after_delay, wait_fixed
+
+from src.libs.custom_logger import get_custom_logger
+from src.steps.common import StepsCommon
+
+logger = get_custom_logger(__name__)
+
+
+def prepare_get_cryptarchia_headers_request(from_header_id, to_header_id):
+    query_parts = []
+    if from_header_id is not None:
+        query_parts.append(f"from={quote(from_header_id, safe='')}")
+    if to_header_id is not None:
+        query_parts.append(f"to={quote(to_header_id, safe='')}")
+    return "&".join(query_parts)
+
+
+class StepsConsensus(StepsCommon):
+    @allure.step
+    def get_cryptarchia_headers(self, node, from_header_id=None, to_header_id=None, **kwargs):
+        timeout_duration = kwargs.get("timeout_duration", 65)
+        interval = kwargs.get("interval", 0.1)
+        query = prepare_get_cryptarchia_headers_request(from_header_id, to_header_id)
+
+        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True)
+        def get_headers():
+            try:
+                response = node.send_get_cryptarchia_headers_request(query)
+            except Exception as ex:
+                logger.error(f"Exception while retrieving cryptarchia headers: {ex}")
+                raise
+            return response
+
+        return get_headers()
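The query builder degrades gracefully when either bound is omitted, which is what lets get_cryptarchia_headers be called with no arguments; from the definition above:

    prepare_get_cryptarchia_headers_request("aa", "bb")   # "from=aa&to=bb"
    prepare_get_cryptarchia_headers_request(None, "bb")   # "to=bb"
    prepare_get_cryptarchia_headers_request(None, None)   # "" -> client requests bare cryptarchia/headers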


@@ -28,6 +28,11 @@ def prepare_get_range_request(app_id, start_index, end_index):
     return query_data
 
 
+def prepare_get_shares_commitments_request(blob_id):
+    query_data = {"blob_id": blob_id}
+    return query_data
+
+
 def response_contains_data(response):
     if response is None:
         return False
@@ -101,3 +106,23 @@ class StepsDataAvailability(StepsCommon):
             return response
 
         return get_range()
+
+    @allure.step
+    def get_shares_commitments(self, node, blob_id, **kwargs):
+        timeout_duration = kwargs.get("timeout_duration", 65)
+        interval = kwargs.get("interval", 0.1)
+        query = prepare_get_shares_commitments_request(blob_id)
+
+        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True)
+        def get_commitments():
+            try:
+                response = node.send_get_commitments_request(query)
+            except Exception as ex:
+                logger.error(f"Exception while retrieving commitments: {ex}")
+                raise
+            return response
+
+        return get_commitments()
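The request body is just the blob id as a 32-byte list; combined with to_blob_id from src/libs/common.py:

    query = prepare_get_shares_commitments_request(to_blob_id(1))
    # query == {"blob_id": [0] * 31 + [1]}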

src/steps/storage.py (new file)

@@ -0,0 +1,36 @@
+from urllib.parse import quote
+
+import allure
+from tenacity import retry, stop_after_delay, wait_fixed
+
+from src.libs.custom_logger import get_custom_logger
+from src.steps.common import StepsCommon
+
+logger = get_custom_logger(__name__)
+
+
+def prepare_get_storage_block_request(header_id):
+    query_data = f"{header_id}"
+    return query_data
+
+
+class StepsStorage(StepsCommon):
+    @allure.step
+    def get_storage_block(self, node, header_id, **kwargs):
+        timeout_duration = kwargs.get("timeout_duration", 65)
+        interval = kwargs.get("interval", 0.1)
+        query = prepare_get_storage_block_request(header_id)
+
+        @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True)
+        def get_block():
+            try:
+                response = node.send_get_storage_block_request(query)
+            except Exception as ex:
+                logger.error(f"Exception while retrieving storage block: {ex}")
+                raise
+            return response
+
+        return get_block()


@@ -0,0 +1,101 @@
+import pytest
+
+from src.libs.common import to_app_id, to_index, delay
+from src.libs.custom_logger import get_custom_logger
+from src.steps.consensus import StepsConsensus
+from src.steps.da import StepsDataAvailability
+from src.steps.storage import StepsStorage
+from src.test_data import DATA_TO_DISPERSE
+
+logger = get_custom_logger(__name__)
+
+
+# Extract commitments from indexed shares
+def extract_commitments(index_shares):
+    aggregated_column_commitments = []
+    rows_commitments = []
+    for index, shares in index_shares:
+        for share in shares:
+            a_c_c = share["aggregated_column_commitment"]
+            if a_c_c not in aggregated_column_commitments:
+                aggregated_column_commitments.append(a_c_c)
+            r_c = share["rows_commitments"]
+            for commitment in r_c:
+                if commitment not in rows_commitments:
+                    rows_commitments.append(commitment)
+    return aggregated_column_commitments, rows_commitments
+
+
+# Parse commitments received by get_shares_commitments
+def parse_commitments(commitments):
+    aggregated_column_commitments = []
+    rows_commitments = []
+    for commitment in commitments:
+        aggregated_column_commitments.append(commitment["aggregated_column_commitment"])
+        for rows_commitment in commitment["rows_commitments"]:
+            rows_commitments.append(rows_commitment)
+    return aggregated_column_commitments, rows_commitments
+
+
+class TestApiCompatibility(StepsDataAvailability, StepsConsensus, StepsStorage):
+    main_nodes = []
+
+    @pytest.mark.usefixtures("setup_2_node_cluster")
+    def test_da_consensus_compatibility(self):
+        self.disperse_data(DATA_TO_DISPERSE[2], to_app_id(1), to_index(0))
+        delay(5)
+        index_shares = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5))
+        column_commitments, rows_commitments = extract_commitments(index_shares)
+
+        # Get consensus headers
+        headers = self.get_cryptarchia_headers(self.node2)
+
+        # Get storage blocks for received headers and extract blob ids
+        blob_ids = [
+            blob["id"]
+            for header in headers
+            for block in [self.get_storage_block(self.node2, header)]
+            if block is not None and "bl_blobs" in block
+            for blob in block["bl_blobs"]
+        ]
+
+        # Get commitments for blob ids
+        commitments = [self.get_shares_commitments(self.node2, blob_id) for blob_id in blob_ids]
+        rcv_column_commitments, rcv_rows_commitments = parse_commitments(commitments)
+
+        # Check commitments from shares match commitments received based on consensus data
+        assert all(c in rcv_column_commitments for c in column_commitments), "Not all aggregated column commitments are present"
+        assert all(r in rcv_rows_commitments for r in rows_commitments), "Not all rows commitments are present"
+
+    @pytest.mark.usefixtures("setup_4_node_cluster")
+    def test_da_cross_nodes_consensus_compatibility(self):
+        self.disperse_data(DATA_TO_DISPERSE[2], to_app_id(1), to_index(0))
+        delay(5)
+        index_shares = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5))
+        column_commitments, rows_commitments = extract_commitments(index_shares)
+
+        # Get consensus headers
+        headers = self.get_cryptarchia_headers(self.node3)
+
+        # Get storage blocks for received headers and extract blob ids
+        blob_ids = [
+            blob["id"]
+            for header in headers
+            for block in [self.get_storage_block(self.node3, header)]
+            if block is not None and "bl_blobs" in block
+            for blob in block["bl_blobs"]
+        ]
+
+        # Get commitments for blob ids
+        commitments = [self.get_shares_commitments(self.node3, blob_id) for blob_id in blob_ids]
+        rcv_column_commitments, rcv_rows_commitments = parse_commitments(commitments)
+
+        # Check commitments from shares match commitments received based on consensus data
+        assert all(c in rcv_column_commitments for c in column_commitments), "Not all aggregated column commitments are present"
+        assert all(r in rcv_rows_commitments for r in rows_commitments), "Not all rows commitments are present"
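A minimal sketch of the two commitment helpers on synthetic data, with field names taken from the test above:

    shares = [
        (to_index(0), [{"aggregated_column_commitment": "c1", "rows_commitments": ["r1", "r2"]}]),
        (to_index(1), [{"aggregated_column_commitment": "c1", "rows_commitments": ["r2", "r3"]}]),
    ]
    extract_commitments(shares)  # (["c1"], ["r1", "r2", "r3"]) -- deduplicated
    parse_commitments([{"aggregated_column_commitment": "c1", "rows_commitments": ["r1"]}])
    # (["c1"], ["r1"]) -- this path does not deduplicate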