Backfill support for ChainDAG (#3171)

The ChainDAG keeps three block pointers: genesis, tail and head. This
PR adds one more: the backfill block, which tracks the earliest block
that has been backfilled so far.

When doing a checkpoint sync, an arbitrary block is given as the
starting point - this becomes the tail block, and we require that the
tail block has a corresponding state.

When backfilling, we end up with blocks without corresponding states,
hence we cannot use `tail` as the backfill pointer - there is no state
to go with it.

Nonetheless, we need to keep track of where we are in the backfill
process between restarts, such that we can answer GetBeaconBlocksByRange
requests.

This PR adds basic support for backfill handling - it still needs to be
integrated with backfill sync, and the REST API needs to be adjusted to
take advantage of the newly backfilled blocks when responding to certain
requests.

Future work will also enable moving the tail in either direction:
* pruning moves the tail forward in time, removing states along the way
* moving the tail backwards means recreating past states from genesis,
such that intermediate states are recreated step by step all the way to
the tail - at that point, tail, genesis and backfill will match up
* backfilling is performed while backfill != genesis - later, the WSS
checkpoint will serve as the stopping point instead (see the sketch
below)
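
To make the mechanics concrete, here is a minimal sketch - not code from this PR - of the invariant the backfill pointer maintains: it starts at the tail block's parent and walks backwards by `parent_root` until it reaches genesis. `tailBlock` and `blocksNewestFirst` are hypothetical stand-ins.

```nim
# Sketch only - `backfill` mirrors the pointer added in this PR;
# `tailBlock` and `blocksNewestFirst` (blocks in descending slot order)
# are hypothetical stand-ins for the checkpoint block and the sync feed.
var backfill = (slot: tailBlock.message.slot,
                root: tailBlock.message.parent_root)
for signedBlock in blocksNewestFirst:
  doAssert signedBlock.root == backfill.root # must be the expected parent
  backfill = (slot: signedBlock.message.slot,
              root: signedBlock.message.parent_root)
# backfill is complete once backfill.slot reaches GENESIS_SLOT
```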
Commit 03005f48e1 (parent dfbd50b4d6) - Jacek Sieka, 2021-12-13 14:36:06 +01:00, committed by GitHub
16 changed files with 430 additions and 147 deletions


@ -44,6 +44,12 @@ OK: 25/25 Fail: 0/25 Skip: 0/25
+ Working with aggregates [Preset: mainnet] OK
```
OK: 11/11 Fail: 0/11 Skip: 0/11
## Backfill
```diff
+ backfill to genesis OK
+ reload backfill position OK
```
OK: 2/2 Fail: 0/2 Skip: 0/2
## Beacon chain DB [Preset: mainnet]
```diff
+ empty database [Preset: mainnet] OK
@ -389,4 +395,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL---
-OK: 213/215 Fail: 0/215 Skip: 2/215
+OK: 215/217 Fail: 0/217 Skip: 2/217


@ -137,6 +137,9 @@ type
## only recent contract state data (i.e. only recent `deposit_roots`).
kHashToStateDiff # Obsolete
kHashToStateOnlyMutableValidators
kBackfillBlock
## Pointer to the earliest block that we have backfilled - if this is not
## set, backfill == tail
BeaconBlockSummary* = object
## Cache of beacon block summaries - during startup when we construct the
@ -588,6 +591,9 @@ proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) =
proc putGenesisBlock*(db: BeaconChainDB, key: Eth2Digest) =
db.keyValues.putRaw(subkey(kGenesisBlock), key)
proc putBackfillBlock*(db: BeaconChainDB, key: Eth2Digest) =
db.keyValues.putRaw(subkey(kBackfillBlock), key)
proc putEth2FinalizedTo*(db: BeaconChainDB,
eth1Checkpoint: DepositContractSnapshot) =
db.keyValues.putSnappySSZ(subkey(kDepositsFinalizedByEth2), eth1Checkpoint)
@ -791,6 +797,9 @@ proc getGenesisBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
db.keyValues.getRaw(subkey(kGenesisBlock), Eth2Digest) or
db.v0.getGenesisBlock()
proc getBackfillBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
db.keyValues.getRaw(subkey(kBackfillBlock), Eth2Digest)
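Together, these accessors give the backfill position a persistence round-trip; a minimal usage sketch, assuming an open `db: BeaconChainDB` and some block root `root`:

```nim
# Sketch: the backfill position survives restarts via the kBackfillBlock key.
db.putBackfillBlock(root)            # store the position while backfilling
let reloaded = db.getBackfillBlock() # read it back on startup
doAssert reloaded.isSome() and reloaded.get() == root
```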
proc getEth2FinalizedTo(db: BeaconChainDBV0): Opt[DepositContractSnapshot] =
result.ok(DepositContractSnapshot())
let r = db.backend.getSnappySSZ(subkey(kDepositsFinalizedByEth2), result.get)


@ -31,7 +31,7 @@ export results, signatures_batch
logScope:
topics = "clearance"
-proc addResolvedBlock(
+proc addResolvedHeadBlock(
dag: ChainDAGRef,
state: var StateData,
trustedBlock: ForkyTrustedSignedBeaconBlock,
@ -149,7 +149,7 @@ proc advanceClearanceState*(dag: ChainDAGRef) =
debug "Prepared clearance state for next block",
next, updateStateDur = Moment.now() - startTick
-proc addRawBlock*(
+proc addHeadBlock*(
dag: ChainDAGRef, verifier: var BatchVerifier,
signedBlock: ForkySignedBeaconBlock,
onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded
@ -212,7 +212,7 @@ proc addRawBlock*(
# correct - from their point of view, the head block they have is the
# latest thing that happened on the chain and they're performing their
# duty correctly.
debug "Unviable block, dropping",
debug "Block from unviable fork",
finalizedHead = shortLog(dag.finalizedHead),
tail = shortLog(dag.tail)
@ -250,7 +250,7 @@ proc addRawBlock*(
let stateVerifyTick = Moment.now()
# Careful, clearanceState.data has been updated but not blck - we need to
# create the BlockRef first!
-ok addResolvedBlock(
+ok addResolvedHeadBlock(
dag, dag.clearanceState,
signedBlock.asTrusted(),
parent, cache,
@ -258,3 +258,76 @@ proc addRawBlock*(
stateDataDur = stateDataTick - startTick,
sigVerifyDur = sigVerifyTick - stateDataTick,
stateVerifyDur = stateVerifyTick - sigVerifyTick)
proc addBackfillBlock*(
dag: ChainDAGRef,
signedBlock: ForkySignedBeaconBlock): Result[void, BlockError] =
## When performing checkpoint sync, we need to backfill historical blocks
## in order to respond to GetBlocksByRange requests. Backfill blocks are
## added in backwards order, one by one, based on the `parent_root` of the
## earliest block we know about.
##
## Because only one history is relevant when backfilling, one doesn't have to
## consider forks or other finalization-related issues - a block is either
## valid and finalized, or not.
logScope:
blockRoot = shortLog(signedBlock.root)
blck = shortLog(signedBlock.message)
backfill = (dag.backfill.slot, shortLog(dag.backfill.root))
template blck(): untyped = signedBlock.message # shortcuts without copy
template blockRoot(): untyped = signedBlock.root
if dag.backfill.slot <= signedBlock.message.slot or
signedBlock.message.slot <= dag.genesis.slot:
if blockRoot in dag:
debug "Block already exists"
return err(BlockError.Duplicate)
# The block is newer than our backfill position but not in the dag - either
# it sits somewhere between backfill and tail or it comes from an unviable
# fork. We don't have an in-memory way of checking the former condition so
# we return UnviableFork for that condition as well, even though `Duplicate`
# would be more correct
debug "Block unviable or duplicate"
return err(BlockError.UnviableFork)
if dag.backfill.root != signedBlock.root:
debug "Block does not match expected backfill root"
return err(BlockError.MissingParent) # MissingChild really, but ..
# If the hash is correct, the block itself must be correct, but the root does
# not cover the signature, which we check next
let proposerKey = dag.validatorKey(blck.proposer_index)
if proposerKey.isNone():
# This cannot happen, in theory, unless the checkpoint state is broken or
# there is a bug in our validator key caching scheme - in order not to
# send invalid attestations, we'll shut down defensively here - this might
# need revisiting in the future.
fatal "Invalid proposer in backfill block - checkpoint state corrupt?"
quit 1
if not verify_block_signature(
dag.forkAtEpoch(blck.slot.epoch),
getStateField(dag.headState.data, genesis_validators_root),
blck.slot,
signedBlock.root,
proposerKey.get(),
signedBlock.signature):
info "Block signature verification failed"
return err(BlockError.Invalid)
dag.putBlock(signedBlock.asTrusted())
dag.db.putBackfillBlock(signedBlock.root)
dag.backfill = (blck.slot, blck.parent_root)
# Invariants maintained on startup
doAssert dag.backfillBlocks.lenu64 == dag.tail.slot.uint64
doAssert dag.backfillBlocks.lenu64 > blck.slot.uint64
dag.backfillBlocks[blck.slot.int] = signedBlock.root
debug "Block backfilled"
ok()
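As noted in the description, wiring this into backfill sync is left for follow-up work; a rough sketch of how a driver loop might call `addBackfillBlock`, where `fetchBlockByRoot` is a hypothetical network request:

```nim
# Hypothetical driver loop - fetchBlockByRoot stands in for the sync
# machinery that this PR does not yet integrate.
proc backfillLoop(dag: ChainDAGRef) {.async.} =
  while dag.backfill.slot > dag.genesis.slot:
    # dag.backfill.root is the root of the block we expect next - the
    # parent of the earliest block synced so far
    let signedBlock = await fetchBlockByRoot(dag.backfill.root)
    if dag.addBackfillBlock(signedBlock).isErr():
      continue # Duplicate / UnviableFork / Invalid - retry from another peer
```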


@ -92,13 +92,30 @@ type
finalizedBlocks*: seq[BlockRef] ##\
## Slot -> BlockRef mapping for the canonical chain - use getBlockBySlot
-## to access, generally
+## to access, generally - covers the slots
+## `tail.slot..finalizedHead.slot` (including the finalized head slot) -
+## indices are thus offset by tail.slot
backfillBlocks*: seq[Eth2Digest] ##\
## Slot -> Eth2Digest mapping for blocks before the tail - contains
## `tail.slot` entries, indexed by slot
genesis*: BlockRef ##\
## The genesis block of the network
tail*: BlockRef ##\
-## The earliest finalized block we know about
+## The earliest finalized block for which we have a corresponding state -
+## when making a replay of chain history, this is as far back as we can
+## go - the tail block is unique in that its parent is set to `nil`, even
+## in the case where a later genesis block exists.
backfill*: tuple[slot: Slot, root: Eth2Digest] ##\
## The backfill is the root of the parent of the earliest block that we
## have synced, when performing a checkpoint sync start. Because the
## `tail` BlockRef does not have a parent, we store here the root of the
## block we're expecting during backfill.
## When starting a checkpoint sync, `backfill` == `tail.parent_root` - we
## then sync backwards, moving the backfill (but not the tail!) until we
## hit genesis, at which point we set backfill to the zero hash.
heads*: seq[BlockRef] ##\
## Candidate heads of candidate chains


@ -173,17 +173,55 @@ func effective_balances*(epochRef: EpochRef): seq[Gwei] =
func getBlockBySlot*(dag: ChainDAGRef, slot: Slot): BlockSlot =
## Retrieve the canonical block at the given slot, or the last block that
-## comes before - similar to atSlot, but without the linear scan
+## comes before - similar to atSlot, but without the linear scan - see
+## getBlockSlotIdBySlot for a version that covers backfill blocks as well
+## May return an empty BlockSlot (where blck is nil!)
+if slot == dag.genesis.slot:
+  # There may be gaps in the
+  return dag.genesis.atSlot(slot)
if slot > dag.finalizedHead.slot:
  return dag.head.atSlot(slot) # Linear iteration is the fastest we have
-var tmp = slot.int
+doAssert dag.finalizedHead.slot >= dag.tail.slot
+doAssert dag.tail.slot >= dag.backfill.slot
+doAssert dag.finalizedBlocks.len ==
+  (dag.finalizedHead.slot - dag.tail.slot).int + 1, "see updateHead"
+if slot >= dag.tail.slot:
+  var pos = int(slot - dag.tail.slot)
while true:
-  if dag.finalizedBlocks[tmp] != nil:
-    return dag.finalizedBlocks[tmp].atSlot(slot)
-  if tmp == 0:
-    raiseAssert "At least the genesis block should be available!"
-  tmp = tmp - 1
+  if dag.finalizedBlocks[pos] != nil:
+    return dag.finalizedBlocks[pos].atSlot(slot)
+  if pos == 0:
+    break
+  pos -= 1
+if dag.tail.slot == 0:
+  raiseAssert "Genesis missing"
+BlockSlot() # nil blck!
func getBlockSlotIdBySlot*(dag: ChainDAGRef, slot: Slot): BlockSlotId =
## Retrieve the canonical block at the given slot, or the last block that
## comes before - similar to atSlot, but without the linear scan
if slot == dag.genesis.slot:
return dag.genesis.bid.atSlot(slot)
if slot >= dag.tail.slot:
return dag.getBlockBySlot(slot).toBlockSlotId()
var pos = slot.int
while pos >= dag.backfill.slot.int:
if dag.backfillBlocks[pos] != Eth2Digest():
return BlockId(root: dag.backfillBlocks[pos], slot: Slot(pos)).atSlot(slot)
pos -= 1
BlockSlotId() # not backfilled yet, and not genesis
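The lookup thus splits at the tail: slots at or after `tail.slot` resolve through `finalizedBlocks` (via `getBlockBySlot`), earlier slots through the `backfillBlocks` roots. A usage sketch, with `someSlot` standing in for an arbitrary requested slot:

```nim
# Sketch: BlockSlotId-based lookup works across both ranges.
let bsid = dag.getBlockSlotIdBySlot(someSlot)
if bsid.isProposed():
  # a finalized or backfilled block covers this slot
  echo "block ", shortLog(bsid.bid.root), " at slot ", bsid.bid.slot
else:
  discard # not backfilled yet, and not genesis
```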
func epochAncestor*(blck: BlockRef, epoch: Epoch): EpochKey =
## The state transition works by storing information from blocks in a
@ -315,6 +353,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
let
tailBlockRoot = db.getTailBlock()
headBlockRoot = db.getHeadBlock()
backfillBlockRoot = db.getBackfillBlock()
doAssert tailBlockRoot.isSome(), "Missing tail block, database corrupt?"
doAssert headBlockRoot.isSome(), "Missing head block, database corrupt?"
@ -335,6 +374,18 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
"preInit should have initialized the database with a genesis block")
withBlck(genesisBlock): BlockRef.init(genesisBlockRoot, blck.message)
let backfill =
if backfillBlockRoot.isSome():
let backfillBlock = db.getForkedBlock(backfillBlockRoot.get()).expect(
"backfill block must be present in database, database corrupt?")
(getForkedBlockField(backfillBlock, slot),
getForkedBlockField(backfillBlock, parentRoot))
elif tailRef.slot > GENESIS_SLOT:
(getForkedBlockField(tailBlock, slot),
getForkedBlockField(tailBlock, parentRoot))
else:
(GENESIS_SLOT, Eth2Digest())
var
blocks: HashSet[KeyedBlockRef]
headRef: BlockRef
@ -344,27 +395,37 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
if genesisRef != tailRef:
blocks.incl(KeyedBlockRef.init(genesisRef))
if headRoot != tailRoot:
-var curRef: BlockRef
+var
+  backfillBlocks = newSeq[Eth2Digest](tailRef.slot.int)
+  curRef: BlockRef
for blck in db.getAncestorSummaries(headRoot):
if blck.root == tailRef.root:
doAssert(not curRef.isNil)
if blck.summary.slot < tailRef.slot:
backfillBlocks[blck.summary.slot.int] = blck.root
elif blck.summary.slot == tailRef.slot:
if curRef == nil:
curRef = tailRef
headRef = tailRef
else:
link(tailRef, curRef)
curRef = curRef.parent
break
else:
if curRef == nil:
# When the database has been written with a pre-fork version of the
# software, it may happen that blocks produced using an "unforked"
# chain get written to the database - we need to skip such blocks
# when loading the database with a fork-compatible version
if not containsBlock(cfg, db, blck.summary.slot, blck.root):
continue
let newRef = BlockRef.init(blck.root, blck.summary.slot)
if curRef == nil:
curRef = newRef
headRef = newRef
else:
link(newRef, curRef)
curRef = curRef.parent
# Don't include blocks on incorrect hardforks
if headRef == nil and cfg.containsBlock(db, newRef.slot, newRef.root):
headRef = newRef
blocks.incl(KeyedBlockRef.init(curRef))
trace "Populating block dag", key = curRef.root, val = curRef
@ -374,8 +435,6 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
blocks = blocks.len()
quit 1
else:
headRef = tailRef
# Because of incorrect hardfork check, there might be no head block, in which
# case it's equivalent to the tail block
@ -429,8 +488,10 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
let dag = ChainDAGRef(
db: db,
blocks: blocks,
backfillBlocks: backfillBlocks,
genesis: genesisRef,
tail: tailRef,
backfill: backfill,
finalizedHead: tailRef.atSlot(),
lastPrunePoint: tailRef.atSlot(),
# Tail is implicitly finalized - we'll adjust it below when computing the
@ -476,10 +537,11 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
dag.finalizedHead = headRef.atSlot(finalizedSlot)
block:
-dag.finalizedBlocks.setLen(dag.finalizedHead.slot.int + 1)
+dag.finalizedBlocks.setLen((dag.finalizedHead.slot - dag.tail.slot).int + 1)
var tmp = dag.finalizedHead.blck
while not isNil(tmp):
-dag.finalizedBlocks[tmp.slot.int] = tmp
+dag.finalizedBlocks[(tmp.slot - dag.tail.slot).int] = tmp
tmp = tmp.parent
dag.clearanceState = dag.headState
@ -499,7 +561,8 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
head = shortLog(dag.head),
finalizedHead = shortLog(dag.finalizedHead),
tail = shortLog(dag.tail),
-totalBlocks = dag.blocks.len
+totalBlocks = dag.blocks.len(),
+backfill = (dag.backfill.slot, shortLog(dag.backfill.root))
dag
@ -631,9 +694,9 @@ func getRef*(dag: ChainDAGRef, root: Eth2Digest): BlockRef =
else:
nil
func getBlockRange*(
proc getBlockRange*(
dag: ChainDAGRef, startSlot: Slot, skipStep: uint64,
output: var openArray[BlockRef]): Natural =
output: var openArray[BlockId]): Natural =
## This function populates an `output` buffer of blocks
## with a slots ranging from `startSlot` up to, but not including,
## `startSlot + skipStep * output.len`, skipping any slots that don't have
@ -652,55 +715,63 @@ func getBlockRange*(
trace "getBlockRange entered",
head = shortLog(dag.head.root), requestedCount, startSlot, skipStep, headSlot
-if startSlot < dag.tail.slot or headSlot <= startSlot or requestedCount == 0:
+if startSlot < dag.backfill.slot:
+  notice "Got request for pre-backfill slot",
+    startSlot, backfillSlot = dag.backfill.slot
+  return output.len
+if headSlot <= startSlot or requestedCount == 0:
return output.len # Identical to returning an empty set of block as indicated above
let
runway = uint64(headSlot - startSlot)
# This is the number of blocks that will follow the start block
-extraBlocks = min(runway div skipStep, requestedCount - 1)
+extraSlots = min(runway div skipStep, requestedCount - 1)
-# If `skipStep` is very large, `extraBlocks` should be 0 from
+# If `skipStep` is very large, `extraSlots` should be 0 from
# the previous line, so `endSlot` will be equal to `startSlot`:
-endSlot = startSlot + extraBlocks * skipStep
+endSlot = startSlot + extraSlots * skipStep
var
-  b = dag.getBlockBySlot(endSlot)
+  curSlot = endSlot
  o = output.len
-# Process all blocks that follow the start block (may be zero blocks)
-for i in 1..extraBlocks:
-  if b.blck.slot == b.slot:
-    dec o
-    output[o] = b.blck
-  for j in 1..skipStep:
-    b = b.parent
+while curSlot > startSlot:
+  let bs = dag.getBlockSlotIdBySlot(curSlot)
+  if bs.isProposed():
+    o -= 1
+    output[o] = bs.bid
+  curSlot -= skipStep
-# We should now be at the start block.
-# Like any "block slot", it may be a missing/skipped block:
-if b.blck.slot == b.slot:
-  dec o
-  output[o] = b.blck
+# Handle start slot separately (to avoid underflow when computing curSlot)
+let bs = dag.getBlockSlotIdBySlot(startSlot)
+if bs.isProposed():
+  o -= 1
+  output[o] = bs.bid
o # Return the index of the first non-nil item in the output
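A sketch of a range query as the sync protocol below issues it - the return value is the index of the first filled entry, so skipped slots at the front of the request window simply leave the buffer untouched (assumes a populated `dag`):

```nim
# Sketch: entries before startIndex are not written by getBlockRange.
var buf: array[32, BlockId]
let startIndex = dag.getBlockRange(Slot(100), 1, buf.toOpenArray(0, buf.high))
for i in startIndex ..< buf.len:
  echo buf[i].slot, " ", shortLog(buf[i].root)
```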
proc getForkedBlock*(dag: ChainDAGRef, blck: BlockRef): ForkedTrustedSignedBeaconBlock =
case dag.cfg.blockForkAtEpoch(blck.slot.epoch)
proc getForkedBlock*(dag: ChainDAGRef, id: BlockId): Opt[ForkedTrustedSignedBeaconBlock] =
case dag.cfg.blockForkAtEpoch(id.slot.epoch)
of BeaconBlockFork.Phase0:
let data = dag.db.getPhase0Block(blck.root)
let data = dag.db.getPhase0Block(id.root)
if data.isOk():
return ForkedTrustedSignedBeaconBlock.init(data.get)
return ok ForkedTrustedSignedBeaconBlock.init(data.get)
of BeaconBlockFork.Altair:
let data = dag.db.getAltairBlock(blck.root)
let data = dag.db.getAltairBlock(id.root)
if data.isOk():
return ForkedTrustedSignedBeaconBlock.init(data.get)
return ok ForkedTrustedSignedBeaconBlock.init(data.get)
of BeaconBlockFork.Merge:
let data = dag.db.getMergeBlock(blck.root)
let data = dag.db.getMergeBlock(id.root)
if data.isOk():
return ForkedTrustedSignedBeaconBlock.init(data.get)
return ok ForkedTrustedSignedBeaconBlock.init(data.get)
raiseAssert "BlockRef without backing data, database corrupt?"
proc getForkedBlock*(dag: ChainDAGRef, blck: BlockRef): ForkedTrustedSignedBeaconBlock =
let blck = dag.getForkedBlock(blck.bid)
if blck.isSome():
return blck.get()
proc get*(dag: ChainDAGRef, blck: BlockRef): BlockData =
## Retrieve the associated block body of a block reference
@ -1195,17 +1266,17 @@ proc updateHead*(
let currentEpoch = epoch(newHead.slot)
let
currentDutyDepRoot =
-if currentEpoch > Epoch(0):
+if currentEpoch > dag.tail.slot.epoch:
dag.head.atSlot(
compute_start_slot_at_epoch(currentEpoch) - 1).blck.root
else:
-dag.genesis.root
+dag.tail.root
previousDutyDepRoot =
-if currentEpoch > Epoch(1):
+if currentEpoch > dag.tail.slot.epoch + 1:
dag.head.atSlot(
compute_start_slot_at_epoch(currentEpoch - 1) - 1).blck.root
else:
-dag.genesis.root
+dag.tail.root
epochTransition = (finalizedHead != dag.finalizedHead)
let data = HeadChangeInfoObject.init(dag.head.slot, dag.head.root,
getStateRoot(dag.headState.data),
@ -1260,10 +1331,10 @@ proc updateHead*(
# Update `dag.finalizedBlocks` with all newly finalized blocks (those
# newer than the previous finalized head), then update `dag.finalizedHead`
-dag.finalizedBlocks.setLen(finalizedHead.slot.int + 1)
+dag.finalizedBlocks.setLen(finalizedHead.slot - dag.tail.slot + 1)
var tmp = finalizedHead.blck
while not isNil(tmp) and tmp.slot >= dag.finalizedHead.slot:
-dag.finalizedBlocks[tmp.slot.int] = tmp
+dag.finalizedBlocks[(tmp.slot - dag.tail.slot).int] = tmp
tmp = tmp.parent
dag.finalizedHead = finalizedHead
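With this change, `finalizedBlocks` is indexed relative to the tail rather than from slot zero; a small sketch of the mapping (`finalizedIndex` is illustrative, not part of this PR):

```nim
# Sketch: entry 0 is always the tail; the last entry is the finalized head.
template finalizedIndex(dag: ChainDAGRef, slot: Slot): int =
  (slot - dag.tail.slot).int
doAssert dag.finalizedBlocks[dag.finalizedIndex(dag.tail.slot)] == dag.tail
```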


@ -175,7 +175,7 @@ proc storeBlock*(
self.consensusManager.quarantine[].removeOrphan(signedBlock)
type Trusted = typeof signedBlock.asTrusted()
-let blck = dag.addRawBlock(self.verifier, signedBlock) do (
+let blck = dag.addHeadBlock(self.verifier, signedBlock) do (
blckRef: BlockRef, trustedBlock: Trusted, epochRef: EpochRef):
# Callback add to fork choice if valid
attestationPool[].addForkChoice(


@ -209,7 +209,7 @@ p2pProtocol BeaconSync(version = 1,
trace "got range request", peer, startSlot,
count = reqCount, step = reqStep
if reqCount > 0'u64 and reqStep > 0'u64:
-var blocks: array[MAX_REQUEST_BLOCKS, BlockRef]
+var blocks: array[MAX_REQUEST_BLOCKS, BlockId]
let
dag = peer.networkState.dag
# Limit number of blocks in response
@ -218,21 +218,21 @@ p2pProtocol BeaconSync(version = 1,
let
endIndex = count - 1
startIndex =
-dag.getBlockRange(startSlot, reqStep,
-                  blocks.toOpenArray(0, endIndex))
+dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex))
peer.updateRequestQuota(
blockByRangeLookupCost +
max(0, endIndex - startIndex + 1).float * blockResponseCost)
peer.awaitNonNegativeRequestQuota()
for i in startIndex..endIndex:
-doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only"
-trace "wrote response block",
-  slot = blocks[i].slot, roor = shortLog(blocks[i].root)
-let blk = dag.get(blocks[i]).data
-case blk.kind
+let blk = dag.getForkedBlock(blocks[i])
+if blk.isSome():
+  let blck = blk.get()
+  case blck.kind
of BeaconBlockFork.Phase0:
-  await response.write(blk.phase0Data.asSigned)
+  await response.write(blck.phase0Data.asSigned)
of BeaconBlockFork.Altair, BeaconBlockFork.Merge:
# Skipping all subsequent blocks should be OK because the spec says:
# "Clients MAY limit the number of blocks in the response."
@ -297,7 +297,7 @@ p2pProtocol BeaconSync(version = 1,
trace "got range request", peer, startSlot,
count = reqCount, step = reqStep
if reqCount > 0'u64 and reqStep > 0'u64:
-var blocks: array[MAX_REQUEST_BLOCKS, BlockRef]
+var blocks: array[MAX_REQUEST_BLOCKS, BlockId]
let
dag = peer.networkState.dag
# Limit number of blocks in response
@ -314,11 +314,12 @@ p2pProtocol BeaconSync(version = 1,
peer.awaitNonNegativeRequestQuota()
for i in startIndex..endIndex:
-doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only"
-trace "wrote response block",
-  slot = blocks[i].slot, roor = shortLog(blocks[i].root)
-let blk = dag.getForkedBlock(blocks[i])
-await response.write(blk.asSigned)
+let
+  blk = dag.getForkedBlock(blocks[i])
+if blk.isSome():
+  let blck = blk.get()
+  await response.write(blck.asSigned)
debug "Block range request done",
peer, startSlot, count, reqStep, found = count - startIndex


@ -133,7 +133,7 @@ To mitigate blocking networking and timeshare between Io and compute, blocks are
This in turn calls:
- `storeBlock(Eth2Processor, SignedBeaconBlock, Slot)`
- `addRawBlock(ChainDAGRef, var BatchVerifier, SignedBeaconBlock, forkChoiceCallback)`
- `addHeadBlock(ChainDAGRef, var BatchVerifier, SignedBeaconBlock, forkChoiceCallback)`
- trigger sending attestation if relevant
### Steady state (synced to head)


@ -303,7 +303,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
dag.withState(tmpState[], dag.head.atSlot(slot)):
let
newBlock = getNewBlock[phase0.SignedBeaconBlock](stateData, slot, cache)
added = dag.addRawBlock(verifier, newBlock) do (
added = dag.addHeadBlock(verifier, newBlock) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -323,7 +323,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
dag.withState(tmpState[], dag.head.atSlot(slot)):
let
newBlock = getNewBlock[altair.SignedBeaconBlock](stateData, slot, cache)
added = dag.addRawBlock(verifier, newBlock) do (
added = dag.addHeadBlock(verifier, newBlock) do (
blckRef: BlockRef, signedBlock: altair.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -343,7 +343,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
dag.withState(tmpState[], dag.head.atSlot(slot)):
let
newBlock = getNewBlock[merge.SignedBeaconBlock](stateData, slot, cache)
added = dag.addRawBlock(verifier, newBlock) do (
added = dag.addHeadBlock(verifier, newBlock) do (
blckRef: BlockRef, signedBlock: merge.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid


@ -181,7 +181,7 @@ proc stepOnBlock(
else:
type TrustedBlock = merge.TrustedSignedBeaconBlock
let blockAdded = dag.addRawBlock(verifier, signedBlock) do (
let blockAdded = dag.addHeadBlock(verifier, signedBlock) do (
blckRef: BlockRef, signedBlock: TrustedBlock, epochRef: EpochRef
):


@ -6,6 +6,7 @@ import
../beacon_chain/validators/action_tracker
suite "subnet tracker":
setup:
let rng = keys.newRng()
test "should register stability subnets on attester duties":


@ -382,7 +382,7 @@ suite "Attestation pool processing" & preset():
var cache = StateCache()
let
b1 = addTestBlock(state.data, cache).phase0Data
b1Add = dag.addRawBlock(verifier, b1) do (
b1Add = dag.addHeadBlock(verifier, b1) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -395,7 +395,7 @@ suite "Attestation pool processing" & preset():
let
b2 = addTestBlock(state.data, cache).phase0Data
b2Add = dag.addRawBlock(verifier, b2) do (
b2Add = dag.addHeadBlock(verifier, b2) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -410,7 +410,7 @@ suite "Attestation pool processing" & preset():
var cache = StateCache()
let
b10 = makeTestBlock(state.data, cache).phase0Data
b10Add = dag.addRawBlock(verifier, b10) do (
b10Add = dag.addHeadBlock(verifier, b10) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -425,7 +425,7 @@ suite "Attestation pool processing" & preset():
b11 = makeTestBlock(state.data, cache,
graffiti = GraffitiBytes [1'u8, 0, 0, 0 ,0 ,0 ,0 ,0 ,0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
).phase0Data
b11Add = dag.addRawBlock(verifier, b11) do (
b11Add = dag.addHeadBlock(verifier, b11) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -471,7 +471,7 @@ suite "Attestation pool processing" & preset():
var cache = StateCache()
let
b10 = makeTestBlock(state.data, cache).phase0Data
b10Add = dag.addRawBlock(verifier, b10) do (
b10Add = dag.addHeadBlock(verifier, b10) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -485,7 +485,7 @@ suite "Attestation pool processing" & preset():
# -------------------------------------------------------------
# Add back the old block to ensure we have a duplicate error
let b10_clone = b10 # Assumes deep copy
let b10Add_clone = dag.addRawBlock(verifier, b10_clone) do (
let b10Add_clone = dag.addHeadBlock(verifier, b10_clone) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -500,7 +500,7 @@ suite "Attestation pool processing" & preset():
var cache = StateCache()
let
b10 = addTestBlock(state.data, cache).phase0Data
b10Add = dag.addRawBlock(verifier, b10) do (
b10Add = dag.addHeadBlock(verifier, b10) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -525,7 +525,7 @@ suite "Attestation pool processing" & preset():
let new_block = addTestBlock(
state.data, cache, attestations = attestations).phase0Data
let blockRef = dag.addRawBlock(verifier, new_block) do (
let blockRef = dag.addHeadBlock(verifier, new_block) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -567,7 +567,7 @@ suite "Attestation pool processing" & preset():
doAssert: b10.root notin pool.forkChoice.backend
# Add back the old block to ensure we have a duplicate error
let b10Add_clone = dag.addRawBlock(verifier, b10_clone) do (
let b10Add_clone = dag.addHeadBlock(verifier, b10_clone) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid


@ -72,7 +72,7 @@ suite "Block pool processing" & preset():
test "Simple block add&get" & preset():
let
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
b1Get = dag.get(b1.root)
check:
@ -83,7 +83,7 @@ suite "Block pool processing" & preset():
dag.heads[0] == b1Add[]
let
b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback)
b2Add = dag.addHeadBlock(verifier, b2, nilPhase0Callback)
b2Get = dag.get(b2.root)
er = dag.findEpochRef(b1Add[], b1Add[].slot.epoch)
validators = getStateField(dag.headState.data, validators).lenu64()
@ -112,7 +112,7 @@ suite "Block pool processing" & preset():
let
b4 = addTestBlock(state[], cache).phase0Data
b4Add = dag.addRawBlock(verifier, b4, nilPhase0Callback)
b4Add = dag.addHeadBlock(verifier, b4, nilPhase0Callback)
check:
b4Add[].parent == b2Add[]
@ -120,31 +120,31 @@ suite "Block pool processing" & preset():
dag.updateHead(b4Add[], quarantine)
dag.pruneAtFinalization()
var blocks: array[3, BlockRef]
var blocks: array[3, BlockId]
check:
dag.getBlockRange(Slot(0), 1, blocks.toOpenArray(0, 0)) == 0
blocks[0..<1] == [dag.tail]
blocks[0..<1] == [dag.tail.bid]
dag.getBlockRange(Slot(0), 1, blocks.toOpenArray(0, 1)) == 0
blocks[0..<2] == [dag.tail, b1Add[]]
blocks[0..<2] == [dag.tail.bid, b1Add[].bid]
dag.getBlockRange(Slot(0), 2, blocks.toOpenArray(0, 1)) == 0
blocks[0..<2] == [dag.tail, b2Add[]]
blocks[0..<2] == [dag.tail.bid, b2Add[].bid]
dag.getBlockRange(Slot(0), 3, blocks.toOpenArray(0, 1)) == 1
blocks[1..<2] == [dag.tail] # block 3 is missing!
blocks[1..<2] == [dag.tail.bid] # block 3 is missing!
dag.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, 1)) == 0
blocks[0..<2] == [b2Add[], b4Add[]] # block 3 is missing!
blocks[0..<2] == [b2Add[].bid, b4Add[].bid] # block 3 is missing!
# large skip step
dag.getBlockRange(Slot(0), uint64.high, blocks.toOpenArray(0, 2)) == 2
blocks[2..2] == [dag.tail]
blocks[2..2] == [dag.tail.bid]
# large skip step
dag.getBlockRange(Slot(2), uint64.high, blocks.toOpenArray(0, 1)) == 1
blocks[1..1] == [b2Add[]]
blocks[1..1] == [b2Add[].bid]
# empty length
dag.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, -1)) == 0
@ -161,7 +161,7 @@ suite "Block pool processing" & preset():
test "updateHead updates head and headState" & preset():
let
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
dag.updateHead(b1Add[], quarantine)
dag.pruneAtFinalization()
@ -172,8 +172,8 @@ suite "Block pool processing" & preset():
test "updateStateData sanity" & preset():
let
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback)
b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
b2Add = dag.addHeadBlock(verifier, b2, nilPhase0Callback)
bs1 = BlockSlot(blck: b1Add[], slot: b1.message.slot)
bs1_3 = b1Add[].atSlot(3.Slot)
bs2_3 = b2Add[].atSlot(3.Slot)
@ -219,6 +219,9 @@ suite "Block pool processing" & preset():
tmpState.blck == b1Add[].parent
getStateField(tmpState.data, slot) == bs1.parent.slot
when declared(GC_fullCollect): # i386 test machines seem to run low..
GC_fullCollect()
suite "Block pool altair processing" & preset():
setup:
var
@ -253,13 +256,13 @@ suite "Block pool altair processing" & preset():
MockPrivKeys[ValidatorIndex(0)]).toValidatorSig()
check:
dag.addRawBlock(verifier, b1, nilAltairCallback).isOk()
dag.addHeadBlock(verifier, b1, nilAltairCallback).isOk()
block: # Main signature
var b = b2
b.signature = badSignature
let
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == BlockError.Invalid
@ -267,7 +270,7 @@ suite "Block pool altair processing" & preset():
var b = b2
b.message.body.randao_reveal = badSignature
let
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == BlockError.Invalid
@ -275,7 +278,7 @@ suite "Block pool altair processing" & preset():
var b = b2
b.message.body.attestations[0].signature = badSignature
let
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == BlockError.Invalid
@ -283,7 +286,7 @@ suite "Block pool altair processing" & preset():
var b = b2
b.message.body.sync_aggregate.sync_committee_signature = badSignature
let
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == BlockError.Invalid
@ -293,7 +296,7 @@ suite "Block pool altair processing" & preset():
b.message.body.sync_aggregate.sync_committee_bits[0] = true
let
bAdd = dag.addRawBlock(verifier, b, nilAltairCallback)
bAdd = dag.addHeadBlock(verifier, b, nilAltairCallback)
check:
bAdd.error() == BlockError.Invalid
@ -320,7 +323,7 @@ suite "chain DAG finalization tests" & preset():
let lateBlock = addTestBlock(tmpState[], cache).phase0Data
block:
let status = dag.addRawBlock(verifier, blck, nilPhase0Callback)
let status = dag.addHeadBlock(verifier, blck, nilPhase0Callback)
check: status.isOk()
assign(tmpState[], dag.headState.data)
@ -335,7 +338,7 @@ suite "chain DAG finalization tests" & preset():
tmpState[], cache,
attestations = makeFullAttestations(
tmpState[], dag.head.root, getStateField(tmpState[], slot), cache, {})).phase0Data
let added = dag.addRawBlock(verifier, blck, nilPhase0Callback)
let added = dag.addHeadBlock(verifier, blck, nilPhase0Callback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
@ -382,7 +385,7 @@ suite "chain DAG finalization tests" & preset():
block:
# The late block is a block whose parent was finalized long ago and thus
# is no longer a viable head candidate
let status = dag.addRawBlock(verifier, lateBlock, nilPhase0Callback)
let status = dag.addHeadBlock(verifier, lateBlock, nilPhase0Callback)
check: status.error == BlockError.UnviableFork
block:
@ -411,7 +414,7 @@ suite "chain DAG finalization tests" & preset():
assign(prestate[], dag.headState.data)
let blck = makeTestBlock(dag.headState.data, cache).phase0Data
let added = dag.addRawBlock(verifier, blck, nilPhase0Callback)
let added = dag.addHeadBlock(verifier, blck, nilPhase0Callback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
@ -430,21 +433,21 @@ suite "chain DAG finalization tests" & preset():
let blck = makeTestBlock(prestate[], cache).phase0Data
# Add block, but don't update head
let added = dag.addRawBlock(verifier, blck, nilPhase0Callback)
let added = dag.addHeadBlock(verifier, blck, nilPhase0Callback)
check: added.isOk()
var
dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
# check that we can apply the block after the orphaning
let added2 = dag2.addRawBlock(verifier, blck, nilPhase0Callback)
let added2 = dag2.addHeadBlock(verifier, blck, nilPhase0Callback)
check: added2.isOk()
test "init with gaps" & preset():
for blck in makeTestBlocks(
dag.headState.data, cache, int(SLOTS_PER_EPOCH * 6 - 2),
true):
let added = dag.addRawBlock(verifier, blck.phase0Data, nilPhase0Callback)
let added = dag.addHeadBlock(verifier, blck.phase0Data, nilPhase0Callback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
@ -461,7 +464,7 @@ suite "chain DAG finalization tests" & preset():
dag.headState.data, dag.head.root, getStateField(dag.headState.data, slot),
cache, {})).phase0Data
let added = dag.addRawBlock(verifier, blck, nilPhase0Callback)
let added = dag.addHeadBlock(verifier, blck, nilPhase0Callback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
@ -527,7 +530,7 @@ suite "Old database versions" & preset():
cache = StateCache()
att0 = makeFullAttestations(state[], dag.tail.root, 0.Slot, cache)
b1 = addTestBlock(state[], cache, attestations = att0).phase0Data
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
check:
b1Add.isOk()
@ -561,7 +564,7 @@ suite "Diverging hardforks":
# common is the tail block
var
b1 = addTestBlock(tmpState[], cache).phase0Data
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
check b1Add.isOk()
dag.updateHead(b1Add[], quarantine[])
@ -579,7 +582,7 @@ suite "Diverging hardforks":
# There's a block in the shared-correct phase0 hardfork, before epoch 2
var
b1 = addTestBlock(tmpState[], cache).phase0Data
b1Add = dag.addRawBlock(verifier, b1, nilPhase0Callback)
b1Add = dag.addHeadBlock(verifier, b1, nilPhase0Callback)
check:
b1Add.isOk()
@ -590,10 +593,114 @@ suite "Diverging hardforks":
var
b2 = addTestBlock(tmpState[], cache).phase0Data
b2Add = dag.addRawBlock(verifier, b2, nilPhase0Callback)
b2Add = dag.addHeadBlock(verifier, b2, nilPhase0Callback)
check b2Add.isOk()
dag.updateHead(b2Add[], quarantine[])
var dagAltair = init(ChainDAGRef, altairRuntimeConfig, db, {})
discard AttestationPool.init(dagAltair, quarantine)
suite "Backfill":
setup:
let
genState = (ref ForkedHashedBeaconState)(
kind: BeaconStateFork.Phase0,
phase0Data: initialize_hashed_beacon_state_from_eth1(
defaultRuntimeConfig,
Eth2Digest(),
0,
makeInitialDeposits(SLOTS_PER_EPOCH.uint64, flags = {skipBlsValidation}),
{skipBlsValidation}))
genBlock = get_initial_beacon_block(genState[])
tailState = assignClone(genState[])
blocks = block:
var blocks: seq[ForkedSignedBeaconBlock]
var cache: StateCache
for i in 0..<SLOTS_PER_EPOCH:
blocks.add addTestBlock(tailState[], cache)
blocks
let
db = BeaconChainDB.new("", inMemory = true)
test "backfill to genesis":
let
tailBlock = blocks[^1]
ChainDAGRef.preInit(
db, genState[], tailState[], tailBlock.asTrusted())
let dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
check:
dag.getRef(tailBlock.root) == dag.tail
dag.getRef(blocks[^2].root) == nil
dag.getBlockBySlot(dag.tail.slot).blck == dag.tail
dag.getBlockBySlot(dag.tail.slot - 1).blck == nil
dag.getBlockBySlot(Slot(0)).blck == dag.genesis
dag.getBlockSlotIdBySlot(Slot(0)) == dag.genesis.bid.atSlot(Slot(0))
dag.getBlockSlotIdBySlot(Slot(1)) == BlockSlotId()
var
badBlock = blocks[^2].phase0Data
badBlock.signature = blocks[^3].phase0Data.signature
check:
dag.addBackfillBlock(badBlock).error == BlockError.Invalid
check:
dag.addBackfillBlock(blocks[^3].phase0Data).error == BlockError.MissingParent
dag.addBackfillBlock(tailBlock.phase0Data).error == BlockError.Duplicate
dag.addBackfillBlock(genBlock.phase0Data.asSigned()).error == BlockError.Duplicate
check:
dag.addBackfillBlock(blocks[^2].phase0Data).isOk()
dag.getRef(tailBlock.root) == dag.tail
dag.getRef(blocks[^2].root) == nil
dag.getBlockBySlot(dag.tail.slot).blck == dag.tail
dag.getBlockBySlot(dag.tail.slot - 1).blck == nil
dag.getBlockSlotIdBySlot(dag.tail.slot - 1) ==
blocks[^2].toBlockId().atSlot()
dag.getBlockSlotIdBySlot(dag.tail.slot - 2) == BlockSlotId()
check:
dag.addBackfillBlock(blocks[^3].phase0Data).isOk()
dag.getBlockSlotIdBySlot(dag.tail.slot - 2) ==
blocks[^3].toBlockId().atSlot()
dag.getBlockSlotIdBySlot(dag.tail.slot - 3) == BlockSlotId()
for i in 3..<blocks.len:
check: dag.addBackfillBlock(blocks[blocks.len - i - 1].phase0Data).isOk()
test "reload backfill position":
let
tailBlock = blocks[^1]
ChainDAGRef.preInit(
db, genState[], tailState[], tailBlock.asTrusted())
let dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
check:
dag.addBackfillBlock(blocks[^2].phase0Data).isOk()
let dag2 = init(ChainDAGRef, defaultRuntimeConfig, db, {})
check:
dag.getRef(tailBlock.root) == dag.tail
dag.getRef(blocks[^2].root) == nil
dag.getBlockBySlot(dag.tail.slot).blck == dag.tail
dag.getBlockBySlot(dag.tail.slot - 1).blck == nil
dag.getBlockSlotIdBySlot(dag.tail.slot - 1) ==
blocks[^2].toBlockId().atSlot()
dag.getBlockSlotIdBySlot(dag.tail.slot - 2) == BlockSlotId()


@ -6,9 +6,6 @@ import
../beacon_chain/eth1/eth1_monitor,
./testutil
suite "Eth1 Chain":
discard
suite "Eth1 monitor":
test "Rewrite HTTPS Infura URLs":
var


@ -75,7 +75,7 @@ suite "Gossip validation " & preset():
cache: StateCache
for blck in makeTestBlocks(
dag.headState.data, cache, int(SLOTS_PER_EPOCH * 5), false):
let added = dag.addRawBlock(verifier, blck.phase0Data) do (
let added = dag.addHeadBlock(verifier, blck.phase0Data) do (
blckRef: BlockRef, signedBlock: phase0.TrustedSignedBeaconBlock,
epochRef: EpochRef):
# Callback add to fork choice if valid
@ -197,13 +197,13 @@ suite "Gossip validation - Extra": # Not based on preset config
case blck.kind
of BeaconBlockFork.Phase0:
const nilCallback = OnPhase0BlockAdded(nil)
dag.addRawBlock(verifier, blck.phase0Data, nilCallback)
dag.addHeadBlock(verifier, blck.phase0Data, nilCallback)
of BeaconBlockFork.Altair:
const nilCallback = OnAltairBlockAdded(nil)
dag.addRawBlock(verifier, blck.altairData, nilCallback)
dag.addHeadBlock(verifier, blck.altairData, nilCallback)
of BeaconBlockFork.Merge:
const nilCallback = OnMergeBlockAdded(nil)
dag.addRawBlock(verifier, blck.mergeData, nilCallback)
dag.addHeadBlock(verifier, blck.mergeData, nilCallback)
check: added.isOk()
dag.updateHead(added[], quarantine[])
dag


@ -11,6 +11,7 @@ import
./testutil
suite "Merge test vectors":
setup:
let web3Provider = (waitFor Web3DataProvider.new(
default(Eth1Address), "http://127.0.0.1:8550")).get