"""
KZG 4844 test vectors generator
"""
|
|
|
|
from hashlib import sha256
|
|
from typing import Tuple, Iterable, Any, Callable, Dict
|
|
|
|
from eth_utils import (
|
|
encode_hex,
|
|
int_to_big_endian,
|
|
)
|
|
|
|
from eth2spec.utils import bls
|
|
from eth2spec.test.helpers.constants import DENEB
|
|
from eth2spec.test.helpers.typing import SpecForkName
|
|
from eth2spec.gen_helpers.gen_base import gen_runner, gen_typing
|
|
from eth2spec.deneb import spec
|
|
|
|
|
|
def expect_exception(func, *args):
    """
    Call ``func(*args)`` and require that the call raises.

    Any ``Exception`` raised by the call is swallowed. If the call returns
    normally, an ``Exception`` with the message "should have raised exception"
    is raised instead.
    """
    raised = False
    try:
        func(*args)
    except Exception:
        raised = True
    if not raised:
        raise Exception("should have raised exception")
|
|
|
|
|
|
def field_element_bytes(x):
    """
    Serialize ``x`` as a 32-byte field element.

    ``x`` is first reduced modulo ``spec.BLS_MODULUS``; the byte order is
    ``spec.KZG_ENDIANNESS``.
    """
    reduced = x % spec.BLS_MODULUS
    return int.to_bytes(reduced, 32, spec.KZG_ENDIANNESS)
|
|
|
|
|
|
def field_element_bytes_unchecked(x):
    """
    Serialize ``x`` as 32 bytes in ``spec.KZG_ENDIANNESS`` order without reducing it.

    Unlike ``field_element_bytes`` no modular reduction is applied, so values at
    or above ``spec.BLS_MODULUS`` are encoded as-is.
    """
    byte_order = spec.KZG_ENDIANNESS
    return int.to_bytes(x, 32, byte_order)
|
|
|
|
|
|
def encode_hex_list(a):
    """Return a list holding ``encode_hex(item)`` for every item of ``a``, in order."""
    return list(map(encode_hex, a))
|
|
|
|
|
|
def bls_add_one(x):
    """
    Adds "one" (actually bls.G1()) to a compressed group element.
    Useful to compute definitely incorrect proofs.
    """
    # Decode, add bls.G1(), then re-encode to the 48-byte form.
    point = bls.bytes48_to_G1(x)
    one = bls.G1()
    shifted = bls.add(point, one)
    return bls.G1_to_bytes48(shifted)
|
|
|
|
|
|
def evaluate_blob_at(blob, z):
    """
    Evaluate the polynomial encoded by ``blob`` at the point ``z``.

    ``blob`` is converted with ``spec.blob_to_polynomial`` and ``z`` with
    ``spec.bytes_to_bls_field``; the evaluation result is returned as a 32-byte
    field element (see ``field_element_bytes``).
    """
    polynomial = spec.blob_to_polynomial(blob)
    point = spec.bytes_to_bls_field(z)
    value = spec.evaluate_polynomial_in_evaluation_form(polynomial, point)
    return field_element_bytes(value)
|
|
|
|
|
|
# The BLS modulus itself as 32 bytes; used below to build an out-of-range field element.
BLS_MODULUS_BYTES = spec.BLS_MODULUS.to_bytes(32, spec.KZG_ENDIANNESS)

# Serialized form of bls.G1(), reused as a well-formed placeholder commitment / proof.
G1 = bls.G1_to_bytes48(bls.G1())
# Malformed encodings derived from G1: last byte dropped, and one zero byte appended.
G1_INVALID_TOO_FEW_BYTES = G1[:-1]
G1_INVALID_TOO_MANY_BYTES = G1 + b"\x00"
# Two fixed 48-byte patterns that differ only in their final hex digit.
# NOTE(review): per their names the first is meant to decode to a point outside G1 and the
# second to a point that is not on the curve; this file does not verify that itself.
G1_INVALID_P1_NOT_IN_G1 = bytes.fromhex("8123456789abcdef0123456789abcdef0123456789abcdef" +
                                        "0123456789abcdef0123456789abcdef0123456789abcdef")
G1_INVALID_P1_NOT_ON_CURVE = bytes.fromhex("8123456789abcdef0123456789abcdef0123456789abcdef" +
                                           "0123456789abcdef0123456789abcdef0123456789abcde0")
# All malformed G1 encodings, iterated by the "invalid commitment" / "invalid proof" cases.
INVALID_G1_POINTS = [G1_INVALID_TOO_FEW_BYTES, G1_INVALID_TOO_MANY_BYTES,
                     G1_INVALID_P1_NOT_IN_G1, G1_INVALID_P1_NOT_ON_CURVE]
