Skip execution payload verification for finalized blocks (#4591)
While syncing the finalized portion of the chain, the execution client cannot efficiently sync and most of the time returns `SYNCING` - in this PR, we use CL-verified optmistic sync as long as the block is claimed to be finalized, only occasionally updating the EL with progress. Although a peer might lie about what is finalized and what isn't, eventually we'll call the execution client - thus, all a dishonest client can do is delay execution verification slightly. Gossip blocks in particular are never assumed to be finalized.
This commit is contained in:
parent
4c845b9749
commit
f3ddea6c86
|
@ -39,10 +39,19 @@ export sszdump, signatures_batch
|
||||||
declareHistogram beacon_store_block_duration_seconds,
|
declareHistogram beacon_store_block_duration_seconds,
|
||||||
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
|
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
|
||||||
|
|
||||||
|
const
|
||||||
|
SLOTS_PER_PAYLOAD = SLOTS_PER_HISTORICAL_ROOT
|
||||||
|
## Number of slots we process between each execution payload execution, while
|
||||||
|
## syncing the finalized part of the chain
|
||||||
|
PAYLOAD_PRE_WALL_SLOTS = SLOTS_PER_EPOCH * 2
|
||||||
|
## Number of slots from wall time that we start processing every payload
|
||||||
|
|
||||||
type
|
type
|
||||||
BlockEntry* = object
|
BlockEntry* = object
|
||||||
blck*: ForkedSignedBeaconBlock
|
blck*: ForkedSignedBeaconBlock
|
||||||
blobs*: Opt[eip4844.BlobsSidecar]
|
blobs*: Opt[eip4844.BlobsSidecar]
|
||||||
|
maybeFinalized*: bool
|
||||||
|
## The block source claims the block has been finalized already
|
||||||
resfut*: Future[Result[void, VerifierError]]
|
resfut*: Future[Result[void, VerifierError]]
|
||||||
queueTick*: Moment # Moment when block was enqueued
|
queueTick*: Moment # Moment when block was enqueued
|
||||||
validationDur*: Duration # Time it took to perform gossip validation
|
validationDur*: Duration # Time it took to perform gossip validation
|
||||||
|
@ -86,6 +95,10 @@ type
|
||||||
|
|
||||||
verifier: BatchVerifier
|
verifier: BatchVerifier
|
||||||
|
|
||||||
|
lastPayload: Slot
|
||||||
|
## The slot at which we sent a payload to the execution client the last
|
||||||
|
## time
|
||||||
|
|
||||||
NewPayloadStatus {.pure.} = enum
|
NewPayloadStatus {.pure.} = enum
|
||||||
valid
|
valid
|
||||||
notValid
|
notValid
|
||||||
|
@ -100,6 +113,7 @@ proc addBlock*(
|
||||||
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
|
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
|
||||||
blobs: Opt[eip4844.BlobsSidecar],
|
blobs: Opt[eip4844.BlobsSidecar],
|
||||||
resfut: Future[Result[void, VerifierError]] = nil,
|
resfut: Future[Result[void, VerifierError]] = nil,
|
||||||
|
maybeFinalized = false,
|
||||||
validationDur = Duration())
|
validationDur = Duration())
|
||||||
|
|
||||||
# Initialization
|
# Initialization
|
||||||
|
@ -353,6 +367,7 @@ proc storeBlock*(
|
||||||
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
|
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
|
||||||
signedBlock: ForkySignedBeaconBlock,
|
signedBlock: ForkySignedBeaconBlock,
|
||||||
blobs: Opt[eip4844.BlobsSidecar],
|
blobs: Opt[eip4844.BlobsSidecar],
|
||||||
|
maybeFinalized = false,
|
||||||
queueTick: Moment = Moment.now(), validationDur = Duration()):
|
queueTick: Moment = Moment.now(), validationDur = Duration()):
|
||||||
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} =
|
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} =
|
||||||
## storeBlock is the main entry point for unvalidated blocks - all untrusted
|
## storeBlock is the main entry point for unvalidated blocks - all untrusted
|
||||||
|
@ -364,11 +379,23 @@ proc storeBlock*(
|
||||||
startTick = Moment.now()
|
startTick = Moment.now()
|
||||||
vm = self.validatorMonitor
|
vm = self.validatorMonitor
|
||||||
dag = self.consensusManager.dag
|
dag = self.consensusManager.dag
|
||||||
|
wallSlot = wallTime.slotOrZero
|
||||||
payloadStatus =
|
payloadStatus =
|
||||||
when typeof(signedBlock).toFork() >= ConsensusFork.Bellatrix:
|
if maybeFinalized and
|
||||||
await self.consensusManager.eth1Monitor.getExecutionValidity(signedBlock)
|
(self.lastPayload + SLOTS_PER_PAYLOAD) > signedBlock.message.slot and
|
||||||
|
(signedBlock.message.slot + PAYLOAD_PRE_WALL_SLOTS) < wallSlot:
|
||||||
|
# Skip payload validation when message source (reasonably) claims block
|
||||||
|
# has been finalized - this speeds up forward sync - in the worst case
|
||||||
|
# that the claim is false, we will correct every time we process a block
|
||||||
|
# from an honest source (or when we're close to head).
|
||||||
|
# Occasionally we also send a payload to the the EL so that it can
|
||||||
|
# progress in its own sync.
|
||||||
|
NewPayloadStatus.noResponse
|
||||||
else:
|
else:
|
||||||
NewPayloadStatus.valid # vacuously
|
when typeof(signedBlock).toFork() >= ConsensusFork.Bellatrix:
|
||||||
|
await self.consensusManager.eth1Monitor.getExecutionValidity(signedBlock)
|
||||||
|
else:
|
||||||
|
NewPayloadStatus.valid # vacuously
|
||||||
payloadValid = payloadStatus == NewPayloadStatus.valid
|
payloadValid = payloadStatus == NewPayloadStatus.valid
|
||||||
|
|
||||||
# The block is certainly not missing any more
|
# The block is certainly not missing any more
|
||||||
|
@ -467,6 +494,10 @@ proc storeBlock*(
|
||||||
|
|
||||||
return err((blck.error, ProcessingStatus.completed))
|
return err((blck.error, ProcessingStatus.completed))
|
||||||
|
|
||||||
|
if payloadStatus in {NewPayloadStatus.valid, NewPayloadStatus.notValid}:
|
||||||
|
# If the EL responded at all, we don't need to try again for a while
|
||||||
|
self[].lastPayload = signedBlock.message.slot
|
||||||
|
|
||||||
# write blobs now that block has been written.
|
# write blobs now that block has been written.
|
||||||
if blobs.isSome():
|
if blobs.isSome():
|
||||||
self.consensusManager.dag.db.putBlobsSidecar(blobs.get())
|
self.consensusManager.dag.db.putBlobsSidecar(blobs.get())
|
||||||
|
@ -490,7 +521,6 @@ proc storeBlock*(
|
||||||
# Grab the new head according to our latest attestation data; determines how
|
# Grab the new head according to our latest attestation data; determines how
|
||||||
# async this needs to be.
|
# async this needs to be.
|
||||||
let
|
let
|
||||||
wallSlot = wallTime.slotOrZero
|
|
||||||
newHead = attestationPool[].selectOptimisticHead(
|
newHead = attestationPool[].selectOptimisticHead(
|
||||||
wallSlot.start_beacon_time)
|
wallSlot.start_beacon_time)
|
||||||
|
|
||||||
|
@ -575,7 +605,9 @@ proc storeBlock*(
|
||||||
|
|
||||||
proc addBlock*(
|
proc addBlock*(
|
||||||
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
|
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
|
||||||
blobs: Opt[eip4844.BlobsSidecar], resfut: Future[Result[void, VerifierError]] = nil,
|
blobs: Opt[eip4844.BlobsSidecar],
|
||||||
|
resfut: Future[Result[void, VerifierError]] = nil,
|
||||||
|
maybeFinalized = false,
|
||||||
validationDur = Duration()) =
|
validationDur = Duration()) =
|
||||||
## Enqueue a Gossip-validated block for consensus verification
|
## Enqueue a Gossip-validated block for consensus verification
|
||||||
# Backpressure:
|
# Backpressure:
|
||||||
|
@ -599,6 +631,7 @@ proc addBlock*(
|
||||||
self.blockQueue.addLastNoWait(BlockEntry(
|
self.blockQueue.addLastNoWait(BlockEntry(
|
||||||
blck: blck,
|
blck: blck,
|
||||||
blobs: blobs,
|
blobs: blobs,
|
||||||
|
maybeFinalized: maybeFinalized,
|
||||||
resfut: resfut, queueTick: Moment.now(),
|
resfut: resfut, queueTick: Moment.now(),
|
||||||
validationDur: validationDur,
|
validationDur: validationDur,
|
||||||
src: src))
|
src: src))
|
||||||
|
@ -623,8 +656,8 @@ proc processBlock(
|
||||||
|
|
||||||
let res = withBlck(entry.blck):
|
let res = withBlck(entry.blck):
|
||||||
await self.storeBlock(
|
await self.storeBlock(
|
||||||
entry.src, wallTime, blck, entry.blobs, entry.queueTick,
|
entry.src, wallTime, blck, entry.blobs, entry.maybeFinalized,
|
||||||
entry.validationDur)
|
entry.queueTick, entry.validationDur)
|
||||||
|
|
||||||
if res.isErr and res.error[1] == ProcessingStatus.notCompleted:
|
if res.isErr and res.error[1] == ProcessingStatus.notCompleted:
|
||||||
# When an execution engine returns an error or fails to respond to a
|
# When an execution engine returns an error or fails to respond to a
|
||||||
|
@ -635,7 +668,8 @@ proc processBlock(
|
||||||
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.2/sync/optimistic.md#execution-engine-errors
|
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.2/sync/optimistic.md#execution-engine-errors
|
||||||
await sleepAsync(chronos.seconds(1))
|
await sleepAsync(chronos.seconds(1))
|
||||||
self[].addBlock(
|
self[].addBlock(
|
||||||
entry.src, entry.blck, entry.blobs, entry.resfut, entry.validationDur)
|
entry.src, entry.blck, entry.blobs, entry.resfut, entry.maybeFinalized,
|
||||||
|
entry.validationDur)
|
||||||
# To ensure backpressure on the sync manager, do not complete these futures.
|
# To ensure backpressure on the sync manager, do not complete these futures.
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -191,7 +191,8 @@ proc new*(T: type Eth2Processor,
|
||||||
|
|
||||||
proc processSignedBeaconBlock*(
|
proc processSignedBeaconBlock*(
|
||||||
self: var Eth2Processor, src: MsgSource,
|
self: var Eth2Processor, src: MsgSource,
|
||||||
signedBlockAndBlobs: ForkySignedBeaconBlockMaybeBlobs): ValidationRes =
|
signedBlockAndBlobs: ForkySignedBeaconBlockMaybeBlobs,
|
||||||
|
maybeFinalized: bool = false): ValidationRes =
|
||||||
let
|
let
|
||||||
wallTime = self.getCurrentBeaconTime()
|
wallTime = self.getCurrentBeaconTime()
|
||||||
(afterGenesis, wallSlot) = wallTime.toSlot()
|
(afterGenesis, wallSlot) = wallTime.toSlot()
|
||||||
|
@ -230,6 +231,7 @@ proc processSignedBeaconBlock*(
|
||||||
self.blockProcessor[].addBlock(
|
self.blockProcessor[].addBlock(
|
||||||
src, ForkedSignedBeaconBlock.init(signedBlock),
|
src, ForkedSignedBeaconBlock.init(signedBlock),
|
||||||
blobs,
|
blobs,
|
||||||
|
maybeFinalized = maybeFinalized,
|
||||||
validationDur = nanoseconds(
|
validationDur = nanoseconds(
|
||||||
(self.getCurrentBeaconTime() - wallTime).nanoseconds))
|
(self.getCurrentBeaconTime() - wallTime).nanoseconds))
|
||||||
|
|
||||||
|
|
|
@ -321,7 +321,8 @@ proc initFullNode(
|
||||||
blockProcessor = BlockProcessor.new(
|
blockProcessor = BlockProcessor.new(
|
||||||
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
|
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
|
||||||
rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime)
|
rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime)
|
||||||
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock):
|
blockVerifier =
|
||||||
|
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] =
|
Future[Result[void, VerifierError]] =
|
||||||
# The design with a callback for block verification is unusual compared
|
# The design with a callback for block verification is unusual compared
|
||||||
# to the rest of the application, but fits with the general approach
|
# to the rest of the application, but fits with the general approach
|
||||||
|
@ -329,7 +330,8 @@ proc initFullNode(
|
||||||
# that should probably be reimagined more holistically in the future.
|
# that should probably be reimagined more holistically in the future.
|
||||||
let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
|
let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
|
||||||
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||||
Opt.none(eip4844.BlobsSidecar), resfut)
|
Opt.none(eip4844.BlobsSidecar), resfut,
|
||||||
|
maybeFinalized = maybeFinalized)
|
||||||
resfut
|
resfut
|
||||||
blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
|
blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
|
||||||
Future[Result[void, VerifierError]] =
|
Future[Result[void, VerifierError]] =
|
||||||
|
|
|
@ -30,7 +30,7 @@ const
|
||||||
|
|
||||||
type
|
type
|
||||||
BlockVerifier* =
|
BlockVerifier* =
|
||||||
proc(signedBlock: ForkedSignedBeaconBlock):
|
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
||||||
BlockBlobsVerifier* =
|
BlockBlobsVerifier* =
|
||||||
proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
|
proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
|
||||||
|
@ -111,7 +111,7 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
|
||||||
gotUnviableBlock = false
|
gotUnviableBlock = false
|
||||||
|
|
||||||
for b in ublocks:
|
for b in ublocks:
|
||||||
let ver = await rman.blockVerifier(b[])
|
let ver = await rman.blockVerifier(b[], false)
|
||||||
if ver.isErr():
|
if ver.isErr():
|
||||||
case ver.error()
|
case ver.error()
|
||||||
of VerifierError.MissingParent:
|
of VerifierError.MissingParent:
|
||||||
|
|
|
@ -362,7 +362,15 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
||||||
|
|
||||||
# Scoring will happen in `syncUpdate`.
|
# Scoring will happen in `syncUpdate`.
|
||||||
man.workers[index].status = SyncWorkerStatus.Queueing
|
man.workers[index].status = SyncWorkerStatus.Queueing
|
||||||
await man.queue.push(req, data, proc() =
|
let
|
||||||
|
peerFinalized = peer.getFinalizedEpoch().start_slot()
|
||||||
|
lastSlot = req.slot + req.count
|
||||||
|
# The peer claims the block is finalized - our own block processing will
|
||||||
|
# verify this point down the line
|
||||||
|
# TODO descore peers that lie
|
||||||
|
maybeFinalized = lastSlot < peerFinalized
|
||||||
|
|
||||||
|
await man.queue.push(req, data, maybeFinalized, proc() =
|
||||||
man.workers[index].status = SyncWorkerStatus.Processing)
|
man.workers[index].status = SyncWorkerStatus.Processing)
|
||||||
else:
|
else:
|
||||||
peer.updateScore(PeerScoreNoValues)
|
peer.updateScore(PeerScoreNoValues)
|
||||||
|
|
|
@ -750,6 +750,10 @@ proc getHeadSlot*(peer: Peer): Slot =
|
||||||
## Returns head slot for specific peer ``peer``.
|
## Returns head slot for specific peer ``peer``.
|
||||||
peer.state(BeaconSync).statusMsg.headSlot
|
peer.state(BeaconSync).statusMsg.headSlot
|
||||||
|
|
||||||
|
proc getFinalizedEpoch*(peer: Peer): Epoch =
|
||||||
|
## Returns head slot for specific peer ``peer``.
|
||||||
|
peer.state(BeaconSync).statusMsg.finalizedEpoch
|
||||||
|
|
||||||
proc initBeaconSync*(network: Eth2Node, dag: ChainDAGRef,
|
proc initBeaconSync*(network: Eth2Node, dag: ChainDAGRef,
|
||||||
getBeaconTime: GetBeaconTimeFn) =
|
getBeaconTime: GetBeaconTimeFn) =
|
||||||
var networkState = network.protocolState(BeaconSync)
|
var networkState = network.protocolState(BeaconSync)
|
||||||
|
|
|
@ -26,7 +26,7 @@ type
|
||||||
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
|
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
|
||||||
ProcessingCallback* = proc() {.gcsafe, raises: [Defect].}
|
ProcessingCallback* = proc() {.gcsafe, raises: [Defect].}
|
||||||
BlockVerifier* =
|
BlockVerifier* =
|
||||||
proc(signedBlock: ForkedSignedBeaconBlock):
|
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
||||||
|
|
||||||
SyncQueueKind* {.pure.} = enum
|
SyncQueueKind* {.pure.} = enum
|
||||||
|
@ -578,6 +578,7 @@ func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
|
||||||
|
|
||||||
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||||
data: seq[ref ForkedSignedBeaconBlock],
|
data: seq[ref ForkedSignedBeaconBlock],
|
||||||
|
maybeFinalized: bool = false,
|
||||||
processingCb: ProcessingCallback = nil) {.async.} =
|
processingCb: ProcessingCallback = nil) {.async.} =
|
||||||
logScope:
|
logScope:
|
||||||
sync_ident = sq.ident
|
sync_ident = sq.ident
|
||||||
|
@ -654,7 +655,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||||
res: Result[void, VerifierError]
|
res: Result[void, VerifierError]
|
||||||
|
|
||||||
for blk in sq.blocks(item):
|
for blk in sq.blocks(item):
|
||||||
res = await sq.blockVerifier(blk[])
|
res = await sq.blockVerifier(blk[], maybeFinalized)
|
||||||
if res.isOk():
|
if res.isOk():
|
||||||
goodBlock = some(blk[].slot)
|
goodBlock = some(blk[].slot)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -49,7 +49,7 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier =
|
||||||
# in the async queue, similar to how BlockProcessor does it - as far as
|
# in the async queue, similar to how BlockProcessor does it - as far as
|
||||||
# testing goes, this is risky because it might introduce differences between
|
# testing goes, this is risky because it might introduce differences between
|
||||||
# the BlockProcessor and this test
|
# the BlockProcessor and this test
|
||||||
proc verify(signedBlock: ForkedSignedBeaconBlock):
|
proc verify(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] =
|
Future[Result[void, VerifierError]] =
|
||||||
let fut = newFuture[Result[void, VerifierError]]()
|
let fut = newFuture[Result[void, VerifierError]]()
|
||||||
try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut))
|
try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut))
|
||||||
|
|
Loading…
Reference in New Issue