From 03005f48e170a4c6b2e20550f526b7168a6c205d Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 13 Dec 2021 14:36:06 +0100 Subject: [PATCH] Backfill support for ChainDAG (#3171) In the ChainDAG, 3 block pointers are kept: genesis, tail and head. This PR adds one more block pointer: the backfill block which represents the block that has been backfilled so far. When doing a checkpoint sync, a random block is given as starting point - this is the tail block, and we require that the tail block has a corresponding state. When backfilling, we end up with blocks without corresponding states, hence we cannot use `tail` as a backfill pointer - there is no state. Nonetheless, we need to keep track of where we are in the backfill process between restarts, such that we can answer GetBeaconBlocksByRange requests. This PR adds the basic support for backfill handling - it needs to be integrated with backfill sync, and the REST API needs to be adjusted to take advantage of the new backfilled blocks when responding to certain requests. Future work will also enable moving the tail in either direction: * pruning means moving the tail forward in time and removing states * backwards means recreating past states from genesis, such that intermediate states are recreated step by step all the way to the tail - at that point, tail, genesis and backfill will match up. * backfilling is done when backfill != genesis - later, this will be the WSS checkpoint instead --- AllTests-mainnet.md | 8 +- beacon_chain/beacon_chain_db.nim | 9 + .../block_clearance.nim | 81 +++++++- .../block_pools_types.nim | 21 +- .../consensus_object_pools/blockchain_dag.nim | 193 ++++++++++++------ .../gossip_processing/block_processor.nim | 2 +- beacon_chain/sync/sync_protocol.nim | 45 ++-- docs/block_flow.md | 2 +- research/block_sim.nim | 6 +- .../test_fixture_fork_choice.nim | 2 +- tests/test_action_tracker.nim | 3 +- tests/test_attestation_pool.nim | 18 +- tests/test_blockchain_dag.nim | 171 +++++++++++++--- tests/test_eth1_monitor.nim | 3 - tests/test_gossip_validation.nim | 8 +- tests/test_merge_vectors.nim | 5 +- 16 files changed, 430 insertions(+), 147 deletions(-) diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index 544a20843..becaa5983 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -44,6 +44,12 @@ OK: 25/25 Fail: 0/25 Skip: 0/25 + Working with aggregates [Preset: mainnet] OK ``` OK: 11/11 Fail: 0/11 Skip: 0/11 +## Backfill +```diff ++ backfill to genesis OK ++ reload backfill position OK +``` +OK: 2/2 Fail: 0/2 Skip: 0/2 ## Beacon chain DB [Preset: mainnet] ```diff + empty database [Preset: mainnet] OK @@ -389,4 +395,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1 ---TOTAL--- -OK: 213/215 Fail: 0/215 Skip: 2/215 +OK: 215/217 Fail: 0/217 Skip: 2/217 diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim index bf33b6a56..00b610639 100644 --- a/beacon_chain/beacon_chain_db.nim +++ b/beacon_chain/beacon_chain_db.nim @@ -137,6 +137,9 @@ type ## only recent contract state data (i.e. only recent `deposit_roots`). 
kHashToStateDiff # Obsolete kHashToStateOnlyMutableValidators + kBackfillBlock + ## Pointer to the earliest block that we have backfilled - if this is not + ## set, backfill == tail BeaconBlockSummary* = object ## Cache of beacon block summaries - during startup when we construct the @@ -588,6 +591,9 @@ proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) = proc putGenesisBlock*(db: BeaconChainDB, key: Eth2Digest) = db.keyValues.putRaw(subkey(kGenesisBlock), key) +proc putBackfillBlock*(db: BeaconChainDB, key: Eth2Digest) = + db.keyValues.putRaw(subkey(kBackfillBlock), key) + proc putEth2FinalizedTo*(db: BeaconChainDB, eth1Checkpoint: DepositContractSnapshot) = db.keyValues.putSnappySSZ(subkey(kDepositsFinalizedByEth2), eth1Checkpoint) @@ -791,6 +797,9 @@ proc getGenesisBlock*(db: BeaconChainDB): Opt[Eth2Digest] = db.keyValues.getRaw(subkey(kGenesisBlock), Eth2Digest) or db.v0.getGenesisBlock() +proc getBackfillBlock*(db: BeaconChainDB): Opt[Eth2Digest] = + db.keyValues.getRaw(subkey(kBackfillBlock), Eth2Digest) + proc getEth2FinalizedTo(db: BeaconChainDBV0): Opt[DepositContractSnapshot] = result.ok(DepositContractSnapshot()) let r = db.backend.getSnappySSZ(subkey(kDepositsFinalizedByEth2), result.get) diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index 5a069dc6f..943be00ae 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -31,7 +31,7 @@ export results, signatures_batch logScope: topics = "clearance" -proc addResolvedBlock( +proc addResolvedHeadBlock( dag: ChainDAGRef, state: var StateData, trustedBlock: ForkyTrustedSignedBeaconBlock, @@ -149,7 +149,7 @@ proc advanceClearanceState*(dag: ChainDAGRef) = debug "Prepared clearance state for next block", next, updateStateDur = Moment.now() - startTick -proc addRawBlock*( +proc addHeadBlock*( dag: ChainDAGRef, verifier: var BatchVerifier, signedBlock: ForkySignedBeaconBlock, onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded @@ -212,7 +212,7 @@ proc addRawBlock*( # correct - from their point of view, the head block they have is the # latest thing that happened on the chain and they're performing their # duty correctly. - debug "Unviable block, dropping", + debug "Block from unviable fork", finalizedHead = shortLog(dag.finalizedHead), tail = shortLog(dag.tail) @@ -250,7 +250,7 @@ proc addRawBlock*( let stateVerifyTick = Moment.now() # Careful, clearanceState.data has been updated but not blck - we need to # create the BlockRef first! - ok addResolvedBlock( + ok addResolvedHeadBlock( dag, dag.clearanceState, signedBlock.asTrusted(), parent, cache, @@ -258,3 +258,76 @@ proc addRawBlock*( stateDataDur = stateDataTick - startTick, sigVerifyDur = sigVerifyTick - stateDataTick, stateVerifyDur = stateVerifyTick - sigVerifyTick) + +proc addBackfillBlock*( + dag: ChainDAGRef, + signedBlock: ForkySignedBeaconBlock): Result[void, BlockError] = + ## When performing checkpoint sync, we need to backfill historical blocks + ## in order to respond to GetBlocksByRange requests. Backfill blocks are + ## added in backwards order, one by one, based on the `parent_root` of the + ## earliest block we know about. + ## + ## Because only one history is relevant when backfilling, one doesn't have to + ## consider forks or other finalization-related issues - a block is either + ## valid and finalized, or not. 
+ logScope: + blockRoot = shortLog(signedBlock.root) + blck = shortLog(signedBlock.message) + backfill = (dag.backfill.slot, shortLog(dag.backfill.root)) + + template blck(): untyped = signedBlock.message # shortcuts without copy + template blockRoot(): untyped = signedBlock.root + + if dag.backfill.slot <= signedBlock.message.slot or + signedBlock.message.slot <= dag.genesis.slot: + if blockRoot in dag: + debug "Block already exists" + return err(BlockError.Duplicate) + + # The block is newer than our backfill position but not in the dag - either + # it sits somewhere between backfill and tail or it comes from an unviable + # fork. We don't have an in-memory way of checking the former condition so + # we return UnviableFork for that condition as well, even though `Duplicate` + # would be more correct + debug "Block unviable or duplicate" + return err(BlockError.UnviableFork) + + if dag.backfill.root != signedBlock.root: + debug "Block does not match expected backfill root" + return err(BlockError.MissingParent) # MissingChild really, but .. + + # If the hash is correct, the block itself must be correct, but the root does + # not cover the signature, which we check next + + let proposerKey = dag.validatorKey(blck.proposer_index) + if proposerKey.isNone(): + # This cannot happen, in theory, unless the checkpoint state is broken or + # there is a bug in our validator key caching scheme - in order not to + # send invalid attestations, we'll shut down defensively here - this might + # need revisiting in the future. + fatal "Invalid proposer in backfill block - checkpoint state corrupt?" + quit 1 + + if not verify_block_signature( + dag.forkAtEpoch(blck.slot.epoch), + getStateField(dag.headState.data, genesis_validators_root), + blck.slot, + signedBlock.root, + proposerKey.get(), + signedBlock.signature): + info "Block signature verification failed" + return err(BlockError.Invalid) + + dag.putBlock(signedBlock.asTrusted()) + dag.db.putBackfillBlock(signedBlock.root) + dag.backfill = (blck.slot, blck.parent_root) + + # Invariants maintained on startup + doAssert dag.backfillBlocks.lenu64 == dag.tail.slot.uint64 + doAssert dag.backfillBlocks.lenu64 > blck.slot.uint64 + + dag.backfillBlocks[blck.slot.int] = signedBlock.root + + debug "Block backfilled" + + ok() diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index 8f5b16619..34b963062 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -92,13 +92,30 @@ type finalizedBlocks*: seq[BlockRef] ##\ ## Slot -> BlockRef mapping for the canonical chain - use getBlockBySlot - ## to access, generally + ## to access, generally - covers the slots + ## `tail.slot..finalizedHead.slot` (including the finalized head slot) - + ## indices are thus offset by tail.slot + + backfillBlocks*: seq[Eth2Digest] ##\ + ## Slot -> Eth2Digest, tail.slot entries genesis*: BlockRef ##\ ## The genesis block of the network tail*: BlockRef ##\ - ## The earliest finalized block we know about + ## The earliest finalized block for which we have a corresponding state - + ## when making a replay of chain history, this is as far back as we can + ## go - the tail block is unique in that its parent is set to `nil`, even + ## in the case where an earlier genesis block exists.
+ + backfill*: tuple[slot: Slot, root: Eth2Digest] ##\ + ## The backfill is the root of the parent of the earliest block that we + ## have synced, when performing a checkpoint sync. Because the + ## `tail` BlockRef does not have a parent, we store here the root of the + ## block we're expecting during backfill. + ## When starting a checkpoint sync, `backfill` == `tail.parent_root` - we + ## then sync backwards, moving the backfill (but not tail!) until we hit + ## genesis, at which point we set backfill to the zero hash. heads*: seq[BlockRef] ##\ ## Candidate heads of candidate chains diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index fdbc8ca34..1d07930e1 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -173,17 +173,55 @@ func effective_balances*(epochRef: EpochRef): seq[Gwei] = func getBlockBySlot*(dag: ChainDAGRef, slot: Slot): BlockSlot = ## Retrieve the canonical block at the given slot, or the last block that - ## comes before - similar to atSlot, but without the linear scan + ## comes before - similar to atSlot, but without the linear scan - see + ## getBlockSlotIdBySlot for a version that covers backfill blocks as well. + ## May return an empty BlockSlot (where blck is nil!) + + if slot == dag.genesis.slot: + # There may be gaps in the + return dag.genesis.atSlot(slot) + if slot > dag.finalizedHead.slot: return dag.head.atSlot(slot) # Linear iteration is the fastest we have - var tmp = slot.int - while true: - if dag.finalizedBlocks[tmp] != nil: - return dag.finalizedBlocks[tmp].atSlot(slot) - if tmp == 0: - raiseAssert "At least the genesis block should be available!" - tmp = tmp - 1 + doAssert dag.finalizedHead.slot >= dag.tail.slot + doAssert dag.tail.slot >= dag.backfill.slot + doAssert dag.finalizedBlocks.len == + (dag.finalizedHead.slot - dag.tail.slot).int + 1, "see updateHead" + + if slot >= dag.tail.slot: + var pos = int(slot - dag.tail.slot) + + while true: + if dag.finalizedBlocks[pos] != nil: + return dag.finalizedBlocks[pos].atSlot(slot) + + if pos == 0: + break + + pos -= 1 + + if dag.tail.slot == 0: + raiseAssert "Genesis missing" + + BlockSlot() # nil blck! + +func getBlockSlotIdBySlot*(dag: ChainDAGRef, slot: Slot): BlockSlotId = + ## Retrieve the canonical block at the given slot, or the last block that + ## comes before - similar to atSlot, but without the linear scan + if slot == dag.genesis.slot: + return dag.genesis.bid.atSlot(slot) + + if slot >= dag.tail.slot: + return dag.getBlockBySlot(slot).toBlockSlotId() + + var pos = slot.int + while pos >= dag.backfill.slot.int: + if dag.backfillBlocks[pos] != Eth2Digest(): + return BlockId(root: dag.backfillBlocks[pos], slot: Slot(pos)).atSlot(slot) + pos -= 1 + + BlockSlotId() # not backfilled yet, and not genesis func epochAncestor*(blck: BlockRef, epoch: Epoch): EpochKey = ## The state transition works by storing information from blocks in a @@ -315,6 +353,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, let tailBlockRoot = db.getTailBlock() headBlockRoot = db.getHeadBlock() + backfillBlockRoot = db.getBackfillBlock() doAssert tailBlockRoot.isSome(), "Missing tail block, database corrupt?" doAssert headBlockRoot.isSome(), "Missing head block, database corrupt?" 
@@ -335,6 +374,18 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, "preInit should have initialized the database with a genesis block") withBlck(genesisBlock): BlockRef.init(genesisBlockRoot, blck.message) + let backfill = + if backfillBlockRoot.isSome(): + let backfillBlock = db.getForkedBlock(backfillBlockRoot.get()).expect( + "backfill block must be present in database, database corrupt?") + (getForkedBlockField(backfillBlock, slot), + getForkedBlockField(backfillBlock, parentRoot)) + elif tailRef.slot > GENESIS_SLOT: + (getForkedBlockField(tailBlock, slot), + getForkedBlockField(tailBlock, parentRoot)) + else: + (GENESIS_SLOT, Eth2Digest()) + var blocks: HashSet[KeyedBlockRef] headRef: BlockRef @@ -344,38 +395,46 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, if genesisRef != tailRef: blocks.incl(KeyedBlockRef.init(genesisRef)) - if headRoot != tailRoot: - var curRef: BlockRef + var + backfillBlocks = newSeq[Eth2Digest](tailRef.slot.int) + curRef: BlockRef - for blck in db.getAncestorSummaries(headRoot): - if blck.root == tailRef.root: - doAssert(not curRef.isNil) + for blck in db.getAncestorSummaries(headRoot): + if blck.summary.slot < tailRef.slot: + backfillBlocks[blck.summary.slot.int] = blck.root + elif blck.summary.slot == tailRef.slot: + if curRef == nil: + curRef = tailRef + headRef = tailRef + else: link(tailRef, curRef) curRef = curRef.parent - break + else: + if curRef == nil: + # When the database has been written with a pre-fork version of the + # software, it may happen that blocks produced using an "unforked" + # chain get written to the database - we need to skip such blocks + # when loading the database with a fork-compatible version + if not containsBlock(cfg, db, blck.summary.slot, blck.root): + continue let newRef = BlockRef.init(blck.root, blck.summary.slot) if curRef == nil: curRef = newRef + headRef = newRef else: link(newRef, curRef) curRef = curRef.parent - # Don't include blocks on incorrect hardforks - if headRef == nil and cfg.containsBlock(db, newRef.slot, newRef.root): - headRef = newRef - blocks.incl(KeyedBlockRef.init(curRef)) trace "Populating block dag", key = curRef.root, val = curRef - if curRef != tailRef: - fatal "Head block does not lead to tail - database corrupt?", - genesisRef, tailRef, headRef, curRef, tailRoot, headRoot, - blocks = blocks.len() + if curRef != tailRef: + fatal "Head block does not lead to tail - database corrupt?", + genesisRef, tailRef, headRef, curRef, tailRoot, headRoot, + blocks = blocks.len() - quit 1 - else: - headRef = tailRef + quit 1 # Because of incorrect hardfork check, there might be no head block, in which # case it's equivalent to the tail block @@ -429,8 +488,10 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, let dag = ChainDAGRef( db: db, blocks: blocks, + backfillBlocks: backfillBlocks, genesis: genesisRef, tail: tailRef, + backfill: backfill, finalizedHead: tailRef.atSlot(), lastPrunePoint: tailRef.atSlot(), # Tail is implicitly finalized - we'll adjust it below when computing the @@ -476,10 +537,11 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, dag.finalizedHead = headRef.atSlot(finalizedSlot) block: - dag.finalizedBlocks.setLen(dag.finalizedHead.slot.int + 1) + dag.finalizedBlocks.setLen((dag.finalizedHead.slot - dag.tail.slot).int + 1) + var tmp = dag.finalizedHead.blck while not isNil(tmp): - dag.finalizedBlocks[tmp.slot.int] = tmp + dag.finalizedBlocks[(tmp.slot - dag.tail.slot).int] = tmp tmp = 
tmp.parent dag.clearanceState = dag.headState @@ -499,7 +561,8 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, head = shortLog(dag.head), finalizedHead = shortLog(dag.finalizedHead), tail = shortLog(dag.tail), - totalBlocks = dag.blocks.len + totalBlocks = dag.blocks.len(), + backfill = (dag.backfill.slot, shortLog(dag.backfill.root)) dag @@ -631,9 +694,9 @@ func getRef*(dag: ChainDAGRef, root: Eth2Digest): BlockRef = else: nil -func getBlockRange*( +proc getBlockRange*( dag: ChainDAGRef, startSlot: Slot, skipStep: uint64, - output: var openArray[BlockRef]): Natural = + output: var openArray[BlockId]): Natural = ## This function populates an `output` buffer of blocks ## with a slots ranging from `startSlot` up to, but not including, ## `startSlot + skipStep * output.len`, skipping any slots that don't have @@ -652,55 +715,63 @@ func getBlockRange*( trace "getBlockRange entered", head = shortLog(dag.head.root), requestedCount, startSlot, skipStep, headSlot - if startSlot < dag.tail.slot or headSlot <= startSlot or requestedCount == 0: + if startSlot < dag.backfill.slot: + notice "Got request for pre-backfill slot", + startSlot, backfillSlot = dag.backfill.slot + return output.len + + if headSlot <= startSlot or requestedCount == 0: return output.len # Identical to returning an empty set of block as indicated above let runway = uint64(headSlot - startSlot) # This is the number of blocks that will follow the start block - extraBlocks = min(runway div skipStep, requestedCount - 1) + extraSlots = min(runway div skipStep, requestedCount - 1) - # If `skipStep` is very large, `extraBlocks` should be 0 from + # If `skipStep` is very large, `extraSlots` should be 0 from # the previous line, so `endSlot` will be equal to `startSlot`: - endSlot = startSlot + extraBlocks * skipStep + endSlot = startSlot + extraSlots * skipStep var - b = dag.getBlockBySlot(endSlot) + curSlot = endSlot o = output.len # Process all blocks that follow the start block (may be zero blocks) - for i in 1..extraBlocks: - if b.blck.slot == b.slot: - dec o - output[o] = b.blck - for j in 1..skipStep: - b = b.parent + while curSlot > startSlot: + let bs = dag.getBlockSlotIdBySlot(curSlot) + if bs.isProposed(): + o -= 1 + output[o] = bs.bid + curSlot -= skipStep - # We should now be at the start block. 
- # Like any "block slot", it may be a missing/skipped block: - if b.blck.slot == b.slot: - dec o - output[o] = b.blck + # Handle start slot separately (to avoid underflow when computing curSlot) + let bs = dag.getBlockSlotIdBySlot(startSlot) + if bs.isProposed(): + o -= 1 + output[o] = bs.bid o # Return the index of the first non-nil item in the output -proc getForkedBlock*(dag: ChainDAGRef, blck: BlockRef): ForkedTrustedSignedBeaconBlock = - case dag.cfg.blockForkAtEpoch(blck.slot.epoch) +proc getForkedBlock*(dag: ChainDAGRef, id: BlockId): Opt[ForkedTrustedSignedBeaconBlock] = + case dag.cfg.blockForkAtEpoch(id.slot.epoch) of BeaconBlockFork.Phase0: - let data = dag.db.getPhase0Block(blck.root) + let data = dag.db.getPhase0Block(id.root) if data.isOk(): - return ForkedTrustedSignedBeaconBlock.init(data.get) + return ok ForkedTrustedSignedBeaconBlock.init(data.get) of BeaconBlockFork.Altair: - let data = dag.db.getAltairBlock(blck.root) + let data = dag.db.getAltairBlock(id.root) if data.isOk(): - return ForkedTrustedSignedBeaconBlock.init(data.get) + return ok ForkedTrustedSignedBeaconBlock.init(data.get) of BeaconBlockFork.Merge: - let data = dag.db.getMergeBlock(blck.root) + let data = dag.db.getMergeBlock(id.root) if data.isOk(): - return ForkedTrustedSignedBeaconBlock.init(data.get) + return ok ForkedTrustedSignedBeaconBlock.init(data.get) - raiseAssert "BlockRef without backing data, database corrupt?" +proc getForkedBlock*(dag: ChainDAGRef, blck: BlockRef): ForkedTrustedSignedBeaconBlock = + let blck = dag.getForkedBlock(blck.bid) + if blck.isSome(): + return blck.get() proc get*(dag: ChainDAGRef, blck: BlockRef): BlockData = ## Retrieve the associated block body of a block reference @@ -1195,17 +1266,17 @@ proc updateHead*( let currentEpoch = epoch(newHead.slot) let currentDutyDepRoot = - if currentEpoch > Epoch(0): + if currentEpoch > dag.tail.slot.epoch: dag.head.atSlot( compute_start_slot_at_epoch(currentEpoch) - 1).blck.root else: - dag.genesis.root + dag.tail.root previousDutyDepRoot = - if currentEpoch > Epoch(1): + if currentEpoch > dag.tail.slot.epoch + 1: dag.head.atSlot( compute_start_slot_at_epoch(currentEpoch - 1) - 1).blck.root else: - dag.genesis.root + dag.tail.root epochTransition = (finalizedHead != dag.finalizedHead) let data = HeadChangeInfoObject.init(dag.head.slot, dag.head.root, getStateRoot(dag.headState.data), @@ -1260,10 +1331,10 @@ proc updateHead*( # Update `dag.finalizedBlocks` with all newly finalized blocks (those # newer than the previous finalized head), then update `dag.finalizedHead` - dag.finalizedBlocks.setLen(finalizedHead.slot.int + 1) + dag.finalizedBlocks.setLen(finalizedHead.slot - dag.tail.slot + 1) var tmp = finalizedHead.blck while not isNil(tmp) and tmp.slot >= dag.finalizedHead.slot: - dag.finalizedBlocks[tmp.slot.int] = tmp + dag.finalizedBlocks[(tmp.slot - dag.tail.slot).int] = tmp tmp = tmp.parent dag.finalizedHead = finalizedHead diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 93af11d07..0c9bd5a8a 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -175,7 +175,7 @@ proc storeBlock*( self.consensusManager.quarantine[].removeOrphan(signedBlock) type Trusted = typeof signedBlock.asTrusted() - let blck = dag.addRawBlock(self.verifier, signedBlock) do ( + let blck = dag.addHeadBlock(self.verifier, signedBlock) do ( blckRef: BlockRef, trustedBlock: Trusted, epochRef: EpochRef): # Callback add 
to fork choice if valid attestationPool[].addForkChoice( diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index 759bacf3e..7bdae3cd9 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -209,7 +209,7 @@ p2pProtocol BeaconSync(version = 1, trace "got range request", peer, startSlot, count = reqCount, step = reqStep if reqCount > 0'u64 and reqStep > 0'u64: - var blocks: array[MAX_REQUEST_BLOCKS, BlockRef] + var blocks: array[MAX_REQUEST_BLOCKS, BlockId] let dag = peer.networkState.dag # Limit number of blocks in response @@ -218,29 +218,29 @@ p2pProtocol BeaconSync(version = 1, let endIndex = count - 1 startIndex = - dag.getBlockRange(startSlot, reqStep, - blocks.toOpenArray(0, endIndex)) + dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex)) peer.updateRequestQuota( blockByRangeLookupCost + max(0, endIndex - startIndex + 1).float * blockResponseCost) peer.awaitNonNegativeRequestQuota() for i in startIndex..endIndex: - doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only" trace "wrote response block", slot = blocks[i].slot, roor = shortLog(blocks[i].root) - let blk = dag.get(blocks[i]).data - case blk.kind - of BeaconBlockFork.Phase0: - await response.write(blk.phase0Data.asSigned) - of BeaconBlockFork.Altair, BeaconBlockFork.Merge: - # Skipping all subsequent blocks should be OK because the spec says: - # "Clients MAY limit the number of blocks in the response." - # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange - # - # Also, our response would be indistinguishable from a node - # that have been synced exactly to the altair transition slot. - break + let blk = dag.getForkedBlock(blocks[i]) + if blk.isSome(): + let blck = blk.get() + case blck.kind + of BeaconBlockFork.Phase0: + await response.write(blck.phase0Data.asSigned) + of BeaconBlockFork.Altair, BeaconBlockFork.Merge: + # Skipping all subsequent blocks should be OK because the spec says: + # "Clients MAY limit the number of blocks in the response." + # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange + # + # Also, our response would be indistinguishable from a node + # that have been synced exactly to the altair transition slot. 
+ break debug "Block range request done", peer, startSlot, count, reqStep, found = count - startIndex @@ -297,7 +297,7 @@ p2pProtocol BeaconSync(version = 1, trace "got range request", peer, startSlot, count = reqCount, step = reqStep if reqCount > 0'u64 and reqStep > 0'u64: - var blocks: array[MAX_REQUEST_BLOCKS, BlockRef] + var blocks: array[MAX_REQUEST_BLOCKS, BlockId] let dag = peer.networkState.dag # Limit number of blocks in response @@ -314,11 +314,12 @@ p2pProtocol BeaconSync(version = 1, peer.awaitNonNegativeRequestQuota() for i in startIndex..endIndex: - doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only" - trace "wrote response block", - slot = blocks[i].slot, roor = shortLog(blocks[i].root) - let blk = dag.getForkedBlock(blocks[i]) - await response.write(blk.asSigned) + let + blk = dag.getForkedBlock(blocks[i]) + + if blk.isSome(): + let blck = blk.get() + await response.write(blck.asSigned) debug "Block range request done", peer, startSlot, count, reqStep, found = count - startIndex diff --git a/docs/block_flow.md b/docs/block_flow.md index 1cffcdcb9..9ac0e3abf 100644 --- a/docs/block_flow.md +++ b/docs/block_flow.md @@ -133,7 +133,7 @@ To mitigate blocking networking and timeshare between Io and compute, blocks are This in turn calls: - `storeBlock(Eth2Processor, SignedBeaconBlock, Slot)` -- `addRawBlock(ChainDAGRef, var BatchVerifier, SignedBeaconBlock, forkChoiceCallback)` +- `addHeadBlock(ChainDAGRef, var BatchVerifier, SignedBeaconBlock, forkChoiceCallback)` - trigger sending attestation if relevant ### Steady state (synced to head) diff --git a/research/block_sim.nim b/research/block_sim.nim index d6493224e..60449a95a 100644 --- a/research/block_sim.nim +++ b/research/block_sim.nim @@ -303,7 +303,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6, dag.withState(tmpState[], dag.head.atSlot(slot)): let newBlock = getNewBlock[phase0.SignedBeaconBlock](stateData, slot, cache) - added = dag.addRawBlock(verifier, newBlock) do ( + added = dag.addHeadBlock(verifier, newBlock) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -323,7 +323,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6, dag.withState(tmpState[], dag.head.atSlot(slot)): let newBlock = getNewBlock[altair.SignedBeaconBlock](stateData, slot, cache) - added = dag.addRawBlock(verifier, newBlock) do ( + added = dag.addHeadBlock(verifier, newBlock) do ( blckRef: BlockRef, signedBlock: altair.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -343,7 +343,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6, dag.withState(tmpState[], dag.head.atSlot(slot)): let newBlock = getNewBlock[merge.SignedBeaconBlock](stateData, slot, cache) - added = dag.addRawBlock(verifier, newBlock) do ( + added = dag.addHeadBlock(verifier, newBlock) do ( blckRef: BlockRef, signedBlock: merge.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid diff --git a/tests/consensus_spec/test_fixture_fork_choice.nim b/tests/consensus_spec/test_fixture_fork_choice.nim index 8591d25c3..1bb50af2e 100644 --- a/tests/consensus_spec/test_fixture_fork_choice.nim +++ b/tests/consensus_spec/test_fixture_fork_choice.nim @@ -181,7 +181,7 @@ proc stepOnBlock( else: type TrustedBlock = merge.TrustedSignedBeaconBlock - let blockAdded = dag.addRawBlock(verifier, signedBlock) do ( + let blockAdded = dag.addHeadBlock(verifier, signedBlock) do ( blckRef: BlockRef, signedBlock: TrustedBlock, epochRef: EpochRef ): diff --git 
a/tests/test_action_tracker.nim b/tests/test_action_tracker.nim index e71c44ce8..c9af001d2 100644 --- a/tests/test_action_tracker.nim +++ b/tests/test_action_tracker.nim @@ -6,7 +6,8 @@ import ../beacon_chain/validators/action_tracker suite "subnet tracker": - let rng = keys.newRng() + setup: + let rng = keys.newRng() test "should register stability subnets on attester duties": var tracker = ActionTracker.init(rng, false) diff --git a/tests/test_attestation_pool.nim b/tests/test_attestation_pool.nim index 551166776..96c98de5f 100644 --- a/tests/test_attestation_pool.nim +++ b/tests/test_attestation_pool.nim @@ -382,7 +382,7 @@ suite "Attestation pool processing" & preset(): var cache = StateCache() let b1 = addTestBlock(state.data, cache).phase0Data - b1Add = dag.addRawBlock(verifier, b1) do ( + b1Add = dag.addHeadBlock(verifier, b1) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -395,7 +395,7 @@ suite "Attestation pool processing" & preset(): let b2 = addTestBlock(state.data, cache).phase0Data - b2Add = dag.addRawBlock(verifier, b2) do ( + b2Add = dag.addHeadBlock(verifier, b2) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -410,7 +410,7 @@ suite "Attestation pool processing" & preset(): var cache = StateCache() let b10 = makeTestBlock(state.data, cache).phase0Data - b10Add = dag.addRawBlock(verifier, b10) do ( + b10Add = dag.addHeadBlock(verifier, b10) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -425,7 +425,7 @@ suite "Attestation pool processing" & preset(): b11 = makeTestBlock(state.data, cache, graffiti = GraffitiBytes [1'u8, 0, 0, 0 ,0 ,0 ,0 ,0 ,0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] ).phase0Data - b11Add = dag.addRawBlock(verifier, b11) do ( + b11Add = dag.addHeadBlock(verifier, b11) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -471,7 +471,7 @@ suite "Attestation pool processing" & preset(): var cache = StateCache() let b10 = makeTestBlock(state.data, cache).phase0Data - b10Add = dag.addRawBlock(verifier, b10) do ( + b10Add = dag.addHeadBlock(verifier, b10) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -485,7 +485,7 @@ suite "Attestation pool processing" & preset(): # ------------------------------------------------------------- # Add back the old block to ensure we have a duplicate error let b10_clone = b10 # Assumes deep copy - let b10Add_clone = dag.addRawBlock(verifier, b10_clone) do ( + let b10Add_clone = dag.addHeadBlock(verifier, b10_clone) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -500,7 +500,7 @@ suite "Attestation pool processing" & preset(): var cache = StateCache() let b10 = addTestBlock(state.data, cache).phase0Data - b10Add = dag.addRawBlock(verifier, b10) do ( + b10Add = dag.addHeadBlock(verifier, b10) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -525,7 +525,7 @@ suite "Attestation pool processing" & preset(): let new_block = addTestBlock( state.data, cache, attestations = attestations).phase0Data - let blockRef = 
dag.addRawBlock(verifier, new_block) do ( + let blockRef = dag.addHeadBlock(verifier, new_block) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid @@ -567,7 +567,7 @@ suite "Attestation pool processing" & preset(): doAssert: b10.root notin pool.forkChoice.backend # Add back the old block to ensure we have a duplicate error - let b10Add_clone = dag.addRawBlock(verifier, b10_clone) do ( + let b10Add_clone = dag.addHeadBlock(verifier, b10_clone) do ( blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if valid diff --git a/tests/test_blockchain_dag.nim b/tests/test_blockchain_dag.nim index c6287b9c0..bccf79d15 100644 --- a/tests/test_blockchain_dag.nim +++ b/tests/test_blockchain_dag.nim @@ -72,7 +72,7 @@ suite "Block pool processing" & preset(): test "Simple block add&get" & preset(): let - b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback) + b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback) b1Get = dag.get(b1.root) check: @@ -83,7 +83,7 @@ suite "Block pool processing" & preset(): dag.heads[0] == b1Add[] let - b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback) + b2Add = dag.addHeadBlock(verifier, b2, nilPhase0Callback) b2Get = dag.get(b2.root) er = dag.findEpochRef(b1Add[], b1Add[].slot.epoch) validators = getStateField(dag.headState.data, validators).lenu64() @@ -112,7 +112,7 @@ suite "Block pool processing" & preset(): let b4 = addTestBlock(state[], cache).phase0Data - b4Add = dag.addRawBlock(verifier, b4, nilPhase0Callback) + b4Add = dag.addHeadBlock(verifier, b4, nilPhase0Callback) check: b4Add[].parent == b2Add[] @@ -120,31 +120,31 @@ suite "Block pool processing" & preset(): dag.updateHead(b4Add[], quarantine) dag.pruneAtFinalization() - var blocks: array[3, BlockRef] + var blocks: array[3, BlockId] check: dag.getBlockRange(Slot(0), 1, blocks.toOpenArray(0, 0)) == 0 - blocks[0..<1] == [dag.tail] + blocks[0..<1] == [dag.tail.bid] dag.getBlockRange(Slot(0), 1, blocks.toOpenArray(0, 1)) == 0 - blocks[0..<2] == [dag.tail, b1Add[]] + blocks[0..<2] == [dag.tail.bid, b1Add[].bid] dag.getBlockRange(Slot(0), 2, blocks.toOpenArray(0, 1)) == 0 - blocks[0..<2] == [dag.tail, b2Add[]] + blocks[0..<2] == [dag.tail.bid, b2Add[].bid] dag.getBlockRange(Slot(0), 3, blocks.toOpenArray(0, 1)) == 1 - blocks[1..<2] == [dag.tail] # block 3 is missing! + blocks[1..<2] == [dag.tail.bid] # block 3 is missing! dag.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, 1)) == 0 - blocks[0..<2] == [b2Add[], b4Add[]] # block 3 is missing! + blocks[0..<2] == [b2Add[].bid, b4Add[].bid] # block 3 is missing! 
# large skip step dag.getBlockRange(Slot(0), uint64.high, blocks.toOpenArray(0, 2)) == 2 - blocks[2..2] == [dag.tail] + blocks[2..2] == [dag.tail.bid] # large skip step dag.getBlockRange(Slot(2), uint64.high, blocks.toOpenArray(0, 1)) == 1 - blocks[1..1] == [b2Add[]] + blocks[1..1] == [b2Add[].bid] # empty length dag.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, -1)) == 0 @@ -161,7 +161,7 @@ suite "Block pool processing" & preset(): test "updateHead updates head and headState" & preset(): let - b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback) + b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback) dag.updateHead(b1Add[], quarantine) dag.pruneAtFinalization() @@ -172,8 +172,8 @@ suite "Block pool processing" & preset(): test "updateStateData sanity" & preset(): let - b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback) - b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback) + b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback) + b2Add = dag.addHeadBlock(verifier, b2, nilPhase0Callback) bs1 = BlockSlot(blck: b1Add[], slot: b1.message.slot) bs1_3 = b1Add[].atSlot(3.Slot) bs2_3 = b2Add[].atSlot(3.Slot) @@ -219,6 +219,9 @@ suite "Block pool processing" & preset(): tmpState.blck == b1Add[].parent getStateField(tmpState.data, slot) == bs1.parent.slot +when declared(GC_fullCollect): # i386 test machines seem to run low.. + GC_fullCollect() + suite "Block pool altair processing" & preset(): setup: var @@ -253,13 +256,13 @@ suite "Block pool altair processing" & preset(): MockPrivKeys[ValidatorIndex(0)]).toValidatorSig() check: - dag.addRawBlock(verifier, b1, nilAltairCallback).isOk() + dag.addHeadBlock(verifier, b1, nilAltairCallback).isOk() block: # Main signature var b = b2 b.signature = badSignature let - bAdd = dag.addRawBlock(verifier, b, nilAltairCallback) + bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback) check: bAdd.error() == BlockError.Invalid @@ -267,7 +270,7 @@ suite "Block pool altair processing" & preset(): var b = b2 b.message.body.randao_reveal = badSignature let - bAdd = dag.addRawBlock(verifier, b, nilAltairCallback) + bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback) check: bAdd.error() == BlockError.Invalid @@ -275,7 +278,7 @@ suite "Block pool altair processing" & preset(): var b = b2 b.message.body.attestations[0].signature = badSignature let - bAdd = dag.addRawBlock(verifier, b, nilAltairCallback) + bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback) check: bAdd.error() == BlockError.Invalid @@ -283,7 +286,7 @@ suite "Block pool altair processing" & preset(): var b = b2 b.message.body.sync_aggregate.sync_committee_signature = badSignature let - bAdd = dag.addRawBlock(verifier, b, nilAltairCallback) + bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback) check: bAdd.error() == BlockError.Invalid @@ -293,7 +296,7 @@ suite "Block pool altair processing" & preset(): b.message.body.sync_aggregate.sync_committee_bits[0] = true let - bAdd = dag.addRawBlock(verifier, b, nilAltairCallback) + bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback) check: bAdd.error() == BlockError.Invalid @@ -320,7 +323,7 @@ suite "chain DAG finalization tests" & preset(): let lateBlock = addTestBlock(tmpState[], cache).phase0Data block: - let status = dag.addRawBlock(verifier, blck, nilPhase0Callback) + let status = dag.addHeadBlock(verifier, blck, nilPhase0Callback) check: status.isOk() assign(tmpState[], dag.headState.data) @@ -335,7 +338,7 @@ suite "chain DAG finalization tests" & preset(): tmpState[], cache, attestations = makeFullAttestations( 
tmpState[], dag.head.root, getStateField(tmpState[], slot), cache, {})).phase0Data - let added = dag.addRawBlock(verifier, blck, nilPhase0Callback) + let added = dag.addHeadBlock(verifier, blck, nilPhase0Callback) check: added.isOk() dag.updateHead(added[], quarantine) dag.pruneAtFinalization() @@ -382,7 +385,7 @@ suite "chain DAG finalization tests" & preset(): block: # The late block is a block whose parent was finalized long ago and thus # is no longer a viable head candidate - let status = dag.addRawBlock(verifier, lateBlock, nilPhase0Callback) + let status = dag.addHeadBlock(verifier, lateBlock, nilPhase0Callback) check: status.error == BlockError.UnviableFork block: @@ -411,7 +414,7 @@ suite "chain DAG finalization tests" & preset(): assign(prestate[], dag.headState.data) let blck = makeTestBlock(dag.headState.data, cache).phase0Data - let added = dag.addRawBlock(verifier, blck, nilPhase0Callback) + let added = dag.addHeadBlock(verifier, blck, nilPhase0Callback) check: added.isOk() dag.updateHead(added[], quarantine) dag.pruneAtFinalization() @@ -430,21 +433,21 @@ suite "chain DAG finalization tests" & preset(): let blck = makeTestBlock(prestate[], cache).phase0Data # Add block, but don't update head - let added = dag.addRawBlock(verifier, blck, nilPhase0Callback) + let added = dag.addHeadBlock(verifier, blck, nilPhase0Callback) check: added.isOk() var dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {}) # check that we can apply the block after the orphaning - let added2 = dag2.addRawBlock(verifier, blck, nilPhase0Callback) + let added2 = dag2.addHeadBlock(verifier, blck, nilPhase0Callback) check: added2.isOk() test "init with gaps" & preset(): for blck in makeTestBlocks( dag.headState.data, cache, int(SLOTS_PER_EPOCH * 6 - 2), true): - let added = dag.addRawBlock(verifier, blck.phase0Data, nilPhase0Callback) + let added = dag.addHeadBlock(verifier, blck.phase0Data, nilPhase0Callback) check: added.isOk() dag.updateHead(added[], quarantine) dag.pruneAtFinalization() @@ -461,7 +464,7 @@ suite "chain DAG finalization tests" & preset(): dag.headState.data, dag.head.root, getStateField(dag.headState.data, slot), cache, {})).phase0Data - let added = dag.addRawBlock(verifier, blck, nilPhase0Callback) + let added = dag.addHeadBlock(verifier, blck, nilPhase0Callback) check: added.isOk() dag.updateHead(added[], quarantine) dag.pruneAtFinalization() @@ -527,7 +530,7 @@ suite "Old database versions" & preset(): cache = StateCache() att0 = makeFullAttestations(state[], dag.tail.root, 0.Slot, cache) b1 = addTestBlock(state[], cache, attestations = att0).phase0Data - b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback) + b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback) check: b1Add.isOk() @@ -561,7 +564,7 @@ suite "Diverging hardforks": # common is the tail block var b1 = addTestBlock(tmpState[], cache).phase0Data - b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback) + b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback) check b1Add.isOk() dag.updateHead(b1Add[], quarantine[]) @@ -579,7 +582,7 @@ suite "Diverging hardforks": # There's a block in the shared-correct phase0 hardfork, before epoch 2 var b1 = addTestBlock(tmpState[], cache).phase0Data - b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback) + b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback) check: b1Add.isOk() @@ -590,10 +593,114 @@ suite "Diverging hardforks": var b2 = addTestBlock(tmpState[], cache).phase0Data - b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback) + b2Add = 
dag.addHeadBlock(verifier, b2, nilPhase0Callback) check b2Add.isOk() dag.updateHead(b2Add[], quarantine[]) var dagAltair = init(ChainDAGRef, altairRuntimeConfig, db, {}) discard AttestationPool.init(dagAltair, quarantine) + +suite "Backfill": + setup: + let + genState = (ref ForkedHashedBeaconState)( + kind: BeaconStateFork.Phase0, + phase0Data: initialize_hashed_beacon_state_from_eth1( + defaultRuntimeConfig, + Eth2Digest(), + 0, + makeInitialDeposits(SLOTS_PER_EPOCH.uint64, flags = {skipBlsValidation}), + {skipBlsValidation})) + genBlock = get_initial_beacon_block(genState[]) + tailState = assignClone(genState[]) + + blocks = block: + var blocks: seq[ForkedSignedBeaconBlock] + var cache: StateCache + for i in 0..