Mirror of https://github.com/status-im/nimbus-eth1.git (synced 2025-01-12 13:24:21 +00:00)
Beacon sync block import via forked chain (#2747)
* Accept finalised hash from RPC with the canon header as well

* Reorg internal sync descriptor(s)

  details:
    Update target from RPC to provide the `consensus header` as well as the
    `finalised` block number

  why:
    Prepare for using `importBlock()` instead of `persistBlocks()`

* Cosmetic updates

  details:
    + Collect all pretty printers in `helpers.nim`
    + Remove unused return codes from function prototype

* Use `importBlock()` + `forkChoice()` rather than `persistBlocks()`

* Update logging and metrics

* Update docu
This commit is contained in:
parent
7d41a992e6
commit
0b93236d1b
@@ -110,7 +110,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,

# Update sync header (if any)
com.syncReqNewHead(header)
com.reqBeaconSyncTargetCB(header)
com.reqBeaconSyncTargetCB(header, update.finalizedBlockHash)

return simpleFCU(PayloadExecutionStatus.syncing)
@@ -42,7 +42,7 @@ type
SyncReqNewHeadCB* = proc(header: Header) {.gcsafe, raises: [].}
## Update head for syncing

ReqBeaconSyncTargetCB* = proc(header: Header) {.gcsafe, raises: [].}
ReqBeaconSyncTargetCB* = proc(header: Header; finHash: Hash32) {.gcsafe, raises: [].}
## Ditto (for beacon sync)

NotifyBadBlockCB* = proc(invalid, origin: Header) {.gcsafe, raises: [].}
@@ -337,10 +337,10 @@ proc syncReqNewHead*(com: CommonRef; header: Header)
if not com.syncReqNewHead.isNil:
com.syncReqNewHead(header)

proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header) =
proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header; finHash: Hash32) =
## Used by RPC updater
if not com.reqBeaconSyncTargetCB.isNil:
com.reqBeaconSyncTargetCB(header)
com.reqBeaconSyncTargetCB(header, finHash)

proc notifyBadBlock*(com: CommonRef; invalid, origin: Header)
{.gcsafe, raises: [].} =
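For orientation, here is a minimal, self-contained Nim sketch of the widened callback contract. Everything in it (`DemoCommon`, the trimmed `Header`/`Hash32` types, the demo registration) is invented for illustration and only mirrors the shape of the `ReqBeaconSyncTargetCB` change above; it is not the nimbus API.

```nim
# Sketch only: the sync-target callback now carries the finalised block hash
# next to the consensus head header.
type
  Hash32 = array[32, byte]
  Header = object
    number: uint64

  ReqBeaconSyncTargetCB = proc(header: Header; finHash: Hash32) {.gcsafe, raises: [].}

  DemoCommon = ref object
    reqBeaconSyncTargetCB: ReqBeaconSyncTargetCB   # registered by the sync worker

proc reqBeaconSyncTarget(com: DemoCommon; header: Header; finHash: Hash32) =
  ## Forward an RPC `forkchoiceUpdated` target to the sync worker, if any.
  if not com.reqBeaconSyncTargetCB.isNil:
    com.reqBeaconSyncTargetCB(header, finHash)

when isMainModule:
  let com = DemoCommon(
    reqBeaconSyncTargetCB: proc(header: Header; finHash: Hash32) {.gcsafe, raises: [].} =
      echo "new sync target #", header.number)
  com.reqBeaconSyncTarget(Header(number: 123), default(Hash32))
```

The point of the extra `finHash` argument is that the RPC side now hands over the finalised block hash together with the consensus head, so the sync worker can later resolve it to a block number (see `headerStagedUpdateTarget` further down).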
@@ -63,7 +63,7 @@ proc init*(
desc.initSync(ethNode, maxPeers)
desc.ctx.pool.nBodiesBatch = chunkSize
# Initalise for `persistBlocks()`
desc.ctx.pool.chain = chain.com.newChain()
desc.ctx.pool.chain = chain
desc

proc start*(ctx: BeaconSyncRef) =
@@ -47,26 +47,26 @@ A sequence *@[h(1),h(2),..]* of block headers is called a *linked chain* if

General header linked chains layout diagram

0 C D E (1)
0 C D H (1)
o----------------o---------------------o----------------o--->
| <-- linked --> | <-- unprocessed --> | <-- linked --> |

Here, the single upper letter symbols *0*, *C*, *D*, *E* denote block numbers.
Here, the single upper letter symbols *0*, *C*, *D*, *H* denote block numbers.
For convenience, these letters are also identified with its associated block
header or the full blocks. Saying *"the header 0"* is short for *"the header
with block number 0"*.

Meaning of *0*, *C*, *D*, *E*:
Meaning of *0*, *C*, *D*, *H*:

* *0* -- Genesis, block number *0*
* *C* -- coupler, maximal block number of linked chain starting at *0*
* *D* -- dangling, minimal block number of linked chain ending at *E*
* *D* -- dangling, minimal block number of linked chain ending at *H*
with *C <= D*
* *E* -- end, block number of some finalised block (not necessarily the latest
one)
* *H* -- head, end block number of **consensus head** (not necessarily the
latest one as this is moving while processing)

This definition implies *0 <= C <= D <= E* and the state of the header linked
chains can uniquely be described by the triple of block numbers *(C,D,E)*.
This definition implies *0 <= C <= D <= H* and the state of the header linked
chains can uniquely be described by the triple of block numbers *(C,D,H)*.
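As a reading aid, the `(C,D,H)` bookkeeping can be pictured with a few lines of Nim. This is an illustrative model only; the field names follow the `SyncStateLayout` descriptor introduced later in this change, the rest (`DemoLayout`, `unprocGap`) is made up for the example.

```nim
# Toy model of the header chains layout triple (C,D,H).
type
  BlockNumber = uint64
  DemoLayout = object
    coupler: BlockNumber    # C: right end of the linked chain [0,C]
    dangling: BlockNumber   # D: left end of the linked chain [D,H]
    head: BlockNumber       # H: consensus head

func wellFormed(q: DemoLayout): bool =
  ## Invariant from the text: 0 <= C <= D <= H.
  q.coupler <= q.dangling and q.dangling <= q.head

func unprocGap(q: DemoLayout): tuple[lo, hi: BlockNumber] =
  ## The open interval (C,D) as a closed range [C+1,D-1];
  ## empty (lo > hi) when C == D (for nonzero D).
  (lo: q.coupler + 1, hi: q.dangling - 1)

when isMainModule:
  let q = DemoLayout(coupler: 600_000, dangling: 900_000, head: 1_000_000)
  doAssert q.wellFormed
  doAssert q.unprocGap == (lo: 600_001'u64, hi: 899_999'u64)
```

When `C == D` the unprocessed gap `(C,D)` is empty, which is exactly the precondition for moving `H` to a new target `T` in the update rule described below.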
### Storage of header chains:

@@ -78,7 +78,7 @@ correspond to finalised blocks, e.g. the sub-interval *[0,**base**]* where
half open interval *(**base**,C]* are always stored on the *beaconHeader*
column of the *KVT* database.

The block numbers from the interval *[D,E]* also reside on the *beaconHeader*
The block numbers from the interval *[D,H]* also reside on the *beaconHeader*
column of the *KVT* database table.

@@ -89,7 +89,7 @@ Minimal layout on a pristine system

0 (2)
C
D
E
H
o--->

When first initialised, the header linked chains are set to *(0,0,0)*.

@@ -101,32 +101,32 @@ A header chain with an non empty open interval *(C,D)* can be updated only by
increasing *C* or decreasing *D* by adding/prepending headers so that the
linked chain condition is not violated.

Only when the gap open interval *(C,D)* vanishes, the right end *E* can be
Only when the gap open interval *(C,D)* vanishes, the right end *H* can be
increased to a larger target block number *T*, say. This block number will
typically be the **consensus head**. Then

* *C==D* because the open interval *(C,D)* is empty
* *C==E* because *C* is maximal (see definition of `C` above)
* *C==H* because *C* is maximal (see definition of `C` above)

and the header chains *(E,E,E)* (depicted in *(3)* below) can be set to
and the header chains *(H,H,H)* (depicted in *(3)* below) can be set to
*(C,T,T)* as depicted in *(4)* below.

Layout before updating of *E*
Layout before updating of *H*

C (3)
D
0 E T
0 H T
o----------------o---------------------o---->
| <-- linked --> |

New layout with moving *D* and *E* to *T*
New layout with moving *D* and *H* to *T*

D' (4)
0 C E'
0 C H'
o----------------o---------------------o---->
| <-- linked --> | <-- unprocessed --> |

with *D'=T* and *E'=T*.
with *D'=T* and *H'=T*.

Note that diagram *(3)* is a generalisation of *(2)*.
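A minimal sketch of this re-targeting step, under the stated precondition that the gap has vanished (`C == D == H`). The tuple model is illustrative only, not the production descriptor.

```nim
# Toy version of the update rule: when the gap (C,D) has vanished, the
# layout (H,H,H) may be re-targeted to (C,T,T) for a new target T > H.
type Layout = tuple[c, d, h: uint64]

func retarget(q: Layout; t: uint64): Layout =
  ## Precondition from the text: C == D == H and H < T.
  doAssert q.c == q.d and q.d == q.h and q.h < t
  (c: q.c, d: t, h: t)   # (C,D') = (C,T) becomes the new unprocessed gap

when isMainModule:
  var q: Layout = (c: 900_000'u64, d: 900_000'u64, h: 900_000'u64)
  q = q.retarget(1_200_000)            # consensus head target T announced via RPC
  doAssert q == (c: 900_000'u64, d: 1_200_000'u64, h: 1_200_000'u64)
```

After the step, the open interval `(C,D')` is the new unprocessed header range, to be filled by prepending headers downwards from `T`.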
@@ -134,7 +134,7 @@ Note that diagram *(3)* is a generalisation of *(2)*.

### Complete a header linked chain:

The header chain is *relatively complete* if it satisfies clause *(3)* above
for *0 < C*. It is *fully complete* if *E==T*. It should be obvious that the
for *0 < C*. It is *fully complete* if *H==T*. It should be obvious that the
latter condition is temporary only on a live system (as *T* is continuously
updated.)

@@ -146,20 +146,22 @@ database state will be updated incrementally.

Block chain import/execution
-----------------------------

The following diagram with a parially imported/executed block chain amends the
The following diagram with a partially imported/executed block chain amends the
layout *(1)*:

0 B C D E (5)
o------------------o-------o---------------------o----------------o-->
0 B L C D H (5)
o------------o-----o-------o---------------------o----------------o-->
| <-- imported --> | | | |
| <------- linked ------> | <-- unprocessed --> | <-- linked --> |

where

where *B* is the **base**, i.e. the **base state** block number of the last
imported/executed block. It also refers to the global state block number of
the ledger database.
* *B* is the **base state** stored on the persistent state database. *B* is
not addressed directly except upon start up or resuming sync when *B == L*.
* *L* is the last imported/executed block, typically up to the **canonical
consensus head**.

The headers corresponding to the half open interval `(B,C]` will be completed
The headers corresponding to the half open interval `(L,C]` will be completed
by fetching block bodies and then import/execute them together with the already
cached headers.
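To make the `(L,C]` bookkeeping concrete, a small helper in this spirit could compute the next body-fetch/import window. This is a hedged illustration with an invented name and a made-up batch size, not the scheduler actually used by the sync worker.

```nim
# Illustrative only: next import window inside (L,C], capped by a batch size.
func nextImportWindow(latest, coupler: uint64;
                      batch = 64'u64): tuple[lo, hi: uint64] =
  ## Returns lo > hi when there is nothing left to import.
  if latest >= coupler:
    return (lo: 1'u64, hi: 0'u64)
  let lo = latest + 1
  (lo: lo, hi: min(lo + batch - 1, coupler))

when isMainModule:
  doAssert nextImportWindow(100, 500) == (lo: 101'u64, hi: 164'u64)
  doAssert nextImportWindow(500, 500) == (lo: 1'u64, hi: 0'u64)
```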
@@ -260,9 +262,11 @@ be available if *nimbus* is compiled with the additional make flags

|:-------------------|:------------:|:--------------------|
| | | |
| beacon_base | block height | **B**, *increasing* |
| beacon_latest | block height | **L**, *increasing* |
| beacon_coupler | block height | **C**, *increasing* |
| beacon_dangling | block height | **D** |
| beacon_end | block height | **E**, *increasing* |
| beacon_final | block height | **F**, *increasing* |
| beacon_head | block height | **H**, *increasing* |
| beacon_target | block height | **T**, *increasing* |
| | | |
| beacon_header_lists_staged | size | # of staged header list records |
@@ -8,7 +8,6 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

{.push raises:[].}

import
@@ -112,8 +111,8 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
debug info

# Check for a possible header layout and body request changes
discard ctx.updateLinkedHChainsLayout info
discard ctx.updateBlockRequests info
ctx.updateSyncStateLayout info
ctx.updateBlockRequests info

# Execute staged block records.
if ctx.blocksStagedCanImportOk():

@@ -127,13 +126,10 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
defer: ctx.pool.importRunningOk = false

# Import from staged queue.
while ctx.blocksStagedImport info:
while await ctx.blocksStagedImport(info):
ctx.updateMetrics()

# Allow pseudo/async thread switch
await sleepAsync asyncThreadSwitchTimeSlot

# At the end of the cycle, leave time to refill headers/blocks
# At the end of the cycle, leave time to trigger refill headers/blocks
await sleepAsync daemonWaitInterval

ctx.updateMetrics()
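The daemon loop now awaits `blocksStagedImport()` per staged record and sleeps in between so the peer workers keep running. A minimal chronos sketch of that pattern is shown below; the integer queue and the `importOne` step are stand-ins for the staged block records and `importBlock()`.

```nim
import pkg/chronos

proc importOne(item: int): Future[bool] {.async.} =
  echo "importing record ", item
  return true

proc runDemoDaemon(staged: seq[int]) {.async.} =
  for item in staged:
    if not await importOne(item):
      break
    # Pseudo/async thread switch so other async actors are not starved.
    await sleepAsync(10.milliseconds)

when isMainModule:
  waitFor runDemoDaemon(@[1, 2, 3])
```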
@@ -174,12 +170,16 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
trace info, peer, nInvocations=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr

# Update consensus header target when needed. It comes with a finalised
# header hash where we need to complete the block number.
await buddy.headerStagedUpdateTarget info

if not await buddy.napUnlessSomethingToFetch info:
#
# Layout of a triple of linked header chains (see `README.md`)
# ::
# 0 C D E
# | <--- [0,C] --> | <----- (C,D) -----> | <-- [D,E] ---> |
# 0 C D H
# | <--- [0,C] --> | <----- (C,D) -----> | <-- [D,H] ---> |
# o----------------o---------------------o----------------o--->
# | <-- linked --> | <-- unprocessed --> | <-- linked --> |
#

@@ -194,7 +194,7 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
#
# The block numbers range concurrently taken from `(C,D)` are chosen
# from the upper range. So exactly one of the actors has a range
# `[whatever,D-1]` adjacent to `[D,E]`. Call this actor the lead actor.
# `[whatever,D-1]` adjacent to `[D,H]`. Call this actor the lead actor.
#
# For the lead actor, headers can be downloaded all by the hashes as
# the parent hash for the header with block number `D` is known. All

@@ -217,7 +217,7 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
if await buddy.headersStagedCollect info:

# * Save updated state and headers
# * Decrease the dangling left boundary `D` of the trusted range `[D,E]`
# * Decrease the dangling left boundary `D` of the trusted range `[D,H]`
discard buddy.ctx.headersStagedProcess info

# Fetch bodies and combine them with headers to blocks to be staged. These
@@ -26,6 +26,13 @@ logScope:
# Private functions
# ------------------------------------------------------------------------------

func getNthHash(blk: BlocksForImport; n: int): Hash32 =
if n + 1 < blk.blocks.len:
blk.blocks[n + 1].header.parentHash
else:
rlp.encode(blk.blocks[n].header).keccak256

proc fetchAndCheck(
buddy: BeaconBuddyRef;
ivReq: BnRange;
@@ -218,38 +225,73 @@ proc blocksStagedCollect*(
return true


proc blocksStagedImport*(ctx: BeaconCtxRef; info: static[string]): bool =
proc blocksStagedImport*(
ctx: BeaconCtxRef;
info: static[string];
): Future[bool]
{.async.} =
## Import/execute blocks record from staged queue
##
let qItem = ctx.blk.staged.ge(0).valueOr:
return false

# Fetch least record, accept only if it matches the global ledger state
let base = ctx.dbStateBlockNumber()
if qItem.key != base + 1:
trace info & ": there is a gap", B=base.bnStr, stagedBottom=qItem.key.bnStr
return false
block:
let imported = ctx.chain.latestNumber()
if qItem.key != imported + 1:
trace info & ": there is a gap L vs. staged",
B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr
return false

# Remove from queue
discard ctx.blk.staged.delete qItem.key

# Execute blocks
let stats = ctx.pool.chain.persistBlocks(qItem.data.blocks).valueOr:
# FIXME: should that be rather an `raiseAssert` here?
warn info & ": block exec error", B=base.bnStr,
iv=BnRange.new(qItem.key,qItem.key+qItem.data.blocks.len.uint64-1),
error=error
doAssert base == ctx.dbStateBlockNumber()
return false
let
nBlocks = qItem.data.blocks.len
iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1)

trace info & ": imported staged blocks", B=ctx.dbStateBlockNumber.bnStr,
first=qItem.key.bnStr, stats
var maxImport = iv.maxPt
for n in 0 ..< nBlocks:
let nBn = qItem.data.blocks[n].header.number
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
warn info & ": import block error", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nBn=nBn.bnStr, txLevel=ctx.chain.db.level, `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break

# Remove stashed headers
for bn in qItem.key ..< qItem.key + qItem.data.blocks.len.uint64:
# Occasionally mark the chain finalized
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
nHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nHash else: ctx.layout.finalHash

doAssert nBn == ctx.chain.latestNumber()
ctx.pool.chain.forkChoice(headHash=nHash, finalizedHash=finHash).isOkOr:
warn info & ": fork choice error", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level, nHash,
finHash=(if finHash == nHash: "nHash" else: "F"), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break

# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot

# Import probably incomplete, so a partial roll back may be needed
if maxImport < iv.maxPt:
ctx.blocksUnprocCommit(0, maxImport+1, qItem.data.blocks[^1].header.number)

# Remove stashed headers for imported blocks
for bn in iv.minPt .. maxImport:
ctx.dbUnstashHeader bn

true
trace info & ": import done", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level
return true


func blocksStagedBottomKey*(ctx: BeaconCtxRef): BlockNumber =
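The loop above replaces one `persistBlocks()` call per staged record with per-block `importBlock()` calls, plus a `forkChoice()` every `finaliserChainLengthMax` blocks and once at the end. The block hash passed to `forkChoice()` comes from `getNthHash()`, i.e. from the parent hash of the following block, so only the last block's header needs an explicit keccak over its RLP encoding. The stand-alone sketch below mirrors that control flow with invented stand-in types and procs; it is not the nimbus API.

```nim
# Schematic only: import block by block, run fork choice at a fixed cadence.
const finaliserChainLengthMax = 32

type
  Hash32 = array[32, byte]
  DemoBlock = object
    number: uint64
    hash: Hash32     # in the real code: parent hash of block n+1, see getNthHash()

proc importBlock(b: DemoBlock): bool = true                    # stand-in
proc forkChoice(headHash, finalizedHash: Hash32): bool = true  # stand-in

proc importStaged(blocks: seq[DemoBlock]; finalizedHash: Hash32): uint64 =
  ## Returns the highest block number actually imported.
  for n, b in blocks:
    if not importBlock(b):
      break                      # partial import; caller rolls back the rest
    result = b.number
    if (n + 1) mod finaliserChainLengthMax == 0 or n + 1 == blocks.len:
      if not forkChoice(headHash = b.hash, finalizedHash = finalizedHash):
        break

when isMainModule:
  let blocks = @[DemoBlock(number: 1), DemoBlock(number: 2), DemoBlock(number: 3)]
  doAssert importStaged(blocks, default(Hash32)) == 3
```

On failure the real code records `ctx.chain.latestNumber()` as `maxImport`, gives the unimported remainder back to the `unproc` set, and only unstashes headers up to `maxImport`.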
@@ -112,6 +112,18 @@ proc blocksUnprocInit*(ctx: BeaconCtxRef) =
## Constructor
ctx.blk.unprocessed = BnRangeSet.init()

proc blocksUnprocSet*(ctx: BeaconCtxRef) =
## Clear
ctx.blk.unprocessed.clear()
ctx.blk.borrowed = 0u

proc blocksUnprocSet*(ctx: BeaconCtxRef; minPt, maxPt: BlockNumber) =
## Set up new unprocessed range
ctx.blocksUnprocSet()
# Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)`
if minPt <= maxPt:
discard ctx.blk.unprocessed.merge(minPt, maxPt)

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
@ -15,10 +15,9 @@ import
|
||||
pkg/eth/[common, rlp],
|
||||
pkg/stew/[byteutils, interval_set, sorted_set],
|
||||
pkg/results,
|
||||
../../../db/storage_types,
|
||||
../../../common,
|
||||
"../../.."/[common, core/chain, db/storage_types],
|
||||
../worker_desc,
|
||||
./headers_unproc
|
||||
"."/[blocks_unproc, headers_unproc]
|
||||
|
||||
logScope:
|
||||
topics = "beacon db"
|
||||
@ -43,37 +42,24 @@ formatIt(Hash32):
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc fetchLinkedHChainsLayout(ctx: BeaconCtxRef): Opt[LinkedHChainsLayout] =
|
||||
proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] =
|
||||
let data = ctx.db.ctx.getKvt().get(LhcStateKey.toOpenArray).valueOr:
|
||||
return err()
|
||||
try:
|
||||
result = ok(rlp.decode(data, LinkedHChainsLayout))
|
||||
return ok(rlp.decode(data, SyncStateLayout))
|
||||
except RlpError:
|
||||
return err()
|
||||
|
||||
|
||||
proc fetchSavedState(ctx: BeaconCtxRef): Opt[SavedDbStateSpecs] =
|
||||
let db = ctx.db
|
||||
var val: SavedDbStateSpecs
|
||||
val.number = db.getSavedStateBlockNumber()
|
||||
|
||||
if db.getBlockHash(val.number, val.hash):
|
||||
var header: Header
|
||||
if db.getBlockHeader(val.hash, header):
|
||||
val.parent = header.parentHash
|
||||
return ok(val)
|
||||
|
||||
discard
|
||||
err()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc dbStoreLinkedHChainsLayout*(ctx: BeaconCtxRef): bool =
|
||||
proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef) =
|
||||
## Save chain layout to persistent db
|
||||
const info = "dbStoreLinkedHChainsLayout"
|
||||
if ctx.layout == ctx.lhc.lastLayout:
|
||||
return false
|
||||
const info = "dbStoreSyncStateLayout"
|
||||
if ctx.layout == ctx.sst.lastLayout:
|
||||
return
|
||||
|
||||
let data = rlp.encode(ctx.layout)
|
||||
ctx.db.ctx.getKvt().put(LhcStateKey.toOpenArray, data).isOkOr:
|
||||
@ -86,42 +72,54 @@ proc dbStoreLinkedHChainsLayout*(ctx: BeaconCtxRef): bool =
|
||||
let number = ctx.db.getSavedStateBlockNumber()
|
||||
ctx.db.persistent(number).isOkOr:
|
||||
debug info & ": failed to save persistently", error=($$error)
|
||||
return false
|
||||
return
|
||||
else:
|
||||
trace info & ": not saved, tx pending", txLevel
|
||||
return false
|
||||
return
|
||||
|
||||
trace info & ": saved pesistently on DB"
|
||||
true
|
||||
|
||||
|
||||
proc dbLoadLinkedHChainsLayout*(ctx: BeaconCtxRef) =
|
||||
proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef) =
|
||||
## Restore chain layout from persistent db
|
||||
const info = "dbLoadLinkedHChainsLayout"
|
||||
|
||||
let rc = ctx.fetchLinkedHChainsLayout()
|
||||
let
|
||||
rc = ctx.fetchSyncStateLayout()
|
||||
latest = ctx.chain.latestNumber()
|
||||
|
||||
if rc.isOk:
|
||||
ctx.lhc.layout = rc.value
|
||||
let (uMin,uMax) = (rc.value.coupler+1, rc.value.dangling-1)
|
||||
if uMin <= uMax:
|
||||
# Add interval of unprocessed block range `(C,D)` from `README.md`
|
||||
ctx.headersUnprocSet(uMin, uMax)
|
||||
trace info & ": restored layout", C=rc.value.coupler.bnStr,
|
||||
D=rc.value.dangling.bnStr, E=rc.value.endBn.bnStr
|
||||
ctx.sst.layout = rc.value
|
||||
|
||||
# Add interval of unprocessed block range `(L,C]` from `README.md`
|
||||
ctx.blocksUnprocSet(latest+1, ctx.layout.coupler)
|
||||
ctx.blk.topRequest = ctx.layout.coupler
|
||||
|
||||
# Add interval of unprocessed header range `(C,D)` from `README.md`
|
||||
ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1)
|
||||
|
||||
trace info & ": restored layout", L=latest.bnStr,
|
||||
C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr,
|
||||
F=ctx.layout.final.bnStr, H=ctx.layout.head.bnStr
|
||||
|
||||
else:
|
||||
let val = ctx.fetchSavedState().expect "saved states"
|
||||
ctx.lhc.layout = LinkedHChainsLayout(
|
||||
coupler: val.number,
|
||||
couplerHash: val.hash,
|
||||
dangling: val.number,
|
||||
danglingParent: val.parent,
|
||||
endBn: val.number,
|
||||
endHash: val.hash)
|
||||
trace info & ": new layout", B=val.number, C=rc.value.coupler.bnStr,
|
||||
D=rc.value.dangling.bnStr, E=rc.value.endBn.bnStr
|
||||
let
|
||||
latestHash = ctx.chain.latestHash()
|
||||
latestParent = ctx.chain.latestHeader.parentHash
|
||||
|
||||
ctx.lhc.lastLayout = ctx.layout
|
||||
ctx.sst.layout = SyncStateLayout(
|
||||
coupler: latest,
|
||||
couplerHash: latestHash,
|
||||
dangling: latest,
|
||||
danglingParent: latestParent,
|
||||
final: latest,
|
||||
finalHash: latestHash,
|
||||
head: latest,
|
||||
headHash: latestHash)
|
||||
|
||||
trace info & ": new layout", L="C", C="D", D="F", F="H", H=latest.bnStr
|
||||
|
||||
ctx.sst.lastLayout = ctx.layout
|
||||
|
||||
# ------------------
|
||||
|
||||
@ -170,13 +168,6 @@ proc dbUnstashHeader*(ctx: BeaconCtxRef; bn: BlockNumber) =
|
||||
## Remove header from temporary DB list
|
||||
discard ctx.db.ctx.getKvt().del(beaconHeaderKey(bn).toOpenArray)
|
||||
|
||||
# ------------------
|
||||
|
||||
proc dbStateBlockNumber*(ctx: BeaconCtxRef): BlockNumber =
|
||||
## Currently only a wrapper around the function returning the current
|
||||
## database state block number
|
||||
ctx.db.getSavedStateBlockNumber()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@@ -54,6 +54,33 @@ proc fetchAndCheck(
# Public functions
# ------------------------------------------------------------------------------

proc headerStagedUpdateTarget*(
buddy: BeaconBuddyRef;
info: static[string];
) {.async.} =
## Fetch finalised beacon header if there is an update available
let ctx = buddy.ctx
if not ctx.layout.headLocked and
ctx.target.final == 0 and
ctx.target.finalHash != zeroHash32 and
not ctx.target.locked:
const iv = BnRange.new(1u,1u) # dummy interval

ctx.target.locked = true
let rc = await buddy.headersFetchReversed(iv, ctx.target.finalHash, info)
ctx.target.locked = false

if rc.isOK:
let hash = rlp.encode(rc.value[0]).keccak256
if hash != ctx.target.finalHash:
# Oops
buddy.ctrl.zombie = true
trace info & ": finalised header hash mismatch", peer=buddy.peer, hash,
expected=ctx.target.finalHash
else:
ctx.target.final = rc.value[0].number
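The essential safety step above is that the peer's answer is re-hashed and compared against the requested finalised hash before its block number is trusted. A schematic, self-contained version of that check follows; `HeaderLike` and `demoHash` are invented stand-ins for the real header type and for `rlp.encode(header).keccak256`.

```nim
type
  Hash32 = array[32, byte]
  HeaderLike = object
    number: uint64
    payload: seq[byte]        # stands in for the RLP encoding of the header

proc demoHash(h: HeaderLike): Hash32 =
  ## Placeholder for `rlp.encode(header).keccak256` in the real code.
  for i, b in h.payload:
    result[i mod 32] = result[i mod 32] xor b

proc acceptFinalised(fetched: HeaderLike; expected: Hash32): bool =
  ## Trust the peer's block number only if the header hashes to the
  ## requested (finalised) hash.
  demoHash(fetched) == expected

when isMainModule:
  let hdr = HeaderLike(number: 1_000_000, payload: @[1'u8, 2, 3])
  doAssert acceptFinalised(hdr, demoHash(hdr))
  doAssert not acceptFinalised(hdr, default(Hash32))
```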
proc headersStagedCollect*(
|
||||
buddy: BeaconBuddyRef;
|
||||
info: static[string];
|
||||
@ -86,7 +113,7 @@ proc headersStagedCollect*(
|
||||
iv = ctx.headersUnprocFetch(nFetchHeadersBatch).expect "valid interval"
|
||||
|
||||
# Check for top header hash. If the range to fetch directly joins below
|
||||
# the top level linked chain `[D,E]`, then there is the hash available for
|
||||
# the top level linked chain `[D,H]`, then there is the hash available for
|
||||
# the top level header to fetch. Otherwise -- with multi-peer mode -- the
|
||||
# range of headers is fetched opportunistically using block numbers only.
|
||||
isOpportunistic = uTop + 1 < ctx.layout.dangling
|
||||
@ -158,13 +185,13 @@ proc headersStagedCollect*(
|
||||
break
|
||||
|
||||
# Store `lhc` chain on the `staged` queue
|
||||
let qItem = ctx.lhc.staged.insert(iv.maxPt).valueOr:
|
||||
let qItem = ctx.hdr.staged.insert(iv.maxPt).valueOr:
|
||||
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
|
||||
qItem.data = lhc[]
|
||||
|
||||
trace info & ": staged headers", peer,
|
||||
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
|
||||
nStaged=ctx.lhc.staged.len, isOpportunistic, ctrl=buddy.ctrl.state
|
||||
nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state
|
||||
|
||||
return true
|
||||
|
||||
@ -175,7 +202,7 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
|
||||
## of records processed and saved.
|
||||
while true:
|
||||
# Fetch largest block
|
||||
let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr:
|
||||
let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr:
|
||||
trace info & ": no staged headers", error=error
|
||||
break # all done
|
||||
|
||||
@ -192,7 +219,7 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
|
||||
|
||||
# Process item from `staged` queue. So it is not needed in the list,
|
||||
# anymore.
|
||||
discard ctx.lhc.staged.delete(iv.maxPt)
|
||||
discard ctx.hdr.staged.delete(iv.maxPt)
|
||||
|
||||
if qItem.data.hash != ctx.layout.danglingParent:
|
||||
# Discard wrong chain and merge back the range into the `unproc` list.
|
||||
@ -205,14 +232,14 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
|
||||
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs)
|
||||
ctx.layout.dangling = iv.minPt
|
||||
ctx.layout.danglingParent = qItem.data.parentHash
|
||||
discard ctx.dbStoreLinkedHChainsLayout()
|
||||
ctx.dbStoreSyncStateLayout()
|
||||
|
||||
result.inc # count records
|
||||
|
||||
trace info & ": staged records saved",
|
||||
nStaged=ctx.lhc.staged.len, nSaved=result
|
||||
nStaged=ctx.hdr.staged.len, nSaved=result
|
||||
|
||||
if headersStagedQueueLengthLwm < ctx.lhc.staged.len:
|
||||
if headersStagedQueueLengthLwm < ctx.hdr.staged.len:
|
||||
ctx.poolMode = true
|
||||
|
||||
|
||||
@ -227,14 +254,14 @@ func headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## (downloading deterministically by hashes) and many fast opportunistic
|
||||
## actors filling the staged queue.
|
||||
##
|
||||
if ctx.lhc.staged.len == 0:
|
||||
if ctx.hdr.staged.len == 0:
|
||||
# nothing to do
|
||||
return
|
||||
|
||||
# Update counter
|
||||
ctx.pool.nReorg.inc
|
||||
|
||||
let nStaged = ctx.lhc.staged.len
|
||||
let nStaged = ctx.hdr.staged.len
|
||||
if headersStagedQueueLengthHwm < nStaged:
|
||||
trace info & ": hwm reached, flushing staged queue",
|
||||
nStaged, max=headersStagedQueueLengthLwm
|
||||
@ -244,32 +271,32 @@ func headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
# remain.
|
||||
for _ in 0 .. nStaged - headersStagedQueueLengthLwm:
|
||||
let
|
||||
qItem = ctx.lhc.staged.ge(BlockNumber 0).expect "valid record"
|
||||
qItem = ctx.hdr.staged.ge(BlockNumber 0).expect "valid record"
|
||||
key = qItem.key
|
||||
nHeaders = qItem.data.revHdrs.len.uint64
|
||||
ctx.headersUnprocCommit(0, key - nHeaders + 1, key)
|
||||
discard ctx.lhc.staged.delete key
|
||||
discard ctx.hdr.staged.delete key
|
||||
|
||||
|
||||
func headersStagedTopKey*(ctx: BeaconCtxRef): BlockNumber =
|
||||
## Retrieve to staged block number
|
||||
let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr:
|
||||
let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr:
|
||||
return BlockNumber(0)
|
||||
qItem.key
|
||||
|
||||
func headersStagedQueueLen*(ctx: BeaconCtxRef): int =
|
||||
## Number of staged records
|
||||
ctx.lhc.staged.len
|
||||
ctx.hdr.staged.len
|
||||
|
||||
func headersStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
|
||||
## `true` iff no data are on the queue.
|
||||
ctx.lhc.staged.len == 0
|
||||
ctx.hdr.staged.len == 0
|
||||
|
||||
# ----------------
|
||||
|
||||
func headersStagedInit*(ctx: BeaconCtxRef) =
|
||||
## Constructor
|
||||
ctx.lhc.staged = LinkedHChainQueue.init()
|
||||
ctx.hdr.staged = LinkedHChainQueue.init()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -28,7 +28,7 @@ proc headersUnprocFetch*(
|
||||
## `0` is interpreted as `2^64`.
|
||||
##
|
||||
let
|
||||
q = ctx.lhc.unprocessed
|
||||
q = ctx.hdr.unprocessed
|
||||
|
||||
# Fetch top/right interval with largest block numbers
|
||||
jv = q.le().valueOr:
|
||||
@ -49,18 +49,18 @@ proc headersUnprocFetch*(
|
||||
BnRange.new(jv.maxPt - maxLen + 1, jv.maxPt)
|
||||
|
||||
discard q.reduce(iv)
|
||||
ctx.lhc.borrowed += iv.len
|
||||
ctx.hdr.borrowed += iv.len
|
||||
ok(iv)
|
||||
|
||||
|
||||
proc headersUnprocCommit*(ctx: BeaconCtxRef; borrowed: uint) =
|
||||
## Commit back all processed range
|
||||
ctx.lhc.borrowed -= borrowed
|
||||
ctx.hdr.borrowed -= borrowed
|
||||
|
||||
proc headersUnprocCommit*(ctx: BeaconCtxRef; borrowed: uint; retuor: BnRange) =
|
||||
## Merge back unprocessed range `retour`
|
||||
ctx.headersUnprocCommit borrowed
|
||||
doAssert ctx.lhc.unprocessed.merge(retuor) == retuor.len
|
||||
doAssert ctx.hdr.unprocessed.merge(retuor) == retuor.len
|
||||
|
||||
proc headersUnprocCommit*(
|
||||
ctx: BeaconCtxRef;
|
||||
@ -69,7 +69,7 @@ proc headersUnprocCommit*(
|
||||
rMaxPt: BlockNumber) =
|
||||
## Variant of `headersUnprocCommit()`
|
||||
ctx.headersUnprocCommit borrowed
|
||||
doAssert ctx.lhc.unprocessed.merge(rMinPt, rMaxPt) == rMaxPt - rMinPt + 1
|
||||
doAssert ctx.hdr.unprocessed.merge(rMinPt, rMaxPt) == rMaxPt - rMinPt + 1
|
||||
|
||||
|
||||
|
||||
@ -81,53 +81,48 @@ proc headersUnprocCovered*(
|
||||
## Check whether range is fully contained
|
||||
# Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)`
|
||||
if minPt <= maxPt:
|
||||
return ctx.lhc.unprocessed.covered(minPt, maxPt)
|
||||
return ctx.hdr.unprocessed.covered(minPt, maxPt)
|
||||
|
||||
proc headersUnprocCovered*(ctx: BeaconCtxRef; pt: BlockNumber): bool =
|
||||
## Check whether point is contained
|
||||
ctx.lhc.unprocessed.covered(pt, pt) == 1
|
||||
ctx.hdr.unprocessed.covered(pt, pt) == 1
|
||||
|
||||
|
||||
proc headersUnprocTop*(ctx: BeaconCtxRef): BlockNumber =
|
||||
let iv = ctx.lhc.unprocessed.le().valueOr:
|
||||
let iv = ctx.hdr.unprocessed.le().valueOr:
|
||||
return BlockNumber(0)
|
||||
iv.maxPt
|
||||
|
||||
proc headersUnprocTotal*(ctx: BeaconCtxRef): uint64 =
|
||||
ctx.lhc.unprocessed.total()
|
||||
ctx.hdr.unprocessed.total()
|
||||
|
||||
proc headersUnprocBorrowed*(ctx: BeaconCtxRef): uint64 =
|
||||
ctx.lhc.borrowed
|
||||
ctx.hdr.borrowed
|
||||
|
||||
proc headersUnprocChunks*(ctx: BeaconCtxRef): int =
|
||||
ctx.lhc.unprocessed.chunks()
|
||||
ctx.hdr.unprocessed.chunks()
|
||||
|
||||
proc headersUnprocIsEmpty*(ctx: BeaconCtxRef): bool =
|
||||
ctx.lhc.unprocessed.chunks() == 0
|
||||
ctx.hdr.unprocessed.chunks() == 0
|
||||
|
||||
# ------------
|
||||
|
||||
proc headersUnprocInit*(ctx: BeaconCtxRef) =
|
||||
## Constructor
|
||||
ctx.lhc.unprocessed = BnRangeSet.init()
|
||||
ctx.hdr.unprocessed = BnRangeSet.init()
|
||||
|
||||
|
||||
proc headersUnprocSet*(ctx: BeaconCtxRef) =
|
||||
## Clear
|
||||
ctx.lhc.unprocessed.clear()
|
||||
ctx.lhc.borrowed = 0u
|
||||
|
||||
proc headersUnprocSet*(ctx: BeaconCtxRef; iv: BnRange) =
|
||||
## Set up new unprocessed range
|
||||
ctx.headersUnprocSet()
|
||||
discard ctx.lhc.unprocessed.merge(iv)
|
||||
ctx.hdr.unprocessed.clear()
|
||||
ctx.hdr.borrowed = 0u
|
||||
|
||||
proc headersUnprocSet*(ctx: BeaconCtxRef; minPt, maxPt: BlockNumber) =
|
||||
## Set up new unprocessed range
|
||||
ctx.headersUnprocSet()
|
||||
# Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)`
|
||||
if minPt <= maxPt:
|
||||
discard ctx.lhc.unprocessed.merge(minPt, maxPt)
|
||||
discard ctx.hdr.unprocessed.merge(minPt, maxPt)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@@ -14,12 +14,26 @@

import
pkg/chronos,
pkg/eth/common
pkg/eth/common,
pkg/stew/interval_set

proc bnStr*(w: BlockNumber): string =
func bnStr*(w: BlockNumber): string =
"#" & $w

func bnStr*(h: Header): string =
h.number.bnStr

func bnStr*(b: EthBlock): string =
b.header.bnStr

func bnStr*(w: Interval[BlockNumber,uint64]): string =
if w.len == 1: w.minPt.bnStr else: w.minPt.bnStr & ".." & w.maxPt.bnStr

func toStr*(a: chronos.Duration): string =
a.toString 2

proc `$`*(w: Interval[BlockNumber,uint64]): string =
w.bnStr

# End
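For experimenting with the log output format, here is a self-contained copy of the `bnStr` idea; the real helpers operate on `BlockNumber`, `Header`, `EthBlock` and `Interval` from nim-eth/stew, while this sketch uses plain `uint64`.

```nim
func bnStr(w: uint64): string = "#" & $w

func bnStr(lo, hi: uint64): string =
  if lo == hi: lo.bnStr else: lo.bnStr & ".." & hi.bnStr

when isMainModule:
  doAssert 1_234_567'u64.bnStr == "#1234567"
  doAssert bnStr(100, 164) == "#100..#164"
```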
@ -29,12 +29,15 @@ when enableTicker:
|
||||
## Legacy stuff, will be probably be superseded by `metrics`
|
||||
return proc: auto =
|
||||
TickerStats(
|
||||
base: ctx.dbStateBlockNumber(),
|
||||
stored: ctx.db.getSavedStateBlockNumber(),
|
||||
base: ctx.chain.baseNumber(),
|
||||
latest: ctx.chain.latestNumber(),
|
||||
coupler: ctx.layout.coupler,
|
||||
dangling: ctx.layout.dangling,
|
||||
endBn: ctx.layout.endBn,
|
||||
target: ctx.lhc.target.header.number,
|
||||
newTargetOk: ctx.lhc.target.changed,
|
||||
final: ctx.layout.final,
|
||||
head: ctx.layout.head,
|
||||
target: ctx.target.consHead.number,
|
||||
targetOk: ctx.target.final != 0,
|
||||
|
||||
nHdrStaged: ctx.headersStagedQueueLen(),
|
||||
hdrStagedTop: ctx.headersStagedTopKey(),
|
||||
@ -54,11 +57,14 @@ when enableTicker:
|
||||
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB =
|
||||
## Update beacon header. This function is intended as a call back function
|
||||
## for the RPC module.
|
||||
return proc(h: Header) {.gcsafe, raises: [].} =
|
||||
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} =
|
||||
# Rpc checks empty header against a zero hash rather than `emptyRoot`
|
||||
if ctx.lhc.target.header.number < h.number:
|
||||
ctx.lhc.target.header = h
|
||||
ctx.lhc.target.changed = true
|
||||
if not ctx.target.locked:
|
||||
if f != zeroHash32 and ctx.target.consHead.number < h.number:
|
||||
ctx.target.consHead = h
|
||||
ctx.target.final = BlockNumber(0)
|
||||
ctx.target.finalHash = f
|
||||
ctx.target.changed = true
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
@ -90,7 +96,7 @@ proc setupDatabase*(ctx: BeaconCtxRef) =
|
||||
ctx.blocksUnprocInit()
|
||||
|
||||
# Load initial state from database if there is any
|
||||
ctx.dbLoadLinkedHChainsLayout()
|
||||
ctx.dbLoadSyncStateLayout()
|
||||
|
||||
# Set blocks batch import value for `persistBlocks()`
|
||||
if ctx.pool.nBodiesBatch < nFetchBodiesRequest:
|
||||
|
@ -26,12 +26,15 @@ type
|
||||
|
||||
TickerStats* = object
|
||||
## Full sync state (see `TickerFullStatsUpdater`)
|
||||
stored*: BlockNumber
|
||||
base*: BlockNumber
|
||||
latest*: BlockNumber
|
||||
coupler*: BlockNumber
|
||||
dangling*: BlockNumber
|
||||
endBn*: BlockNumber
|
||||
final*: BlockNumber
|
||||
head*: BlockNumber
|
||||
target*: BlockNumber
|
||||
newTargetOk*: bool
|
||||
targetOk*: bool
|
||||
|
||||
hdrUnprocTop*: BlockNumber
|
||||
nHdrUnprocessed*: uint64
|
||||
@ -73,11 +76,14 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||
if data != t.lastStats or
|
||||
tickerLogSuppressMax < (now - t.visited):
|
||||
let
|
||||
B = if data.base == data.coupler: "C" else: data.base.bnStr
|
||||
S = data.stored.bnStr
|
||||
B = if data.base == data.latest: "L" else: data.base.bnStr
|
||||
L = if data.latest == data.coupler: "C" else: data.latest.bnStr
|
||||
C = if data.coupler == data.dangling: "D" else: data.coupler.bnStr
|
||||
D = if data.dangling == data.endBn: "E" else: data.dangling.bnStr
|
||||
E = if data.endBn == data.target: "T" else: data.endBn.bnStr
|
||||
T = if data.newTargetOk: "?" & $data.target else: data.target.bnStr
|
||||
D = if data.dangling == data.final: "F" else: data.dangling.bnStr
|
||||
F = if data.final == data.head: "H" else: data.final.bnStr
|
||||
H = if data.head == data.target: "T" else: data.head.bnStr
|
||||
T = if data.targetOk: data.target.bnStr else: "?" & $data.target
|
||||
|
||||
hS = if data.nHdrStaged == 0: "n/a"
|
||||
else: data.hdrStagedTop.bnStr & "(" & $data.nHdrStaged & ")"
|
||||
@ -91,7 +97,7 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||
else: data.blkUnprocTop.bnStr & "(" &
|
||||
data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")"
|
||||
|
||||
reorg = data.reorg
|
||||
rrg = data.reorg
|
||||
peers = data.nBuddies
|
||||
|
||||
# With `int64`, there are more than 29*10^10 years range for seconds
|
||||
@ -101,7 +107,13 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||
t.lastStats = data
|
||||
t.visited = now
|
||||
|
||||
info "Sync state", up, peers, B, C, D, E, T, hS, hU, bS, bU, reorg, mem
|
||||
if data.stored == data.base:
|
||||
info "Sync state", up, peers,
|
||||
B, L, C, D, F, H, T, hS, hU, bS, bU, rrg, mem
|
||||
else:
|
||||
info "Sync state", up, peers,
|
||||
S=data.stored.bnStr,
|
||||
B, L, C, D, F, H, T, hS, hU, bS, bU, rrg, mem
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions: ticking log messages
|
||||
|
@ -14,6 +14,7 @@ import
|
||||
pkg/[chronicles, chronos],
|
||||
pkg/eth/[common, rlp],
|
||||
pkg/stew/sorted_set,
|
||||
../../../core/chain,
|
||||
../worker_desc,
|
||||
./update/metrics,
|
||||
"."/[blocks_unproc, db, headers_staged, headers_unproc]
|
||||
@ -25,59 +26,62 @@ logScope:
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) =
|
||||
##
|
||||
## Layout (see (3) in README):
|
||||
## ::
|
||||
## 0 C==D==E T
|
||||
## 0 C==D==H T
|
||||
## o----------------o---------------------o---->
|
||||
## | <-- linked --> |
|
||||
##
|
||||
## or
|
||||
## ::
|
||||
## 0==T C==D==E
|
||||
## 0==T C==D==H
|
||||
## o----------------o-------------------------->
|
||||
## | <-- linked --> |
|
||||
##
|
||||
## with `T == target.header.number` or `T == 0`
|
||||
## with `T == target.consHead.number` or `T == 0`
|
||||
##
|
||||
## to be updated to
|
||||
## ::
|
||||
## 0 C==D D'==E'
|
||||
## 0 C==D D'==H'
|
||||
## o----------------o---------------------o---->
|
||||
## | <-- linked --> | <-- unprocessed --> |
|
||||
##
|
||||
var target = ctx.lhc.target.header.number
|
||||
var target = ctx.target.consHead.number
|
||||
|
||||
# Need: `E < T` and `C == D`
|
||||
if target != 0 and target <= ctx.layout.endBn: # violates `E < T`
|
||||
trace info & ": not applicable", E=ctx.layout.endBn.bnStr, T=target.bnStr
|
||||
return false
|
||||
# Need: `H < T` and `C == D`
|
||||
if target != 0 and target <= ctx.layout.head: # violates `H < T`
|
||||
trace info & ": not applicable", H=ctx.layout.head.bnStr, T=target.bnStr
|
||||
return
|
||||
|
||||
if ctx.layout.coupler != ctx.layout.dangling: # violates `C == D`
|
||||
trace info & ": not applicable",
|
||||
C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr
|
||||
return false
|
||||
return
|
||||
|
||||
# Check consistency: `C == D <= E` for maximal `C` => `D == E`
|
||||
doAssert ctx.layout.dangling == ctx.layout.endBn
|
||||
# Check consistency: `C == D <= H` for maximal `C` => `D == H`
|
||||
doAssert ctx.layout.dangling == ctx.layout.head
|
||||
|
||||
let rlpHeader = rlp.encode(ctx.lhc.target.header)
|
||||
let rlpHeader = rlp.encode(ctx.target.consHead)
|
||||
|
||||
ctx.lhc.layout = LinkedHChainsLayout(
|
||||
ctx.sst.layout = SyncStateLayout(
|
||||
coupler: ctx.layout.coupler,
|
||||
couplerHash: ctx.layout.couplerHash,
|
||||
dangling: target,
|
||||
danglingParent: ctx.lhc.target.header.parentHash,
|
||||
endBn: target,
|
||||
endHash: rlpHeader.keccak256)
|
||||
danglingParent: ctx.target.consHead.parentHash,
|
||||
final: ctx.target.final,
|
||||
finalHash: ctx.target.finalHash,
|
||||
head: target,
|
||||
headHash: rlpHeader.keccak256,
|
||||
headLocked: true)
|
||||
|
||||
# Save this header on the database so it needs not be fetched again from
|
||||
# somewhere else.
|
||||
ctx.dbStashHeaders(target, @[rlpHeader])
|
||||
|
||||
# Save state
|
||||
discard ctx.dbStoreLinkedHChainsLayout()
|
||||
ctx.dbStoreSyncStateLayout()
|
||||
|
||||
# Update range
|
||||
doAssert ctx.headersUnprocTotal() == 0
|
||||
@ -87,78 +91,83 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
|
||||
trace info & ": updated", C=ctx.layout.coupler.bnStr,
|
||||
uTop=ctx.headersUnprocTop(),
|
||||
D=ctx.layout.dangling.bnStr, E=ctx.layout.endBn.bnStr, T=target.bnStr
|
||||
true
|
||||
D=ctx.layout.dangling.bnStr, H=ctx.layout.head.bnStr, T=target.bnStr
|
||||
|
||||
|
||||
proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Merge if `C+1` == `D`
|
||||
##
|
||||
if ctx.lhc.layout.coupler+1 < ctx.lhc.layout.dangling or # gap btw. `C` & `D`
|
||||
ctx.lhc.layout.coupler == ctx.lhc.layout.dangling: # merged already
|
||||
return false
|
||||
if ctx.layout.coupler+1 < ctx.layout.dangling or # gap btw. `C` & `D`
|
||||
ctx.layout.coupler == ctx.layout.dangling: # merged already
|
||||
return
|
||||
|
||||
# No overlap allowed!
|
||||
doAssert ctx.lhc.layout.coupler+1 == ctx.lhc.layout.dangling
|
||||
doAssert ctx.layout.coupler+1 == ctx.layout.dangling
|
||||
|
||||
# Verify adjacent chains
|
||||
if ctx.lhc.layout.couplerHash != ctx.lhc.layout.danglingParent:
|
||||
if ctx.layout.couplerHash != ctx.layout.danglingParent:
|
||||
# FIXME: Oops -- any better idea than to defect?
|
||||
raiseAssert info & ": hashes do not match" &
|
||||
" C=" & ctx.lhc.layout.coupler.bnStr &
|
||||
" D=" & $ctx.lhc.layout.dangling.bnStr
|
||||
" C=" & ctx.layout.coupler.bnStr & " D=" & $ctx.layout.dangling.bnStr
|
||||
|
||||
trace info & ": merging", C=ctx.lhc.layout.coupler.bnStr,
|
||||
D=ctx.lhc.layout.dangling.bnStr
|
||||
trace info & ": merging", C=ctx.layout.coupler.bnStr,
|
||||
D=ctx.layout.dangling.bnStr
|
||||
|
||||
# Merge adjacent linked chains
|
||||
ctx.lhc.layout = LinkedHChainsLayout(
|
||||
coupler: ctx.layout.endBn, # `C`
|
||||
couplerHash: ctx.layout.endHash,
|
||||
dangling: ctx.layout.endBn, # `D`
|
||||
danglingParent: ctx.dbPeekParentHash(ctx.layout.endBn).expect "Hash32",
|
||||
endBn: ctx.layout.endBn, # `E`
|
||||
endHash: ctx.layout.endHash)
|
||||
ctx.sst.layout = SyncStateLayout(
|
||||
coupler: ctx.layout.head, # `C`
|
||||
couplerHash: ctx.layout.headHash,
|
||||
dangling: ctx.layout.head, # `D`
|
||||
danglingParent: ctx.dbPeekParentHash(ctx.layout.head).expect "Hash32",
|
||||
final: ctx.layout.final, # `F`
|
||||
finalHash: ctx.layout.finalHash,
|
||||
head: ctx.layout.head, # `H`
|
||||
headHash: ctx.layout.headHash,
|
||||
headLocked: ctx.layout.headLocked)
|
||||
|
||||
# Save state
|
||||
discard ctx.dbStoreLinkedHChainsLayout()
|
||||
|
||||
true
|
||||
ctx.dbStoreSyncStateLayout()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc updateLinkedHChainsLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Update layout
|
||||
|
||||
# Check whether the target has been reached. In that case, unlock the
|
||||
# consensus head `H` from the current layout so that it can be updated
|
||||
# in time.
|
||||
if ctx.layout.headLocked:
|
||||
# So we have a session
|
||||
let latest= ctx.chain.latestNumber()
|
||||
if ctx.layout.head <= latest:
|
||||
doAssert ctx.layout.head == latest
|
||||
ctx.layout.headLocked = false
|
||||
|
||||
# Check whether there is something to do regarding beacon node change
|
||||
if ctx.lhc.target.changed:
|
||||
ctx.lhc.target.changed = false
|
||||
result = ctx.updateTargetChange info
|
||||
if not ctx.layout.headLocked and ctx.target.changed and ctx.target.final != 0:
|
||||
ctx.target.changed = false
|
||||
ctx.updateTargetChange info
|
||||
|
||||
# Check whether header downloading is done
|
||||
if ctx.mergeAdjacentChains info:
|
||||
result = true
|
||||
ctx.mergeAdjacentChains info
|
||||
|
||||
|
||||
proc updateBlockRequests*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
proc updateBlockRequests*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Update block requests if there staged block queue is empty
|
||||
let base = ctx.dbStateBlockNumber()
|
||||
if base < ctx.layout.coupler: # so half open interval `(B,C]` is not empty
|
||||
let latest = ctx.chain.latestNumber()
|
||||
if latest < ctx.layout.coupler: # so half open interval `(L,C]` is not empty
|
||||
|
||||
# One can fill/import/execute blocks by number from `(B,C]`
|
||||
# One can fill/import/execute blocks by number from `(L,C]`
|
||||
if ctx.blk.topRequest < ctx.layout.coupler:
|
||||
# So there is some space
|
||||
trace info & ": updating", B=base.bnStr, topReq=ctx.blk.topRequest.bnStr,
|
||||
C=ctx.layout.coupler.bnStr
|
||||
trace info & ": updating", L=latest.bnStr,
|
||||
topReq=ctx.blk.topRequest.bnStr, C=ctx.layout.coupler.bnStr
|
||||
|
||||
ctx.blocksUnprocCommit(
|
||||
0, max(base, ctx.blk.topRequest) + 1, ctx.layout.coupler)
|
||||
0, max(latest, ctx.blk.topRequest) + 1, ctx.layout.coupler)
|
||||
ctx.blk.topRequest = ctx.layout.coupler
|
||||
return true
|
||||
|
||||
false
|
||||
|
||||
|
||||
proc updateMetrics*(ctx: BeaconCtxRef) =
|
||||
|
@ -12,12 +12,15 @@
|
||||
|
||||
import
|
||||
pkg/metrics,
|
||||
../../../../core/chain,
|
||||
../../worker_desc,
|
||||
".."/[db, blocks_staged, headers_staged]
|
||||
|
||||
".."/[blocks_staged, headers_staged]
|
||||
|
||||
declareGauge beacon_base, "" &
|
||||
"Max block number of imported/executed blocks"
|
||||
"Max block number of imported finalised blocks"
|
||||
|
||||
declareGauge beacon_latest, "" &
|
||||
"Block number of latest imported blocks"
|
||||
|
||||
declareGauge beacon_coupler, "" &
|
||||
"Max block number for header chain starting at genesis"
|
||||
@ -25,7 +28,10 @@ declareGauge beacon_coupler, "" &
|
||||
declareGauge beacon_dangling, "" &
|
||||
"Starting/min block number for higher up headers chain"
|
||||
|
||||
declareGauge beacon_end, "" &
|
||||
declareGauge beacon_final, "" &
|
||||
"Max number of finalised block in higher up headers chain"
|
||||
|
||||
declareGauge beacon_head, "" &
|
||||
"Ending/max block number of higher up headers chain"
|
||||
|
||||
declareGauge beacon_target, "" &
|
||||
@ -50,11 +56,13 @@ declareGauge beacon_buddies, "" &
|
||||
|
||||
|
||||
template updateMetricsImpl*(ctx: BeaconCtxRef) =
|
||||
metrics.set(beacon_base, ctx.dbStateBlockNumber().int64)
|
||||
metrics.set(beacon_base, ctx.chain.baseNumber().int64)
|
||||
metrics.set(beacon_latest, ctx.chain.latestNumber().int64)
|
||||
metrics.set(beacon_coupler, ctx.layout.coupler.int64)
|
||||
metrics.set(beacon_dangling, ctx.layout.dangling.int64)
|
||||
metrics.set(beacon_end, ctx.layout.endBn.int64)
|
||||
metrics.set(beacon_target, ctx.lhc.target.header.number.int64)
|
||||
metrics.set(beacon_final, ctx.layout.final.int64)
|
||||
metrics.set(beacon_head, ctx.layout.head.int64)
|
||||
metrics.set(beacon_target, ctx.target.consHead.number.int64)
|
||||
|
||||
metrics.set(beacon_header_lists_staged, ctx.headersStagedQueueLen())
|
||||
metrics.set(beacon_headers_unprocessed,
|
||||
|
@@ -121,6 +121,10 @@ const
## entry block number is too high and so leaves a gap to the ledger state
## block number.)

finaliserChainLengthMax* = 32
## When importing with `importBlock()`, finalise after at most this many
## invocations of `importBlock()`.

# ----------------------

static:
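Assuming the cadence check used in `blocksStagedImport()` above (`(n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks`), a fully imported record of `n` blocks triggers `ceil(n / 32)` fork-choice calls. A quick sanity check:

```nim
const finaliserChainLengthMax = 32

func nForkChoiceCalls(nBlocks: int): int =
  ## Fork-choice invocations for a fully imported record of `nBlocks` blocks.
  if nBlocks == 0: 0
  else: (nBlocks + finaliserChainLengthMax - 1) div finaliserChainLengthMax

when isMainModule:
  doAssert nForkChoiceCalls(1) == 1
  doAssert nForkChoiceCalls(32) == 1
  doAssert nForkChoiceCalls(33) == 2
```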
@@ -54,39 +54,57 @@ type

# -------------------

LinkedHChainsLayout* = object
## Layout of a linked header chains defined by the triple `(C,D,E)` as
SyncStateTarget* = object
## Beacon state to be implicitely updated by RPC method
locked*: bool ## Don't update while fetching header
changed*: bool ## Tell that something has changed
consHead*: Header ## Consensus head
final*: BlockNumber ## Finalised block number
finalHash*: Hash32 ## Finalised hash

SyncStateLayout* = object
## Layout of a linked header chains defined by the triple `(C,D,H)` as
## described in the `README.md` text.
## ::
## 0 C D E
## o----------------o---------------------o----------------o--->
## | <-- linked --> | <-- unprocessed --> | <-- linked --> |
## 0 B L C D F H
## o----------o-----o-------o---------------------o------------o---o--->
## | <- imported -> | | | |
## | <------ linked ------> | <-- unprocessed --> | <-- linked --> |
##
## Additional positions known but not declared in this descriptor:
## * `B`: base state (from `forked_chain` importer)
## * `L`: last imported block, canonical consensus head
## * `F`: finalised head (from CL)
##
coupler*: BlockNumber ## Right end `C` of linked chain `[0,C]`
couplerHash*: Hash32 ## Hash of `C`

dangling*: BlockNumber ## Left end `D` of linked chain `[D,E]`
dangling*: BlockNumber ## Left end `D` of linked chain `[D,H]`
danglingParent*: Hash32 ## Parent hash of `D`

endBn*: BlockNumber ## `E`, block num of some finalised block
endHash*: Hash32 ## Hash of `E`
final*: BlockNumber ## Finalised block number `F`
finalHash*: Hash32 ## Hash of `F`

TargetReqHeader* = object
## Beacon state to be implicitely updated by RPC method
changed*: bool ## Set a marker if something has changed
header*: Header ## Beacon chain, finalised header
head*: BlockNumber ## `H`, block num of some finalised block
headHash*: Hash32 ## Hash of `H`
headLocked*: bool ## No need to update `H` yet

LinkedHChainsSync* = object
## Sync state for linked header chains
target*: TargetReqHeader ## Consensus head, see `T` in `README.md`
SyncState* = object
## Sync state for header and block chains
target*: SyncStateTarget ## Consensus head, see `T` in `README.md`
layout*: SyncStateLayout ## Current header chains layout
lastLayout*: SyncStateLayout ## Previous layout (for delta update)

# -------------------

HeaderImportSync* = object
## Header sync staging area
unprocessed*: BnRangeSet ## Block or header ranges to fetch
borrowed*: uint64 ## Total of temp. fetched ranges
staged*: LinkedHChainQueue ## Blocks fetched but not stored yet
layout*: LinkedHChainsLayout ## Current header chains layout
lastLayout*: LinkedHChainsLayout ## Previous layout (for delta update)

BlocksImportSync* = object
## Sync state for blocks to import/execute
## Block sync staging area
unprocessed*: BnRangeSet ## Blocks download requested
borrowed*: uint64 ## Total of temp. fetched ranges
topRequest*: BlockNumber ## Max requested block number

@@ -107,14 +125,15 @@ type
BeaconCtxData* = object
## Globally shared data extension
nBuddies*: int ## Number of active workers
lhcSyncState*: LinkedHChainsSync ## Syncing by linked header chains
blkSyncState*: BlocksImportSync ## For importing/executing blocks
syncState*: SyncState ## Save/resume state descriptor
hdrSync*: HeaderImportSync ## Syncing by linked header chains
blkSync*: BlocksImportSync ## For importing/executing blocks
nextUpdate*: Moment ## For updating metrics

# Blocks import/execution settings for running `persistBlocks()` with
# `nBodiesBatch` blocks in each round (minimum value is
# `nFetchBodiesRequest`.)
chain*: ChainRef
chain*: ForkedChainRef ## Database
importRunningOk*: bool ## Advisory lock, fetch vs. import
nBodiesBatch*: int ## Default `nFetchBodiesBatchDefault`
blocksStagedQuLenMax*: int ## Default `blocksStagedQueueLenMaxDefault`

@@ -136,29 +155,34 @@ type
# Public helpers
# ------------------------------------------------------------------------------

func lhc*(ctx: BeaconCtxRef): var LinkedHChainsSync =
func sst*(ctx: BeaconCtxRef): var SyncState =
## Shortcut
ctx.pool.lhcSyncState
ctx.pool.syncState

func hdr*(ctx: BeaconCtxRef): var HeaderImportSync =
## Shortcut
ctx.pool.hdrSync

func blk*(ctx: BeaconCtxRef): var BlocksImportSync =
## Shortcut
ctx.pool.blkSyncState
ctx.pool.blkSync

func layout*(ctx: BeaconCtxRef): var LinkedHChainsLayout =
func layout*(ctx: BeaconCtxRef): var SyncStateLayout =
## Shortcut
ctx.pool.lhcSyncState.layout
ctx.sst.layout

func target*(ctx: BeaconCtxRef): var SyncStateTarget =
## Shortcut
ctx.sst.target

func chain*(ctx: BeaconCtxRef): ForkedChainRef =
## Getter
ctx.pool.chain

func db*(ctx: BeaconCtxRef): CoreDbRef =
## Getter
ctx.pool.chain.db

# ------------------------------------------------------------------------------
# Public logging/debugging helpers
# ------------------------------------------------------------------------------

proc `$`*(w: BnRange): string =
if w.len == 1: $w.minPt else: $w.minPt & ".." & $w.maxPt

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------