diff --git a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim index e9d5edac5..5d1452049 100644 --- a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim +++ b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim @@ -53,6 +53,7 @@ RestJson.useDefaultSerializationFor( Checkpoint, Consolidation, ContributionAndProof, + DataColumnSidecar, DataEnclosedObject, DataMetaEnclosedObject, DataOptimisticAndFinalizedObject, diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index 23fcf3666..9b678b5ad 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -9,7 +9,7 @@ import chronicles, chronos, snappy, snappy/codec, - ../spec/datatypes/[phase0, altair, bellatrix, capella, deneb], + ../spec/datatypes/[phase0, altair, bellatrix, capella, deneb, eip7594], ../spec/[helpers, forks, network], ".."/[beacon_clock], ../networking/eth2_network, @@ -112,6 +112,26 @@ proc readChunkPayload*( else: return neterr InvalidContextBytes +proc readChunkPayload*( + conn: Connection, peer: Peer, MsgType: type (ref DataColumnSidecar)): + Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} = + var contextBytes: ForkDigest + try: + await conn.readExactly(addr contextBytes, sizeof contextBytes) + except CancelledError as exc: + raise exc + except CatchableError: + return neterr UnexpectedEOF + + if contextBytes == peer.network.forkDigests.deneb: + let res = await readChunkPayload(conn, peer, DataColumnSidecar) + if res.isOk: + return ok newClone(res.get) + else: + return err(res.error) + else: + return neterr InvalidContextBytes + {.pop.} # TODO fix p2p macro for raises p2pProtocol BeaconSync(version = 1, @@ -389,7 +409,7 @@ p2pProtocol BeaconSync(version = 1, # client call that returns `seq[ref BlobSidecar]` will # will be generated by the libp2p macro - we guarantee that seq items # are `not-nil` in the implementation - trace "got data columns range request", peer, len = blobIds.len + trace "got data columns range request", peer, len = columnIds.len if columnIds.len == 0: raise newException(InvalidInputsError, "No data columns requested") @@ -408,11 +428,11 @@ p2pProtocol BeaconSync(version = 1, if dag.db.getDataColumnSidecarSZ(blockRef.bid.root, index, bytes): let uncompressedLen = uncompressedLenFramed(bytes).valueOr: warn "Cannot read data column size, database corrupt?", - bytes = bytes.len(), blck = shortLog(blockRef), blobindex = index + bytes = bytes.len(), blck = shortLog(blockRef), columnIndex = index continue - peer.awaitQuota(blobResponseCost, "data_column_sidecars_by_root/1") - peer.network.awaitQuota(blobResponseCost, "data_column_sidecars_by_root/1") + peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1") + peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1") await response.writeBytesSZ( uncompressedLen, bytes, @@ -422,109 +442,68 @@ p2pProtocol BeaconSync(version = 1, debug "Data column root request done", peer, roots = columnIds.len, count, found - # # https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyroot-v1 - # proc dataColumnSidecarsByRoot( - # peer: Peer, - # columnIds: DataColumnIdentifierList, - # response: MultipleChunksResponse[ - # ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)]) - # {.async, libp2pProtocol("data_column_sidecars_by_root", 1).} = - - # trace "got data columns range request", peer, len = columnIds.len - # if columnIds.len == 0: - # raise newException(InvalidInputsError, "No data columns requested") - - # let - # dag = peer.networkState.dag - # count = columnIds.len - - # var - # found = 0 - # bytes: seq[byte] - - # for i in 0..= dag.head.slot.epoch: + GENESIS_EPOCH + else: + dag.head.slot.epoch - dag.cfg.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS - # trace "got data columns range request", peer, startSlot, - # count = reqCount, columns = reqColumns + if startSlot.epoch < epochBoundary: + raise newException(ResourceUnavailableError, DataColumnsOutOfRange) - # if reqCount == 0 or reqColumns.len == 0: - # raise newException(InvalidInputsError, "Empty range requested") + var blockIds: array[int(MAX_REQUEST_DATA_COLUMNS), BlockId] + let + count = int min(reqCount, blockIds.lenu64) + endIndex = count - 1 + startIndex = + dag.getBlockRange(startSlot, 1, blockIds.toOpenArray(0, endIndex)) - # let - # dag = peer.networkState.dag - # epochBoundary = - # if dag.cfg.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS >= dag.head.slot.epoch: - # GENESIS_EPOCH - # else: - # dag.head.slot.epoch - dag.cfg.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS + var + found = 0 + bytes: seq[byte] - # if startSlot.epoch < epochBoundary: - # raise newException(ResourceUnavailableError, DataColumnsOutOfRange) + for i in startIndex..endIndex: + for j in 0..= dag.cfg.BELLATRIX_FORK_EPOCH and + not dag.head.executionValid: + continue - # var blockIds: array[int(MAX_REQUEST_DATA_COLUMNS), BlockId] - # let - # count = int min(reqCount, blockIds.lenu64) - # endIndex = count - 1 - # startIndex = - # dag.getBlockRange(startSlot, 1, blockIds.toOpenArray(0, endIndex)) + let uncompressedLen = uncompressedLenFramed(bytes).valueOr: + warn "Cannot read data column sidecar size, database, corrupt", + bytes = bytes.len(), blck = shortLog(blockIds[i]) + continue - # var - # found = 0 - # bytes: seq[byte] - - # for i in startIndex..endIndex: - # for j in 0..= dag.cfg.BELLATRIX_FORK_EPOCH and - # not dag.head.executionValid: - # continue + peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1") + peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1") - # let uncompressedLen = uncompressedLenFramed(bytes).valueOr: - # warn "Cannot read data column sidecar size, database, corrupt", - # bytes = bytes.len(), blck = shortLog(blockIds[i]) - # continue + await response.writeBytesSZ( + uncompressedLen, bytes, + peer.network.forkDigestAtEpoch(blockIds[i].slot.epoch).data) + inc found + else: + break - # peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1") - # peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1") - - # await response.writeBytesSSZ( - # uncompressedLen, bytes, - # peer.network.forkDigestAtEpoch(blockIds[i].slot.epoch).data) - # inc found - # else: - # break - - # debug "DataColumnSidecar range request done", - # peer, startSlot, count = reqCount, columns = reqColumns, found + debug "DataColumnSidecar range request done", + peer, startSlot, count = reqCount, columns = reqColumns, found proc init*(T: type BeaconSync.NetworkState, dag: ChainDAGRef): T = T(