|
|
|
|
# Blobs are built from 4096 field elements of 32 bytes each.

# Default-constructed blob (all zeros, per its name).
BLOB_ALL_ZEROS = spec.Blob()
# Every field element equals 2.
BLOB_ALL_TWOS = spec.Blob(b''.join(field_element_bytes(2) for _ in range(4096)))
# Deterministic "random" blobs: element n is base**(n + 256) mod BLS_MODULUS for base 2, 3 and 5.
BLOB_RANDOM_VALID1 = spec.Blob(b''.join(
    field_element_bytes(pow(2, exponent, spec.BLS_MODULUS)) for exponent in range(256, 256 + 4096)
))
BLOB_RANDOM_VALID2 = spec.Blob(b''.join(
    field_element_bytes(pow(3, exponent, spec.BLS_MODULUS)) for exponent in range(256, 256 + 4096)
))
BLOB_RANDOM_VALID3 = spec.Blob(b''.join(
    field_element_bytes(pow(5, exponent, spec.BLS_MODULUS)) for exponent in range(256, 256 + 4096)
))
# Every field element equals BLS_MODULUS - 1.
BLOB_ALL_MODULUS_MINUS_ONE = spec.Blob(b''.join(
    field_element_bytes(spec.BLS_MODULUS - 1) for _ in range(4096)
))
# All zeros except for a single 1 at index 3211.
BLOB_ALMOST_ZERO = spec.Blob(b''.join(
    field_element_bytes(int(index == 3211)) for index in range(4096)
))
# Every byte is 0xFF.
BLOB_INVALID = spec.Blob(b'\xFF' * (4096 * 32))
# All zeros except that the element at index 2111 is the modulus itself.
BLOB_INVALID_CLOSE = spec.Blob(b''.join(
    field_element_bytes(0) if index != 2111 else BLS_MODULUS_BYTES for index in range(4096)
))
# A valid blob with one byte too many / one byte too few.
BLOB_INVALID_LENGTH_PLUS_ONE = BLOB_RANDOM_VALID1 + b"\x00"
BLOB_INVALID_LENGTH_MINUS_ONE = BLOB_RANDOM_VALID1[:-1]

VALID_BLOBS = [
    BLOB_ALL_ZEROS,
    BLOB_ALL_TWOS,
    BLOB_RANDOM_VALID1,
    BLOB_RANDOM_VALID2,
    BLOB_RANDOM_VALID3,
    BLOB_ALL_MODULUS_MINUS_ONE,
    BLOB_ALMOST_ZERO,
]
INVALID_BLOBS = [
    BLOB_INVALID,
    BLOB_INVALID_CLOSE,
    BLOB_INVALID_LENGTH_PLUS_ONE,
    BLOB_INVALID_LENGTH_MINUS_ONE,
]
|
|
|
|
# Valid field elements: 0, 1, 2, a power of 5, the largest reduced value (BLS_MODULUS - 1),
# and the entry at index 1 of the roots of unity computed by the spec.
FE_VALID1 = field_element_bytes(0)
FE_VALID2 = field_element_bytes(1)
FE_VALID3 = field_element_bytes(2)
FE_VALID4 = field_element_bytes(pow(5, 1235, spec.BLS_MODULUS))
FE_VALID5 = field_element_bytes(spec.BLS_MODULUS - 1)
FE_VALID6 = field_element_bytes(
    spec.compute_roots_of_unity(spec.FIELD_ELEMENTS_PER_BLOB)[1]
)
VALID_FIELD_ELEMENTS = [
    FE_VALID1,
    FE_VALID2,
    FE_VALID3,
    FE_VALID4,
    FE_VALID5,
    FE_VALID6,
]

# Invalid field elements: values at or above the modulus (encoded without reduction),
# and a valid element with one byte too many / one byte too few.
FE_INVALID_EQUAL_TO_MODULUS = field_element_bytes_unchecked(spec.BLS_MODULUS)
FE_INVALID_MODULUS_PLUS_ONE = field_element_bytes_unchecked(spec.BLS_MODULUS + 1)
FE_INVALID_UINT256_MAX = field_element_bytes_unchecked((1 << 256) - 1)
FE_INVALID_UINT256_MID = field_element_bytes_unchecked((1 << 256) - (1 << 128))
FE_INVALID_LENGTH_PLUS_ONE = FE_VALID1 + b"\x00"
FE_INVALID_LENGTH_MINUS_ONE = FE_VALID1[:-1]
INVALID_FIELD_ELEMENTS = [
    FE_INVALID_EQUAL_TO_MODULUS,
    FE_INVALID_MODULUS_PLUS_ONE,
    FE_INVALID_UINT256_MAX,
    FE_INVALID_UINT256_MID,
    FE_INVALID_LENGTH_PLUS_ONE,
    FE_INVALID_LENGTH_MINUS_ONE,
]
|
|
|
|
|
|
def hash(x):
    """
    Return the raw 32-byte SHA-256 digest of ``x``.

    Note: this intentionally-named helper shadows the builtin ``hash`` within
    this module.
    """
    hasher = sha256()
    hasher.update(x)
    return hasher.digest()
