nomos-specs/da/verifier.py

116 lines
4.4 KiB
Python
Raw Permalink Normal View History

from dataclasses import dataclass
from hashlib import sha3_256
from typing import List, Optional, Sequence, Set, Dict
from eth2spec.deneb.mainnet import BLSFieldElement
from eth2spec.eip7594.mainnet import (
KZGCommitment as Commitment,
KZGProof as Proof,
)
from py_ecc.bls import G2ProofOfPossession as bls_pop
import da.common
from da.common import Column, Chunk, Attestation, BLSPrivateKey, BLSPublicKey
from da.encoder import DAEncoder
from da.kzg_rs import kzg
from da.kzg_rs.common import ROOTS_OF_UNITY, GLOBAL_PARAMETERS, BLS_MODULUS
@dataclass
class DABlob:
column: Column
column_commitment: Commitment
aggregated_column_commitment: Commitment
aggregated_column_proof: Proof
rows_commitments: List[Commitment]
rows_proofs: List[Proof]
def id(self) -> bytes:
return da.common.build_attestation_message(self.aggregated_column_commitment, self.rows_commitments)
def column_id(self) -> bytes:
return sha3_256(self.column.as_bytes()).digest()
class DAVerifier:
def __init__(self, sk: BLSPrivateKey, nodes_pks: List[BLSPublicKey]):
self.attested_blobs: Dict[bytes, (bytes, Attestation)] = dict()
self.sk = sk
self.index = nodes_pks.index(bls_pop.SkToPk(self.sk))
@staticmethod
def _verify_column(
column: Column,
column_commitment: Commitment,
aggregated_column_commitment: Commitment,
aggregated_column_proof: Proof,
index: int
) -> bool:
# 1. compute commitment for column
_, computed_column_commitment = kzg.bytes_to_commitment(column.as_bytes(), GLOBAL_PARAMETERS)
# 2. If computed column commitment != column commitment, fail
if column_commitment != computed_column_commitment:
return False
# 3. compute column hash
column_hash = DAEncoder.hash_column_and_commitment(column, column_commitment)
# 4. Check proof with commitment and proof over the aggregated column commitment
chunk = BLSFieldElement.from_bytes(column_hash)
return kzg.verify_element_proof(
chunk, aggregated_column_commitment, aggregated_column_proof, index, ROOTS_OF_UNITY
)
@staticmethod
def _verify_chunk(chunk: Chunk, commitment: Commitment, proof: Proof, index: int) -> bool:
chunk = BLSFieldElement(int.from_bytes(bytes(chunk)) % BLS_MODULUS)
return kzg.verify_element_proof(chunk, commitment, proof, index, ROOTS_OF_UNITY)
@staticmethod
def _verify_chunks(
chunks: Sequence[Chunk],
commitments: Sequence[Commitment],
proofs: Sequence[Proof],
index: int
) -> bool:
if not (len(chunks) == len(commitments) == len(proofs)):
return False
for chunk, commitment, proof in zip(chunks, commitments, proofs):
if not DAVerifier._verify_chunk(chunk, commitment, proof, index):
return False
return True
def _build_attestation(self, blob: DABlob) -> Attestation:
hasher = sha3_256()
hasher.update(bytes(blob.aggregated_column_commitment))
for c in blob.rows_commitments:
hasher.update(bytes(c))
message = hasher.digest()
return Attestation(signature=bls_pop.Sign(self.sk, message))
def verify(self, blob: DABlob) -> Optional[Attestation]:
blob_id = blob.id()
if previous_attestation := self.attested_blobs.get(blob_id):
column_id, attestation = previous_attestation
# we already attested, is cached so we return it
if column_id == blob.column_id():
return attestation
# we already attested and they are asking us to attest the same data different column
# skip
return None
is_column_verified = DAVerifier._verify_column(
blob.column,
blob.column_commitment,
blob.aggregated_column_commitment,
blob.aggregated_column_proof,
self.index
)
if not is_column_verified:
return
are_chunks_verified = DAVerifier._verify_chunks(
blob.column, blob.rows_commitments, blob.rows_proofs, self.index
)
if not are_chunks_verified:
return
attestation = self._build_attestation(blob)
self.attested_blobs[blob_id] = (blob.column_id(), attestation)
return attestation