disable blob activity (exp), improve gossip validation

This commit is contained in:
Agnish Ghosh 2024-07-02 14:36:44 +05:30
parent 9e6cad4105
commit 67fe8aca0b
No known key found for this signature in database
GPG Key ID: 7BDDA05D1B25E9F8
4 changed files with 142 additions and 141 deletions

View File

@ -252,24 +252,24 @@ proc processSignedBeaconBlock*(
# propagation of seemingly good blocks
trace "Block validated"
let blobs =
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
if self.blobQuarantine[].hasBlobs(signedBlock):
Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root, signedBlock))
else:
discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
signedBlock)
return v
else:
Opt.none(BlobSidecars)
# let blobs =
# when typeof(signedBlock).kind >= ConsensusFork.Deneb:
# if self.blobQuarantine[].hasBlobs(signedBlock):
# Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root, signedBlock))
# else:
# discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
# signedBlock)
# return v
# else:
# Opt.none(BlobSidecars)
self.blockProcessor[].enqueueBlock(
src, ForkedSignedBeaconBlock.init(signedBlock),
blobs,
Opt.none(DataColumnSidecars),
maybeFinalized = maybeFinalized,
validationDur = nanoseconds(
(self.getCurrentBeaconTime() - wallTime).nanoseconds))
# self.blockProcessor[].enqueueBlock(
# src, ForkedSignedBeaconBlock.init(signedBlock),
# blobs,
# Opt.none(DataColumnSidecars),
# maybeFinalized = maybeFinalized,
# validationDur = nanoseconds(
# (self.getCurrentBeaconTime() - wallTime).nanoseconds))
let data_columns =
when typeof(signedBlock).kind >= ConsensusFork.Deneb:

View File