|
|
|
|
|
|
def int_to_hex(n: int, byte_length: int = None) -> str:
    """
    Hex-encode the big-endian byte representation of ``n``.

    When ``byte_length`` is truthy, the bytes are left-padded with zero bytes to
    that length before encoding; otherwise they are encoded as produced by
    ``int_to_big_endian``.
    """
    raw = int_to_big_endian(n)
    padded = raw.rjust(byte_length, b'\x00') if byte_length else raw
    return encode_hex(padded)
|
|
|
|
|
|
def case01_blob_to_kzg_commitment():
    """
    Yield ``(case_name, case_data)`` pairs for ``blob_to_kzg_commitment``.

    Each blob in ``VALID_BLOBS`` gives a case whose output is the hex-encoded
    commitment. Each blob in ``INVALID_BLOBS`` must make the spec function raise
    and gives a case whose output is ``None``.
    """

    def name_suffix(identifier):
        # First 8 bytes, as hex, of the SHA-256 of the UTF-8 encoded identifier.
        return hash(identifier.encode("utf-8"))[:8].hex()

    # Valid cases
    for valid_blob in VALID_BLOBS:
        commitment = spec.blob_to_kzg_commitment(valid_blob)
        suffix = name_suffix(encode_hex(hash(valid_blob)))
        case_data = {
            'input': {'blob': encode_hex(valid_blob)},
            'output': encode_hex(commitment),
        }
        yield f'blob_to_kzg_commitment_case_valid_blob_{suffix}', case_data

    # Edge case: Invalid blobs
    for invalid_blob in INVALID_BLOBS:
        suffix = name_suffix(encode_hex(hash(invalid_blob)))
        expect_exception(spec.blob_to_kzg_commitment, invalid_blob)
        case_data = {
            'input': {'blob': encode_hex(invalid_blob)},
            'output': None,
        }
        yield f'blob_to_kzg_commitment_case_invalid_blob_{suffix}', case_data
|
|
|
|
|
|
def case02_compute_kzg_proof():
    """
    Yield ``(case_name, case_data)`` pairs for ``compute_kzg_proof``.

    Valid cases cover every pairing of ``VALID_BLOBS`` with ``VALID_FIELD_ELEMENTS``
    and output the hex-encoded ``(proof, y)`` tuple. Invalid blobs and invalid ``z``
    values must make the spec function raise and output ``None``.
    """

    def name_suffix(identifier):
        # First 8 bytes, as hex, of the SHA-256 of the UTF-8 encoded identifier.
        return hash(identifier.encode("utf-8"))[:8].hex()

    def hex_input(blob, z):
        # Shared shape of the 'input' mapping of every case below.
        return {'blob': encode_hex(blob), 'z': encode_hex(z)}

    # Valid cases
    for valid_blob in VALID_BLOBS:
        for valid_z in VALID_FIELD_ELEMENTS:
            proof, y = spec.compute_kzg_proof(valid_blob, valid_z)
            suffix = name_suffix(f'{encode_hex(hash(valid_blob))}_{encode_hex(valid_z)}')
            yield f'compute_kzg_proof_case_valid_blob_{suffix}', {
                'input': hex_input(valid_blob, valid_z),
                'output': (encode_hex(proof), encode_hex(y)),
            }

    # Edge case: Invalid blobs
    for invalid_blob in INVALID_BLOBS:
        valid_z = VALID_FIELD_ELEMENTS[0]
        expect_exception(spec.compute_kzg_proof, invalid_blob, valid_z)
        suffix = name_suffix(encode_hex(hash(invalid_blob)))
        yield f'compute_kzg_proof_case_invalid_blob_{suffix}', {
            'input': hex_input(invalid_blob, valid_z),
            'output': None,
        }

    # Edge case: Invalid z
    for invalid_z in INVALID_FIELD_ELEMENTS:
        valid_blob = VALID_BLOBS[4]
        expect_exception(spec.compute_kzg_proof, valid_blob, invalid_z)
        suffix = name_suffix(encode_hex(hash(invalid_z)))
        yield f'compute_kzg_proof_case_invalid_z_{suffix}', {
            'input': hex_input(valid_blob, invalid_z),
            'output': None,
        }
