Merge pull request #3 from logos-co/test-data-availability-integrity

chore: Sync data availability and integrity tests with latest Nomos node changes
Roman Zajic 2025-02-15 06:29:38 +08:00 committed by GitHub
commit c56f25b697
GPG Key ID: B5690EEEBB952194
19 changed files with 384 additions and 87 deletions

.github/actions/prune-vm/action.yml (new file, +27)

@@ -0,0 +1,27 @@
# Inspired by https://github.com/AdityaGarg8/remove-unwanted-software
# to free up disk space. Currently removes Dotnet, Android and Haskell.
name: Remove unwanted software
description: Default GitHub runners come with a lot of unnecessary software
runs:
using: "composite"
steps:
- name: Disk space report before modification
shell: bash
run: |
echo "==> Available space before cleanup"
echo
df -h
- name: Maximize build disk space
shell: bash
run: |
set -euo pipefail
sudo rm -rf /usr/share/dotnet
sudo rm -rf /usr/local/lib/android
sudo rm -rf /opt/ghc
sudo rm -rf /usr/local/.ghcup
- name: Disk space report after modification
shell: bash
run: |
echo "==> Available space after cleanup"
echo
df -h

.github/workflows/nomos_daily.yml (new file, +11)

@@ -0,0 +1,11 @@
name: Nomos E2E Tests Daily
on:
schedule:
- cron: '0 4 * * *'
workflow_dispatch:
jobs:
test-common:
uses: ./.github/workflows/test_common.yml

.github/workflows/test_common.yml (new file, +32)

@@ -0,0 +1,32 @@
name: E2E Tests Common
on:
workflow_call:
env:
FORCE_COLOR: "1"
jobs:
tests:
name: tests
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
- uses: actions/checkout@v4
- name: Remove unwanted software
uses: ./.github/actions/prune-vm
- uses: actions/setup-python@v4
with:
python-version: '3.12'
cache: 'pip'
- run: |
pip install -r requirements.txt
mkdir -p kzgrs
wget https://raw.githubusercontent.com/logos-co/nomos-node/master/tests/kzgrs/kzgrs_test_params -O kzgrs/kzgrs_test_params
- name: Run tests
run: |
pytest


@@ -1,5 +1,5 @@
port: 4400
n_hosts: 5
n_hosts: 4
timeout: 30
# ConsensusConfig related parameters


@@ -1,5 +1,5 @@
port: 4400
n_hosts: 5
n_hosts: 2
timeout: 30
# ConsensusConfig related parameters


@@ -1,7 +1,6 @@
from src.libs.custom_logger import get_custom_logger
import json
from urllib.parse import quote
from src.node.api_clients.base_client import BaseClient
from src.api_clients.base_client import BaseClient
logger = get_custom_logger(__name__)
@@ -12,12 +11,12 @@ class REST(BaseClient):
def rest_call(self, method, endpoint, payload=None):
url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
headers = {"Content-Type": "application/json"}
headers = {"Content-Type": "application/json", "Connection": "close"}
return self.make_request(method, url, headers=headers, data=payload)
def rest_call_text(self, method, endpoint, payload=None):
url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
headers = {"accept": "text/plain"}
headers = {"accept": "text/plain", "Connection": "close"}
return self.make_request(method, url, headers=headers, data=payload)
def info(self):

src/cli/__init__.py (new file, empty)

src/cli/cli_vars.py (new file, +11)

@@ -0,0 +1,11 @@
from src.env_vars import NOMOS_IMAGE
nomos_cli = {
"reconstruct": {
"image": NOMOS_IMAGE,
"flags": [{"--app-blobs": [0]}], # Value [] is a list of indexes into list of values required for the flag
"volumes": [],
"ports": [],
"entrypoint": "",
},
}
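
For illustration, NomosCli.run() below expands this flags structure by appending each flag name followed by the input values at the listed indexes. A minimal sketch of that expansion, with a made-up input file name ("blobs.json" is hypothetical; the binary path comes from NOMOS_CLI):

flags = [{"--app-blobs": [0]}]          # from nomos_cli["reconstruct"]["flags"]
input_values = ["blobs.json"]           # positional values referenced by index

cmd = ["/usr/bin/nomos-cli", "reconstruct"]
for flag in flags:
    for name, indexes in flag.items():
        cmd.append(name)
        for i in indexes:
            cmd.append(input_values[i])

# cmd == ['/usr/bin/nomos-cli', 'reconstruct', '--app-blobs', 'blobs.json']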

src/cli/nomos_cli.py (new file, +111)

@@ -0,0 +1,111 @@
import json
import os
import re
from src.data_storage import DS
from src.libs.common import generate_log_prefix
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed
from src.cli.cli_vars import nomos_cli
from src.docker_manager import DockerManager, stop, kill
from src.env_vars import DOCKER_LOG_DIR, NOMOS_CLI
from src.steps.da import remove_padding
logger = get_custom_logger(__name__)
class NomosCli:
def __init__(self, **kwargs):
if "command" not in kwargs:
raise ValueError("The command parameter is required")
command = kwargs["command"]
if command not in nomos_cli:
raise ValueError("Unknown command provided")
logger.debug(f"Cli is going to be initialized with this config {nomos_cli[command]}")
self._command = command
self._image_name = nomos_cli[command]["image"]
self._internal_ports = nomos_cli[command]["ports"]
self._volumes = nomos_cli[command]["volumes"]
self._entrypoint = nomos_cli[command]["entrypoint"]
container_name = "nomos-cli-" + generate_log_prefix()
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{container_name}__{self._image_name.replace('/', '_')}.log")
self._docker_manager = DockerManager(self._image_name)
self._container_name = container_name
self._container = None
cwd = os.getcwd()
self._volumes = [cwd + "/" + volume for volume in self._volumes]
def run(self, input_values=None, **kwargs):
logger.debug(f"NomosCli starting with log path {self._log_path}")
self._port_map = {}
cmd = [NOMOS_CLI, self._command]
for flag in nomos_cli[self._command]["flags"]:
for f, indexes in flag.items():
cmd.append(f)
for j in indexes:
cmd.append(input_values[j])
logger.debug(f"NomosCli command to run {cmd}")
self._container = self._docker_manager.start_container(
self._docker_manager.image,
port_bindings=self._port_map,
args=None,
log_path=self._log_path,
volumes=self._volumes,
entrypoint=self._entrypoint,
remove_container=True,
name=self._container_name,
command=cmd,
)
DS.nomos_nodes.append(self)
match self._command:
case "reconstruct":
decode_only = kwargs.get("decode_only", False)
return self.reconstruct(input_values=input_values, decode_only=decode_only)
case _:
return
def reconstruct(self, input_values=None, decode_only=False):
keywords = ["Reconstructed data"]
log_stream = self._container.logs(stream=True)
matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False, log_stream)
assert len(matches) > 0, f"Reconstructed data not found {matches}"
# Use a regular expression to capture the byte list after "Reconstructed data"
result = re.sub(r".*Reconstructed data\s*(\[[^\]]+\]).*", r"\1", matches[keywords[0]][0])
result_bytes = []
try:
result_bytes = json.loads(result)
except Exception as ex:
logger.debug(f"Conversion to bytes failed with exception {ex}")
if decode_only:
result_bytes = result_bytes[:-31]
result_bytes = remove_padding(result_bytes)
result = bytes(result_bytes).decode("utf-8")
DS.nomos_nodes.remove(self)
return result
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
self._container = stop(self._container)
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
self._container = kill(self._container)
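
To illustrate the log-parsing step in reconstruct() above: the regex keeps only the byte list printed after "Reconstructed data", which is then parsed with json.loads and decoded. A self-contained sketch with a fabricated log line (the real line comes from nomos-cli output and may carry extra fields):

import json
import re

log_line = "Reconstructed data [110, 111, 109, 111, 115]"  # hypothetical example

byte_list = re.sub(r".*Reconstructed data\s*(\[[^\]]+\]).*", r"\1", log_line)
data = json.loads(byte_list)
print(bytes(data).decode("utf-8"))  # nomos

In the real method the parsed bytes still carry the dispersal padding (and, with decode_only, a trailing 31-byte block), which remove_padding() from src/steps/da.py strips before decoding.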


@@ -35,30 +35,41 @@ class DockerManager:
logger.debug(f"Network {network_name} created")
return network
def start_container(self, image_name, port_bindings, args, log_path, volumes, entrypoint, remove_container=True, name=None):
def start_container(self, image_name, port_bindings, args, log_path, volumes, entrypoint, **kwargs):
remove_container = kwargs.get("remove_container", True)
name = kwargs.get("name")
command = kwargs.get("command")
cli_args = []
for key, value in args.items():
if isinstance(value, list): # Check if value is a list
cli_args.extend([f"--{key}={item}" for item in value]) # Add a command for each item in the list
elif value is None:
cli_args.append(f"{key}") # Add simple command as it is passed in the key
else:
cli_args.append(f"--{key}={value}") # Add a single command
if command is None:
for key, value in args.items():
if isinstance(value, list): # Check if value is a list
cli_args.extend([f"--{key}={item}" for item in value]) # Add a command for each item in the list
elif value is None:
cli_args.append(f"{key}") # Add simple command as it is passed in the key
else:
cli_args.append(f"--{key}={value}") # Add a single command
else:
cli_args = command
cli_args_str_for_log = " ".join(cli_args)
logger.debug(f"docker run -i -t {port_bindings} {image_name} {cli_args_str_for_log}")
container = self._client.containers.run(
image_name,
command=cli_args,
ports=port_bindings,
detach=True,
remove=remove_container,
auto_remove=remove_container,
volumes=volumes,
entrypoint=entrypoint,
name=name,
network=NETWORK_NAME,
)
logger.debug(f"docker run -i -t --entrypoint {entrypoint} {port_bindings} {image_name} {cli_args_str_for_log}")
try:
container = self._client.containers.run(
image_name,
command=cli_args,
ports=port_bindings,
detach=True,
remove=remove_container,
auto_remove=remove_container,
volumes=volumes,
entrypoint=entrypoint,
name=name,
network=NETWORK_NAME,
)
except Exception as ex:
logger.debug(f"Docker container run failed with exception {ex}")
logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}")
log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path))
@@ -125,19 +136,32 @@ class DockerManager:
def image(self):
return self._image
def search_log_for_keywords(self, log_path, keywords, use_regex=False):
def find_keywords_in_line(self, keywords, line, use_regex=False):
matches = {keyword: [] for keyword in keywords}
# Open the log file and search line by line
with open(log_path, "r") as log_file:
for line in log_file:
for keyword in keywords:
if use_regex:
if re.search(keyword, line, re.IGNORECASE):
matches[keyword].append(line.strip())
else:
if keyword.lower() in line.lower():
matches[keyword].append(line.strip())
for keyword in keywords:
if use_regex:
if re.search(keyword, line, re.IGNORECASE):
matches[keyword].append(line.strip())
else:
if keyword.lower() in line.lower():
matches[keyword].append(line.strip())
return matches
def search_log_for_keywords(self, log_path, keywords, use_regex=False, log_stream=None):
matches = {}
# Read from stream
if log_stream is not None:
for line in log_stream:
matches = self.find_keywords_in_line(keywords, line.decode("utf-8"), use_regex=use_regex)
else:
# Open the log file and search line by line
with open(log_path, "r") as log_file:
for line in log_file:
matches = self.find_keywords_in_line(keywords, line, use_regex=use_regex)
# Check if there were any matches
if any(matches[keyword] for keyword in keywords):
@@ -146,5 +170,31 @@ class DockerManager:
logger.debug(f"Found matches for keyword '{keyword}': {lines}")
return matches
else:
logger.debug("No errors found in the nomos logs.")
logger.debug("No keywords found in the nomos logs.")
return None
def stop(container):
if container:
logger.debug(f"Stopping container with id {container.short_id}")
container.stop()
try:
container.remove()
except:
pass
logger.debug("Container stopped.")
return None
def kill(container):
if container:
logger.debug(f"Killing container with id {container.short_id}")
container.kill()
try:
container.remove()
except:
pass
logger.debug("Container killed.")
return None
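
The stream branch of search_log_for_keywords() matters for NomosCli.reconstruct(): container.logs(stream=True) yields raw bytes, so each line must be decoded before matching. A simplified, self-contained sketch of that idea (not the exact helper; a plain list stands in for the Docker log stream, and this version accumulates matches across lines):

fake_stream = [b"waiting for peers\n", b"Reconstructed data [1, 2, 3]\n"]
keywords = ["Reconstructed data"]

matches = {keyword: [] for keyword in keywords}
for raw_line in fake_stream:
    line = raw_line.decode("utf-8")
    for keyword in keywords:
        if keyword.lower() in line.lower():
            matches[keyword].append(line.strip())

# matches == {'Reconstructed data': ['Reconstructed data [1, 2, 3]']}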


@@ -19,10 +19,16 @@ NOMOS = "nomos"
NOMOS_EXECUTOR = "nomos_executor"
CFGSYNC = "cfgsync"
DEFAULT_IMAGE = "ghcr.io/logos-co/nomos-node:latest"
NODE_1 = get_env_var("NODE_1", NOMOS)
NODE_2 = get_env_var("NODE_2", NOMOS_EXECUTOR)
NODE_3 = get_env_var("NODE_3", CFGSYNC)
NOMOS_IMAGE = get_env_var("NOMOS_IMAGE", DEFAULT_IMAGE)
NOMOS_CLI = "/usr/bin/nomos-cli"
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{NOMOS},{NOMOS}")
# more nodes need to follow the NODE_X pattern
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
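
For illustration, assuming get_env_var(name, default) simply returns the environment value when set and the default otherwise (the helper is defined elsewhere in env_vars.py and is not shown in this diff), pointing the suite at a locally built image would look like:

import os

def get_env_var(name, default=None):  # assumed behaviour, for illustration only
    return os.environ.get(name, default)

os.environ["NOMOS_IMAGE"] = "nomos:local"  # e.g. exported in the shell before running pytest
print(get_env_var("NOMOS_IMAGE", "ghcr.io/logos-co/nomos-node:latest"))  # nomos:local

The node definitions in node_vars.py and the nomos-cli container would then use that image instead of the default.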


@@ -1,3 +1,5 @@
import random
import string
import uuid
from datetime import datetime
from time import sleep
@@ -20,3 +22,19 @@ def delay(num_seconds):
def gen_step_id():
return f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}__{str(uuid.uuid4())}"
def generate_log_prefix():
return "".join(random.choices(string.ascii_lowercase, k=4))
def to_index(n: int) -> list:
if n < 0:
raise ValueError("Input must be an unsigned integer (non-negative)")
return list(n.to_bytes(8, byteorder="big"))
def to_app_id(n: int) -> list:
if n < 0:
raise ValueError("Input must be an unsigned integer (non-negative)")
return list(n.to_bytes(32, byteorder="big"))
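
These helpers replace the hand-written byte lists in the older tests; a quick check of the values they produce:

# to_index(0) -> the 8-byte big-endian encoding of 0, i.e. the old [0] * 8 literal
assert list((0).to_bytes(8, byteorder="big")) == [0] * 8

# to_app_id(1) -> the 32-byte big-endian encoding of 1, i.e. the old [0] * 31 + [1] literal
assert list((1).to_bytes(32, byteorder="big")) == [0] * 31 + [1]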


@@ -1,18 +1,20 @@
from src.env_vars import NOMOS_IMAGE
nomos_nodes = {
"nomos": {
"image": "nomos:latest",
"image": NOMOS_IMAGE,
"volumes": ["cluster_config:/etc/nomos", "./kzgrs/kzgrs_test_params:/kzgrs_test_params:z"],
"ports": ["3000/udp", "18080/tcp"],
"entrypoint": "/etc/nomos/scripts/run_nomos_node.sh",
},
"nomos_executor": {
"image": "nomos:latest",
"image": NOMOS_IMAGE,
"volumes": ["cluster_config:/etc/nomos", "./kzgrs/kzgrs_test_params:/kzgrs_test_params:z"],
"ports": ["3000/udp", "18080/tcp"],
"entrypoint": "/etc/nomos/scripts/run_nomos_executor.sh",
},
"cfgsync": {
"image": "nomos:latest",
"image": NOMOS_IMAGE,
"volumes": ["cluster_config:/etc/nomos"],
"ports": "",
"entrypoint": "/etc/nomos/scripts/run_cfgsync.sh",


@@ -4,8 +4,8 @@ from src.data_storage import DS
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed
from src.node.api_clients.rest import REST
from src.node.docker_mananger import DockerManager
from src.api_clients.rest import REST
from src.docker_manager import DockerManager, stop, kill
from src.env_vars import DOCKER_LOG_DIR
from src.node.node_vars import nomos_nodes
from src.test_data import LOG_ERROR_KEYWORDS
@@ -43,7 +43,7 @@ class NomosNode:
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
def start(self, wait_for_node_sec=120, **kwargs):
logger.debug("Starting Node...")
logger.debug(f"Starting Node {self._container_name} with role {self._node_type}")
self._docker_manager.create_network()
self._ext_ip = self._docker_manager.generate_random_ext_ip()
@@ -84,27 +84,11 @@ class NomosNode:
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
if self._container:
logger.debug(f"Stopping container with id {self._container.short_id}")
self._container.stop()
try:
self._container.remove()
except:
pass
self._container = None
logger.debug("Container stopped.")
self._container = stop(self._container)
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
if self._container:
logger.debug(f"Killing container with id {self._container.short_id}")
self._container.kill()
try:
self._container.remove()
except:
pass
self._container = None
logger.debug("Container killed.")
self._container = kill(self._container)
def restart(self):
if self._container:


@@ -46,26 +46,25 @@ class StepsCommon:
start_nodes(self.main_nodes)
try:
ensure_nodes_ready(self.main_nodes[2:])
ensure_nodes_ready(self.main_nodes[1:])
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
@pytest.fixture(scope="function")
def setup_5_node_cluster(self, request):
def setup_4_node_cluster(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
prepare_cluster_config(5)
prepare_cluster_config(4)
self.node1 = NomosNode(CFGSYNC, "cfgsync")
self.node2 = NomosNode(NOMOS, "nomos_node_0")
self.node3 = NomosNode(NOMOS, "nomos_node_1")
self.node4 = NomosNode(NOMOS, "nomos_node_2")
self.node5 = NomosNode(NOMOS, "nomos_node_3")
self.node6 = NomosNode(NOMOS_EXECUTOR, "nomos_node_4")
self.main_nodes.extend([self.node1, self.node2, self.node3, self.node4, self.node5, self.node6])
self.node5 = NomosNode(NOMOS_EXECUTOR, "nomos_node_3")
self.main_nodes.extend([self.node1, self.node2, self.node3, self.node4, self.node5])
start_nodes(self.main_nodes)
try:
ensure_nodes_ready(self.main_nodes[2:])
ensure_nodes_ready(self.main_nodes[1:])
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise


@@ -1,17 +1,18 @@
import allure
from tenacity import retry, stop_after_delay, wait_fixed
from src.env_vars import NOMOS_EXECUTOR
from src.steps.common import StepsCommon
def add_padding(orig_bytes):
block_size = 31
"""
Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme:
- The value of each padded byte is the number of bytes padded.
- If the original data is already a multiple of the block size,
an additional full block of bytes (each the block size) is added.
"""
block_size = 31
original_len = len(orig_bytes)
padding_needed = block_size - (original_len % block_size)
# If the data is already a multiple of block_size, add a full block of padding
@@ -23,6 +24,28 @@ def add_padding(orig_bytes):
return padded_bytes
def remove_padding(padded_bytes):
"""
Removes PKCS#7-like padding from a list of bytes.
Raises:
ValueError: If the padding is incorrect.
Returns:
The original list of bytes without padding.
"""
if not padded_bytes:
raise ValueError("The input is empty, cannot remove padding.")
padding_len = padded_bytes[-1]
if padding_len < 1 or padding_len > 31:
raise ValueError("Invalid padding length.")
if padded_bytes[-padding_len:] != [padding_len] * padding_len:
raise ValueError("Invalid padding bytes.")
return padded_bytes[:-padding_len]
def prepare_dispersal_request(data, app_id, index):
data_bytes = data.encode("utf-8")
padded_bytes = add_padding(list(data_bytes))
@@ -35,8 +58,15 @@ def prepare_get_range_request(app_id, start_index, end_index):
return query_data
class StepsDataAvailability(StepsCommon):
def response_contains_data(response):
for index, blobs in response:
if len(blobs) != 0:
return True
return False
class StepsDataAvailability(StepsCommon):
def find_executor_node(self):
executor = {}
for node in self.main_nodes:
@@ -45,6 +75,7 @@ class StepsDataAvailability(StepsCommon):
return executor
@allure.step
@retry(stop=stop_after_delay(65), wait=wait_fixed(1), reraise=True)
def disperse_data(self, data, app_id, index):
request = prepare_dispersal_request(data, app_id, index)
executor = self.find_executor_node()
@@ -54,6 +85,7 @@ class StepsDataAvailability(StepsCommon):
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
@allure.step
@retry(stop=stop_after_delay(45), wait=wait_fixed(1), reraise=True)
def get_data_range(self, node, app_id, start, end):
response = []
query = prepare_get_range_request(app_id, start, end)
@@ -62,4 +94,6 @@ class StepsDataAvailability(StepsCommon):
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
assert response_contains_data(response), "Get data range response is empty"
return response
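
To make the 31-byte padding scheme concrete, here is a round trip in the spirit of add_padding()/remove_padding() above (add_padding's body is only partly visible in this diff, so this is a sketch of the documented behaviour):

data = list("nomos".encode("utf-8"))                # 5 bytes
padding_needed = 31 - (len(data) % 31)              # 26
padded = data + [padding_needed] * padding_needed   # 31 bytes in total

# remove_padding() reads the last byte (26), checks that the final 26 bytes all
# equal 26, and strips them, recovering the original data.
assert padded[-padded[-1]:] == [padded[-1]] * padded[-1]
assert bytes(padded[:-padded[-1]]).decode("utf-8") == "nomos"

When the input is already a multiple of 31 bytes, a full block of 31 padding bytes is appended instead, as the docstring notes.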


@@ -1,29 +1,42 @@
import json
import random
import pytest
from src.cli.nomos_cli import NomosCli
from src.libs.common import delay, to_app_id, to_index
from src.libs.custom_logger import get_custom_logger
from src.steps.da import StepsDataAvailability
from src.test_data import DATA_TO_DISPERSE
logger = get_custom_logger(__name__)
class TestDataIntegrity(StepsDataAvailability):
main_nodes = []
@pytest.mark.skip(reason="Waiting for PR https://github.com/logos-co/nomos-node/pull/994")
@pytest.mark.usefixtures("setup_5_node_cluster")
@pytest.mark.usefixtures("setup_4_node_cluster")
def test_da_identify_retrieve_missing_columns(self):
self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
received_data = []
# Get data only from half of nodes
for node in self.main_nodes[2:4]:
received_data.append(self.get_data_range(node, [0] * 31 + [1], [0] * 8, [0] * 7 + [3]))
delay(5)
self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0))
delay(5)
# Select one target node at random to get blob data for half of the columns
selected_node = self.main_nodes[random.randint(1, 3)]
rcv_data = self.get_data_range(selected_node, to_app_id(1), to_index(0), to_index(5))
rcv_data_json = json.dumps(rcv_data)
# Use received blob data to reconstruct the original data
# nomos-cli reconstruct command required
reconstructed_data = []
assert DATA_TO_DISPERSE[0] == bytes(reconstructed_data).decode("utf-8")
reconstructed_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json])
assert DATA_TO_DISPERSE[1] == reconstructed_data, "Reconstructed data does not match the original data"
@pytest.mark.skip(reason="Waiting for Nomos testnet images could evolve blockchain")
@pytest.mark.usefixtures("setup_2_node_cluster")
def test_da_sampling_determines_data_presence(self):
self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
received_data = self.get_data_range(self.node2, [0] * 31 + [1], [0] * 8, [0] * 7 + [5])
assert DATA_TO_DISPERSE[0] == bytes(received_data[0][1]).decode("utf-8")
delay(5)
self.disperse_data(DATA_TO_DISPERSE[1], to_app_id(1), to_index(0))
delay(5)
rcv_data = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5))
rcv_data_json = json.dumps(rcv_data)
decoded_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json], decode_only=True)
assert DATA_TO_DISPERSE[1] == decoded_data, "Decoded data does not match the original data"