exp: build failing, checking if failing on other machines with these changes

This commit is contained in:
Agnish Ghosh 2024-07-02 00:14:58 +05:30
parent 8ac4cc9152
commit 8e28654a24
No known key found for this signature in database
GPG Key ID: 7BDDA05D1B25E9F8
2 changed files with 97 additions and 6 deletions

View File

@ -272,7 +272,7 @@ proc getDataColumnSidecars[A, B](man: SyncManager[A, B], peer: A,
logScope: logScope:
peer_score = peer.getScore() peer_score = peer.getScore()
peer_speed = peer.netKbps() peer_speed = peer.netKbps()
sync_indent = man.indent sync_ident = man.ident
direction = man.direction direction = man.direction
topics = "syncman" topics = "syncman"
@ -287,9 +287,9 @@ func groupDataColumns*[T](req: SyncRequest[T],
var var
grouped = newSeq[DataColumnSidecars](len(blocks)) grouped = newSeq[DataColumnSidecars](len(blocks))
column_cursor = 0 column_cursor = 0
for column_idx, blck in blocks: for block_idx, blck in blocks:
withBlck(blck[]): withBlck(blck[]):
when consensusFork >= consensusFork.Deneb: when consensusFork >= ConsensusFork.Deneb:
template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments
if kzgs.len == 0: if kzgs.len == 0:
continue continue
@ -301,7 +301,7 @@ func groupDataColumns*[T](req: SyncRequest[T],
if column_cursor >= data_columns.len: if column_cursor >= data_columns.len:
return err("DataColumnSidecar: response too short") return err("DataColumnSidecar: response too short")
let data_column_sidecar = data_columns[column_cursor] let data_column_sidecar = data_columns[column_cursor]
if data_column_sidecar.index == data_columns[column_cursor]: if data_column_sidecar.index != ColumnIndex column_idx:
return err("DataColumnSidecar: unexpected index") return err("DataColumnSidecar: unexpected index")
if kzg_commitment notin data_column_sidecar.kzg_commitments: if kzg_commitment notin data_column_sidecar.kzg_commitments:
return err("DataColumnSidecar: unexpected kzg_commitment") return err("DataColumnSidecar: unexpected kzg_commitment")
@ -540,6 +540,65 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
else: else:
Opt.none(seq[BlobSidecars]) Opt.none(seq[BlobSidecars])
let shouldGetDataColumns =
if not man.shouldGetBlobs(req.slot.epoch):
false
else:
var hasColumns = false
for blck in blockData:
withBlck(blck[]):
when consensusFork >= ConsensusFork.Deneb:
if forkyBlck.message.body.blob_kzg_commitments.len > 0:
hasColumns = true
break
hasColumns
let dataColumnData =
if shouldGetDataColumns:
let data_columns = await man.getDataColumnSidecars(peer, req)
if data_columns.isErr:
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive data columns on request",
request = req, err = data_columns.error
return
let dataColumnData = data_columns.get().asSeq()
let dataColumnSmap = getShortMap(req, dataColumnData)
debug "Received data columns on request", data_columns_count = len(dataColumnData),
data_columns_map = dataColumnSmap, request = req
if len(dataColumnData) > 0:
let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot)
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received data columns sequence is not in requested range",
data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData),
request = req
return
let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData)
if groupedDataColumns.isErr:
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
info "Received data columns is inconsistent",
data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error()
return
if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received data columns is invalid",
data_columns_count = len(dataColumnData),
data_columns_map = getShortMap(req, dataColumnData),
request = req,
msg = checkRes.error
return
Opt.some(groupedDataColumns.get())
else:
Opt.none(seq[DataColumnSidecars])
if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
req.contains(man.getSafeSlot()): req.contains(man.getSafeSlot()):
# The sync protocol does not distinguish between: # The sync protocol does not distinguish between:
@ -564,7 +623,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
# TODO descore peers that lie # TODO descore peers that lie
maybeFinalized = lastSlot < peerFinalized maybeFinalized = lastSlot < peerFinalized
await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = await man.queue.push(req, blockData, blobData, dataColumnData, maybeFinalized, proc() =
man.workers[index].status = SyncWorkerStatus.Processing) man.workers[index].status = SyncWorkerStatus.Processing)
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} = proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} =

View File

@ -651,6 +651,7 @@ func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock], data: seq[ref ForkedSignedBeaconBlock],
blobs: Opt[seq[BlobSidecars]], blobs: Opt[seq[BlobSidecars]],
data_columns: Opt[seq[DataColumnSidecars]],
maybeFinalized: bool = false, maybeFinalized: bool = false,
processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} = processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} =
logScope: logScope:
@ -678,7 +679,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
# SyncQueue reset happens. We are exiting to wake up sync-worker. # SyncQueue reset happens. We are exiting to wake up sync-worker.
return return
else: else:
let syncres = SyncResult[T](request: sr, data: data, blobs: blobs) let syncres = SyncResult[T](request: sr, data: data, blobs: blobs, data_columns: data_columns)
sq.readyQueue.push(syncres) sq.readyQueue.push(syncres)
break break
@ -759,6 +760,37 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
req.item.updateScore(PeerScoreBadValues) req.item.updateScore(PeerScoreBadValues)
break break
var counter = 0
for blk, col in sq.das_blocks(item):
res = await sq.blockVerifier(blk[], Opt.none(BlobSidecars), col, maybeFinalized)
inc i
if res.isOk:
goodBlock = some(blk[].slot)
else:
case res.error()
of VerifierError.MissingParent:
missingParentSlot = some(blk[].slot)
break
of VerifierError.Duplicate:
# Keep going, happens naturally
discard
of VerifierError.UnviableFork:
# Keep going so as to register other unviable blocks with the
# quarantine
if unviableBlock.isNone:
# Remember the first unviable block, so we can log it
unviableBlock = some((blk[].root, blk[].slot))
of VerifierError.Invalid:
hasInvalidBlock = true
let req = item.request
notice "Received invalid sequence of blocks", request = req,
blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data)
req.item.updateScore(PeerScoreBadValues)
# When errors happen while processing blocks, we retry the same request # When errors happen while processing blocks, we retry the same request
# with, hopefully, a different peer # with, hopefully, a different peer
let retryRequest = let retryRequest =