|
|
|
|
|
|
def case03_verify_kzg_proof():
    """
    Yield ``(case_name, case_data)`` pairs for ``verify_kzg_proof``.

    Every case's input holds the hex-encoded ``commitment``, ``z``, ``y`` and ``proof``.
    The output is ``True`` / ``False`` for inputs the spec function accepts, and ``None``
    where the spec function must raise (checked via ``expect_exception``).

    Values that cannot change inside a loop (the commitment of a blob, the hash of a blob,
    or the proof for a fixed ``(blob, z)`` pair) are computed once outside that loop rather
    than once per iteration. The yielded names, data and ordering are unaffected.
    """

    def name_suffix(identifier):
        # First 8 bytes, as hex, of the SHA-256 of the UTF-8 encoded identifier.
        return (hash(bytes(identifier, "utf-8"))[:8]).hex()

    def hex_input(commitment, z, y, proof):
        # Shared shape of the 'input' mapping of every case below.
        return {
            'commitment': encode_hex(commitment),
            'z': encode_hex(z),
            'y': encode_hex(y),
            'proof': encode_hex(proof),
        }

    # Valid cases
    for blob in VALID_BLOBS:
        # Both depend only on the blob, so compute them once per blob.
        commitment = spec.blob_to_kzg_commitment(blob)
        blob_id = encode_hex(hash(blob))
        for z in VALID_FIELD_ELEMENTS:
            proof, y = spec.compute_kzg_proof(blob, z)
            assert spec.verify_kzg_proof(commitment, z, y, proof)
            identifier = f'{blob_id}_{encode_hex(z)}'
            yield f'verify_kzg_proof_case_correct_proof_{name_suffix(identifier)}', {
                'input': hex_input(commitment, z, y, proof),
                'output': True
            }

    # Incorrect proofs
    for blob in VALID_BLOBS:
        commitment = spec.blob_to_kzg_commitment(blob)
        blob_id = encode_hex(hash(blob))
        for z in VALID_FIELD_ELEMENTS:
            proof_orig, y = spec.compute_kzg_proof(blob, z)
            # Shift the genuine proof by bls.G1() so that it no longer verifies.
            proof = bls_add_one(proof_orig)
            assert not spec.verify_kzg_proof(commitment, z, y, proof)
            identifier = f'{blob_id}_{encode_hex(z)}'
            yield f'verify_kzg_proof_case_incorrect_proof_{name_suffix(identifier)}', {
                'input': hex_input(commitment, z, y, proof),
                'output': False
            }

    # Incorrect `G1_POINT_AT_INFINITY` proof
    blob = BLOB_RANDOM_VALID1
    commitment = spec.blob_to_kzg_commitment(blob)
    blob_id = encode_hex(hash(blob))
    proof = spec.G1_POINT_AT_INFINITY
    prefix = 'verify_kzg_proof_case_incorrect_proof_point_at_infinity'
    for z in VALID_FIELD_ELEMENTS:
        _, y = spec.compute_kzg_proof(blob, z)
        assert not spec.verify_kzg_proof(commitment, z, y, proof)
        identifier = f'{blob_id}_{encode_hex(z)}'
        yield f'{prefix}_{name_suffix(identifier)}', {
            'input': hex_input(commitment, z, y, proof),
            'output': False
        }

    # Correct `G1_POINT_AT_INFINITY` proof for zero poly
    blob = BLOB_ALL_ZEROS
    commitment = spec.blob_to_kzg_commitment(blob)
    blob_id = encode_hex(hash(blob))
    proof = spec.G1_POINT_AT_INFINITY
    prefix = 'verify_kzg_proof_case_correct_proof_point_at_infinity_for_zero_poly'
    for z in VALID_FIELD_ELEMENTS:
        _, y = spec.compute_kzg_proof(blob, z)
        assert spec.verify_kzg_proof(commitment, z, y, proof)
        identifier = f'{blob_id}_{encode_hex(z)}'
        yield f'{prefix}_{name_suffix(identifier)}', {
            'input': hex_input(commitment, z, y, proof),
            'output': True
        }

    # Correct `G1_POINT_AT_INFINITY` proof for poly of all twos
    blob = BLOB_ALL_TWOS
    commitment = spec.blob_to_kzg_commitment(blob)
    blob_id = encode_hex(hash(blob))
    proof = spec.G1_POINT_AT_INFINITY
    prefix = 'verify_kzg_proof_case_correct_proof_point_at_infinity_for_twos_poly'
    for z in VALID_FIELD_ELEMENTS:
        _, y = spec.compute_kzg_proof(blob, z)
        assert spec.verify_kzg_proof(commitment, z, y, proof)
        # The evaluation is checked to equal 2 at every tested point.
        assert y == field_element_bytes(2)
        identifier = f'{blob_id}_{encode_hex(z)}'
        yield f'{prefix}_{name_suffix(identifier)}', {
            'input': hex_input(commitment, z, y, proof),
            'output': True
        }

    # Edge case: Invalid commitment
    blob, z = VALID_BLOBS[2], VALID_FIELD_ELEMENTS[1]
    proof, y = spec.compute_kzg_proof(blob, z)
    for commitment in INVALID_G1_POINTS:
        expect_exception(spec.verify_kzg_proof, commitment, z, y, proof)
        identifier = f'{encode_hex(commitment)}'
        yield f'verify_kzg_proof_case_invalid_commitment_{name_suffix(identifier)}', {
            'input': hex_input(commitment, z, y, proof),
            'output': None
        }

    # Edge case: Invalid z
    blob, validz = VALID_BLOBS[4], VALID_FIELD_ELEMENTS[1]
    # The proof and y come from a valid z; only the z passed to the verifier is invalid.
    proof, y = spec.compute_kzg_proof(blob, validz)
    commitment = spec.blob_to_kzg_commitment(blob)
    for z in INVALID_FIELD_ELEMENTS:
        expect_exception(spec.verify_kzg_proof, commitment, z, y, proof)
        identifier = f'{encode_hex(z)}'
        yield f'verify_kzg_proof_case_invalid_z_{name_suffix(identifier)}', {
            'input': hex_input(commitment, z, y, proof),
            'output': None
        }

    # Edge case: Invalid y
    blob, z = VALID_BLOBS[4], VALID_FIELD_ELEMENTS[1]
    proof, _ = spec.compute_kzg_proof(blob, z)
    commitment = spec.blob_to_kzg_commitment(blob)
    for y in INVALID_FIELD_ELEMENTS:
        expect_exception(spec.verify_kzg_proof, commitment, z, y, proof)
        identifier = f'{encode_hex(y)}'
        yield f'verify_kzg_proof_case_invalid_y_{name_suffix(identifier)}', {
            'input': hex_input(commitment, z, y, proof),
            'output': None
        }

    # Edge case: Invalid proof
    blob, z = VALID_BLOBS[2], VALID_FIELD_ELEMENTS[1]
    _, y = spec.compute_kzg_proof(blob, z)
    commitment = spec.blob_to_kzg_commitment(blob)
    for proof in INVALID_G1_POINTS:
        expect_exception(spec.verify_kzg_proof, commitment, z, y, proof)
        identifier = f'{encode_hex(proof)}'
        yield f'verify_kzg_proof_case_invalid_proof_{name_suffix(identifier)}', {
            'input': hex_input(commitment, z, y, proof),
            'output': None
        }
