mirror of
https://github.com/logos-blockchain/logos-blockchain-e2e-tests.git
synced 2026-01-02 13:13:08 +00:00
fix: conversion functions for index and app_id
This commit is contained in:
parent
d340256605
commit
cdc7836e33
@ -1,7 +1,6 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from enum import Enum
|
||||
|
||||
from src.data_storage import DS
|
||||
from src.libs.common import generate_log_prefix
|
||||
|
||||
@ -49,7 +49,7 @@ class DockerManager:
|
||||
cli_args = command
|
||||
|
||||
cli_args_str_for_log = " ".join(cli_args)
|
||||
logger.debug(f"docker run -i -t {port_bindings} {image_name} {cli_args_str_for_log}")
|
||||
logger.debug(f"docker run -i -t --entrypoint {entrypoint} {port_bindings} {image_name} {cli_args_str_for_log}")
|
||||
|
||||
try:
|
||||
container = self._client.containers.run(
|
||||
|
||||
@ -26,3 +26,15 @@ def gen_step_id():
|
||||
|
||||
def generate_log_prefix():
|
||||
return "".join(random.choices(string.ascii_lowercase, k=4))
|
||||
|
||||
|
||||
def to_index(n: int) -> list:
|
||||
if n < 0:
|
||||
raise ValueError("Input must be an unsigned integer (non-negative)")
|
||||
return list(n.to_bytes(8, byteorder="big"))
|
||||
|
||||
|
||||
def to_app_id(n: int) -> list:
|
||||
if n < 0:
|
||||
raise ValueError("Input must be an unsigned integer (non-negative)")
|
||||
return list(n.to_bytes(32, byteorder="big"))
|
||||
|
||||
@ -6,13 +6,13 @@ from src.steps.common import StepsCommon
|
||||
|
||||
|
||||
def add_padding(orig_bytes):
|
||||
block_size = 31
|
||||
"""
|
||||
Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme:
|
||||
- The value of each padded byte is the number of bytes padded.
|
||||
- If the original data is already a multiple of the block size,
|
||||
an additional full block of bytes (each the block size) is added.
|
||||
"""
|
||||
block_size = 31
|
||||
original_len = len(orig_bytes)
|
||||
padding_needed = block_size - (original_len % block_size)
|
||||
# If the data is already a multiple of block_size, add a full block of padding
|
||||
@ -29,7 +29,6 @@ def remove_padding(padded_bytes):
|
||||
Removes PKCS#7-like padding from a list of bytes.
|
||||
Raises:
|
||||
ValueError: If the padding is incorrect.
|
||||
|
||||
Returns:
|
||||
The original list of bytes without padding.
|
||||
"""
|
||||
|
||||
@ -4,7 +4,7 @@ import random
|
||||
import pytest
|
||||
|
||||
from src.cli.nomos_cli import NomosCli
|
||||
from src.libs.common import delay
|
||||
from src.libs.common import delay, to_app_id, to_index
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.da import StepsDataAvailability
|
||||
from src.test_data import DATA_TO_DISPERSE
|
||||
@ -18,11 +18,11 @@ class TestDataIntegrity(StepsDataAvailability):
|
||||
@pytest.mark.usefixtures("setup_4_node_cluster")
|
||||
def test_da_identify_retrieve_missing_columns(self):
|
||||
delay(5)
|
||||
self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
|
||||
self.disperse_data(DATA_TO_DISPERSE[0], to_app_id(1), to_index(0))
|
||||
delay(5)
|
||||
# Select one target node at random to get blob data for 1/2 columns
|
||||
selected_node = self.main_nodes[random.randint(1, 3)]
|
||||
rcv_data = self.get_data_range(selected_node, [0] * 31 + [1], [0] * 8, [0] * 7 + [5])
|
||||
rcv_data = self.get_data_range(selected_node, to_app_id(1), to_index(0), to_index(5))
|
||||
rcv_data_json = json.dumps(rcv_data)
|
||||
|
||||
reconstructed_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json])
|
||||
@ -32,9 +32,9 @@ class TestDataIntegrity(StepsDataAvailability):
|
||||
@pytest.mark.usefixtures("setup_2_node_cluster")
|
||||
def test_da_sampling_determines_data_presence(self):
|
||||
delay(5)
|
||||
self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
|
||||
self.disperse_data(DATA_TO_DISPERSE[0], to_app_id(1), to_index(0))
|
||||
delay(5)
|
||||
rcv_data = self.get_data_range(self.node2, [0] * 31 + [1], [0] * 8, [0] * 7 + [5])
|
||||
rcv_data = self.get_data_range(self.node2, to_app_id(1), to_index(0), to_index(5))
|
||||
rcv_data_json = json.dumps(rcv_data)
|
||||
|
||||
decoded_data = NomosCli(command="reconstruct").run(input_values=[rcv_data_json], decode_only=True)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user