move quarantine outside of chaindag (#3124)

* move quarantine outside of chaindag

The quarantine has been part of the ChainDAG for the longest time, but
this design has a few issues:

* the function in which blocks are verified and added to the dag becomes
reentrant and therefore difficult to reason about - we're currently
using a stateful flag to work around it
* quarantined blocks bypass the processing queue leading to a processing
stampede
* the quarantine flow is unsuitable for orphaned attestations - these
should also should be quarantined eventually

Instead of processing the quarantine inside ChainDAG, this PR moves
re-queueing to `block_processor` which already is responsible for
dealing with follow-up work when a block is added to the dag

This sets the stage for keeping attestations in the quarantine as well.

Also:

* make `BlockError` `{.pure.}`
* avoid use of `ValidationResult` in block clearance (that's for gossip)
This commit is contained in:
Jacek Sieka 2021-12-06 10:49:01 +01:00 committed by GitHub
parent a8c801eddd
commit 1a8b7469e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 464 additions and 609 deletions

View File

@ -78,15 +78,18 @@ OK: 3/3 Fail: 0/3 Skip: 0/3
OK: 1/1 Fail: 0/1 Skip: 0/1
## Block pool processing [Preset: mainnet]
```diff
+ Adding the same block twice returns a Duplicate error [Preset: mainnet] OK
+ Reverse order block add & get [Preset: mainnet] OK
+ Simple block add&get [Preset: mainnet] OK
+ getRef returns nil for missing blocks OK
+ loading tail block works [Preset: mainnet] OK
+ updateHead updates head and headState [Preset: mainnet] OK
+ updateStateData sanity [Preset: mainnet] OK
```
OK: 7/7 Fail: 0/7 Skip: 0/7
OK: 5/5 Fail: 0/5 Skip: 0/5
## Block processor [Preset: mainnet]
```diff
+ Reverse order block add & get [Preset: mainnet] OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## BlockRef and helpers [Preset: mainnet]
```diff
+ epochAncestor sanity [Preset: mainnet] OK
@ -371,4 +374,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL---
OK: 207/209 Fail: 0/209 Skip: 2/209
OK: 206/208 Fail: 0/208 Skip: 2/208

View File

@ -12,7 +12,6 @@ import
# Nimble packages
chronos, json_rpc/servers/httpserver, presto,
taskpools,
# Local modules
"."/[beacon_clock, beacon_chain_db, conf],
@ -34,7 +33,6 @@ export
type
RpcServer* = RpcHttpServer
TaskPoolPtr* = TaskPool
GossipState* = enum
Disconnected
@ -51,7 +49,7 @@ type
config*: BeaconNodeConf
attachedValidators*: ref ValidatorPool
dag*: ChainDAGRef
quarantine*: QuarantineRef
quarantine*: ref Quarantine
attestationPool*: ref AttestationPool
syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
exitPool*: ref ExitPool
@ -70,7 +68,6 @@ type
attachedValidatorBalanceTotal*: uint64
gossipState*: GossipState
beaconClock*: BeaconClock
taskpool*: TaskPoolPtr
onAttestationSent*: OnAttestationCallback
restKeysCache*: Table[ValidatorPubKey, ValidatorIndex]

View File

@ -70,7 +70,7 @@ type
## after that, they may no longer affect fork choice.
dag*: ChainDAGRef
quarantine*: QuarantineRef
quarantine*: ref Quarantine
forkChoice*: ForkChoice
@ -85,7 +85,7 @@ declareGauge attestation_pool_block_attestation_packing_time,
"Time it took to create list of attestations for block"
proc init*(T: type AttestationPool, dag: ChainDAGRef,
quarantine: QuarantineRef,
quarantine: ref Quarantine,
onAttestation: OnAttestationCallback = nil): T =
## Initialize an AttestationPool from the dag `headState`
## The `finalized_root` works around the finalized_checkpoint of the genesis block
@ -727,7 +727,7 @@ proc selectHead*(pool: var AttestationPool, wallSlot: Slot): BlockRef =
# get out of sync, we'll need to try to download the selected head - in
# the meantime, return nil to indicate that no new head was chosen
warn "Fork choice selected unknown head, trying to sync", root = newHead.get()
pool.quarantine.addMissing(newHead.get())
pool.quarantine[].addMissing(newHead.get())
ret

View File

@ -13,13 +13,13 @@ import
stew/[assign2, results],
eth/keys,
".."/[beacon_clock],
../spec/[eth2_merkleization, forks, helpers, signatures, signatures_batch, state_transition],
../spec/[
eth2_merkleization, forks, helpers, signatures, signatures_batch,
state_transition],
../spec/datatypes/[phase0, altair, merge],
"."/[blockchain_dag, block_quarantine]
"."/[blockchain_dag]
from libp2p/protocols/pubsub/pubsub import ValidationResult
export results, ValidationResult
export results, signatures_batch
# Clearance
# ---------------------------------------------
@ -31,86 +31,14 @@ export results, ValidationResult
logScope:
topics = "clearance"
proc batchVerify(quarantine: QuarantineRef, sigs: openArray[SignatureSet]): bool =
var secureRandomBytes: array[32, byte]
quarantine.rng[].brHmacDrbgGenerate(secureRandomBytes)
try:
return quarantine.taskpool.batchVerify(quarantine.sigVerifCache, sigs, secureRandomBytes)
except Exception as exc:
raiseAssert exc.msg
proc addRawBlock*(
dag: ChainDAGRef, quarantine: QuarantineRef,
signedBlock: ForkySignedBeaconBlock,
onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded
): Result[BlockRef, (ValidationResult, BlockError)] {.gcsafe.}
# Now that we have the new block, we should see if any of the previously
# unresolved blocks magically become resolved
# TODO This code is convoluted because when there are more than ~1.5k
# blocks being synced, there's a stack overflow as `add` gets called
# for the whole chain of blocks. Instead we use this ugly field in `dag`
# which could be avoided by refactoring the code
# TODO unit test the logic, in particular interaction with fork choice block parents
proc resolveQuarantinedBlocks(
dag: ChainDAGRef, quarantine: QuarantineRef,
onBlockAdded: OnPhase0BlockAdded) =
if not quarantine.inAdd:
quarantine.inAdd = true
defer: quarantine.inAdd = false
var entries = 0
while entries != quarantine.orphansPhase0.len:
entries = quarantine.orphansPhase0.len # keep going while quarantine is shrinking
var resolved: seq[phase0.SignedBeaconBlock]
for _, v in quarantine.orphansPhase0:
if v.message.parent_root in dag:
resolved.add(v)
for v in resolved:
discard addRawBlock(dag, quarantine, v, onBlockAdded)
proc resolveQuarantinedBlocks(
dag: ChainDAGRef, quarantine: QuarantineRef,
onBlockAdded: OnAltairBlockAdded) =
if not quarantine.inAdd:
quarantine.inAdd = true
defer: quarantine.inAdd = false
var entries = 0
while entries != quarantine.orphansAltair.len:
entries = quarantine.orphansAltair.len # keep going while quarantine is shrinking
var resolved: seq[altair.SignedBeaconBlock]
for _, v in quarantine.orphansAltair:
if v.message.parent_root in dag:
resolved.add(v)
for v in resolved:
discard addRawBlock(dag, quarantine, v, onBlockAdded)
proc resolveQuarantinedBlocks(
dag: ChainDAGRef, quarantine: QuarantineRef,
onBlockAdded: OnMergeBlockAdded) =
if not quarantine.inAdd:
quarantine.inAdd = true
defer: quarantine.inAdd = false
var entries = 0
while entries != quarantine.orphansMerge.len:
entries = quarantine.orphansMerge.len # keep going while quarantine is shrinking
var resolved: seq[merge.SignedBeaconBlock]
for _, v in quarantine.orphansMerge:
if v.message.parent_root in dag:
resolved.add(v)
for v in resolved:
discard addRawBlock(dag, quarantine, v, onBlockAdded)
proc addResolvedBlock(
dag: ChainDAGRef, quarantine: QuarantineRef,
dag: ChainDAGRef,
state: var StateData,
trustedBlock: ForkyTrustedSignedBeaconBlock,
parent: BlockRef, cache: var StateCache,
onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded,
stateDataDur, sigVerifyDur, stateVerifyDur: Duration
) =
): BlockRef =
doAssert getStateField(state.data, slot) == trustedBlock.message.slot,
"state must match block"
doAssert state.blck.root == trustedBlock.message.parent_root,
@ -169,7 +97,7 @@ proc addResolvedBlock(
if not(isNil(dag.onBlockAdded)):
dag.onBlockAdded(ForkedTrustedSignedBeaconBlock.init(trustedBlock))
resolveQuarantinedBlocks(dag, quarantine, onBlockAdded)
blockRef
# TODO workaround for https://github.com/nim-lang/Nim/issues/18095
type SomeSignedBlock =
@ -181,7 +109,7 @@ type SomeSignedBlock =
merge.TrustedSignedBeaconBlock
proc checkStateTransition(
dag: ChainDAGRef, signedBlock: SomeSignedBlock,
cache: var StateCache): (ValidationResult, BlockError) =
cache: var StateCache): Result[void, BlockError] =
## Ensure block can be applied on a state
func restore(v: var ForkedHashedBeaconState) =
# TODO address this ugly workaround - there should probably be a
@ -199,8 +127,9 @@ proc checkStateTransition(
cache, dag.updateFlags, restore):
info "Invalid block"
return (ValidationResult.Reject, Invalid)
return (ValidationResult.Accept, default(BlockError))
err(BlockError.Invalid)
else:
ok()
proc advanceClearanceState*(dag: ChainDAGRef) =
# When the chain is synced, the most likely block to be produced is the block
@ -220,25 +149,57 @@ proc advanceClearanceState*(dag: ChainDAGRef) =
debug "Prepared clearance state for next block",
next, updateStateDur = Moment.now() - startTick
proc addRawBlockKnownParent(
dag: ChainDAGRef, quarantine: QuarantineRef,
signedBlock: ForkySignedBeaconBlock,
parent: BlockRef,
onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded
): Result[BlockRef, (ValidationResult, BlockError)] =
## Add a block whose parent is known, after performing validity checks
proc addRawBlock*(
dag: ChainDAGRef, verifier: var BatchVerifier,
signedBlock: ForkySignedBeaconBlock,
onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded
): Result[BlockRef, BlockError] =
## Try adding a block to the chain, verifying first that it passes the state
## transition function and contains correct cryptographic signature.
##
## Cryptographic checks can be skipped by adding skipBLSValidation to dag.updateFlags
logScope:
blockRoot = shortLog(signedBlock.root)
blck = shortLog(signedBlock.message)
signature = shortLog(signedBlock.signature)
template blck(): untyped = signedBlock.message # shortcuts without copy
template blockRoot(): untyped = signedBlock.root
if blockRoot in dag:
debug "Block already exists"
# We should not call the block added callback for blocks that already
# existed in the pool, as that may confuse consumers such as the fork
# choice. While the validation result won't be accessed, it's IGNORE,
# according to the spec.
return err(BlockError.Duplicate)
# If the block we get is older than what we finalized already, we drop it.
# One way this can happen is that we start request a block and finalization
# happens in the meantime - the block we requested will then be stale
# by the time it gets here.
if blck.slot <= dag.finalizedHead.slot:
debug "Old block, dropping",
finalizedHead = shortLog(dag.finalizedHead),
tail = shortLog(dag.tail)
# Doesn't correspond to any specific validation condition, and still won't
# be used, but certainly would be IGNORE.
return err(BlockError.UnviableFork)
let parent = dag.getRef(blck.parent_root)
if parent == nil:
debug "Block parent unknown"
return err(BlockError.MissingParent)
if parent.slot >= signedBlock.message.slot:
# A block whose parent is newer than the block itself is clearly invalid -
# discard it immediately
info "Block with invalid parent, dropping",
debug "Block with invalid parent, dropping",
parentBlock = shortLog(parent)
return err((ValidationResult.Reject, Invalid))
return err(BlockError.Invalid)
if (parent.slot < dag.finalizedHead.slot) or
(parent.slot == dag.finalizedHead.slot and
@ -251,15 +212,11 @@ proc addRawBlockKnownParent(
# correct - from their point of view, the head block they have is the
# latest thing that happened on the chain and they're performing their
# duty correctly.
info "Unviable block, dropping",
debug "Unviable block, dropping",
finalizedHead = shortLog(dag.finalizedHead),
tail = shortLog(dag.tail)
return err((ValidationResult.Ignore, Unviable))
# The block might have been in either of `orphans` or `missing` - we don't
# want any more work done on its behalf
quarantine.removeOrphan(signedBlock)
return err(BlockError.UnviableFork)
# The block is resolved, now it's time to validate it to ensure that the
# blocks we add to the database are clean for the given state
@ -281,116 +238,23 @@ proc addRawBlockKnownParent(
err = e.error()
# A PublicKey or Signature isn't on the BLS12-381 curve
return err((ValidationResult.Reject, Invalid))
if not quarantine.batchVerify(sigs):
return err(BlockError.Invalid)
if not verifier.batchVerify(sigs):
info "Block signature verification failed"
return err((ValidationResult.Reject, Invalid))
return err(BlockError.Invalid)
let sigVerifyTick = Moment.now()
let (valRes, blockErr) = checkStateTransition(
dag, signedBlock.asSigVerified(), cache)
if valRes != ValidationResult.Accept:
return err((valRes, blockErr))
? checkStateTransition(dag, signedBlock.asSigVerified(), cache)
let stateVerifyTick = Moment.now()
# Careful, clearanceState.data has been updated but not blck - we need to
# create the BlockRef first!
addResolvedBlock(
dag, quarantine, dag.clearanceState,
ok addResolvedBlock(
dag, dag.clearanceState,
signedBlock.asTrusted(),
parent, cache,
onBlockAdded,
stateDataDur = stateDataTick - startTick,
sigVerifyDur = sigVerifyTick - stateDataTick,
stateVerifyDur = stateVerifyTick - sigVerifyTick)
return ok dag.clearanceState.blck
proc addRawBlockUnresolved(
dag: ChainDAGRef,
quarantine: QuarantineRef,
signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock |
merge.SignedBeaconBlock,
): Result[BlockRef, (ValidationResult, BlockError)] =
## addRawBlock - Block is unresolved / has no parent
logScope:
blockRoot = shortLog(signedBlock.root)
blck = shortLog(signedBlock.message)
# This is an unresolved block - add it to the quarantine, which will cause its
# parent to be scheduled for downloading
if not quarantine.add(dag, signedBlock):
debug "Block quarantine full"
if signedBlock.message.parent_root in quarantine.missing or
containsOrphan(quarantine, signedBlock):
debug "Unresolved block (parent missing or orphaned)",
orphansPhase0 = quarantine.orphansPhase0.len,
orphansAltair = quarantine.orphansAltair.len,
missing = quarantine.missing.len
return err((ValidationResult.Ignore, MissingParent))
# TODO if we receive spam blocks, one heurestic to implement might be to wait
# for a couple of attestations to appear before fetching parents - this
# would help prevent using up network resources for spam - this serves
# two purposes: one is that attestations are likely to appear for the
# block only if it's valid / not spam - the other is that malicious
# validators that are not proposers can sign invalid blocks and send
# them out without penalty - but signing invalid attestations carries
# a risk of being slashed, making attestations a more valuable spam
# filter.
debug "Unresolved block (parent missing)",
orphansPhase0 = quarantine.orphansPhase0.len,
orphansAltair = quarantine.orphansAltair.len,
missing = quarantine.missing.len
return err((ValidationResult.Ignore, MissingParent))
proc addRawBlock(
dag: ChainDAGRef, quarantine: QuarantineRef,
signedBlock: ForkySignedBeaconBlock,
onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded
): Result[BlockRef, (ValidationResult, BlockError)] =
## Try adding a block to the chain, verifying first that it passes the state
## transition function and contains correct cryptographic signature.
##
## Cryptographic checks can be skipped by adding skipBLSValidation to dag.updateFlags
logScope:
blockRoot = shortLog(signedBlock.root)
blck = shortLog(signedBlock.message)
template blck(): untyped = signedBlock.message # shortcuts without copy
template blockRoot(): untyped = signedBlock.root
if blockRoot in dag:
debug "Block already exists"
# We should not call the block added callback for blocks that already
# existed in the pool, as that may confuse consumers such as the fork
# choice. While the validation result won't be accessed, it's IGNORE,
# according to the spec.
return err((ValidationResult.Ignore, Duplicate))
quarantine.missing.del(blockRoot)
# If the block we get is older than what we finalized already, we drop it.
# One way this can happen is that we start resolving a block and finalization
# happens in the meantime - the block we requested will then be stale
# by the time it gets here.
if blck.slot <= dag.finalizedHead.slot:
debug "Old block, dropping",
finalizedHead = shortLog(dag.finalizedHead),
tail = shortLog(dag.tail)
# Doesn't correspond to any specific validation condition, and still won't
# be used, but certainly would be IGNORE.
return err((ValidationResult.Ignore, Unviable))
let parent = dag.getRef(blck.parent_root)
if parent != nil:
return addRawBlockKnownParent(dag, quarantine, signedBlock, parent, onBlockAdded)
return addRawBlockUnresolved(dag, quarantine, signedBlock)

View File

@ -12,7 +12,6 @@ import
std/[sets, tables, hashes],
# Status libraries
stew/endians2, chronicles,
eth/keys, taskpools,
# Internals
../spec/[signatures_batch, forks],
../spec/datatypes/[phase0, altair, merge],
@ -20,33 +19,26 @@ import
export sets, tables
# #############################################
#
# Quarantine & DAG
#
# #############################################
#
# The Quarantine and DagChain data structures
# keep track respectively of unsafe blocks coming from the network
# and blocks that underwent verification and have a resolved path to
# the last finalized block known.
# ChainDAG and types related to forming a DAG of blocks, keeping track of their
# relationships and allowing various forms of lookups
type
BlockError* = enum
BlockError* {.pure.} = enum
Invalid ##\
## Block is broken / doesn't apply cleanly - whoever sent it is fishy (or
## we're buggy)
MissingParent ##\
## We don't know the parent of this block so we can't tell if it's valid
## or not - it'll go into the quarantine and be reexamined when the parent
## appears or be discarded if finality obsoletes it
Unviable ##\
UnviableFork ##\
## Block is from a different history / fork than the one we're interested
## in (based on our finalized checkpoint)
Invalid ##\
## Block is broken / doesn't apply cleanly - whoever sent it is fishy (or
## we're buggy)
Old
Duplicate
Duplicate ##\
## We've seen this block already, can't add again
OnBlockCallback* =
proc(data: ForkedTrustedSignedBeaconBlock) {.gcsafe, raises: [Defect].}
@ -57,48 +49,6 @@ type
OnFinalizedCallback* =
proc(data: FinalizationInfoObject) {.gcsafe, raises: [Defect].}
QuarantineRef* = ref object
## Keeps track of unsafe blocks coming from the network
## and that cannot be added to the chain
##
## This only stores valid blocks that cannot be linked to the
## ChainDAGRef DAG due to missing ancestor(s).
##
## Invalid blocks are dropped immediately.
orphansPhase0*: Table[(Eth2Digest, ValidatorSig), phase0.SignedBeaconBlock] ##\
## Phase 0 Blocks that have passed validation but that we lack a link back
## to tail for - when we receive a "missing link", we can use this data to
## build an entire branch
orphansAltair*: Table[(Eth2Digest, ValidatorSig), altair.SignedBeaconBlock] ##\
## Altair Blocks that have passed validation, but that we lack a link back
## to tail for - when we receive a "missing link", we can use this data to
## build an entire branch
orphansMerge*: Table[(Eth2Digest, ValidatorSig), merge.SignedBeaconBlock] ##\
## Merge Blocks which have passed validation, but that we lack a link back
## to tail for - when we receive a "missing link", we can use this data to
## build an entire branch
missing*: Table[Eth2Digest, MissingBlock] ##\
## Roots of blocks that we would like to have (either parent_root of
## unresolved blocks or block roots of attestations)
sigVerifCache*: BatchedBLSVerifierCache ##\
## A cache for batch BLS signature verification contexts
rng*: ref BrHmacDrbgContext ##\
## A reference to the Nimbus application-wide RNG
inAdd*: bool
taskpool*: TaskPoolPtr
TaskPoolPtr* = TaskPool
MissingBlock* = object
tries*: int
FetchRecord* = object
root*: Eth2Digest

View File

@ -8,23 +8,47 @@
{.push raises: [Defect].}
import
std/[tables, options],
std/[tables],
chronicles,
stew/bitops2,
eth/keys,
../spec/forks,
../spec/datatypes/[phase0, altair, merge],
./block_pools_types
export options, block_pools_types
export tables, forks, block_pools_types
const
MaxMissingItems = 1024
type
MissingBlock* = object
tries*: int
Quarantine* = object
## Keeps track of unvalidated blocks coming from the network
## and that cannot yet be added to the chain
##
## This only stores blocks that cannot be linked to the
## ChainDAGRef DAG due to missing ancestor(s).
##
## Trivially invalid blocks may be dropped before reaching this stage.
orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock] ##\
## Blocks that we don't have a parent for - when we resolve the parent, we
## can proceed to resolving the block as well - we index this by root and
## signature such that a block with invalid signature won't cause a block
## with a valid signature to be dropped
missing*: Table[Eth2Digest, MissingBlock] ##\
## Roots of blocks that we would like to have (either parent_root of
## unresolved blocks or block roots of attestations)
logScope:
topics = "quarant"
func init*(T: type QuarantineRef, rng: ref BrHmacDrbgContext, taskpool: TaskpoolPtr): T =
T(rng: rng, taskpool: taskpool)
func init*(T: type Quarantine): T =
T()
func checkMissing*(quarantine: QuarantineRef): seq[FetchRecord] =
func checkMissing*(quarantine: var Quarantine): seq[FetchRecord] =
## Return a list of blocks that we should try to resolve from other client -
## to be called periodically but not too often (once per slot?)
var done: seq[Eth2Digest]
@ -54,61 +78,28 @@ template anyIt(s, pred: untyped): bool =
break
result
func containsOrphan*(
quarantine: QuarantineRef, signedBlock: phase0.SignedBeaconBlock): bool =
(signedBlock.root, signedBlock.signature) in quarantine.orphansPhase0
func containsOrphan*(
quarantine: QuarantineRef, signedBlock: altair.SignedBeaconBlock): bool =
(signedBlock.root, signedBlock.signature) in quarantine.orphansAltair
func containsOrphan*(
quarantine: QuarantineRef, signedBlock: merge.SignedBeaconBlock): bool =
(signedBlock.root, signedBlock.signature) in quarantine.orphansMerge
func addMissing*(quarantine: QuarantineRef, root: Eth2Digest) =
func addMissing*(quarantine: var Quarantine, root: Eth2Digest) =
## Schedule the download a the given block
# Can only request by root, not by signature, so partial match suffices
if (not anyIt(quarantine.orphansMerge.keys, it[0] == root)) and
(not anyIt(quarantine.orphansAltair.keys, it[0] == root)) and
(not anyIt(quarantine.orphansPhase0.keys, it[0] == root)):
if quarantine.missing.len >= MaxMissingItems:
return
# It's not really missing if we're keeping it in the quarantine
if (not anyIt(quarantine.orphans.keys, it[0] == root)):
# If the block is in orphans, we no longer need it
discard quarantine.missing.hasKeyOrPut(root, MissingBlock())
# TODO workaround for https://github.com/nim-lang/Nim/issues/18095
# copy of phase0.SomeSignedBeaconBlock from datatypes/phase0.nim
type SomeSignedPhase0Block =
phase0.SignedBeaconBlock | phase0.SigVerifiedSignedBeaconBlock |
phase0.TrustedSignedBeaconBlock
func removeOrphan*(
quarantine: QuarantineRef, signedBlock: SomeSignedPhase0Block) =
quarantine.orphansPhase0.del((signedBlock.root, signedBlock.signature))
# TODO workaround for https://github.com/nim-lang/Nim/issues/18095
# copy of altair.SomeSignedBeaconBlock from datatypes/altair.nim
type SomeSignedAltairBlock =
altair.SignedBeaconBlock | altair.SigVerifiedSignedBeaconBlock |
altair.TrustedSignedBeaconBlock
func removeOrphan*(
quarantine: QuarantineRef, signedBlock: SomeSignedAltairBlock) =
quarantine.orphansAltair.del((signedBlock.root, signedBlock.signature))
# TODO workaround for https://github.com/nim-lang/Nim/issues/18095
# copy of merge.SomeSignedBeaconBlock from datatypes/merge.nim
type SomeSignedMergeBlock =
merge.SignedBeaconBlock | merge.SigVerifiedSignedBeaconBlock |
merge.TrustedSignedBeaconBlock
func removeOrphan*(
quarantine: QuarantineRef, signedBlock: SomeSignedMergeBlock) =
quarantine.orphansMerge.del((signedBlock.root, signedBlock.signature))
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.orphans.del((signedBlock.root, signedBlock.signature))
func isViableOrphan(
dag: ChainDAGRef, signedBlock: ForkySignedBeaconBlock): bool =
dag: ChainDAGRef, signedBlock: ForkedSignedBeaconBlock): bool =
# The orphan must be newer than the finalization point so that its parent
# either is the finalized block or more recent
signedBlock.message.slot > dag.finalizedHead.slot
let slot = withBlck(signedBlock): blck.message.slot
slot > dag.finalizedHead.slot
func removeOldBlocks(quarantine: QuarantineRef, dag: ChainDAGRef) =
func removeOldBlocks(quarantine: var Quarantine, dag: ChainDAGRef) =
var oldBlocks: seq[(Eth2Digest, ValidatorSig)]
template removeNonviableOrphans(orphans: untyped) =
@ -119,14 +110,10 @@ func removeOldBlocks(quarantine: QuarantineRef, dag: ChainDAGRef) =
for k in oldBlocks:
orphans.del k
removeNonviableOrphans(quarantine.orphansPhase0)
removeNonviableOrphans(quarantine.orphansAltair)
removeNonviableOrphans(quarantine.orphansMerge)
removeNonviableOrphans(quarantine.orphans)
func clearQuarantine*(quarantine: QuarantineRef) =
quarantine.orphansPhase0.clear()
quarantine.orphansAltair.clear()
quarantine.orphansMerge.clear()
func clearQuarantine*(quarantine: var Quarantine) =
quarantine.orphans.clear()
quarantine.missing.clear()
# Typically, blocks will arrive in mostly topological order, with some
@ -147,8 +134,8 @@ func clearQuarantine*(quarantine: QuarantineRef) =
# good a number as any.
const MAX_QUARANTINE_ORPHANS = SLOTS_PER_EPOCH
func add*(quarantine: QuarantineRef, dag: ChainDAGRef,
signedBlock: phase0.SignedBeaconBlock): bool =
func add*(quarantine: var Quarantine, dag: ChainDAGRef,
signedBlock: ForkedSignedBeaconBlock): bool =
## Adds block to quarantine's `orphans` and `missing` lists.
if not isViableOrphan(dag, signedBlock):
return false
@ -157,55 +144,27 @@ func add*(quarantine: QuarantineRef, dag: ChainDAGRef,
# Even if the quarantine is full, we need to schedule its parent for
# downloading or we'll never get to the bottom of things
quarantine.addMissing(signedBlock.message.parent_root)
withBlck(signedBlock): quarantine.addMissing(blck.message.parent_root)
if quarantine.orphansPhase0.lenu64 >= MAX_QUARANTINE_ORPHANS:
if quarantine.orphans.lenu64 >= MAX_QUARANTINE_ORPHANS:
return false
quarantine.orphansPhase0[(signedBlock.root, signedBlock.signature)] =
quarantine.orphans[(signedBlock.root, signedBlock.signature)] =
signedBlock
quarantine.missing.del(signedBlock.root)
true
func add*(quarantine: QuarantineRef, dag: ChainDAGRef,
signedBlock: altair.SignedBeaconBlock): bool =
## Adds block to quarantine's `orphans` and `missing` lists.
if not isViableOrphan(dag, signedBlock):
return false
iterator pop*(quarantine: var Quarantine, root: Eth2Digest):
ForkedSignedBeaconBlock =
# Pop orphans whose parent is the block identified by `root`
quarantine.removeOldBlocks(dag)
var toRemove: seq[(Eth2Digest, ValidatorSig)]
defer: # Run even if iterator is not carried to termination
for k in toRemove:
quarantine.orphans.del k
# Even if the quarantine is full, we need to schedule its parent for
# downloading or we'll never get to the bottom of things
quarantine.addMissing(signedBlock.message.parent_root)
if quarantine.orphansAltair.lenu64 >= MAX_QUARANTINE_ORPHANS:
return false
quarantine.orphansAltair[(signedBlock.root, signedBlock.signature)] =
signedBlock
quarantine.missing.del(signedBlock.root)
true
func add*(quarantine: QuarantineRef, dag: ChainDAGRef,
signedBlock: merge.SignedBeaconBlock): bool =
## Adds block to quarantine's `orphans` and `missing` lists.
if not isViableOrphan(dag, signedBlock):
return false
quarantine.removeOldBlocks(dag)
# Even if the quarantine is full, we need to schedule its parent for
# downloading or we'll never get to the bottom of things
quarantine.addMissing(signedBlock.message.parent_root)
if quarantine.orphansMerge.lenu64 >= MAX_QUARANTINE_ORPHANS:
return false
quarantine.orphansMerge[(signedBlock.root, signedBlock.signature)] =
signedBlock
quarantine.missing.del(signedBlock.root)
true
for k, v in quarantine.orphans:
if getForkedBlockField(v, parent_root) == root:
toRemove.add(k)
yield v

View File

@ -1180,7 +1180,7 @@ proc pruneStateCachesDAG*(dag: ChainDAGRef) =
proc updateHead*(
dag: ChainDAGRef,
newHead: BlockRef,
quarantine: QuarantineRef) =
quarantine: var Quarantine) =
## Update what we consider to be the current head, as given by the fork
## choice.
##

View File

@ -20,8 +20,8 @@ There are multiple consumers of validated consensus objects:
- a `ValidationResult.Accept` output triggers rebroadcasting in libp2p
- We jump into method `validate(PubSub, Message)` in libp2p/protocols/pubsub/pubsub.nim
- which was called by `rpcHandler(GossipSub, PubSubPeer, RPCMsg)`
- a `blockValidator` message enqueues the validated object to the processing queue in block_processor
- `blocksQueue: AsyncQueue[BlockEntry]` (shared with request_manager and sync_manager)
- a `blockValidator` message enqueues the validated object to the processing queue in `block_processor`
- `blockQueue: AsyncQueue[BlockEntry]` (shared with request_manager and sync_manager)
- This queue is then regularly processed to be made available to the consensus object pools.
- a `xyzValidator` message adds the validated object to a pool in eth2_processor
- Attestations (unaggregated and aggregated) get collected into batches.

View File

@ -10,18 +10,10 @@
import
# Status
chronicles, chronos,
stew/results,
eth/keys, taskpools,
# Internals
../spec/[helpers, signatures_batch],
../spec/datatypes/base,
../consensus_object_pools/[
blockchain_dag, block_quarantine, attestation_pool, exit_pool,
block_pools_types, spec_cache
],
".."/[beacon_clock]
../spec/signatures_batch,
../consensus_object_pools/[blockchain_dag, spec_cache]
export BrHmacDrbgContext
export signatures_batch, blockchain_dag
logScope:
topics = "gossip_checks"
@ -62,15 +54,11 @@ type
## Eager is used to enable eager processing of attestations when it's
## prudent to do so (instead of leaving the CPU for other, presumably more
## important work like block processing)
sigVerifCache: BatchedBLSVerifierCache ##\
## A cache for batch BLS signature verification contexts
rng: ref BrHmacDrbgContext ##\
## A reference to the Nimbus application-wide RNG
pruneTime: Moment ## :ast time we had to prune something
## A pointer to the Nimbus application-wide threadpool
taskpool: TaskPoolPtr
##
verifier: BatchVerifier
pruneTime: Moment ## :ast time we had to prune something
TaskPoolPtr* = TaskPool
const
# We cap waiting for an idle slot in case there's a lot of network traffic
@ -90,7 +78,10 @@ const
proc new*(
T: type BatchCrypto, rng: ref BrHmacDrbgContext,
eager: Eager, taskpool: TaskPoolPtr): ref BatchCrypto =
(ref BatchCrypto)(rng: rng, eager: eager, pruneTime: Moment.now(), taskpool: taskpool)
(ref BatchCrypto)(
verifier: BatchVerifier(rng: rng, taskpool: taskpool),
eager: eager,
pruneTime: Moment.now())
func len(batch: Batch): int =
doAssert batch.resultsBuffer.len() == batch.pendingBuffer.len()
@ -148,16 +139,7 @@ proc processBatch(batchCrypto: ref BatchCrypto) =
let startTick = Moment.now()
var secureRandomBytes: array[32, byte]
batchCrypto[].rng[].brHmacDrbgGenerate(secureRandomBytes)
let ok = try:
batchCrypto.taskpool.batchVerify(
batchCrypto.sigVerifCache,
batch.pendingBuffer,
secureRandomBytes)
except Exception as exc:
raiseAssert exc.msg
let ok = batchCrypto.verifier.batchVerify(batch.pendingBuffer)
trace "batch crypto - finished",
batchSize,

View File

@ -12,13 +12,15 @@ import
stew/results,
chronicles, chronos, metrics,
../spec/datatypes/[phase0, altair, merge],
../spec/[forks],
../consensus_object_pools/[block_clearance, blockchain_dag, attestation_pool],
../spec/[forks, signatures_batch],
../consensus_object_pools/[
attestation_pool, block_clearance, blockchain_dag, block_quarantine,
spec_cache],
./consensus_manager,
".."/[beacon_clock],
../sszdump
export sszdump
export sszdump, signatures_batch
# Block Processor
# ------------------------------------------------------------------------------
@ -48,6 +50,9 @@ type
## - database
## - attestation pool
## - fork choice
##
## The processor will also reinsert blocks from the quarantine, should a
## parent be found.
# Config
# ----------------------------------------------------------------
@ -57,7 +62,7 @@ type
# Producers
# ----------------------------------------------------------------
blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
blockQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
# Consumer
# ----------------------------------------------------------------
@ -65,21 +70,26 @@ type
## Blockchain DAG, AttestationPool and Quarantine
getBeaconTime: GetBeaconTimeFn
verifier: BatchVerifier
# Initialization
# ------------------------------------------------------------------------------
proc new*(T: type BlockProcessor,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
rng: ref BrHmacDrbgContext, taskpool: TaskPoolPtr,
consensusManager: ref ConsensusManager,
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
dumpDirIncoming: dumpDirIncoming,
blocksQueue: newAsyncQueue[BlockEntry](),
blockQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
getBeaconTime: getBeaconTime)
getBeaconTime: getBeaconTime,
verifier: BatchVerifier(rng: rng, taskpool: taskpool)
)
# Sync callbacks
# ------------------------------------------------------------------------------
@ -97,7 +107,7 @@ proc fail*(entry: BlockEntry, error: BlockError) =
entry.resfut.complete(Result[void, BlockError].err(error))
proc hasBlocks*(self: BlockProcessor): bool =
self.blocksQueue.len() > 0
self.blockQueue.len() > 0
# Enqueue
# ------------------------------------------------------------------------------
@ -118,7 +128,7 @@ proc addBlock*(
# addLast doesn't fail with unbounded queues, but we'll add asyncSpawn as a
# sanity check
try:
self.blocksQueue.addLastNoWait(BlockEntry(
self.blockQueue.addLastNoWait(BlockEntry(
blck: blck,
resfut: resfut, queueTick: Moment.now(),
validationDur: validationDur))
@ -129,21 +139,19 @@ proc addBlock*(
# ------------------------------------------------------------------------------
proc dumpInvalidBlock*(
self: BlockProcessor,
signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock |
merge.SignedBeaconBlock) =
self: BlockProcessor, signedBlock: ForkySignedBeaconBlock) =
if self.dumpEnabled:
dump(self.dumpDirInvalid, signedBlock)
proc dumpBlock*[T](
self: BlockProcessor,
signedBlock: ForkySignedBeaconBlock,
res: Result[T, (ValidationResult, BlockError)]) =
res: Result[T, BlockError]) =
if self.dumpEnabled and res.isErr:
case res.error[1]
of Invalid:
case res.error
of BlockError.Invalid:
self.dumpInvalidBlock(signedBlock)
of MissingParent:
of BlockError.MissingParent:
dump(self.dumpDirIncoming, signedBlock)
else:
discard
@ -156,10 +164,18 @@ proc storeBlock*(
let
attestationPool = self.consensusManager.attestationPool
startTick = Moment.now()
dag = self.consensusManager.dag
# The block is certainly not missing any more
self.consensusManager.quarantine[].missing.del(signedBlock.root)
# We'll also remove the block as an orphan: it's unlikely the parent is
# missing if we get this far - should that be the case, the block will
# be re-added later
self.consensusManager.quarantine[].removeOrphan(signedBlock)
type Trusted = typeof signedBlock.asTrusted()
let blck = self.consensusManager.dag.addRawBlock(
self.consensusManager.quarantine, signedBlock) do (
let blck = dag.addRawBlock(self.verifier, signedBlock) do (
blckRef: BlockRef, trustedBlock: Trusted, epochRef: EpochRef):
# Callback add to fork choice if valid
attestationPool[].addForkChoice(
@ -170,8 +186,16 @@ proc storeBlock*(
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck.isErr:
return err(blck.error[1])
if blck.isErr():
if blck.error() == BlockError.MissingParent:
if not self.consensusManager.quarantine[].add(
dag, ForkedSignedBeaconBlock.init(signedBlock)):
debug "Block quarantine full",
blockRoot = shortLog(signedBlock.root),
blck = shortLog(signedBlock.message),
signature = shortLog(signedBlock.signature)
return blck
let storeBlockTick = Moment.now()
@ -191,7 +215,11 @@ proc storeBlock*(
blockSlot = blck.get().slot,
validationDur, queueDur, storeBlockDur, updateHeadDur
ok(blck.get())
for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
# Process the blocks that had the newly accepted block as parent
self.addBlock(quarantined)
blck
# Event Loop
# ------------------------------------------------------------------------------
@ -212,8 +240,8 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
res = withBlck(entry.blck):
self.storeBlock(blck, wallSlot, entry.queueTick, entry.validationDur)
if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}):
# Duplicate and old blocks are ok from a sync point of view, so we mark
if res.isOk() or res.error() == BlockError.Duplicate:
# Duplicate blocks are ok from a sync point of view, so we mark
# them as successful
entry.done()
else:
@ -234,4 +262,4 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
discard await idleAsync().withTimeout(idleTimeout)
self[].processBlock(await self[].blocksQueue.popFirst())
self[].processBlock(await self[].blockQueue.popFirst())

View File

@ -10,7 +10,7 @@
import
chronicles, chronos,
../spec/datatypes/base,
../consensus_object_pools/[blockchain_dag, attestation_pool]
../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool]
# TODO: Move to "consensus_object_pools" folder
@ -26,7 +26,7 @@ type
# Missing info
# ----------------------------------------------------------------
quarantine*: QuarantineRef
quarantine*: ref Quarantine
# Initialization
# ------------------------------------------------------------------------------
@ -34,7 +34,7 @@ type
proc new*(T: type ConsensusManager,
dag: ChainDAGRef,
attestationPool: ref AttestationPool,
quarantine: QuarantineRef
quarantine: ref Quarantine
): ref ConsensusManager =
(ref ConsensusManager)(
dag: dag,
@ -87,7 +87,7 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
# Store the new head in the chain DAG - this may cause epochs to be
# justified and finalized
self.dag.updateHead(newHead, self.quarantine)
self.dag.updateHead(newHead, self.quarantine[])
self.checkExpectedBlock()

View File

@ -9,21 +9,21 @@
import
std/tables,
stew/results,
chronicles, chronos, metrics,
stew/results, bearssl,
chronicles, chronos, metrics, taskpools,
../spec/[helpers, forks],
../spec/datatypes/[altair, phase0],
../consensus_object_pools/[
block_clearance, blockchain_dag, exit_pool, attestation_pool,
block_clearance, block_quarantine, blockchain_dag, exit_pool, attestation_pool,
sync_committee_msg_pool],
../validators/validator_pool,
../beacon_clock,
"."/[gossip_validation, block_processor, batch_validation]
export
results, block_clearance, blockchain_dag, exit_pool, attestation_pool,
results, bearssl, taskpools, block_clearance, blockchain_dag, exit_pool, attestation_pool,
sync_committee_msg_pool, validator_pool, beacon_clock, gossip_validation,
block_processor, batch_validation
block_processor, batch_validation, block_quarantine
# Metrics for tracking attestation and beacon block loss
declareCounter beacon_attestations_received,
@ -122,7 +122,7 @@ type
# Missing information
# ----------------------------------------------------------------
quarantine*: QuarantineRef
quarantine*: ref Quarantine
# Application-provided current time provider (to facilitate testing)
getCurrentBeaconTime*: GetBeaconTimeFn
@ -140,10 +140,10 @@ proc new*(T: type Eth2Processor,
exitPool: ref ExitPool,
validatorPool: ref ValidatorPool,
syncCommitteeMsgPool: ref SyncCommitteeMsgPool,
quarantine: QuarantineRef,
quarantine: ref Quarantine,
rng: ref BrHmacDrbgContext,
getBeaconTime: GetBeaconTimeFn,
taskpool: batch_validation.TaskPoolPtr
taskpool: TaskPoolPtr
): ref Eth2Processor =
(ref Eth2Processor)(
doppelGangerDetectionEnabled: doppelGangerDetectionEnabled,

View File

@ -105,7 +105,7 @@ func check_beacon_and_target_block(
# of the block in the pool.
let blck = pool.dag.getRef(data.beacon_block_root)
if blck.isNil:
pool.quarantine.addMissing(data.beacon_block_root)
pool.quarantine[].addMissing(data.beacon_block_root)
return errIgnore("Attestation block unknown")
# Not in spec - check that rewinding to the state is sane
@ -173,7 +173,7 @@ template checkedReject(error: ValidationError): untyped =
# https://github.com/ethereum/consensus-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_block
proc validateBeaconBlock*(
dag: ChainDAGRef, quarantine: QuarantineRef,
dag: ChainDAGRef, quarantine: ref Quarantine,
signed_beacon_block: phase0.SignedBeaconBlock | altair.SignedBeaconBlock,
wallTime: BeaconTime, flags: UpdateFlags): Result[void, ValidationError] =
# In general, checks are ordered from cheap to expensive. Especially, crypto
@ -245,10 +245,9 @@ proc validateBeaconBlock*(
# [REJECT] The block's parent (defined by block.parent_root) passes validation.
let parent_ref = dag.getRef(signed_beacon_block.message.parent_root)
if parent_ref.isNil:
# Pending dag gets checked via `ChainDAGRef.add(...)` later, and relevant
# checks are performed there. In usual paths beacon_node adds blocks via
# ChainDAGRef.add(...) directly, with no additional validity checks.
if not quarantine.add(dag, signed_beacon_block):
# When the parent is missing, we can't validate the block - we'll queue it
# in the quarantine for later processing
if not quarantine[].add(dag, ForkedSignedBeaconBlock.init(signed_beacon_block)):
debug "Block quarantine full"
return errIgnore("BeaconBlock: Parent not found")

View File

@ -299,7 +299,7 @@ proc init*(T: type BeaconNode,
else: {}
dag = ChainDAGRef.init(cfg, db, chainDagFlags, onBlockAdded, onHeadChanged,
onChainReorg, onFinalization)
quarantine = QuarantineRef.init(rng, taskpool)
quarantine = newClone(Quarantine.init())
databaseGenesisValidatorsRoot =
getStateField(dag.headState.data, genesis_validators_root)
@ -405,7 +405,7 @@ proc init*(T: type BeaconNode,
)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
consensusManager, getBeaconTime)
rng, taskpool, consensusManager, getBeaconTime)
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, dag, attestationPool, exitPool, validatorPool,
@ -440,7 +440,6 @@ proc init*(T: type BeaconNode,
consensusManager: consensusManager,
gossipState: GossipState.Disconnected,
beaconClock: beaconClock,
taskpool: taskpool,
onAttestationSent: onAttestationSent,
)
@ -940,7 +939,7 @@ proc onSlotStart(
await onSlotEnd(node, wallSlot)
proc handleMissingBlocks(node: BeaconNode) =
let missingBlocks = node.quarantine.checkMissing()
let missingBlocks = node.quarantine[].checkMissing()
if missingBlocks.len > 0:
debug "Requesting detected missing blocks", blocks = shortLog(missingBlocks)
node.requestManager.fetchAncestorBlocks(missingBlocks)

View File

@ -11,6 +11,8 @@ import
# Status lib
blscurve,
stew/[byteutils, results],
taskpools,
bearssl,
# Internal
"."/[helpers, beaconstate, forks],
"."/datatypes/[altair, merge, phase0]
@ -18,7 +20,18 @@ import
# Otherwise, error.
import chronicles
export altair, phase0
export altair, phase0, taskpools, bearssl
type
TaskPoolPtr* = TaskPool
BatchVerifier* = object
sigVerifCache*: BatchedBLSVerifierCache ##\
## A cache for batch BLS signature verification contexts
rng*: ref BrHmacDrbgContext ##\
## A reference to the Nimbus application-wide RNG
taskpool*: TaskPoolPtr
func `$`*(s: SignatureSet): string =
"(pubkey: 0x" & s.pubkey.toHex() &
@ -459,3 +472,11 @@ proc collectSignatureSets*(
DOMAIN_SYNC_COMMITTEE)
ok()
proc batchVerify*(verifier: var BatchVerifier, sigs: openArray[SignatureSet]): bool =
var bytes: array[32, byte]
verifier.rng[].brHmacDrbgGenerate(bytes)
try:
verifier.taskpool.batchVerify(verifier.sigVerifCache, sigs, bytes)
except Exception as exc:
raiseAssert exc.msg # Shouldn't happen

View File

@ -89,17 +89,21 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
if len(ublocks) > 0:
for b in ublocks:
res = await rman.validate(b)
# We are ignoring errors:
# `BlockError.MissingParent` - because the order of the blocks that
# we requested may be different from the order in which we need
# these blocks to apply.
# `BlockError.Old`, `BlockError.Duplicate` and `BlockError.Unviable`
# errors could occur due to the concurrent/parallel requests we are
# made.
if res.isErr() and (res.error == BlockError.Invalid):
# We stop processing blocks further to avoid DoS attack with big
# chunk of incorrect blocks.
break
if res.isErr():
case res.error()
of BlockError.MissingParent:
# Ignoring because the order of the blocks that
# we requested may be different from the order in which we need
# these blocks to apply.
discard
of BlockError.Duplicate, BlockError.UnviableFork:
# Ignoring because these errors could occur due to the
# concurrent/parallel requests we made.
discard
of BlockError.Invalid:
# We stop processing blocks further to avoid DoS attack with big
# chunk of incorrect blocks.
break
else:
res = Result[void, BlockError].ok()

View File

@ -133,7 +133,7 @@ To mitigate blocking networking and timeshare between Io and compute, blocks are
This in turn calls:
- `storeBlock(Eth2Processor, SignedBeaconBlock, Slot)`
- `addRawBlock(ChainDAGRef, QuarantineRef, SignedBeaconBlock, forkChoiceCallback)`
- `addRawBlock(ChainDAGRef, var BatchVerifier, SignedBeaconBlock, forkChoiceCallback)`
- trigger sending attestation if relevant
### Steady state (synced to head)

View File

@ -87,7 +87,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
eth1Chain = Eth1Chain.init(cfg, db)
merkleizer = depositContractSnapshot.createMerkleizer
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
quarantine = newClone(Quarantine.init())
attPool = AttestationPool.init(dag, quarantine)
syncCommitteePool = newClone SyncCommitteeMsgPool.init()
timers: array[Timers, RunningStat]
@ -302,7 +303,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
dag.withState(tmpState[], dag.head.atSlot(slot)):
let
newBlock = getNewBlock[phase0.SignedBeaconBlock](stateData, slot, cache)
added = dag.addRawBlock(quarantine, newBlock) do (
added = dag.addRawBlock(verifier, newBlock) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -310,7 +311,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
epochRef, blckRef, signedBlock.message, blckRef.slot)
blck() = added[]
dag.updateHead(added[], quarantine)
dag.updateHead(added[], quarantine[])
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
attPool.prune()
@ -322,7 +323,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
dag.withState(tmpState[], dag.head.atSlot(slot)):
let
newBlock = getNewBlock[altair.SignedBeaconBlock](stateData, slot, cache)
added = dag.addRawBlock(quarantine, newBlock) do (
added = dag.addRawBlock(verifier, newBlock) do (
blckRef: BlockRef, signedBlock: altair.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -330,7 +331,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
epochRef, blckRef, signedBlock.message, blckRef.slot)
blck() = added[]
dag.updateHead(added[], quarantine)
dag.updateHead(added[], quarantine[])
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
attPool.prune()
@ -342,7 +343,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
dag.withState(tmpState[], dag.head.atSlot(slot)):
let
newBlock = getNewBlock[merge.SignedBeaconBlock](stateData, slot, cache)
added = dag.addRawBlock(quarantine, newBlock) do (
added = dag.addRawBlock(verifier, newBlock) do (
blckRef: BlockRef, signedBlock: merge.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -350,7 +351,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
epochRef, blckRef, signedBlock.message, blckRef.slot)
blck() = added[]
dag.updateHead(added[], quarantine)
dag.updateHead(added[], quarantine[])
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
attPool.prune()

View File

@ -15,7 +15,8 @@ import # Unit test
./test_action_tracker,
./test_attestation_pool,
./test_beacon_chain_db,
./test_block_pool,
./test_block_processor,
./test_blockchain_dag,
./test_datatypes,
./test_discovery,
./test_eth1_monitor,

View File

@ -21,7 +21,7 @@ import
../../beacon_chain/fork_choice/[fork_choice, fork_choice_types],
../../beacon_chain/beacon_chain_db,
../../beacon_chain/consensus_object_pools/[
blockchain_dag, block_quarantine, block_clearance, spec_cache],
blockchain_dag, block_clearance, spec_cache],
# Third-party
yaml,
# Test
@ -122,7 +122,7 @@ proc loadOps(path: string, fork: BeaconBlockFork): seq[Operation] =
)
result.add Operation(kind: opOnBlock,
blk: ForkedSignedBeaconBlock.init(blk))
of BeaconBlockFork.Altair:
of BeaconBlockFork.Altair:
let blk = parseTest(
path/filename & ".ssz_snappy",
SSZ, altair.SignedBeaconBlock
@ -160,11 +160,11 @@ proc loadOps(path: string, fork: BeaconBlockFork): seq[Operation] =
proc stepOnBlock(
dag: ChainDagRef,
fkChoice: ref ForkChoice,
quarantine: QuarantineRef,
verifier: var BatchVerifier,
state: var StateData,
stateCache: var StateCache,
signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock | merge.SignedBeaconBlock,
time: Slot): Result[BlockRef, (ValidationResult, BlockError)] =
signedBlock: ForkySignedBeaconBlock,
time: Slot): Result[BlockRef, BlockError] =
# 1. Move state to proper slot.
dag.updateStateData(
state,
@ -181,7 +181,7 @@ proc stepOnBlock(
else:
type TrustedBlock = merge.TrustedSignedBeaconBlock
let blockAdded = dag.addRawBlock(quarantine, signedBlock) do (
let blockAdded = dag.addRawBlock(verifier, signedBlock) do (
blckRef: BlockRef, signedBlock: TrustedBlock, epochRef: EpochRef
):
@ -194,8 +194,8 @@ proc stepOnBlock(
time
)
doAssert status.isOk()
return blockAdded
return blockAdded
proc stepOnAttestation(
dag: ChainDagRef,
@ -205,7 +205,7 @@ proc stepOnAttestation(
let epochRef = dag.getEpochRef(dag.head, time.compute_epoch_at_slot())
let attesters = epochRef.get_attesting_indices(att.data, att.aggregation_bits)
let status = fkChoice[].on_attestation(
dag,
att.data.slot, att.data.beacon_block_root, attesters,
@ -287,9 +287,9 @@ proc runTest(path: string, fork: BeaconBlockFork) =
# # TODO: support merge genesis state
# merge.BeaconState, phase0.BeaconBlock
# )
let taskpool = Taskpool.new(numThreads = 1)
let quarantine = QuarantineRef.init(keys.newRng(), taskpool)
var
taskpool = Taskpool.new()
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
let steps = loadOps(path, fork)
var time = stores.fkChoice.checkpoints.time
@ -305,7 +305,7 @@ proc runTest(path: string, fork: BeaconBlockFork) =
withBlck(step.blk):
let status = stepOnBlock(
stores.dag, stores.fkChoice,
quarantine,
verifier,
state[], stateCache,
blck,
time)

View File

@ -60,7 +60,8 @@ suite "Attestation pool processing" & preset():
var
dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6), {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
quarantine = newClone(Quarantine.init())
pool = newClone(AttestationPool.init(dag, quarantine))
state = newClone(dag.headState)
cache = StateCache()
@ -381,7 +382,7 @@ suite "Attestation pool processing" & preset():
var cache = StateCache()
let
b1 = addTestBlock(state.data, cache).phase0Data
b1Add = dag.addRawBlock(quarantine, b1) do (
b1Add = dag.addRawBlock(verifier, b1) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -394,7 +395,7 @@ suite "Attestation pool processing" & preset():
let
b2 = addTestBlock(state.data, cache).phase0Data
b2Add = dag.addRawBlock(quarantine, b2) do (
b2Add = dag.addRawBlock(verifier, b2) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -409,7 +410,7 @@ suite "Attestation pool processing" & preset():
var cache = StateCache()
let
b10 = makeTestBlock(state.data, cache).phase0Data
b10Add = dag.addRawBlock(quarantine, b10) do (
b10Add = dag.addRawBlock(verifier, b10) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -424,7 +425,7 @@ suite "Attestation pool processing" & preset():
b11 = makeTestBlock(state.data, cache,
graffiti = GraffitiBytes [1'u8, 0, 0, 0 ,0 ,0 ,0 ,0 ,0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
).phase0Data
b11Add = dag.addRawBlock(quarantine, b11) do (
b11Add = dag.addRawBlock(verifier, b11) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -470,7 +471,7 @@ suite "Attestation pool processing" & preset():
var cache = StateCache()
let
b10 = makeTestBlock(state.data, cache).phase0Data
b10Add = dag.addRawBlock(quarantine, b10) do (
b10Add = dag.addRawBlock(verifier, b10) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -484,13 +485,13 @@ suite "Attestation pool processing" & preset():
# -------------------------------------------------------------
# Add back the old block to ensure we have a duplicate error
let b10_clone = b10 # Assumes deep copy
let b10Add_clone = dag.addRawBlock(quarantine, b10_clone) do (
let b10Add_clone = dag.addRawBlock(verifier, b10_clone) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot)
doAssert: b10Add_clone.error == (ValidationResult.Ignore, Duplicate)
doAssert: b10Add_clone.error == BlockError.Duplicate
test "Trying to add a duplicate block from an old pruned epoch is tagged as an error":
# Note: very sensitive to stack usage
@ -499,7 +500,7 @@ suite "Attestation pool processing" & preset():
var cache = StateCache()
let
b10 = addTestBlock(state.data, cache).phase0Data
b10Add = dag.addRawBlock(quarantine, b10) do (
b10Add = dag.addRawBlock(verifier, b10) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -524,7 +525,7 @@ suite "Attestation pool processing" & preset():
let new_block = addTestBlock(
state.data, cache, attestations = attestations).phase0Data
let blockRef = dag.addRawBlock(quarantine, new_block) do (
let blockRef = dag.addRawBlock(verifier, new_block) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -532,7 +533,7 @@ suite "Attestation pool processing" & preset():
let head = pool[].selectHead(blockRef[].slot)
doAssert: head == blockRef[]
dag.updateHead(head, quarantine)
dag.updateHead(head, quarantine[])
pruneAtFinalization(dag, pool[])
attestations.setlen(0)
@ -566,10 +567,10 @@ suite "Attestation pool processing" & preset():
doAssert: b10.root notin pool.forkChoice.backend
# Add back the old block to ensure we have a duplicate error
let b10Add_clone = dag.addRawBlock(quarantine, b10_clone) do (
let b10Add_clone = dag.addRawBlock(verifier, b10_clone) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot)
doAssert: b10Add_clone.error == (ValidationResult.Ignore, Duplicate)
doAssert: b10Add_clone.error == BlockError.Duplicate

View File

@ -0,0 +1,96 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
chronicles, chronos,
std/[options, sequtils],
unittest2,
eth/keys, taskpools,
../beacon_chain/beacon_clock,
../beacon_chain/spec/[beaconstate, forks, helpers, state_transition],
../beacon_chain/gossip_processing/[block_processor, consensus_manager],
../beacon_chain/consensus_object_pools/[
attestation_pool, blockchain_dag, block_quarantine, block_clearance],
./testutil, ./testdbutil, ./testblockutil
proc pruneAtFinalization(dag: ChainDAGRef) =
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
suite "Block processor" & preset():
setup:
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
taskpool = Taskpool.new()
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
quarantine = newClone(Quarantine.init())
attestationPool = newClone(AttestationPool.init(dag, quarantine))
consensusManager = ConsensusManager.new(dag, attestationPool, quarantine)
state = newClone(dag.headState.data)
cache = StateCache()
b1 = addTestBlock(state[], cache).phase0Data
b2 = addTestBlock(state[], cache).phase0Data
getTimeFn = proc(): BeaconTime = b2.message.slot.toBeaconTime()
processor = BlockProcessor.new(
false, "", "", keys.newRng(), taskpool, consensusManager,
getTimeFn)
test "Reverse order block add & get" & preset():
let missing = processor[].storeBlock(
b2, b2.message.slot)
check: missing.error == BlockError.MissingParent
check:
dag.get(b2.root).isNone() # Unresolved, shouldn't show up
FetchRecord(root: b1.root) in quarantine[].checkMissing()
let
status = processor[].storeBlock(
b1, b2.message.slot)
b1Get = dag.get(b1.root)
check:
status.isOk
b1Get.isSome()
dag.get(b2.root).isNone() # Async pipeline must still run
discard processor.runQueueProcessingLoop()
while processor[].hasBlocks():
poll()
let
b2Get = dag.get(b2.root)
check:
b2Get.isSome()
b2Get.get().refs.parent == b1Get.get().refs
dag.updateHead(b2Get.get().refs, quarantine[])
dag.pruneAtFinalization()
# The heads structure should have been updated to contain only the new
# b2 head
check:
dag.heads.mapIt(it) == @[b2Get.get().refs]
# check that init also reloads block graph
var
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
check:
# ensure we loaded the correct head state
dag2.head.root == b2.root
getStateRoot(dag2.headState.data) == b2.message.state_root
dag2.get(b1.root).isSome()
dag2.get(b2.root).isSome()
dag2.heads.len == 1
dag2.heads[0].root == b2.root

View File

@ -9,7 +9,6 @@
import
chronicles,
std/[options, sequtils],
unittest2,
stew/assign2,
eth/keys, taskpools,
@ -123,8 +122,8 @@ suite "Block pool processing" & preset():
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = Quarantine.init()
state = newClone(dag.headState.data)
cache = StateCache()
info = ForkedEpochInfo()
@ -144,7 +143,7 @@ suite "Block pool processing" & preset():
test "Simple block add&get" & preset():
let
b1Add = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
b1Get = dag.get(b1.root)
check:
@ -155,7 +154,7 @@ suite "Block pool processing" & preset():
dag.heads[0] == b1Add[]
let
b2Add = dag.addRawBlock(quarantine, b2, nilPhase0Callback)
b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback)
b2Get = dag.get(b2.root)
er = dag.findEpochRef(b1Add[], b1Add[].slot.epoch)
validators = getStateField(dag.headState.data, validators).lenu64()
@ -184,7 +183,7 @@ suite "Block pool processing" & preset():
let
b4 = addTestBlock(state[], cache).phase0Data
b4Add = dag.addRawBlock(quarantine, b4, nilPhase0Callback)
b4Add = dag.addRawBlock(verifier, b4, nilPhase0Callback)
check:
b4Add[].parent == b2Add[]
@ -231,61 +230,9 @@ suite "Block pool processing" & preset():
dag.getBlockRange(Slot(3), 2, blocks.toOpenArray(0, 1)) == 2
blocks[2..<2].len == 0
test "Reverse order block add & get" & preset():
let missing = dag.addRawBlock(quarantine, b2, nilPhase0Callback)
check: missing.error == (ValidationResult.Ignore, MissingParent)
check:
dag.get(b2.root).isNone() # Unresolved, shouldn't show up
FetchRecord(root: b1.root) in quarantine.checkMissing()
let status = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
check: status.isOk
let
b1Get = dag.get(b1.root)
b2Get = dag.get(b2.root)
check:
b1Get.isSome()
b2Get.isSome()
b2Get.get().refs.parent == b1Get.get().refs
dag.updateHead(b2Get.get().refs, quarantine)
dag.pruneAtFinalization()
# The heads structure should have been updated to contain only the new
# b2 head
check:
dag.heads.mapIt(it) == @[b2Get.get().refs]
# check that init also reloads block graph
var
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
check:
# ensure we loaded the correct head state
dag2.head.root == b2.root
getStateRoot(dag2.headState.data) == b2.message.state_root
dag2.get(b1.root).isSome()
dag2.get(b2.root).isSome()
dag2.heads.len == 1
dag2.heads[0].root == b2.root
test "Adding the same block twice returns a Duplicate error" & preset():
let
b10 = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
b11 = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
check:
b11.error == (ValidationResult.Ignore, Duplicate)
not b10[].isNil
test "updateHead updates head and headState" & preset():
let
b1Add = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
dag.updateHead(b1Add[], quarantine)
dag.pruneAtFinalization()
@ -296,8 +243,8 @@ suite "Block pool processing" & preset():
test "updateStateData sanity" & preset():
let
b1Add = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
b2Add = dag.addRawBlock(quarantine, b2, nilPhase0Callback)
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback)
bs1 = BlockSlot(blck: b1Add[], slot: b1.message.slot)
bs1_3 = b1Add[].atSlot(3.Slot)
bs2_3 = b2Add[].atSlot(3.Slot)
@ -352,9 +299,8 @@ suite "Block pool altair processing" & preset():
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, cfg, db, {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
nilAltairCallback: OnAltairBlockAdded
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = Quarantine.init()
state = newClone(dag.headState.data)
cache = StateCache()
info = ForkedEpochInfo()
@ -378,39 +324,39 @@ suite "Block pool altair processing" & preset():
MockPrivKeys[ValidatorIndex(0)]).toValidatorSig()
check:
dag.addRawBlock(quarantine, b1, nilAltairCallback).isOk()
dag.addRawBlock(verifier, b1, nilAltairCallback).isOk()
block: # Main signature
var b = b2
b.signature = badSignature
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
bAdd.error() == BlockError.Invalid
block: # Randao reveal
var b = b2
b.message.body.randao_reveal = badSignature
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
bAdd.error() == BlockError.Invalid
block: # Attestations
var b = b2
b.message.body.attestations[0].signature = badSignature
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
bAdd.error() == BlockError.Invalid
block: # SyncAggregate empty
var b = b2
b.message.body.sync_aggregate.sync_committee_signature = badSignature
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
bAdd.error() == BlockError.Invalid
block: # SyncAggregate junk
var b = b2
@ -418,17 +364,17 @@ suite "Block pool altair processing" & preset():
b.message.body.sync_aggregate.sync_committee_bits[0] = true
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
bAdd.error() == BlockError.Invalid
suite "chain DAG finalization tests" & preset():
setup:
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = Quarantine.init()
cache = StateCache()
info = ForkedEpochInfo()
@ -445,7 +391,7 @@ suite "chain DAG finalization tests" & preset():
let lateBlock = addTestBlock(tmpState[], cache).phase0Data
block:
let status = dag.addRawBlock(quarantine, blck, nilPhase0Callback)
let status = dag.addRawBlock(verifier, blck, nilPhase0Callback)
check: status.isOk()
assign(tmpState[], dag.headState.data)
@ -460,7 +406,7 @@ suite "chain DAG finalization tests" & preset():
tmpState[], cache,
attestations = makeFullAttestations(
tmpState[], dag.head.root, getStateField(tmpState[], slot), cache, {})).phase0Data
let added = dag.addRawBlock(quarantine, blck, nilPhase0Callback)
let added = dag.addRawBlock(verifier, blck, nilPhase0Callback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
@ -507,8 +453,8 @@ suite "chain DAG finalization tests" & preset():
block:
# The late block is a block whose parent was finalized long ago and thus
# is no longer a viable head candidate
let status = dag.addRawBlock(quarantine, lateBlock, nilPhase0Callback)
check: status.error == (ValidationResult.Ignore, Unviable)
let status = dag.addRawBlock(verifier, lateBlock, nilPhase0Callback)
check: status.error == BlockError.UnviableFork
block:
let
@ -536,7 +482,7 @@ suite "chain DAG finalization tests" & preset():
assign(prestate[], dag.headState.data)
let blck = makeTestBlock(dag.headState.data, cache).phase0Data
let added = dag.addRawBlock(quarantine, blck, nilPhase0Callback)
let added = dag.addRawBlock(verifier, blck, nilPhase0Callback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
@ -555,21 +501,21 @@ suite "chain DAG finalization tests" & preset():
let blck = makeTestBlock(prestate[], cache).phase0Data
# Add block, but don't update head
let added = dag.addRawBlock(quarantine, blck, nilPhase0Callback)
let added = dag.addRawBlock(verifier, blck, nilPhase0Callback)
check: added.isOk()
var
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
# check that we can apply the block after the orphaning
let added2 = dag2.addRawBlock(quarantine, blck, nilPhase0Callback)
let added2 = dag2.addRawBlock(verifier, blck, nilPhase0Callback)
check: added2.isOk()
test "init with gaps" & preset():
for blck in makeTestBlocks(
dag.headState.data, cache, int(SLOTS_PER_EPOCH * 6 - 2),
true):
let added = dag.addRawBlock(quarantine, blck.phase0Data, nilPhase0Callback)
let added = dag.addRawBlock(verifier, blck.phase0Data, nilPhase0Callback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
@ -586,7 +532,7 @@ suite "chain DAG finalization tests" & preset():
dag.headState.data, dag.head.root, getStateField(dag.headState.data, slot),
cache, {})).phase0Data
let added = dag.addRawBlock(quarantine, blck, nilPhase0Callback)
let added = dag.addRawBlock(verifier, blck, nilPhase0Callback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
@ -627,8 +573,9 @@ suite "Old database versions" & preset():
makeInitialDeposits(SLOTS_PER_EPOCH.uint64, flags = {skipBlsValidation}),
{skipBlsValidation}))
genBlock = get_initial_beacon_block(genState[])
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
var
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = Quarantine.init()
test "pre-1.1.0":
# only kvstore, no immutable validator keys
@ -651,7 +598,7 @@ suite "Old database versions" & preset():
cache = StateCache()
att0 = makeFullAttestations(state[], dag.tail.root, 0.Slot, cache)
b1 = addTestBlock(state[], cache, attestations = att0).phase0Data
b1Add = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
check:
b1Add.isOk()
@ -668,8 +615,9 @@ suite "Diverging hardforks":
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, phase0RuntimeConfig, db, {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = newClone(Quarantine.init())
nilPhase0Callback: OnPhase0BlockAdded
state = newClone(dag.headState.data)
cache = StateCache()
info = ForkedEpochInfo()
@ -687,10 +635,10 @@ suite "Diverging hardforks":
# common is the tail block
var
b1 = addTestBlock(tmpState[], cache).phase0Data
b1Add = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
check b1Add.isOk()
dag.updateHead(b1Add[], quarantine)
dag.updateHead(b1Add[], quarantine[])
var dagAltair = init(ChainDAGRef, altairRuntimeConfig, db, {})
discard AttestationPool.init(dagAltair, quarantine)
@ -705,7 +653,7 @@ suite "Diverging hardforks":
# There's a block in the shared-correct phase0 hardfork, before epoch 2
var
b1 = addTestBlock(tmpState[], cache).phase0Data
b1Add = dag.addRawBlock(quarantine, b1, nilPhase0Callback)
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
check:
b1Add.isOk()
@ -716,10 +664,10 @@ suite "Diverging hardforks":
var
b2 = addTestBlock(tmpState[], cache).phase0Data
b2Add = dag.addRawBlock(quarantine, b2, nilPhase0Callback)
b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback)
check b2Add.isOk()
dag.updateHead(b2Add[], quarantine)
dag.updateHead(b2Add[], quarantine[])
var dagAltair = init(ChainDAGRef, altairRuntimeConfig, db, {})
discard AttestationPool.init(dagAltair, quarantine)

View File

@ -38,7 +38,8 @@ suite "Gossip validation " & preset():
var
dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
quarantine = newClone(Quarantine.init())
pool = newClone(AttestationPool.init(dag, quarantine))
state = newClone(dag.headState)
cache = StateCache()
@ -74,14 +75,14 @@ suite "Gossip validation " & preset():
cache: StateCache
for blck in makeTestBlocks(
dag.headState.data, cache, int(SLOTS_PER_EPOCH * 5), false):
let added = dag.addRawBlock(quarantine, blck.phase0Data) do (
let added = dag.addRawBlock(verifier, blck.phase0Data) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.updateHead(added[], quarantine[])
pruneAtFinalization(dag, pool[])
var
@ -180,10 +181,11 @@ suite "Gossip validation - Extra": # Not based on preset config
cfg.ALTAIR_FORK_EPOCH = (GENESIS_EPOCH + 1).Epoch
cfg
dag = block:
let
dag = ChainDAGRef.init(cfg, makeTestDB(num_validators), {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
var
dag = ChainDAGRef.init(
cfg, makeTestDB(num_validators), {})
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
quarantine = newClone(Quarantine.init())
var cache = StateCache()
for blck in makeTestBlocks(
dag.headState.data, cache, int(SLOTS_PER_EPOCH), false, cfg = cfg):
@ -191,15 +193,15 @@ suite "Gossip validation - Extra": # Not based on preset config
case blck.kind
of BeaconBlockFork.Phase0:
const nilCallback = OnPhase0BlockAdded(nil)
dag.addRawBlock(quarantine, blck.phase0Data, nilCallback)
dag.addRawBlock(verifier, blck.phase0Data, nilCallback)
of BeaconBlockFork.Altair:
const nilCallback = OnAltairBlockAdded(nil)
dag.addRawBlock(quarantine, blck.altairData, nilCallback)
dag.addRawBlock(verifier, blck.altairData, nilCallback)
of BeaconBlockFork.Merge:
const nilCallback = OnMergeBlockAdded(nil)
dag.addRawBlock(quarantine, blck.mergeData, nilCallback)
dag.addRawBlock(verifier, blck.mergeData, nilCallback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.updateHead(added[], quarantine[])
dag
state = assignClone(dag.headState.data.altairData)
slot = state[].data.slot

View File

@ -24,7 +24,7 @@ proc newBlockProcessor(): ref BlockProcessor =
# Minimal block processor for test - the real block processor has an unbounded
# queue but the tests here
(ref BlockProcessor)(
blocksQueue: newAsyncQueue[BlockEntry]()
blockQueue: newAsyncQueue[BlockEntry]()
)
suite "SyncManager test suite":
@ -233,7 +233,7 @@ suite "SyncManager test suite":
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
getFirstSlotAtFinalizedEpoch, aq, 1)
var validatorFut = simpleValidator(aq[].blocksQueue)
var validatorFut = simpleValidator(aq[].blockQueue)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
@ -287,7 +287,7 @@ suite "SyncManager test suite":
let p3 = SomeTPeer()
let p4 = SomeTPeer()
var validatorFut = simpleValidator(aq[].blocksQueue)
var validatorFut = simpleValidator(aq[].blockQueue)
var r21 = queue.pop(Slot(11), p1)
var r22 = queue.pop(Slot(11), p2)
@ -344,7 +344,7 @@ suite "SyncManager test suite":
let p6 = SomeTPeer()
let p7 = SomeTPeer()
var validatorFut = simpleValidator(aq[].blocksQueue)
var validatorFut = simpleValidator(aq[].blockQueue)
var r21 = queue.pop(Slot(20), p1)
var r22 = queue.pop(Slot(20), p2)