|
|
|
|
|
|
def case04_compute_blob_kzg_proof():
    """
    Yield ``(case_name, case_data)`` pairs for ``compute_blob_kzg_proof``.

    Valid blobs are paired with their own commitment and output the hex-encoded
    proof. Invalid blobs (paired with ``G1``) and invalid commitments (paired with
    ``VALID_BLOBS[1]``) must make the spec function raise and output ``None``.
    """

    def name_suffix(identifier):
        # First 8 bytes, as hex, of the SHA-256 of the UTF-8 encoded identifier.
        return hash(identifier.encode("utf-8"))[:8].hex()

    def hex_input(blob, commitment):
        # Shared shape of the 'input' mapping of every case below.
        return {'blob': encode_hex(blob), 'commitment': encode_hex(commitment)}

    # Valid cases
    for valid_blob in VALID_BLOBS:
        commitment = spec.blob_to_kzg_commitment(valid_blob)
        proof = spec.compute_blob_kzg_proof(valid_blob, commitment)
        suffix = name_suffix(encode_hex(hash(valid_blob)))
        yield f'compute_blob_kzg_proof_case_valid_blob_{suffix}', {
            'input': hex_input(valid_blob, commitment),
            'output': encode_hex(proof),
        }

    # Edge case: Invalid blob
    for invalid_blob in INVALID_BLOBS:
        commitment = G1
        expect_exception(spec.compute_blob_kzg_proof, invalid_blob, commitment)
        suffix = name_suffix(encode_hex(hash(invalid_blob)))
        yield f'compute_blob_kzg_proof_case_invalid_blob_{suffix}', {
            'input': hex_input(invalid_blob, commitment),
            'output': None,
        }

    # Edge case: Invalid commitment
    for invalid_commitment in INVALID_G1_POINTS:
        valid_blob = VALID_BLOBS[1]
        expect_exception(spec.compute_blob_kzg_proof, valid_blob, invalid_commitment)
        suffix = name_suffix(encode_hex(hash(invalid_commitment)))
        yield f'compute_blob_kzg_proof_case_invalid_commitment_{suffix}', {
            'input': hex_input(valid_blob, invalid_commitment),
            'output': None,
        }
