Mirror of https://github.com/status-im/nimbus-eth2.git
Synced 2025-01-10 14:26:26 +00:00
Commit ca1775f725
When the request manager is busy fetching blocks, the queue can fill up with multiple entries for the same root; since there is no deduplication, requests containing the same root several times are sent out. Also, because items may sit in the queue for a long time, a request can be stale by the time the manager is done with the previous one. This PR removes the queue and fetches the blocks to download directly from the quarantine, which solves both problems: the quarantine already de-duplicates and holds no stale information. Removing the queue for blobs is left for a future PR.

Co-authored-by: tersec <tersec@users.noreply.github.com>
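As a minimal illustration of the problem and the fix described above (a standalone Nim sketch, not the nimbus-eth2 request manager; the Root alias and the literal roots are made up for the example): a plain queue accumulates the same root several times while the manager is busy, whereas a set-like store of missing roots, which is broadly the role the quarantine's checkMissing plays in the test below, de-duplicates by construction.

import std/[deques, sets, sequtils]

type Root = string  # stand-in for an Eth2 block root in this sketch

# Queue-based approach (sketch): every announcement is appended, so the same
# root can end up being requested several times.
var queue = initDeque[Root]()
for announced in ["a", "b", "a", "a", "c"]:
  queue.addLast(announced)   # no deduplication
echo toSeq(queue)            # @["a", "b", "a", "a", "c"]

# Quarantine-style approach (sketch): missing roots live in a set, so each
# root appears at most once per fetch pass and disappears once resolved.
var missing = initHashSet[Root]()
for announced in ["a", "b", "a", "a", "c"]:
  missing.incl(announced)    # de-duplicated by construction
echo toSeq(missing)          # each root at most once, e.g. @["a", "b", "c"]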
117 lines
4.2 KiB
Nim
# beacon_chain
# Copyright (c) 2018-2023 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.used.}

import
  chronos,
  std/sequtils,
  unittest2,
  taskpools,
  ../beacon_chain/[conf, beacon_clock],
  ../beacon_chain/spec/[beaconstate, forks, helpers, state_transition],
  ../beacon_chain/spec/datatypes/deneb,
  ../beacon_chain/gossip_processing/block_processor,
  ../beacon_chain/consensus_object_pools/[
    attestation_pool, blockchain_dag, blob_quarantine, block_quarantine,
    block_clearance, consensus_manager],
  ../beacon_chain/el/el_manager,
  ./testutil, ./testdbutil, ./testblockutil

from chronos/unittest2/asynctests import asyncTest
from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import
  DynamicFeeRecipientsStore, init
from ../beacon_chain/validators/action_tracker import ActionTracker
from ../beacon_chain/validators/keystore_management import KeymanagerHost

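# Prune state caches and fork choice data once the DAG reports that pruning is due.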
proc pruneAtFinalization(dag: ChainDAGRef) =
  if dag.needStateCachesAndForkChoicePruning():
    dag.pruneStateCachesDAG()

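# The setup below builds an in-memory test DAG and two consecutive phase0 test
# blocks: b1 on top of the current head and b2 on top of b1.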
suite "Block processor" & preset():
|
|
setup:
|
|
let rng = HmacDrbgContext.new()
|
|
var
|
|
db = makeTestDB(SLOTS_PER_EPOCH)
|
|
validatorMonitor = newClone(ValidatorMonitor.init())
|
|
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
|
|
taskpool = Taskpool.new()
|
|
verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
|
quarantine = newClone(Quarantine.init())
|
|
blobQuarantine = newClone(BlobQuarantine())
|
|
attestationPool = newClone(AttestationPool.init(dag, quarantine))
|
|
elManager = new ELManager # TODO: initialise this properly
|
|
actionTracker: ActionTracker
|
|
keymanagerHost: ref KeymanagerHost
|
|
consensusManager = ConsensusManager.new(
|
|
dag, attestationPool, quarantine, elManager, actionTracker,
|
|
newClone(DynamicFeeRecipientsStore.init()), "",
|
|
Opt.some default(Eth1Address), defaultGasLimit)
|
|
state = newClone(dag.headState)
|
|
cache = StateCache()
|
|
b1 = addTestBlock(state[], cache).phase0Data
|
|
b2 = addTestBlock(state[], cache).phase0Data
|
|
getTimeFn = proc(): BeaconTime = b2.message.slot.start_beacon_time()
|
|
processor = BlockProcessor.new(
|
|
false, "", "", rng, taskpool, consensusManager,
|
|
validatorMonitor, blobQuarantine, getTimeFn)
|
|
|
|
asyncTest "Reverse order block add & get" & preset():
|
|
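    # Adding the child block b2 before its parent b1 must fail with
    # MissingParent and leave b1's root registered as missing in the quarantine.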
    let missing = await processor.storeBlock(
      MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, Opt.none(BlobSidecars))
    check: missing.error[0] == VerifierError.MissingParent

    check:
      not dag.containsForkBlock(b2.root) # Unresolved, shouldn't show up

      FetchRecord(root: b1.root) in quarantine[].checkMissing(32)

    let
      status = await processor.storeBlock(
        MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, Opt.none(BlobSidecars))
      b1Get = dag.getBlockRef(b1.root)

    check:
      status.isOk
      b1Get.isSome()
      dag.containsForkBlock(b1.root)
      not dag.containsForkBlock(b2.root) # Async pipeline must still run

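    # After b1 is stored, the quarantined b2 is only scheduled for processing;
    # run the processor's queue loop and poll until no blocks remain so that
    # b2 actually lands in the DAG.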
    discard processor.runQueueProcessingLoop()
    while processor[].hasBlocks():
      poll()

    let
      b2Get = dag.getBlockRef(b2.root)

    check:
      b2Get.isSome()

      b2Get.get().parent == b1Get.get()

    dag.updateHead(b2Get.get(), quarantine[], [])
    dag.pruneAtFinalization()

    # The heads structure should have been updated to contain only the new
    # b2 head
    check:
      dag.heads.mapIt(it) == @[b2Get.get()]

    # check that init also reloads block graph
    var
      validatorMonitor2 = newClone(ValidatorMonitor.init())
      dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor2, {})

    check:
      # ensure we loaded the correct head state
      dag2.head.root == b2.root
      getStateRoot(dag2.headState) == b2.message.state_root
      dag2.getBlockRef(b1.root).isSome()
      dag2.getBlockRef(b2.root).isSome()
      dag2.heads.len == 1
      dag2.heads[0].root == b2.root