reduce LC optsync latency (#4002)

The optimistic sync spec was updated since the LC-based optsync module
was introduced. It is no longer necessary to wait for the justified
checkpoint to have execution enabled; instead, any block may be
optimistically imported into the EL client as long as its parent block
has execution enabled. Complex syncing logic has been removed, and the
LC optsync module now follows gossip directly, reducing the latency
when using this module. Note that, because this is now based on gossip
instead of the sync manager / request manager, individual blocks may be
missed. However, EL clients should recover from this by fetching
missing blocks themselves.
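
In pseudocode, the simplified import rule amounts to the check sketched
below (a minimal illustration; `Root`, `BlockSummary`,
`maybeImportOptimistically` and the `engine_newPayload` echo are
placeholder names, not the actual nimbus-eth2 API):

```nim
# A gossip block may be imported optimistically as soon as its parent
# is known to have execution enabled; no justified-checkpoint wait.
import std/sets

type
  Root = string                       # stand-in for Eth2Digest
  BlockSummary = object
    root, parentRoot: Root
    isExecutionBlock: bool

var executionBlocks = initHashSet[Root]()  # roots with execution enabled

proc maybeImportOptimistically(blck: BlockSummary) =
  if not blck.isExecutionBlock:
    return                            # pre-merge block, nothing to import
  if blck.parentRoot notin executionBlocks:
    return                            # parent not known to be an execution block
  executionBlocks.incl blck.root
  echo "engine_newPayload(", blck.root, ")"  # hand the payload to the EL
```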
Etan Kissling 2022-08-25 05:53:59 +02:00 committed by GitHub
parent b6488d5245
commit 9180f09641
16 changed files with 553 additions and 1401 deletions


@ -56,17 +56,6 @@ OK: 6/6 Fail: 0/6 Skip: 0/6
+ basics OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Block clearance (light client) [Preset: mainnet]
```diff
+ Delayed finality update OK
+ Error conditions OK
+ Incremental sync OK
+ Initial sync OK
+ Low slot numbers OK
+ Reorg OK
+ Reverse incremental sync OK
```
OK: 7/7 Fail: 0/7 Skip: 0/7
## Block pool altair processing [Preset: mainnet]
```diff
+ Invalid signatures [Preset: mainnet] OK
@ -596,4 +585,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 9/9 Fail: 0/9 Skip: 0/9
---TOTAL---
OK: 333/338 Fail: 0/338 Skip: 5/338
OK: 326/331 Fail: 0/331 Skip: 5/331


@ -15,7 +15,7 @@ import
# Local modules
"."/[beacon_clock, beacon_chain_db, conf, light_client],
./gossip_processing/[eth2_processor, block_processor],
./gossip_processing/[eth2_processor, block_processor, optimistic_processor],
./networking/eth2_network,
./eth1/eth1_monitor,
./consensus_object_pools/[
@ -23,7 +23,7 @@ import
attestation_pool, sync_committee_msg_pool],
./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients,
./sync/[optimistic_sync_light_client, sync_manager, request_manager],
./sync/[sync_manager, request_manager],
./validators/[
action_tracker, message_router, validator_monitor, validator_pool,
keystore_management],
@ -33,9 +33,9 @@ export
osproc, chronos, httpserver, presto, action_tracker,
beacon_clock, beacon_chain_db, conf, light_client,
attestation_pool, sync_committee_msg_pool, validator_pool,
eth2_network, eth1_monitor, optimistic_sync_light_client,
request_manager, sync_manager, eth2_processor, blockchain_dag,
block_quarantine, base, exit_pool, message_router, validator_monitor,
eth2_network, eth1_monitor, request_manager, sync_manager,
eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
base, exit_pool, message_router, validator_monitor,
consensus_manager, dynamic_fee_recipients
type
@ -58,7 +58,7 @@ type
db*: BeaconChainDB
config*: BeaconNodeConf
attachedValidators*: ref ValidatorPool
lcOptSync*: LCOptimisticSync
optimisticProcessor*: OptimisticProcessor
lightClient*: LightClient
dag*: ChainDAGRef
quarantine*: ref Quarantine
@ -83,6 +83,7 @@ type
consensusManager*: ref ConsensusManager
attachedValidatorBalanceTotal*: uint64
gossipState*: GossipState
blocksGossipState*: GossipState
beaconClock*: BeaconClock
restKeysCache*: Table[ValidatorPubKey, ValidatorIndex]
validatorMonitor*: ref ValidatorMonitor


@ -16,6 +16,24 @@ import
logScope: topics = "beacnde"
func shouldSyncOptimistically*(node: BeaconNode, wallSlot: Slot): bool =
# Check whether light client is used for syncing
let optimisticHeader = node.lightClient.optimisticHeader.valueOr:
return false
# Check whether light client is sufficiently ahead of DAG
const minProgress = 8 * SLOTS_PER_EPOCH # Set arbitrarily
let dagSlot = getStateField(node.dag.headState, slot)
if dagSlot + minProgress > optimisticHeader.slot:
return false
# Check whether light client has synced sufficiently close to wall slot
const maxAge = 2 * SLOTS_PER_EPOCH
if optimisticHeader.slot < max(wallSlot, maxAge.Slot) - maxAge:
return false
true
proc initLightClient*(
node: BeaconNode,
rng: ref HmacDrbgContext,
@ -30,52 +48,28 @@ proc initLightClient*(
# for broadcasting light client data as a server.
let
optimisticProcessor = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock):
optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock):
Future[void] {.async.} =
debug "New LC optimistic block",
opt = signedBlock.toBlockId(),
dag = node.dag.head.bid,
wallSlot = node.currentSlot
return
optSync = initLCOptimisticSync(
node.network, getBeaconTime, optimisticProcessor,
config.safeSlotsToImportOptimistically)
optimisticProcessor = initOptimisticProcessor(
getBeaconTime, optimisticHandler)
lightClient = createLightClient(
node.network, rng, config, cfg, forkDigests, getBeaconTime,
genesis_validators_root, LightClientFinalizationMode.Strict)
if config.lightClientEnable:
proc shouldSyncOptimistically(slot: Slot): bool =
const
# Minimum number of slots to be ahead of DAG to use optimistic sync
minProgress = 8 * SLOTS_PER_EPOCH
# Maximum age of light client optimistic header to use optimistic sync
maxAge = 2 * SLOTS_PER_EPOCH
proc onFinalizedHeader(
lightClient: LightClient, finalizedHeader: BeaconBlockHeader) =
optimisticProcessor.setFinalizedHeader(finalizedHeader)
if slot < getStateField(node.dag.headState, slot) + minProgress:
false
elif getBeaconTime().slotOrZero > slot + maxAge:
false
else:
true
proc onFinalizedHeader(lightClient: LightClient) =
let optimisticHeader = lightClient.optimisticHeader.valueOr:
return
if not shouldSyncOptimistically(optimisticHeader.slot):
return
let finalizedHeader = lightClient.finalizedHeader.valueOr:
return
optSync.setOptimisticHeader(optimisticHeader)
optSync.setFinalizedHeader(finalizedHeader)
proc onOptimisticHeader(lightClient: LightClient) =
let optimisticHeader = lightClient.optimisticHeader.valueOr:
return
if not shouldSyncOptimistically(optimisticHeader.slot):
return
optSync.setOptimisticHeader(optimisticHeader)
proc onOptimisticHeader(
lightClient: LightClient, optimisticHeader: BeaconBlockHeader) =
optimisticProcessor.setOptimisticHeader(optimisticHeader)
lightClient.onFinalizedHeader = onFinalizedHeader
lightClient.onOptimisticHeader = onOptimisticHeader
@ -86,14 +80,13 @@ proc initLightClient*(
lightClientEnable = config.lightClientEnable,
lightClientTrustedBlockRoot = config.lightClientTrustedBlockRoot
node.lcOptSync = optSync
node.optimisticProcessor = optimisticProcessor
node.lightClient = lightClient
proc startLightClient*(node: BeaconNode) =
if not node.config.lightClientEnable:
return
node.lcOptSync.start()
node.lightClient.start()
proc installLightClientMessageValidators*(node: BeaconNode) =


@ -133,12 +133,6 @@ type LightClientConf* = object
desc: "A file containing the hex-encoded 256 bit secret key to be used for verifying/generating JWT tokens"
name: "jwt-secret" .}: Option[string]
safeSlotsToImportOptimistically* {.
hidden
desc: "Modify SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY"
defaultValue: 128
name: "safe-slots-to-import-optimistically" .}: uint16
# Testing
stopAtEpoch* {.
hidden


@ -1,309 +0,0 @@
# beacon_chain
# Copyright (c) 2019-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[deques, math],
chronicles,
../spec/forks,
../beacon_chain_db,
./block_pools_types
export forks, block_pools_types
logScope:
topics = "clearance"
# Clearance (light client)
# ---------------------------------------------
#
# This module validates blocks obtained using the light client sync protocol.
# Those blocks are considered trusted by delegating the full verification to a
# supermajority (> 2/3) of the corresponding sync committee (512 members).
# The validated blocks are downloaded in backwards order into a `deque`.
#
# If the sync committee is trusted, expensive verification already done by the
# sync committee may be skipped:
# - BLS signatures (except the outer block signature not covered by `root`)
# - Verifying whether the state transition function applies
# - `ExecutionPayload` verification
# - `state_root` computation and verification
type LCBlocks* = object
maxSlots: int # max cache.len
cache: Deque[ref ForkedMsgTrustedSignedBeaconBlock] # by slots descending
headSlot: Slot # matches cache[0].slot once block is downloaded
backfill: BeaconBlockSummary # next expected block
finalizedBid: BlockId
func initLCBlocks*(maxSlots: int): LCBlocks =
LCBlocks(
maxSlots: maxSlots,
cache: initDeque[ref ForkedMsgTrustedSignedBeaconBlock](
nextPowerOfTwo(maxSlots)),
headSlot: FAR_FUTURE_SLOT)
func getHeadSlot*(lcBlocks: LCBlocks): Slot =
lcBlocks.headSlot
func getFinalizedSlot*(lcBlocks: LCBlocks): Slot =
lcBlocks.finalizedBid.slot
func getFrontfillSlot*(lcBlocks: LCBlocks): Slot =
lcBlocks.headSlot + 1 - lcBlocks.cache.lenu64
func getBackfillSlot*(lcBlocks: LCBlocks): Slot =
if lcBlocks.backfill.slot != FAR_FUTURE_SLOT:
max(lcBlocks.backfill.slot, lcBlocks.getFrontfillSlot())
else:
lcBlocks.headSlot + 1
func getBackfillRoot*(lcBlocks: LCBlocks): Option[Eth2Digest] =
if lcBlocks.headSlot == FAR_FUTURE_SLOT:
none(Eth2Digest)
elif lcBlocks.backfill.slot < lcBlocks.getFrontfillSlot():
none(Eth2Digest)
else:
some lcBlocks.backfill.parent_root
func getCacheIndex(lcBlocks: LCBlocks, slot: Slot): uint64 =
if slot < lcBlocks.headSlot and lcBlocks.headSlot != FAR_FUTURE_SLOT:
lcBlocks.headSlot - slot
else:
0
func getBlockAtSlot*(
lcBlocks: LCBlocks, slot: Slot): Opt[ForkedMsgTrustedSignedBeaconBlock] =
if slot < lcBlocks.backfill.slot:
return err()
let index = lcBlocks.getCacheIndex(slot)
if index >= lcBlocks.cache.lenu64:
return err()
let existing = lcBlocks.cache[index]
if existing == nil:
return err()
return ok existing[]
func getLatestBlockThroughSlot*(
lcBlocks: LCBlocks, maxSlot: Slot): Opt[ForkedMsgTrustedSignedBeaconBlock] =
if maxSlot < lcBlocks.backfill.slot:
return err()
let startIndex = lcBlocks.getCacheIndex(maxSlot)
for i in startIndex ..< lcBlocks.cache.lenu64:
let blck = lcBlocks.cache[i]
if blck != nil:
return ok blck[]
err()
proc processBlock(
lcBlocks: var LCBlocks,
signedBlock: ForkySignedBeaconBlock,
isNewBlock = true): Result[void, BlockError] =
logScope:
headSlot = lcBlocks.headSlot
backfill = (lcBlocks.backfill.slot, shortLog(lcBlocks.backfill.parent_root))
blck = shortLog(signedBlock.toBlockId())
let startTick = Moment.now()
template blck(): untyped = signedBlock.message
template blockRoot(): untyped = signedBlock.root
if blck.slot > lcBlocks.headSlot:
debug "LC block too new"
return err(BlockError.Duplicate)
# Handle head block
if lcBlocks.backfill.slot == FAR_FUTURE_SLOT:
if blck.slot < lcBlocks.headSlot:
if isNewBlock:
debug "Head LC block skipped"
return err(BlockError.MissingParent)
if blockRoot != lcBlocks.backfill.parent_root:
if isNewBlock:
debug "Head LC block from unviable fork"
return err(BlockError.UnviableFork)
const index = 0'u64 # Head block is always mapped to index 0 (never empty)
if index >= lcBlocks.cache.lenu64:
lcBlocks.backfill.slot = blck.slot
debug "Final head LC block"
return ok()
lcBlocks.backfill = blck.toBeaconBlockSummary()
let existing = lcBlocks.cache[index]
if existing != nil:
if blockRoot == existing[].root:
if isNewBlock:
debug "Head LC block already known"
return ok()
warn "Head LC block reorg", existing = existing[]
lcBlocks.cache[index] =
newClone ForkedMsgTrustedSignedBeaconBlock.init(
signedBlock.asMsgTrusted())
debug "Head LC block cached", cacheDur = Moment.now() - startTick
return ok()
# Handle duplicate block
if blck.slot >= lcBlocks.getBackfillSlot():
let index = lcBlocks.getCacheIndex(blck.slot)
doAssert index < lcBlocks.cache.lenu64
let existing = lcBlocks.cache[index]
if existing == nil:
debug "Duplicate LC block for empty slot"
return err(BlockError.UnviableFork)
doAssert blck.slot == existing[].slot
if blockRoot != existing[].root:
debug "Duplicate LC block from unviable fork"
return err(BlockError.UnviableFork)
debug "Duplicate LC block"
return err(BlockError.Duplicate)
# Handle new block
if blck.slot > lcBlocks.backfill.slot:
debug "LC block for empty slot"
return err(BlockError.UnviableFork)
if blockRoot != lcBlocks.backfill.parent_root:
if blck.slot == lcBlocks.backfill.slot:
debug "Final LC block from unviable fork"
return err(BlockError.UnviableFork)
if isNewBlock:
debug "LC block does not match expected backfill root"
return err(BlockError.MissingParent)
if blck.slot == lcBlocks.backfill.slot:
debug "Duplicate final LC block"
return err(BlockError.Duplicate)
let
previousIndex = lcBlocks.getCacheIndex(lcBlocks.backfill.slot)
index = lcBlocks.getCacheIndex(blck.slot)
for i in previousIndex + 1 ..< min(index, lcBlocks.cache.lenu64):
let existing = lcBlocks.cache[i]
if existing != nil:
warn "LC block reorg to empty", existing = existing[]
lcBlocks.cache[i] = nil
if index >= lcBlocks.cache.lenu64:
lcBlocks.backfill.slot = blck.slot
debug "Final LC block"
return ok()
lcBlocks.backfill = blck.toBeaconBlockSummary()
let existing = lcBlocks.cache[index]
if existing != nil:
if blockRoot == existing[].root:
if isNewBlock:
debug "LC block already known"
return ok()
warn "LC block reorg", existing = existing[]
lcBlocks.cache[index] =
newClone ForkedMsgTrustedSignedBeaconBlock.init(
signedBlock.asMsgTrusted())
debug "LC block cached", cacheDur = Moment.now() - startTick
ok()
proc setHeadBid*(lcBlocks: var LCBlocks, headBid: BlockId) =
debug "New LC head block", headBid
if lcBlocks.maxSlots == 0:
discard
elif lcBlocks.headSlot == FAR_FUTURE_SLOT or
headBid.slot >= lcBlocks.headSlot + lcBlocks.maxSlots.uint64 or (
lcBlocks.headSlot - lcBlocks.cache.lenu64 != FAR_FUTURE_SLOT and
headBid.slot <= lcBlocks.headSlot - lcBlocks.cache.lenu64):
lcBlocks.cache.clear()
for i in 0 ..< min(headBid.slot + 1, lcBlocks.maxSlots.Slot).int:
lcBlocks.cache.addLast(nil)
elif headBid.slot > lcBlocks.headSlot:
let numNewSlots = headBid.slot - lcBlocks.headSlot
doAssert numNewSlots <= lcBlocks.maxSlots.uint64
if numNewSlots > lcBlocks.maxSlots.uint64 - lcBlocks.cache.lenu64:
lcBlocks.cache.shrink(
fromLast = numNewSlots.int + lcBlocks.cache.len - lcBlocks.maxSlots)
for i in 0 ..< numNewSlots:
lcBlocks.cache.addFirst(nil)
else:
lcBlocks.cache.shrink(fromFirst = (lcBlocks.headSlot - headBid.slot).int)
let startLen = lcBlocks.cache.len
for i in startLen ..< min(headBid.slot + 1, lcBlocks.maxSlots.Slot).int:
lcBlocks.cache.addLast(nil)
lcBlocks.headSlot = headBid.slot
lcBlocks.backfill.slot = FAR_FUTURE_SLOT
lcBlocks.backfill.parent_root = headBid.root
for i in 0 ..< lcBlocks.cache.len:
let existing = lcBlocks.cache[i]
if existing != nil:
let res =
withBlck(existing[]):
lcBlocks.processBlock(blck.asSigned(), isNewBlock = false)
if res.isErr:
break
proc setFinalizedBid*(lcBlocks: var LCBlocks, finalizedBid: BlockId) =
if finalizedBid.slot > lcBlocks.headSlot:
lcBlocks.setHeadBid(finalizedBid)
if finalizedBid != lcBlocks.finalizedBid:
debug "New LC finalized block", finalizedBid
lcBlocks.finalizedBid = finalizedBid
if finalizedBid.slot <= lcBlocks.headSlot and
finalizedBid.slot >= lcBlocks.getBackfillSlot:
let index = lcBlocks.getCacheIndex(finalizedBid.slot)
doAssert index < lcBlocks.cache.lenu64
let existing = lcBlocks.cache[index]
if existing == nil or finalizedBid.root != existing[].root:
if existing != nil:
error "Finalized LC block reorg", existing = existing[]
else:
error "Finalized LC block reorg"
lcBlocks.cache.clear()
lcBlocks.backfill.reset()
lcBlocks.headSlot.reset()
lcBlocks.setHeadBid(finalizedBid)
proc addBlock*(
lcBlocks: var LCBlocks,
signedBlock: ForkedSignedBeaconBlock): Result[void, BlockError] =
let oldBackfillSlot = lcBlocks.getBackfillSlot()
withBlck(signedBlock):
? lcBlocks.processBlock(blck)
if oldBackfillSlot > lcBlocks.finalizedBid.slot and
lcBlocks.getBackfillSlot() <= lcBlocks.finalizedBid.slot:
if signedBlock.slot != lcBlocks.finalizedBid.slot or
signedBlock.root != lcBlocks.finalizedBid.root:
error "LC finalized block from unviable fork"
lcBlocks.setFinalizedBid(lcBlocks.finalizedBid)
return err(BlockError.UnviableFork)
let slot = signedBlock.slot
for i in lcBlocks.getCacheIndex(slot) + 1 ..< lcBlocks.cache.lenu64:
let existing = lcBlocks.cache[i]
if existing != nil:
let res =
withBlck(existing[]):
lcBlocks.processBlock(blck.asSigned(), isNewBlock = false)
if res.isErr:
break
ok()


@ -0,0 +1,216 @@
# beacon_chain
# Copyright (c) 2019-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronicles, chronos,
../spec/forks,
../beacon_clock,
./gossip_validation
from ./eth2_processor import ValidationRes
export gossip_validation
logScope:
topics = "gossip_opt"
const
# Maximum `blocks` to cache (not validated; deleted on new optimistic header)
maxBlocks = 16 # <= `GOSSIP_MAX_SIZE_BELLATRIX` (10 MB) each
# Maximum `seenBlocks` to cache (only used until a finalized block was seen)
maxSeenBlocks = 1024 # `Eth2Digest` each
# Minimum interval at which spam is logged
minLogInterval = chronos.seconds(5)
type
MsgTrustedBlockProcessor* =
proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): Future[void] {.
gcsafe, raises: [Defect].}
OptimisticProcessor* = ref object
getBeaconTime: GetBeaconTimeFn
optimisticVerifier: MsgTrustedBlockProcessor
seenBlocks: Option[HashSet[Eth2Digest]]
blocks: Table[Eth2Digest, ref ForkedSignedBeaconBlock]
latestOptimisticSlot: Slot
processFut: Future[void]
logMoment: Moment
proc initOptimisticProcessor*(
getBeaconTime: GetBeaconTimeFn,
optimisticVerifier: MsgTrustedBlockProcessor): OptimisticProcessor =
OptimisticProcessor(
getBeaconTime: getBeaconTime,
optimisticVerifier: optimisticVerifier,
seenBlocks: some(default(HashSet[Eth2Digest])))
proc validateBeaconBlock(
self: OptimisticProcessor,
signed_beacon_block: ForkySignedBeaconBlock,
wallTime: BeaconTime): Result[void, ValidationError] =
## Minimally validate a block for potential relevance.
if not (signed_beacon_block.message.slot <=
(wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero):
return errIgnore("BeaconBlock: slot too high")
if signed_beacon_block.message.slot <= self.latestOptimisticSlot:
return errIgnore("BeaconBlock: no significant progress")
if not signed_beacon_block.message.is_execution_block():
return errIgnore("BeaconBlock: no execution block")
ok()
proc processSignedBeaconBlock*(
self: OptimisticProcessor,
signedBlock: ForkySignedBeaconBlock): ValidationRes =
let
wallTime = self.getBeaconTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
logScope:
blockRoot = shortLog(signedBlock.root)
blck = shortLog(signedBlock.message)
signature = shortLog(signedBlock.signature)
wallSlot
if not afterGenesis:
notice "Optimistic block before genesis"
return errIgnore("Block before genesis")
# Potential under/overflows are fine; would just create odd metrics and logs
let delay = wallTime - signedBlock.message.slot.start_beacon_time
# Start of block processing - in reality, we have already gone through SSZ
# decoding at this stage, which may be significant
debug "Optimistic block received", delay
let v = self.validateBeaconBlock(signedBlock, wallTime)
if v.isErr:
debug "Dropping optimistic block", error = v.error
return err(v.error)
# Note that validation of blocks is delayed by ~4/3 slots because we have to
# wait for the sync committee to sign the correct block and for that signature
# to be included in the next block. Therefore, we skip block validation here
# and cache the block in memory. Because there is no validation, we have to
# mitigate against bogus blocks, mostly by bounding the caches. Assuming that
# any denial-of-service attacks eventually subside, care is taken to recover.
template logWithSpamProtection(body: untyped): untyped =
block:
let now = Moment.now()
if self.logMoment + minLogInterval <= now:
logScope: minLogInterval
body
self.logMoment = now
# Update `seenBlocks` (this is only used until a finalized block is seen)
let parentSeen =
self.seenBlocks.isNone or
self.seenBlocks.get.contains(signedBlock.message.parent_root)
if self.seenBlocks.isSome:
# If `seenBlocks` is full, we got spammed with too many blocks,
# or the finalized epoch boundary blocks or finalized header advancements
# have all been withheld from us, in which case the `seenBlocks` mechanism
# could not be marked obsolete.
# Mitigation: Randomly delete half of `seenBlocks` and hope that the root
# of the next finalized header is still in there when it arrives.
if self.seenBlocks.get.len >= maxSeenBlocks:
logWithSpamProtection:
error "`seenBlocks` full - pruning", maxSeenBlocks
var rootsToDelete = newSeqOfCap[Eth2Digest](maxSeenBlocks div 2)
for root in self.seenBlocks.get:
rootsToDelete.add root
for root in rootsToDelete:
self.seenBlocks.get.excl root
self.seenBlocks.get.incl signedBlock.root
# Store block for later verification (only if parent has execution enabled)
if parentSeen and not self.blocks.hasKey(signedBlock.root):
# If `blocks` is full, we got spammed with multiple blocks for a slot,
# or the optimistic header advancements have all been withheld from us.
# Whenever the optimistic header advances, old blocks are cleared,
# so we can simply ignore additional spam blocks until that happens.
if self.blocks.len >= maxBlocks:
logWithSpamProtection:
error "`blocks` full - ignoring", maxBlocks
else:
self.blocks[signedBlock.root] =
newClone(ForkedSignedBeaconBlock.init(signedBlock))
# Block validation is delegated to the sync committee and is done with delay.
# If we forward invalid spam blocks, we may be disconnected + IP banned,
# so we avoid accepting any blocks. Since we don't meaningfully contribute
# to the blocks gossip, we may also accumulate negative peer score over time.
# However, we are actively contributing to other topics, so some of the
# negative peer score may be offset through those different topics.
# The practical impact depends on the actually deployed scoring heuristics.
trace "Optimistic block cached"
return errIgnore("Validation delegated to sync committee")
proc setOptimisticHeader*(
self: OptimisticProcessor, optimisticHeader: BeaconBlockHeader) =
# If irrelevant, skip processing
if optimisticHeader.slot <= self.latestOptimisticSlot:
return
self.latestOptimisticSlot = optimisticHeader.slot
# Delete blocks that are no longer of interest
let blockRoot = optimisticHeader.hash_tree_root()
var
rootsToDelete: seq[Eth2Digest]
signedBlock: ref ForkedMsgTrustedSignedBeaconBlock
for root, blck in self.blocks:
if root == blockRoot:
signedBlock = blck.asMsgTrusted()
if blck[].slot <= optimisticHeader.slot:
rootsToDelete.add root
for root in rootsToDelete:
self.blocks.del root
# Block must be known
if signedBlock == nil:
return
# Parent must be execution block or block must be deep (irrelevant for gossip)
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.3/sync/optimistic.md#helpers
let parentIsExecutionBlock =
self.seenBlocks.isNone or
self.seenBlocks.get.contains(optimisticHeader.parent_root)
if not parentIsExecutionBlock:
return
# If a block is already being processed, skip (backpressure)
if self.processFut != nil:
return
self.processFut = self.optimisticVerifier(signedBlock[])
proc handleFinishedProcess(future: pointer) =
self.processFut = nil
self.processFut.addCallback(handleFinishedProcess)
proc setFinalizedHeader*(
self: OptimisticProcessor, finalizedHeader: BeaconBlockHeader) =
# Once an execution block finalizes, all followup blocks are execution blocks
if self.seenBlocks.isNone:
return
# If the finalized block is an execution block, disable `seenBlocks` tracking
let blockRoot = finalizedHeader.hash_tree_root()
if self.seenBlocks.get.contains(blockRoot):
debug "Finalized execution block seen",
finalized_header = shortLog(finalizedHeader)
self.seenBlocks.reset()

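In outline, the new `optimistic_processor` caches gossip blocks without
validating them and only hands a block to the EL-facing handler once the
light client's optimistic header points at it and its parent is known to
be an execution block. A rough, illustrative sketch of that flow with
simplified placeholder types (`Root`, `CachedBlock`, `Processor` are not
the module's real API):

```nim
import std/tables

type
  Root = string                          # stand-in for Eth2Digest
  CachedBlock = object
    root, parentRoot: Root
  Processor = object
    blocks: Table[Root, CachedBlock]     # bounded (maxBlocks) in the real module
    executionRoots: seq[Root]            # roots known to have execution enabled

proc onGossipBlock(p: var Processor, blck: CachedBlock) =
  # No validation here: remember the block until the sync committee,
  # via the light client optimistic header, vouches for it.
  if p.blocks.len < 16:
    p.blocks[blck.root] = blck

proc onOptimisticHeader(p: var Processor, headerRoot, headerParent: Root,
                        handler: proc (b: CachedBlock)) =
  # The header names a cached block whose parent is an execution block:
  # deliver it to the handler (which drives engine_newPayload), then
  # clear the cache (the real module only drops blocks at or below the
  # header slot).
  if headerRoot in p.blocks and headerParent in p.executionRoots:
    handler(p.blocks[headerRoot])
  p.blocks.clear()
```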

@ -14,7 +14,7 @@ import
chronicles,
eth/keys,
./gossip_processing/light_client_processor,
./networking/eth2_network,
./networking/[eth2_network, topic_params],
./spec/datatypes/altair,
./spec/helpers,
./sync/light_client_manager,
@ -25,8 +25,9 @@ export LightClientFinalizationMode, eth2_network, conf_light_client
logScope: topics = "lightcl"
type
LightClientCallback* =
proc(lightClient: LightClient) {.gcsafe, raises: [Defect].}
LightClientHeaderCallback* =
proc(lightClient: LightClient, header: BeaconBlockHeader) {.
gcsafe, raises: [Defect].}
LightClient* = ref object
network: Eth2Node
@ -37,7 +38,7 @@ type
processor: ref LightClientProcessor
manager: LightClientManager
gossipState: GossipState
onFinalizedHeader*, onOptimisticHeader*: LightClientCallback
onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback
trustedBlockRoot*: Option[Eth2Digest]
func finalizedHeader*(lightClient: LightClient): Opt[BeaconBlockHeader] =
@ -78,11 +79,13 @@ proc createLightClient(
proc onFinalizedHeader() =
if lightClient.onFinalizedHeader != nil:
lightClient.onFinalizedHeader(lightClient)
lightClient.onFinalizedHeader(
lightClient, lightClient.finalizedHeader.get)
proc onOptimisticHeader() =
if lightClient.onOptimisticHeader != nil:
lightClient.onOptimisticHeader(lightClient)
lightClient.onOptimisticHeader(
lightClient, lightClient.optimisticHeader.get)
lightClient.processor = LightClientProcessor.new(
dumpEnabled, dumpDirInvalid, dumpDirIncoming,
@ -178,11 +181,6 @@ proc resetToFinalizedHeader*(
import metrics
from
libp2p/protocols/pubsub/gossipsub
import
TopicParams, validateParameters, init
from
./gossip_processing/eth2_processor
import
@ -297,9 +295,6 @@ proc installMessageValidators*(
proc(msg: altair.LightClientOptimisticUpdate): ValidationResult =
validate(msg, processLightClientOptimisticUpdate))
const lightClientTopicParams = TopicParams.init()
static: lightClientTopicParams.validateParameters().tryGet()
proc updateGossipStatus*(
lightClient: LightClient, slot: Slot, dagIsBehind = default(Option[bool])) =
template cfg(): auto = lightClient.cfg
@ -362,9 +357,9 @@ proc updateGossipStatus*(
let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork)
lightClient.network.subscribe(
getLightClientFinalityUpdateTopic(forkDigest),
lightClientTopicParams)
basicParams)
lightClient.network.subscribe(
getLightClientOptimisticUpdateTopic(forkDigest),
lightClientTopicParams)
basicParams)
lightClient.gossipState = targetGossipState


@ -0,0 +1,67 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import chronos
from
libp2p/protocols/pubsub/gossipsub
import
TopicParams, validateParameters, init
# inspired by lighthouse research here
# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py
const
blocksTopicParams* = TopicParams(
topicWeight: 0.5,
timeInMeshWeight: 0.03333333333333333,
timeInMeshQuantum: chronos.seconds(12),
timeInMeshCap: 300,
firstMessageDeliveriesWeight: 1.1471603557060206,
firstMessageDeliveriesDecay: 0.9928302477768374,
firstMessageDeliveriesCap: 34.86870846001471,
meshMessageDeliveriesWeight: -458.31054878249114,
meshMessageDeliveriesDecay: 0.9716279515771061,
meshMessageDeliveriesThreshold: 0.6849191409056553,
meshMessageDeliveriesCap: 2.054757422716966,
meshMessageDeliveriesActivation: chronos.seconds(384),
meshMessageDeliveriesWindow: chronos.seconds(2),
meshFailurePenaltyWeight: -458.31054878249114 ,
meshFailurePenaltyDecay: 0.9716279515771061,
invalidMessageDeliveriesWeight: -214.99999999999994,
invalidMessageDeliveriesDecay: 0.9971259067705325
)
aggregateTopicParams* = TopicParams(
topicWeight: 0.5,
timeInMeshWeight: 0.03333333333333333,
timeInMeshQuantum: chronos.seconds(12),
timeInMeshCap: 300,
firstMessageDeliveriesWeight: 0.10764904539552399,
firstMessageDeliveriesDecay: 0.8659643233600653,
firstMessageDeliveriesCap: 371.5778421725158,
meshMessageDeliveriesWeight: -0.07538533073670682,
meshMessageDeliveriesDecay: 0.930572040929699,
meshMessageDeliveriesThreshold: 53.404248450179836,
meshMessageDeliveriesCap: 213.61699380071934,
meshMessageDeliveriesActivation: chronos.seconds(384),
meshMessageDeliveriesWindow: chronos.seconds(2),
meshFailurePenaltyWeight: -0.07538533073670682 ,
meshFailurePenaltyDecay: 0.930572040929699,
invalidMessageDeliveriesWeight: -214.99999999999994,
invalidMessageDeliveriesDecay: 0.9971259067705325
)
basicParams* = TopicParams.init()
static:
# compile time validation
blocksTopicParams.validateParameters().tryGet()
aggregateTopicParams.validateParameters().tryGet()
basicParams.validateParameters.tryGet()


@ -18,6 +18,7 @@ import
eth/p2p/discoveryv5/[enr, random2],
eth/keys,
./consensus_object_pools/vanity_logs/pandas,
./networking/topic_params,
./rpc/[rest_api, state_ttl_cache],
./spec/datatypes/[altair, bellatrix, phase0],
./spec/[engine_authentication, weak_subjectivity],
@ -751,6 +752,7 @@ proc init*(T: type BeaconNode,
eventBus: eventBus,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
gossipState: {},
blocksGossipState: {},
beaconClock: beaconClock,
validatorMonitor: validatorMonitor,
stateTtlCache: stateTtlCache,
@ -835,60 +837,54 @@ proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
unsubscribeSubnets = subnetLog(unsubscribeSubnets),
gossipState = node.gossipState
# inspired by lighthouse research here
# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py
const
blocksTopicParams = TopicParams(
topicWeight: 0.5,
timeInMeshWeight: 0.03333333333333333,
timeInMeshQuantum: chronos.seconds(12),
timeInMeshCap: 300,
firstMessageDeliveriesWeight: 1.1471603557060206,
firstMessageDeliveriesDecay: 0.9928302477768374,
firstMessageDeliveriesCap: 34.86870846001471,
meshMessageDeliveriesWeight: -458.31054878249114,
meshMessageDeliveriesDecay: 0.9716279515771061,
meshMessageDeliveriesThreshold: 0.6849191409056553,
meshMessageDeliveriesCap: 2.054757422716966,
meshMessageDeliveriesActivation: chronos.seconds(384),
meshMessageDeliveriesWindow: chronos.seconds(2),
meshFailurePenaltyWeight: -458.31054878249114 ,
meshFailurePenaltyDecay: 0.9716279515771061,
invalidMessageDeliveriesWeight: -214.99999999999994,
invalidMessageDeliveriesDecay: 0.9971259067705325
)
aggregateTopicParams = TopicParams(
topicWeight: 0.5,
timeInMeshWeight: 0.03333333333333333,
timeInMeshQuantum: chronos.seconds(12),
timeInMeshCap: 300,
firstMessageDeliveriesWeight: 0.10764904539552399,
firstMessageDeliveriesDecay: 0.8659643233600653,
firstMessageDeliveriesCap: 371.5778421725158,
meshMessageDeliveriesWeight: -0.07538533073670682,
meshMessageDeliveriesDecay: 0.930572040929699,
meshMessageDeliveriesThreshold: 53.404248450179836,
meshMessageDeliveriesCap: 213.61699380071934,
meshMessageDeliveriesActivation: chronos.seconds(384),
meshMessageDeliveriesWindow: chronos.seconds(2),
meshFailurePenaltyWeight: -0.07538533073670682 ,
meshFailurePenaltyDecay: 0.930572040929699,
invalidMessageDeliveriesWeight: -214.99999999999994,
invalidMessageDeliveriesDecay: 0.9971259067705325
)
basicParams = TopicParams.init()
proc updateBlocksGossipStatus*(
node: BeaconNode, slot: Slot, dagIsBehind: bool) =
template cfg(): auto = node.dag.cfg
static:
# compile time validation
blocksTopicParams.validateParameters().tryGet()
aggregateTopicParams.validateParameters().tryGet()
basicParams.validateParameters.tryGet()
let
isBehind =
if node.shouldSyncOptimistically(slot):
# If optimistic sync is active, always subscribe to blocks gossip
false
else:
# Use DAG status to determine whether to subscribe for blocks gossip
dagIsBehind
targetGossipState = getTargetGossipState(
slot.epoch, cfg.ALTAIR_FORK_EPOCH, cfg.BELLATRIX_FORK_EPOCH, isBehind)
template currentGossipState(): auto = node.blocksGossipState
if currentGossipState == targetGossipState:
return
if currentGossipState.card == 0 and targetGossipState.card > 0:
debug "Enabling blocks topic subscriptions",
wallSlot = slot, targetGossipState
elif currentGossipState.card > 0 and targetGossipState.card == 0:
debug "Disabling blocks topic subscriptions",
wallSlot = slot
else:
# Individual forks added / removed
discard
let
newGossipForks = targetGossipState - currentGossipState
oldGossipForks = currentGossipState - targetGossipState
for gossipFork in oldGossipForks:
let forkDigest = node.dag.forkDigests[].atStateFork(gossipFork)
node.network.unsubscribe(getBeaconBlocksTopic(forkDigest))
for gossipFork in newGossipForks:
let forkDigest = node.dag.forkDigests[].atStateFork(gossipFork)
node.network.subscribe(
getBeaconBlocksTopic(forkDigest), blocksTopicParams,
enableTopicMetrics = true)
node.blocksGossipState = targetGossipState
proc addPhase0MessageHandlers(
node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
node.network.subscribe(
getBeaconBlocksTopic(forkDigest), blocksTopicParams,
enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams)
@ -899,7 +895,6 @@ proc addPhase0MessageHandlers(
# updateAttestationSubnetHandlers subscribes attestation subnets
proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(getBeaconBlocksTopic(forkDigest))
node.network.unsubscribe(getVoluntaryExitsTopic(forkDigest))
node.network.unsubscribe(getProposerSlashingsTopic(forkDigest))
node.network.unsubscribe(getAttesterSlashingsTopic(forkDigest))
@ -1161,6 +1156,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
node.gossipState = targetGossipState
node.updateAttestationSubnetHandlers(slot)
node.updateBlocksGossipStatus(slot, isBehind)
node.updateLightClientGossipStatus(slot, isBehind)
proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
@ -1418,8 +1414,12 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addValidator(
getBeaconBlocksTopic(forkDigests.phase0),
proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
if node.shouldSyncOptimistically(node.currentSlot):
toValidationResult(
node.optimisticProcessor.processSignedBeaconBlock(signedBlock))
else:
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
template installPhase0Validators(digest: auto) =
for it in SubnetId:
@ -1472,14 +1472,22 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addValidator(
getBeaconBlocksTopic(forkDigests.altair),
proc (signedBlock: altair.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
if node.shouldSyncOptimistically(node.currentSlot):
toValidationResult(
node.optimisticProcessor.processSignedBeaconBlock(signedBlock))
else:
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
node.network.addValidator(
getBeaconBlocksTopic(forkDigests.bellatrix),
proc (signedBlock: bellatrix.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
if node.shouldSyncOptimistically(node.currentSlot):
toValidationResult(
node.optimisticProcessor.processSignedBeaconBlock(signedBlock))
else:
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
template installSyncCommitteeeValidators(digest: auto) =
for subcommitteeIdx in SyncSubcommitteeIndex:


@ -10,12 +10,15 @@ import
chronicles, chronicles/chronos_tools, chronos,
eth/keys,
./eth1/eth1_monitor,
./gossip_processing/optimistic_processor,
./networking/topic_params,
./spec/beaconstate,
./sync/optimistic_sync_light_client,
./spec/datatypes/[phase0, altair, bellatrix],
"."/[light_client, nimbus_binary_common, version]
from ./consensus_object_pools/consensus_manager import runForkchoiceUpdated
from ./gossip_processing/block_processor import newExecutionPayload
from ./gossip_processing/eth2_processor import toValidationResult
programMain:
var config = makeBannerAndConfig(
@ -68,7 +71,7 @@ programMain:
else:
nil
optimisticProcessor = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock):
optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock):
Future[void] {.async.} =
notice "New LC optimistic block",
opt = signedBlock.toBlockId(),
@ -90,9 +93,8 @@ programMain:
finalizedBlockRoot = ZERO_HASH)
else: discard
return
optSync = initLCOptimisticSync(
network, getBeaconTime, optimisticProcessor,
config.safeSlotsToImportOptimistically)
optimisticProcessor = initOptimisticProcessor(
getBeaconTime, optimisticHandler)
lightClient = createLightClient(
network, rng, config, cfg, forkDigests, getBeaconTime,
@ -100,49 +102,106 @@ programMain:
info "Listening to incoming network requests"
network.initBeaconSync(cfg, forkDigests, genesisBlockRoot, getBeaconTime)
network.addValidator(
getBeaconBlocksTopic(forkDigests.phase0),
proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult =
toValidationResult(
optimisticProcessor.processSignedBeaconBlock(signedBlock)))
network.addValidator(
getBeaconBlocksTopic(forkDigests.altair),
proc (signedBlock: altair.SignedBeaconBlock): ValidationResult =
toValidationResult(
optimisticProcessor.processSignedBeaconBlock(signedBlock)))
network.addValidator(
getBeaconBlocksTopic(forkDigests.bellatrix),
proc (signedBlock: bellatrix.SignedBeaconBlock): ValidationResult =
toValidationResult(
optimisticProcessor.processSignedBeaconBlock(signedBlock)))
lightClient.installMessageValidators()
waitFor network.startListening()
waitFor network.start()
proc shouldSyncOptimistically(slot: Slot): bool =
const
# Maximum age of light client optimistic header to use optimistic sync
maxAge = 2 * SLOTS_PER_EPOCH
if eth1Monitor == nil:
false
elif getBeaconTime().slotOrZero > slot + maxAge:
false
else:
true
proc onFinalizedHeader(lightClient: LightClient) =
proc onFinalizedHeader(
lightClient: LightClient, finalizedHeader: BeaconBlockHeader) =
info "New LC finalized header",
finalized_header = shortLog(lightClient.finalizedHeader.get)
let optimisticHeader = lightClient.optimisticHeader.valueOr:
return
if not shouldSyncOptimistically(optimisticHeader.slot):
return
let finalizedHeader = lightClient.finalizedHeader.valueOr:
return
optSync.setOptimisticHeader(optimisticHeader)
optSync.setFinalizedHeader(finalizedHeader)
finalized_header = shortLog(finalizedHeader)
optimisticProcessor.setFinalizedHeader(finalizedHeader)
proc onOptimisticHeader(lightClient: LightClient) =
proc onOptimisticHeader(
lightClient: LightClient, optimisticHeader: BeaconBlockHeader) =
info "New LC optimistic header",
optimistic_header = shortLog(lightClient.optimisticHeader.get)
let optimisticHeader = lightClient.optimisticHeader.valueOr:
return
if not shouldSyncOptimistically(optimisticHeader.slot):
return
optSync.setOptimisticHeader(optimisticHeader)
optimistic_header = shortLog(optimisticHeader)
optimisticProcessor.setOptimisticHeader(optimisticHeader)
lightClient.onFinalizedHeader = onFinalizedHeader
lightClient.onOptimisticHeader = onOptimisticHeader
lightClient.trustedBlockRoot = some config.trustedBlockRoot
var nextExchangeTransitionConfTime: Moment
# Full blocks gossip is required to portably drive an EL client:
# - EL clients may not sync when only driven with `forkChoiceUpdated`
# - `newPayload` requires the full `ExecutionPayload` (most of block content)
# - `ExecutionPayload` block root is not available in `BeaconBlockHeader`,
# so won't be exchanged via light client gossip
#
# Future `ethereum/consensus-specs` versions may remove need for full blocks.
# Therefore, this current mechanism is to be seen as temporary; it is not
# optimized for reducing code duplication, e.g., with `nimbus_beacon_node`.
func shouldSyncOptimistically(wallSlot: Slot): bool =
# Check whether an EL is connected
if eth1Monitor == nil:
return false
# Check whether light client is used
let optimisticHeader = lightClient.optimisticHeader.valueOr:
return false
# Check whether light client has synced sufficiently close to wall slot
const maxAge = 2 * SLOTS_PER_EPOCH
if optimisticHeader.slot < max(wallSlot, maxAge.Slot) - maxAge:
return false
true
var blocksGossipState: GossipState = {}
proc updateBlocksGossipStatus(slot: Slot) =
let
isBehind = not shouldSyncOptimistically(slot)
targetGossipState = getTargetGossipState(
slot.epoch, cfg.ALTAIR_FORK_EPOCH, cfg.BELLATRIX_FORK_EPOCH, isBehind)
template currentGossipState(): auto = blocksGossipState
if currentGossipState == targetGossipState:
return
if currentGossipState.card == 0 and targetGossipState.card > 0:
debug "Enabling blocks topic subscriptions",
wallSlot = slot, targetGossipState
elif currentGossipState.card > 0 and targetGossipState.card == 0:
debug "Disabling blocks topic subscriptions",
wallSlot = slot
else:
# Individual forks added / removed
discard
let
newGossipForks = targetGossipState - currentGossipState
oldGossipForks = currentGossipState - targetGossipState
for gossipFork in oldGossipForks:
let forkDigest = forkDigests[].atStateFork(gossipFork)
network.unsubscribe(getBeaconBlocksTopic(forkDigest))
for gossipFork in newGossipForks:
let forkDigest = forkDigests[].atStateFork(gossipFork)
network.subscribe(
getBeaconBlocksTopic(forkDigest), blocksTopicParams,
enableTopicMetrics = true)
blocksGossipState = targetGossipState
var nextExchangeTransitionConfTime: Moment
proc onSecond(time: Moment) =
# engine_exchangeTransitionConfigurationV1
if time > nextExchangeTransitionConfTime and eth1Monitor != nil:
@ -153,6 +212,7 @@ programMain:
if checkIfShouldStopAtEpoch(wallSlot, config.stopAtEpoch):
quit(0)
updateBlocksGossipStatus(wallSlot + 1)
lightClient.updateGossipStatus(wallSlot + 1)
proc runOnSecondLoop() {.async.} =
@ -168,7 +228,6 @@ programMain:
trace "onSecond task completed", sleepTime, processingTime
onSecond(Moment.now())
optSync.start()
lightClient.start()
asyncSpawn runOnSecondLoop()


@ -924,10 +924,20 @@ func getSizeofSig(x: auto, n: int = 0): seq[(string, int, int)] =
## see https://github.com/status-im/nimbus-eth2/pull/2250#discussion_r562010679
template isomorphicCast*[T, U](x: U): T =
# Each of these pairs of types has ABI-compatible memory representations.
static:
doAssert sizeof(T) == sizeof(U)
doAssert getSizeofSig(T()) == getSizeofSig(U())
cast[ptr T](unsafeAddr x)[]
static: doAssert (T is ref) == (U is ref)
when T is ref:
type
TT = typeof default(typeof T)[]
UU = typeof default(typeof U)[]
static:
doAssert sizeof(TT) == sizeof(UU)
doAssert getSizeofSig(TT()) == getSizeofSig(UU())
cast[T](x)
else:
static:
doAssert sizeof(T) == sizeof(U)
doAssert getSizeofSig(T()) == getSizeofSig(U())
cast[ptr T](unsafeAddr x)[]
func prune*(cache: var StateCache, epoch: Epoch) =
# Prune all cache information that is no longer relevant in order to process

View File

@ -422,19 +422,40 @@ template atEpoch*(
template asSigned*(
x: ForkedMsgTrustedSignedBeaconBlock |
ForkedTrustedSignedBeaconBlock): ForkedSignedBeaconBlock =
ForkedTrustedSignedBeaconBlock
): ForkedSignedBeaconBlock =
isomorphicCast[ForkedSignedBeaconBlock](x)
template asSigned*(
x: ref ForkedMsgTrustedSignedBeaconBlock |
ref ForkedTrustedSignedBeaconBlock
): ref ForkedSignedBeaconBlock =
isomorphicCast[ref ForkedSignedBeaconBlock](x)
template asMsgTrusted*(
x: ForkedSignedBeaconBlock |
ForkedTrustedSignedBeaconBlock): ForkedMsgTrustedSignedBeaconBlock =
ForkedTrustedSignedBeaconBlock
): ForkedMsgTrustedSignedBeaconBlock =
isomorphicCast[ForkedMsgTrustedSignedBeaconBlock](x)
template asMsgTrusted*(
x: ref ForkedSignedBeaconBlock |
ref ForkedTrustedSignedBeaconBlock
): ref ForkedMsgTrustedSignedBeaconBlock =
isomorphicCast[ref ForkedMsgTrustedSignedBeaconBlock](x)
template asTrusted*(
x: ForkedSignedBeaconBlock |
ForkedMsgTrustedSignedBeaconBlock): ForkedTrustedSignedBeaconBlock =
ForkedMsgTrustedSignedBeaconBlock
): ForkedTrustedSignedBeaconBlock =
isomorphicCast[ForkedTrustedSignedBeaconBlock](x)
template asTrusted*(
x: ref ForkedSignedBeaconBlock |
ref ForkedMsgTrustedSignedBeaconBlock
): ref ForkedTrustedSignedBeaconBlock =
isomorphicCast[ref ForkedTrustedSignedBeaconBlock](x)
template withBlck*(
x: ForkedBeaconBlock | Web3SignerForkedBeaconBlock |
ForkedSignedBeaconBlock | ForkedMsgTrustedSignedBeaconBlock |


@ -1,301 +0,0 @@
# beacon_chain
# Copyright (c) 2019-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
../consensus_object_pools/block_clearance_light_client,
../networking/eth2_network,
../beacon_clock,
./request_manager
logScope:
topics = "optsync"
type
MsgTrustedBlockProcessor* =
proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): Future[void] {.
gcsafe, raises: [Defect].}
SyncStrategy {.pure.} = enum
None,
RequestManager,
SyncManager
LCOptimisticSync* = ref object
network: Eth2Node
getBeaconTime: GetBeaconTimeFn
optimisticProcessor: MsgTrustedBlockProcessor
safeSlotsToImportOptimistically: uint16
lcBlocks: LCBlocks
blockVerifier: request_manager.BlockVerifier
requestManager: RequestManager
finalizedBid, optimisticBid: BlockId
lastReportedSlot: Slot
finalizedIsExecutionBlock: Option[bool]
syncStrategy: SyncStrategy
syncFut, processFut: Future[void]
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md
proc reportOptimisticCandidateBlock(optSync: LCOptimisticSync) {.gcsafe.} =
if optSync.processFut != nil:
return
# Check if finalized is execution block (implies that justified is, too)
if optSync.finalizedIsExecutionBlock.isNone:
let
finalizedSlot = optSync.lcBlocks.getFinalizedSlot()
finalizedBlock = optSync.lcBlocks.getBlockAtSlot(finalizedSlot)
if finalizedBlock.isOk:
optSync.finalizedIsExecutionBlock =
withBlck(finalizedBlock.get):
some blck.message.is_execution_block()
let
currentSlot = optSync.lcBlocks.getHeadSlot()
maxSlot =
if optSync.finalizedIsExecutionBlock.get(false):
# If finalized is execution block, can import any later block
currentSlot
else:
# Else, block must be deep (min `SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY`)
let minAge = optSync.safeSlotsToImportOptimistically
max(currentSlot, minAge.Slot) - minAge.uint64
if maxSlot > optSync.lastReportedSlot:
const minGapSize = SLOTS_PER_EPOCH
var signedBlock: Opt[ForkedMsgTrustedSignedBeaconBlock]
if maxSlot - optSync.lastReportedSlot >= minGapSize:
# Large gap, skip to latest
signedBlock = optSync.lcBlocks.getLatestBlockThroughSlot(maxSlot)
elif optSync.lcBlocks.getFrontfillSlot() <= optSync.lastReportedSlot + 1 and
optSync.lcBlocks.getBackfillSlot() > optSync.lastReportedSlot + 1:
# Small gap, but still downloading
discard
else:
# Report next sequential block (even if it is slightly outdated)
for slot in optSync.lastReportedSlot + 1 .. maxSlot:
signedBlock = optSync.lcBlocks.getBlockAtSlot(slot)
if signedBlock.isOk:
break
if signedBlock.isOk and signedBlock.get.slot > optSync.lastReportedSlot:
optSync.lastReportedSlot = signedBlock.get.slot
optSync.processFut = optSync.optimisticProcessor(signedBlock.get)
proc handleFinishedProcess(future: pointer) =
optSync.processFut = nil
optSync.reportOptimisticCandidateBlock()
optSync.processFut.addCallback(handleFinishedProcess)
proc initLCOptimisticSync*(
network: Eth2Node,
getBeaconTime: GetBeaconTimeFn,
optimisticProcessor: MsgTrustedBlockProcessor,
safeSlotsToImportOptimistically: uint16): LCOptimisticSync =
const numExtraSlots = 2 * SLOTS_PER_EPOCH.int + 1
let maxSlots = safeSlotsToImportOptimistically.int + numExtraSlots
let optSync = LCOptimisticSync(
network: network,
getBeaconTime: getBeaconTime,
optimisticProcessor: optimisticProcessor,
safeSlotsToImportOptimistically: safeSlotsToImportOptimistically,
lcBlocks: initLCBlocks(maxSlots))
proc blockVerifier(signedBlock: ForkedSignedBeaconBlock):
Future[Result[void, BlockError]] =
let res = optSync.lcBlocks.addBlock(signedBlock)
if res.isOk:
if optSync.syncStrategy == SyncStrategy.RequestManager:
let root = optSync.lcBlocks.getBackfillRoot()
if root.isSome:
optSync.requestManager.fetchAncestorBlocks(
@[FetchRecord(root: root.get)])
else:
if not optSync.syncFut.finished:
optSync.syncFut.cancel()
optSync.reportOptimisticCandidateBlock()
let resfut = newFuture[Result[void, BlockError]]("lcOptSyncBlockVerifier")
resfut.complete(res)
resfut
optSync.blockVerifier = blockVerifier
optSync.requestManager = RequestManager.init(network, optSync.blockVerifier)
optSync
proc start*(optSync: LCOptimisticSync) =
optSync.requestManager.start()
func supportsRetarget(syncStrategy: SyncStrategy): bool =
case syncStrategy
of SyncStrategy.None, SyncStrategy.RequestManager:
true
of SyncStrategy.SyncManager:
false
proc syncUsingRequestManager(optSync: LCOptimisticSync) {.async.} =
let startTick = Moment.now()
var cancellationRequested = false
while not cancellationRequested:
let root = optSync.lcBlocks.getBackfillRoot()
if root.isNone:
break
if optSync.requestManager.inpQueue.empty:
optSync.requestManager.fetchAncestorBlocks(@[FetchRecord(root: root.get)])
try:
await chronos.sleepAsync(chronos.seconds(10))
except CancelledError as exc:
cancellationRequested = true
debug "LC optimistic sync complete",
headSlot = optSync.lcBlocks.getHeadSlot(),
finalizedSlot = optSync.lcBlocks.getFinalizedSlot(),
backfillSlot = optSync.lcBlocks.getBackfillSlot(),
frontfillSlot = optSync.lcBlocks.getFrontfillSlot(),
syncStrategy = optSync.syncStrategy,
cancellationRequested,
syncDur = Moment.now() - startTick
proc syncUsingSyncManager(optSync: LCOptimisticSync) {.async.} =
let startTick = Moment.now()
func getLocalHeadSlot(): Slot =
optSync.lcBlocks.getHeadSlot() + 1
proc getLocalWallSlot(): Slot =
optSync.getBeaconTime().slotOrZero
var cancellationRequested = false
func getProgressSlot(): Slot =
if not cancellationRequested:
optSync.lcBlocks.getBackfillSlot()
else:
# Report out-of-band completion of sync
optSync.lcBlocks.getFrontfillSlot()
func getFinalizedSlot(): Slot =
getProgressSlot()
func getBackfillSlot(): Slot =
getProgressSlot()
func getFrontfillSlot(): Slot =
optSync.lcBlocks.getFrontfillSlot()
let lcOptSyncManager = newSyncManager[Peer, PeerID](
optSync.network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFinalizedSlot, getBackfillSlot, getFrontfillSlot,
progressPivot = optSync.lcBlocks.getHeadSlot(), optSync.blockVerifier,
maxHeadAge = 0, flags = {SyncManagerFlag.NoMonitor}, ident = "lcOptSync")
lcOptSyncManager.start()
while lcOptSyncManager.inProgress:
try:
await chronos.sleepAsync(chronos.seconds(10))
except CancelledError as exc:
cancellationRequested = true
debug "LC optimistic sync complete",
headSlot = optSync.lcBlocks.getHeadSlot(),
finalizedSlot = optSync.lcBlocks.getFinalizedSlot(),
backfillSlot = optSync.lcBlocks.getBackfillSlot(),
frontfillSlot = optSync.lcBlocks.getFrontfillSlot(),
syncStrategy = optSync.syncStrategy,
cancellationRequested,
syncDur = Moment.now() - startTick
proc continueSync(optSync: LCOptimisticSync) {.gcsafe.} =
let
currentHeadSlot = optSync.lcBlocks.getHeadSlot()
targetHeadSlot = optSync.optimisticBid.slot
headDiff =
if targetHeadSlot > currentHeadSlot:
targetHeadSlot - currentHeadSlot
else:
currentHeadSlot - targetHeadSlot
currentFinalizedSlot = optSync.lcBlocks.getFinalizedSlot()
targetFinalizedSlot = optSync.finalizedBid.slot
backfillSlot = optSync.lcBlocks.getBackfillSlot()
frontfillSlot = optSync.lcBlocks.getFrontfillSlot()
syncDistance =
if backfillSlot > frontfillSlot:
backfillSlot - frontfillSlot
else:
0
# If sync is complete, work is done
if currentHeadSlot == targetHeadSlot and
currentFinalizedSlot == targetFinalizedSlot and
syncDistance == 0:
return
# Cancel ongoing sync if sync target jumped
if headDiff >= SLOTS_PER_EPOCH and optSync.syncFut != nil:
if not optSync.syncFut.finished:
optSync.syncFut.cancel()
return
# When retargeting ongoing sync is not possible, cancel on finality change
if not optSync.syncStrategy.supportsRetarget:
if currentFinalizedSlot != targetFinalizedSlot and optSync.syncFut != nil:
if not optSync.syncFut.finished:
optSync.syncFut.cancel()
return
# Set new sync target
let
finalizedBid = optSync.finalizedBid
optimisticBid = optSync.optimisticBid
doAssert optimisticBid.slot >= finalizedBid.slot
if optSync.lcBlocks.getHeadSlot() != optimisticBid.slot:
optSync.lcBlocks.setHeadBid(optimisticBid)
if optSync.lcBlocks.getFinalizedSlot() != finalizedBid.slot:
optSync.lcBlocks.setFinalizedBid(finalizedBid)
optSync.finalizedIsExecutionBlock.reset()
optSync.reportOptimisticCandidateBlock()
if optSync.syncFut == nil:
# Select sync strategy
optSync.syncFut =
if headDiff >= SLOTS_PER_EPOCH:
optSync.syncStrategy = SyncStrategy.SyncManager
optSync.syncUsingSyncManager()
else:
optSync.syncStrategy = SyncStrategy.RequestManager
optSync.syncUsingRequestManager()
# Continue syncing until complete
proc handleFinishedSync(future: pointer) =
optSync.syncStrategy.reset()
optSync.syncFut = nil
optSync.continueSync()
optSync.syncFut.addCallback(handleFinishedSync)
proc setOptimisticHeader*(
optSync: LCOptimisticSync, optimisticHeader: BeaconBlockHeader) =
optSync.optimisticBid = optimisticHeader.toBlockId
optSync.continueSync()
proc setFinalizedHeader*(
optSync: LCOptimisticSync, finalizedHeader: BeaconBlockHeader) =
optSync.finalizedBid = finalizedHeader.toBlockId
if optSync.finalizedBid.slot > optSync.optimisticBid.slot:
optSync.optimisticBid = optSync.finalizedBid
optSync.continueSync()


@ -145,20 +145,32 @@ After a while, the light client will pick up beacon block headers from the Ether
### Nimbus
```
NOT 2022-07-24 21:57:57.537+02:00 Starting light client topics="lightcl" trusted_block_root=Some(f013a6f35bdfcffbf9cf8919c48bc0afb7720fb9c61f62a3659d7359f52386c4)
NOT 2022-08-20 14:56:58.063+02:00 Starting light client topics="lightcl" trusted_block_root=Some(e734eae428acd2e5ab3fb9a6db04926e5cc597a6f3d3b94835b051859539adfa)
...
INF 2022-07-24 22:07:59.892+02:00 New LC optimistic header optimistic_header="(slot: 396960, proposer_index: 90824, parent_root: \"77d30de6\", state_root: \"9c7343a0\")"
INF 2022-07-24 22:07:59.892+02:00 New LC finalized header finalized_header="(slot: 396960, proposer_index: 90824, parent_root: \"77d30de6\", state_root: \"9c7343a0\")"
INF 2022-07-24 22:08:03.962+02:00 New LC optimistic header optimistic_header="(slot: 397539, proposer_index: 97474, parent_root: \"063c998d\", state_root: \"0f790eaf\")"
WRN 2022-07-24 22:08:09.217+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=2 new_peers=@[] current_peers=11 wanted_peers=160
INF 2022-07-24 22:08:15.961+02:00 New LC optimistic header optimistic_header="(slot: 397540, proposer_index: 56720, parent_root: \"812d4790\", state_root: \"b846e95e\")"
INF 2022-07-24 22:08:27.961+02:00 New LC optimistic header optimistic_header="(slot: 397541, proposer_index: 65758, parent_root: \"725e435d\", state_root: \"559fd631\")"
INF 2022-07-24 22:08:39.960+02:00 New LC optimistic header optimistic_header="(slot: 397542, proposer_index: 90389, parent_root: \"903645d6\", state_root: \"873c9904\")"
WRN 2022-07-24 22:08:49.503+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=1 new_peers=@[] current_peers=11 wanted_peers=160
INF 2022-07-24 22:08:51.960+02:00 New LC optimistic header optimistic_header="(slot: 397543, proposer_index: 73061, parent_root: \"1abdfcd1\", state_root: \"c8ee813c\")"
WRN 2022-07-24 22:08:55.097+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=11 wanted_peers=160
INF 2022-07-24 22:09:03.961+02:00 New LC optimistic header optimistic_header="(slot: 397544, proposer_index: 62086, parent_root: \"4797507d\", state_root: \"60815f6a\")"
NOT 2022-07-24 22:09:05.069+02:00 New LC optimistic block opt=c6cf8526:397409 wallSlot=397545
INF 2022-08-20 15:04:07.674+02:00 New LC optimistic header optimistic_header="(slot: 1600, proposer_index: 158, parent_root: \"5692b969\", state_root: \"06befac2\")"
INF 2022-08-20 15:04:07.674+02:00 New LC finalized header finalized_header="(slot: 1600, proposer_index: 158, parent_root: \"5692b969\", state_root: \"06befac2\")"
INF 2022-08-20 15:04:08.041+02:00 New LC optimistic header optimistic_header="(slot: 3119, proposer_index: 1408, parent_root: \"f42c6c38\", state_root: \"b7cd7a87\")"
INF 2022-08-20 15:04:08.041+02:00 New LC finalized header finalized_header="(slot: 3040, proposer_index: 263, parent_root: \"5df53d22\", state_root: \"bed3164c\")"
...
INF 2022-08-20 15:04:08.207+02:00 New LC optimistic header optimistic_header="(slot: 432829, proposer_index: 1003, parent_root: \"2f847459\", state_root: \"5d9bbf00\")"
INF 2022-08-20 15:04:08.207+02:00 New LC finalized header finalized_header="(slot: 432736, proposer_index: 579, parent_root: \"23dd3358\", state_root: \"7273da0b\")"
WRN 2022-08-20 15:04:08.356+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=15 wanted_peers=160
INF 2022-08-20 15:04:15.984+02:00 New LC optimistic header optimistic_header="(slot: 438920, proposer_index: 1776, parent_root: \"81e3f439\", state_root: \"94298e8c\")"
WRN 2022-08-20 15:04:35.212+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=16 wanted_peers=160
INF 2022-08-20 15:04:39.979+02:00 New LC optimistic header optimistic_header="(slot: 438921, proposer_index: 163, parent_root: \"9fc27396\", state_root: \"3ff1d624\")"
INF 2022-08-20 15:04:51.982+02:00 New LC optimistic header optimistic_header="(slot: 438923, proposer_index: 706, parent_root: \"8112e2f5\", state_root: \"a0628d4a\")"
WRN 2022-08-20 15:04:54.156+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=16 wanted_peers=160
WRN 2022-08-20 15:05:03.161+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=1 new_peers=@[] current_peers=16 wanted_peers=160
INF 2022-08-20 15:05:03.987+02:00 New LC optimistic header optimistic_header="(slot: 438924, proposer_index: 1522, parent_root: \"3ff23c0c\", state_root: \"2de6d378\")"
NOT 2022-08-20 15:05:03.987+02:00 New LC optimistic block opt=69449681:438924 wallSlot=438925
WRN 2022-08-20 15:05:08.668+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=16 wanted_peers=160
WRN 2022-08-20 15:05:24.971+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=17 wanted_peers=160
WRN 2022-08-20 15:05:30.264+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=17 wanted_peers=160
INF 2022-08-20 15:05:39.982+02:00 New LC optimistic header optimistic_header="(slot: 438925, proposer_index: 1275, parent_root: \"69449681\", state_root: \"b1a6c3d6\")"
NOT 2022-08-20 15:05:39.983+02:00 New LC optimistic block opt=935c35e8:438925 wallSlot=438928
WRN 2022-08-20 15:05:42.601+02:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=18 wanted_peers=160
INF 2022-08-20 15:05:51.982+02:00 New LC optimistic header optimistic_header="(slot: 438928, proposer_index: 1356, parent_root: \"935c35e8\", state_root: \"331dda33\")"
NOT 2022-08-20 15:05:51.982+02:00 New LC optimistic block opt=5dbb26df:438928 wallSlot=438929
```
!!! note


@ -15,7 +15,6 @@ import # Unit test
./test_attestation_pool,
./test_beacon_chain_db,
./test_beacon_time,
./test_block_clearance_light_client,
./test_block_dag,
./test_block_processor,
./test_block_quarantine,


@ -1,602 +0,0 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
{.used.}
import
# Status libraries
eth/keys, taskpools,
# Beacon chain internals
../beacon_chain/consensus_object_pools/
[block_clearance_light_client, block_clearance,
block_quarantine, blockchain_dag],
../beacon_chain/spec/state_transition,
# Test utilities
./testutil, ./testdbutil
suite "Block clearance (light client)" & preset():
let
cfg = block:
var res = defaultRuntimeConfig
res.ALTAIR_FORK_EPOCH = GENESIS_EPOCH + 1
res
taskpool = Taskpool.new()
proc newTestDag(): ChainDAGRef =
const num_validators = SLOTS_PER_EPOCH
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = ChainDAGRef.init(
cfg, makeTestDB(num_validators), validatorMonitor, {})
dag
proc addBlocks(
dag: ChainDAGRef,
numBlocks: int,
finalizedCheckpoints: var seq[Checkpoint],
syncCommitteeRatio = 0.0,
numSkippedSlots = 0.uint64) =
let quarantine = newClone(Quarantine.init())
var
cache: StateCache
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
if numSkippedSlots > 0:
var info: ForkedEpochInfo
let slot = getStateField(dag.headState, slot) + numSkippedSlots
process_slots(
cfg, dag.headState, slot, cache, info, flags = {}).expect("no failure")
for blck in makeTestBlocks(dag.headState, cache, numBlocks,
attested = true, syncCommitteeRatio, cfg):
let added =
case blck.kind
of BeaconBlockFork.Phase0:
const nilCallback = OnPhase0BlockAdded(nil)
dag.addHeadBlock(verifier, blck.phase0Data, nilCallback)
of BeaconBlockFork.Altair:
const nilCallback = OnAltairBlockAdded(nil)
dag.addHeadBlock(verifier, blck.altairData, nilCallback)
of BeaconBlockFork.Bellatrix:
const nilCallback = OnBellatrixBlockAdded(nil)
dag.addHeadBlock(verifier, blck.bellatrixData, nilCallback)
check: added.isOk()
dag.updateHead(added[], quarantine[])
withState(dag.headState):
if finalizedCheckpoints.len == 0 or
state.data.finalized_checkpoint != finalizedCheckpoints[^1]:
finalizedCheckpoints.add(state.data.finalized_checkpoint)
proc checkBlocks(lcBlocks: LCBlocks, dag: ChainDAGRef, slots: Slice[Slot]) =
for slot in slots.a .. slots.b:
let
latestLcBlck = lcBlocks.getLatestBlockThroughSlot(slot)
lcBlck = lcBlocks.getBlockAtSlot(slot)
bsi = dag.getBlockIdAtSlot(slot)
dagBlck =
if bsi.isOk:
dag.getForkedBlock(bsi.get.bid)
else:
Opt[ForkedTrustedSignedBeaconBlock].err()
check:
lcBlck.isOk == dagBlck.isOk
lcBlck.isOk == latestLcBlck.isOk
if lcBlck.isOk:
check:
lcBlck.get.root == dagBlck.get.root
lcBlck.get.root == latestLcBlck.get.root
setup:
let dag = newTestDag()
var finalizedCheckpoints: seq[Checkpoint] = @[]
dag.addBlocks(200, finalizedCheckpoints)
test "Initial sync":
const maxSlots = 160
var lcBlocks = initLCBlocks(maxSlots)
let minSlot = dag.head.slot + 1 - maxSlots
check:
lcBlocks.getHeadSlot() == FAR_FUTURE_SLOT
lcBlocks.getFinalizedSlot() == GENESIS_SLOT
lcBlocks.getFrontfillSlot() == GENESIS_SLOT
lcBlocks.getBackfillSlot() == GENESIS_SLOT
lcBlocks.setHeadBid(dag.head.bid)
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == GENESIS_SLOT
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == dag.head.slot + 1
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == dag.head.slot + 1
var bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check:
res.isOk
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == max(bdata.slot, minSlot)
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == minSlot
lcBlocks.checkBlocks(dag, minSlot .. dag.head.slot)
test "Delayed finality update":
const maxSlots = 160
var lcBlocks = initLCBlocks(maxSlots)
let minSlot = dag.head.slot + 1 - maxSlots
lcBlocks.setHeadBid(dag.head.bid)
var bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
for finalizedCheckpoint in finalizedCheckpoints:
let bsi = dag.getBlockIdAtSlot(finalizedCheckpoint.epoch.start_slot)
check bsi.isOk
lcBlocks.setFinalizedBid(bsi.get.bid)
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == minSlot
lcBlocks.checkBlocks(dag, minSlot .. dag.head.slot)
test "Incremental sync":
const maxSlots = 160
var lcBlocks = initLCBlocks(maxSlots)
let
oldHeadSlot = dag.head.slot
oldMinSlot = dag.head.slot + 1 - maxSlots
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
var bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
dag.addBlocks(20, finalizedCheckpoints)
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
let newMinSlot = dag.head.slot + 1 - maxSlots
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == newMinSlot
lcBlocks.getBackfillSlot() == dag.head.slot + 1
bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == newMinSlot
lcBlocks.getBackfillSlot() == newMinSlot
lcBlocks.checkBlocks(dag, newMinSlot .. dag.head.slot)
dag.addBlocks(200, finalizedCheckpoints)
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
let minSlot = dag.head.slot + 1 - maxSlots
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == dag.head.slot + 1
bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == minSlot
lcBlocks.checkBlocks(dag, minSlot .. dag.head.slot)
test "Reverse incremental sync":
const maxSlots = 160
var lcBlocks = initLCBlocks(maxSlots)
let
newHeadBid = dag.head.bid
newFinalizedBid = dag.finalizedHead.blck.bid
dag.addBlocks(20, finalizedCheckpoints)
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
let oldMinSlot = dag.head.slot + 1 - maxSlots
var bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == oldMinSlot
lcBlocks.getBackfillSlot() == oldMinSlot
lcBlocks.checkBlocks(dag, oldMinSlot .. dag.head.slot)
lcBlocks.setHeadBid(newHeadBid)
lcBlocks.setFinalizedBid(newFinalizedBid)
let newMinSlot = newHeadBid.slot + 1 - maxSlots
check:
lcBlocks.getHeadSlot() == newHeadBid.slot
lcBlocks.getFinalizedSlot() == newFinalizedBid.slot
lcBlocks.getFrontfillSlot() == newMinSlot
lcBlocks.getBackfillSlot() == oldMinSlot
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == newHeadBid.slot
lcBlocks.getFinalizedSlot() == newFinalizedBid.slot
lcBlocks.getFrontfillSlot() == newMinSlot
lcBlocks.getBackfillSlot() == newMinSlot
lcBlocks.checkBlocks(dag, newMinSlot .. newHeadBid.slot)
test "Reorg":
const maxSlots = 160
var lcBlocks = initLCBlocks(maxSlots)
let minSlot = dag.head.slot + 1 - maxSlots
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
var bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == minSlot
lcBlocks.checkBlocks(dag, minSlot .. dag.head.slot)
let dag2 = newTestDag()
var finalizedCheckpoints2: seq[Checkpoint] = @[]
dag2.addBlocks(200, finalizedCheckpoints2, syncCommitteeRatio = 0.1)
lcBlocks.setHeadBid(dag2.head.bid)
lcBlocks.setFinalizedBid(dag2.finalizedHead.blck.bid)
check:
lcBlocks.getHeadSlot() == dag2.head.slot
lcBlocks.getFinalizedSlot() == dag2.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == dag2.head.slot + 1
bid = dag2.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag2.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag2.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag2.head.slot
lcBlocks.getFinalizedSlot() == dag2.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == minSlot
lcBlocks.checkBlocks(dag2, minSlot .. dag2.head.slot)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
check:
lcBlocks.getHeadSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() ==
max(dag.finalizedHead.slot, maxSlots.Slot) + 1 - maxSlots
lcBlocks.getBackfillSlot() == dag.finalizedHead.blck.slot + 1
bid = dag.finalizedHead.blck.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
lcBlocks.setHeadBid(dag.head.bid)
bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == minSlot
lcBlocks.getBackfillSlot() == minSlot
lcBlocks.checkBlocks(dag, minSlot .. dag.head.slot)
test "Low slot numbers":
const maxSlots = 320 # DAG slot numbers are smaller than `maxSlots`
var lcBlocks = initLCBlocks(maxSlots)
let
oldHeadBid = dag.head.bid
oldFinalizedBid = dag.finalizedHead.blck.bid
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
var bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == GENESIS_SLOT
lcBlocks.getBackfillSlot() == GENESIS_SLOT
lcBlocks.checkBlocks(dag, GENESIS_SLOT .. dag.head.slot)
dag.addBlocks(20, finalizedCheckpoints)
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
bid = dag.head.bid
while lcBlocks.getBackfillSlot() > lcBlocks.getFrontfillSlot():
let
bdata = dag.getForkedBlock(bid).valueOr:
break
res = lcBlocks.addBlock(bdata.asSigned())
check res.isOk
bid = dag.parent(bid).valueOr:
break
check:
lcBlocks.getHeadSlot() == dag.head.slot
lcBlocks.getFinalizedSlot() == dag.finalizedHead.blck.slot
lcBlocks.getFrontfillSlot() == GENESIS_SLOT
lcBlocks.getBackfillSlot() == GENESIS_SLOT
lcBlocks.setHeadBid(oldHeadBid)
lcBlocks.setFinalizedBid(oldFinalizedBid)
check:
lcBlocks.getHeadSlot() == oldHeadBid.slot
lcBlocks.getFinalizedSlot() == oldFinalizedBid.slot
lcBlocks.getFrontfillSlot() == GENESIS_SLOT
lcBlocks.getBackfillSlot() == GENESIS_SLOT
test "Error conditions":
let dag2 = newTestDag()
var finalizedCheckpoints2: seq[Checkpoint] = @[]
dag2.addBlocks(200, finalizedCheckpoints2, syncCommitteeRatio = 0.1)
const maxSlots = 2
var lcBlocks = initLCBlocks(maxSlots)
check:
lcBlocks.getBlockAtSlot(GENESIS_SLOT).isErr
lcBlocks.getBlockAtSlot(FAR_FUTURE_SLOT).isErr
lcBlocks.getLatestBlockThroughSlot(GENESIS_SLOT).isErr
lcBlocks.getLatestBlockThroughSlot(FAR_FUTURE_SLOT).isErr
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
check:
lcBlocks.getBlockAtSlot(GENESIS_SLOT).isErr
lcBlocks.getBlockAtSlot(FAR_FUTURE_SLOT).isErr
lcBlocks.getBlockAtSlot(dag.head.slot).isErr
lcBlocks.getBlockAtSlot(dag.finalizedHead.blck.slot).isErr
lcBlocks.getLatestBlockThroughSlot(GENESIS_SLOT).isErr
lcBlocks.getLatestBlockThroughSlot(FAR_FUTURE_SLOT).isErr
lcBlocks.getLatestBlockThroughSlot(dag.head.slot).isErr
lcBlocks.getLatestBlockThroughSlot(dag.finalizedHead.blck.slot).isErr
let
parentBid = dag.parent(dag.head.bid).expect("Parent exists")
parentBdata = dag.getForkedBlock(parentBid).expect("Parent block exists")
var res = lcBlocks.addBlock(parentBdata.asSigned())
check:
res.isErr
res.error == BlockError.MissingParent
lcBlocks.getBackfillSlot() == dag.head.slot + 1
let bdata2 = dag2.getForkedBlock(dag2.head.bid).expect("DAG 2 block exists")
res = lcBlocks.addBlock(bdata2.asSigned())
check:
res.isErr
res.error == BlockError.UnviableFork
lcBlocks.getBackfillSlot() == dag.head.slot + 1
let bdata = dag.getForkedBlock(dag.head.bid).expect("DAG block exists")
res = lcBlocks.addBlock(bdata.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag.head.slot
res = lcBlocks.addBlock(bdata2.asSigned())
check:
res.isErr
res.error == BlockError.UnviableFork
lcBlocks.getBackfillSlot() == dag.head.slot
res = lcBlocks.addBlock(bdata.asSigned())
check:
res.isErr
res.error == BlockError.Duplicate
lcBlocks.getBackfillSlot() == dag.head.slot
let
onePastBid = dag.parent(parentBid).expect("Parent of parent exists")
onePastBdata = dag.getForkedBlock(onePastBid).expect("Block exists")
res = lcBlocks.addBlock(onePastBdata.asSigned())
check:
res.isErr
res.error == BlockError.MissingParent
lcBlocks.getBackfillSlot() == dag.head.slot
res = lcBlocks.addBlock(parentBdata.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == parentBdata.slot
lcBlocks.getBlockAtSlot(parentBdata.slot).isOk
lcBlocks.getLatestBlockThroughSlot(parentBdata.slot).isOk
res = lcBlocks.addBlock(onePastBdata.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag.head.slot + 1 - maxSlots
lcBlocks.getBlockAtSlot(onePastBdata.slot).isErr
lcBlocks.getLatestBlockThroughSlot(onePastBdata.slot).isErr
res = lcBlocks.addBlock(onePastBdata.asSigned())
check:
res.isErr
res.error == BlockError.Duplicate
lcBlocks.getBackfillSlot() == dag.head.slot + 1 - maxSlots
let oldHeadBid = dag.head.bid
dag.addBlocks(1, finalizedCheckpoints, numSkippedSlots = 3) # ---X (3 empty slots, then 1 block)
dag2.addBlocks(2, finalizedCheckpoints2, numSkippedSlots = 2) # --XX (2 empty slots, then 2 blocks)
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
let newBdata = dag.getForkedBlock(dag.head.bid).expect("New block ok")
res = lcBlocks.addBlock(newBdata.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag.head.slot
res = lcBlocks.addBlock(bdata.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag.head.slot + 1 - maxSlots
lcBlocks.getBlockAtSlot(dag.head.slot).isOk
lcBlocks.getBlockAtSlot(dag.head.slot - 1).isErr
lcBlocks.getBlockAtSlot(dag.head.slot - 2).isErr
let
newParentBid2 = dag2.parent(dag2.head.bid).expect("New parent 2 exists")
newParentBdata2 = dag2.getForkedBlock(newParentBid2).expect("Parent 2 ok")
res = lcBlocks.addBlock(newParentBdata2.asSigned())
check:
res.isErr
res.error == BlockError.UnviableFork
lcBlocks.getBackfillSlot() == dag.head.slot + 1 - maxSlots
lcBlocks.setHeadBid(dag2.head.bid)
lcBlocks.setFinalizedBid(newParentBid2)
let newBdata2 = dag2.getForkedBlock(dag2.head.bid).expect("New block 2 ok")
res = lcBlocks.addBlock(newBdata2.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag2.head.slot
res = lcBlocks.addBlock(newParentBdata2.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag2.head.slot + 1 - maxSlots
lcBlocks.setHeadBid(dag.head.bid)
res = lcBlocks.addBlock(newBdata.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag.head.slot
res = lcBlocks.addBlock(bdata.asSigned())
check:
res.isErr
res.error == BlockError.UnviableFork
lcBlocks.getHeadSlot() == newParentBid2.slot
lcBlocks.getFinalizedSlot() == newParentBid2.slot
lcBlocks.getFrontfillSlot() == newParentBid2.slot + 1 - maxSlots
lcBlocks.getBackfillSlot() == newParentBid2.slot + 1
res = lcBlocks.addBlock(newParentBdata2.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == newParentBid2.slot
res = lcBlocks.addBlock(bdata2.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == newParentBid2.slot + 1 - maxSlots
lcBlocks.setHeadBid(dag2.head.bid)
lcBlocks.setFinalizedBid(oldHeadBid)
res = lcBlocks.addBlock(newBdata2.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag2.head.slot
res = lcBlocks.addBlock(newParentBdata2.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == newParentBid2.slot
res = lcBlocks.addBlock(bdata.asSigned())
check:
res.isErr
res.error == BlockError.MissingParent
lcBlocks.getBackfillSlot() == newParentBid2.slot
lcBlocks = initLCBlocks(maxSlots = 0)
lcBlocks.setHeadBid(dag.head.bid)
lcBlocks.setFinalizedBid(dag.finalizedHead.blck.bid)
res = lcBlocks.addBlock(newBdata2.asSigned())
check:
res.isErr
res.error == BlockError.UnviableFork
lcBlocks.getBackfillSlot() == dag.head.slot + 1
res = lcBlocks.addBlock(newBdata.asSigned())
check:
res.isOk
lcBlocks.getBackfillSlot() == dag.head.slot + 1
res = lcBlocks.addBlock(newBdata2.asSigned())
check:
res.isErr
res.error == BlockError.UnviableFork
lcBlocks.getBackfillSlot() == dag.head.slot + 1
res = lcBlocks.addBlock(newBdata.asSigned())
check:
res.isErr
res.error == BlockError.Duplicate
lcBlocks.getBackfillSlot() == dag.head.slot + 1
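
A closing note on the window arithmetic these removed tests assert throughout: the front-fill slot trails the head by `maxSlots`, and falls back to `GENESIS_SLOT` when the chain is shorter than the window (the "Low slot numbers" case). Below is a simplified standalone sketch of just that arithmetic — `Slot`/`GENESIS_SLOT` are redeclared locally, and this is not the removed module's exact clamping rule.

```nim
# Simplified sketch of the sliding window asserted in the removed tests:
# at most `maxSlots` slots ending at the head, front-filled from genesis
# when the chain is shorter than the window. Types redeclared for a
# self-contained example.
type Slot = uint64
const GENESIS_SLOT = Slot(0)

func frontfillSlot(headSlot: Slot, maxSlots: uint64): Slot =
  if headSlot + 1 > maxSlots:
    headSlot + 1 - maxSlots     # e.g. head 200, window 160 -> slot 41
  else:
    GENESIS_SLOT                # short chain: window reaches back to genesis

when isMainModule:
  doAssert frontfillSlot(Slot(200), 160) == Slot(41)      # "Initial sync" numbers
  doAssert frontfillSlot(Slot(220), 320) == GENESIS_SLOT  # "Low slot numbers" case
```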