|
|
|
|
|
|
def case05_verify_blob_kzg_proof():
    """
    Yield ``(case_name, case_data)`` pairs for ``verify_blob_kzg_proof``.

    Every case's input holds the hex-encoded ``blob``, ``commitment`` and ``proof``.
    The output is ``True`` / ``False`` for inputs the spec function accepts, and
    ``None`` where the spec function must raise (checked via ``expect_exception``).
    """

    def name_suffix(identifier):
        # First 8 bytes, as hex, of the SHA-256 of the UTF-8 encoded identifier.
        return hash(identifier.encode("utf-8"))[:8].hex()

    def hex_input(blob, commitment, proof):
        # Shared shape of the 'input' mapping of every case below.
        return {
            'blob': encode_hex(blob),
            'commitment': encode_hex(commitment),
            'proof': encode_hex(proof),
        }

    # Valid cases
    for valid_blob in VALID_BLOBS:
        commitment = spec.blob_to_kzg_commitment(valid_blob)
        proof = spec.compute_blob_kzg_proof(valid_blob, commitment)
        assert spec.verify_blob_kzg_proof(valid_blob, commitment, proof)
        suffix = name_suffix(encode_hex(hash(valid_blob)))
        yield f'verify_blob_kzg_proof_case_correct_proof_{suffix}', {
            'input': hex_input(valid_blob, commitment, proof),
            'output': True,
        }

    # Incorrect proofs
    for valid_blob in VALID_BLOBS:
        commitment = spec.blob_to_kzg_commitment(valid_blob)
        genuine_proof = spec.compute_blob_kzg_proof(valid_blob, commitment)
        # Shift the genuine proof by bls.G1() so that it no longer verifies.
        proof = bls_add_one(genuine_proof)
        assert not spec.verify_blob_kzg_proof(valid_blob, commitment, proof)
        suffix = name_suffix(encode_hex(hash(valid_blob)))
        yield f'verify_blob_kzg_proof_case_incorrect_proof_{suffix}', {
            'input': hex_input(valid_blob, commitment, proof),
            'output': False,
        }

    # Incorrect `G1_POINT_AT_INFINITY` proof
    blob = BLOB_RANDOM_VALID1
    commitment = spec.blob_to_kzg_commitment(blob)
    proof = spec.G1_POINT_AT_INFINITY
    assert not spec.verify_blob_kzg_proof(blob, commitment, proof)
    yield 'verify_blob_kzg_proof_case_incorrect_proof_point_at_infinity', {
        'input': hex_input(blob, commitment, proof),
        'output': False,
    }

    # Correct `G1_POINT_AT_INFINITY` proof and commitment for zero poly
    blob = BLOB_ALL_ZEROS
    commitment = spec.blob_to_kzg_commitment(blob)
    proof = spec.G1_POINT_AT_INFINITY
    assert commitment == spec.G1_POINT_AT_INFINITY
    assert spec.verify_blob_kzg_proof(blob, commitment, proof)
    yield 'verify_blob_kzg_proof_case_correct_proof_point_at_infinity_for_zero_poly', {
        'input': hex_input(blob, commitment, proof),
        'output': True,
    }

    # Correct `G1_POINT_AT_INFINITY` proof for all twos poly
    blob = BLOB_ALL_TWOS
    commitment = spec.blob_to_kzg_commitment(blob)
    proof = spec.G1_POINT_AT_INFINITY
    assert commitment != spec.G1_POINT_AT_INFINITY
    assert spec.verify_blob_kzg_proof(blob, commitment, proof)
    yield 'verify_blob_kzg_proof_case_correct_proof_point_at_infinity_for_twos_poly', {
        'input': hex_input(blob, commitment, proof),
        'output': True,
    }

    # Edge case: Invalid blob
    for invalid_blob in INVALID_BLOBS:
        proof = G1
        commitment = G1
        expect_exception(spec.verify_blob_kzg_proof, invalid_blob, commitment, proof)
        suffix = name_suffix(encode_hex(hash(invalid_blob)))
        yield f'verify_blob_kzg_proof_case_invalid_blob_{suffix}', {
            'input': hex_input(invalid_blob, commitment, proof),
            'output': None,
        }

    # Edge case: Invalid commitment
    for invalid_commitment in INVALID_G1_POINTS:
        blob = VALID_BLOBS[1]
        proof = G1
        expect_exception(spec.verify_blob_kzg_proof, blob, invalid_commitment, proof)
        suffix = name_suffix(encode_hex(hash(invalid_commitment)))
        yield f'verify_blob_kzg_proof_case_invalid_commitment_{suffix}', {
            'input': hex_input(blob, invalid_commitment, proof),
            'output': None,
        }

    # Edge case: Invalid proof
    for invalid_proof in INVALID_G1_POINTS:
        blob = VALID_BLOBS[1]
        commitment = G1
        expect_exception(spec.verify_blob_kzg_proof, blob, commitment, invalid_proof)
        suffix = name_suffix(encode_hex(hash(invalid_proof)))
        yield f'verify_blob_kzg_proof_case_invalid_proof_{suffix}', {
            'input': hex_input(blob, commitment, invalid_proof),
            'output': None,
        }
