diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index d40f48ad9..45e9d10d7 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -272,7 +272,7 @@ proc getDataColumnSidecars[A, B](man: SyncManager[A, B], peer: A, logScope: peer_score = peer.getScore() peer_speed = peer.netKbps() - sync_indent = man.indent + sync_ident = man.ident direction = man.direction topics = "syncman" @@ -287,9 +287,9 @@ func groupDataColumns*[T](req: SyncRequest[T], var grouped = newSeq[DataColumnSidecars](len(blocks)) column_cursor = 0 - for column_idx, blck in blocks: + for block_idx, blck in blocks: withBlck(blck[]): - when consensusFork >= consensusFork.Deneb: + when consensusFork >= ConsensusFork.Deneb: template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments if kzgs.len == 0: continue @@ -301,7 +301,7 @@ func groupDataColumns*[T](req: SyncRequest[T], if column_cursor >= data_columns.len: return err("DataColumnSidecar: response too short") let data_column_sidecar = data_columns[column_cursor] - if data_column_sidecar.index == data_columns[column_cursor]: + if data_column_sidecar.index != ColumnIndex column_idx: return err("DataColumnSidecar: unexpected index") if kzg_commitment notin data_column_sidecar.kzg_commitments: return err("DataColumnSidecar: unexpected kzg_commitment") @@ -540,6 +540,65 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) else: Opt.none(seq[BlobSidecars]) + let shouldGetDataColumns = + if not man.shouldGetBlobs(req.slot.epoch): + false + else: + var hasColumns = false + for blck in blockData: + withBlck(blck[]): + when consensusFork >= ConsensusFork.Deneb: + if forkyBlck.message.body.blob_kzg_commitments.len > 0: + hasColumns = true + break + hasColumns + + let dataColumnData = + if shouldGetDataColumns: + let data_columns = await man.getDataColumnSidecars(peer, req) + if data_columns.isErr: + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive data columns on request", + request = req, err = data_columns.error + return + let dataColumnData = data_columns.get().asSeq() + let dataColumnSmap = getShortMap(req, dataColumnData) + debug "Received data columns on request", data_columns_count = len(dataColumnData), + data_columns_map = dataColumnSmap, request = req + + if len(dataColumnData) > 0: + let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot) + let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) + if not(checkResponse(req, uniqueSlots)): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received data columns sequence is not in requested range", + data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData), + request = req + + return + let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData) + if groupedDataColumns.isErr: + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + info "Received data columns is inconsistent", + data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error() + return + if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received data columns is invalid", + data_columns_count = len(dataColumnData), + data_columns_map = getShortMap(req, dataColumnData), + request = req, + msg = checkRes.error + return + Opt.some(groupedDataColumns.get()) + else: + Opt.none(seq[DataColumnSidecars]) + + if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and req.contains(man.getSafeSlot()): # The sync protocol does not distinguish between: @@ -564,7 +623,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) # TODO descore peers that lie maybeFinalized = lastSlot < peerFinalized - await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = + await man.queue.push(req, blockData, blobData, dataColumnData, maybeFinalized, proc() = man.workers[index].status = SyncWorkerStatus.Processing) proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} = diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index a70e49ca3..22c92f17d 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -651,6 +651,7 @@ func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 = proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], data: seq[ref ForkedSignedBeaconBlock], blobs: Opt[seq[BlobSidecars]], + data_columns: Opt[seq[DataColumnSidecars]], maybeFinalized: bool = false, processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} = logScope: @@ -678,7 +679,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], # SyncQueue reset happens. We are exiting to wake up sync-worker. return else: - let syncres = SyncResult[T](request: sr, data: data, blobs: blobs) + let syncres = SyncResult[T](request: sr, data: data, blobs: blobs, data_columns: data_columns) sq.readyQueue.push(syncres) break @@ -758,6 +759,37 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], blocks_map = getShortMap(req, item.data) req.item.updateScore(PeerScoreBadValues) break + + var counter = 0 + for blk, col in sq.das_blocks(item): + res = await sq.blockVerifier(blk[], Opt.none(BlobSidecars), col, maybeFinalized) + inc i + + if res.isOk: + goodBlock = some(blk[].slot) + else: + case res.error() + of VerifierError.MissingParent: + missingParentSlot = some(blk[].slot) + break + of VerifierError.Duplicate: + # Keep going, happens naturally + discard + of VerifierError.UnviableFork: + # Keep going so as to register other unviable blocks with the + # quarantine + if unviableBlock.isNone: + # Remember the first unviable block, so we can log it + unviableBlock = some((blk[].root, blk[].slot)) + + of VerifierError.Invalid: + hasInvalidBlock = true + + let req = item.request + notice "Received invalid sequence of blocks", request = req, + blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data) + req.item.updateScore(PeerScoreBadValues) # When errors happen while processing blocks, we retry the same request # with, hopefully, a different peer