remove `newPayload` from block production flow (#4186)

* remove `newPayload` from block production flow

* refactor block_processor to run `newPayload` as part of `storeBlock`
This commit is contained in:
tersec 2022-10-14 14:48:56 -05:00 committed by GitHub
parent 819442acc3
commit fb6e6d9cf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 169 additions and 172 deletions

View File

@ -85,6 +85,16 @@ type
verifier: BatchVerifier verifier: BatchVerifier
NewPayloadStatus {.pure.} = enum
valid
notValid
invalid
noResponse
BlockProcessingCompleted {.pure.} = enum
completed
notCompleted
proc addBlock*( proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
resfut: Future[Result[void, BlockError]] = nil, resfut: Future[Result[void, BlockError]] = nil,
@ -191,13 +201,108 @@ from ../consensus_object_pools/blockchain_dag import
from ../consensus_object_pools/block_dag import shortLog from ../consensus_object_pools/block_dag import shortLog
from ../consensus_object_pools/spec_cache import get_attesting_indices from ../consensus_object_pools/spec_cache import get_attesting_indices
from ../spec/datatypes/phase0 import TrustedSignedBeaconBlock from ../spec/datatypes/phase0 import TrustedSignedBeaconBlock
from ../spec/datatypes/altair import SignedBeaconBlock
from ../spec/datatypes/bellatrix import SignedBeaconBlock
from eth/async_utils import awaitWithTimeout
from ../spec/datatypes/bellatrix import ExecutionPayload, SignedBeaconBlock
proc newExecutionPayload*(
eth1Monitor: Eth1Monitor, executionPayload: bellatrix.ExecutionPayload):
Future[Opt[PayloadExecutionStatus]] {.async.} =
if eth1Monitor.isNil:
warn "newPayload: attempting to process execution payload without Eth1Monitor. Ensure --web3-url setting is correct and JWT is configured."
return Opt.none PayloadExecutionStatus
debug "newPayload: inserting block into execution engine",
parentHash = executionPayload.parent_hash,
blockHash = executionPayload.block_hash,
stateRoot = shortLog(executionPayload.state_root),
receiptsRoot = shortLog(executionPayload.receipts_root),
prevRandao = shortLog(executionPayload.prev_randao),
blockNumber = executionPayload.block_number,
gasLimit = executionPayload.gas_limit,
gasUsed = executionPayload.gas_used,
timestamp = executionPayload.timestamp,
extraDataLen = executionPayload.extra_data.len,
baseFeePerGas = $executionPayload.base_fee_per_gas,
numTransactions = executionPayload.transactions.len
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.1/src/engine/specification.md#request
const NEWPAYLOAD_TIMEOUT = 8.seconds
try:
let
payloadResponse =
awaitWithTimeout(
eth1Monitor.newPayload(
executionPayload.asEngineExecutionPayload),
NEWPAYLOAD_TIMEOUT):
info "newPayload: newPayload timed out"
return Opt.none PayloadExecutionStatus
# Placeholder for type system
PayloadStatusV1(status: PayloadExecutionStatus.syncing)
payloadStatus = payloadResponse.status
debug "newPayload: succeeded",
parentHash = executionPayload.parent_hash,
blockHash = executionPayload.block_hash,
blockNumber = executionPayload.block_number,
payloadStatus
return Opt.some payloadStatus
except CatchableError as err:
error "newPayload failed", msg = err.msg
return Opt.none PayloadExecutionStatus
proc getExecutionValidity(
eth1Monitor: Eth1Monitor,
blck: phase0.SignedBeaconBlock | altair.SignedBeaconBlock):
Future[NewPayloadStatus] {.async.} =
return NewPayloadStatus.valid # vacuously
proc getExecutionValidity(
eth1Monitor: Eth1Monitor, blck: bellatrix.SignedBeaconBlock):
Future[NewPayloadStatus] {.async.} =
# Eth1 syncing is asynchronous from this
# TODO self.consensusManager.eth1Monitor.ttdReached
# should gate this when it works more reliably
# TODO detect have-TTD-but-not-is_execution_block case, and where
# execution payload was non-zero when TTD detection more reliable
if not blck.message.is_execution_block:
return NewPayloadStatus.valid # vacuously
try:
# Minimize window for Eth1 monitor to shut down connection
await eth1Monitor.ensureDataProvider()
let executionPayloadStatus = await newExecutionPayload(
eth1Monitor, blck.message.body.execution_payload)
if executionPayloadStatus.isNone:
return NewPayloadStatus.noResponse
case executionPayloadStatus.get
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
debug "getExecutionValidity: execution payload invalid",
executionPayloadStatus = $executionPayloadStatus.get,
blck = shortLog(blck)
return NewPayloadStatus.invalid
of PayloadExecutionStatus.syncing, PayloadExecutionStatus.accepted:
return NewPayloadStatus.notValid
of PayloadExecutionStatus.valid:
return NewPayloadStatus.valid
except CatchableError as err:
error "getExecutionValidity: newPayload failed", err = err.msg
return NewPayloadStatus.noResponse
proc storeBlock*( proc storeBlock*(
self: var BlockProcessor, self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
src: MsgSource, wallTime: BeaconTime, signedBlock: ForkySignedBeaconBlock, queueTick: Moment = Moment.now(),
signedBlock: ForkySignedBeaconBlock, payloadValid: bool, validationDur = Duration()):
queueTick: Moment = Moment.now(), Future[Result[BlockRef, (BlockError, BlockProcessingCompleted)]] {.async.} =
validationDur = Duration()): Result[BlockRef, BlockError] =
## storeBlock is the main entry point for unvalidated blocks - all untrusted ## storeBlock is the main entry point for unvalidated blocks - all untrusted
## blocks, regardless of origin, pass through here. When storing a block, ## blocks, regardless of origin, pass through here. When storing a block,
## we will add it to the dag and pass it to all block consumers that need ## we will add it to the dag and pass it to all block consumers that need
@ -207,10 +312,25 @@ proc storeBlock*(
startTick = Moment.now() startTick = Moment.now()
vm = self.validatorMonitor vm = self.validatorMonitor
dag = self.consensusManager.dag dag = self.consensusManager.dag
payloadStatus =
await self.consensusManager.eth1Monitor.getExecutionValidity(signedBlock)
payloadValid = payloadStatus == NewPayloadStatus.valid
# The block is certainly not missing any more # The block is certainly not missing any more
self.consensusManager.quarantine[].missing.del(signedBlock.root) self.consensusManager.quarantine[].missing.del(signedBlock.root)
if NewPayloadStatus.invalid == payloadStatus:
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
return err((BlockError.UnviableFork, BlockProcessingCompleted.completed))
elif NewPayloadStatus.noResponse == payloadStatus:
# Disallow the `MissingParent` from leaking to the sync/request managers
# as it will be descored. However sync and request managers interact via
# `processBlock` (indirectly). `validator_duties` does call `storeBlock`
# directly, so is exposed to this, but only cares about whether there is
# an error or not.
return err((
BlockError.MissingParent, BlockProcessingCompleted.notCompleted))
# We'll also remove the block as an orphan: it's unlikely the parent is # We'll also remove the block as an orphan: it's unlikely the parent is
# missing if we get this far - should that be the case, the block will # missing if we get this far - should that be the case, the block will
# be re-added later # be re-added later
@ -240,7 +360,7 @@ proc storeBlock*(
trustedBlock.message.slot, trustedBlock.root, trustedBlock.message.slot, trustedBlock.root,
forkyState.data.current_sync_committee.pubkeys.data[i]) forkyState.data.current_sync_committee.pubkeys.data[i])
self.dumpBlock(signedBlock, blck) self[].dumpBlock(signedBlock, blck)
# There can be a scenario where we receive a block we already received. # There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent # However this block was before the last finalized epoch and so its parent
@ -254,7 +374,7 @@ proc storeBlock*(
# this to the appropriate error so that sync etc doesn't retry the block # this to the appropriate error so that sync etc doesn't retry the block
self.consensusManager.quarantine[].addUnviable(signedBlock.root) self.consensusManager.quarantine[].addUnviable(signedBlock.root)
return err(BlockError.UnviableFork) return err((BlockError.UnviableFork, BlockProcessingCompleted.completed))
if not self.consensusManager.quarantine[].addOrphan( if not self.consensusManager.quarantine[].addOrphan(
dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock)): dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock)):
@ -267,7 +387,7 @@ proc storeBlock*(
self.consensusManager.quarantine[].addUnviable(signedBlock.root) self.consensusManager.quarantine[].addUnviable(signedBlock.root)
else: discard else: discard
return blck return err((blck.error, BlockProcessingCompleted.completed))
let storeBlockTick = Moment.now() let storeBlockTick = Moment.now()
@ -363,9 +483,9 @@ proc storeBlock*(
for quarantined in self.consensusManager.quarantine[].pop(blck.get().root): for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
# Process the blocks that had the newly accepted block as parent # Process the blocks that had the newly accepted block as parent
self.addBlock(MsgSource.gossip, quarantined) self[].addBlock(MsgSource.gossip, quarantined)
return blck return Result[BlockRef, (BlockError, BlockProcessingCompleted)].ok blck.get
# Enqueue # Enqueue
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -406,7 +526,7 @@ proc addBlock*(
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc processBlock( proc processBlock(
self: var BlockProcessor, entry: BlockEntry, payloadValid: bool) = self: ref BlockProcessor, entry: BlockEntry) {.async.} =
logScope: logScope:
blockRoot = shortLog(entry.blck.root) blockRoot = shortLog(entry.blck.root)
@ -419,64 +539,27 @@ proc processBlock(
quit 1 quit 1
let res = withBlck(entry.blck): let res = withBlck(entry.blck):
self.storeBlock( await self.storeBlock(
entry.src, wallTime, blck, payloadValid, entry.queueTick, entry.src, wallTime, blck, entry.queueTick, entry.validationDur)
entry.validationDur)
if res.isErr and res.error[1] == BlockProcessingCompleted.notCompleted:
# When an execution engine returns an error or fails to respond to a
# payload validity request for some block, a consensus engine:
# - MUST NOT optimistically import the block.
# - MUST NOT apply the block to the fork choice store.
# - MAY queue the block for later processing.
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/sync/optimistic.md#execution-engine-errors
await sleepAsync(chronos.seconds(1))
self[].addBlock(
entry.src, entry.blck, entry.resfut, entry.validationDur)
# To ensure backpressure on the sync manager, do not complete these futures.
return
if entry.resfut != nil: if entry.resfut != nil:
entry.resfut.complete( entry.resfut.complete(
if res.isOk(): Result[void, BlockError].ok() if res.isOk(): Result[void, BlockError].ok()
else: Result[void, BlockError].err(res.error())) else: Result[void, BlockError].err(res.error()[0]))
from eth/async_utils import awaitWithTimeout
from ../spec/datatypes/bellatrix import ExecutionPayload, SignedBeaconBlock
proc newExecutionPayload*(
eth1Monitor: Eth1Monitor, executionPayload: bellatrix.ExecutionPayload):
Future[Opt[PayloadExecutionStatus]] {.async.} =
if eth1Monitor.isNil:
warn "newPayload: attempting to process execution payload without Eth1Monitor. Ensure --web3-url setting is correct and JWT is configured."
return Opt.none PayloadExecutionStatus
debug "newPayload: inserting block into execution engine",
parentHash = executionPayload.parent_hash,
blockHash = executionPayload.block_hash,
stateRoot = shortLog(executionPayload.state_root),
receiptsRoot = shortLog(executionPayload.receipts_root),
prevRandao = shortLog(executionPayload.prev_randao),
blockNumber = executionPayload.block_number,
gasLimit = executionPayload.gas_limit,
gasUsed = executionPayload.gas_used,
timestamp = executionPayload.timestamp,
extraDataLen = executionPayload.extra_data.len,
baseFeePerGas = $executionPayload.base_fee_per_gas,
numTransactions = executionPayload.transactions.len
try:
let
payloadResponse =
awaitWithTimeout(
eth1Monitor.newPayload(
executionPayload.asEngineExecutionPayload),
NEWPAYLOAD_TIMEOUT):
info "newPayload: newPayload timed out"
return Opt.none PayloadExecutionStatus
# Placeholder for type system
PayloadStatusV1(status: PayloadExecutionStatus.syncing)
payloadStatus = payloadResponse.status
debug "newPayload: succeeded",
parentHash = executionPayload.parent_hash,
blockHash = executionPayload.block_hash,
blockNumber = executionPayload.block_number,
payloadStatus
return Opt.some payloadStatus
except CatchableError as err:
error "newPayload failed", msg = err.msg
return Opt.none PayloadExecutionStatus
proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} = proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
while true: while true:
@ -493,72 +576,4 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
discard await idleAsync().withTimeout(idleTimeout) discard await idleAsync().withTimeout(idleTimeout)
let await self.processBlock(await self[].blockQueue.popFirst())
blck = await self[].blockQueue.popFirst()
hasExecutionPayload =
withBlck(blck.blck): blck.message.is_execution_block
executionPayloadStatus =
if hasExecutionPayload:
# Eth1 syncing is asynchronous from this
# TODO self.consensusManager.eth1Monitor.ttdReached
# should gate this when it works more reliably
# TODO detect have-TTD-but-not-is_execution_block case, and where
# execution payload was non-zero when TTD detection more reliable
when true:
# When an execution engine returns an error or fails to respond to a
# payload validity request for some block, a consensus engine:
# - MUST NOT optimistically import the block.
# - MUST NOT apply the block to the fork choice store.
# - MAY queue the block for later processing.
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/sync/optimistic.md#execution-engine-errors
template reEnqueueBlock: untyped =
await sleepAsync(chronos.seconds(1))
self[].addBlock(
blck.src, blck.blck, blck.resfut, blck.validationDur)
try:
# Minimize window for Eth1 monitor to shut down connection
await self.consensusManager.eth1Monitor.ensureDataProvider()
let executionPayload =
withBlck(blck.blck):
when stateFork >= BeaconStateFork.Bellatrix:
blck.message.body.execution_payload
else:
doAssert false
default(bellatrix.ExecutionPayload) # satisfy Nim
let executionPayloadStatus = await newExecutionPayload(
self.consensusManager.eth1Monitor, executionPayload)
if executionPayloadStatus.isNone:
reEnqueueBlock()
continue
executionPayloadStatus.get
except CatchableError as err:
error "runQueueProcessingLoop: newPayload failed", err = err.msg
reEnqueueBlock()
continue
else:
debug "runQueueProcessingLoop: got execution payload before TTD"
PayloadExecutionStatus.syncing
else:
# Vacuously
PayloadExecutionStatus.valid
if executionPayloadStatus in static([
PayloadExecutionStatus.invalid,
PayloadExecutionStatus.invalid_block_hash]):
debug "runQueueProcessingLoop: execution payload invalid",
executionPayloadStatus,
blck = shortLog(blck.blck)
self.consensusManager.quarantine[].addUnviable(blck.blck.root)
# Every loop iteration ends with some version of blck.resfut.complete(),
# including processBlock(), otherwise the sync manager stalls.
if not blck.resfut.isNil:
blck.resfut.complete(
Result[void, BlockError].err(BlockError.UnviableFork))
else:
self[].processBlock(
blck,
payloadValid = executionPayloadStatus == PayloadExecutionStatus.valid)

View File

@ -31,9 +31,6 @@ const
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.1/src/engine/specification.md#request-1 # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.1/src/engine/specification.md#request-1
FORKCHOICEUPDATED_TIMEOUT* = 8.seconds FORKCHOICEUPDATED_TIMEOUT* = 8.seconds
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.1/src/engine/specification.md#request
NEWPAYLOAD_TIMEOUT* = 8.seconds
type type
# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/bellatrix/beacon-chain.md#custom-types # https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/bellatrix/beacon-chain.md#custom-types
Transaction* = List[byte, Limit MAX_BYTES_PER_TRANSACTION] Transaction* = List[byte, Limit MAX_BYTES_PER_TRANSACTION]

View File

@ -207,7 +207,7 @@ template is_sync_committee_update*(update: SomeLightClientUpdate): bool =
else: else:
false false
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.3/specs/altair/light-client/sync-protocol.md#is_finality_update # https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/sync-protocol.md#is_finality_update
template is_finality_update*(update: SomeLightClientUpdate): bool = template is_finality_update*(update: SomeLightClientUpdate): bool =
when update is SomeLightClientUpdateWithFinality: when update is SomeLightClientUpdateWithFinality:
not isZeroMemory(update.finality_branch) not isZeroMemory(update.finality_branch)

View File

@ -121,8 +121,8 @@ proc routeSignedBeaconBlock*(
signature = shortLog(blck.signature), error = res.error() signature = shortLog(blck.signature), error = res.error()
let let
newBlockRef = router[].blockProcessor[].storeBlock( newBlockRef = await router[].blockProcessor.storeBlock(
MsgSource.api, sendTime, blck, true) MsgSource.api, sendTime, blck)
# The boolean we return tells the caller whether the block was integrated # The boolean we return tells the caller whether the block was integrated
# into the chain # into the chain

View File

@ -424,31 +424,17 @@ proc getExecutionPayload[T](
latestHead, latestSafe, latestFinalized, timestamp, random, latestHead, latestSafe, latestFinalized, timestamp, random,
feeRecipient, node.consensusManager.eth1Monitor)) feeRecipient, node.consensusManager.eth1Monitor))
payload = try: payload = try:
awaitWithTimeout(
get_execution_payload(payload_id, node.consensusManager.eth1Monitor),
GETPAYLOAD_TIMEOUT):
beacon_block_payload_errors.inc()
warn "Getting execution payload from Engine API timed out", payload_id
empty_execution_payload
except CatchableError as err:
beacon_block_payload_errors.inc()
warn "Getting execution payload from Engine API failed",
payload_id, err = err.msg
empty_execution_payload
executionPayloadStatus =
awaitWithTimeout( awaitWithTimeout(
node.consensusManager.eth1Monitor.newExecutionPayload(payload), get_execution_payload(payload_id, node.consensusManager.eth1Monitor),
NEWPAYLOAD_TIMEOUT): GETPAYLOAD_TIMEOUT):
info "getExecutionPayload: newPayload timed out" beacon_block_payload_errors.inc()
Opt.none PayloadExecutionStatus warn "Getting execution payload from Engine API timed out", payload_id
empty_execution_payload
if executionPayloadStatus.isNone or executionPayloadStatus.get in [ except CatchableError as err:
PayloadExecutionStatus.invalid, beacon_block_payload_errors.inc()
PayloadExecutionStatus.invalid_block_hash]: warn "Getting execution payload from Engine API failed",
info "getExecutionPayload: newExecutionPayload invalid", payload_id, err = err.msg
executionPayloadStatus empty_execution_payload
return Opt.none ExecutionPayload
return Opt.some payload return Opt.some payload
except CatchableError as err: except CatchableError as err:

View File

@ -21,6 +21,7 @@ import
../beacon_chain/eth1/eth1_monitor, ../beacon_chain/eth1/eth1_monitor,
./testutil, ./testdbutil, ./testblockutil ./testutil, ./testdbutil, ./testblockutil
from chronos/unittest2/asynctests import asyncTest
from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, init DynamicFeeRecipientsStore, init
from ../beacon_chain/validators/action_tracker import ActionTracker from ../beacon_chain/validators/action_tracker import ActionTracker
@ -56,11 +57,10 @@ suite "Block processor" & preset():
false, "", "", keys.newRng(), taskpool, consensusManager, false, "", "", keys.newRng(), taskpool, consensusManager,
validatorMonitor, getTimeFn) validatorMonitor, getTimeFn)
test "Reverse order block add & get" & preset(): asyncTest "Reverse order block add & get" & preset():
let missing = processor[].storeBlock( let missing = await processor.storeBlock(
MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, MsgSource.gossip, b2.message.slot.start_beacon_time(), b2)
payloadValid = true) check: missing.error[0] == BlockError.MissingParent
check: missing.error == BlockError.MissingParent
check: check:
not dag.containsForkBlock(b2.root) # Unresolved, shouldn't show up not dag.containsForkBlock(b2.root) # Unresolved, shouldn't show up
@ -68,9 +68,8 @@ suite "Block processor" & preset():
FetchRecord(root: b1.root) in quarantine[].checkMissing() FetchRecord(root: b1.root) in quarantine[].checkMissing()
let let
status = processor[].storeBlock( status = await processor.storeBlock(
MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, MsgSource.gossip, b2.message.slot.start_beacon_time(), b1)
payloadValid = true)
b1Get = dag.getBlockRef(b1.root) b1Get = dag.getBlockRef(b1.root)
check: check: