Beacon sync targets cons head rather than finalised block (#2721)
* Fix fringe condition clarifying how to handle an empty range why: The `interval_set` module would treat an undefined interval construct `[2,1]` as`[2,2]` (the right bound being `max(2,1)`.) * Use the `consensus head` rather than the `finalised` block as sync target why: The former is ahead of the `finalised` block. * In ctx descriptor rename `final` field to `target` * Update docu, rename `F` -> `T`
This commit is contained in:
parent
b13f06fcfb
commit
f937f57838
|
@ -116,12 +116,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
|
|||
|
||||
# Update sync header (if any)
|
||||
com.syncReqNewHead(header)
|
||||
|
||||
# Pass on finalised header
|
||||
if com.haveSyncFinalisedBlockHash() or true:
|
||||
let finalizedBlockHash = ethHash update.finalizedBlockHash
|
||||
if finalizedBlockHash != zeroHash32:
|
||||
com.syncFinalisedBlockHash(finalizedBlockHash)
|
||||
com.reqBeaconSyncTargetCB(header)
|
||||
|
||||
return simpleFCU(PayloadExecutionStatus.syncing)
|
||||
|
||||
|
|
|
@ -42,8 +42,8 @@ type
|
|||
SyncReqNewHeadCB* = proc(header: BlockHeader) {.gcsafe, raises: [].}
|
||||
## Update head for syncing
|
||||
|
||||
SyncFinalisedBlockHashCB* = proc(hash: Hash256) {.gcsafe, raises: [].}
|
||||
## Ditto
|
||||
ReqBeaconSyncTargetCB* = proc(header: Header) {.gcsafe, raises: [].}
|
||||
## Ditto (for beacon sync)
|
||||
|
||||
NotifyBadBlockCB* = proc(invalid, origin: BlockHeader) {.gcsafe, raises: [].}
|
||||
## Notify engine-API of encountered bad block
|
||||
|
@ -76,7 +76,7 @@ type
|
|||
## Call back function for the sync processor. This function stages
|
||||
## the arguent header to a private aerea for subsequent processing.
|
||||
|
||||
syncFinalisedBlockHash: SyncFinalisedBlockHashCB
|
||||
reqBeaconSyncTargetCB: ReqBeaconSyncTargetCB
|
||||
## Call back function for a sync processor that returns the canonical
|
||||
## header.
|
||||
|
||||
|
@ -339,13 +339,10 @@ proc syncReqNewHead*(com: CommonRef; header: BlockHeader)
|
|||
if not com.syncReqNewHead.isNil:
|
||||
com.syncReqNewHead(header)
|
||||
|
||||
func haveSyncFinalisedBlockHash*(com: CommonRef): bool =
|
||||
not com.syncFinalisedBlockHash.isNil
|
||||
|
||||
proc syncFinalisedBlockHash*(com: CommonRef; hash: Hash256) =
|
||||
proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header) =
|
||||
## Used by RPC updater
|
||||
if not com.syncFinalisedBlockHash.isNil:
|
||||
com.syncFinalisedBlockHash(hash)
|
||||
if not com.reqBeaconSyncTargetCB.isNil:
|
||||
com.reqBeaconSyncTargetCB(header)
|
||||
|
||||
proc notifyBadBlock*(com: CommonRef; invalid, origin: BlockHeader)
|
||||
{.gcsafe, raises: [].} =
|
||||
|
@ -452,9 +449,9 @@ func `syncReqNewHead=`*(com: CommonRef; cb: SyncReqNewHeadCB) =
|
|||
## Activate or reset a call back handler for syncing.
|
||||
com.syncReqNewHead = cb
|
||||
|
||||
func `syncFinalisedBlockHash=`*(com: CommonRef; cb: SyncFinalisedBlockHashCB) =
|
||||
func `reqBeaconSyncTarget=`*(com: CommonRef; cb: ReqBeaconSyncTargetCB) =
|
||||
## Activate or reset a call back handler for syncing.
|
||||
com.syncFinalisedBlockHash = cb
|
||||
com.reqBeaconSyncTargetCB = cb
|
||||
|
||||
func `notifyBadBlock=`*(com: CommonRef; cb: NotifyBadBlockCB) =
|
||||
## Activate or reset a call back handler for bad block notification.
|
||||
|
|
|
@ -102,30 +102,31 @@ increasing *C* or decreasing *D* by adding/prepending headers so that the
|
|||
linked chain condition is not violated.
|
||||
|
||||
Only when the gap open interval *(C,D)* vanishes, the right end *E* can be
|
||||
increased to a larger **finalised** block number *F* say. Then
|
||||
increased to a larger target block number *T*, say. This block number will
|
||||
typically be the **consensus head**. Then
|
||||
|
||||
* *C==D* beacuse the open interval *(C,D)* is empty
|
||||
* *C==E* because *C* is maximal (see definition of `C` above)
|
||||
|
||||
and the header chains *(E,E,E)* (depicted in *(3)* below) can be set to
|
||||
*(C,F,F)* as depicted in *(4)* below.
|
||||
*(C,T,T)* as depicted in *(4)* below.
|
||||
|
||||
Layout before updating of *E*
|
||||
|
||||
C (3)
|
||||
D
|
||||
0 E F
|
||||
0 E T
|
||||
o----------------o---------------------o---->
|
||||
| <-- linked --> |
|
||||
|
||||
New layout with moving *D* and *E* to *F*
|
||||
New layout with moving *D* and *E* to *T*
|
||||
|
||||
D' (4)
|
||||
0 C E'
|
||||
o----------------o---------------------o---->
|
||||
| <-- linked --> | <-- unprocessed --> |
|
||||
|
||||
with *D'=F* and *E'=F*.
|
||||
with *D'=T* and *E'=T*.
|
||||
|
||||
Note that diagram *(3)* is a generalisation of *(2)*.
|
||||
|
||||
|
@ -133,8 +134,8 @@ Note that diagram *(3)* is a generalisation of *(2)*.
|
|||
### Complete a header linked chain:
|
||||
|
||||
The header chain is *relatively complete* if it satisfies clause *(3)* above
|
||||
for *0 < C*. It is *fully complete* if *E==F*. It should be obvious that the
|
||||
latter condition is temporary only on a live system (as *F* is contiuously
|
||||
for *0 < C*. It is *fully complete* if *E==T*. It should be obvious that the
|
||||
latter condition is temporary only on a live system (as *T* is contiuously
|
||||
updated.)
|
||||
|
||||
If a *relatively complete* header chain is reached for the first time, the
|
||||
|
@ -262,7 +263,7 @@ be available if *nimbus* is compiled with the additional make flags
|
|||
| beacon_coupler | block height | **C**, *increasing* |
|
||||
| beacon_dangling | block height | **D** |
|
||||
| beacon_end | block height | **E**, *increasing* |
|
||||
| beacon_final | block height | **F**, *increasing* |
|
||||
| beacon_target | block height | **T**, *increasing* |
|
||||
| | | |
|
||||
| beacon_header_lists_staged | size | # of staged header list records |
|
||||
| beacon_headers_unprocessed | size | # of accumulated header block numbers|
|
||||
|
|
|
@ -120,8 +120,9 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
|
|||
|
||||
block:
|
||||
# Set advisory flag telling that a slow/long running process will take
|
||||
# place. This works a bit like `runSingle()` only that in the case here
|
||||
# we might have no peer.
|
||||
# place. So there might be some peers active. If they are waiting for
|
||||
# a message reply, this will most probably time out as all processing
|
||||
# power is usurped by the import task here.
|
||||
ctx.pool.importRunningOk = true
|
||||
defer: ctx.pool.importRunningOk = false
|
||||
|
||||
|
@ -173,11 +174,6 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
|
|||
trace info, peer, nInvocations=buddy.only.nMultiLoop,
|
||||
lastIdleGap=buddy.only.multiRunIdle.toStr
|
||||
|
||||
# Update beacon header when needed. For the beacon header, a hash will be
|
||||
# auto-magically made available via RPC. The corresponding header is then
|
||||
# fetched from the current peer.
|
||||
await buddy.headerStagedUpdateBeacon info
|
||||
|
||||
if not await buddy.napUnlessSomethingToFetch info:
|
||||
#
|
||||
# Layout of a triple of linked header chains (see `README.md`)
|
||||
|
|
|
@ -74,7 +74,9 @@ proc blocksUnprocCommit*(
|
|||
|
||||
proc blocksUnprocCovered*(ctx: BeaconCtxRef; minPt,maxPt: BlockNumber): uint64 =
|
||||
## Check whether range is fully contained
|
||||
ctx.blk.unprocessed.covered(minPt, maxPt)
|
||||
# Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)`
|
||||
if minPt <= maxPt:
|
||||
return ctx.blk.unprocessed.covered(minPt, maxPt)
|
||||
|
||||
proc blocksUnprocCovered*(ctx: BeaconCtxRef; pt: BlockNumber): bool =
|
||||
## Check whether point is contained
|
||||
|
|
|
@ -54,21 +54,6 @@ proc fetchAndCheck(
|
|||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc headerStagedUpdateBeacon*(
|
||||
buddy: BeaconBuddyRef;
|
||||
info: static[string];
|
||||
) {.async.} =
|
||||
## Fetch beacon header if there is an update available
|
||||
let ctx = buddy.ctx
|
||||
if ctx.lhc.final.hash != zeroHash32:
|
||||
const iv = BnRange.new(1u,1u) # dummy interval
|
||||
let rc = await buddy.headersFetchReversed(iv, ctx.lhc.final.hash, info)
|
||||
if rc.isOk and ctx.lhc.final.header.number < rc.value[0].number:
|
||||
ctx.lhc.final.header = rc.value[0]
|
||||
ctx.lhc.final.changed = true
|
||||
ctx.lhc.final.hash = zeroHash32
|
||||
|
||||
|
||||
proc headersStagedCollect*(
|
||||
buddy: BeaconBuddyRef;
|
||||
info: static[string];
|
||||
|
@ -104,7 +89,7 @@ proc headersStagedCollect*(
|
|||
# the top level linked chain `[D,E]`, then there is the hash available for
|
||||
# the top level header to fetch. Otherwise -- with multi-peer mode -- the
|
||||
# range of headers is fetched opportunistically using block numbers only.
|
||||
isOpportunistic = uTop + 1 != ctx.layout.dangling
|
||||
isOpportunistic = uTop + 1 < ctx.layout.dangling
|
||||
|
||||
# Parent hash for `lhc` below
|
||||
topLink = (if isOpportunistic: EMPTY_ROOT_HASH
|
||||
|
|
|
@ -73,9 +73,15 @@ proc headersUnprocCommit*(
|
|||
|
||||
|
||||
|
||||
proc headersUnprocCovered*(ctx: BeaconCtxRef; minPt,maxPt: BlockNumber): uint64 =
|
||||
proc headersUnprocCovered*(
|
||||
ctx: BeaconCtxRef;
|
||||
minPt: BlockNumber;
|
||||
maxPt: BlockNumber;
|
||||
): uint64 =
|
||||
## Check whether range is fully contained
|
||||
ctx.lhc.unprocessed.covered(minPt, maxPt)
|
||||
# Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)`
|
||||
if minPt <= maxPt:
|
||||
return ctx.lhc.unprocessed.covered(minPt, maxPt)
|
||||
|
||||
proc headersUnprocCovered*(ctx: BeaconCtxRef; pt: BlockNumber): bool =
|
||||
## Check whether point is contained
|
||||
|
@ -119,6 +125,8 @@ proc headersUnprocSet*(ctx: BeaconCtxRef; iv: BnRange) =
|
|||
proc headersUnprocSet*(ctx: BeaconCtxRef; minPt, maxPt: BlockNumber) =
|
||||
## Set up new unprocessed range
|
||||
ctx.headersUnprocSet()
|
||||
# Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)`
|
||||
if minPt <= maxPt:
|
||||
discard ctx.lhc.unprocessed.merge(minPt, maxPt)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
|
@ -33,8 +33,8 @@ when enableTicker:
|
|||
coupler: ctx.layout.coupler,
|
||||
dangling: ctx.layout.dangling,
|
||||
endBn: ctx.layout.endBn,
|
||||
final: ctx.lhc.final.header.number,
|
||||
finalUpdateOk: ctx.lhc.final.hash != zeroHash32,
|
||||
target: ctx.lhc.target.header.number,
|
||||
newTargetOk: ctx.lhc.target.changed,
|
||||
|
||||
nHdrStaged: ctx.headersStagedQueueLen(),
|
||||
hdrStagedTop: ctx.headersStagedTopKey(),
|
||||
|
@ -51,13 +51,14 @@ when enableTicker:
|
|||
reorg: ctx.pool.nReorg,
|
||||
nBuddies: ctx.pool.nBuddies)
|
||||
|
||||
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): SyncFinalisedBlockHashCB =
|
||||
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB =
|
||||
## Update beacon header. This function is intended as a call back function
|
||||
## for the RPC module.
|
||||
return proc(h: Hash32) {.gcsafe, raises: [].} =
|
||||
return proc(h: Header) {.gcsafe, raises: [].} =
|
||||
# Rpc checks empty header against a zero hash rather than `emptyRoot`
|
||||
if ctx.lhc.final.hash == zeroHash32:
|
||||
ctx.lhc.final.hash = h
|
||||
if ctx.lhc.target.header.number < h.number:
|
||||
ctx.lhc.target.header = h
|
||||
ctx.lhc.target.changed = true
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
|
@ -109,11 +110,11 @@ proc setupDatabase*(ctx: BeaconCtxRef) =
|
|||
|
||||
proc setupRpcMagic*(ctx: BeaconCtxRef) =
|
||||
## Helper for `setup()`: Enable external pivot update via RPC
|
||||
ctx.pool.chain.com.syncFinalisedBlockHash = ctx.updateBeaconHeaderCB
|
||||
ctx.pool.chain.com.reqBeaconSyncTarget = ctx.updateBeaconHeaderCB
|
||||
|
||||
proc destroyRpcMagic*(ctx: BeaconCtxRef) =
|
||||
## Helper for `release()`
|
||||
ctx.pool.chain.com.syncFinalisedBlockHash = SyncFinalisedBlockHashCB(nil)
|
||||
ctx.pool.chain.com.reqBeaconSyncTarget = ReqBeaconSyncTargetCB(nil)
|
||||
|
||||
# ---------
|
||||
|
||||
|
|
|
@ -30,8 +30,8 @@ type
|
|||
coupler*: BlockNumber
|
||||
dangling*: BlockNumber
|
||||
endBn*: BlockNumber
|
||||
final*: BlockNumber
|
||||
finalUpdateOk*: bool
|
||||
target*: BlockNumber
|
||||
newTargetOk*: bool
|
||||
|
||||
hdrUnprocTop*: BlockNumber
|
||||
nHdrUnprocessed*: uint64
|
||||
|
@ -73,11 +73,11 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
|||
if data != t.lastStats or
|
||||
tickerLogSuppressMax < (now - t.visited):
|
||||
let
|
||||
B = data.base.bnStr
|
||||
C = if data.base == data.coupler: "B" else: data.coupler.bnStr
|
||||
D = if data.coupler == data.dangling: "C" else: data.dangling.bnStr
|
||||
E = if data.dangling == data.endBn: "D" else: data.endBn.bnStr
|
||||
F = if data.finalUpdateOk: "?" & $data.final else: data.final.bnStr
|
||||
B = if data.base == data.coupler: "C" else: data.base.bnStr
|
||||
C = if data.coupler == data.dangling: "D" else: data.coupler.bnStr
|
||||
D = if data.dangling == data.endBn: "E" else: data.dangling.bnStr
|
||||
E = if data.endBn == data.target: "T" else: data.endBn.bnStr
|
||||
T = if data.newTargetOk: "?" & $data.target else: data.target.bnStr
|
||||
|
||||
hS = if data.nHdrStaged == 0: "n/a"
|
||||
else: data.hdrStagedTop.bnStr & "(" & $data.nHdrStaged & ")"
|
||||
|
@ -101,7 +101,7 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
|||
t.lastStats = data
|
||||
t.visited = now
|
||||
|
||||
info "Sync state", up, peers, B, C, D, E, F, hS, hU, bS, bU, reorg, mem
|
||||
info "Sync state", up, peers, B, C, D, E, T, hS, hU, bS, bU, reorg, mem
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions: ticking log messages
|
||||
|
|
|
@ -25,21 +25,21 @@ logScope:
|
|||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc updateFinalisedChange(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
##
|
||||
## Layout (see (3) in README):
|
||||
## ::
|
||||
## 0 C==D==E F
|
||||
## 0 C==D==E T
|
||||
## o----------------o---------------------o---->
|
||||
## | <-- linked --> |
|
||||
##
|
||||
## or
|
||||
## ::
|
||||
## 0==F C==D==E
|
||||
## 0==T C==D==E
|
||||
## o----------------o-------------------------->
|
||||
## | <-- linked --> |
|
||||
##
|
||||
## with `F == final.header.number` or `F == 0`
|
||||
## with `T == target.header.number` or `T == 0`
|
||||
##
|
||||
## to be updated to
|
||||
## ::
|
||||
|
@ -47,11 +47,11 @@ proc updateFinalisedChange(ctx: BeaconCtxRef; info: static[string]): bool =
|
|||
## o----------------o---------------------o---->
|
||||
## | <-- linked --> | <-- unprocessed --> |
|
||||
##
|
||||
var finBn = ctx.lhc.final.header.number
|
||||
var target = ctx.lhc.target.header.number
|
||||
|
||||
# Need: `E < F` and `C == D`
|
||||
if finBn != 0 and finBn <= ctx.layout.endBn: # violates `E < F`
|
||||
trace info & ": not applicable", E=ctx.layout.endBn.bnStr, F=finBn.bnStr
|
||||
# Need: `E < T` and `C == D`
|
||||
if target != 0 and target <= ctx.layout.endBn: # violates `E < T`
|
||||
trace info & ": not applicable", E=ctx.layout.endBn.bnStr, T=target.bnStr
|
||||
return false
|
||||
|
||||
if ctx.layout.coupler != ctx.layout.dangling: # violates `C == D`
|
||||
|
@ -62,19 +62,19 @@ proc updateFinalisedChange(ctx: BeaconCtxRef; info: static[string]): bool =
|
|||
# Check consistency: `C == D <= E` for maximal `C` => `D == E`
|
||||
doAssert ctx.layout.dangling == ctx.layout.endBn
|
||||
|
||||
let rlpHeader = rlp.encode(ctx.lhc.final.header)
|
||||
let rlpHeader = rlp.encode(ctx.lhc.target.header)
|
||||
|
||||
ctx.lhc.layout = LinkedHChainsLayout(
|
||||
coupler: ctx.layout.coupler,
|
||||
couplerHash: ctx.layout.couplerHash,
|
||||
dangling: finBn,
|
||||
danglingParent: ctx.lhc.final.header.parentHash,
|
||||
endBn: finBn,
|
||||
dangling: target,
|
||||
danglingParent: ctx.lhc.target.header.parentHash,
|
||||
endBn: target,
|
||||
endHash: rlpHeader.keccak256)
|
||||
|
||||
# Save this header on the database so it needs not be fetched again from
|
||||
# somewhere else.
|
||||
ctx.dbStashHeaders(finBn, @[rlpHeader])
|
||||
ctx.dbStashHeaders(target, @[rlpHeader])
|
||||
|
||||
# Save state
|
||||
discard ctx.dbStoreLinkedHChainsLayout()
|
||||
|
@ -86,11 +86,14 @@ proc updateFinalisedChange(ctx: BeaconCtxRef; info: static[string]): bool =
|
|||
ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1)
|
||||
|
||||
trace info & ": updated", C=ctx.layout.coupler.bnStr,
|
||||
D=ctx.layout.dangling.bnStr, E=ctx.layout.endBn.bnStr, F=finBn.bnStr
|
||||
uTop=ctx.headersUnprocTop(),
|
||||
D=ctx.layout.dangling.bnStr, E=ctx.layout.endBn.bnStr, T=target.bnStr
|
||||
true
|
||||
|
||||
|
||||
proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
## Merge if `C+1` == `D`
|
||||
##
|
||||
if ctx.lhc.layout.coupler+1 < ctx.lhc.layout.dangling or # gap btw. `C` & `D`
|
||||
ctx.lhc.layout.coupler == ctx.lhc.layout.dangling: # merged already
|
||||
return false
|
||||
|
@ -130,9 +133,9 @@ proc updateLinkedHChainsLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
|||
## Update layout
|
||||
|
||||
# Check whether there is something to do regarding beacon node change
|
||||
if ctx.lhc.final.changed:
|
||||
ctx.lhc.final.changed = false
|
||||
result = ctx.updateFinalisedChange info
|
||||
if ctx.lhc.target.changed:
|
||||
ctx.lhc.target.changed = false
|
||||
result = ctx.updateTargetChange info
|
||||
|
||||
# Check whether header downloading is done
|
||||
if ctx.mergeAdjacentChains info:
|
||||
|
|
|
@ -28,8 +28,8 @@ declareGauge beacon_dangling, "" &
|
|||
declareGauge beacon_end, "" &
|
||||
"Ending/max block number of higher up headers chain"
|
||||
|
||||
declareGauge beacon_final, "" &
|
||||
"Block number of latest known finalised header"
|
||||
declareGauge beacon_target, "" &
|
||||
"Block number of sync target (would be consensus header)"
|
||||
|
||||
|
||||
declareGauge beacon_header_lists_staged, "" &
|
||||
|
@ -54,7 +54,7 @@ template updateMetricsImpl*(ctx: BeaconCtxRef) =
|
|||
metrics.set(beacon_coupler, ctx.layout.coupler.int64)
|
||||
metrics.set(beacon_dangling, ctx.layout.dangling.int64)
|
||||
metrics.set(beacon_end, ctx.layout.endBn.int64)
|
||||
metrics.set(beacon_final, ctx.lhc.final.header.number.int64)
|
||||
metrics.set(beacon_target, ctx.lhc.target.header.number.int64)
|
||||
|
||||
metrics.set(beacon_header_lists_staged, ctx.headersStagedQueueLen())
|
||||
metrics.set(beacon_headers_unprocessed,
|
||||
|
|
|
@ -71,15 +71,14 @@ type
|
|||
endBn*: BlockNumber ## `E`, block num of some finalised block
|
||||
endHash*: Hash32 ## Hash of `E`
|
||||
|
||||
BeaconHeader* = object
|
||||
TargetReqHeader* = object
|
||||
## Beacon state to be implicitely updated by RPC method
|
||||
changed*: bool ## Set a marker if something has changed
|
||||
header*: Header ## Beacon chain, finalised header
|
||||
hash*: Hash32 ## From RPC, hash of finalised header
|
||||
|
||||
LinkedHChainsSync* = object
|
||||
## Sync state for linked header chains
|
||||
final*: BeaconHeader ## Finalised block, see `F` in `README.md`
|
||||
target*: TargetReqHeader ## Consensus head, see `T` in `README.md`
|
||||
unprocessed*: BnRangeSet ## Block or header ranges to fetch
|
||||
borrowed*: uint64 ## Total of temp. fetched ranges
|
||||
staged*: LinkedHChainQueue ## Blocks fetched but not stored yet
|
||||
|
|
Loading…
Reference in New Issue