mirror of
https://github.com/logos-blockchain/logos-blockchain-specs.git
synced 2026-05-21 00:29:41 +00:00
KZG core functionality (#73)
* Added polynomial class * Added common types and constants * Implement commitment and proof generation * Added basic tests * Use custom polynomial * use evaluation form for building polynomial * Use fast division on polynomials * Fix poly operations * Add non working verification * Make verification work * Expand verify test * Cleanup imports * Update deps * Update common.py added verify setup mechanism * Added trusted setup, updated common to use gp generator and added setup verification test * Added comments --------- Co-authored-by: megonen <146561843+megonen@users.noreply.github.com>
This commit is contained in:
parent
9a54d90d14
commit
d15eaa2d98
0
da/kzg_rs/__init__.py
Normal file
0
da/kzg_rs/__init__.py
Normal file
19
da/kzg_rs/common.py
Normal file
19
da/kzg_rs/common.py
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
from typing import List
|
||||||
|
|
||||||
|
import eth2spec.eip7594.mainnet
|
||||||
|
from eth2spec.eip7594.mainnet import BLSFieldElement, compute_roots_of_unity
|
||||||
|
from py_ecc.bls.typing import G1Uncompressed, G2Uncompressed
|
||||||
|
from remerkleable.basic import uint64
|
||||||
|
from da.kzg_rs.trusted_setup import generate_setup
|
||||||
|
|
||||||
|
G1 = G1Uncompressed
|
||||||
|
G2 = G2Uncompressed
|
||||||
|
|
||||||
|
|
||||||
|
BYTES_PER_FIELD_ELEMENT = 32
|
||||||
|
GLOBAL_PARAMETERS: List[G1]
|
||||||
|
GLOBAL_PARAMETERS_G2: List[G2]
|
||||||
|
# secret is fixed but this should come from a different synchronization protocol
|
||||||
|
GLOBAL_PARAMETERS, GLOBAL_PARAMETERS_G2 = map(list, generate_setup(1024, 8, 1987))
|
||||||
|
ROOTS_OF_UNITY: List[BLSFieldElement] = list(compute_roots_of_unity(uint64(4096)))
|
||||||
|
BLS_MODULUS = eth2spec.eip7594.mainnet.BLS_MODULUS
|
||||||
74
da/kzg_rs/kzg.py
Normal file
74
da/kzg_rs/kzg.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
from functools import reduce
|
||||||
|
from itertools import batched
|
||||||
|
from typing import Sequence
|
||||||
|
|
||||||
|
from eth2spec.deneb.mainnet import bytes_to_bls_field, BLSFieldElement, KZGCommitment as Commitment, KZGProof as Proof
|
||||||
|
from eth2spec.utils import bls
|
||||||
|
|
||||||
|
from .common import BYTES_PER_FIELD_ELEMENT, G1, BLS_MODULUS, GLOBAL_PARAMETERS_G2
|
||||||
|
from .poly import Polynomial
|
||||||
|
|
||||||
|
|
||||||
|
def bytes_to_polynomial(bytes: bytearray) -> Polynomial:
|
||||||
|
"""
|
||||||
|
Convert bytes to list of BLS field scalars.
|
||||||
|
"""
|
||||||
|
assert len(bytes) % BYTES_PER_FIELD_ELEMENT == 0
|
||||||
|
eval_form = [int(bytes_to_bls_field(b)) for b in batched(bytes, int(BYTES_PER_FIELD_ELEMENT))]
|
||||||
|
return Polynomial.from_evaluations(eval_form, BLS_MODULUS)
|
||||||
|
|
||||||
|
|
||||||
|
def g1_linear_combination(polynomial: Polynomial[BLSFieldElement], global_parameters: Sequence[G1]) -> Commitment:
|
||||||
|
"""
|
||||||
|
BLS multiscalar multiplication.
|
||||||
|
"""
|
||||||
|
# we assert to have more points available than elements,
|
||||||
|
# this is dependent on the available kzg setup size
|
||||||
|
assert len(polynomial) <= len(global_parameters)
|
||||||
|
point = reduce(
|
||||||
|
bls.add,
|
||||||
|
(bls.multiply(g, p) for g, p in zip(global_parameters, polynomial)),
|
||||||
|
bls.Z1()
|
||||||
|
)
|
||||||
|
return Commitment(bls.G1_to_bytes48(point))
|
||||||
|
|
||||||
|
|
||||||
|
def bytes_to_commitment(b: bytearray, global_parameters: Sequence[G1]) -> Commitment:
|
||||||
|
poly = bytes_to_polynomial(b)
|
||||||
|
return g1_linear_combination(poly, global_parameters)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_element_proof(
|
||||||
|
element_index: int,
|
||||||
|
polynomial: Polynomial,
|
||||||
|
global_parameters: Sequence[G1],
|
||||||
|
roots_of_unity: Sequence[BLSFieldElement],
|
||||||
|
) -> Proof:
|
||||||
|
# compute a witness polynomial in that satisfies `witness(x) = (f(x)-v)/(x-u)`
|
||||||
|
u = int(roots_of_unity[element_index])
|
||||||
|
v = polynomial.eval(u)
|
||||||
|
f_x_v = polynomial - Polynomial([v], BLS_MODULUS)
|
||||||
|
x_u = Polynomial([-u, 1], BLS_MODULUS)
|
||||||
|
witness, _ = f_x_v / x_u
|
||||||
|
return g1_linear_combination(witness, global_parameters)
|
||||||
|
|
||||||
|
|
||||||
|
def verify_element_proof(
|
||||||
|
polynomial: Polynomial,
|
||||||
|
commitment: Commitment,
|
||||||
|
proof: Proof,
|
||||||
|
element_index: int,
|
||||||
|
roots_of_unity: Sequence[BLSFieldElement],
|
||||||
|
) -> bool:
|
||||||
|
u = int(roots_of_unity[element_index])
|
||||||
|
v = polynomial.eval(u)
|
||||||
|
commitment_check_G1 = bls.bytes48_to_G1(commitment) - bls.multiply(bls.G1(), v)
|
||||||
|
proof_check_g2 = bls.add(
|
||||||
|
GLOBAL_PARAMETERS_G2[1],
|
||||||
|
bls.neg(bls.multiply(bls.G2(), u))
|
||||||
|
)
|
||||||
|
return bls.pairing_check([
|
||||||
|
# G2 here needs to be negated due to library requirements as pairing_check([[G1, -G2], [G1, G2]])
|
||||||
|
[commitment_check_G1, bls.neg(bls.G2())],
|
||||||
|
[bls.bytes48_to_G1(proof), proof_check_g2],
|
||||||
|
])
|
||||||
85
da/kzg_rs/poly.py
Normal file
85
da/kzg_rs/poly.py
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
from itertools import zip_longest
|
||||||
|
from typing import List, Sequence, Self
|
||||||
|
|
||||||
|
from sympy import ntt, intt
|
||||||
|
|
||||||
|
|
||||||
|
class Polynomial[T]:
|
||||||
|
def __init__(self, coefficients, modulus):
|
||||||
|
self.coefficients = coefficients
|
||||||
|
self.modulus = modulus
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_evaluations(cls, evalutaions: Sequence[T], modulus) -> Self:
|
||||||
|
coefficients = intt(evalutaions, prime=modulus)
|
||||||
|
return cls(coefficients, modulus)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "Polynomial({}, modulus={})".format(self.coefficients, self.modulus)
|
||||||
|
|
||||||
|
def __add__(self, other):
|
||||||
|
return Polynomial(
|
||||||
|
[(a + b) % self.modulus for a, b in zip_longest(self.coefficients, other.coefficients, fillvalue=0)],
|
||||||
|
self.modulus
|
||||||
|
)
|
||||||
|
|
||||||
|
def __sub__(self, other):
|
||||||
|
return Polynomial(
|
||||||
|
[(a - b) % self.modulus for a, b in zip_longest(self.coefficients, other.coefficients, fillvalue=0)],
|
||||||
|
self.modulus
|
||||||
|
)
|
||||||
|
|
||||||
|
def __mul__(self, other):
|
||||||
|
result = [0] * (len(self.coefficients) + len(other.coefficients) - 1)
|
||||||
|
for i in range(len(self.coefficients)):
|
||||||
|
for j in range(len(other.coefficients)):
|
||||||
|
result[i + j] = (result[i + j] + self.coefficients[i] * other.coefficients[j]) % self.modulus
|
||||||
|
return Polynomial(result, self.modulus)
|
||||||
|
|
||||||
|
def divide(self, other):
|
||||||
|
if not isinstance(other, Polynomial):
|
||||||
|
raise ValueError("Unsupported type for division.")
|
||||||
|
|
||||||
|
dividend = list(self.coefficients)
|
||||||
|
divisor = list(other.coefficients)
|
||||||
|
|
||||||
|
quotient = []
|
||||||
|
remainder = dividend
|
||||||
|
|
||||||
|
while len(remainder) >= len(divisor):
|
||||||
|
factor = remainder[-1] * pow(divisor[-1], -1, self.modulus) % self.modulus
|
||||||
|
quotient.insert(0, factor)
|
||||||
|
|
||||||
|
# Subtract divisor * factor from remainder
|
||||||
|
for i in range(len(divisor)):
|
||||||
|
remainder[len(remainder) - len(divisor) + i] -= divisor[i] * factor
|
||||||
|
remainder[len(remainder) - len(divisor) + i] %= self.modulus
|
||||||
|
|
||||||
|
# Remove leading zeros from remainder
|
||||||
|
while remainder and remainder[-1] == 0:
|
||||||
|
remainder.pop()
|
||||||
|
|
||||||
|
return Polynomial(quotient, self.modulus), Polynomial(remainder, self.modulus)
|
||||||
|
|
||||||
|
def __truediv__(self, other):
|
||||||
|
return self.divide(other)
|
||||||
|
|
||||||
|
def __neg__(self):
|
||||||
|
return Polynomial([-1 * c for c in self.coefficients], self.modulus)
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.coefficients)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self.coefficients)
|
||||||
|
|
||||||
|
def __getitem__(self, item):
|
||||||
|
return self.coefficients[item]
|
||||||
|
|
||||||
|
def eval(self, element):
|
||||||
|
return sum(
|
||||||
|
(pow(element, i)*x) % self.modulus for i, x in enumerate(self.coefficients)
|
||||||
|
) % self.modulus
|
||||||
|
|
||||||
|
def evaluation_form(self) -> List[T]:
|
||||||
|
return ntt(self.coefficients, prime=self.modulus)
|
||||||
48
da/kzg_rs/trusted_setup.py
Normal file
48
da/kzg_rs/trusted_setup.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
import random
|
||||||
|
from typing import Tuple, Sequence, Generator
|
||||||
|
from eth2spec.utils import bls
|
||||||
|
from itertools import accumulate, repeat
|
||||||
|
|
||||||
|
|
||||||
|
def __linear_combination(points, coeffs, zero=bls.Z1()):
|
||||||
|
o = zero
|
||||||
|
for point, coeff in zip(points, coeffs):
|
||||||
|
o = bls.add(o, bls.multiply(point, coeff))
|
||||||
|
return o
|
||||||
|
|
||||||
|
|
||||||
|
# Verifies the integrity of a setup
|
||||||
|
def verify_setup(setup) -> bool:
|
||||||
|
g1_setup, g2_setup = setup
|
||||||
|
g1_random_coefficients = [random.randrange(2**40) for _ in range(len(g1_setup) - 1)]
|
||||||
|
g1_lower = __linear_combination(g1_setup[:-1], g1_random_coefficients, bls.Z1())
|
||||||
|
g1_upper = __linear_combination(g1_setup[1:], g1_random_coefficients, bls.Z1())
|
||||||
|
g2_random_coefficients = [random.randrange(2**40) for _ in range(len(g2_setup) - 1)]
|
||||||
|
g2_lower = __linear_combination(g2_setup[:-1], g2_random_coefficients, bls.Z2())
|
||||||
|
g2_upper = __linear_combination(g2_setup[1:], g2_random_coefficients, bls.Z2())
|
||||||
|
return (
|
||||||
|
g1_setup[0] == bls.G1() and
|
||||||
|
g2_setup[0] == bls.G2() and
|
||||||
|
bls.pairing_check([[g1_upper, bls.neg(g2_lower)], [g1_lower, g2_upper]])
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_one_sided_setup(length, secret, generator=bls.G1()):
|
||||||
|
def __take(gen):
|
||||||
|
return (next(gen) for _ in range(length))
|
||||||
|
|
||||||
|
secrets = repeat(secret)
|
||||||
|
|
||||||
|
return __take(accumulate(secrets, bls.multiply, initial=generator))
|
||||||
|
|
||||||
|
|
||||||
|
# Generate a trusted setup with the given secret
|
||||||
|
def generate_setup(
|
||||||
|
g1_length,
|
||||||
|
g2_length,
|
||||||
|
secret
|
||||||
|
) -> Tuple[Generator[bls.G1, None, None], Generator[bls.G2, None, None]]:
|
||||||
|
return (
|
||||||
|
generate_one_sided_setup(g1_length, secret, bls.G1()),
|
||||||
|
generate_one_sided_setup(g2_length, secret, bls.G2()),
|
||||||
|
)
|
||||||
61
da/test_kzg.py
Normal file
61
da/test_kzg.py
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
from itertools import chain, batched
|
||||||
|
from random import randrange
|
||||||
|
from unittest import TestCase
|
||||||
|
|
||||||
|
from eth2spec.deneb.mainnet import BLS_MODULUS, bytes_to_bls_field
|
||||||
|
|
||||||
|
from da.kzg_rs import kzg
|
||||||
|
from da.kzg_rs.common import BYTES_PER_FIELD_ELEMENT, GLOBAL_PARAMETERS, ROOTS_OF_UNITY, GLOBAL_PARAMETERS_G2
|
||||||
|
from da.kzg_rs.trusted_setup import verify_setup
|
||||||
|
|
||||||
|
|
||||||
|
class TestKZG(TestCase):
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def rand_bytes(size=1024):
|
||||||
|
return bytearray(
|
||||||
|
chain.from_iterable(
|
||||||
|
int.to_bytes(randrange(BLS_MODULUS), length=BYTES_PER_FIELD_ELEMENT)
|
||||||
|
for _ in range(size)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_kzg_setup(self):
|
||||||
|
self.assertTrue(verify_setup((GLOBAL_PARAMETERS, GLOBAL_PARAMETERS_G2)))
|
||||||
|
|
||||||
|
def test_poly_forms(self):
|
||||||
|
rand_bytes = self.rand_bytes(8)
|
||||||
|
eval_form = [int(bytes_to_bls_field(b)) for b in batched(rand_bytes, int(BYTES_PER_FIELD_ELEMENT))]
|
||||||
|
poly = kzg.bytes_to_polynomial(rand_bytes)
|
||||||
|
self.assertEqual(poly.evaluation_form(), eval_form)
|
||||||
|
self.assertEqual(poly.evaluation_form()[0], poly.eval(int(ROOTS_OF_UNITY[0])))
|
||||||
|
|
||||||
|
def test_commitment(self):
|
||||||
|
rand_bytes = self.rand_bytes(32)
|
||||||
|
commit = kzg.bytes_to_commitment(rand_bytes, GLOBAL_PARAMETERS)
|
||||||
|
self.assertEqual(len(commit), 48)
|
||||||
|
|
||||||
|
def test_proof(self):
|
||||||
|
rand_bytes = self.rand_bytes(2)
|
||||||
|
poly = kzg.bytes_to_polynomial(rand_bytes)
|
||||||
|
proof = kzg.generate_element_proof(0, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY)
|
||||||
|
self.assertEqual(len(proof), 48)
|
||||||
|
|
||||||
|
def test_verify(self):
|
||||||
|
n_chunks = 32
|
||||||
|
rand_bytes = self.rand_bytes(n_chunks)
|
||||||
|
commit = kzg.bytes_to_commitment(rand_bytes, GLOBAL_PARAMETERS)
|
||||||
|
poly = kzg.bytes_to_polynomial(rand_bytes)
|
||||||
|
for n in range(n_chunks):
|
||||||
|
proof = kzg.generate_element_proof(n, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY)
|
||||||
|
self.assertEqual(len(proof), 48)
|
||||||
|
self.assertTrue(kzg.verify_element_proof(
|
||||||
|
poly, commit, proof, n, ROOTS_OF_UNITY
|
||||||
|
)
|
||||||
|
)
|
||||||
|
proof = kzg.generate_element_proof(0, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY)
|
||||||
|
for n in range(1, n_chunks):
|
||||||
|
self.assertFalse(kzg.verify_element_proof(
|
||||||
|
poly, commit, proof, n, ROOTS_OF_UNITY
|
||||||
|
)
|
||||||
|
)
|
||||||
@ -6,3 +6,4 @@ pycparser==2.21
|
|||||||
pysphinx==0.0.1
|
pysphinx==0.0.1
|
||||||
scipy==1.11.4
|
scipy==1.11.4
|
||||||
black==23.12.1
|
black==23.12.1
|
||||||
|
sympy==1.12
|
||||||
Loading…
x
Reference in New Issue
Block a user