When a block is introduced to the system both via REST and gossip at the same time, we end up calling `storeBlock` from two locations, leading to a duplicate-check race condition while we wait for the EL. This issue manifests in particular when using an external block builder that itself publishes the block onto the gossip network.

* refactor enqueue flow
* simplify calling `addBlock`
* complete request manager verifier future for blobless blocks
* re-verify parent conditions before adding block; among other things, the parent might have gone stale or become finalized between one call and the other
parent 5d936c24e4
commit 49729e1ef3
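For orientation, the following is a minimal, self-contained sketch of the shape this commit moves the code towards. It uses std/asyncdispatch and toy types in place of chronos, `Result` and the real `BlockProcessor`, and its procs (`enqueueBlock`, `addBlock`, `storeBlock`) only mirror the real ones: every producer calls `addBlock`, which hands back a future and funnels the block through the single `enqueueBlock` entry point, and only the queue-draining loop ever reaches `storeBlock`, so the duplicate check cannot race another in-flight call.

import std/[asyncdispatch, deques]

type
  VerifierError = enum
    verOk, verMissingParent, verDuplicate

  BlockEntry = object
    root: string                   # stand-in for the real signed block
    resfut: Future[VerifierError]  # completed once the block leaves the queue

var
  blockQueue = initDeque[BlockEntry]()
  stored: seq[string]              # roots that storeBlock has accepted

proc enqueueBlock(root: string, resfut: Future[VerifierError]) =
  ## Single entry point onto the processing queue - producers and the event
  ## loop (when it re-queues a block after an EL hiccup) both go through here.
  blockQueue.addLast(BlockEntry(root: root, resfut: resfut))

proc addBlock(root: string): Future[VerifierError] =
  ## What producers (gossip, sync, request manager, REST API) call: they get a
  ## future back instead of touching storage directly, so two concurrent
  ## submissions of the same block can no longer race the duplicate check.
  result = newFuture[VerifierError]("addBlock")
  enqueueBlock(root, result)

proc storeBlock(root: string): VerifierError =
  ## Only reached from the queue-draining loop, so the duplicate check and the
  ## store never interleave with another in-flight call.
  if root in stored:
    return verDuplicate
  stored.add root
  verOk

proc processQueue() =
  ## Toy stand-in for runQueueProcessingLoop: drain the queue sequentially.
  while blockQueue.len > 0:
    let entry = blockQueue.popFirst()
    entry.resfut.complete(storeBlock(entry.root))

when isMainModule:
  # The same block arrives via the REST API and via gossip at the same time:
  # both callers get an answer, and exactly one of them sees verDuplicate.
  let viaApi = addBlock("0xabc")
  let viaGossip = addBlock("0xabc")
  processQueue()
  echo waitFor(viaApi), " ", waitFor(viaGossip)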
@@ -230,13 +230,24 @@ proc addHeadBlockWithParent*(
   ## Cryptographic checks can be skipped by adding skipBlsValidation to
   ## dag.updateFlags.
   ##
-  ## The parent must be obtained using `checkHeadBlock` to ensure complete
-  ## verification.
+  ## The parent should be obtained using `checkHeadBlock`.
   logScope:
     blockRoot = shortLog(signedBlock.root)
     blck = shortLog(signedBlock.message)
     signature = shortLog(signedBlock.signature)
 
+  block:
+    # We re-check parent pre-conditions here to avoid the case where the parent
+    # has become stale - it is possible that the dag has finalized the parent
+    # by the time we get here which will cause us to return early.
+    let checkedParent = ? checkHeadBlock(dag, signedBlock)
+    if checkedParent != parent:
+      # This should never happen: it would mean that the caller supplied a
+      # different parent than the block points to!
+      error "checkHeadBlock parent mismatch - this is a bug",
+        parent = shortLog(parent), checkedParent = shortLog(checkedParent)
+      return err(VerifierError.MissingParent)
+
   template blck(): untyped = signedBlock.message # shortcuts without copy
   template blockRoot(): untyped = signedBlock.root
 
@@ -117,13 +117,6 @@ type
     completed
     notCompleted
 
-proc addBlock*(
-    self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
-    blobs: Opt[BlobSidecars],
-    resfut: Future[Result[void, VerifierError]] = nil,
-    maybeFinalized = false,
-    validationDur = Duration())
-
 # Initialization
 # ------------------------------------------------------------------------------
 
@@ -386,7 +379,32 @@ proc checkBloblessSignature(self: BlockProcessor,
     return err("checkBloblessSignature: Invalid proposer signature")
   ok()
 
-proc storeBlock*(
+proc enqueueBlock*(
+    self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
+    blobs: Opt[BlobSidecars],
+    resfut: Future[Result[void, VerifierError]] = nil,
+    maybeFinalized = false,
+    validationDur = Duration()) =
+  withBlck(blck):
+    if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
+      # let backfill blocks skip the queue - these are always "fast" to process
+      # because there are no state rewinds to deal with
+      let res = self.storeBackfillBlock(blck, blobs)
+      resfut.complete(res)
+      return
+
+  try:
+    self.blockQueue.addLastNoWait(BlockEntry(
+      blck: blck,
+      blobs: blobs,
+      maybeFinalized: maybeFinalized,
+      resfut: resfut, queueTick: Moment.now(),
+      validationDur: validationDur,
+      src: src))
+  except AsyncQueueFullError:
+    raiseAssert "unbounded queue"
+
+proc storeBlock(
     self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
     signedBlock: ForkySignedBeaconBlock,
     blobsOpt: Opt[BlobSidecars],
@@ -444,6 +462,10 @@ proc storeBlock*(
       err((error, ProcessingStatus.completed))
 
   let
+    # We have to be careful that there exists only one in-flight entry point
+    # for adding blocks or the checks performed in `checkHeadBlock` might
+    # be invalidated (ie a block could be added while we wait for EL response
+    # here)
     parent = dag.checkHeadBlock(signedBlock)
 
   if parent.isErr():
|
@ -698,10 +720,12 @@ proc storeBlock*(
|
|||
|
||||
withBlck(quarantined):
|
||||
when typeof(blck).toFork() < ConsensusFork.Deneb:
|
||||
self[].addBlock(MsgSource.gossip, quarantined, Opt.none(BlobSidecars))
|
||||
self[].enqueueBlock(
|
||||
MsgSource.gossip, quarantined, Opt.none(BlobSidecars))
|
||||
else:
|
||||
if len(blck.message.body.blob_kzg_commitments) == 0:
|
||||
self[].addBlock(MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]))
|
||||
self[].enqueueBlock(
|
||||
MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]))
|
||||
else:
|
||||
if (let res = checkBloblessSignature(self[], blck); res.isErr):
|
||||
warn "Failed to verify signature of unorphaned blobless block",
|
||||
|
@ -710,7 +734,7 @@ proc storeBlock*(
|
|||
continue
|
||||
if self.blobQuarantine[].hasBlobs(blck):
|
||||
let blobs = self.blobQuarantine[].popBlobs(blck.root)
|
||||
self[].addBlock(MsgSource.gossip, quarantined, Opt.some(blobs))
|
||||
self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs))
|
||||
else:
|
||||
if not self.consensusManager.quarantine[].addBlobless(
|
||||
dag.finalizedHead.slot, blck):
|
||||
|
@@ -725,10 +749,8 @@ proc storeBlock*(
 
 proc addBlock*(
     self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
-    blobs: Opt[BlobSidecars],
-    resfut: Future[Result[void, VerifierError]] = nil,
-    maybeFinalized = false,
-    validationDur = Duration()) =
+    blobs: Opt[BlobSidecars], maybeFinalized = false,
+    validationDur = Duration()): Future[Result[void, VerifierError]] =
   ## Enqueue a Gossip-validated block for consensus verification
   # Backpressure:
   # There is no backpressure here - producers must wait for `resfut` to
@@ -737,26 +759,10 @@ proc addBlock*(
   # - Gossip (when synced)
   # - SyncManager (during sync)
   # - RequestManager (missing ancestor blocks)
-
-  withBlck(blck):
-    if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
-      # let backfill blocks skip the queue - these are always "fast" to process
-      # because there are no state rewinds to deal with
-      let res = self.storeBackfillBlock(blck, blobs)
-      if resfut != nil:
-        resfut.complete(res)
-      return
-
-  try:
-    self.blockQueue.addLastNoWait(BlockEntry(
-      blck: blck,
-      blobs: blobs,
-      maybeFinalized: maybeFinalized,
-      resfut: resfut, queueTick: Moment.now(),
-      validationDur: validationDur,
-      src: src))
-  except AsyncQueueFullError:
-    raiseAssert "unbounded queue"
+  # - API
+  let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock")
+  enqueueBlock(self, src, blck, blobs, resfut, maybeFinalized, validationDur)
+  resfut
 
 # Event Loop
 # ------------------------------------------------------------------------------
@@ -787,7 +793,7 @@ proc processBlock(
       # - MAY queue the block for later processing.
       # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/sync/optimistic.md#execution-engine-errors
       await sleepAsync(chronos.seconds(1))
-      self[].addBlock(
+      self[].enqueueBlock(
         entry.src, entry.blck, entry.blobs, entry.resfut, entry.maybeFinalized,
         entry.validationDur)
       # To ensure backpressure on the sync manager, do not complete these futures.
@@ -239,20 +239,22 @@ proc processSignedBeaconBlock*(
   # propagation of seemingly good blocks
   trace "Block validated"
 
-  var blobs = Opt.none(BlobSidecars)
-  when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
-    if self.blobQuarantine[].hasBlobs(signedBlock):
-      blobs = Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root))
+  let blobs =
+    when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
+      if self.blobQuarantine[].hasBlobs(signedBlock):
+        Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root))
+      else:
+        if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
+                                             signedBlock):
+          notice "Block quarantine full (blobless)",
+            blockRoot = shortLog(signedBlock.root),
+            blck = shortLog(signedBlock.message),
+            signature = shortLog(signedBlock.signature)
+        return v
     else:
-      if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
-                                           signedBlock):
-        notice "Block quarantine full (blobless)",
-          blockRoot = shortLog(signedBlock.root),
-          blck = shortLog(signedBlock.message),
-          signature = shortLog(signedBlock.signature)
-      return v
+      Opt.none(BlobSidecars)
 
-  self.blockProcessor[].addBlock(
+  self.blockProcessor[].enqueueBlock(
     src, ForkedSignedBeaconBlock.init(signedBlock),
     blobs,
     maybeFinalized = maybeFinalized,
@@ -293,7 +295,7 @@ proc processSignedBlobSidecar*(
   debug "Blob received", delay
 
   let v =
-    self.dag.validateBlobSidecar(self.quarantine, self.blob_quarantine,
+    self.dag.validateBlobSidecar(self.quarantine, self.blobQuarantine,
       signedBlobSidecar, wallTime, idx)
 
   if v.isErr():
|
@ -312,7 +314,7 @@ proc processSignedBlobSidecar*(
|
|||
let blobless = o.unsafeGet()
|
||||
|
||||
if self.blobQuarantine[].hasBlobs(blobless):
|
||||
self.blockProcessor[].addBlock(
|
||||
self.blockProcessor[].enqueueBlock(
|
||||
MsgSource.gossip,
|
||||
ForkedSignedBeaconBlock.init(blobless),
|
||||
Opt.some(self.blobQuarantine[].popBlobs(
|
||||
|
|
|
@@ -332,44 +332,40 @@ proc initFullNode(
       config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
       rng, taskpool, consensusManager, node.validatorMonitor,
       blobQuarantine, getBeaconTime)
-    blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
-                         blobs: Opt[BlobSidecars], maybeFinalized: bool):
+    blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
+        blobs: Opt[BlobSidecars], maybeFinalized: bool):
         Future[Result[void, VerifierError]] =
       # The design with a callback for block verification is unusual compared
       # to the rest of the application, but fits with the general approach
       # taken in the sync/request managers - this is an architectural compromise
       # that should probably be reimagined more holistically in the future.
-      let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
-      blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
-        blobs,
-        resfut,
-        maybeFinalized = maybeFinalized)
-      resfut
+      blockProcessor[].addBlock(
+        MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
     rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
-                             maybeFinalized: bool):
+        maybeFinalized: bool):
         Future[Result[void, VerifierError]] =
-      let resfut = newFuture[Result[void, VerifierError]]("rmanBlockVerifier")
       withBlck(signedBlock):
         when typeof(blck).toFork() >= ConsensusFork.Deneb:
           if not blobQuarantine[].hasBlobs(blck):
             # We don't have all the blobs for this block, so we have
             # to put it in blobless quarantine.
            if not quarantine[].addBlobless(dag.finalizedHead.slot, blck):
-              let e = Result[void, VerifierError].err(VerifierError.UnviableFork)
-              resfut.complete(e)
-            return
-          let blobs = blobQuarantine[].popBlobs(blck.root)
-          blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
-            Opt.some(blobs),
-            resfut,
-            maybeFinalized = maybeFinalized)
+              Future.completed(
+                Result[void, VerifierError].err(VerifierError.UnviableFork),
+                "rmanBlockVerifier")
+            else:
+              Future.completed(
+                Result[void, VerifierError].err(VerifierError.MissingParent),
+                "rmanBlockVerifier")
+          else:
+            let blobs = blobQuarantine[].popBlobs(blck.root)
+            blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
+              Opt.some(blobs),
+              maybeFinalized = maybeFinalized)
         else:
           blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
             Opt.none(BlobSidecars),
-            resfut,
            maybeFinalized = maybeFinalized)
-      resfut
-
 
     processor = Eth2Processor.new(
       config.doppelgangerDetection,
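One of the commit-message bullets above ("complete request manager verifier future for blobless blocks") concerns the hunk just shown: previously, when a block had to go into the blobless quarantine, the verifier callback could return without giving the request manager a properly completed future. A simplified sketch of the replacement pattern, again using std/asyncdispatch and toy types in place of chronos' `Future.completed` and the real error types:

import std/asyncdispatch

type VerifierError = enum
  verUnviableFork, verMissingParent

proc alreadyCompleted(e: VerifierError): Future[VerifierError] =
  ## Hand the caller a future that is already finished - the awaiting side
  ## (the request manager, in the real code) gets its answer immediately
  ## instead of waiting on a future nobody will ever complete.
  result = newFuture[VerifierError]("rmanBlockVerifier")
  result.complete(e)

when isMainModule:
  echo waitFor(alreadyCompleted(verMissingParent))  # prints verMissingParent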
@@ -164,16 +164,16 @@ proc routeSignedBeaconBlock*(
         notice "Blob sent", blob = shortLog(signedBlobs[i]), error = res.error[]
     blobs = Opt.some(blobsOpt.get().mapIt(newClone(it.message)))
 
-  let newBlockRef = await router[].blockProcessor.storeBlock(
-    MsgSource.api, sendTime, blck, blobs)
+  let added = await router[].blockProcessor[].addBlock(
+    MsgSource.api, ForkedSignedBeaconBlock.init(blck), blobs)
 
   # The boolean we return tells the caller whether the block was integrated
   # into the chain
-  if newBlockRef.isErr():
-    return if newBlockRef.error()[0] != VerifierError.Duplicate:
+  if added.isErr():
+    return if added.error() != VerifierError.Duplicate:
       warn "Unable to add routed block to block pool",
         blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
-        signature = shortLog(blck.signature), err = newBlockRef.error()
+        signature = shortLog(blck.signature), err = added.error()
       ok(Opt.none(BlockRef))
     else:
       # If it's duplicate, there's an existing BlockRef to return. The block
@@ -183,10 +183,16 @@ proc routeSignedBeaconBlock*(
       if blockRef.isErr:
         warn "Unable to add routed duplicate block to block pool",
           blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
-          signature = shortLog(blck.signature), err = newBlockRef.error()
+          signature = shortLog(blck.signature), err = added.error()
       ok(blockRef)
 
-  return ok(Opt.some(newBlockRef.get()))
+  let blockRef = router[].dag.getBlockRef(blck.root)
+  if blockRef.isErr:
+    warn "Block finalised while waiting for block processor",
+      blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
+      signature = shortLog(blck.signature)
+  ok(blockRef)
 
 proc routeAttestation*(
     router: ref MessageRouter, attestation: Attestation,
@@ -59,11 +59,15 @@ suite "Block processor" & preset():
       processor = BlockProcessor.new(
         false, "", "", rng, taskpool, consensusManager,
         validatorMonitor, blobQuarantine, getTimeFn)
+      processorFut = processor.runQueueProcessingLoop()
 
     asyncTest "Reverse order block add & get" & preset():
-      let missing = await processor.storeBlock(
-        MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, Opt.none(BlobSidecars))
-      check: missing.error[0] == VerifierError.MissingParent
+      let
+        missing = await processor[].addBlock(
+          MsgSource.gossip, ForkedSignedBeaconBlock.init(b2),
+          Opt.none(BlobSidecars))
+
+      check: missing.error == VerifierError.MissingParent
 
       check:
         not dag.containsForkBlock(b2.root) # Unresolved, shouldn't show up
@@ -71,8 +75,9 @@ suite "Block processor" & preset():
         FetchRecord(root: b1.root) in quarantine[].checkMissing(32)
 
       let
-        status = await processor.storeBlock(
-          MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, Opt.none(BlobSidecars))
+        status = await processor[].addBlock(
+          MsgSource.gossip, ForkedSignedBeaconBlock.init(b1),
+          Opt.none(BlobSidecars))
         b1Get = dag.getBlockRef(b1.root)
 
       check:
@@ -81,7 +86,6 @@ suite "Block processor" & preset():
         dag.containsForkBlock(b1.root)
         not dag.containsForkBlock(b2.root) # Async pipeline must still run
 
-      discard processor.runQueueProcessingLoop()
       while processor[].hasBlocks():
         poll()
 