nimbus-eth2/beacon_chain/gossip_processing/gossip_to_consensus.nim

248 lines
8.8 KiB
Nim
Raw Normal View History

# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/math,
stew/results,
chronicles, chronos, metrics,
../spec/[crypto, datatypes, digest],
../consensus_object_pools/[block_clearance, blockchain_dag, attestation_pool],
./consensus_manager,
".."/[beacon_clock, beacon_node_types],
../ssz/sszdump
# Gossip Queue Manager
# ------------------------------------------------------------------------------
# The queue manager moves blocks from "Gossip validated" to "Consensus verified"
declareHistogram beacon_store_block_duration_seconds,
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
type
SyncBlock* = object
blk*: SignedBeaconBlock
resfut*: Future[Result[void, BlockError]]
BlockEntry* = object
# Exported for "test_sync_manager"
v*: SyncBlock
VerifQueueManager* = object
## This manages the queues of blocks and attestations.
## Blocks and attestations are enqueued in a gossip-validated state
##
## from:
## - Gossip (when synced)
## - SyncManager (during sync)
## - RequestManager (missing ancestor blocks)
##
## are then consensus-verified and added to:
## - the blockchain DAG
## - database
## - attestation pool
## - fork choice
##
## The queue manager doesn't manage exits (voluntary, attester slashing or proposer slashing)
## as don't need extra verification and can be added to the exit pool as soon as they are gossip-validated.
# Config
# ----------------------------------------------------------------
dumpEnabled: bool
dumpDirInvalid: string
dumpDirIncoming: string
# Clock
# ----------------------------------------------------------------
getWallTime: GetWallTimeFn
# Producers
# ----------------------------------------------------------------
blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
# Consumer
# ----------------------------------------------------------------
consensusManager: ref ConsensusManager
## Blockchain DAG, AttestationPool and Quarantine
{.push raises: [Defect].}
# Initialization
# ------------------------------------------------------------------------------
proc new*(T: type VerifQueueManager,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
consensusManager: ref ConsensusManager,
getWallTime: GetWallTimeFn): ref VerifQueueManager =
(ref VerifQueueManager)(
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
dumpDirIncoming: dumpDirIncoming,
getWallTime: getWallTime,
blocksQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
)
# Sync callbacks
# ------------------------------------------------------------------------------
proc done*(blk: SyncBlock) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has passed
## verification successfully.
if blk.resfut != nil:
blk.resfut.complete(Result[void, BlockError].ok())
proc fail*(blk: SyncBlock, error: BlockError) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
## verification with specific ``error``.
if blk.resfut != nil:
blk.resfut.complete(Result[void, BlockError].err(error))
proc complete*(blk: SyncBlock, res: Result[void, BlockError]) =
## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk``
## verification.
if blk.resfut != nil:
blk.resfut.complete(res)
proc hasBlocks*(self: VerifQueueManager): bool =
self.blocksQueue.len() > 0
# Enqueue
# ------------------------------------------------------------------------------
proc addBlock*(self: var VerifQueueManager, syncBlock: SyncBlock) =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for the future in the
# SyncBlock to constrain their own processing
# Producers:
# - Gossip (when synced)
# - SyncManager (during sync)
# - RequestManager (missing ancestor blocks)
# addLast doesn't fail with unbounded queues, but we'll add asyncSpawn as a
# sanity check
asyncSpawn self.blocksQueue.addLast(BlockEntry(v: syncBlock))
# Storage
# ------------------------------------------------------------------------------
proc dumpBlock*[T](
self: VerifQueueManager, signedBlock: SignedBeaconBlock,
res: Result[T, (ValidationResult, BlockError)]) =
if self.dumpEnabled and res.isErr:
case res.error[1]
of Invalid:
dump(
self.dumpDirInvalid, signedBlock)
of MissingParent:
dump(
self.dumpDirIncoming, signedBlock)
else:
discard
proc storeBlock(
self: var VerifQueueManager, signedBlock: SignedBeaconBlock,
wallSlot: Slot): Result[void, BlockError] =
let
start = Moment.now()
attestationPool = self.consensusManager.attestationPool
let blck = self.consensusManager.chainDag.addRawBlock(self.consensusManager.quarantine, signedBlock) do (
blckRef: BlockRef, trustedBlock: TrustedSignedBeaconBlock,
epochRef: EpochRef, state: HashedBeaconState):
# Callback add to fork choice if valid
attestationPool[].addForkChoice(
epochRef, blckRef, trustedBlock.message, wallSlot)
self.dumpBlock(signedBlock, blck)
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck.isErr:
return err(blck.error[1])
let duration = (Moment.now() - start).toFloatSeconds()
beacon_store_block_duration_seconds.observe(duration)
ok()
# Event Loop
# ------------------------------------------------------------------------------
proc processBlock(self: var VerifQueueManager, entry: BlockEntry) =
logScope:
blockRoot = shortLog(entry.v.blk.root)
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing block before genesis, clock turned back?"
quit 1
let
start = now(chronos.Moment)
res = self.storeBlock(entry.v.blk, wallSlot)
storeDone = now(chronos.Moment)
if res.isOk():
# Eagerly update head in case the new block gets selected
self.consensusManager[].updateHead(wallSlot) # This also eagerly prunes the blocks DAG to prevent processing forks.
# self.consensusManager.pruneStateCachesDAG() # Amortized pruning, we don't prune states & fork choice here but in `onSlotEnd`()
let updateDone = now(chronos.Moment)
let storeBlockDuration = storeDone - start
let updateHeadDuration = updateDone - storeDone
let overallDuration = updateDone - start
let storeSpeed =
block:
let secs = float(chronos.seconds(1).nanoseconds)
if not(overallDuration.isZero()):
let v = secs / float(overallDuration.nanoseconds)
round(v * 10_000) / 10_000
else:
0.0
debug "Block processed",
local_head_slot = self.consensusManager.chainDag.head.slot,
store_speed = storeSpeed,
block_slot = entry.v.blk.message.slot,
store_block_duration = $storeBlockDuration,
update_head_duration = $updateHeadDuration,
overall_duration = $overallDuration
if entry.v.resFut != nil:
entry.v.resFut.complete(Result[void, BlockError].ok())
elif res.error() in {BlockError.Duplicate, BlockError.Old}:
# These are harmless / valid outcomes - for the purpose of scoring peers,
# they are ok
if entry.v.resFut != nil:
entry.v.resFut.complete(Result[void, BlockError].ok())
else:
if entry.v.resFut != nil:
entry.v.resFut.complete(Result[void, BlockError].err(res.error()))
proc runQueueProcessingLoop*(self: ref VerifQueueManager) {.async.} =
while true:
# Cooperative concurrency: one block per loop iteration - because
# we run both networking and CPU-heavy things like block processing
# on the same thread, we need to make sure that there is steady progress
# on the networking side or we get long lockups that lead to timeouts.
const
# We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing blocks
# in this case - doing so also allows us to benefit from more batching /
# larger network reads when under load.
idleTimeout = 10.milliseconds
discard await idleAsync().withTimeout(idleTimeout)
self[].processBlock(await self[].blocksQueue.popFirst())