logos-blockchain-specs/da/test_encoder.py
2025-06-04 15:14:04 +02:00

103 lines
4.4 KiB
Python

from itertools import chain, batched
from random import randbytes
from unittest import TestCase
from da import encoder
from da.common import Column
from kzg_rs.bdfg_proving import derive_challenge
from da.encoder import DAEncoderParams, DAEncoder
from da.verifier import DAVerifier, DAShare
from eth2spec.eip7594.mainnet import BYTES_PER_FIELD_ELEMENT, BLSFieldElement
from da.kzg_rs.common import ROOTS_OF_UNITY
from da.kzg_rs import kzg, rs
class TestEncoder(TestCase):
def setUp(self):
self.params: DAEncoderParams = DAEncoderParams(column_count=16, bytes_per_chunk=31)
self.encoder: DAEncoder = DAEncoder(self.params)
self.elements = 32
self.data = bytearray(
chain.from_iterable(
randbytes(self.params.bytes_per_chunk)
for _ in range(self.elements)
)
)
def assert_encoding(self, encoder_params: DAEncoderParams, data: bytes):
encoded_data = encoder.DAEncoder(encoder_params).encode(data)
self.assertEqual(encoded_data.data, data)
extended_factor = 2
column_count = encoder_params.column_count*extended_factor
columns_len = len(list(encoded_data.extended_matrix.columns))
self.assertEqual(columns_len, column_count)
chunks_size = (len(data) // encoder_params.bytes_per_chunk) // encoder_params.column_count
self.assertEqual(len(encoded_data.row_commitments), chunks_size)
verifier = DAVerifier()
# verify columns
for idx, (column, column_proof) in enumerate(zip(encoded_data.extended_matrix.columns, encoded_data.combined_column_proofs)):
share = DAShare(
column=Column(column),
column_idx=idx,
combined_column_proof=column_proof,
row_commitments=encoded_data.row_commitments
)
verifier.verify(share)
def test_chunkify(self):
encoder_settings = DAEncoderParams(column_count=2, bytes_per_chunk=31)
elements = 10
data = bytes(chain.from_iterable(int.to_bytes(0, length=31, byteorder='big') for _ in range(elements)))
_encoder = encoder.DAEncoder(encoder_settings)
chunks_matrix = _encoder._chunkify_data(data)
self.assertEqual(len(chunks_matrix), elements//encoder_settings.column_count)
for row in chunks_matrix:
self.assertEqual(len(row), encoder_settings.column_count)
self.assertEqual(len(row[0]), 32)
def test_compute_row_kzg_commitments(self):
chunks_matrix = self.encoder._chunkify_data(self.data)
polynomials, commitments = zip(*self.encoder._compute_row_kzg_commitments(chunks_matrix))
self.assertEqual(len(commitments), len(chunks_matrix))
self.assertEqual(len(polynomials), len(chunks_matrix))
def test_rs_encode_rows(self):
chunks_matrix = self.encoder._chunkify_data(self.data)
extended_chunks_matrix = self.encoder._rs_encode_rows(chunks_matrix)
for r1, r2 in zip(chunks_matrix, extended_chunks_matrix):
self.assertEqual(len(r1), len(r2)//2)
r2 = [BLSFieldElement.from_bytes(x) for x in r2]
poly_1 = kzg.bytes_to_polynomial(r1.as_bytes())
# we check against decoding so we now the encoding was properly done
poly_2 = rs.decode(r2, ROOTS_OF_UNITY, len(poly_1))
self.assertEqual(poly_1, poly_2)
def test_generate_combined_column_proofs(self):
chunks_matrix = self.encoder._chunkify_data(self.data)
row_polynomials, row_commitments = zip(*self.encoder._compute_row_kzg_commitments(chunks_matrix))
h = derive_challenge(row_commitments)
combined_poly = self.encoder._combined_polynomial(row_polynomials, h)
proofs = self.encoder._compute_combined_column_proofs(combined_poly)
expected_extended_columns = self.params.column_count * 2
self.assertEqual(len(proofs), expected_extended_columns)
def test_encode(self):
from random import randbytes
sizes = [pow(2, exp) for exp in range(4, 8, 2)]
encoder_params = DAEncoderParams(
column_count=8,
bytes_per_chunk=31
)
for size in sizes:
data = bytes(
chain.from_iterable(
randbytes(encoder_params.bytes_per_chunk)
for _ in range(size*encoder_params.column_count)
)
)
self.assert_encoding(encoder_params, data)