|
|
|
|
|
|
def case06_verify_blob_kzg_proof_batch():
    """
    Yield ``(case_name, case_data)`` pairs for ``verify_blob_kzg_proof_batch``.

    Every case's input holds lists of hex-encoded ``blobs``, ``commitments`` and
    ``proofs``. The output is ``True`` / ``False`` for inputs the spec function accepts,
    and ``None`` where the spec function must raise (checked via ``expect_exception``).
    """

    def name_suffix(identifier):
        # First 8 bytes, as hex, of the SHA-256 of the UTF-8 encoded identifier.
        return hash(identifier.encode("utf-8"))[:8].hex()

    def hex_input(blobs, commitments, proofs):
        # Shared shape of the 'input' mapping of every case below.
        return {
            'blobs': encode_hex_list(blobs),
            'commitments': encode_hex_list(commitments),
            'proofs': encode_hex_list(proofs),
        }

    # Valid cases
    proofs = []
    commitments = []
    for valid_blob in VALID_BLOBS:
        blob_commitment = spec.blob_to_kzg_commitment(valid_blob)
        commitments.append(blob_commitment)
        proofs.append(spec.compute_blob_kzg_proof(valid_blob, blob_commitment))

    # Prefixes of length 0 .. len(proofs) - 1 are verified and emitted.
    # NOTE(review): the complete list is never emitted as a valid batch here — confirm intended.
    for size in range(len(proofs)):
        batch_blobs = VALID_BLOBS[:size]
        batch_commitments = commitments[:size]
        batch_proofs = proofs[:size]
        assert spec.verify_blob_kzg_proof_batch(batch_blobs, batch_commitments, batch_proofs)
        suffix = name_suffix(encode_hex(hash(b"".join(batch_blobs))))
        yield f'verify_blob_kzg_proof_batch_case_{suffix}', {
            'input': hex_input(batch_blobs, batch_commitments, batch_proofs),
            'output': True,
        }

    # Incorrect proof
    proofs_incorrect = [bls_add_one(proofs[0])] + proofs[1:]
    assert not spec.verify_blob_kzg_proof_batch(VALID_BLOBS, commitments, proofs_incorrect)
    yield 'verify_blob_kzg_proof_batch_case_incorrect_proof_add_one', {
        'input': hex_input(VALID_BLOBS, commitments, proofs_incorrect),
        'output': False,
    }

    # Incorrect `G1_POINT_AT_INFINITY` proof
    blob = BLOB_RANDOM_VALID1
    commitment = spec.blob_to_kzg_commitment(blob)
    proof = spec.G1_POINT_AT_INFINITY
    assert not spec.verify_blob_kzg_proof_batch([blob], [commitment], [proof])
    yield 'verify_blob_kzg_proof_batch_case_incorrect_proof_point_at_infinity', {
        'input': hex_input([blob], [commitment], [proof]),
        'output': False,
    }

    # Edge case: Invalid blobs
    for invalid_blob in INVALID_BLOBS:
        # Swap the blob at index 4 for the invalid one, keeping the list length.
        blobs_invalid = VALID_BLOBS[:4] + [invalid_blob] + VALID_BLOBS[5:]
        expect_exception(spec.verify_blob_kzg_proof_batch, blobs_invalid, commitments, proofs)
        suffix = name_suffix(encode_hex(hash(invalid_blob)))
        yield f'verify_blob_kzg_proof_batch_case_invalid_blob_{suffix}', {
            'input': hex_input(blobs_invalid, commitments, proofs),
            'output': None,
        }

    # Edge case: Invalid commitment
    for invalid_commitment in INVALID_G1_POINTS:
        blobs = VALID_BLOBS
        commitments_invalid = [invalid_commitment] + commitments[1:]
        expect_exception(spec.verify_blob_kzg_proof_batch, blobs, commitments_invalid, proofs)
        suffix = name_suffix(encode_hex(hash(invalid_commitment)))
        yield f'verify_blob_kzg_proof_batch_case_invalid_commitment_{suffix}', {
            'input': hex_input(blobs, commitments_invalid, proofs),
            'output': None,
        }

    # Edge case: Invalid proof
    for invalid_proof in INVALID_G1_POINTS:
        blobs = VALID_BLOBS
        proofs_invalid = [invalid_proof] + proofs[1:]
        expect_exception(spec.verify_blob_kzg_proof_batch, blobs, commitments, proofs_invalid)
        suffix = name_suffix(encode_hex(hash(invalid_proof)))
        yield f'verify_blob_kzg_proof_batch_case_invalid_proof_{suffix}', {
            'input': hex_input(blobs, commitments, proofs_invalid),
            'output': None,
        }

    # Edge case: Blob length different
    fewer_blobs = VALID_BLOBS[:-1]
    expect_exception(spec.verify_blob_kzg_proof_batch, fewer_blobs, commitments, proofs)
    yield 'verify_blob_kzg_proof_batch_case_blob_length_different', {
        'input': hex_input(fewer_blobs, commitments, proofs),
        'output': None,
    }

    # Edge case: Commitment length different
    fewer_commitments = commitments[:-1]
    expect_exception(spec.verify_blob_kzg_proof_batch, VALID_BLOBS, fewer_commitments, proofs)
    yield 'verify_blob_kzg_proof_batch_case_commitment_length_different', {
        'input': hex_input(VALID_BLOBS, fewer_commitments, proofs),
        'output': None,
    }

    # Edge case: Proof length different
    fewer_proofs = proofs[:-1]
    expect_exception(spec.verify_blob_kzg_proof_batch, VALID_BLOBS, commitments, fewer_proofs)
    yield 'verify_blob_kzg_proof_batch_case_proof_length_different', {
        'input': hex_input(VALID_BLOBS, commitments, fewer_proofs),
        'output': None,
    }