@ -500,27 +500,24 @@ proc validateDataColumnSidecar*(
if not (data_column_sidecar.index < NUMBER_OF_COLUMNS):
return dag.checkedReject("DataColumnSidecar: The sidecar's index should be consistent with NUMBER_OF_COLUMNS")
debugEcho "check 1"
# [REJECT] The sidecar is for the correct subnet
# -- i.e. `compute_subnet_for_data_column_sidecar(blob_sidecar.index) == subnet_id`.
# if not (compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id):
# return dag.checkedReject("DataColumnSidecar: The sidecar is not for the correct subnet")
if not (compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id):
return dag.checkedReject("DataColumnSidecar: The sidecar is not for the correct subnet")
# debugEcho "check 2"
# [IGNORE] The sidecar is not from a future slot (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance)
# -- i.e. validate that `block_header.slot <= current_slot` (a client MAY queue future sidecars for
# processing at the appropriate slot).
if not (block_header.slot <=
(wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero):
return errIgnore("DataColumnSidecar: slot too high")
debugEcho "check 3"
# [IGNORE] The sidecar is from a slot greater than the latest
# finalized slot -- i.e. validate that `block_header.slot >
# compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)`
if not (block_header.slot > dag.finalizedHead.slot):
return errIgnore("DataColumnSidecar: slot already finalized")
debugEcho "check 4"
# [IGNORE] The sidecar is the first sidecar for the tuple
# (block_header.slot, block_header.proposer_index, blob_sidecar.index)
# with valid header signature, sidecar inclusion proof, and kzg proof.
@ -530,7 +527,14 @@ proc validateDataColumnSidecar*(
if dataColumnQuarantine[].hasDataColumn(
block_header.slot, block_header.proposer_index, data_column_sidecar.index):
return errIgnore("DataColumnSidecar: already have valid data column from same proposer")
debugEcho "check 5"
# [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by
# `verify_data_column_sidecar_inclusion_proof(sidecar)`.
block:
let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar)
if v.isErr:
return dag.checkedReject(v.error)
# [IGNORE] The sidecar's block's parent (defined by
# `block_header.parent_root`) has been seen (via both gossip and
# non-gossip sources) (a client MAY queue sidecars for processing
@ -538,14 +542,38 @@ proc validateDataColumnSidecar*(
#
# [REJECT] The sidecar's block's parent (defined by
# `block_header.parent_root`) passes validation.
# let parent = dag.getBlockRef(block_header.parent_root).valueOr:
# if block_header.parent_root in quarantine[].unviable:
# quarantine[].addUnviable(block_root)
# return dag.checkedReject("DataColumnSidecar: parent not validated")
# else:
# quarantine[].addMissing(block_header.parent_root)
# return errIgnore("DataColumnSidecar: parent not found")
debugEcho "check 6"
let parent = dag.getBlockRef(block_header.parent_root).valueOr:
if block_header.parent_root in quarantine[].unviable:
quarantine[].addUnviable(block_root)
return dag.checkedReject("DataColumnSidecar: parent not validated")
else:
quarantine[].addMissing(block_header.parent_root)
return errIgnore("DataColumnSidecar: parent not found")
# [REJECT] The sidecar is from a higher slot than the sidecar's
# block's parent (defined by `block_header.parent_root`).
if not (block_header.slot > parent.bid.slot):
return dag.checkedReject("DataColumnSidecar: slot lower than parents'")
# [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's
# block -- i.e. `get_checkpoint_block(store, block_header.parent_root,
# store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`.
let
finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint)
ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot)
if ancestor.isNil:
# This shouldn't happen: we should always be able to trace the parent back
# to the finalized checkpoint (else it wouldn't be in the DAG)
return errIgnore("DataColumnSidecar: Can't find ancestor")
if not (
finalized_checkpoint.root == ancestor.root or
finalized_checkpoint.root.isZero):
quarantine[].addUnviable(block_root)
return dag.checkedReject(
"DataColumnSidecar: Finalized checkpoint not an ancestor")
# [REJECT] The sidecar is proposed by the expected `proposer_index`
# for the block's slot in the context of the current shuffling
# (defined by `block_header.parent_root`/`block_header.slot`).
@ -553,62 +581,35 @@ proc validateDataColumnSidecar*(
# shuffling, the sidecar MAY be queued for later processing while proposers
# for the block's branch are calculated -- in such a case do not
# REJECT, instead IGNORE this message.
# let proposer = getProposer(dag, parent, block_header.slot).valueOr:
# warn "cannot compute proposer for data column"
# return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue
let proposer = getProposer(dag, parent, block_header.slot).valueOr:
warn "cannot compute proposer for data column"
return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue
if uint64(proposer) != block_header.proposer_index:
return dag.checkedReject("DataColumnSidecar: Unexpected proposer")
# if uint64(proposer) != block_header.proposer_index:
# return dag.checkedReject("DataColumnSidecar: Unexpected proposer")
debugEcho "check 7"
# [REJECT] The proposer signature of `blob_sidecar.signed_block_header`,
# is valid with respect to the `block_header.proposer_index` pubkey.
# if not verify_block_signature(
# dag.forkAtEpoch(block_header.slot.epoch),
# getStateField(dag.headState, genesis_validators_root),
# block_header.slot,
# block_root,
# dag.validatorKey(proposer).get(),
# data_column_sidecar.signed_block_header.signature):
# return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")
debugEcho "check 8"
# [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by
# `verify_data_column_sidecar_inclusion_proof(sidecar)`.
# block:
# let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar)
# if v.isErr:
# return dag.checkedReject(v.error)
if not verify_block_signature(
dag.forkAtEpoch(block_header.slot.epoch),
getStateField(dag.headState, genesis_validators_root),
block_header.slot,
block_root,
dag.validatorKey(proposer).get(),
data_column_sidecar.signed_block_header.signature):
return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")
# # [REJECT] The sidecar's column data is valid as
# # verified by `verify_data_column_kzg_proofs(sidecar)`
# block:
# let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
# if r.isErr:
# return dag.checkedReject(r.error)
# [REJECT] The sidecar's column data is valid as
# verified by `verify_data_column_kzg_proofs(sidecar)`
block:
let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
if r.isErr:
return dag.checkedReject(r.error)
# # [REJECT] The sidecar is from a higher slot than the sidecar's
# # block's parent (defined by `block_header.parent_root`).
# if not (block_header.slot > parent.bid.slot):
# return dag.checkedReject("DataColumnSidecar: slot lower than parents'")
debugEcho "check 9"
# [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's
# block -- i.e. `get_checkpoint_block(store, block_header.parent_root,
# store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`.
# let
# finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint)
# ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot)
# Send notification about new data column sidecar via callback
if not(isNil(dataColumnQuarantine.onDataColumnSidecarCallback)):
dataColumnQuarantine.onDataColumnSidecarCallback(data_column_sidecar)
# if ancestor.isNil:
# # This shouldn't happen: we should always be able to trace the parent back
# # to the finalized checkpoint (else it wouldn't be in the DAG)
# return errIgnore("DataColumnSidecar: Can't find ancestor")
# if not (
# finalized_checkpoint.root == ancestor.root or
# finalized_checkpoint.root.isZero):
# quarantine[].addUnviable(block_root)
# return dag.checkedReject(
# "DataColumnSidecar: Finalized checkpoint not an ancestor")
debugEcho "check 10"
ok()

View File

@ -579,14 +579,14 @@ proc start*(rman: var RequestManager) =
## Start Request Manager's loops.
rman.blockLoopFuture = rman.requestManagerBlockLoop()
rman.dataColumnLoopFuture = rman.requestManagerDataColumnLoop()
rman.blobLoopFuture = rman.requestManagerBlobLoop()
# rman.blobLoopFuture = rman.requestManagerBlobLoop()
proc stop*(rman: RequestManager) =
## Stop Request Manager's loop.
if not(isNil(rman.blockLoopFuture)):
rman.blockLoopFuture.cancelSoon()
if not(isNil(rman.blobLoopFuture)):
rman.blobLoopFuture.cancelSoon()
# if not(isNil(rman.blobLoopFuture)):
# rman.blobLoopFuture.cancelSoon()
if not(isNil(rman.dataColumnLoopFuture)):
rman.dataColumnLoopFuture.cancelSoon()

View File

@ -478,18 +478,18 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
request = req
return
let shouldGetBlobs =
if not man.shouldGetBlobs(req.slot.epoch):
false
else:
var hasBlobs = false
for blck in blockData:
withBlck(blck[]):
when consensusFork >= ConsensusFork.Deneb:
if forkyBlck.message.body.blob_kzg_commitments.len > 0:
hasBlobs = true
break
hasBlobs
# let shouldGetBlobs =
# if not man.shouldGetBlobs(req.slot.epoch):
# false
# else:
# var hasBlobs = false
# for blck in blockData:
# withBlck(blck[]):
# when consensusFork >= ConsensusFork.Deneb:
# if forkyBlck.message.body.blob_kzg_commitments.len > 0:
# hasBlobs = true
# break
# hasBlobs
func combine(acc: seq[Slot], cur: Slot): seq[Slot] =
var copy = acc
@ -497,49 +497,49 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
copy.add(cur)
copy
let blobData =
if shouldGetBlobs:
let blobs = await man.getBlobSidecars(peer, req)
if blobs.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive blobs on request",
request = req, err = blobs.error
return
let blobData = blobs.get().asSeq()
let blobSmap = getShortMap(req, blobData)
debug "Received blobs on request", blobs_count = len(blobData),
blobs_map = blobSmap, request = req
# let blobData =
# if shouldGetBlobs:
# let blobs = await man.getBlobSidecars(peer, req)
# if blobs.isErr():
# peer.updateScore(PeerScoreNoValues)
# man.queue.push(req)
# debug "Failed to receive blobs on request",
# request = req, err = blobs.error
# return
# let blobData = blobs.get().asSeq()
# let blobSmap = getShortMap(req, blobData)
# debug "Received blobs on request", blobs_count = len(blobData),
# blobs_map = blobSmap, request = req
if len(blobData) > 0:
let slots = mapIt(blobData, it[].signed_block_header.message.slot)
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is not in requested range",
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
request = req
return
let groupedBlobs = groupBlobs(req, blockData, blobData)
if groupedBlobs.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
info "Received blobs sequence is inconsistent",
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
return
if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is invalid",
blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData),
request = req,
msg = checkRes.error
return
Opt.some(groupedBlobs.get())
else:
Opt.none(seq[BlobSidecars])
# if len(blobData) > 0:
# let slots = mapIt(blobData, it[].signed_block_header.message.slot)
# let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
# if not(checkResponse(req, uniqueSlots)):
# peer.updateScore(PeerScoreBadResponse)
# man.queue.push(req)
# warn "Received blobs sequence is not in requested range",
# blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
# request = req
# return
# let groupedBlobs = groupBlobs(req, blockData, blobData)
# if groupedBlobs.isErr():
# peer.updateScore(PeerScoreNoValues)
# man.queue.push(req)
# info "Received blobs sequence is inconsistent",
# blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
# return
# if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
# peer.updateScore(PeerScoreBadResponse)
# man.queue.push(req)
# warn "Received blobs sequence is invalid",
# blobs_count = len(blobData),
# blobs_map = getShortMap(req, blobData),
# request = req,
# msg = checkRes.error
# return
# Opt.some(groupedBlobs.get())
# else:
# Opt.none(seq[BlobSidecars])
let shouldGetDataColumns =
if not man.shouldGetDataColumns(req.slot.epoch):
@ -623,7 +623,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
# TODO descore peers that lie
maybeFinalized = lastSlot < peerFinalized
await man.queue.push(req, blockData, blobData, dataColumnData, maybeFinalized, proc() =
await man.queue.push(req, blockData, Opt.none(seq[BlobSidecars]), dataColumnData, maybeFinalized, proc() =
man.workers[index].status = SyncWorkerStatus.Processing)
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} =