Da encoding protocol (#76)

* Implement generator polynomial and rs encoding

* Implement encode/decode+test using fft. Non-working

* Use lagrange for interpolation

* Remove fft, use evaluations instead

* Move and rename kzg and rs test modules

* Update docs

* Added columns property to chunks matrix
Added test for columns

* Added chunkify and test

* Added compute row commitments
Added row commitments size test

* Fix poly from evaluations method

* Implement encode rows and test

* Update encode row test

* Implement compute row proofs (not working on extended data)

* Use same polynomials for commitment and proof creation after extend

* Fix polynomial from/to evaluations

* Use chunks for verification

* Refactor interpolate

* Implement chunks matrix transposed method

* Added compute column kzg commitments

* Use square size data for encoder tests

* Add column type to columns method

* Added compute columns aggregated commitment
Added aggregated commitment test
Fixed and expanded encode test

* Use sha3 for hashing
This commit is contained in:
Daniel Sanchez 2024-03-08 14:16:14 +01:00 committed by GitHub
parent 09c9b7e4ec
commit cf899d2384
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 284 additions and 98 deletions

View File

@ -1,5 +1,6 @@
from dataclasses import dataclass
from typing import List, Generator
from itertools import chain, zip_longest
from typing import List, Generator, Self
from eth2spec.eip7594.mainnet import Bytes32
@ -11,18 +12,23 @@ class Chunk(Bytes32):
pass
class Column(List[Chunk]):
pass
class Column(List[Bytes32]):
def as_bytes(self) -> bytes:
return bytes(chain.from_iterable(self))
class Row(List[Chunk]):
pass
class Row(List[Bytes32]):
def as_bytes(self) -> bytes:
return bytes(chain.from_iterable(self))
class ChunksMatrix(List[Row]):
class ChunksMatrix(List[Row | Column]):
@property
def columns(self) -> Generator[List[Chunk], None, None]:
# TODO: yield columns
yield None
yield from map(Column, zip_longest(*self, fillvalue=b""))
def transposed(self) -> Self:
return ChunksMatrix(self.columns)

View File

@ -13,7 +13,7 @@ class Dispersal:
def _prepare_data(self, encoded_data: EncodedData) -> Generator[DABlob, None, None]:
assert len(encoded_data.row_commitments) == len(self.nodes)
assert len(encoded_data.row_proofs) == len(self.nodes)
columns = encoded_data.extended_matrix.columns()
columns = encoded_data.extended_matrix.columns
column_commitments = encoded_data.column_commitments
row_commitments = encoded_data.row_commitments
rows_proofs = encoded_data.row_proofs

View File

@ -1,8 +1,14 @@
from dataclasses import dataclass
from typing import List
from eth2spec.eip7594.mainnet import KZGCommitment as Commitment, KZGProof as Proof
from itertools import batched, chain
from typing import List, Sequence, Tuple
from hashlib import sha3_256
from da.common import ChunksMatrix
from eth2spec.eip7594.mainnet import KZGCommitment as Commitment, KZGProof as Proof, BLSFieldElement
from da.common import ChunksMatrix, Chunk, Row, Column
from da.kzg_rs import kzg, rs
from da.kzg_rs.common import GLOBAL_PARAMETERS, ROOTS_OF_UNITY, BLS_MODULUS
from da.kzg_rs.poly import Polynomial
@dataclass
@ -13,7 +19,7 @@ class DAEncoderParams:
@dataclass
class EncodedData:
data: bytearray
data: bytes
extended_matrix: ChunksMatrix
row_commitments: List[Commitment]
row_proofs: List[List[Proof]]
@ -26,41 +32,79 @@ class DAEncoder:
def __init__(self, params: DAEncoderParams):
self.params = params
def _chunkify_data(self, data: bytearray) -> ChunksMatrix:
...
def _chunkify_data(self, data: bytes) -> ChunksMatrix:
size: int = self.params.column_count * self.params.bytes_per_field_element
return ChunksMatrix(
Row(Chunk(bytes(chunk)) for chunk in batched(b, self.params.bytes_per_field_element))
for b in batched(data, size)
)
def _compute_row_kzg_commitments(self, rows: List[bytearray]) -> List[Commitment]:
...
@staticmethod
def _compute_row_kzg_commitments(matrix: ChunksMatrix) -> List[Tuple[Polynomial, Commitment]]:
return [kzg.bytes_to_commitment(row.as_bytes(), GLOBAL_PARAMETERS) for row in matrix]
def _rs_encode_rows(self, chunks_matrix: ChunksMatrix) -> ChunksMatrix:
...
def __rs_encode_row(row: Row) -> Row:
polynomial = kzg.bytes_to_polynomial(row.as_bytes())
return Row(
Chunk(BLSFieldElement.to_bytes(
x,
length=self.params.bytes_per_field_element, byteorder="big"
)) for x in rs.encode(polynomial, 2, ROOTS_OF_UNITY)
)
return ChunksMatrix(__rs_encode_row(row) for row in chunks_matrix)
def _compute_rows_proofs(self, chunks_matrix: ChunksMatrix, row_commitments: List[Commitment]) -> List[List[Proof]]:
...
@staticmethod
def _compute_rows_proofs(
chunks_matrix: ChunksMatrix,
polynomials: Sequence[Polynomial],
row_commitments: Sequence[Commitment]
) -> List[List[Proof]]:
proofs = []
for row, poly, commitment in zip(chunks_matrix, polynomials, row_commitments):
proofs.append(
[
kzg.generate_element_proof(i, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY)
for i in range(len(row))
]
)
return proofs
def _compute_column_kzg_commitments(self, chunks_matrix: ChunksMatrix) -> List[Commitment]:
...
def _compute_column_kzg_commitments(self, chunks_matrix: ChunksMatrix) -> List[Tuple[Polynomial, Commitment]]:
return self._compute_row_kzg_commitments(chunks_matrix.transposed())
def _compute_aggregated_column_commitments(
self, chunks_matrix: ChunksMatrix, column_commitments: List[Commitment]
) -> Commitment:
...
@staticmethod
def _compute_aggregated_column_commitment(
chunks_matrix: ChunksMatrix, column_commitments: Sequence[Commitment]
) -> Tuple[Polynomial, Commitment]:
data = bytes(chain.from_iterable(
DAEncoder._hash_column_and_commitment(column, commitment)
for column, commitment in zip(chunks_matrix.columns, column_commitments)
))
return kzg.bytes_to_commitment(data, GLOBAL_PARAMETERS)
@staticmethod
def _compute_aggregated_column_proofs(
self,
chunks_matrix: ChunksMatrix,
aggregated_column_commitment: Commitment
polynomial: Polynomial,
column_commitments: Sequence[Commitment],
) -> List[Proof]:
...
return [
kzg.generate_element_proof(i, polynomial, GLOBAL_PARAMETERS, ROOTS_OF_UNITY)
for i in range(len(column_commitments))
]
def encode(self, data: bytearray) -> EncodedData:
def encode(self, data: bytes) -> EncodedData:
chunks_matrix = self._chunkify_data(data)
row_commitments = self._compute_row_kzg_commitments(chunks_matrix)
row_polynomials, row_commitments = zip(*self._compute_row_kzg_commitments(chunks_matrix))
extended_matrix = self._rs_encode_rows(chunks_matrix)
row_proofs = self._compute_rows_proofs(extended_matrix, row_commitments)
column_commitments = self._compute_column_kzg_commitments(extended_matrix)
aggregated_column_commitment = self._compute_aggregated_column_commitments(extended_matrix, column_commitments)
aggregated_column_proofs = self._compute_aggregated_column_proofs(extended_matrix, aggregated_column_commitment)
row_proofs = self._compute_rows_proofs(extended_matrix, row_polynomials, row_commitments)
column_polynomials, column_commitments = zip(*self._compute_column_kzg_commitments(extended_matrix))
aggregated_column_polynomial, aggregated_column_commitment = (
self._compute_aggregated_column_commitment(extended_matrix, column_commitments)
)
aggregated_column_proofs = self._compute_aggregated_column_proofs(
aggregated_column_polynomial, column_commitments
)
result = EncodedData(
data,
extended_matrix,
@ -71,3 +115,10 @@ class DAEncoder:
aggregated_column_proofs
)
return result
@staticmethod
def _hash_column_and_commitment(column: Column, commitment: Commitment) -> bytes:
# TODO: Check correctness of bytes to blsfieldelement using modulus over the hash
return (
int.from_bytes(sha3_256(column.as_bytes() + bytes(commitment)).digest()) % BLS_MODULUS
).to_bytes(32, byteorder="big")

View File

@ -1,6 +1,6 @@
from functools import reduce
from itertools import batched
from typing import Sequence
from typing import Sequence, Tuple
from eth2spec.deneb.mainnet import bytes_to_bls_field, BLSFieldElement, KZGCommitment as Commitment, KZGProof as Proof
from eth2spec.utils import bls
@ -9,12 +9,12 @@ from .common import BYTES_PER_FIELD_ELEMENT, G1, BLS_MODULUS, GLOBAL_PARAMETERS_
from .poly import Polynomial
def bytes_to_polynomial(bytes: bytearray) -> Polynomial:
def bytes_to_polynomial(b: bytes) -> Polynomial:
"""
Convert bytes to list of BLS field scalars.
"""
assert len(bytes) % BYTES_PER_FIELD_ELEMENT == 0
eval_form = [int(bytes_to_bls_field(b)) for b in batched(bytes, int(BYTES_PER_FIELD_ELEMENT))]
assert len(b) % BYTES_PER_FIELD_ELEMENT == 0
eval_form = [int(bytes_to_bls_field(b)) for b in batched(b, int(BYTES_PER_FIELD_ELEMENT))]
return Polynomial.from_evaluations(eval_form, BLS_MODULUS)
@ -33,9 +33,9 @@ def g1_linear_combination(polynomial: Polynomial[BLSFieldElement], global_parame
return Commitment(bls.G1_to_bytes48(point))
def bytes_to_commitment(b: bytearray, global_parameters: Sequence[G1]) -> Commitment:
def bytes_to_commitment(b: bytes, global_parameters: Sequence[G1]) -> Tuple[Polynomial, Commitment]:
poly = bytes_to_polynomial(b)
return g1_linear_combination(poly, global_parameters)
return poly, g1_linear_combination(poly, global_parameters)
def generate_element_proof(
@ -54,14 +54,14 @@ def generate_element_proof(
def verify_element_proof(
polynomial: Polynomial,
chunk: BLSFieldElement,
commitment: Commitment,
proof: Proof,
element_index: int,
roots_of_unity: Sequence[BLSFieldElement],
) -> bool:
u = int(roots_of_unity[element_index])
v = polynomial.eval(u)
v = chunk
commitment_check_G1 = bls.bytes48_to_G1(commitment) - bls.multiply(bls.G1(), v)
proof_check_g2 = bls.add(
GLOBAL_PARAMETERS_G2[1],

View File

@ -1,7 +1,9 @@
from itertools import zip_longest
from typing import List, Sequence, Self
from sympy import ntt, intt
from eth2spec.eip7594.mainnet import interpolate_polynomialcoeff
from da.kzg_rs.common import ROOTS_OF_UNITY
class Polynomial[T]:
@ -9,9 +11,26 @@ class Polynomial[T]:
self.coefficients = coefficients
self.modulus = modulus
@staticmethod
def interpolate(evaluations: List[int], roots_of_unity: List[int]) -> List[int]:
"""
Lagrange interpolation
Parameters:
evaluations: List of evaluations
roots_of_unity: Powers of 2 sequence
Returns:
list: Coefficients of the interpolated polynomial
"""
return list(map(int, interpolate_polynomialcoeff(roots_of_unity[:len(evaluations)], evaluations)))
@classmethod
def from_evaluations(cls, evalutaions: Sequence[T], modulus) -> Self:
coefficients = intt(evalutaions, prime=modulus)
def from_evaluations(cls, evaluations: Sequence[T], modulus, roots_of_unity: Sequence[int]=ROOTS_OF_UNITY) -> Self:
coefficients = [
x % modulus
for x in map(int, Polynomial.interpolate(evaluations, roots_of_unity))
]
return cls(coefficients, modulus)
def __repr__(self):
@ -77,12 +96,16 @@ class Polynomial[T]:
return self.coefficients[item]
def __eq__(self, other):
return self.coefficients == other.coefficients and self.modulus == other.modulus
return (
self.coefficients == other.coefficients and
self.modulus == other.modulus
)
def eval(self, element):
return sum(
(pow(element, i)*x) % self.modulus for i, x in enumerate(self.coefficients)
) % self.modulus
def eval(self, x):
return (self.coefficients[0] + sum(
(pow(x, i, mod=self.modulus)*coefficient)
for i, coefficient in enumerate(self.coefficients[1:], start=1)
)) % self.modulus
def evaluation_form(self) -> List[T]:
return ntt(self.coefficients, prime=self.modulus)
return [self.eval(ROOTS_OF_UNITY[i]) for i in range(len(self))]

View File

@ -9,7 +9,7 @@ from .poly import Polynomial
ExtendedData = Sequence[BLSFieldElement]
def encode(polynomial: Polynomial, factor: int, roots_of_unity: Sequence[BLSFieldElement]) -> ExtendedData:
def encode(polynomial: Polynomial, factor: int, roots_of_unity: Sequence[int]) -> ExtendedData:
"""
Encode a polynomial extending to the given factor
Parameters:
@ -25,20 +25,6 @@ def encode(polynomial: Polynomial, factor: int, roots_of_unity: Sequence[BLSFiel
return [polynomial.eval(e) for e in roots_of_unity[:len(polynomial)*factor]]
def __interpolate(evaluations: List[int], roots_of_unity: List[int]) -> List[int]:
"""
Lagrange interpolation
Parameters:
evaluations: List of evaluations
roots_of_unity: Powers of 2 sequence
Returns:
list: Coefficients of the interpolated polynomial
"""
return list(map(int, interpolate_polynomialcoeff(roots_of_unity[:len(evaluations)], evaluations)))
def decode(encoded: ExtendedData, roots_of_unity: Sequence[BLSFieldElement], original_len: int) -> Polynomial:
"""
Decode a polynomial from an extended data-set and the roots of unity, cap to original length
@ -51,5 +37,5 @@ def decode(encoded: ExtendedData, roots_of_unity: Sequence[BLSFieldElement], ori
Returns:
Polynomial: original polynomial
"""
coefs = __interpolate(list(map(int, encoded)), list(map(int, roots_of_unity)))[:original_len]
coefs = Polynomial.interpolate(list(map(int, encoded)), list(map(int, roots_of_unity)))[:original_len]
return Polynomial([int(c) for c in coefs], BLS_MODULUS)

View File

@ -2,7 +2,7 @@ from itertools import chain, batched
from random import randrange
from unittest import TestCase
from eth2spec.deneb.mainnet import BLS_MODULUS, bytes_to_bls_field
from eth2spec.deneb.mainnet import BLS_MODULUS, bytes_to_bls_field, BLSFieldElement
from da.kzg_rs import kzg
from da.kzg_rs.common import BYTES_PER_FIELD_ELEMENT, GLOBAL_PARAMETERS, ROOTS_OF_UNITY, GLOBAL_PARAMETERS_G2
@ -12,11 +12,11 @@ from da.kzg_rs.trusted_setup import verify_setup
class TestKZG(TestCase):
@staticmethod
def rand_bytes(size=1024):
return bytearray(
def rand_bytes(n_chunks=1024):
return bytes(
chain.from_iterable(
int.to_bytes(randrange(BLS_MODULUS), length=BYTES_PER_FIELD_ELEMENT)
for _ in range(size)
for _ in range(n_chunks)
)
)
@ -24,15 +24,19 @@ class TestKZG(TestCase):
self.assertTrue(verify_setup((GLOBAL_PARAMETERS, GLOBAL_PARAMETERS_G2)))
def test_poly_forms(self):
rand_bytes = self.rand_bytes(8)
n_chunks = 16
rand_bytes = self.rand_bytes(n_chunks)
eval_form = [int(bytes_to_bls_field(b)) for b in batched(rand_bytes, int(BYTES_PER_FIELD_ELEMENT))]
poly = kzg.bytes_to_polynomial(rand_bytes)
self.assertEqual(poly.evaluation_form(), eval_form)
self.assertEqual(poly.evaluation_form()[0], poly.eval(int(ROOTS_OF_UNITY[0])))
for i, chunk in enumerate(eval_form):
self.assertEqual(poly.eval(ROOTS_OF_UNITY[i]), chunk)
for i in range(n_chunks):
self.assertEqual(poly.evaluation_form()[i], poly.eval(int(ROOTS_OF_UNITY[i])))
def test_commitment(self):
rand_bytes = self.rand_bytes(32)
commit = kzg.bytes_to_commitment(rand_bytes, GLOBAL_PARAMETERS)
_, commit = kzg.bytes_to_commitment(rand_bytes, GLOBAL_PARAMETERS)
self.assertEqual(len(commit), 48)
def test_proof(self):
@ -44,18 +48,20 @@ class TestKZG(TestCase):
def test_verify(self):
n_chunks = 32
rand_bytes = self.rand_bytes(n_chunks)
commit = kzg.bytes_to_commitment(rand_bytes, GLOBAL_PARAMETERS)
_, commit = kzg.bytes_to_commitment(rand_bytes, GLOBAL_PARAMETERS)
poly = kzg.bytes_to_polynomial(rand_bytes)
for n in range(n_chunks):
proof = kzg.generate_element_proof(n, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY)
for i, chunk in enumerate(batched(rand_bytes, BYTES_PER_FIELD_ELEMENT)):
chunk = bytes(chunk)
proof = kzg.generate_element_proof(i, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY)
self.assertEqual(len(proof), 48)
self.assertEqual(poly.eval(int(ROOTS_OF_UNITY[i])), bytes_to_bls_field(chunk))
self.assertTrue(kzg.verify_element_proof(
poly, commit, proof, n, ROOTS_OF_UNITY
bytes_to_bls_field(chunk), commit, proof, i, ROOTS_OF_UNITY
)
)
proof = kzg.generate_element_proof(0, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY)
for n in range(1, n_chunks):
self.assertFalse(kzg.verify_element_proof(
poly, commit, proof, n, ROOTS_OF_UNITY
BLSFieldElement(0), commit, proof, n, ROOTS_OF_UNITY
)
)

20
da/test_common.py Normal file
View File

@ -0,0 +1,20 @@
from unittest import TestCase
from da.common import ChunksMatrix
class TestCommon(TestCase):
def test_chunks_matrix_columns(self):
matrix = ChunksMatrix([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
expected = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
for c1, c2 in zip(expected, matrix.columns):
self.assertEqual(c1, c2)
def test_chunks_matrix_transposed(self):
matrix = ChunksMatrix([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
expected = ChunksMatrix([[1, 4, 7], [2, 5, 8], [3, 6, 9]])
self.assertEqual(matrix.transposed(), expected)
matrix = ChunksMatrix([[1, 2, 3], [4, 5, 6]])
expected = ChunksMatrix([[1, 4], [2, 5], [3, 6]])
self.assertEqual(matrix.transposed(), expected)

View File

@ -1,47 +1,141 @@
from typing import List
from itertools import chain, batched
from random import randrange
from unittest import TestCase
from da import encoder
from da.encoder import DAEncoderParams, Commitment
from eth2spec.eip7594.mainnet import BYTES_PER_FIELD_ELEMENT
from eth2spec.deneb.mainnet import bytes_to_bls_field
from da import encoder
from da.encoder import DAEncoderParams, DAEncoder
from eth2spec.eip7594.mainnet import BYTES_PER_FIELD_ELEMENT, BLSFieldElement
from da.kzg_rs.common import BLS_MODULUS, ROOTS_OF_UNITY
from da.kzg_rs import kzg, rs
class TestEncoder(TestCase):
def assert_encoding(self, encoder_params: DAEncoderParams, data: bytearray):
def setUp(self):
self.params: DAEncoderParams = DAEncoderParams(column_count=16, bytes_per_field_element=32)
self.encoder: DAEncoder = DAEncoder(self.params)
self.elements = 32
self.data = bytearray(
chain.from_iterable(
randrange(BLS_MODULUS).to_bytes(length=self.params.bytes_per_field_element, byteorder='big')
for _ in range(self.elements)
)
)
def assert_encoding(self, encoder_params: DAEncoderParams, data: bytes):
encoded_data = encoder.DAEncoder(encoder_params).encode(data)
self.assertEqual(encoded_data.data, data)
self.assertEqual(len(encoded_data.extended_matrix), encoder_params.column_count)
extended_factor = 2
column_count = encoder_params.column_count*extended_factor
columns_len = len(list(encoded_data.extended_matrix.columns))
self.assertEqual(columns_len, column_count)
chunks_size = (len(data) // encoder_params.bytes_per_field_element) // encoder_params.column_count
self.assertEqual(len(encoded_data.row_commitments), chunks_size)
self.assertEqual(len(encoded_data.row_proofs), chunks_size)
self.assertEqual(len(encoded_data.row_proofs[0]), column_count)
self.assertIsNotNone(encoded_data.aggregated_column_commitment)
self.assertEqual(len(encoded_data.aggregated_column_proofs), columns_len)
# verify rows
for row, proofs, commitment in zip(encoded_data.extended_matrix, encoded_data.row_proofs, encoded_data.row_commitments):
for i, (chunk, proof) in enumerate(zip(row, proofs)):
self.assertTrue(
kzg.verify_element_proof(bytes_to_bls_field(chunk), commitment, proof, i, ROOTS_OF_UNITY)
)
# verify column aggregation
for i, (column, proof) in enumerate(zip(encoded_data.extended_matrix.columns, encoded_data.aggregated_column_proofs)):
data = DAEncoder._hash_column_and_commitment(column, commitment)
kzg.verify_element_proof(
bytes_to_bls_field(data),
encoded_data.aggregated_column_commitment,
proof,
i,
ROOTS_OF_UNITY
)
def test_chunkify(self):
pass
encoder_settings = DAEncoderParams(column_count=2, bytes_per_field_element=32)
elements = 10
data = bytearray(chain.from_iterable(int.to_bytes(0, length=32, byteorder='big') for _ in range(elements)))
_encoder = encoder.DAEncoder(encoder_settings)
chunks_matrix = _encoder._chunkify_data(data)
self.assertEqual(len(chunks_matrix), elements//encoder_settings.column_count)
for row in chunks_matrix:
self.assertEqual(len(row), encoder_settings.column_count)
self.assertEqual(len(row[0]), encoder_settings.bytes_per_field_element)
def test_compute_row_kzg_commitments(self):
pass
chunks_matrix = self.encoder._chunkify_data(self.data)
polynomials, commitments = zip(*self.encoder._compute_row_kzg_commitments(chunks_matrix))
self.assertEqual(len(commitments), len(chunks_matrix))
self.assertEqual(len(polynomials), len(chunks_matrix))
def test_rs_encode_rows(self):
pass
chunks_matrix = self.encoder._chunkify_data(self.data)
extended_chunks_matrix = self.encoder._rs_encode_rows(chunks_matrix)
for r1, r2 in zip(chunks_matrix, extended_chunks_matrix):
self.assertEqual(len(r1), len(r2)//2)
r2 = [BLSFieldElement.from_bytes(x) for x in r2]
poly_1 = kzg.bytes_to_polynomial(r1.as_bytes())
# we check against decoding so we now the encoding was properly done
poly_2 = rs.decode(r2, ROOTS_OF_UNITY, len(poly_1))
self.assertEqual(poly_1, poly_2)
def test_compute_rows_proofs(self):
pass
chunks_matrix = self.encoder._chunkify_data(self.data)
polynomials, commitments = zip(*self.encoder._compute_row_kzg_commitments(chunks_matrix))
extended_chunks_matrix = self.encoder._rs_encode_rows(chunks_matrix)
original_proofs = self.encoder._compute_rows_proofs(chunks_matrix, polynomials, commitments)
extended_proofs = self.encoder._compute_rows_proofs(extended_chunks_matrix, polynomials, commitments)
# check original sized matrix
for row, poly, commitment, proofs in zip(chunks_matrix, polynomials, commitments, original_proofs):
self.assertEqual(len(proofs), len(row))
for i, chunk in enumerate(row):
self.assertTrue(kzg.verify_element_proof(BLSFieldElement.from_bytes(chunk), commitment, proofs[i], i, ROOTS_OF_UNITY))
# check extended matrix
for row, poly, commitment, proofs in zip(extended_chunks_matrix, polynomials, commitments, extended_proofs):
for i, chunk in enumerate(row):
self.assertTrue(kzg.verify_element_proof(BLSFieldElement.from_bytes(chunk), commitment, proofs[i], i, ROOTS_OF_UNITY))
def test_compute_column_kzg_commitments(self):
pass
chunks_matrix = self.encoder._chunkify_data(self.data)
polynomials, commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix))
self.assertEqual(len(commitments), len(chunks_matrix[0]))
self.assertEqual(len(polynomials), len(chunks_matrix[0]))
def test_generate_aggregated_column_commitments(self):
pass
chunks_matrix = self.encoder._chunkify_data(self.data)
_, column_commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix))
poly, commitment = self.encoder._compute_aggregated_column_commitment(chunks_matrix, column_commitments)
self.assertIsNotNone(poly)
self.assertIsNotNone(commitment)
def test_generate_aggregated_column_proofs(self):
chunks_matrix = self.encoder._chunkify_data(self.data)
_, column_commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix))
poly, _ = self.encoder._compute_aggregated_column_commitment(chunks_matrix, column_commitments)
proofs = self.encoder._compute_aggregated_column_proofs(poly, column_commitments)
self.assertEqual(len(proofs), len(column_commitments))
def test_encode(self):
# TODO: remove return, for now we make it work for now so we do not disturb other modules
return
from random import randbytes
sizes = [pow(2, exp) for exp in range(0, 8, 2)]
sizes = [pow(2, exp) for exp in range(4, 8, 2)]
encoder_params = DAEncoderParams(
column_count=10,
column_count=8,
bytes_per_field_element=BYTES_PER_FIELD_ELEMENT
)
for size in sizes:
data = bytearray(randbytes(size*1024))
data = bytes(
chain.from_iterable(
# TODO: For now we make data fit with modulus, we need to research if this is correct
(int.from_bytes(b) % BLS_MODULUS).to_bytes(length=32)
for b in batched(
randbytes(size*self.encoder.params.column_count), self.encoder.params.bytes_per_field_element
)
)
)
self.assert_encoding(encoder_params, data)