nomos-specs/da/common.py
Daniel Sanchez 0e142c0888
Added verification to certificate (#82)
* Added verification method to certificate

* Update da/common.py

typo short -> sort

Co-authored-by: gusto <bacv@users.noreply.github.com>

* Fix test imports

* Added verification comment

---------

Co-authored-by: gusto <bacv@users.noreply.github.com>
2024-03-15 11:34:43 +01:00

75 lines
2.1 KiB
Python

from dataclasses import dataclass
from hashlib import sha3_256
from itertools import chain, zip_longest, compress
from typing import List, Generator, Self, Sequence
from eth2spec.eip7594.mainnet import Bytes32, KZGCommitment as Commitment
from py_ecc.bls import G2ProofOfPossession as bls_pop
class NodeId(Bytes32):
    """Identifier of a node, represented as a ``Bytes32`` value."""
class Chunk(Bytes32):
    """A single chunk of data, represented as a ``Bytes32`` value."""
class Column(List[Bytes32]):
    """List of ``Bytes32`` entries that make up one column."""

    def as_bytes(self) -> bytes:
        """Return all entries of the column concatenated, in order, as one ``bytes``."""
        # Walk every entry byte by byte and let ``bytes`` collect the result.
        return bytes(byte for entry in self for byte in entry)
class Row(List[Bytes32]):
    """List of ``Bytes32`` entries that make up one row."""

    def as_bytes(self) -> bytes:
        """Return all entries of the row concatenated, in order, as one ``bytes``."""
        flattened = chain.from_iterable(iter(self))
        return bytes(flattened)
class ChunksMatrix(List[Row | Column]):
    """List of ``Row`` / ``Column`` entries that can also be walked column-wise."""

    @property
    def columns(self) -> Generator[List[Chunk], None, None]:
        """
        Lazily yield the matrix column by column.

        The i-th yielded item collects the i-th entry of every element of the
        matrix. Elements shorter than the longest one are padded with ``b""``.
        """
        # NOTE(review): the annotation says List[Chunk], but each yielded
        # object is a Column instance.
        for entries in zip_longest(*self, fillvalue=b""):
            yield Column(entries)

    def transposed(self) -> Self:
        """Return a new ``ChunksMatrix`` whose entries are the columns of this one."""
        flipped = ChunksMatrix(self.columns)
        return flipped
# Plain aliases for the BLS values handled in this module:
# private keys are ints, public keys and signatures are bytes.
BLSPrivateKey = int
BLSPublickey = bytes
BLSSignature = bytes
class Bitfield(List[bool]):
    """List of booleans; ``Certificate.signers`` uses it to mark which keys signed."""
@dataclass
class Attestation:
    """Attestation carrying a single BLS signature."""

    # The BLS signature of this attestation.
    signature: BLSSignature
@dataclass
class Certificate:
    """Aggregated BLS signature over a set of commitments, plus a mask of who signed."""

    # Aggregated signature checked by ``verify``.
    aggregated_signatures: BLSSignature
    # Mask applied to the sorted node public keys; a true bit selects a signer.
    signers: Bitfield
    aggregated_column_commitment: Commitment
    row_commitments: List[Commitment]

    def verify(self, nodes_public_keys: List[BLSPublickey]) -> bool:
        """
        Check the aggregated signature against the keys selected by ``signers``.

        ``nodes_public_keys`` must be a trusted list of keys whose proofs of
        possession have already been verified. Otherwise, we could fall under
        the Rogue Key Attack:
        `assert all(bls_pop.PopVerify(pk, proof) for pk, proof in zip(nodes_public_keys, pops))`

        Returns the result of ``bls_pop.AggregateVerify`` for the selected keys,
        each paired with the same attestation message.
        """
        # The signers bitfield is ordered by public key, so the keys are put in
        # the same order before the mask is applied.
        ordered_keys = sorted(nodes_public_keys)
        # NOTE(review): zip stops at the shorter input, so a bitfield whose
        # length differs from the key list is silently truncated - confirm
        # whether that is intended.
        selected_keys = [
            key for key, has_signed in zip(ordered_keys, self.signers) if has_signed
        ]
        attested_message = build_attestation_message(
            self.aggregated_column_commitment,
            self.row_commitments,
        )
        # Every signer attests to the same message.
        # NOTE(review): FastAggregateVerify may be applicable here - confirm.
        repeated_messages = [attested_message for _ in selected_keys]
        return bls_pop.AggregateVerify(
            selected_keys,
            repeated_messages,
            self.aggregated_signatures,
        )
def build_attestation_message(aggregated_column_commitment: Commitment, row_commitments: Sequence[Commitment]) -> bytes:
    """
    Build the message that attestations sign.

    The message is the SHA3-256 digest of the aggregated column commitment
    followed by every row commitment, in the order given.
    """
    ordered_commitments = (aggregated_column_commitment, *row_commitments)
    # Hashing the concatenation gives the same digest as feeding the parts one by one.
    payload = b"".join(bytes(commitment) for commitment in ordered_commitments)
    return sha3_256(payload).digest()