138 lines
6.9 KiB
Python
138 lines
6.9 KiB
Python
from itertools import chain, batched
|
|
from random import randrange, randbytes
|
|
from unittest import TestCase
|
|
|
|
from eth2spec.deneb.mainnet import bytes_to_bls_field
|
|
|
|
from da import encoder
|
|
from da.encoder import DAEncoderParams, DAEncoder
|
|
from eth2spec.eip7594.mainnet import BYTES_PER_FIELD_ELEMENT, BLSFieldElement
|
|
|
|
from da.kzg_rs.common import BLS_MODULUS, ROOTS_OF_UNITY
|
|
from da.kzg_rs import kzg, rs
|
|
|
|
|
|
class TestEncoder(TestCase):
    """Unit tests for ``DAEncoder``: chunking, commitments, RS row extension and proofs."""

    def setUp(self):
        """Create a 16-column encoder and a random payload of ``self.elements`` 31-byte chunks."""
        self.params: DAEncoderParams = DAEncoderParams(column_count=16, bytes_per_chunk=31)
        self.encoder: DAEncoder = DAEncoder(self.params)
        self.elements = 32
        self.data = bytearray(
            chain.from_iterable(
                randbytes(self.params.bytes_per_chunk)
                for _ in range(self.elements)
            )
        )

    def assert_encoding(self, encoder_params: DAEncoderParams, data: bytes):
        """Encode ``data`` and check the dimensions and row proofs of the result.

        Args:
            encoder_params: parameters used to build the ``DAEncoder`` under test.
            data: payload to encode; its length is used to derive the expected row count.
        """
        encoded_data = DAEncoder(encoder_params).encode(data)
        self.assertEqual(encoded_data.data, data)

        # The extended matrix is expected to hold twice as many columns as configured.
        extended_factor = 2
        column_count = encoder_params.column_count * extended_factor
        columns = list(encoded_data.extended_matrix.columns)
        columns_len = len(columns)
        self.assertEqual(columns_len, column_count)

        # One commitment and one list of proofs per row of the original matrix.
        row_count = (len(data) // encoder_params.bytes_per_chunk) // encoder_params.column_count
        self.assertEqual(len(encoded_data.row_commitments), row_count)
        self.assertEqual(len(encoded_data.row_proofs), row_count)
        self.assertEqual(len(encoded_data.row_proofs[0]), column_count)
        self.assertIsNotNone(encoded_data.aggregated_column_commitment)
        self.assertEqual(len(encoded_data.aggregated_column_proofs), columns_len)

        # verify rows
        rows_with_proofs = zip(
            encoded_data.extended_matrix,
            encoded_data.row_proofs,
            encoded_data.row_commitments,
        )
        for row, proofs, commitment in rows_with_proofs:
            for i, (chunk, proof) in enumerate(zip(row, proofs)):
                self.assertTrue(
                    kzg.verify_element_proof(bytes_to_bls_field(chunk), commitment, proof, i, ROOTS_OF_UNITY)
                )

        # verify column aggregation
        # NOTE(review): `commitment` below is whatever the row loop above left bound
        # (the last row commitment), and the result of verify_element_proof is
        # discarded, so this section currently asserts nothing. It presumably should
        # hash each column with its own column commitment and assert the result;
        # confirm which commitments the encoder exposes before tightening this.
        for i, (column, proof) in enumerate(zip(columns, encoded_data.aggregated_column_proofs)):
            column_hash = DAEncoder.hash_column_and_commitment(column, commitment)
            kzg.verify_element_proof(
                bytes_to_bls_field(column_hash),
                encoded_data.aggregated_column_commitment,
                proof,
                i,
                ROOTS_OF_UNITY
            )

    def test_chunkify(self):
        """Chunking 10 zeroed 31-byte elements into 2 columns yields 5 rows of 2 chunks."""
        encoder_settings = DAEncoderParams(column_count=2, bytes_per_chunk=31)
        elements = 10
        # bytes(n) is n zero bytes: `elements` all-zero chunks laid out back to back.
        data = bytes(encoder_settings.bytes_per_chunk * elements)
        _encoder = DAEncoder(encoder_settings)
        chunks_matrix = _encoder._chunkify_data(data)
        self.assertEqual(len(chunks_matrix), elements // encoder_settings.column_count)
        for row in chunks_matrix:
            self.assertEqual(len(row), encoder_settings.column_count)
            # Each 31-byte input chunk is expected to come back 32 bytes long.
            self.assertEqual(len(row[0]), 32)

    def test_compute_row_kzg_commitments(self):
        """One polynomial and one commitment are produced per row."""
        chunks_matrix = self.encoder._chunkify_data(self.data)
        polynomials, commitments = zip(*self.encoder._compute_row_kzg_commitments(chunks_matrix))
        self.assertEqual(len(commitments), len(chunks_matrix))
        self.assertEqual(len(polynomials), len(chunks_matrix))

    def test_rs_encode_rows(self):
        """RS-extended rows decode back to the polynomial of the original row."""
        chunks_matrix = self.encoder._chunkify_data(self.data)
        extended_chunks_matrix = self.encoder._rs_encode_rows(chunks_matrix)
        for original_row, extended_row in zip(chunks_matrix, extended_chunks_matrix):
            self.assertEqual(len(original_row), len(extended_row) // 2)
            extended_elements = [BLSFieldElement.from_bytes(chunk) for chunk in extended_row]
            original_poly = kzg.bytes_to_polynomial(original_row.as_bytes())
            # we check against decoding so we know the encoding was properly done
            decoded_poly = rs.decode(extended_elements, ROOTS_OF_UNITY, len(original_poly))
            self.assertEqual(original_poly, decoded_poly)

    def test_compute_rows_proofs(self):
        """Row proofs verify against the row commitments for original and extended rows."""
        chunks_matrix = self.encoder._chunkify_data(self.data)
        polynomials, commitments = zip(*self.encoder._compute_row_kzg_commitments(chunks_matrix))
        extended_chunks_matrix = self.encoder._rs_encode_rows(chunks_matrix)
        original_proofs = self.encoder._compute_rows_proofs(chunks_matrix, polynomials, commitments)
        extended_proofs = self.encoder._compute_rows_proofs(extended_chunks_matrix, polynomials, commitments)

        # check original sized matrix
        for row, commitment, proofs in zip(chunks_matrix, commitments, original_proofs):
            self.assertEqual(len(proofs), len(row))
            for i, chunk in enumerate(row):
                self.assertTrue(
                    kzg.verify_element_proof(
                        BLSFieldElement.from_bytes(chunk), commitment, proofs[i], i, ROOTS_OF_UNITY
                    )
                )

        # check extended matrix
        for row, commitment, proofs in zip(extended_chunks_matrix, commitments, extended_proofs):
            for i, chunk in enumerate(row):
                self.assertTrue(
                    kzg.verify_element_proof(
                        BLSFieldElement.from_bytes(chunk), commitment, proofs[i], i, ROOTS_OF_UNITY
                    )
                )

    def test_compute_column_kzg_commitments(self):
        """One polynomial and one commitment are produced per column."""
        chunks_matrix = self.encoder._chunkify_data(self.data)
        polynomials, commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix))
        self.assertEqual(len(commitments), len(chunks_matrix[0]))
        self.assertEqual(len(polynomials), len(chunks_matrix[0]))

    def test_generate_aggregated_column_commitments(self):
        """Aggregating the column commitments yields a polynomial and a commitment."""
        chunks_matrix = self.encoder._chunkify_data(self.data)
        _, column_commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix))
        poly, commitment = self.encoder._compute_aggregated_column_commitment(chunks_matrix, column_commitments)
        self.assertIsNotNone(poly)
        self.assertIsNotNone(commitment)

    def test_generate_aggregated_column_proofs(self):
        """One aggregated proof is produced per column commitment."""
        chunks_matrix = self.encoder._chunkify_data(self.data)
        _, column_commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix))
        poly, _ = self.encoder._compute_aggregated_column_commitment(chunks_matrix, column_commitments)
        proofs = self.encoder._compute_aggregated_column_proofs(poly, column_commitments)
        self.assertEqual(len(proofs), len(column_commitments))

    def test_encode(self):
        """Run the full encoding checks on random payloads of 16 and 64 rows."""
        encoder_params = DAEncoderParams(
            column_count=8,
            bytes_per_chunk=31
        )
        # Row counts under test: 2**4 and 2**6.
        sizes = [pow(2, exp) for exp in range(4, 8, 2)]
        for size in sizes:
            # subTest reports which size failed and still runs the remaining sizes.
            with self.subTest(size=size):
                data = bytes(
                    chain.from_iterable(
                        randbytes(encoder_params.bytes_per_chunk)
                        for _ in range(size * encoder_params.column_count)
                    )
                )
                self.assert_encoding(encoder_params, data)