|
|
|
|
|
|
def create_provider(fork_name: SpecForkName,
                    handler_name: str,
                    test_case_fn: Callable[[], Iterable[Tuple[str, Dict[str, Any]]]]) -> gen_typing.TestProvider:
    """
    Wrap a case generator into a ``gen_typing.TestProvider``.

    :param fork_name: fork the produced test cases are attributed to.
    :param handler_name: handler name recorded on every produced test case.
    :param test_case_fn: zero-argument callable yielding ``(case_name, case_content)`` pairs.
    :return: a provider whose ``make_cases`` yields one ``gen_typing.TestCase`` per pair, using
        preset ``'general'``, runner ``'kzg'`` and suite ``'kzg-mainnet'``.
    """

    def prepare_fn() -> None:
        # Nothing to load / change in spec. Maybe in future forks.
        # Put the tests into the general config category, to not require any particular configuration.
        return

    def cases_fn() -> Iterable[gen_typing.TestCase]:
        for case_name, case_content in test_case_fn():
            yield gen_typing.TestCase(
                fork_name=fork_name,
                preset_name='general',
                runner_name='kzg',
                handler_name=handler_name,
                suite_name='kzg-mainnet',
                case_name=case_name,
                # Bind the current content as a default argument. A plain closure looks the
                # variable up only when called, so if the consumer collects several test cases
                # before running them, every case_fn would return the last case's content.
                case_fn=lambda case_content=case_content: [('data', 'data', case_content)]
            )

    return gen_typing.TestProvider(prepare=prepare_fn, make_cases=cases_fn)
|
|
|
|
|
|
if __name__ == "__main__":
    # Switch the bls helper to its arkworks implementation before running the generator.
    bls.use_arkworks()

    # DENEB: (handler name, case generator) pairs, in the order they are registered.
    deneb_handlers = [
        ('blob_to_kzg_commitment', case01_blob_to_kzg_commitment),
        ('compute_kzg_proof', case02_compute_kzg_proof),
        ('verify_kzg_proof', case03_verify_kzg_proof),
        ('compute_blob_kzg_proof', case04_compute_blob_kzg_proof),
        ('verify_blob_kzg_proof', case05_verify_blob_kzg_proof),
        ('verify_blob_kzg_proof_batch', case06_verify_blob_kzg_proof_batch),
    ]
    providers = [
        create_provider(DENEB, handler_name, case_generator)
        for handler_name, case_generator in deneb_handlers
    ]
    gen_runner.run_generator("kzg", providers)
|