From faf399eaaf285c1b820c2aef9b7f8dcc053562e3 Mon Sep 17 00:00:00 2001 From: holisticode Date: Fri, 19 Jul 2024 13:19:35 -0500 Subject: [PATCH] moved from incorrect location at nomos-specs --- da/__init__.py | 0 da/api/__init__.py | 0 da/api/common.py | 58 ++++ da/api/test_flow.py | 97 +++++++ da/common.py | 82 ++++++ da/dispersal.py | 90 ++++++ da/encoder.py | 136 +++++++++ da/kzg_rs/__init__.py | 0 da/kzg_rs/common.py | 22 ++ da/kzg_rs/fft.py | 65 +++++ da/kzg_rs/fk20.py | 75 +++++ da/kzg_rs/kzg.py | 74 +++++ da/kzg_rs/poly.py | 111 +++++++ da/kzg_rs/roots.py | 25 ++ da/kzg_rs/rs.py | 40 +++ da/kzg_rs/test_fft.py | 14 + da/kzg_rs/test_fk20.py | 28 ++ da/kzg_rs/test_kzg.py | 67 +++++ da/kzg_rs/test_rs.py | 18 ++ da/kzg_rs/trusted_setup.py | 48 ++++ da/kzg_rs/utils.py | 5 + da/network/__init__.py | 0 .../__pycache__/constants.cpython-312.pyc | Bin 0 -> 582 bytes .../__pycache__/executor.cpython-312.pyc | Bin 0 -> 5214 bytes .../__pycache__/network.cpython-312.pyc | Bin 0 -> 1564 bytes da/network/__pycache__/node.cpython-312.pyc | Bin 0 -> 7280 bytes da/network/__pycache__/subnet.cpython-312.pyc | Bin 0 -> 2138 bytes da/network/constants.py | 21 ++ da/network/dispersal/README.md | 76 +++++ da/network/dispersal/__init__.py | 0 .../__pycache__/__init__.cpython-312.pyc | Bin 0 -> 164 bytes .../__pycache__/dispersal_pb2.cpython-312.pyc | Bin 0 -> 3823 bytes .../__pycache__/proto.cpython-312.pyc | Bin 0 -> 6143 bytes da/network/dispersal/dispersal.proto | 87 ++++++ da/network/dispersal/dispersal_pb2.py | 60 ++++ da/network/dispersal/mock_system.py | 123 ++++++++ da/network/dispersal/proto.py | 126 ++++++++ da/network/dispersal/test_proto_helpers.py | 75 +++++ da/network/executor.py | 106 +++++++ da/network/network.py | 35 +++ da/network/node.py | 129 +++++++++ da/network/poc.py | 271 ++++++++++++++++++ da/network/readme.md | 65 +++++ da/network/subnet.py | 65 +++++ da/test_common.py | 20 ++ da/test_dispersal.py | 74 +++++ da/test_encoder.py | 137 +++++++++ da/test_full_flow.py | 134 +++++++++ da/test_verifier.py | 71 +++++ da/verifier.py | 114 ++++++++ 50 files changed, 2844 insertions(+) create mode 100644 da/__init__.py create mode 100644 da/api/__init__.py create mode 100644 da/api/common.py create mode 100644 da/api/test_flow.py create mode 100644 da/common.py create mode 100644 da/dispersal.py create mode 100644 da/encoder.py create mode 100644 da/kzg_rs/__init__.py create mode 100644 da/kzg_rs/common.py create mode 100644 da/kzg_rs/fft.py create mode 100644 da/kzg_rs/fk20.py create mode 100644 da/kzg_rs/kzg.py create mode 100644 da/kzg_rs/poly.py create mode 100644 da/kzg_rs/roots.py create mode 100644 da/kzg_rs/rs.py create mode 100644 da/kzg_rs/test_fft.py create mode 100644 da/kzg_rs/test_fk20.py create mode 100644 da/kzg_rs/test_kzg.py create mode 100644 da/kzg_rs/test_rs.py create mode 100644 da/kzg_rs/trusted_setup.py create mode 100644 da/kzg_rs/utils.py create mode 100644 da/network/__init__.py create mode 100644 da/network/__pycache__/constants.cpython-312.pyc create mode 100644 da/network/__pycache__/executor.cpython-312.pyc create mode 100644 da/network/__pycache__/network.cpython-312.pyc create mode 100644 da/network/__pycache__/node.cpython-312.pyc create mode 100644 da/network/__pycache__/subnet.cpython-312.pyc create mode 100644 da/network/constants.py create mode 100644 da/network/dispersal/README.md create mode 100644 da/network/dispersal/__init__.py create mode 100644 da/network/dispersal/__pycache__/__init__.cpython-312.pyc create mode 100644 
da/network/dispersal/__pycache__/dispersal_pb2.cpython-312.pyc create mode 100644 da/network/dispersal/__pycache__/proto.cpython-312.pyc create mode 100644 da/network/dispersal/dispersal.proto create mode 100644 da/network/dispersal/dispersal_pb2.py create mode 100644 da/network/dispersal/mock_system.py create mode 100644 da/network/dispersal/proto.py create mode 100644 da/network/dispersal/test_proto_helpers.py create mode 100644 da/network/executor.py create mode 100644 da/network/network.py create mode 100644 da/network/node.py create mode 100644 da/network/poc.py create mode 100644 da/network/readme.md create mode 100644 da/network/subnet.py create mode 100644 da/test_common.py create mode 100644 da/test_dispersal.py create mode 100644 da/test_encoder.py create mode 100644 da/test_full_flow.py create mode 100644 da/test_verifier.py create mode 100644 da/verifier.py diff --git a/da/__init__.py b/da/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/da/api/__init__.py b/da/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/da/api/common.py b/da/api/common.py new file mode 100644 index 0000000..6a17ff7 --- /dev/null +++ b/da/api/common.py @@ -0,0 +1,58 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional, List, Sequence + +from da.common import Certificate +from da.verifier import DABlob + + +@dataclass +class Metadata: + # app identifier + app_id: bytes + # index of VID certificate blob + index: int + + +@dataclass +class VID: + # da certificate id + cert_id: bytes + # application + index information + metadata: Metadata + + +class BlobStore(ABC): + @abstractmethod + def add(self, certificate: Certificate, metadata: Metadata): + """ + Raises: ValueError if there is already a registered certificate fot the given metadata + """ + pass + + @abstractmethod + def get_multiple(self, app_id: bytes, indexes: Sequence[int]) -> List[Optional[DABlob]]: + pass + + +class DAApi: + def __init__(self, bs: BlobStore): + self.store = bs + + def write(self, certificate: Certificate, metadata: Metadata): + """ + Write method should be used by a service that is able to retrieve verified certificates + from the latest Block. Once a certificate is retrieved, api creates a relation between + the blob of an original data, certificate and index for the app_id of the certificate. + Raises: ValueError if there is already a registered certificate for a given metadata + """ + self.store.add(certificate, metadata) + + def read(self, app_id, indexes) -> List[Optional[DABlob]]: + """ + Read method should accept only `app_id` and a list of indexes. The returned list of + blobs should be ordered in the same sequence as `indexes` in a request. + If node does not have the blob for some indexes, then it should add None object as an + item. + """ + return self.store.get_multiple(app_id, indexes) diff --git a/da/api/test_flow.py b/da/api/test_flow.py new file mode 100644 index 0000000..201d1b8 --- /dev/null +++ b/da/api/test_flow.py @@ -0,0 +1,97 @@ +from unittest import TestCase +from collections import defaultdict + +from da.api.common import * + + +@dataclass +class MockCertificate: + cert_id: int + + +class MockStore(BlobStore): + def __init__(self): + self.blob_store = {} + self.app_id_store = defaultdict(dict) + + def populate(self, blob, cert_id: bytes): + self.blob_store[cert_id] = blob + + # Implements `add` method from BlobStore abstract class. 
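+    # Minimal in-memory layout of this test helper: `blob_store` maps
+    # cert_id -> blob (filled via `populate`), while `app_id_store` maps
+    # app_id -> {index: cert_id}. `add` below only records the
+    # (app_id, index) -> cert_id relation and rejects a duplicate index
+    # for the same app_id.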
+ def add(self, cert_id: bytes, metadata: Metadata): + if metadata.index in self.app_id_store[metadata.app_id]: + raise ValueError("index already written") + + self.app_id_store[metadata.app_id][metadata.index] = cert_id + + # Implements `get_multiple` method from BlobStore abstract class. + def get_multiple(self, app_id, indexes) -> List[Optional[DABlob]]: + return [ + self.blob_store.get(self.app_id_store[app_id].get(i), None) if self.app_id_store[app_id].get(i) else None for i in indexes + ] + + + +class TestFlow(TestCase): + def test_api_write_read(self): + expected_blob = "hello" + cert_id = b"11"*32 + app_id = 1 + idx = 1 + mock_meta = Metadata(1, 1) + + mock_store = MockStore() + mock_store.populate(expected_blob, cert_id) + + api = DAApi(mock_store) + + api.write(cert_id, mock_meta) + blobs = api.read(app_id, [idx]) + + self.assertEqual([expected_blob], blobs) + + def test_same_index(self): + expected_blob = "hello" + cert_id = b"11"*32 + app_id = 1 + idx = 1 + mock_meta = Metadata(1, 1) + + mock_store = MockStore() + mock_store.populate(expected_blob, cert_id) + + api = DAApi(mock_store) + + api.write(cert_id, mock_meta) + with self.assertRaises(ValueError): + api.write(cert_id, mock_meta) + + blobs = api.read(app_id, [idx]) + + self.assertEqual([expected_blob], blobs) + + def test_multiple_indexes_same_data(self): + expected_blob = "hello" + cert_id = b"11"*32 + app_id = 1 + idx1 = 1 + idx2 = 2 + mock_meta1 = Metadata(app_id, idx1) + mock_meta2 = Metadata(app_id, idx2) + + mock_store = MockStore() + mock_store.populate(expected_blob, cert_id) + + api = DAApi(mock_store) + + api.write(cert_id, mock_meta1) + mock_store.populate(expected_blob, cert_id) + api.write(cert_id, mock_meta2) + + blobs_idx1 = api.read(app_id, [idx1]) + blobs_idx2 = api.read(app_id, [idx2]) + + self.assertEqual([expected_blob], blobs_idx1) + self.assertEqual([expected_blob], blobs_idx2) + self.assertEqual(mock_store.app_id_store[app_id][idx1], mock_store.app_id_store[app_id][idx2]) + diff --git a/da/common.py b/da/common.py new file mode 100644 index 0000000..af21514 --- /dev/null +++ b/da/common.py @@ -0,0 +1,82 @@ +from dataclasses import dataclass +from hashlib import sha3_256 +from itertools import chain, zip_longest, compress +from typing import List, Generator, Self, Sequence + +from eth2spec.eip7594.mainnet import Bytes32, KZGCommitment as Commitment +from py_ecc.bls import G2ProofOfPossession + + +class NodeId(Bytes32): + pass + + +class Chunk(bytes): + pass + + +class Column(List[Bytes32]): + def as_bytes(self) -> bytes: + return bytes(chain.from_iterable(self)) + + +class Row(List[Bytes32]): + def as_bytes(self) -> bytes: + return bytes(chain.from_iterable(self)) + + +class ChunksMatrix(List[Row | Column]): + @property + def columns(self) -> Generator[List[Chunk], None, None]: + yield from map(Column, zip_longest(*self, fillvalue=b"")) + + def transposed(self) -> Self: + return ChunksMatrix(self.columns) + + +BLSPublicKey = bytes +BLSPrivateKey = int +BLSSignature = bytes + + +class Bitfield(List[bool]): + pass + + +@dataclass +class Attestation: + signature: BLSSignature + + +@dataclass +class Certificate: + aggregated_signatures: BLSSignature + signers: Bitfield + aggregated_column_commitment: Commitment + row_commitments: List[Commitment] + + def id(self) -> bytes: + return build_attestation_message(self.aggregated_column_commitment, self.row_commitments) + + def verify(self, nodes_public_keys: List[BLSPublicKey]) -> bool: + """ + List of nodes public keys should be a trusted list of verified 
proof of possession keys. + Otherwise, we could fall under the Rogue Key Attack + `assert all(bls_pop.PopVerify(pk, proof) for pk, proof in zip(node_public_keys, pops))` + """ + # we sort them as the signers bitfield is sorted by the public keys as well + signers_keys = list(compress(sorted(nodes_public_keys), self.signers)) + message = build_attestation_message(self.aggregated_column_commitment, self.row_commitments) + return NomosDaG2ProofOfPossession.AggregateVerify(signers_keys, [message]*len(signers_keys), self.aggregated_signatures) + + +def build_attestation_message(aggregated_column_commitment: Commitment, row_commitments: Sequence[Commitment]) -> bytes: + hasher = sha3_256() + hasher.update(bytes(aggregated_column_commitment)) + for c in row_commitments: + hasher.update(bytes(c)) + return hasher.digest() + +class NomosDaG2ProofOfPossession(G2ProofOfPossession): + # Domain specific tag for Nomos DA protocol + DST = b"NOMOS_DA_AVAIL" diff --git a/da/dispersal.py b/da/dispersal.py new file mode 100644 index 0000000..4cd1067 --- /dev/null +++ b/da/dispersal.py @@ -0,0 +1,90 @@ +from dataclasses import dataclass +from hashlib import sha3_256 +from typing import List, Optional, Generator, Sequence + +from da.common import Certificate, NodeId, BLSPublicKey, Bitfield, build_attestation_message, NomosDaG2ProofOfPossession as bls_pop +from da.encoder import EncodedData +from da.verifier import DABlob, Attestation + + +@dataclass +class DispersalSettings: + nodes_ids: List[NodeId] + nodes_pubkey: List[BLSPublicKey] + threshold: int + + +class Dispersal: + def __init__(self, settings: DispersalSettings): + self.settings = settings + # sort over public keys + self.settings.nodes_ids, self.settings.nodes_pubkey = zip( + *sorted(zip(self.settings.nodes_ids, self.settings.nodes_pubkey), key=lambda x: x[1]) + ) + + def _prepare_data(self, encoded_data: EncodedData) -> Generator[DABlob, None, None]: + assert len(encoded_data.column_commitments) == len(self.settings.nodes_ids) + assert len(encoded_data.aggregated_column_proofs) == len(self.settings.nodes_ids) + columns = encoded_data.extended_matrix.columns + column_commitments = encoded_data.column_commitments + row_commitments = encoded_data.row_commitments + rows_proofs = encoded_data.row_proofs + aggregated_column_commitment = encoded_data.aggregated_column_commitment + aggregated_column_proofs = encoded_data.aggregated_column_proofs + blobs_data = zip(columns, column_commitments, zip(*rows_proofs), aggregated_column_proofs) + for (column, column_commitment, row_proofs, column_proof) in blobs_data: + blob = DABlob( + column, + column_commitment, + aggregated_column_commitment, + column_proof, + row_commitments, + row_proofs + ) + yield blob + + def _send_and_await_response(self, node: NodeId, blob: DABlob) -> Optional[Attestation]: + pass + + def _build_certificate( + self, + encoded_data: EncodedData, + attestations: Sequence[Attestation], + signers: Bitfield + ) -> Certificate: + assert len(attestations) >= self.settings.threshold + assert len(attestations) == signers.count(True) + aggregated = bls_pop.Aggregate([attestation.signature for attestation in attestations]) + return Certificate( + aggregated_signatures=aggregated, + signers=signers, + aggregated_column_commitment=encoded_data.aggregated_column_commitment, + row_commitments=encoded_data.row_commitments + ) + + @staticmethod + def _verify_attestation(public_key: BLSPublicKey, attested_message: bytes, attestation: Attestation) -> bool: + return bls_pop.Verify(public_key, 
attested_message, attestation.signature) + + @staticmethod + def _build_attestation_message(encoded_data: EncodedData) -> bytes: + return build_attestation_message(encoded_data.aggregated_column_commitment, encoded_data.row_commitments) + + def disperse(self, encoded_data: EncodedData) -> Optional[Certificate]: + attestations = [] + attested_message = self._build_attestation_message(encoded_data) + signed = Bitfield(False for _ in range(len(self.settings.nodes_ids))) + blob_data = zip( + range(len(self.settings.nodes_ids)), + self.settings.nodes_ids, + self.settings.nodes_pubkey, + self._prepare_data(encoded_data) + ) + for i, node, pk, blob in blob_data: + if attestation := self._send_and_await_response(node, blob): + if self._verify_attestation(pk, attested_message, attestation): + # mark as received + signed[i] = True + attestations.append(attestation) + if len(attestations) >= self.settings.threshold: + return self._build_certificate(encoded_data, attestations, signed) diff --git a/da/encoder.py b/da/encoder.py new file mode 100644 index 0000000..7c46091 --- /dev/null +++ b/da/encoder.py @@ -0,0 +1,136 @@ +from dataclasses import dataclass +from itertools import batched, chain +from typing import List, Sequence, Tuple +from hashlib import blake2b + +from eth2spec.eip7594.mainnet import KZGCommitment as Commitment, KZGProof as Proof, BLSFieldElement + +from da.common import ChunksMatrix, Chunk, Row, Column +from da.kzg_rs import kzg, rs +from da.kzg_rs.common import GLOBAL_PARAMETERS, ROOTS_OF_UNITY, BLS_MODULUS, BYTES_PER_FIELD_ELEMENT +from da.kzg_rs.poly import Polynomial + + +@dataclass +class DAEncoderParams: + column_count: int + bytes_per_chunk: int + + +@dataclass +class EncodedData: + data: bytes + chunked_data: ChunksMatrix + extended_matrix: ChunksMatrix + row_commitments: List[Commitment] + row_proofs: List[List[Proof]] + column_commitments: List[Commitment] + aggregated_column_commitment: Commitment + aggregated_column_proofs: List[Proof] + + +class DAEncoder: + def __init__(self, params: DAEncoderParams): + # we can only encode up to 31 bytes per element which fits without problem in a 32 byte element + assert params.bytes_per_chunk < BYTES_PER_FIELD_ELEMENT + self.params = params + + def _chunkify_data(self, data: bytes) -> ChunksMatrix: + size: int = self.params.column_count * self.params.bytes_per_chunk + return ChunksMatrix( + Row(Chunk(int.from_bytes(chunk, byteorder="big").to_bytes(length=BYTES_PER_FIELD_ELEMENT)) + for chunk in batched(b, self.params.bytes_per_chunk) + ) + for b in batched(data, size) + ) + + def _compute_row_kzg_commitments(self, matrix: ChunksMatrix) -> List[Tuple[Polynomial, Commitment]]: + return [ + kzg.bytes_to_commitment( + row.as_bytes(), + GLOBAL_PARAMETERS, + ) + for row in matrix + ] + + def _rs_encode_rows(self, chunks_matrix: ChunksMatrix) -> ChunksMatrix: + def __rs_encode_row(row: Row) -> Row: + polynomial = kzg.bytes_to_polynomial(row.as_bytes()) + return Row( + Chunk(BLSFieldElement.to_bytes( + x, + # fixed to 32 bytes as bls_field_elements are 32bytes (256bits) encoded + length=32, byteorder="big" + )) for x in rs.encode(polynomial, 2, ROOTS_OF_UNITY) + ) + return ChunksMatrix(__rs_encode_row(row) for row in chunks_matrix) + + @staticmethod + def _compute_rows_proofs( + chunks_matrix: ChunksMatrix, + polynomials: Sequence[Polynomial], + row_commitments: Sequence[Commitment] + ) -> List[List[Proof]]: + proofs = [] + for row, poly, commitment in zip(chunks_matrix, polynomials, row_commitments): + proofs.append( + [ + 
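+                    # one KZG opening proof per chunk in the (extended) row:
+                    # element i is proven against the row polynomial evaluated
+                    # at ROOTS_OF_UNITY[i]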
kzg.generate_element_proof(i, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY) + for i in range(len(row)) + ] + ) + return proofs + + def _compute_column_kzg_commitments(self, chunks_matrix: ChunksMatrix) -> List[Tuple[Polynomial, Commitment]]: + return self._compute_row_kzg_commitments(chunks_matrix.transposed()) + + @staticmethod + def _compute_aggregated_column_commitment( + chunks_matrix: ChunksMatrix, column_commitments: Sequence[Commitment] + ) -> Tuple[Polynomial, Commitment]: + data = bytes(chain.from_iterable( + DAEncoder.hash_column_and_commitment(column, commitment) + for column, commitment in zip(chunks_matrix.columns, column_commitments) + )) + return kzg.bytes_to_commitment(data, GLOBAL_PARAMETERS) + + @staticmethod + def _compute_aggregated_column_proofs( + polynomial: Polynomial, + column_commitments: Sequence[Commitment], + ) -> List[Proof]: + return [ + kzg.generate_element_proof(i, polynomial, GLOBAL_PARAMETERS, ROOTS_OF_UNITY) + for i in range(len(column_commitments)) + ] + + def encode(self, data: bytes) -> EncodedData: + chunks_matrix = self._chunkify_data(data) + row_polynomials, row_commitments = zip(*self._compute_row_kzg_commitments(chunks_matrix)) + extended_matrix = self._rs_encode_rows(chunks_matrix) + row_proofs = self._compute_rows_proofs(extended_matrix, row_polynomials, row_commitments) + column_polynomials, column_commitments = zip(*self._compute_column_kzg_commitments(extended_matrix)) + aggregated_column_polynomial, aggregated_column_commitment = ( + self._compute_aggregated_column_commitment(extended_matrix, column_commitments) + ) + aggregated_column_proofs = self._compute_aggregated_column_proofs( + aggregated_column_polynomial, column_commitments + ) + result = EncodedData( + data, + chunks_matrix, + extended_matrix, + row_commitments, + row_proofs, + column_commitments, + aggregated_column_commitment, + aggregated_column_proofs + ) + return result + + @staticmethod + def hash_column_and_commitment(column: Column, commitment: Commitment) -> bytes: + return ( + # digest size must be 31 bytes as we cannot encode 32 without risking overflowing the BLS_MODULUS + int.from_bytes(blake2b(column.as_bytes() + bytes(commitment), digest_size=31).digest()) + ).to_bytes(32, byteorder="big") diff --git a/da/kzg_rs/__init__.py b/da/kzg_rs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/da/kzg_rs/common.py b/da/kzg_rs/common.py new file mode 100644 index 0000000..7c12604 --- /dev/null +++ b/da/kzg_rs/common.py @@ -0,0 +1,22 @@ +from typing import List, Tuple + +import eth2spec.eip7594.mainnet +from py_ecc.bls.typing import G1Uncompressed, G2Uncompressed + +from da.kzg_rs.roots import compute_roots_of_unity +from da.kzg_rs.trusted_setup import generate_setup + +G1 = G1Uncompressed +G2 = G2Uncompressed + + +BYTES_PER_FIELD_ELEMENT = 32 +BLS_MODULUS = eth2spec.eip7594.mainnet.BLS_MODULUS +PRIMITIVE_ROOT: int = 7 +GLOBAL_PARAMETERS: List[G1] +GLOBAL_PARAMETERS_G2: List[G2] +# secret is fixed but this should come from a different synchronization protocol +GLOBAL_PARAMETERS, GLOBAL_PARAMETERS_G2 = map(list, generate_setup(4096, 8, 1987)) +ROOTS_OF_UNITY: Tuple[int] = compute_roots_of_unity( + PRIMITIVE_ROOT, 4096, BLS_MODULUS +) diff --git a/da/kzg_rs/fft.py b/da/kzg_rs/fft.py new file mode 100644 index 0000000..9d7c0ec --- /dev/null +++ b/da/kzg_rs/fft.py @@ -0,0 +1,65 @@ +from typing import Sequence, List + +from eth2spec.deneb.mainnet import BLSFieldElement +from eth2spec.utils import bls + +from da.kzg_rs.common import G1 + + +def fft_g1(vals: 
Sequence[G1], roots_of_unity: Sequence[BLSFieldElement], modulus: int) -> List[G1]: + if len(vals) == 1: + return vals + L = fft_g1(vals[::2], roots_of_unity[::2], modulus) + R = fft_g1(vals[1::2], roots_of_unity[::2], modulus) + o = [bls.Z1() for _ in vals] + for i, (x, y) in enumerate(zip(L, R)): + y_times_root = bls.multiply(y, roots_of_unity[i]) + o[i] = (x + y_times_root) + o[i + len(L)] = x + -y_times_root + return o + + +def ifft_g1(vals: Sequence[G1], roots_of_unity: Sequence[BLSFieldElement], modulus: int) -> List[G1]: + assert len(vals) == len(roots_of_unity) + # modular inverse + invlen = pow(len(vals), modulus-2, modulus) + return [ + bls.multiply(x, invlen) + for x in fft_g1( + vals, [roots_of_unity[0], *roots_of_unity[:0:-1]], modulus + ) + ] + + +def _fft( + vals: Sequence[BLSFieldElement], + roots_of_unity: Sequence[BLSFieldElement], + modulus: int, +) -> Sequence[BLSFieldElement]: + if len(vals) == 1: + return vals + L = _fft(vals[::2], roots_of_unity[::2], modulus) + R = _fft(vals[1::2], roots_of_unity[::2], modulus) + o = [BLSFieldElement(0) for _ in vals] + for i, (x, y) in enumerate(zip(L, R)): + y_times_root = BLSFieldElement((int(y) * int(roots_of_unity[i])) % modulus) + o[i] = BLSFieldElement((int(x) + y_times_root) % modulus) + o[i + len(L)] = BLSFieldElement((int(x) - int(y_times_root) + modulus) % modulus) + return o + + +def fft(vals, root_of_unity, modulus): + assert len(vals) == len(root_of_unity) + return _fft(vals, root_of_unity, modulus) + + +def ifft(vals, roots_of_unity, modulus): + assert len(vals) == len(roots_of_unity) + # modular inverse + invlen = pow(len(vals), modulus-2, modulus) + return [ + BLSFieldElement((int(x) * invlen) % modulus) + for x in _fft( + vals, [roots_of_unity[0], *roots_of_unity[:0:-1]], modulus + ) + ] diff --git a/da/kzg_rs/fk20.py b/da/kzg_rs/fk20.py new file mode 100644 index 0000000..3034f48 --- /dev/null +++ b/da/kzg_rs/fk20.py @@ -0,0 +1,75 @@ +from typing import List, Sequence + +from eth2spec.deneb.mainnet import KZGProof as Proof, BLSFieldElement +from eth2spec.utils import bls + +from da.kzg_rs.common import G1, BLS_MODULUS, PRIMITIVE_ROOT +from da.kzg_rs.fft import fft, fft_g1, ifft_g1 +from da.kzg_rs.poly import Polynomial +from da.kzg_rs.roots import compute_roots_of_unity +from da.kzg_rs.utils import is_power_of_two + + +def __toeplitz1(global_parameters: List[G1], polynomial_degree: int) -> List[G1]: + """ + This part can be precomputed for different global_parameters lengths depending on polynomial degree of powers of two. 
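+    The (reversed) setup points are padded with the identity element to twice
+    the polynomial degree and transformed with a group FFT; this embeds the
+    Toeplitz matrix into a circulant one, so the product in __toeplitz2 becomes
+    a point-wise multiplication (the core FK20 trick).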
+ :param global_parameters: + :param roots_of_unity: + :param polynomial_degree: + :return: + """ + assert len(global_parameters) == polynomial_degree + # algorithm only works on powers of 2 for dft computations + assert is_power_of_two(len(global_parameters)) + roots_of_unity = compute_roots_of_unity(PRIMITIVE_ROOT, polynomial_degree*2, BLS_MODULUS) + vector_x_extended = global_parameters + [bls.Z1() for _ in range(polynomial_degree)] + vector_x_extended_fft = fft_g1(vector_x_extended, roots_of_unity, BLS_MODULUS) + return vector_x_extended_fft + + +def __toeplitz2(coefficients: List[BLSFieldElement], extended_vector: Sequence[G1]) -> List[G1]: + assert is_power_of_two(len(coefficients)) + roots_of_unity = compute_roots_of_unity(PRIMITIVE_ROOT, len(coefficients), BLS_MODULUS) + toeplitz_coefficients_fft = fft(coefficients, roots_of_unity, BLS_MODULUS) + return [bls.multiply(v, c) for v, c in zip(extended_vector, toeplitz_coefficients_fft)] + + +def __toeplitz3(h_extended_fft: Sequence[G1], polynomial_degree: int) -> List[G1]: + roots_of_unity = compute_roots_of_unity(PRIMITIVE_ROOT, len(h_extended_fft), BLS_MODULUS) + return ifft_g1(h_extended_fft, roots_of_unity, BLS_MODULUS)[:polynomial_degree] + + +def fk20_generate_proofs( + polynomial: Polynomial, global_parameters: List[G1] +) -> List[Proof]: + """ + Generate all proofs for the polynomial points in batch. + This method uses the fk20 algorthm from https://eprint.iacr.org/2023/033.pdf + Disclaimer: It only works for polynomial degree of powers of two. + :param polynomial: polynomial to generate proof for + :param global_parameters: setup generated parameters + :return: list of proof for each point in the polynomial + """ + polynomial_degree = len(polynomial) + assert len(global_parameters) >= polynomial_degree + assert is_power_of_two(len(polynomial)) + + # 1 - Build toeplitz matrix for h values + # 1.1 y = dft([s^d-1, s^d-2, ..., s, 1, *[0 for _ in len(polynomial)]]) + # 1.2 z = dft([*[0 for _ in len(polynomial)], f1, f2, ..., fd]) + # 1.3 u = y * v * roots_of_unity(len(polynomial)*2) + roots_of_unity = compute_roots_of_unity(PRIMITIVE_ROOT, polynomial_degree, BLS_MODULUS) + global_parameters = list(reversed(global_parameters[:polynomial_degree])) + extended_vector = __toeplitz1(global_parameters, polynomial_degree) + # 2 - Build circulant matrix with the polynomial coefficients (reversed N..n, and padded) + toeplitz_coefficients = [ + *(BLSFieldElement(0) for _ in range(polynomial_degree)), + *polynomial.coefficients + ] + h_extended_vector = __toeplitz2(toeplitz_coefficients, extended_vector) + # 3 - Perform fft and nub the tail half as it is padding + h_vector = __toeplitz3(h_extended_vector, polynomial_degree) + # 4 - proof are the dft of the h vector + proofs = fft_g1(h_vector, roots_of_unity, BLS_MODULUS) + proofs = [Proof(bls.G1_to_bytes48(proof)) for proof in proofs] + return proofs diff --git a/da/kzg_rs/kzg.py b/da/kzg_rs/kzg.py new file mode 100644 index 0000000..5185f90 --- /dev/null +++ b/da/kzg_rs/kzg.py @@ -0,0 +1,74 @@ +from functools import reduce +from itertools import batched +from typing import Sequence, Tuple + +from eth2spec.deneb.mainnet import bytes_to_bls_field, BLSFieldElement, KZGCommitment as Commitment, KZGProof as Proof +from eth2spec.utils import bls + +from .common import BYTES_PER_FIELD_ELEMENT, G1, BLS_MODULUS, GLOBAL_PARAMETERS_G2 +from .poly import Polynomial + + +def bytes_to_polynomial(b: bytes, bytes_per_field_element=BYTES_PER_FIELD_ELEMENT) -> Polynomial: + """ + Convert bytes to list of BLS 
field scalars. + """ + assert len(b) % bytes_per_field_element == 0 + eval_form = [int(bytes_to_bls_field(b)) for b in batched(b, int(bytes_per_field_element))] + return Polynomial.from_evaluations(eval_form, BLS_MODULUS) + + +def g1_linear_combination(polynomial: Polynomial[BLSFieldElement], global_parameters: Sequence[G1]) -> Commitment: + """ + BLS multiscalar multiplication. + """ + # we assert to have more points available than elements, + # this is dependent on the available kzg setup size + assert len(polynomial) <= len(global_parameters) + point = reduce( + bls.add, + (bls.multiply(g, p) for g, p in zip(global_parameters, polynomial)), + bls.Z1() + ) + return Commitment(bls.G1_to_bytes48(point)) + + +def bytes_to_commitment(b: bytes, global_parameters: Sequence[G1]) -> Tuple[Polynomial, Commitment]: + poly = bytes_to_polynomial(b, bytes_per_field_element=BYTES_PER_FIELD_ELEMENT) + return poly, g1_linear_combination(poly, global_parameters) + + +def generate_element_proof( + element_index: int, + polynomial: Polynomial, + global_parameters: Sequence[G1], + roots_of_unity: Sequence[BLSFieldElement], +) -> Proof: + # compute a witness polynomial in that satisfies `witness(x) = (f(x)-v)/(x-u)` + u = int(roots_of_unity[element_index]) + v = polynomial.eval(u) + f_x_v = polynomial - Polynomial([v], BLS_MODULUS) + x_u = Polynomial([-u, 1], BLS_MODULUS) + witness, _ = f_x_v / x_u + return g1_linear_combination(witness, global_parameters) + + +def verify_element_proof( + chunk: BLSFieldElement, + commitment: Commitment, + proof: Proof, + element_index: int, + roots_of_unity: Sequence[BLSFieldElement], +) -> bool: + u = int(roots_of_unity[element_index]) + v = chunk + commitment_check_G1 = bls.bytes48_to_G1(commitment) - bls.multiply(bls.G1(), v) + proof_check_g2 = bls.add( + GLOBAL_PARAMETERS_G2[1], + bls.neg(bls.multiply(bls.G2(), u)) + ) + return bls.pairing_check([ + # G2 here needs to be negated due to library requirements as pairing_check([[G1, -G2], [G1, G2]]) + [commitment_check_G1, bls.neg(bls.G2())], + [bls.bytes48_to_G1(proof), proof_check_g2], + ]) diff --git a/da/kzg_rs/poly.py b/da/kzg_rs/poly.py new file mode 100644 index 0000000..311b097 --- /dev/null +++ b/da/kzg_rs/poly.py @@ -0,0 +1,111 @@ +from itertools import zip_longest +from typing import List, Sequence, Self + +from eth2spec.eip7594.mainnet import interpolate_polynomialcoeff + +from da.kzg_rs.common import ROOTS_OF_UNITY + + +class Polynomial[T]: + def __init__(self, coefficients, modulus): + self.coefficients = coefficients + self.modulus = modulus + + @staticmethod + def interpolate(evaluations: List[int], roots_of_unity: List[int]) -> List[int]: + """ + Lagrange interpolation + + Parameters: + evaluations: List of evaluations + roots_of_unity: Powers of 2 sequence + + Returns: + list: Coefficients of the interpolated polynomial + """ + return list(map(int, interpolate_polynomialcoeff(roots_of_unity[:len(evaluations)], evaluations))) + + @classmethod + def from_evaluations(cls, evaluations: Sequence[T], modulus, roots_of_unity: Sequence[int]=ROOTS_OF_UNITY) -> Self: + coefficients = [ + x % modulus + for x in map(int, Polynomial.interpolate(evaluations, roots_of_unity)) + ] + return cls(coefficients, modulus) + + def __repr__(self): + return "Polynomial({}, modulus={})".format(self.coefficients, self.modulus) + + def __add__(self, other): + return Polynomial( + [(a + b) % self.modulus for a, b in zip_longest(self.coefficients, other.coefficients, fillvalue=0)], + self.modulus + ) + + def __sub__(self, other): + 
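+        # coefficient-wise subtraction modulo `modulus`; the shorter
+        # coefficient list is implicitly padded with zeros (zip_longest)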
return Polynomial( + [(a - b) % self.modulus for a, b in zip_longest(self.coefficients, other.coefficients, fillvalue=0)], + self.modulus + ) + + def __mul__(self, other): + result = [0] * (len(self.coefficients) + len(other.coefficients) - 1) + for i in range(len(self.coefficients)): + for j in range(len(other.coefficients)): + result[i + j] = (result[i + j] + self.coefficients[i] * other.coefficients[j]) % self.modulus + return Polynomial(result, self.modulus) + + def divide(self, other): + if not isinstance(other, Polynomial): + raise ValueError("Unsupported type for division.") + + dividend = list(self.coefficients) + divisor = list(other.coefficients) + + quotient = [] + remainder = dividend + + while len(remainder) >= len(divisor): + factor = remainder[-1] * pow(divisor[-1], -1, self.modulus) % self.modulus + quotient.insert(0, factor) + + # Subtract divisor * factor from remainder + for i in range(len(divisor)): + remainder[len(remainder) - len(divisor) + i] -= divisor[i] * factor + remainder[len(remainder) - len(divisor) + i] %= self.modulus + + # Remove leading zeros from remainder + while remainder and remainder[-1] == 0: + remainder.pop() + + return Polynomial(quotient, self.modulus), Polynomial(remainder, self.modulus) + + def __truediv__(self, other): + return self.divide(other) + + def __neg__(self): + return Polynomial([-1 * c for c in self.coefficients], self.modulus) + + def __len__(self): + return len(self.coefficients) + + def __iter__(self): + return iter(self.coefficients) + + def __getitem__(self, item): + return self.coefficients[item] + + def __eq__(self, other): + return ( + self.coefficients == other.coefficients and + self.modulus == other.modulus + ) + + def eval(self, x): + return (self.coefficients[0] + sum( + (pow(x, i, mod=self.modulus)*coefficient) + for i, coefficient in enumerate(self.coefficients[1:], start=1) + )) % self.modulus + + def evaluation_form(self) -> List[T]: + return [self.eval(ROOTS_OF_UNITY[i]) for i in range(len(self))] \ No newline at end of file diff --git a/da/kzg_rs/roots.py b/da/kzg_rs/roots.py new file mode 100644 index 0000000..2c2d7a7 --- /dev/null +++ b/da/kzg_rs/roots.py @@ -0,0 +1,25 @@ +from typing import Tuple + + +def compute_root_of_unity(primitive_root: int, order: int, modulus: int) -> int: + """ + Generate a w such that ``w**length = 1``. + """ + assert (modulus - 1) % order == 0 + return pow(primitive_root, (modulus - 1) // order, modulus) + + +def compute_roots_of_unity(primitive_root: int, order: int, modulus: int) -> Tuple[int]: + """ + Compute a list of roots of unity for a given order. + The order must divide the BLS multiplicative group order, i.e. 
BLS_MODULUS - 1 + """ + assert (modulus - 1) % order == 0 + root_of_unity = compute_root_of_unity(primitive_root, order, modulus) + + roots = [] + current_root_of_unity = 1 + for _ in range(order): + roots.append(current_root_of_unity) + current_root_of_unity = current_root_of_unity * root_of_unity % modulus + return tuple(roots) diff --git a/da/kzg_rs/rs.py b/da/kzg_rs/rs.py new file mode 100644 index 0000000..08b3009 --- /dev/null +++ b/da/kzg_rs/rs.py @@ -0,0 +1,40 @@ +from typing import Sequence, Optional + +from eth2spec.deneb.mainnet import BLSFieldElement +from .common import BLS_MODULUS +from .poly import Polynomial + +ExtendedData = Sequence[Optional[BLSFieldElement]] + + +def encode(polynomial: Polynomial, factor: int, roots_of_unity: Sequence[int]) -> ExtendedData: + """ + Encode a polynomial extending to the given factor + Parameters: + polynomial: Polynomial to be encoded + factor: Encoding factor + roots_of_unity: Powers of 2 sequence + + Returns: + list: Extended data set + """ + assert factor >= 2 + assert len(polynomial)*factor <= len(roots_of_unity) + return [polynomial.eval(e) for e in roots_of_unity[:len(polynomial)*factor]] + + +def decode(encoded: ExtendedData, roots_of_unity: Sequence[BLSFieldElement], original_len: int) -> Polynomial: + """ + Decode a polynomial from an extended data-set and the roots of unity, cap to original length + + Parameters: + encoded: Extended data set + roots_of_unity: Powers of 2 sequence + original_len: Original length of the encoded polynomial + + Returns: + Polynomial: original polynomial + """ + encoded, roots_of_unity = zip(*((point, root) for point, root in zip(encoded, roots_of_unity) if point is not None)) + coefs = Polynomial.interpolate(list(map(int, encoded)), list(map(int, roots_of_unity)))[:original_len] + return Polynomial([int(c) for c in coefs], BLS_MODULUS) diff --git a/da/kzg_rs/test_fft.py b/da/kzg_rs/test_fft.py new file mode 100644 index 0000000..dc700c5 --- /dev/null +++ b/da/kzg_rs/test_fft.py @@ -0,0 +1,14 @@ +from unittest import TestCase + +from .roots import compute_roots_of_unity +from .common import BLS_MODULUS +from .fft import fft, ifft + + +class TestFFT(TestCase): + def test_fft_ifft(self): + for size in [16, 32, 64, 128, 256, 512, 1024, 2048, 4096]: + roots_of_unity = compute_roots_of_unity(2, size, BLS_MODULUS) + vals = list(x for x in range(size)) + vals_fft = fft(vals, roots_of_unity, BLS_MODULUS) + self.assertEqual(vals, ifft(vals_fft, roots_of_unity, BLS_MODULUS)) diff --git a/da/kzg_rs/test_fk20.py b/da/kzg_rs/test_fk20.py new file mode 100644 index 0000000..c7e095f --- /dev/null +++ b/da/kzg_rs/test_fk20.py @@ -0,0 +1,28 @@ +from itertools import chain +from unittest import TestCase +import random +from .fk20 import fk20_generate_proofs +from .kzg import generate_element_proof, bytes_to_polynomial +from .common import BLS_MODULUS, BYTES_PER_FIELD_ELEMENT, GLOBAL_PARAMETERS, PRIMITIVE_ROOT +from .roots import compute_roots_of_unity + + +class TestFK20(TestCase): + @staticmethod + def rand_bytes(n_chunks=1024): + return bytes( + chain.from_iterable( + int.to_bytes(random.randrange(BLS_MODULUS), length=BYTES_PER_FIELD_ELEMENT) + for _ in range(n_chunks) + ) + ) + + def test_fk20(self): + for size in [16, 32, 64, 128, 256]: + roots_of_unity = compute_roots_of_unity(PRIMITIVE_ROOT, size, BLS_MODULUS) + rand_bytes = self.rand_bytes(size) + polynomial = bytes_to_polynomial(rand_bytes) + proofs = [generate_element_proof(i, polynomial, GLOBAL_PARAMETERS, roots_of_unity) for i in range(size)] + fk20_proofs = 
fk20_generate_proofs(polynomial, GLOBAL_PARAMETERS) + self.assertEqual(len(proofs), len(fk20_proofs)) + self.assertEqual(proofs, fk20_proofs) diff --git a/da/kzg_rs/test_kzg.py b/da/kzg_rs/test_kzg.py new file mode 100644 index 0000000..d9e46e2 --- /dev/null +++ b/da/kzg_rs/test_kzg.py @@ -0,0 +1,67 @@ +from itertools import chain, batched +from random import randrange +from unittest import TestCase + +from eth2spec.deneb.mainnet import BLS_MODULUS, bytes_to_bls_field, BLSFieldElement + +from da.kzg_rs import kzg +from da.kzg_rs.common import BYTES_PER_FIELD_ELEMENT, GLOBAL_PARAMETERS, ROOTS_OF_UNITY, GLOBAL_PARAMETERS_G2 +from da.kzg_rs.trusted_setup import verify_setup + + +class TestKZG(TestCase): + + @staticmethod + def rand_bytes(n_chunks=1024): + return bytes( + chain.from_iterable( + int.to_bytes(randrange(BLS_MODULUS), length=BYTES_PER_FIELD_ELEMENT) + for _ in range(n_chunks) + ) + ) + + def test_kzg_setup(self): + self.assertTrue(verify_setup((GLOBAL_PARAMETERS, GLOBAL_PARAMETERS_G2))) + + def test_poly_forms(self): + n_chunks = 16 + rand_bytes = self.rand_bytes(n_chunks) + eval_form = [int(bytes_to_bls_field(b)) for b in batched(rand_bytes, int(BYTES_PER_FIELD_ELEMENT))] + poly = kzg.bytes_to_polynomial(rand_bytes) + self.assertEqual(poly.evaluation_form(), eval_form) + for i, chunk in enumerate(eval_form): + self.assertEqual(poly.eval(ROOTS_OF_UNITY[i]), chunk) + for i in range(n_chunks): + self.assertEqual(poly.evaluation_form()[i], poly.eval(int(ROOTS_OF_UNITY[i]))) + + def test_commitment(self): + rand_bytes = self.rand_bytes(32) + _, commit = kzg.bytes_to_commitment(rand_bytes, GLOBAL_PARAMETERS) + self.assertEqual(len(commit), 48) + + def test_proof(self): + rand_bytes = self.rand_bytes(2) + poly = kzg.bytes_to_polynomial(rand_bytes) + proof = kzg.generate_element_proof(0, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY) + self.assertEqual(len(proof), 48) + + def test_verify(self): + n_chunks = 32 + rand_bytes = self.rand_bytes(n_chunks) + _, commit = kzg.bytes_to_commitment(rand_bytes, GLOBAL_PARAMETERS) + poly = kzg.bytes_to_polynomial(rand_bytes) + for i, chunk in enumerate(batched(rand_bytes, BYTES_PER_FIELD_ELEMENT)): + chunk = bytes(chunk) + proof = kzg.generate_element_proof(i, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY) + self.assertEqual(len(proof), 48) + self.assertEqual(poly.eval(int(ROOTS_OF_UNITY[i])), bytes_to_bls_field(chunk)) + self.assertTrue(kzg.verify_element_proof( + bytes_to_bls_field(chunk), commit, proof, i, ROOTS_OF_UNITY + ) + ) + proof = kzg.generate_element_proof(0, poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY) + for n in range(1, n_chunks): + self.assertFalse(kzg.verify_element_proof( + BLSFieldElement(0), commit, proof, n, ROOTS_OF_UNITY + ) + ) \ No newline at end of file diff --git a/da/kzg_rs/test_rs.py b/da/kzg_rs/test_rs.py new file mode 100644 index 0000000..516a1da --- /dev/null +++ b/da/kzg_rs/test_rs.py @@ -0,0 +1,18 @@ +from unittest import TestCase + +from da.kzg_rs.common import BLS_MODULUS, ROOTS_OF_UNITY +from da.kzg_rs.poly import Polynomial +from da.kzg_rs.rs import encode, decode + + +class TestFFT(TestCase): + def test_encode_decode(self): + poly = Polynomial(list(range(10)), modulus=BLS_MODULUS) + encoded = encode(poly, 2, ROOTS_OF_UNITY) + # remove a few points, but enough so we can reconstruct + for i in [1, 3, 7]: + encoded[i] = None + decoded = decode(encoded, ROOTS_OF_UNITY, len(poly)) + # self.assertEqual(poly, decoded) + for i in range(len(poly)): + self.assertEqual(poly.eval(ROOTS_OF_UNITY[i]), decoded.eval(ROOTS_OF_UNITY[i])) diff 
--git a/da/kzg_rs/trusted_setup.py b/da/kzg_rs/trusted_setup.py new file mode 100644 index 0000000..69a5c69 --- /dev/null +++ b/da/kzg_rs/trusted_setup.py @@ -0,0 +1,48 @@ +import random +from typing import Tuple, Sequence, Generator +from eth2spec.utils import bls +from itertools import accumulate, repeat + + +def __linear_combination(points, coeffs, zero=bls.Z1()): + o = zero + for point, coeff in zip(points, coeffs): + o = bls.add(o, bls.multiply(point, coeff)) + return o + + +# Verifies the integrity of a setup +def verify_setup(setup) -> bool: + g1_setup, g2_setup = setup + g1_random_coefficients = [random.randrange(2**40) for _ in range(len(g1_setup) - 1)] + g1_lower = __linear_combination(g1_setup[:-1], g1_random_coefficients, bls.Z1()) + g1_upper = __linear_combination(g1_setup[1:], g1_random_coefficients, bls.Z1()) + g2_random_coefficients = [random.randrange(2**40) for _ in range(len(g2_setup) - 1)] + g2_lower = __linear_combination(g2_setup[:-1], g2_random_coefficients, bls.Z2()) + g2_upper = __linear_combination(g2_setup[1:], g2_random_coefficients, bls.Z2()) + return ( + g1_setup[0] == bls.G1() and + g2_setup[0] == bls.G2() and + bls.pairing_check([[g1_upper, bls.neg(g2_lower)], [g1_lower, g2_upper]]) + ) + + +def generate_one_sided_setup(length, secret, generator=bls.G1()): + def __take(gen): + return (next(gen) for _ in range(length)) + + secrets = repeat(secret) + + return __take(accumulate(secrets, bls.multiply, initial=generator)) + + +# Generate a trusted setup with the given secret +def generate_setup( + g1_length, + g2_length, + secret +) -> Tuple[Generator[bls.G1, None, None], Generator[bls.G2, None, None]]: + return ( + generate_one_sided_setup(g1_length, secret, bls.G1()), + generate_one_sided_setup(g2_length, secret, bls.G2()), + ) diff --git a/da/kzg_rs/utils.py b/da/kzg_rs/utils.py new file mode 100644 index 0000000..b519f82 --- /dev/null +++ b/da/kzg_rs/utils.py @@ -0,0 +1,5 @@ +POWERS_OF_2 = {2**i for i in range(1, 32)} + + +def is_power_of_two(n) -> bool: + return n in POWERS_OF_2 diff --git a/da/network/__init__.py b/da/network/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/da/network/__pycache__/constants.cpython-312.pyc b/da/network/__pycache__/constants.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..03b6e834d06a1ec0175781bf97094fdd3710ef21 GIT binary patch literal 582 zcmXw0O^ee|6uqxs)33DC!D1Y#h;F6}o6)7nK-#=cE19MyDGYBDI(@BH+rE%Qq;4ke z{0D;nK*YWO!A-$Ucpwb8>P9NFo6Y#5_AUgZPbe3jGj^;?7=1GAT zNO4)LOY3;0j(E=Qdn{lhHhWuA7HrOZI#eDu4jYHFIQ#ni`aB715E;|=H33KW1V9R4 zeOHXv#TbNwcCRjRX*NA`AGwWS=}s5Vz1V;k`lewUEkk#@8ZVxxr;dqL&Czj>7dood ziP^E;;iaCTVW)4Hw$oHC%q4t^TLarL|MqxA!)*@wz#xy z#8wuUx6)Ei`Z{*(j)|>~p=*49J27!z@3vICYxJD9+7f9zt>Nb2_!C@;$HUcQHq;^!%gG%r~ZuNeziYhm4L#Kk2XFLQ())E*nS&;K^r$xll5c3n#;D-j}V)RVS-+kz|DR z{n?saG#O>Mkgd(tCF>~hkY|Y;xIpCKRgW!EGFIT?p?_j0PA#7?mOA*@VZZ``)I@vpPQabhY;In5>TA}A(ICJ()-0$`kr@rY%R&gYK#5gWxZ!NmgZY>;6AIiv(fJaYH~ceS!* zG9*`BAjz;?4X{d%0IZg46alEB7r10Zu7!~rxej1djw!V`Cx_%+BR;tv=IaW(;tiJY z^f@IxVQTu+AA%yZUNN0b8HPBb>7tRzO=MGMMjaJX*vV%VQK^vk1R41Y6RII%I$`~S&uw6_I_E*II-a**DB9#@&W(-qN*_G-7oR`I+TvkY( z$VoO=!wRUdLPAO_fWk>eW=gR*M6D1T7)u#riedSOv)Ztfku9IDm=n61<|{XV540mM z{5Lp?O5!0!;#|-)Z^D?}#w*(G^zkXvBKn+&mFagV9d0n|*eoeh=($o9 zz0h-%l@vMXxvNK^KLwPdwCKer%;9;kHcHG|2j52Hs&>%>D?$!cksohz@XelUiY7AoLy2$puwTMV=M(WND*)PLX3)y4z-b57tAURmK!9@T5d<{|P&B;`0lu0Z zLl6aE)k}`fN>ZBXn1r_#_gUU_*04Ds;U*i4Gr(OK$Om;?6z?8QjA=O~F_Ie2Xoju8Y41n|G&ym_U zCuX@@EnVf7o>EKC(j!lOeCXrurNqE;%X72br$XbcSnE50y%kuAHL^!bHydAR>VR(b 
zn_~3p!!DbFyh93}+V{>bp3QqYa-NQ5PuG&8YYiI6Q&UB0bgGTOJZq}QiP8zp0mjg! zZs5zN0eFBD9FNAN+3L~>6>FZuzC4i9e3l$7N&qVUKhvPvTsaK~?f?SRtfy>Cuwvlp zNz5$HEIAsCNtD0?x}ciQ!*5N8T{%zAGv_VmSp&cp|JO$dJui_9~k zzU(n?>t~ zRt1-8Tw9BRW=VH~-zP?J_5rK16Ka$;B&|pgSrh??iV}o(piqd>HyP88P!~y3X8pyh zMeRgzFe%5PN%fWB)4g6rL4F{QDLNOb>+{v^x$1Vn-pip&p=+;QKXB{K+jGBTEj8Y6 zRJ<!G3uKtDt|fgT+EQ)ja_xby$tGv}LnPE@1oGaUIq%L}zI^{su77CRJG{b=tlHQr z?&I`+<9X2ff|^AhFvxuO29iy858cilGRy$tlZ>tm3wvyF7y?`~yATKT)pcI5qb zs5ca(O0&fAVVgRdHc((y+ zY;Xm#)-1N5k*z2<(UL&{lzP@kb|ZBDpQOzgxWUc-d+FZ40gxe4uUkMpqyBt69g|b0 zJKrmuoYKHuS7l*Zf<9>t-FMSZD#-!A+W;U9JaTl*$HBQnuiOX+5T;H5JdMlAq&$tX zG42pj(No~pt5e|QYm)#mCuMC?kWNDo2$2I+7V)YS6rdQyF+{rYxB_3Ao{YyP1vwT2 z4-P+T-0!llhGOL#j2lZ4Dg(?E#s~mw2B`7i zl|s;Dw9R%FZ+r&;%3YrW zU-JiRTIm8NP<};$NUS(|j?qf42Ao#OaabRaK-{e;iIQ8e-e5{hV60J5%oClnWG#!< z1iEe&wB4$*u#0hW;)s%8-U8h8B@YprxoN zAWx{Mg4e{hZ`YY?O>{iF4d!1wM|0O;9c%)9`HBJ#&@;}NA!&EntXZ=kLoPBGTK6Ee z!|mBj-5oA8%R|tpYLF8U!5jJNzo5*vQguH$1Gt0tKP0E6USTRNy`7=ygf_^RaM(rX zj}E;VIktajG;(Br2tz_yll7XZbS4@*Wu%Iz5Ct!DDg28jBZ~Bv&YeukCv^^e4&6B^ zy)DKk0OpM7lf{UYIwlS?W>hd_R@SXPja+eKbXx}8CT#}e$+QT8sB3-px*c*HYIH)< z?U;-J@s)lJ`Z{I|y6)MysR+ZJnC*Br29$OC6BLB$xuRd=tt)H`eEiZ!FMX0--Zq$T z9=zx()HW~G2JW?N%eM^VS_W<H(>y4(Jt=S-dg0l7@z@`o}}_B^j2O35^xxP7j-i$ zoUdW*bOOl`5EYX!=!Jb>5MDLH+UKFB6iL$UM+HLPSPcRBz2S@bNPW|S`*$1%{tsX4 ze0jBpIID{>{NVMbTNC-+hjP0QtvC*^$M%EQ8-Ce*qxrTy|H6^n3r9Y895rJ2^Fwt* z%}lJ^9+eyy5$>R2@BZ&`e4>QJFz;e9F=CYb@fkwqSjZ=B99&{lP*KdWry%xKm8c#TkPCAh#3z7H z2jSNaKR3lGOXw?vJ5U|G_j(l#=P}iZz@j`NPm1aGZ*RhKW^V-G@R`y_pby|sQ zQqk!L&;W>nWLFzDaTyp)y;LK_L6^u-H`J+8%D{x8(@?2i0z#GT0l58Rm&WqG{+zFW z+4p?jH<vETzav+_W4XTXpFDkq>V|^1HShK3y#8z7 zS@w42`K~3ttI*VXb>EeJ`KEzf)4(n9@BZlaQAkqdHy>Jf?SZ%9a_^96j*}VgwcT+wAm7dm z2W@w@IgsyVkOq+|$6@GGejQq(dHAhiey6DA%*Oh7nj9~SDH#glkfK6>pmJ4|RoqkE z0Z#1QtazFG81vcxBNF*Ii zNs)-|ibPVfIF&@6k3`;@iYALK_DDpOV-Ue%GFCyKK=(u<(R5nYq8cQ=RcJzAM`t0S zsi1eDyJ_w}C25ngs9>;aDAso5b|66yfd-xm0+NDi0?AV_FK7mC5Sl+SGzj%yK~fDr z^-qEP8F^@NS{c%P$(BbpueE)(yW1LK3U$INi_8OO)tVKVwG)iP+O_Iqtew|l zza{W6riK#=G{BT+RKqkvTEno?HvDsvoKny$);&d)k0yiEwkrpqmhPj@X-ZtyttXRe zW=26MQ_!DO5SVl){8a%m9T0KoHX~m`H<+fJ_2M5vAW=)RxW)89P#gpl@#ChafS3n5I6tW821FT6|<<%yT=JB|Y#gPjMTW1sK) zUB2(w{&ip=gkb!2>)Z4)htQw&rZ+t9Vhh9K29l7(C@9ZdWf;2lD4ts$Sg~KAt8B>| z^F5^DF~$z)f-1?X2}|aH$kM0o@WV{+w-y2V29i;kHJRR~g>Jg*ljxH-5c(Km%HFci z@>jwUH~@Bs;}?qsf43ou1N9t5*rv>V z(!&HiXpf^K&{O|xzUeN@LCI70zoe48-Aaa_lD>uiL9v3l{RkP%+Nc!*UcfH<*YLmU z_MAG)Lmfdz*79%j@U$Xb^|_N!S&zg*|4_GEJ>QE+hn~7M^RK@e;ucMc6GG0D=uUUr-3n1G$2;L?_3(!O2N{&Llcj zE+~p4&Hzs7dBxNhB?MOnh$Je|ItCrew*e*H+XvTJk%3thQ~)%oi?pmL-~k(^pqp68 z2mqJXwY-yKY6Yxy(`lZAc*sF?3fAR$R-~-32x5VQ2-IGI7Uc4&i|5a!rf1Jxn3+30 zotnLHxoag(2~-CWx?PRpW9OO@7iM+1c~6!n4zJ#igF|6MAmU{+fq zi$ss(l2+_CS)kZ)=mwZ2a6Sr}Zo2Ug={N4--!ODIhHxD1AW@5Xr>xdTbJV-t2eEVA+8TBgkQG@J=rNk>7QhblHW}GwRGXEhPwfCrbs)Q& zwpW2B*vx{_2NLL;;EpFp5?L)z5@}&x))EDMAt9&DgrSQGRm*F}n*~iY5|WUB!?L95 zxkMLkvQV-I#ooKrQ>X6)Z2G}Bz6Q%HsIwo12UpJg9vP{}T9MZup@8qTRd0ocbekWoAFjRk!=c8~#^U8bd99Uwd?{ zE_}1Emg@K*;xUK#9o0-dUDI08i94~WR&?qR?(7T%0ZG$Rv@(og*QXZIq9+Y!Ect;SW^ z$NY07_o7psK`_>tK>hs6%oa2U&A2O52VhPV7v4DV$811!8U3IG5A literal 0 HcmV?d00001 diff --git a/da/network/constants.py b/da/network/constants.py new file mode 100644 index 0000000..8d9efde --- /dev/null +++ b/da/network/constants.py @@ -0,0 +1,21 @@ +from libp2p.typing import TProtocol + +""" + Some constants for use throught the poc +""" + +PROTOCOL_ID = TProtocol("/nomosda/1.0.0") +MAX_READ_LEN = 2**32 - 1 +HASH_LENGTH = 256 +NODE_PORT_BASE = 7560 +EXECUTOR_PORT = 8766 + +# These can be overridden with cli params +DEFAULT_DATA_SIZE = 1024 
+DEFAULT_SUBNETS = 256 +DEFAULT_NODES = 32 +DEFAULT_SAMPLE_THRESHOLD = 12 +# how many nodes per subnet minimum +DEFAULT_REPLICATION_FACTOR = 4 + +DEBUG = False diff --git a/da/network/dispersal/README.md b/da/network/dispersal/README.md new file mode 100644 index 0000000..ab9086d --- /dev/null +++ b/da/network/dispersal/README.md @@ -0,0 +1,76 @@ +# Zone Executor to Nomos DA Communication + +Protocol for communication between the Zone Executor and Nomos DA using Protocol Buffers (protobuf). + +## Overview + +The protocol defines messages used to request and respond to data dispersal, sampling operations, and session control within the Nomos DA system. The communication involves the exchange of blobs (binary large objects) and error handling for various operations. + +## Messages + +### Blob +- **Blob**: Represents the binary data to be dispersed. + - `bytes blob_id`: Unique identifier for the blob. + - `bytes data`: The binary data of the blob. + +### Error Handling +- **DispersalErr**: Represents errors related to dispersal operations. + - `bytes blob_id`: Unique identifier of the blob related to the error. + - `enum DispersalErrType`: Enumeration of dispersal error types. + - `CHUNK_SIZE`: Error due to incorrect chunk size. + - `VERIFICATION`: Error due to verification failure. + - `string err_description`: Description of the error. + +- **SampleErr**: Represents errors related to sample operations. + - `bytes blob_id`: Unique identifier of the blob related to the error. + - `enum SampleErrType`: Enumeration of sample error types. + - `NOT_FOUND`: Error when a blob is not found. + - `string err_description`: Description of the error. + +### Dispersal +- **DispersalReq**: Request message for dispersing a blob. + - `Blob blob`: The blob to be dispersed. + +- **DispersalRes**: Response message for a dispersal request. + - `oneof message_type`: Contains either a success response or an error. + - `bytes blob_id`: Unique identifier of the dispersed blob. + - `DispersalErr err`: Error occurred during dispersal. + +### Sample +- **SampleReq**: Request message for sampling a blob. + - `bytes blob_id`: Unique identifier of the blob to be sampled. + +- **SampleRes**: Response message for a sample request. + - `oneof message_type`: Contains either a success response or an error. + - `Blob blob`: The sampled blob. + - `SampleErr err`: Error occurred during sampling. + +### Session Control +- **CloseMsg**: Message to close a session with a reason. + - `enum CloseReason`: Enumeration of close reasons. + - `GRACEFUL_SHUTDOWN`: Graceful shutdown of the session. + - `SUBNET_CHANGE`: Change in the subnet. + - `SUBNET_SAMPLE_FAIL`: Subnet sample failure. + - `CloseReason reason`: Reason for closing the session. + +- **SessionReq**: Request message for session control. + - `oneof message_type`: Contains one of the following message types. + - `CloseMsg close_msg`: Message to close the session. + +### DispersalMessage +- **DispersalMessage**: Wrapper message for different types of dispersal and sampling messages. + - `oneof message_type`: Contains one of the following message types. + - `DispersalReq dispersal_req`: Dispersal request. + - `DispersalRes dispersal_res`: Dispersal response. + - `SampleReq sample_req`: Sample request. + - `SampleRes sample_res`: Sample response. + +## Protobuf + +To generate the updated protobuf serializer from `dispersal.proto`, run the following command: + +```bash +protoc --python_out=. 
dispersal.proto +``` + +This will generate the necessary Python code to serialize and deserialize the messages defined in the `dispersal.proto` file. diff --git a/da/network/dispersal/__init__.py b/da/network/dispersal/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/da/network/dispersal/__pycache__/__init__.cpython-312.pyc b/da/network/dispersal/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..4a32656cd50dffdd8b85cfe8f87236dd0c2308fc GIT binary patch literal 164 zcmX@j%ge<81n<_)N(0f4K?FMZ%mNgd&QQsq$>_I|p@<2{`wUX^%UM4oKQ~oBEioxG zU%#LzOFuKMM8CKwSwAm7H@{f7AV0ZSKP6E=FSVpRzbIQjC9}97wWv5TM?XG3GcU6w lK3=b&@)w6qZhlH>PO4oIE6_|vAT9yH8-*h4LC6ZAY$&AdZOqDHnF;wmXIZ*Dl z&CV)v zkaE}jI~kOtz=L|yAA?BO!Cgmgzy6LfB61%Y6mPhw+)sw&7*$2Dg7Scejd;8`V1pgJ zYaZtFh5-BM5}t$kyz9Ul?%>_m5F}Z4Q@|qGN<(Iu+WQs zSuyHhDybaBx~Cp)ePS1j`;x)Ef>f=m zgukq=pT1Ss)D>|&Cn-;QcSXl=^dd8gdke*FaizFZ$j1;j>Lx}D*V!Q4k-uuh=RUZs zP_=Y>)h6Dzn@dXe2L^|7s%DT)V?X``_vl12pigMy_GRX-aOfJOY@uYEa77$$Q{zK; zV6~LZ@hdwUqOi8JoiF~R0Btn@h1yvz@Y`Z;En8USV@&)b96yJI?B>=6FRow7c9F^S{Qq;-k=kDLzgpd|I zf(~orLtD5+46uHPv0-5m~sCK|^+wkKn3h9^s z?NU~H>?88N)mFQDB~v(^rDtikxV%wZJ}uJ2wM2oam|qvR_>z#_;7cVX1Sjrki!O{^ zSh%2U|7?p_FfiM#OZ;a_4lsp(Ye7){Oc%bU3s`iTi3ZEVhm0}>7i-u@eaQ~9LYG}5 z8lysuJzzC_LbAc@x&NtGsd@rcQFy3W}nv?TVt+zprWgDsg5K3`QO7s6Y2$*K|EVkysmI zk3^kLIDk9GqE0_I0qz)$)^JChlu~ODy(X!8kQ#)<^TBD2J(pAkMjF|o;~Fw5Idk{9 zGj^?>yjGQzn$@$lv%ImMvx0fDCmpH{*b>AP<%%_sm1TkG3ix?JDE$h>PEfju(q`b=NnoTI7&!@W%>ZWxy3Ih3dmZ|JG{~@L z*HC!!6~1|bxhCdbOQ=r)G|4e*LJ) z4z}2Yd3(xJCe74cP@Xl}krsQ$oKJhoocV0|c;;1juF1}~*iX!-pZeO8nY!l@JFKo= znM;AqQj;BSv9o4!-cx4G#U<)H(PXDu>|^uMk9_T+d9~HmE3|EK8znQ%)20v7oq1nKH#? zW>;05(g4u}y|jlE1!^NH3Lh=l0es9c>9rRwNPxmZ1q>7gnoAK#IT$|m|7Rau(Ug^< zhoWv6=a*-2C`jl2Pwz`$PT#!WT)H-(l7f# z?veL^49EeHqAY?8%0ZBO<-H(7cYKl1H@KV#Yc{3Sm7E^`2A7v&vC%F({BrPYWkK9y z3e1wX*Bc<1|y8iM=PyBqpTk@wjy9 z{kic(gfm-~6-`SnDkd){^`zMeQx^4GQl2R3%!eATy05SA`NilhbyBo)o6%c^!Fucox9oQ&oaeN`p5qjE;eD@03XVXUgFBl$aKM?RUlE!i(4BIs>} zM|%@Qk$EZvtHM!3I9e9^i(?z1?)&`jCaa--Bh+6xe&(z3QmDTix>}rB7sB`7FoZ)T z{?Haoe36llp>gX^Af_00M^<#l))*dYhxIz&WE5%v;QFF2Eyx!L70cEg{lm3PefD%7@96p1nglpv~kmIo)he z6Lr}pwke+^ngTY^Ove%e&c&Dl5-M4I5w>S5BNS_gtukE)-`rf?fAUemnA@_se6Sj+BEJi&N`jcU3%Mh({hw zmBo`);Uusq1U?P@`do?M_iYQ)b&-AcR5*c@h z2w=UJ1-Z@95C$~EEgb|pn#^F$Dm_S*OJ^g8s;5eK=`48k)HD`+=w>~xX(Skn2%ch3 zx}cNnL(vT)!ckx;-ef;CI1XbWEP4^9K1dG2Ae^q90|DrU!lnH~m7bweXt)us6vWED zOqBvcH$~{JR!_kt94|Le27MKcbL)6^;vrK`7 zpu@%?R&=_4E#((ZnSn9K&1dWwvgaBa;$^T*D*{D)7CxQuS`C+B+a3jv3dBP@0;0%l z_zzV5!-jvj>>nvktn)&ZKVa|&?vGXY!|Ot@7`OK9-3=Nv-SZ!zvfG{uu#?AZEu?x7 zLOpe*;DhjMV|bVIjCBKaZ9;*2>^kvX-s9T5qandW`1qR7Y+1;v3sOd=u&l8rkN+W( zdypl{2azUn2(+90CFnIfT$pbtACNxipy3k+9mP6&Ht9!k9E2&>II@GShG_=f-uC;h zoU$s_M#1}}Wo?lR!Kh>KXfY7Lh7hg_y@t^HVDZUVML3IL`EWHfVuVKEf9Nb&Yxr0- zeA)<~{_3ZdaH7O_Z3x|^aHK2@miWP!*k>N}@6Hu4<)F`c&~L;PCQ4_l&*R^2E1=KC z*I0@dfW8fEEy7!1dtuf*Fmo2nQCKw@M10WTTrhj<(cqs5J@!OO2sqAK@FI-1T{2Z)Eg4ISA(Ux>DyLg412_ zILI0}h=sM;st_XSck+t42WzsTrAQ{PXVe_9$3YVr0S%Gndg8_jG22J4+>B32iOKnK zauP<>m|U|Q2J&J}pVt;pZ;j;9 z?hWyksyJkbLsjv#A)a0jANj6@?`kg2eAmu&y=uGGJC#uEd%4#Cja`t}0ZE%{au(`l z$GhVqx%&YYgZ{S7peh!D_bS9NVqw>&)vF{z!@KXBD7~+|KlRi6M z!K*Bnrf&Toij3q0ECP~HyOvzeXB7!EkR24;60~W1;;|#*D9maQ5rYVJO%wHsaTq!U zj}r#b69x=%;7NByJX^e4%Or!9@Jz!@;x%^+TokYWimXqLAWoQBA$Zjw*6but2B61u zarS1O7ZUsx)I@VoU3BW%hWYjsbQLCDhiB)!?y&Jtu86Py%WU9MC>#HaNeNDE=YPp zjG6^)@4@55D2F~L#h_zV#Z!iO%8F6J=HgXIF~LKY&pn*4h-bbR%MD``Cb!*>9^TU% z-X9{qI0P8Ho@#0iZUyvs+d7(6HDyLyga))wvn_=U7%{w;1+x=Dal6PI3{B(y!yq8V z270T3ek0KTFkK0}S)5!KLM1-*lJh4Xi@ZB1xOW}zXPO<4N41kCT0JdGyB?1U96f_Z 
zSdxGl0EM8z5fO8b6>M=&7AWlrtHTMo5Qz~RI*(vhvH}3cKsbcjNBaHLY-$=kXrZOt zhDXCRO3}KXE(eYkCpW+#f`dkI@X7m?;DzE;EeM~hgl8cLuL}oD{6PyynClkYHg))) zLB$KC=OT{Ft1FE_a*yqd=hEcT0ax!=KnZveZi2!?Za~cxCvL<>$1mTUmJ(NP&W+9f zC{EoCPWbeqi457m9IFRrkt9<}dL=7qw^npnUCq(_^b^z%k9!k6{~Ybfw~L1HNINJTKW6U+xRC|YhpZ5df4xau{A+A94X^}O}%WmR6uD(4Bt zPqYgS$9&7NEc+Mcz~7kY(@ Self: + self = cls() + self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + self.host = new_host() + self.port = port + self.num_subnets = num_subnets + self.data_size = data_size + # one packet per subnet + self.data = [[] * data_size] * num_subnets + # one hash per packet. **assumes 256 hash length** + self.data_hashes = [[] * HASH_LENGTH] * num_subnets + self.node_list = node_list + # create random simulated data right from the beginning + self.__create_data() + return self + + def get_id(self): + return self.host.get_id() + + def net_iface(self): + return self.host + + def get_port(self): + return self.port + + def get_hash(self, index: int): + return self.data_hashes[index] + + def __create_data(self): + """ + Create random data for dispersal + One packet of self.data_size length per subnet + """ + id = sha256() + for i in range(self.num_subnets): + self.data[i] = randbytes(self.data_size) + self.data_hashes[i] = sha256(self.data[i]).hexdigest() + id.update(self.data[i]) + self.blob_id = id.digest() + + async def disperse(self, nursery): + """ + Disperse the data to the DA network. + Sends one packet of data per network node + """ + + async with self.host.run(listen_addrs=[self.listen_addr]): + for subnet, nodes in self.node_list.items(): + # get first node of each subnet + n = nodes[0] + # connect to it... + await self.host.connect(n) + + # ...and send (async) + stream = await self.host.new_stream(n.peer_id, [PROTOCOL_ID]) + nursery.start_soon(self.write_data, stream, subnet) + + async def write_data(self, stream: INetStream, index: int) -> None: + """ + Send data to peer (async) + The index is the subnet number + """ + + blob_id = self.blob_id + blob_data = self.data[index] + + message = proto.new_dispersal_req_msg(blob_id, blob_data) + await stream.write(message) diff --git a/da/network/network.py b/da/network/network.py new file mode 100644 index 0000000..66f6d84 --- /dev/null +++ b/da/network/network.py @@ -0,0 +1,35 @@ +import trio +from constants import DEBUG, NODE_PORT_BASE +from node import DANode + + +class DANetwork: + """ + Lightweight wrapper around a network of DA nodes. 
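
To make the Executor's data layout concrete, the following standalone sketch (illustrative only, not part of the patch; the subnet count and packet size are arbitrary example values) shows how one packet per subnet, one hash per packet, and a single `blob_id` over all packets relate, mirroring `Executor.__create_data`:

```python
from hashlib import sha256
from random import randbytes

num_subnets, data_size = 4, 1024  # example values only

# one random packet per subnet
packets = [randbytes(data_size) for _ in range(num_subnets)]
# one hex digest per packet; these are what the sampling step later checks for
packet_hashes = [sha256(p).hexdigest() for p in packets]

# blob_id commits to all packets together
hasher = sha256()
for p in packets:
    hasher.update(p)
blob_id = hasher.digest()
```
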
+ Really just creates the network for now + """ + + num_nodes: int + nodes: [] + + def __init__(self, nodes): + self.num_nodes = nodes + self.nodes = [] + + async def build(self, nursery, shutdown, disperse_send): + port_idx = NODE_PORT_BASE + for _ in range(self.num_nodes): + port_idx += 1 + nursery.start_soon( + DANode.new, + port_idx, + self.nodes, + nursery, + shutdown, + disperse_send.clone(), + ) + if DEBUG: + print("net built") + + def get_nodes(self): + return self.nodes diff --git a/da/network/node.py b/da/network/node.py new file mode 100644 index 0000000..2170e37 --- /dev/null +++ b/da/network/node.py @@ -0,0 +1,129 @@ +import sys +from hashlib import sha256 +from random import randint + +import dispersal.proto as proto +import multiaddr +import trio +from blspy import BasicSchemeMPL, G1Element, PrivateKey +from constants import * +from libp2p import host, new_host +from libp2p.network.stream.exceptions import StreamReset +from libp2p.network.stream.net_stream_interface import INetStream +from libp2p.peer.peerinfo import info_from_p2p_addr + + +class DANode: + """ + A class handling Data Availability (DA). + + Runs on a hardcoded port. + Starts a libp2p node. + Listens on a handler for receiving data. + Resends all data it receives to all peers it is connected to + (therefore assumes connection logic is established elsewhere) + + """ + + listen_addr: multiaddr.Multiaddr + libp2phost: host + port: int + node_list: [] + # list of packet hashes it "stores" + hashes: set() + + @classmethod + async def new(cls, port, node_list, nursery, shutdown, disperse_send): + self = cls() + self.listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") + self.libp2phost = new_host() + self.port = port + self.node_list = node_list + self.hashes = set() + nursery.start_soon(self.__run, nursery, shutdown, disperse_send) + if DEBUG: + print("DA node at port {} initialized".format(port)) + + def get_id(self): + return self.libp2phost.get_id() + + def net_iface(self): + return self.libp2phost + + def get_port(self): + return self.port + + async def __run(self, nursery, shutdown, disperse_send): + """ + Run the node. Starts libp2p host, and listener for data + """ + async with self.libp2phost.run(listen_addrs=[self.listen_addr]): + print("started node at {}...".format(self.listen_addr)) + + # handler to run when data is received + async def stream_handler(stream: INetStream) -> None: + nursery.start_soon( + self.read_data, stream, nursery, shutdown, disperse_send + ) + + # set the above handler + self.libp2phost.set_stream_handler(PROTOCOL_ID, stream_handler) + # at this point the node is "initialized" - signal it's "ready" + self.node_list.append(self) + # run until we shutdown + await shutdown.wait() + + async def read_data( + self, stream: INetStream, nursery, shutdown, disperse_send + ) -> None: + """ + We need to wait for incoming data, but also we want to shutdown + when the test is finished. + The following code makes sure that both events are listened to + and the first which occurs is handled. 
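
The "wait for whichever event happens first" pattern described here can be shown in isolation. Below is a minimal, self-contained sketch of the same idea (not part of the patch): `fake_read` stands in for the libp2p `stream.read(...)` call, the sleep duration is arbitrary, and whichever task finishes first records its result and cancels the shared nursery scope, which tears down the other task.

```python
import trio

async def main() -> None:
    shutdown = trio.Event()
    first_event = None

    async def select_event(async_fn, cancel_scope):
        # whichever task finishes first stores its result and cancels the rest
        nonlocal first_event
        first_event = await async_fn()
        cancel_scope.cancel()

    async def fake_read():
        await trio.sleep(0.1)  # stands in for `await stream.read(MAX_READ_LEN)`
        return b"packet"

    async with trio.open_nursery() as nursery:
        nursery.start_soon(select_event, fake_read, nursery.cancel_scope)
        nursery.start_soon(select_event, shutdown.wait, nursery.cancel_scope)

    # fake_read finished first; the task waiting on shutdown was cancelled
    assert first_event == b"packet"

trio.run(main)
```
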
+ """ + + first_event = None + + async def select_event(async_fn, cancel_scope): + nonlocal first_event + first_event = await async_fn() + cancel_scope.cancel() + disperse_send.close() + + async def read_stream(): + while True: + read_bytes = await stream.read(MAX_READ_LEN) + if read_bytes is not None: + message = proto.unpack_from_bytes(read_bytes) + hashstr = sha256(message.dispersal_req.blob.data).hexdigest() + if hashstr not in self.hashes: + # "store" the received packet + self.hashes.add(hashstr) + # now disperse this hash to all peers + nursery.start_soon(self.disperse, read_bytes, disperse_send) + if DEBUG: + print( + "{} stored {}".format( + self.libp2phost.get_id().pretty(), hashstr + ) + ) + await disperse_send.send(-1) + else: + print("read_bytes is None, unexpected!") + + nursery.start_soon(select_event, read_stream, nursery.cancel_scope) + nursery.start_soon(select_event, shutdown.wait, nursery.cancel_scope) + + async def disperse(self, packet, disperse_send) -> None: + # disperse the given packet to all peers + for p_id in self.libp2phost.get_peerstore().peer_ids(): + if p_id == self.libp2phost.get_id(): + continue + await disperse_send.send(1) + stream = await self.libp2phost.new_stream(p_id, [PROTOCOL_ID]) + + await stream.write(packet) + + async def has_hash(self, hashstr: str): + return hashstr in self.hashes diff --git a/da/network/poc.py b/da/network/poc.py new file mode 100644 index 0000000..a53790e --- /dev/null +++ b/da/network/poc.py @@ -0,0 +1,271 @@ +import argparse +import sys +import time +from random import randint + +import multiaddr +import trio +from constants import * +from executor import Executor +from libp2p.peer.peerinfo import info_from_p2p_addr +from network import DANetwork +from subnet import calculate_subnets + +""" + Entry point for the poc. + Handles cli arguments, initiates the network + and waits for it to complete. + + Also does some simple completion check. +""" + + +async def run_network(params): + """ + Create the network. + Run the run_subnets + """ + + num_nodes = int(params.nodes) + net = DANetwork(num_nodes) + shutdown = trio.Event() + disperse_send, disperse_recv = trio.open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(net.build, nursery, shutdown, disperse_send) + nursery.start_soon( + run_subnets, net, params, nursery, shutdown, disperse_send, disperse_recv + ) + + +async def run_subnets(net, params, nursery, shutdown, disperse_send, disperse_recv): + """ + Run the actual PoC logic. + Calculate the subnets. + -> Establish connections based on the subnets <- + Runs the executor. + Runs simulated sampling. 
+ Runs simple completion check + """ + + num_nodes = int(params.nodes) + num_subnets = int(params.subnets) + data_size = int(params.data_size) + sample_threshold = int(params.sample_threshold) + fault_rate = int(params.fault_rate) + replication_factor = int(params.replication_factor) + + while len(net.get_nodes()) != num_nodes: + print("nodes not ready yet") + await trio.sleep(0.1) + + print("Nodes ready") + nodes = net.get_nodes() + subnets = calculate_subnets(nodes, num_subnets, replication_factor) + await print_subnet_info(subnets) + + print("Establishing connections...") + node_list = {} + all_node_instances = set() + await establish_connections(subnets, node_list, all_node_instances, fault_rate) + + print("Starting executor...") + exe = Executor.new(EXECUTOR_PORT, node_list, num_subnets, data_size) + + print("Start dispersal and wait to complete...") + print("depending on network and subnet size this may take a while...") + global TIMESTAMP + TIMESTAMP = time.time() + async with trio.open_nursery() as subnursery: + subnursery.start_soon(wait_disperse_finished, disperse_recv, num_subnets) + subnursery.start_soon(exe.disperse, nursery) + subnursery.start_soon(disperse_watcher, disperse_send.clone()) + + print() + print() + + print("OK. Start sampling...") + checked = [] + for _ in range(sample_threshold): + nursery.start_soon(sample_node, exe, subnets, checked) + + print("Waiting for sampling to finish...") + await check_complete(checked, sample_threshold) + + print_connections(all_node_instances) + + print("Test completed") + shutdown.set() + + +TIMESTAMP = time.time() + + +def print_connections(node_list): + for n in node_list: + for p in n.net_iface().get_peerstore().peer_ids(): + if p == n.net_iface().get_id(): + continue + print("node {} is connected to {}".format(n.get_id(), p)) + print() + + +async def disperse_watcher(disperse_send): + while time.time() - TIMESTAMP < 5: + await trio.sleep(1) + + await disperse_send.send(9999) + print("canceled") + + +async def wait_disperse_finished(disperse_recv, num_subnets): + # run until there are no changes detected + async for value in disperse_recv: + if value == 9999: + print("dispersal finished") + return + + print(".", end="") + + global TIMESTAMP + TIMESTAMP = time.time() + + +async def print_subnet_info(subnets): + """ + Print which node is in what subnet + """ + + print() + print("By subnets: ") + for subnet in subnets: + print("subnet: {} - ".format(subnet), end="") + for n in subnets[subnet]: + print(n.get_id().pretty()[:16], end=", ") + print() + + print() + print() + + +async def establish_connections(subnets, node_list, all_node_instances, fault_rate=0): + """ + Each node in a subnet connects to the other ones in that subnet. + """ + for subnet in subnets: + # n is a DANode + for n in subnets[subnet]: + # while nodes connect to each other, they are **mutually** added + # to their peer lists. Hence, we don't need to establish connections + # again to peers we are already connected. 
+ # So in each iteration we get the peer list for the current node + # to later check if we are already connected with the next peer + this_nodes_peers = n.net_iface().get_peerstore().peer_ids() + all_node_instances.add(n) + faults = [] + for i in range(fault_rate): + faults.append(randint(0, len(subnets[subnet]))) + for i, nn in enumerate(subnets[subnet]): + # don't connect to self + if nn.get_id() == n.get_id(): + continue + if i in faults: + continue + remote_id = nn.get_id().pretty() + remote_port = nn.get_port() + # this script only works on localhost! + addr = "/ip4/127.0.0.1/tcp/{}/p2p/{}/".format(remote_port, remote_id) + remote_addr = multiaddr.Multiaddr(addr) + remote = info_from_p2p_addr(remote_addr) + if subnet not in node_list: + node_list[subnet] = [] + node_list[subnet].append(remote) + # check if we are already connected with this peer. If yes, skip connecting + if nn.get_id() in this_nodes_peers: + continue + if DEBUG: + print("{} connecting to {}...".format(n.get_id(), addr)) + await n.net_iface().connect(remote) + + print() + + +async def check_complete(checked, sample_threshold): + """ + Simple completion check: + Check how many nodes have already been "sampled" + """ + + while len(checked) < sample_threshold: + await trio.sleep(0.5) + print("check_complete exiting") + return + + +async def sample_node(exe, subnets, checked): + """ + Pick a random subnet. + Pick a random node in that subnet. + As the executor has a list of hashes per subnet, + we can ask that node if it has that hash. + """ + + # s: subnet + s = randint(0, len(subnets) - 1) + # n: node (index) + n = randint(0, len(subnets[s]) - 1) + # actual node + node = subnets[s][n] + # pick the hash to check + hashstr = exe.get_hash(s) + # run the "sampling" + has = await node.has_hash(hashstr) + if has: + print("node {} has hash {}".format(node.get_id().pretty(), hashstr)) + else: + print("node {} does NOT HAVE hash {}".format(node.get_id().pretty(), hashstr)) + print("TEST FAILED") + # signal we "sampled" another node + checked.append(1) + return + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-s", "--subnets", help="Number of subnets [default: 256]") + parser.add_argument("-n", "--nodes", help="Number of nodes [default: 32]") + parser.add_argument( + "-t", + "--sample-threshold", + help="Threshold for sampling request attempts [default: 12]", + ) + parser.add_argument("-d", "--data-size", help="Size of packages [default: 1024]") + parser.add_argument("-f", "--fault_rate", help="Fault rate [default: 0]") + parser.add_argument( + "-r", "--replication_factor", help="Replication factor [default: 4]" + ) + args = parser.parse_args() + + if not args.subnets: + args.subnets = DEFAULT_SUBNETS + if not args.nodes: + args.nodes = DEFAULT_NODES + if not args.sample_threshold: + args.sample_threshold = DEFAULT_SAMPLE_THRESHOLD + if not args.data_size: + args.data_size = DEFAULT_DATA_SIZE + if not args.replication_factor: + args.replication_factor = DEFAULT_REPLICATION_FACTOR + if not args.fault_rate: + args.fault_rate = 0 + + print("Number of subnets will be: {}".format(args.subnets)) + print("Number of nodes will be: {}".format(args.nodes)) + print("Size of data package will be: {}".format(args.data_size)) + print("Threshold for sampling attempts will be: {}".format(args.sample_threshold)) + print("Fault rate will be: {}".format(args.fault_rate)) + + print() + print("*******************") + print("Starting network...") + + trio.run(run_network, args) diff --git 
a/da/network/readme.md b/da/network/readme.md new file mode 100644 index 0000000..83831be --- /dev/null +++ b/da/network/readme.md @@ -0,0 +1,65 @@ +# Data Availability Subnets Proof-Of-Concept + +## Contents +This folder contains code as implementation for a Proof-Of-Concept (PoC) for the subnets designed +to address dispersal and sampling in Data Availability (DA) in Nomos. + +Refer to the [Specification](https://www.notion.so/Runnable-DA-PoC-Specification-50f204f2ff0a41d09de4926962bbb4ef?d=9e9677e5536a46d49fe95f366b7c3320#308624c50f1a42769b6c142976999483) +for the details of the design of this PoC. + + +Being a PoC, this code has no pretentions in terms of quality, and is certainly not meant to reach anywhere near production status. + +## How to run + +The entry point is `poc.py` , which can be run with a python3 binary. + +It can be parametrized with the following options: + +`python poc.py -s 512 -n 64 -t 12 -d 2048` + +To understand what these parameter mean, just look at the help output: + +```sh +> python poc.py -h +usage: poc.py [-h] [-s SUBNETS] [-n NODES] [-t SAMPLE_THRESHOLD] [-d DATA_SIZE] + +options: + -h, --help show this help message and exit + -s SUBNETS, --subnets SUBNETS + Number of subnets [default: 256] + -n NODES, --nodes NODES + Number of nodes [default: 32] + -t SAMPLE_THRESHOLD, --sample-threshold SAMPLE_THRESHOLD + Threshold for sampling request attempts [default: 12] + -d DATA_SIZE, --data-size DATA_SIZE + Size of packages [default: 1024] +``` + + +## What it does +The PoC first creates an instance of a light-weight `DANetwork`, which in turn +starts the configured number of nodes. + +[!NOTE] +Currently ports are hardcoded. Nodes start at 7561 and are instantiated sequentially from there. +The Executor simulator runs on 8766. + +After nodes are up, the subnets are calculated. Subnets calculation is explicitly **not part of the PoC**. +Therefore, the PoC uses a simple strategy of filling all subnets sequentially, and if not enough nodes are requested, +just fills up nodes up to a `REPLICATION_FACTOR` per subnet (thus, each subnet has at least `REPLICATION_FACTOR` nodes). + +After nodes are assigned to subnets, the network connections (via direct libp2p links) are established. +Each node in a subnet connects with every other node in that subnet. + +Next, the executor is started. It is just a simulator. It creates random data for each subnet of `DATA_SIZE` length, +simulating the columns generated by the NomosDA protocol. + +It then establishes one connection per subnet and sends one packet of `DATA_SIZE` length on each of these connections. +The executor also stores a hash of each packet per subnet. + +Receiving nodes then forward this package to each of their peers in the subnet. +They also store the respective hash (only). + +Finally a simulated check samples up to `SAMPLE_THRESHOLD` nodes. +For each subnet it simply picks a node randomly and asks if it has the hash. diff --git a/da/network/subnet.py b/da/network/subnet.py new file mode 100644 index 0000000..77d5110 --- /dev/null +++ b/da/network/subnet.py @@ -0,0 +1,65 @@ +from random import randint + +from constants import * + + +def calculate_subnets(node_list, num_subnets, replication_factor): + """ + Calculate in which subnet(s) to place each node. + This PoC does NOT require this to be analyzed, + nor to find the best solution. + + Hence, we just use a simple model here: + + 1. Iterate all nodes and place each node in the subsequent subnet + 2. 
If the subnet list can not be filled, start again from the top of the list + 3. If each subnet does NOT have at least up to REPLICATION_FACTOR nodes, then + fill up the list with nodes up to the factor. + + NOTE: This might be incomplete and/or buggy, but should be sufficient for + the purpose of the PoC. + + If however, you find a bug, please report. + + """ + # key of dict is the subnet number + subnets = {} + for i, n in enumerate(node_list): + idx = i % num_subnets + + # each key has an array, so multiple nodes can be filter + # into a subnet + if idx not in subnets: + subnets[idx] = [] + subnets[idx].append(n) + + listlen = len(node_list) + i = listlen + # if there are less nodes than subnets + while i < num_subnets: + subnets[i] = [] + subnets[i].append(node_list[i % listlen]) + i += 1 + + # if not each subnet has at least factor number of nodes, fill up + if listlen < replication_factor * num_subnets: + for subnet in subnets: + last = subnets[subnet][len(subnets[subnet]) - 1].get_id() + idx = -1 + # what is the last filled index of a subnet row + for j, n in enumerate(node_list): + if n.get_id() == last: + idx = j + 1 + # fill up until factor + while len(subnets[subnet]) < replication_factor: + # wrap index if at end + if idx > len(node_list) - 1: + idx = 0 + # don't add same node multiple times + if node_list[idx] in subnets[subnet]: + idx += 1 + continue + subnets[subnet].append(node_list[idx]) + idx += 1 + + return subnets diff --git a/da/test_common.py b/da/test_common.py new file mode 100644 index 0000000..9982270 --- /dev/null +++ b/da/test_common.py @@ -0,0 +1,20 @@ +from unittest import TestCase + +from da.common import ChunksMatrix + + +class TestCommon(TestCase): + + def test_chunks_matrix_columns(self): + matrix = ChunksMatrix([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) + expected = [[1, 4, 7], [2, 5, 8], [3, 6, 9]] + for c1, c2 in zip(expected, matrix.columns): + self.assertEqual(c1, c2) + + def test_chunks_matrix_transposed(self): + matrix = ChunksMatrix([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) + expected = ChunksMatrix([[1, 4, 7], [2, 5, 8], [3, 6, 9]]) + self.assertEqual(matrix.transposed(), expected) + matrix = ChunksMatrix([[1, 2, 3], [4, 5, 6]]) + expected = ChunksMatrix([[1, 4], [2, 5], [3, 6]]) + self.assertEqual(matrix.transposed(), expected) diff --git a/da/test_dispersal.py b/da/test_dispersal.py new file mode 100644 index 0000000..a4d59b6 --- /dev/null +++ b/da/test_dispersal.py @@ -0,0 +1,74 @@ +from hashlib import sha3_256 +from unittest import TestCase + +from da.encoder import DAEncoderParams, DAEncoder +from da.test_encoder import TestEncoder +from da.verifier import DAVerifier, DABlob +from da.common import NodeId, Attestation, Bitfield, NomosDaG2ProofOfPossession as bls_pop +from da.dispersal import Dispersal, EncodedData, DispersalSettings + + +class TestDispersal(TestCase): + def setUp(self): + self.n_nodes = 16 + self.nodes_ids = [NodeId(x.to_bytes(length=32, byteorder='big')) for x in range(self.n_nodes)] + self.secret_keys = list(range(1, self.n_nodes+1)) + self.public_keys = [bls_pop.SkToPk(sk) for sk in self.secret_keys] + # sort by pk as we do in dispersal + self.secret_keys, self.public_keys = zip( + *sorted(zip(self.secret_keys, self.public_keys), key=lambda x: x[1]) + ) + dispersal_settings = DispersalSettings( + self.nodes_ids, + self.public_keys, + self.n_nodes // 2 + 1 + ) + self.dispersal = Dispersal(dispersal_settings) + self.encoder_test = TestEncoder() + self.encoder_test.setUp() + + def test_build_certificate_insufficient_attestations(self): + with 
self.assertRaises(AssertionError): + self.dispersal._build_certificate(None, [], []) + + def test_build_certificate_enough_attestations(self): + mock_encoded_data = EncodedData( + None, None, None, [], [], [], bytes(b"f"*48), [] + ) + mock_message = sha3_256(mock_encoded_data.aggregated_column_commitment).digest() + mock_attestations = [Attestation(bls_pop.Sign(sk, mock_message)) for sk in self.secret_keys] + certificate = self.dispersal._build_certificate( + mock_encoded_data, + mock_attestations, + Bitfield([True for _ in range(len(self.secret_keys))]) + ) + self.assertIsNotNone(certificate) + self.assertEqual(certificate.aggregated_column_commitment, mock_encoded_data.aggregated_column_commitment) + self.assertEqual(certificate.row_commitments, []) + self.assertIsNotNone(certificate.aggregated_signatures) + self.assertTrue( + certificate.verify(self.public_keys) + ) + + def test_disperse(self): + data = self.encoder_test.data + encoding_params = DAEncoderParams(column_count=self.n_nodes // 2, bytes_per_chunk=31) + encoded_data = DAEncoder(encoding_params).encode(data) + + # mock send and await method with local verifiers + def __send_and_await_response(node: NodeId, blob: DABlob): + sk = self.secret_keys[int.from_bytes(node)] + verifier = DAVerifier(sk, self.public_keys) + return verifier.verify(blob) + # inject mock send and await method + self.dispersal._send_and_await_response = __send_and_await_response + + certificate = self.dispersal.disperse(encoded_data) + self.assertIsNotNone(certificate) + self.assertTrue(certificate.verify(self.public_keys) + ) + self.assertEqual( + certificate.signers, + [True if i < self.dispersal.settings.threshold else False for i in range(self.n_nodes)] + ) + diff --git a/da/test_encoder.py b/da/test_encoder.py new file mode 100644 index 0000000..5c7be12 --- /dev/null +++ b/da/test_encoder.py @@ -0,0 +1,137 @@ +from itertools import chain, batched +from random import randrange, randbytes +from unittest import TestCase + +from eth2spec.deneb.mainnet import bytes_to_bls_field + +from da import encoder +from da.encoder import DAEncoderParams, DAEncoder +from eth2spec.eip7594.mainnet import BYTES_PER_FIELD_ELEMENT, BLSFieldElement + +from da.kzg_rs.common import BLS_MODULUS, ROOTS_OF_UNITY +from da.kzg_rs import kzg, rs + + +class TestEncoder(TestCase): + def setUp(self): + self.params: DAEncoderParams = DAEncoderParams(column_count=16, bytes_per_chunk=31) + self.encoder: DAEncoder = DAEncoder(self.params) + self.elements = 32 + self.data = bytearray( + chain.from_iterable( + randbytes(self.params.bytes_per_chunk) + for _ in range(self.elements) + ) + ) + + def assert_encoding(self, encoder_params: DAEncoderParams, data: bytes): + encoded_data = encoder.DAEncoder(encoder_params).encode(data) + self.assertEqual(encoded_data.data, data) + extended_factor = 2 + column_count = encoder_params.column_count*extended_factor + columns_len = len(list(encoded_data.extended_matrix.columns)) + self.assertEqual(columns_len, column_count) + chunks_size = (len(data) // encoder_params.bytes_per_chunk) // encoder_params.column_count + self.assertEqual(len(encoded_data.row_commitments), chunks_size) + self.assertEqual(len(encoded_data.row_proofs), chunks_size) + self.assertEqual(len(encoded_data.row_proofs[0]), column_count) + self.assertIsNotNone(encoded_data.aggregated_column_commitment) + self.assertEqual(len(encoded_data.aggregated_column_proofs), columns_len) + + # verify rows + for row, proofs, commitment in zip(encoded_data.extended_matrix, encoded_data.row_proofs, 
encoded_data.row_commitments): + for i, (chunk, proof) in enumerate(zip(row, proofs)): + self.assertTrue( + kzg.verify_element_proof(bytes_to_bls_field(chunk), commitment, proof, i, ROOTS_OF_UNITY) + ) + + # verify column aggregation + for i, (column, proof) in enumerate(zip(encoded_data.extended_matrix.columns, encoded_data.aggregated_column_proofs)): + data = DAEncoder.hash_column_and_commitment(column, commitment) + kzg.verify_element_proof( + bytes_to_bls_field(data), + encoded_data.aggregated_column_commitment, + proof, + i, + ROOTS_OF_UNITY + ) + + def test_chunkify(self): + encoder_settings = DAEncoderParams(column_count=2, bytes_per_chunk=31) + elements = 10 + data = bytes(chain.from_iterable(int.to_bytes(0, length=31, byteorder='big') for _ in range(elements))) + _encoder = encoder.DAEncoder(encoder_settings) + chunks_matrix = _encoder._chunkify_data(data) + self.assertEqual(len(chunks_matrix), elements//encoder_settings.column_count) + for row in chunks_matrix: + self.assertEqual(len(row), encoder_settings.column_count) + self.assertEqual(len(row[0]), 32) + + def test_compute_row_kzg_commitments(self): + chunks_matrix = self.encoder._chunkify_data(self.data) + polynomials, commitments = zip(*self.encoder._compute_row_kzg_commitments(chunks_matrix)) + self.assertEqual(len(commitments), len(chunks_matrix)) + self.assertEqual(len(polynomials), len(chunks_matrix)) + + def test_rs_encode_rows(self): + chunks_matrix = self.encoder._chunkify_data(self.data) + extended_chunks_matrix = self.encoder._rs_encode_rows(chunks_matrix) + for r1, r2 in zip(chunks_matrix, extended_chunks_matrix): + self.assertEqual(len(r1), len(r2)//2) + r2 = [BLSFieldElement.from_bytes(x) for x in r2] + poly_1 = kzg.bytes_to_polynomial(r1.as_bytes()) + # we check against decoding so we now the encoding was properly done + poly_2 = rs.decode(r2, ROOTS_OF_UNITY, len(poly_1)) + self.assertEqual(poly_1, poly_2) + + def test_compute_rows_proofs(self): + chunks_matrix = self.encoder._chunkify_data(self.data) + polynomials, commitments = zip(*self.encoder._compute_row_kzg_commitments(chunks_matrix)) + extended_chunks_matrix = self.encoder._rs_encode_rows(chunks_matrix) + original_proofs = self.encoder._compute_rows_proofs(chunks_matrix, polynomials, commitments) + extended_proofs = self.encoder._compute_rows_proofs(extended_chunks_matrix, polynomials, commitments) + # check original sized matrix + for row, poly, commitment, proofs in zip(chunks_matrix, polynomials, commitments, original_proofs): + self.assertEqual(len(proofs), len(row)) + for i, chunk in enumerate(row): + self.assertTrue(kzg.verify_element_proof(BLSFieldElement.from_bytes(chunk), commitment, proofs[i], i, ROOTS_OF_UNITY)) + # check extended matrix + for row, poly, commitment, proofs in zip(extended_chunks_matrix, polynomials, commitments, extended_proofs): + for i, chunk in enumerate(row): + self.assertTrue(kzg.verify_element_proof(BLSFieldElement.from_bytes(chunk), commitment, proofs[i], i, ROOTS_OF_UNITY)) + + def test_compute_column_kzg_commitments(self): + chunks_matrix = self.encoder._chunkify_data(self.data) + polynomials, commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix)) + self.assertEqual(len(commitments), len(chunks_matrix[0])) + self.assertEqual(len(polynomials), len(chunks_matrix[0])) + + def test_generate_aggregated_column_commitments(self): + chunks_matrix = self.encoder._chunkify_data(self.data) + _, column_commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix)) + poly, commitment = 
self.encoder._compute_aggregated_column_commitment(chunks_matrix, column_commitments) + self.assertIsNotNone(poly) + self.assertIsNotNone(commitment) + + def test_generate_aggregated_column_proofs(self): + chunks_matrix = self.encoder._chunkify_data(self.data) + _, column_commitments = zip(*self.encoder._compute_column_kzg_commitments(chunks_matrix)) + poly, _ = self.encoder._compute_aggregated_column_commitment(chunks_matrix, column_commitments) + proofs = self.encoder._compute_aggregated_column_proofs(poly, column_commitments) + self.assertEqual(len(proofs), len(column_commitments)) + + def test_encode(self): + from random import randbytes + sizes = [pow(2, exp) for exp in range(4, 8, 2)] + encoder_params = DAEncoderParams( + column_count=8, + bytes_per_chunk=31 + ) + for size in sizes: + data = bytes( + chain.from_iterable( + randbytes(encoder_params.bytes_per_chunk) + for _ in range(size*encoder_params.column_count) + ) + ) + self.assert_encoding(encoder_params, data) diff --git a/da/test_full_flow.py b/da/test_full_flow.py new file mode 100644 index 0000000..ad63adf --- /dev/null +++ b/da/test_full_flow.py @@ -0,0 +1,134 @@ +from itertools import chain +from unittest import TestCase +from typing import List, Optional + +from da.common import NodeId, build_attestation_message, BLSPublicKey, NomosDaG2ProofOfPossession as bls_pop +from da.api.common import DAApi, VID, Metadata +from da.verifier import DAVerifier, DABlob +from da.api.test_flow import MockStore +from da.dispersal import Dispersal, DispersalSettings +from da.test_encoder import TestEncoder +from da.encoder import DAEncoderParams, DAEncoder + + +class DAVerifierWApi: + def __init__(self, sk: int, public_keys: List[BLSPublicKey]): + self.store = MockStore() + self.api = DAApi(self.store) + self.verifier = DAVerifier(sk, public_keys) + + def receive_blob(self, blob: DABlob): + if attestation := self.verifier.verify(blob): + # Warning: If aggregated col commitment and row commitment are the same, + # the build_attestation_message method will produce the same output. + cert_id = build_attestation_message(blob.aggregated_column_commitment, blob.rows_commitments) + self.store.populate(blob, cert_id) + return attestation + + def receive_cert(self, vid: VID): + # Usually the certificate would be verifier here, + # but we are assuming that this it is already coming from the verified block, + # in which case all certificates had been already verified by the DA Node. 
+ self.api.write(vid.cert_id, vid.metadata) + + def read(self, app_id, indexes) -> List[Optional[DABlob]]: + return self.api.read(app_id, indexes) + + +class TestFullFlow(TestCase): + def setUp(self): + self.n_nodes = 16 + self.nodes_ids = [NodeId(x.to_bytes(length=32, byteorder='big')) for x in range(self.n_nodes)] + self.secret_keys = list(range(1, self.n_nodes+1)) + self.public_keys = [bls_pop.SkToPk(sk) for sk in self.secret_keys] + # sort by pk as we do in dispersal + self.secret_keys, self.public_keys = zip( + *sorted(zip(self.secret_keys, self.public_keys), key=lambda x: x[1]) + ) + dispersal_settings = DispersalSettings( + self.nodes_ids, + self.public_keys, + self.n_nodes + ) + self.dispersal = Dispersal(dispersal_settings) + self.encoder_test = TestEncoder() + self.encoder_test.setUp() + + self.api_nodes = [DAVerifierWApi(k, self.public_keys) for k in self.secret_keys] + + def test_full_flow(self): + app_id = int.to_bytes(1) + index = 1 + + # encoder + data = self.encoder_test.data + encoding_params = DAEncoderParams(column_count=self.n_nodes // 2, bytes_per_chunk=31) + encoded_data = DAEncoder(encoding_params).encode(data) + + # mock send and await method with local verifiers + def __send_and_await_response(node: int, blob: DABlob): + node = self.api_nodes[int.from_bytes(node)] + return node.receive_blob(blob) + + # inject mock send and await method + self.dispersal._send_and_await_response = __send_and_await_response + certificate = self.dispersal.disperse(encoded_data) + + vid = VID( + certificate.id(), + Metadata(app_id, index) + ) + + # verifier + for node in self.api_nodes: + node.receive_cert(vid) + + # read from api and confirm its working + # notice that we need to sort the api_nodes by their public key to have the blobs sorted in the same fashion + # we do actually do dispersal. + blobs = list(chain.from_iterable( + node.read(app_id, [index]) + for node in sorted(self.api_nodes, key=lambda n: bls_pop.SkToPk(n.verifier.sk)) + )) + original_blobs = list(self.dispersal._prepare_data(encoded_data)) + self.assertEqual(blobs, original_blobs) + + def test_same_blob_multiple_indexes(self): + app_id = int.to_bytes(1) + indexes = [1, 2, 3] # Different indexes to test with the same blob + + # encoder + data = self.encoder_test.data + encoding_params = DAEncoderParams(column_count=self.n_nodes // 2, bytes_per_chunk=31) + encoded_data = DAEncoder(encoding_params).encode(data) + + # mock send and await method with local verifiers + def __send_and_await_response(node: int, blob: DABlob): + node = self.api_nodes[int.from_bytes(node)] + return node.receive_blob(blob) + + # inject mock send and await method + self.dispersal._send_and_await_response = __send_and_await_response + certificate = self.dispersal.disperse(encoded_data) + + # Loop through each index and simulate dispersal with the same cert_id but different metadata + for index in indexes: + vid = VID( + certificate.id(), + Metadata(app_id, index) + ) + + # verifier + for node in self.api_nodes: + node.receive_cert(vid) + + # Verify retrieval for each index + for index in indexes: + # Notice that we need to sort the api_nodes by their public key to have the blobs sorted in the same fashion + # as we do actually do dispersal. 
+ blobs = list(chain.from_iterable( + node.read(app_id, [index]) + for node in sorted(self.api_nodes, key=lambda n: bls_pop.SkToPk(n.verifier.sk)) + )) + original_blobs = list(self.dispersal._prepare_data(encoded_data)) + self.assertEqual(blobs, original_blobs, f"Failed at index {index}") diff --git a/da/test_verifier.py b/da/test_verifier.py new file mode 100644 index 0000000..8d9e763 --- /dev/null +++ b/da/test_verifier.py @@ -0,0 +1,71 @@ +from unittest import TestCase + +from da.common import Column, NomosDaG2ProofOfPossession as bls_pop +from da.encoder import DAEncoder +from da.kzg_rs import kzg +from da.kzg_rs.common import GLOBAL_PARAMETERS, ROOTS_OF_UNITY +from da.test_encoder import TestEncoder +from da.verifier import Attestation, DAVerifier, DABlob + + +class TestVerifier(TestCase): + + def setUp(self): + self.verifier = DAVerifier(1987, [bls_pop.SkToPk(1987)]) + + def test_verify_column(self): + column = Column(int.to_bytes(i, length=32) for i in range(8)) + _, column_commitment = kzg.bytes_to_commitment(column.as_bytes(), GLOBAL_PARAMETERS) + aggregated_poly, aggregated_column_commitment = kzg.bytes_to_commitment( + DAEncoder.hash_column_and_commitment(column, column_commitment), GLOBAL_PARAMETERS + ) + aggregated_proof = kzg.generate_element_proof(0, aggregated_poly, GLOBAL_PARAMETERS, ROOTS_OF_UNITY) + self.assertTrue( + self.verifier._verify_column( + column, column_commitment, aggregated_column_commitment, aggregated_proof, 0 + ) + ) + + def test_verify(self): + _ = TestEncoder() + _.setUp() + encoded_data = _.encoder.encode(_.data) + verifiers_sk = [i for i in range(1000, 1000+len(encoded_data.chunked_data[0]))] + vefiers_pk = [bls_pop.SkToPk(k) for k in verifiers_sk] + for i, column in enumerate(encoded_data.chunked_data.columns): + verifier = DAVerifier(verifiers_sk[i], vefiers_pk) + da_blob = DABlob( + Column(column), + encoded_data.column_commitments[i], + encoded_data.aggregated_column_commitment, + encoded_data.aggregated_column_proofs[i], + encoded_data.row_commitments, + [row[i] for row in encoded_data.row_proofs], + ) + self.assertIsNotNone(verifier.verify(da_blob)) + + def test_verify_duplicated_blob(self): + _ = TestEncoder() + _.setUp() + encoded_data = _.encoder.encode(_.data) + columns = enumerate(encoded_data.chunked_data.columns) + i, column = next(columns) + da_blob = DABlob( + Column(column), + encoded_data.column_commitments[i], + encoded_data.aggregated_column_commitment, + encoded_data.aggregated_column_proofs[i], + encoded_data.row_commitments, + [row[i] for row in encoded_data.row_proofs], + ) + self.assertIsNotNone(self.verifier.verify(da_blob)) + for i, column in columns: + da_blob = DABlob( + Column(column), + encoded_data.column_commitments[i], + encoded_data.aggregated_column_commitment, + encoded_data.aggregated_column_proofs[i], + encoded_data.row_commitments, + [row[i] for row in encoded_data.row_proofs], + ) + self.assertIsNone(self.verifier.verify(da_blob)) diff --git a/da/verifier.py b/da/verifier.py new file mode 100644 index 0000000..3c0cef3 --- /dev/null +++ b/da/verifier.py @@ -0,0 +1,114 @@ +from dataclasses import dataclass +from hashlib import sha3_256 +from typing import List, Optional, Sequence, Set, Dict + +from eth2spec.deneb.mainnet import BLSFieldElement +from eth2spec.eip7594.mainnet import ( + KZGCommitment as Commitment, + KZGProof as Proof, +) + +import da.common +from da.common import Column, Chunk, Attestation, BLSPrivateKey, BLSPublicKey, NomosDaG2ProofOfPossession as bls_pop +from da.encoder import DAEncoder +from 
da.kzg_rs import kzg +from da.kzg_rs.common import ROOTS_OF_UNITY, GLOBAL_PARAMETERS, BLS_MODULUS + + +@dataclass +class DABlob: + column: Column + column_commitment: Commitment + aggregated_column_commitment: Commitment + aggregated_column_proof: Proof + rows_commitments: List[Commitment] + rows_proofs: List[Proof] + + def id(self) -> bytes: + return da.common.build_attestation_message(self.aggregated_column_commitment, self.rows_commitments) + + def column_id(self) -> bytes: + return sha3_256(self.column.as_bytes()).digest() + + +class DAVerifier: + def __init__(self, sk: BLSPrivateKey, nodes_pks: List[BLSPublicKey]): + self.attested_blobs: Dict[bytes, (bytes, Attestation)] = dict() + self.sk = sk + self.index = nodes_pks.index(bls_pop.SkToPk(self.sk)) + + @staticmethod + def _verify_column( + column: Column, + column_commitment: Commitment, + aggregated_column_commitment: Commitment, + aggregated_column_proof: Proof, + index: int + ) -> bool: + # 1. compute commitment for column + _, computed_column_commitment = kzg.bytes_to_commitment(column.as_bytes(), GLOBAL_PARAMETERS) + # 2. If computed column commitment != column commitment, fail + if column_commitment != computed_column_commitment: + return False + # 3. compute column hash + column_hash = DAEncoder.hash_column_and_commitment(column, column_commitment) + # 4. Check proof with commitment and proof over the aggregated column commitment + chunk = BLSFieldElement.from_bytes(column_hash) + return kzg.verify_element_proof( + chunk, aggregated_column_commitment, aggregated_column_proof, index, ROOTS_OF_UNITY + ) + + @staticmethod + def _verify_chunk(chunk: Chunk, commitment: Commitment, proof: Proof, index: int) -> bool: + chunk = BLSFieldElement(int.from_bytes(bytes(chunk)) % BLS_MODULUS) + return kzg.verify_element_proof(chunk, commitment, proof, index, ROOTS_OF_UNITY) + + @staticmethod + def _verify_chunks( + chunks: Sequence[Chunk], + commitments: Sequence[Commitment], + proofs: Sequence[Proof], + index: int + ) -> bool: + if not (len(chunks) == len(commitments) == len(proofs)): + return False + for chunk, commitment, proof in zip(chunks, commitments, proofs): + if not DAVerifier._verify_chunk(chunk, commitment, proof, index): + return False + return True + + def _build_attestation(self, blob: DABlob) -> Attestation: + hasher = sha3_256() + hasher.update(bytes(blob.aggregated_column_commitment)) + for c in blob.rows_commitments: + hasher.update(bytes(c)) + message = hasher.digest() + return Attestation(signature=bls_pop.Sign(self.sk, message)) + + def verify(self, blob: DABlob) -> Optional[Attestation]: + blob_id = blob.id() + if previous_attestation := self.attested_blobs.get(blob_id): + column_id, attestation = previous_attestation + # we already attested, is cached so we return it + if column_id == blob.column_id(): + return attestation + # we already attested and they are asking us to attest the same data different column + # skip + return None + is_column_verified = DAVerifier._verify_column( + blob.column, + blob.column_commitment, + blob.aggregated_column_commitment, + blob.aggregated_column_proof, + self.index + ) + if not is_column_verified: + return + are_chunks_verified = DAVerifier._verify_chunks( + blob.column, blob.rows_commitments, blob.rows_proofs, self.index + ) + if not are_chunks_verified: + return + attestation = self._build_attestation(blob) + self.attested_blobs[blob_id] = (blob.column_id(), attestation) + return attestation
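
The tests above sort nodes by BLS public key before dispersal, and `DAVerifier.__init__` recovers its own column index by looking up its public key in that same ordering. The sketch below shows that agreement in isolation; it is illustrative only and assumes, as the tests do, that the `NomosDaG2ProofOfPossession` wrapper accepts small integer secret keys (the key values here are arbitrary).

```python
from da.common import NomosDaG2ProofOfPossession as bls_pop

secret_keys = [1, 2, 3, 4]                      # example keys only
public_keys = [bls_pop.SkToPk(sk) for sk in secret_keys]

# dispersal sorts (sk, pk) pairs by public key; verifiers use the same order,
# so both sides agree on which node is responsible for which column
secret_keys, public_keys = zip(
    *sorted(zip(secret_keys, public_keys), key=lambda x: x[1])
)

# a verifier holding the third sorted key derives its column index like this
my_sk = secret_keys[2]
my_index = list(public_keys).index(bls_pop.SkToPk(my_sk))
assert my_index == 2
```
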