mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-02-04 16:25:10 +00:00
Beacon sync update multi exe heads aware (#2861)
* Log/trace cancellation events in scheduler * Provide `clear()` functions for explicitly flushing data objects * Renaming header cache functions why: More systematic, all functions start with prefix `dbHeader` * Remove `danglingParent` from layout why: Already provided by header cache * Remove `couplerHash` and `headHash` from layout why: No need to cache, `headHash` is unused and `couplerHash` used typically once, only. * Remove `lastLayout` from sync descriptor why: No need to compare changes, saving is always triggered after actively changing the sync layout state * Early reject unsuitable head + finalised header from CL why: The finalised header is only passed by its hash so the header must be fetched somewhere, e.g. from a peer via eth/xx. Also, finalised headers earlier than the `base` from `FC` cannot be handled due to the `Aristo` single state database architecture. Luckily, on a full node, the complete block history is available so unsuitable finalised headers are stored there already which is exploited here to avoid unnecessary network traffic. * Code cosmetics, remove cruft, prettify logging, remove `final` metrics detail: The `final` layout parameter will be deprecated and later removed * Update/re-calibrate syncer logic documentation why: The current implementation sucks if the `FC` module changes the canonical branch in the middle of completing a header chain (due to concurrent updates by the `newPayload()` logic.) * Implement according to re-calibrated syncer docu details: The implementation employs the notion of named layout states (see `SyncLayoutState` in `worker_desc.nim`) which are derived from the state parameter triple `(C,D,H)` as described in `README.md`.
This commit is contained in:
parent
c525590a51
commit
a241050c94
@ -66,58 +66,67 @@ Implementation, The Gory Details
|
||||
The following diagram depicts a most general state view of the sync and the
|
||||
*FC* modules and at a given point of time
|
||||
|
||||
0 C L (5)
|
||||
o------------o-------o
|
||||
0 L (5)
|
||||
o--------------------o
|
||||
| <--- imported ---> |
|
||||
Y D H
|
||||
C D H
|
||||
o---------------------o----------------o
|
||||
| <-- unprocessed --> | <-- linked --> |
|
||||
|
||||
where
|
||||
|
||||
* *C* -- coupler, cached **base** entity of the **FC** module, reported at
|
||||
the time when *H* was set. This determines the maximal way back length
|
||||
of the *linked* ancestor chain starting at *H*.
|
||||
* *C* -- coupler, parent of the left endpoint of the chain of headers or blocks
|
||||
to be fetched and imported.
|
||||
|
||||
* *Y* -- has the same block number as *C* and is often, but not necessarily
|
||||
equal to *C* (for notation *C~Y* see clause *(6)* below.)
|
||||
* *L* -- **latest**, current value of this entity (with the same name) of the
|
||||
**FC** module (i.e. the current value when looked up.) *L* need not
|
||||
be a parent of any header of the linked chain `(C,H]` (see below for
|
||||
notation). Both *L* and *H* might be heads of different forked chains.
|
||||
|
||||
* *L* -- **latest**, current value of this entity of the **FC** module (i.e.
|
||||
now, when looked up)
|
||||
* *D* -- dangling, header with the least block number of the linked chain in
|
||||
progress ending at *H*. This variable is used to record the download
|
||||
state eventually reaching *Y* (for notation *D<<H* see clause *(6)*
|
||||
below.)
|
||||
|
||||
* *D* -- dangling, least block number of the linked chain in progress ending
|
||||
at *H*. This variable is used to record the download state eventually
|
||||
reaching *Y* (for notation *D<<H* see clause *(6)* below.)
|
||||
* *H* -- head, sync target header which typically was the value of a *sync to
|
||||
new head* request (via RPC)
|
||||
|
||||
* *H* -- head, sync target which typically is the value of a *sync to new head*
|
||||
request (via RPC)
|
||||
|
||||
The internal sync state (as opposed to the general state also including **FC**)
|
||||
is defined by the triple *(C,D,H)*. Other parameters *L* and *Y* mentioned in
|
||||
*(5)* are considered ephemeral to the sync state. They are always used by its
|
||||
latest value and are not cached by the syncer.
|
||||
The internal sync state (as opposed to the general state also including the
|
||||
state of **FC**) is defined by the triple *(C,D,H)*. Other parameters like *L*
|
||||
mentioned in *(5)* are considered ephemeral to the sync state. They are always
|
||||
seen by its latest values and not cached by the syncer.
|
||||
|
||||
There are two order releations and some derivatives used to describe relations
|
||||
beween headers or blocks.
|
||||
between headers or blocks.
|
||||
|
||||
For blocks or headers A and B, A is said less or equal B if the (6)
|
||||
block numbers are less or equal. Notation: A <= B.
|
||||
|
||||
For blocks or headers A and B, A is said ancestor of, or equal to
|
||||
B if B is linked to A following up the lineage of parentHash fields
|
||||
of the block headers. Notation: A << B.
|
||||
The notation A ~ B stands for A <= B <= A which makes <= an order
|
||||
relation (relative to ~ rather than ==). If A ~ B does not hold
|
||||
then the notation A !~ B is used.
|
||||
|
||||
The relate notation A ~ B stands for A <= B <= A which is posh for
|
||||
saying that A and B have the same block numer.
|
||||
The notation A < B stands for A <= B and A !~ B.
|
||||
|
||||
The notation B-1 stands for any block or header with block number of
|
||||
B less one.
|
||||
|
||||
|
||||
For blocks or headers A and B, writing A <- B stands for the block
|
||||
A be parent of B (there can only be one parent of B.)
|
||||
|
||||
For blocks or headers A and B, A is said ancestor of, or equal to B
|
||||
if A == B or there is a non-empty parent lineage A <- X <- Y <-..<- B.
|
||||
Notation: A << B (note that << is an equivalence relation.)
|
||||
|
||||
The compact interval notation [A,B] stands for the set {X|A<<X<<B}
|
||||
and the half open interval notation stands for [A,B]-{A} (i.e. the
|
||||
interval without the left end point.)
|
||||
|
||||
|
||||
Note that *A<<B* implies *A<=B*. Boundary conditions that hold for the
|
||||
clause *(5)* diagram are
|
||||
|
||||
C ~ Y, C in [0,L], D in [Y,H] (7)
|
||||
there is a Z in [0,L] with C ~ Z, D is in [C,H] (7)
|
||||
|
||||
|
||||
### Sync Processing
|
||||
@ -134,64 +143,70 @@ parameters *C* and *D* are irrelevant here.
|
||||
Following, there will be a request to advance *H* to a new position as
|
||||
indicated in the diagram below
|
||||
|
||||
0 C (9)
|
||||
0 B (9)
|
||||
o------------o-------o
|
||||
| <--- imported ---> | D
|
||||
Y H
|
||||
C H
|
||||
o--------------------------------------o
|
||||
| <----------- unprocessed ----------> |
|
||||
|
||||
with a new sync state *(C,D,H)*. The parameter *C* in clause *(9)* is set
|
||||
as the **base** entity of the **FC** module. *Y* is only known by its block
|
||||
number, *Y~C*. The parameter *D* is set to the download start position *H*.
|
||||
with a new sync state *(C,H,H)*. The parameter *B* is the **base** entity
|
||||
of the **FC** module. The parameter *C* is a placeholder with *C ~ B*. The
|
||||
parameter *D* is set to the download start position *H*.
|
||||
|
||||
The syncer then fetches the header chain *(Y,H]* from the network. For the
|
||||
syncer state *(C,D,H)*, while iteratively fetching headers, only the parameter
|
||||
*D* will change each time a new header was fetched.
|
||||
The syncer then fetches the header chain *(C,H]* from the network. While
|
||||
iteratively fetching headers, the syncer state *(C,D,H)* will only change on
|
||||
its second position *D* time after a new header was fetched.
|
||||
|
||||
Having finished dowlnoading *(Y,H]* one might end up with a situation
|
||||
Having finished downloading then *C~D-1*. The sync state is *(D-1,D,H)*. One
|
||||
will end up with a situation like
|
||||
|
||||
0 B Z L (10)
|
||||
o-------------o--o---o
|
||||
0 Y L (10)
|
||||
o---------------o----o
|
||||
| <--- imported ---> |
|
||||
Y Z H
|
||||
o---o----------------------------------o
|
||||
C Z H
|
||||
o----o---------------------------------o
|
||||
| <-------------- linked ------------> |
|
||||
|
||||
where *Z* is in the intersection of *[B,L]\*(Y,H]* with *B* the current
|
||||
**base** entity of the **FC** logic. It is only known that *0<<B<<L*
|
||||
although in many cases *B==C* holds.
|
||||
for some *Y* in *[0,L]* and *Z* in *(C,H]* where *Y<<Z* with *L* the **latest**
|
||||
entity of the **FC** logic.
|
||||
|
||||
If there is no such *Z* then *(Y,H]* is discarded and sync processing restarts
|
||||
at clause *(8)* by resetting the sync state (e.g. to *(0,0,0)*.)
|
||||
If there are no such *Y* and *Z*, then *(C,H]* is discarded and sync processing
|
||||
restarts at clause *(8)* by resetting the sync state (e.g. to *(0,0,0)*.)
|
||||
|
||||
Otherwise assume *Z* is the one with the largest block number of the
|
||||
intersection *[B,L]\*(Y,H]*. Then the headers *(Z,H]* will be completed to
|
||||
a lineage of blocks by downloading block bodies.
|
||||
Otherwise choose *Y* and *Z* with maximal block number of *Y* so that *Y<-Z*.
|
||||
Then complete *(Y,H]==[Z,H]* to a lineage of blocks by downloading missing
|
||||
block bodies.
|
||||
|
||||
0 Z (11)
|
||||
o----------------o---o
|
||||
Having finished with block bodies, the sync state will be expressed as
|
||||
*(Y,Y,H)*. With the choice of the first two entries equal it is indicated that
|
||||
the lineage *(Y,H]* is fully populated with blocks.
|
||||
|
||||
0 Y (11)
|
||||
o---------------o----o
|
||||
| <--- imported ---> |
|
||||
Z H
|
||||
o----------------------------------o
|
||||
| <------------ blocks ----------> |
|
||||
Y H
|
||||
o-----------------------------------o
|
||||
| <------------ blocks -----------> |
|
||||
|
||||
The blocks *(Z,H]* will then be imported. While this happens, the internal
|
||||
state of the **FC** might change/reset so that further import becomes
|
||||
impossible. Even when starting import, the block *Z* might not be in *[0,L]*
|
||||
The blocks *(Y,H]* will then be imported and executed. While this happens, the
|
||||
internal state of the **FC** might change/reset so that further import becomes
|
||||
impossible. Even when starting import, the block *Y* might not be in *[0,L]*
|
||||
anymore due to some internal reset of the **FC** logic. In any of those
|
||||
cases, sync processing restarts at clause *(8)* by resetting the sync state.
|
||||
|
||||
Otherwise the block import will end up at
|
||||
In case all blocks can be imported, one will will end up at
|
||||
|
||||
0 Z H L (12)
|
||||
o----------------o----------------------------------o---o
|
||||
0 Y H L (12)
|
||||
o-----------------o---------------------------------o---o
|
||||
| <--- imported --------------------------------------> |
|
||||
|
||||
with *H<<L* for *L* the current value of the **latest** entity of the **FC**
|
||||
module. In many cases, *H==L* but there are other actors running that might
|
||||
import blocks quickly after importing *H* so that *H* is seen as ancestor,
|
||||
different from *L* when this stage is formally done with.
|
||||
module.
|
||||
|
||||
In many cases, *H==L* but there are other actors which also might import blocks
|
||||
quickly after finishing import of *H* before formally committing this task. So
|
||||
*H* can become ancestor of *L*.
|
||||
|
||||
Now clause *(12)* is equivalent to clause *(8)*.
|
||||
|
||||
@ -295,7 +310,6 @@ be available if *nimbus* is compiled with the additional make flags
|
||||
| beacon_latest | block height | **L**, *increasing* |
|
||||
| beacon_coupler | block height | **C**, *increasing* |
|
||||
| beacon_dangling | block height | **D** |
|
||||
| beacon_final | block height | **F**, *increasing* |
|
||||
| beacon_head | block height | **H**, *increasing* |
|
||||
| beacon_target | block height | **T**, *increasing* |
|
||||
| | | |
|
||||
|
@ -1,9 +1,3 @@
|
||||
## Update sync state management to what is described in *README.md*
|
||||
|
||||
1. For the moment, the events in *update.nim* need to be adjusted. This will fix an error where the CL forces the EL to fork internally by sending different head request headers with the same bock number.
|
||||
|
||||
2. General scenario update. This is mostly error handling.
|
||||
|
||||
## General TODO items
|
||||
|
||||
* Update/resolve code fragments which are tagged FIXME
|
||||
@ -25,3 +19,17 @@ which happened on several `holesky` tests immediately after loging somehing like
|
||||
or from another machine with literally the same exception text (but the stack-trace differs)
|
||||
|
||||
NTC 2024-10-31 21:58:07.616 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=9cbcc52953a8 baseNumber=2646857 baseHash=9db5c2ac537b
|
||||
|
||||
### 3. Mem overflow possible on small breasted systems
|
||||
|
||||
Running the exe client, a 1.5G response message was opbserved (on my 8G test system this kills the program as it has already 80% mem load. It happens while syncing holesky at around block #184160 and is reproducible on the 8G system but not yet on the an 80G system.)
|
||||
|
||||
[..]
|
||||
DBG 2024-11-20 16:16:18.871+00:00 Processing JSON-RPC request file=router.nim:135 id=178 name=eth_getLogs
|
||||
DBG 2024-11-20 16:16:18.915+00:00 Returning JSON-RPC response file=router.nim:137 id=178 name=eth_getLogs len=201631
|
||||
TRC 2024-11-20 16:16:18.951+00:00 <<< find_node from topics="eth p2p discovery" file=discovery.nim:248 node=Node[94.16.123.192:30303]
|
||||
TRC 2024-11-20 16:16:18.951+00:00 Neighbours to topics="eth p2p discovery" file=discovery.nim:161 node=Node[94.16.123.192:30303] nodes=[..]
|
||||
TRC 2024-11-20 16:16:18.951+00:00 Neighbours to topics="eth p2p discovery" file=discovery.nim:161 node=Node[94.16.123.192:30303] nodes=[..]
|
||||
DBG 2024-11-20 16:16:19.027+00:00 Received JSON-RPC request topics="JSONRPC-HTTP-SERVER" file=httpserver.nim:52 address=127.0.0.1:49746 len=239
|
||||
DBG 2024-11-20 16:16:19.027+00:00 Processing JSON-RPC request file=router.nim:135 id=179 name=eth_getLogs
|
||||
DBG 2024-11-20 16:20:23.664+00:00 Returning JSON-RPC response file=router.nim:137 id=179 name=eth_getLogs len=1630240149
|
||||
|
@ -54,7 +54,7 @@ proc napUnlessSomethingToFetch(
|
||||
|
||||
proc setup*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
## Global set up
|
||||
ctx.setupRpcMagic()
|
||||
ctx.setupRpcMagic info
|
||||
|
||||
# Load initial state from database if there is any
|
||||
ctx.setupDatabase info
|
||||
@ -109,12 +109,10 @@ proc runDaemon*(
|
||||
## first usable request from the CL (via RPC) stumbles in.
|
||||
##
|
||||
# Check for a possible header layout and body request changes
|
||||
ctx.updateSyncStateLayout info
|
||||
ctx.updateSyncState info
|
||||
if ctx.hibernate:
|
||||
return
|
||||
|
||||
ctx.updateBlockRequests info
|
||||
|
||||
# Execute staged block records.
|
||||
if ctx.blocksStagedCanImportOk():
|
||||
|
||||
|
@ -57,13 +57,13 @@ proc fetchAndCheck(
|
||||
blk.blocks.setLen(offset + ivReq.len)
|
||||
var blockHash = newSeq[Hash32](ivReq.len)
|
||||
for n in 1u ..< ivReq.len:
|
||||
let header = ctx.dbPeekHeader(ivReq.minPt + n).valueOr:
|
||||
let header = ctx.dbHeaderPeek(ivReq.minPt + n).valueOr:
|
||||
# There is nothing one can do here
|
||||
raiseAssert info & " stashed header missing: n=" & $n &
|
||||
" ivReq=" & $ivReq & " nth=" & (ivReq.minPt + n).bnStr
|
||||
blockHash[n - 1] = header.parentHash
|
||||
blk.blocks[offset + n].header = header
|
||||
blk.blocks[offset].header = ctx.dbPeekHeader(ivReq.minPt).valueOr:
|
||||
blk.blocks[offset].header = ctx.dbHeaderPeek(ivReq.minPt).valueOr:
|
||||
# There is nothing one can do here
|
||||
raiseAssert info & " stashed header missing: n=0" &
|
||||
" ivReq=" & $ivReq & " nth=" & ivReq.minPt.bnStr
|
||||
@ -325,7 +325,7 @@ proc blocksStagedImport*(
|
||||
|
||||
# Remove stashed headers for imported blocks
|
||||
for bn in iv.minPt .. maxImport:
|
||||
ctx.dbUnstashHeader bn
|
||||
ctx.dbHeaderUnstash bn
|
||||
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
@ -31,6 +31,10 @@ func blocksStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
|
||||
|
||||
# ----------------
|
||||
|
||||
func blocksStagedQueueClear*(ctx: BeaconCtxRef) =
|
||||
## Clear queue
|
||||
ctx.blk.staged.clear
|
||||
|
||||
func blocksStagedQueueInit*(ctx: BeaconCtxRef) =
|
||||
## Constructor
|
||||
ctx.blk.staged = StagedBlocksQueue.init()
|
||||
|
@ -83,11 +83,6 @@ proc blocksUnprocCovered*(ctx: BeaconCtxRef; pt: BlockNumber): bool =
|
||||
ctx.blk.unprocessed.covered(pt, pt) == 1
|
||||
|
||||
|
||||
proc blocksUnprocTop*(ctx: BeaconCtxRef): BlockNumber =
|
||||
let iv = ctx.blk.unprocessed.le().valueOr:
|
||||
return BlockNumber(0)
|
||||
iv.maxPt
|
||||
|
||||
proc blocksUnprocBottom*(ctx: BeaconCtxRef): BlockNumber =
|
||||
let iv = ctx.blk.unprocessed.ge().valueOr:
|
||||
return high(BlockNumber)
|
||||
@ -112,14 +107,14 @@ proc blocksUnprocInit*(ctx: BeaconCtxRef) =
|
||||
## Constructor
|
||||
ctx.blk.unprocessed = BnRangeSet.init()
|
||||
|
||||
proc blocksUnprocSet*(ctx: BeaconCtxRef) =
|
||||
proc blocksUnprocClear*(ctx: BeaconCtxRef) =
|
||||
## Clear
|
||||
ctx.blk.unprocessed.clear()
|
||||
ctx.blk.borrowed = 0u
|
||||
|
||||
proc blocksUnprocSet*(ctx: BeaconCtxRef; minPt, maxPt: BlockNumber) =
|
||||
## Set up new unprocessed range
|
||||
ctx.blocksUnprocSet()
|
||||
ctx.blocksUnprocClear()
|
||||
# Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)`
|
||||
if minPt <= maxPt:
|
||||
discard ctx.blk.unprocessed.merge(minPt, maxPt)
|
||||
|
@ -17,7 +17,7 @@ import
|
||||
pkg/results,
|
||||
"../../.."/[common, core/chain, db/storage_types],
|
||||
../worker_desc,
|
||||
"."/[blocks_unproc, headers_unproc]
|
||||
./headers_unproc
|
||||
|
||||
const
|
||||
LhcStateKey = 1.beaconStateKey
|
||||
@ -75,9 +75,6 @@ proc deleteStaleHeadersAndState(
|
||||
|
||||
proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Save chain layout to persistent db
|
||||
if ctx.layout == ctx.sst.lastLayout:
|
||||
return
|
||||
|
||||
let data = rlp.encode(ctx.layout)
|
||||
ctx.db.ctx.getKvt().put(LhcStateKey.toOpenArray, data).isOkOr:
|
||||
raiseAssert info & " put() failed: " & $$error
|
||||
@ -103,17 +100,16 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
# The base number is the least record of the FCU chains/tree. So the
|
||||
# finalised entry must not be smaller.
|
||||
ctx.chain.baseNumber() <= rc.value.final and
|
||||
|
||||
# If the latest FCU number is not larger than the head, there is nothing
|
||||
# to do (might also happen after a manual import.)
|
||||
latest < rc.value.head:
|
||||
latest < rc.value.head and
|
||||
|
||||
# Can only resume a header download. Blocks need to be set up from scratch.
|
||||
rc.value.lastState == collectingHeaders:
|
||||
|
||||
# Assign saved sync state
|
||||
ctx.sst.layout = rc.value
|
||||
ctx.sst.lastLayout = rc.value
|
||||
|
||||
# Add interval of unprocessed block range `(L,C]` from `README.md`
|
||||
ctx.blocksUnprocSet(latest+1, ctx.layout.coupler)
|
||||
ctx.blk.topRequest = ctx.layout.coupler
|
||||
|
||||
# Add interval of unprocessed header range `(C,D)` from `README.md`
|
||||
ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1)
|
||||
@ -125,28 +121,7 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
true
|
||||
|
||||
else:
|
||||
let
|
||||
latestHash = ctx.chain.latestHash()
|
||||
latestParent = ctx.chain.latestHeader.parentHash
|
||||
|
||||
ctx.sst.layout = SyncStateLayout(
|
||||
coupler: latest,
|
||||
couplerHash: latestHash,
|
||||
dangling: latest,
|
||||
danglingParent: latestParent,
|
||||
# There is no need to record a separate finalised head `F` as its only
|
||||
# use is to serve as second argument in `forkChoice()` when committing
|
||||
# a batch of imported blocks. Currently, there are no blocks to fetch
|
||||
# and import. The system must wait for instructions and update the fields
|
||||
# `final` and `head` while the latter will be increased so that import
|
||||
# can start.
|
||||
final: latest,
|
||||
finalHash: latestHash,
|
||||
head: latest,
|
||||
headHash: latestHash,
|
||||
headLocked: false)
|
||||
|
||||
ctx.sst.lastLayout = ctx.layout
|
||||
ctx.sst.layout = SyncStateLayout() # empty layout
|
||||
|
||||
if rc.isOk:
|
||||
# Some stored headers might have become stale, so delete them. Even
|
||||
@ -160,7 +135,7 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
# at the `head` and work backwards.
|
||||
ctx.deleteStaleHeadersAndState(rc.value.head, info)
|
||||
else:
|
||||
# Delete stale headers with block numbers starting at to `latest` wile
|
||||
# Delete stale headers with block numbers starting at to `latest` while
|
||||
# working backwards.
|
||||
ctx.deleteStaleHeadersAndState(latest, info)
|
||||
|
||||
@ -168,7 +143,11 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
|
||||
# ------------------
|
||||
|
||||
proc dbStashHeaders*(
|
||||
proc dbHeadersClear*(ctx: BeaconCtxRef) =
|
||||
## Clear stashed in-memory headers
|
||||
ctx.stash.clear
|
||||
|
||||
proc dbHeadersStash*(
|
||||
ctx: BeaconCtxRef;
|
||||
first: BlockNumber;
|
||||
revBlobs: openArray[seq[byte]];
|
||||
@ -199,7 +178,7 @@ proc dbStashHeaders*(
|
||||
kvt.put(key.toOpenArray, data).isOkOr:
|
||||
raiseAssert info & ": put() failed: " & $$error
|
||||
|
||||
proc dbPeekHeader*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] =
|
||||
proc dbHeaderPeek*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] =
|
||||
## Retrieve some stashed header.
|
||||
# Try cache first
|
||||
ctx.stash.withValue(num, val):
|
||||
@ -218,11 +197,11 @@ proc dbPeekHeader*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] =
|
||||
discard
|
||||
err()
|
||||
|
||||
proc dbPeekParentHash*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Hash32] =
|
||||
proc dbHeaderParentHash*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Hash32] =
|
||||
## Retrieve some stashed parent hash.
|
||||
ok (? ctx.dbPeekHeader num).parentHash
|
||||
ok (? ctx.dbHeaderPeek num).parentHash
|
||||
|
||||
proc dbUnstashHeader*(ctx: BeaconCtxRef; bn: BlockNumber) =
|
||||
proc dbHeaderUnstash*(ctx: BeaconCtxRef; bn: BlockNumber) =
|
||||
## Remove header from temporary DB list
|
||||
ctx.stash.withValue(bn, _):
|
||||
ctx.stash.del bn
|
||||
|
@ -19,7 +19,7 @@ import
|
||||
../worker_desc,
|
||||
./update/metrics,
|
||||
./headers_staged/[headers, linked_hchain],
|
||||
./headers_unproc
|
||||
"."/[headers_unproc, update]
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
@ -60,7 +60,7 @@ proc headerStagedUpdateTarget*(
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
peer = buddy.peer
|
||||
if not ctx.layout.headLocked and
|
||||
if ctx.layout.lastState == idleSyncState and
|
||||
ctx.target.final == 0 and
|
||||
ctx.target.finalHash != zeroHash32 and
|
||||
not ctx.target.locked:
|
||||
@ -78,23 +78,7 @@ proc headerStagedUpdateTarget*(
|
||||
trace info & ": finalised header hash mismatch", peer, hash,
|
||||
expected=ctx.target.finalHash
|
||||
else:
|
||||
let final = rc.value[0].number
|
||||
if final < ctx.chain.baseNumber():
|
||||
trace info & ": finalised number too low", peer,
|
||||
B=ctx.chain.baseNumber.bnStr, finalised=final.bnStr,
|
||||
delta=(ctx.chain.baseNumber - final)
|
||||
ctx.target.reset
|
||||
else:
|
||||
ctx.target.final = final
|
||||
|
||||
# Activate running (unless done yet)
|
||||
if ctx.hibernate:
|
||||
ctx.hibernate = false
|
||||
trace info & ": activated syncer", peer,
|
||||
finalised=final.bnStr, head=ctx.layout.head.bnStr
|
||||
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
ctx.updateFinalBlockHeader(rc.value[0], ctx.target.finalHash, info)
|
||||
|
||||
|
||||
proc headersStagedCollect*(
|
||||
@ -135,8 +119,9 @@ proc headersStagedCollect*(
|
||||
isOpportunistic = uTop + 1 < ctx.layout.dangling
|
||||
|
||||
# Parent hash for `lhc` below
|
||||
topLink = (if isOpportunistic: EMPTY_ROOT_HASH
|
||||
else: ctx.layout.danglingParent)
|
||||
topLink = if isOpportunistic: EMPTY_ROOT_HASH
|
||||
else: ctx.dbHeaderParentHash(ctx.layout.dangling).expect "Hash32"
|
||||
|
||||
var
|
||||
# This value is used for splitting the interval `iv` into
|
||||
# `[iv.minPt, somePt] + [somePt+1, ivTop] + already-collected` where the
|
||||
@ -210,7 +195,7 @@ proc headersStagedCollect*(
|
||||
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
|
||||
qItem.data = lhc[]
|
||||
|
||||
trace info & ": staged header list", peer,
|
||||
trace info & ": staged a list of headers", peer,
|
||||
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
|
||||
nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state
|
||||
|
||||
@ -244,7 +229,7 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
||||
if qItem.data.hash != ctx.layout.danglingParent:
|
||||
if qItem.data.hash != ctx.dbHeaderParentHash(dangling).expect "Hash32":
|
||||
# Discard wrong chain and merge back the range into the `unproc` list.
|
||||
ctx.headersUnprocCommit(0,iv)
|
||||
trace info & ": discarding staged header list", iv, D=dangling.bnStr,
|
||||
@ -252,14 +237,13 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
|
||||
break
|
||||
|
||||
# Store headers on database
|
||||
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs, info)
|
||||
ctx.dbHeadersStash(iv.minPt, qItem.data.revHdrs, info)
|
||||
ctx.layout.dangling = iv.minPt
|
||||
ctx.layout.danglingParent = qItem.data.parentHash
|
||||
ctx.dbStoreSyncStateLayout info
|
||||
|
||||
result += qItem.data.revHdrs.len # count headers
|
||||
|
||||
trace info & ": consecutive headers stashed",
|
||||
trace info & ": stashed consecutive headers",
|
||||
nListsLeft=ctx.hdr.staged.len, nStashed=result
|
||||
|
||||
if headersStagedQueueLengthLwm < ctx.hdr.staged.len:
|
||||
|
@ -31,6 +31,10 @@ func headersStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
|
||||
|
||||
# ----------------
|
||||
|
||||
func headersStagedQueueClear*(ctx: BeaconCtxRef) =
|
||||
## Clear queue
|
||||
ctx.hdr.staged.clear
|
||||
|
||||
func headersStagedQueueInit*(ctx: BeaconCtxRef) =
|
||||
## Constructor
|
||||
ctx.hdr.staged = LinkedHChainQueue.init()
|
||||
|
@ -112,14 +112,14 @@ proc headersUnprocInit*(ctx: BeaconCtxRef) =
|
||||
ctx.hdr.unprocessed = BnRangeSet.init()
|
||||
|
||||
|
||||
proc headersUnprocSet*(ctx: BeaconCtxRef) =
|
||||
proc headersUnprocClear*(ctx: BeaconCtxRef) =
|
||||
## Clear
|
||||
ctx.hdr.unprocessed.clear()
|
||||
ctx.hdr.borrowed = 0u
|
||||
|
||||
proc headersUnprocSet*(ctx: BeaconCtxRef; minPt, maxPt: BlockNumber) =
|
||||
## Set up new unprocessed range
|
||||
ctx.headersUnprocSet()
|
||||
ctx.headersUnprocClear()
|
||||
# Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)`
|
||||
if minPt <= maxPt:
|
||||
discard ctx.hdr.unprocessed.merge(minPt, maxPt)
|
||||
|
@ -17,7 +17,7 @@ import
|
||||
../worker_desc,
|
||||
./blocks_staged/staged_queue,
|
||||
./headers_staged/staged_queue,
|
||||
"."/[blocks_unproc, db, headers_unproc]
|
||||
"."/[blocks_unproc, db, headers_unproc, update]
|
||||
|
||||
when enableTicker:
|
||||
import ./start_stop/ticker
|
||||
@ -31,14 +31,12 @@ when enableTicker:
|
||||
## Legacy stuff, will be probably be superseded by `metrics`
|
||||
return proc: auto =
|
||||
TickerStats(
|
||||
stored: ctx.db.getSavedStateBlockNumber(),
|
||||
base: ctx.chain.baseNumber(),
|
||||
latest: ctx.chain.latestNumber(),
|
||||
coupler: ctx.layout.coupler,
|
||||
dangling: ctx.layout.dangling,
|
||||
final: ctx.layout.final,
|
||||
head: ctx.layout.head,
|
||||
headOk: ctx.layout.headLocked,
|
||||
headOk: ctx.layout.lastState != idleSyncState,
|
||||
target: ctx.target.consHead.number,
|
||||
targetOk: ctx.target.final != 0,
|
||||
|
||||
@ -50,7 +48,7 @@ when enableTicker:
|
||||
|
||||
nBlkStaged: ctx.blocksStagedQueueLen(),
|
||||
blkStagedBottom: ctx.blocksStagedQueueBottomKey(),
|
||||
blkUnprocTop: ctx.blk.topRequest,
|
||||
blkUnprocBottom: ctx.blocksUnprocBottom(),
|
||||
nBlkUnprocessed: ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed(),
|
||||
nBlkUnprocFragm: ctx.blocksUnprocChunks(),
|
||||
|
||||
@ -58,7 +56,10 @@ when enableTicker:
|
||||
nBuddies: ctx.pool.nBuddies)
|
||||
|
||||
|
||||
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB =
|
||||
proc updateBeaconHeaderCB(
|
||||
ctx: BeaconCtxRef;
|
||||
info: static[string];
|
||||
): ReqBeaconSyncTargetCB =
|
||||
## Update beacon header. This function is intended as a call back function
|
||||
## for the RPC module.
|
||||
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} =
|
||||
@ -70,10 +71,22 @@ proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB =
|
||||
ctx.layout.head < h.number and # update is advancing
|
||||
ctx.target.consHead.number < h.number: # .. ditto
|
||||
|
||||
ctx.target.consHead = h
|
||||
ctx.target.final = BlockNumber(0)
|
||||
ctx.target.finalHash = f
|
||||
ctx.target.changed = true
|
||||
ctx.target.consHead = h
|
||||
ctx.target.finalHash = f
|
||||
ctx.target.changed = true
|
||||
|
||||
# Check whether `FC` knows about the finalised block already.
|
||||
#
|
||||
# On a full node, all blocks before the current state are stored on the
|
||||
# database which is also accessed by `FC`. So one can already decude here
|
||||
# whether `FC` id capable of handling that finalised block (the number of
|
||||
# must be at least the `base` from `FC`.)
|
||||
#
|
||||
# Otherwise the block header will need to be fetched from a peer when
|
||||
# available and checked there (see `headerStagedUpdateTarget()`.)
|
||||
#
|
||||
let finHdr = ctx.chain.headerByHash(f).valueOr: return
|
||||
ctx.updateFinalBlockHeader(finHdr, f, info)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
@ -128,9 +141,9 @@ proc setupDatabase*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
ctx.pool.blocksStagedQuLenMax = blocksStagedQueueLenMaxDefault
|
||||
|
||||
|
||||
proc setupRpcMagic*(ctx: BeaconCtxRef) =
|
||||
proc setupRpcMagic*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Helper for `setup()`: Enable external pivot update via RPC
|
||||
ctx.pool.chain.com.reqBeaconSyncTarget = ctx.updateBeaconHeaderCB
|
||||
ctx.pool.chain.com.reqBeaconSyncTarget = ctx.updateBeaconHeaderCB info
|
||||
|
||||
proc destroyRpcMagic*(ctx: BeaconCtxRef) =
|
||||
## Helper for `release()`
|
||||
|
@ -26,12 +26,10 @@ type
|
||||
|
||||
TickerStats* = object
|
||||
## Full sync state (see `TickerFullStatsUpdater`)
|
||||
stored*: BlockNumber
|
||||
base*: BlockNumber
|
||||
latest*: BlockNumber
|
||||
coupler*: BlockNumber
|
||||
dangling*: BlockNumber
|
||||
final*: BlockNumber
|
||||
head*: BlockNumber
|
||||
headOk*: bool
|
||||
target*: BlockNumber
|
||||
@ -43,7 +41,7 @@ type
|
||||
nHdrStaged*: int
|
||||
hdrStagedTop*: BlockNumber
|
||||
|
||||
blkUnprocTop*: BlockNumber
|
||||
blkUnprocBottom*: BlockNumber
|
||||
nBlkUnprocessed*: uint64
|
||||
nBlkUnprocFragm*: int
|
||||
nBlkStaged*: int
|
||||
@ -80,10 +78,8 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||
B = if data.base == data.latest: "L" else: data.base.bnStr
|
||||
L = if data.latest == data.coupler: "C" else: data.latest.bnStr
|
||||
C = if data.coupler == data.dangling: "D" else: data.coupler.bnStr
|
||||
D = if data.dangling == data.final: "F"
|
||||
elif data.dangling == data.head: "H"
|
||||
D = if data.dangling == data.head: "H"
|
||||
else: data.dangling.bnStr
|
||||
F = if data.final == data.head: "H" else: data.final.bnStr
|
||||
H = if data.headOk:
|
||||
if data.head == data.target: "T" else: data.head.bnStr
|
||||
else:
|
||||
@ -99,7 +95,7 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||
bS = if data.nBlkStaged == 0: "n/a"
|
||||
else: data.blkStagedBottom.bnStr & "(" & $data.nBlkStaged & ")"
|
||||
bU = if data.nBlkUnprocFragm == 0 and data.nBlkUnprocessed == 0: "n/a"
|
||||
else: data.blkUnprocTop.bnStr & "(" &
|
||||
else: data.blkUnprocBottom.bnStr & "(" &
|
||||
data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")"
|
||||
|
||||
rrg = data.reorg
|
||||
@ -112,13 +108,7 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||
t.lastStats = data
|
||||
t.visited = now
|
||||
|
||||
if data.stored == data.base:
|
||||
debug "Sync state", up, peers,
|
||||
B, L, C, D, F, H, T, hS, hU, bS, bU, rrg, mem
|
||||
else:
|
||||
debug "Sync state", up, peers,
|
||||
S=data.stored.bnStr,
|
||||
B, L, C, D, F, H, T, hS, hU, bS, bU, rrg, mem
|
||||
debug "Sync state", up, peers, B, L, C, D, H, T, hS, hU, bS, bU, rrg, mem
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions: ticking log messages
|
||||
|
@ -17,6 +17,7 @@ import
|
||||
../../../core/chain,
|
||||
../worker_desc,
|
||||
./update/metrics,
|
||||
./blocks_staged/staged_queue,
|
||||
./headers_staged/staged_queue,
|
||||
"."/[blocks_unproc, db, headers_unproc, helpers]
|
||||
|
||||
@ -24,176 +25,325 @@ import
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) =
|
||||
func syncState(ctx: BeaconCtxRef; info: static[string]): SyncLayoutState =
|
||||
## Calculate `SyncLayoutState` from the download context
|
||||
|
||||
let
|
||||
b = ctx.chain.baseNumber()
|
||||
l = ctx.chain.latestNumber()
|
||||
c = ctx.layout.coupler
|
||||
d = ctx.layout.dangling
|
||||
h = ctx.layout.head
|
||||
|
||||
# See clause *(8)* in `README.md`:
|
||||
# ::
|
||||
# 0 H L
|
||||
# o---------------o----o
|
||||
# | <--- imported ---> |
|
||||
#
|
||||
# where `H << L` with `L` is the `latest` (aka cursor) parameter from
|
||||
# `FC` the logic will be updated to (see clause *(9)* in `README.md`):
|
||||
#
|
||||
if h <= c or h <= l: # empty interval `(C,H]` or nothing to do
|
||||
return idleSyncState
|
||||
|
||||
# See clauses *(9)* and *(10)* in `README.md`:
|
||||
# ::
|
||||
# 0 B
|
||||
# o---------------o----o
|
||||
# | <--- imported ---> |
|
||||
# C D H
|
||||
# o---------------------o----------------o
|
||||
# | <-- unprocessed --> | <-- linked --> |
|
||||
#
|
||||
# where *B* is the **base** entity of the `FC` module and `C` is sort of
|
||||
# a placehoder with block number equal to *B* at some earlier time (the
|
||||
# value *B* increases over time.)
|
||||
#
|
||||
# It is already known that `C < H` (see first check)
|
||||
#
|
||||
if c <= b: # check for `C <= B` as sketched above
|
||||
|
||||
# Case `C < D-1` => not ready yet
|
||||
if c + 1 < d:
|
||||
return collectingHeaders
|
||||
|
||||
# Case `C == D-1` => just finished the download
|
||||
if c + 1 == d:
|
||||
return finishedHeaders
|
||||
|
||||
# Case `C == D` => see below for general case
|
||||
|
||||
# Case `C == D` => set to import blocks (see *(10)* in `README.md`):
|
||||
# ::
|
||||
# 0 L
|
||||
# o--------------------o
|
||||
# | <--- imported ---> |
|
||||
# D
|
||||
# C H
|
||||
# o--------------------------------o
|
||||
# | <-- blocks to be completed --> |
|
||||
#
|
||||
# It is known already (see first check) that `L <`H`
|
||||
#
|
||||
if c == d:
|
||||
return processingBlocks
|
||||
|
||||
# Case `B < C` oops:
|
||||
# ::
|
||||
# 0 B
|
||||
# o---------------o----o
|
||||
# | <--- imported ---> |
|
||||
# C D H
|
||||
# o---------------------o----------------o
|
||||
# | <-- unprocessed --> | <-- linked --> |
|
||||
#
|
||||
trace info & ": inconsistent state",
|
||||
B=(if b == c: "C" else: b.bnStr),
|
||||
C=(if c == l: "L" else: c.bnStr),
|
||||
L=(if l == d: "D" else: l.bnStr),
|
||||
D=(if d == h: "H" else: d.bnStr),
|
||||
H=h.bnStr
|
||||
|
||||
idleSyncState
|
||||
|
||||
# ------------
|
||||
|
||||
proc startHibernating(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Clean up target bucket and await a new target.
|
||||
##
|
||||
## Layout (see (3) in README):
|
||||
ctx.sst.reset # => target.reset, layout.reset
|
||||
ctx.headersUnprocClear()
|
||||
ctx.blocksUnprocClear()
|
||||
ctx.headersStagedQueueClear()
|
||||
ctx.blocksStagedQueueClear()
|
||||
ctx.dbHeadersClear()
|
||||
|
||||
ctx.hibernate = true
|
||||
|
||||
trace info & ": hibernating, awaiting sync target",
|
||||
L=ctx.chain.latestNumber.bnStr
|
||||
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
||||
|
||||
proc setupCollectingHeaders(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Set up sync target (see clause *(9)* in `README.md`) by modifying
|
||||
## layout to:
|
||||
## ::
|
||||
## 0 C==D==H T
|
||||
## o----------------o---------------------o---->
|
||||
## | <-- linked --> |
|
||||
## 0 B
|
||||
## o------------o-------o
|
||||
## | <--- imported ---> | D
|
||||
## C H
|
||||
## o-----------------------------------o
|
||||
## | <--------- unprocessed ---------> |
|
||||
##
|
||||
## or
|
||||
## where *B* is the **base** entity of the `FC` module and `C ~ B`. The
|
||||
## parameter `H` is set to the new sync head target `T`.
|
||||
##
|
||||
let
|
||||
c = ctx.chain.baseNumber()
|
||||
h = ctx.target.consHead.number
|
||||
|
||||
if c+1 < h: # header chain interval is `(C,H]`
|
||||
doAssert ctx.headersUnprocTotal() == 0
|
||||
doAssert ctx.headersUnprocBorrowed() == 0
|
||||
doAssert ctx.headersStagedQueueIsEmpty()
|
||||
doAssert ctx.blocksUnprocTotal() == 0
|
||||
doAssert ctx.blocksUnprocBorrowed() == 0
|
||||
doAssert ctx.blocksStagedQueueIsEmpty()
|
||||
|
||||
ctx.sst.layout = SyncStateLayout(
|
||||
coupler: c,
|
||||
dangling: h,
|
||||
final: ctx.target.final,
|
||||
finalHash: ctx.target.finalHash,
|
||||
head: h,
|
||||
lastState: collectingHeaders) # state transition
|
||||
|
||||
# Save this header on the database so it needs not be fetched again from
|
||||
# somewhere else.
|
||||
ctx.dbHeadersStash(h, @[rlp.encode(ctx.target.consHead)], info)
|
||||
|
||||
# Save state
|
||||
ctx.dbStoreSyncStateLayout info
|
||||
|
||||
# Update range
|
||||
ctx.headersUnprocSet(c+1, h-1)
|
||||
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
||||
# Mark target used, reset for re-fill
|
||||
ctx.target.changed = false
|
||||
|
||||
trace info & ": new header target", C=c.bnStr, D="H", H="T", T=h.bnStr
|
||||
|
||||
|
||||
proc linkIntoFc(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
## Link `(C,H]` into the `FC` logic. If successful, `true` is returned.
|
||||
## Otherwise the chain `(C,H]` must be discarded.
|
||||
##
|
||||
## Condider the following layout (see clause *(10)* in `README.md`):
|
||||
## ::
|
||||
## 0==T C==D==H
|
||||
## o----------------o-------------------------->
|
||||
## | <-- linked --> |
|
||||
## 0 B Y L
|
||||
## o-------------o--o----o
|
||||
## | <--- imported ----> |
|
||||
## C Z H
|
||||
## o----o--------------------------------o
|
||||
## | <------------- linked ------------> |
|
||||
##
|
||||
## with `T == target.consHead.number` or `T == 0`
|
||||
## for some `Y` in `[B,L]` and `Z` in `(C,H]` where `Y<-Z` with `L` the
|
||||
## `latest` and `B` the `base` entity of the `FC` logic.
|
||||
##
|
||||
## to be updated to
|
||||
## If there are such `Y <- Z`, then update the sync state to (see chause
|
||||
## *(11)* in `README.md`):
|
||||
## ::
|
||||
## 0 C==D D'==H'
|
||||
## o----------------o---------------------o---->
|
||||
## | <-- linked --> | <-- unprocessed --> |
|
||||
## 0 Y
|
||||
## o----------------o----o
|
||||
## | <--- imported ----> |
|
||||
## D
|
||||
## C Z H
|
||||
## o-o------------------------------o
|
||||
## | <-- blocks to be completed --> |
|
||||
##
|
||||
var target = ctx.target.consHead.number
|
||||
## where `C==Y`, `(C,H]==[Z,H]`, `C<-Z`
|
||||
##
|
||||
## Otherwise, if *Z* does not exists then reset to idle state.
|
||||
##
|
||||
let
|
||||
b = ctx.chain.baseNumber()
|
||||
l = ctx.chain.latestNumber()
|
||||
c = ctx.layout.coupler
|
||||
h = ctx.layout.head
|
||||
|
||||
# Need: `H < T` and `C == D`
|
||||
if target != 0 and target <= ctx.layout.head: # violates `H < T`
|
||||
trace info & ": update not applicable",
|
||||
H=ctx.layout.head.bnStr, T=target.bnStr
|
||||
return
|
||||
if l < h:
|
||||
# Try to find a parent in the `FC` data domain. For practical reasons the
|
||||
# loop does not go further back than the base `B`. Blocks below/older than
|
||||
# that will not be handled by the `FC`.
|
||||
for bn in (l+1).countdown(max(b,c)):
|
||||
|
||||
if ctx.layout.coupler != ctx.layout.dangling: # violates `C == D`
|
||||
trace info & ": update not applicable",
|
||||
C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr
|
||||
return
|
||||
# The syncer cache holds headers for `(C,H]`. It starts with checking
|
||||
# whether `L<-Z` holds (i.e. `Y==L` can be chosen.)
|
||||
let
|
||||
yHash = ctx.dbHeaderParentHash(bn).expect "Hash32" # maybe `Y`
|
||||
yHdr = ctx.chain.headerByHash(yHash).valueOr: continue # test for `Y`
|
||||
yNum = yHdr.number # == bn-1
|
||||
|
||||
# Check consistency: `C == D <= H` for maximal `C` => `D == H`
|
||||
doAssert ctx.layout.dangling == ctx.layout.head
|
||||
ctx.layout.coupler = yNum # parent of `Z`
|
||||
ctx.layout.dangling = yNum # .. ditto
|
||||
|
||||
let rlpHeader = rlp.encode(ctx.target.consHead)
|
||||
trace info & ": linked into FC", B=b.bnStr,
|
||||
C=(if yNum==l: "L" else: yNum.bnStr), L=l.bnStr, H=h.bnStr
|
||||
|
||||
ctx.sst.layout = SyncStateLayout(
|
||||
coupler: ctx.layout.coupler,
|
||||
couplerHash: ctx.layout.couplerHash,
|
||||
dangling: target,
|
||||
danglingParent: ctx.target.consHead.parentHash,
|
||||
final: ctx.target.final,
|
||||
finalHash: ctx.target.finalHash,
|
||||
head: target,
|
||||
headHash: rlpHeader.keccak256,
|
||||
headLocked: true)
|
||||
# Save layout state
|
||||
ctx.dbStoreSyncStateLayout info
|
||||
|
||||
# Save this header on the database so it needs not be fetched again from
|
||||
# somewhere else.
|
||||
ctx.dbStashHeaders(target, @[rlpHeader], info)
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
return true
|
||||
|
||||
# Save state
|
||||
ctx.dbStoreSyncStateLayout info
|
||||
trace info & ": cannot link into FC", B=b.bnStr, L=l.bnStr,
|
||||
C=c.bnStr, H=h.bnStr
|
||||
false
|
||||
|
||||
# Update range
|
||||
|
||||
proc setupProcessingBlocks(ctx: BeaconCtxRef; info: static[string]) =
|
||||
doAssert ctx.headersUnprocTotal() == 0
|
||||
doAssert ctx.headersUnprocBorrowed() == 0
|
||||
doAssert ctx.headersStagedQueueIsEmpty()
|
||||
ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1)
|
||||
|
||||
trace info & ": updated sync state/new target", C=ctx.layout.coupler.bnStr,
|
||||
uTop=ctx.headersUnprocTop(),
|
||||
D=ctx.layout.dangling.bnStr, H=ctx.layout.head.bnStr, T=target.bnStr
|
||||
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
||||
|
||||
proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Merge if `C+1` == `D`
|
||||
##
|
||||
if ctx.layout.coupler+1 < ctx.layout.dangling or # gap btw. `C` & `D`
|
||||
ctx.layout.coupler == ctx.layout.dangling: # merged already
|
||||
return
|
||||
|
||||
# No overlap allowed!
|
||||
doAssert ctx.layout.coupler+1 == ctx.layout.dangling
|
||||
|
||||
# Verify adjacent chains
|
||||
if ctx.layout.couplerHash != ctx.layout.danglingParent:
|
||||
# FIXME: Oops -- any better idea than to defect?
|
||||
raiseAssert info & ": header chains C-D joining hashes do not match" &
|
||||
" L=" & ctx.chain.latestNumber().bnStr &
|
||||
" lHash=" & ctx.chain.latestHash.short &
|
||||
" C=" & ctx.layout.coupler.bnStr &
|
||||
" cHash=" & ctx.layout.couplerHash.short &
|
||||
" D=" & $ctx.layout.dangling.bnStr &
|
||||
" dParent=" & ctx.layout.danglingParent.short
|
||||
|
||||
trace info & ": merging adjacent header chains", C=ctx.layout.coupler.bnStr,
|
||||
D=ctx.layout.dangling.bnStr
|
||||
|
||||
# Merge adjacent linked chains
|
||||
ctx.sst.layout = SyncStateLayout(
|
||||
coupler: ctx.layout.head, # `C`
|
||||
couplerHash: ctx.layout.headHash,
|
||||
dangling: ctx.layout.head, # `D`
|
||||
danglingParent: ctx.dbPeekParentHash(ctx.layout.head).expect "Hash32",
|
||||
final: ctx.layout.final, # `F`
|
||||
finalHash: ctx.layout.finalHash,
|
||||
head: ctx.layout.head, # `H`
|
||||
headHash: ctx.layout.headHash,
|
||||
headLocked: ctx.layout.headLocked)
|
||||
|
||||
# Save state
|
||||
ctx.dbStoreSyncStateLayout info
|
||||
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
||||
|
||||
proc updateTargetReached(ctx: BeaconCtxRef; info: static[string]) =
|
||||
# Open up layout for update
|
||||
ctx.layout.headLocked = false
|
||||
|
||||
# Clean up target bucket and await a new target.
|
||||
ctx.target.reset
|
||||
ctx.hibernate = true
|
||||
doAssert ctx.blocksUnprocTotal() == 0
|
||||
doAssert ctx.blocksUnprocBorrowed() == 0
|
||||
doAssert ctx.blocksStagedQueueIsEmpty()
|
||||
|
||||
let
|
||||
latest {.used.} = ctx.chain.latestNumber()
|
||||
head {.used.} = ctx.layout.head
|
||||
trace info & ": hibernating, awaiting new sync target",
|
||||
L=(if head == latest: "H" else: latest.bnStr), H=head.bnStr
|
||||
c = ctx.layout.coupler
|
||||
h = ctx.layout.head
|
||||
|
||||
# Update blocks `(C,H]`
|
||||
ctx.blocksUnprocCommit(0, c+1, h)
|
||||
|
||||
# State transition
|
||||
ctx.layout.lastState = processingBlocks
|
||||
|
||||
trace info & ": collecting block bodies", iv=BnRange.new(c+1, h)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Update layout
|
||||
proc updateSyncState*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Update internal state when needed
|
||||
let
|
||||
prevState = ctx.layout.lastState # previous state
|
||||
thisState = ctx.syncState info # currently observed state
|
||||
|
||||
# Check whether the target has been reached. In that case, unlock the
|
||||
# consensus head `H` from the current layout so that it can be updated
|
||||
# in time.
|
||||
if ctx.layout.headLocked and # there is an active session
|
||||
ctx.layout.head <= ctx.chain.latestNumber(): # and target has been reached
|
||||
# Note that `latest` might exceed the `head`. This will happen when the
|
||||
# engine API got some request to execute and import subsequent blocks.
|
||||
ctx.updateTargetReached info
|
||||
if thisState == prevState:
|
||||
# Check whether the system has been idle and a new header download
|
||||
# session can be set up
|
||||
if prevState == idleSyncState and
|
||||
ctx.target.changed and # and there is a new target from CL
|
||||
ctx.target.final != 0: # .. ditto
|
||||
ctx.setupCollectingHeaders info # set up new header sync
|
||||
return
|
||||
# Notreached
|
||||
|
||||
# Check whether there is something to do regarding beacon node change
|
||||
if not ctx.layout.headLocked and # there was an active import request
|
||||
ctx.target.changed and # and there is a new target from CL
|
||||
ctx.target.final != 0: # .. ditto
|
||||
ctx.target.changed = false
|
||||
ctx.updateTargetChange info
|
||||
trace info & ": sync state changed", prevState, thisState,
|
||||
L=ctx.chain.latestNumber.bnStr,
|
||||
C=(if ctx.layout.coupler == ctx.layout.dangling: "D"
|
||||
else: ctx.layout.coupler.bnStr),
|
||||
D=(if ctx.layout.dangling == ctx.layout.head: "H"
|
||||
else: ctx.layout.dangling.bnStr),
|
||||
H=ctx.layout.head.bnStr
|
||||
|
||||
# Check whether header downloading is done
|
||||
ctx.mergeAdjacentChains info
|
||||
# So there is a states transition. The only relevant transition here
|
||||
# is `collectingHeaders -> finishedHeaders` which will be continued
|
||||
# as `finishedHeaders -> processingBlocks`.
|
||||
#
|
||||
if prevState == collectingHeaders and
|
||||
thisState == finishedHeaders and
|
||||
ctx.linkIntoFc(info): # commit downloading headers
|
||||
ctx.setupProcessingBlocks info # start downloading block bodies
|
||||
trace info & ": sync state changed",
|
||||
prevState=thisState, thisState=ctx.syncState(info)
|
||||
return
|
||||
# Notreached
|
||||
|
||||
# Final sync target reached or inconsistent/impossible state
|
||||
ctx.startHibernating info
|
||||
|
||||
|
||||
proc updateBlockRequests*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
## Update block requests if there staged block queue is empty
|
||||
let latest = ctx.chain.latestNumber()
|
||||
if latest < ctx.layout.coupler: # so half open interval `(L,C]` is not empty
|
||||
proc updateFinalBlockHeader*(
|
||||
ctx: BeaconCtxRef;
|
||||
finHdr: Header;
|
||||
finHash: Hash32;
|
||||
info: static[string];
|
||||
) =
|
||||
## Update the finalised header cache. If the finalised header is acceptable,
|
||||
## the syncer will be activated from hibernation if necessary.
|
||||
##
|
||||
let
|
||||
b = ctx.chain.baseNumber()
|
||||
f = finHdr.number
|
||||
if f < b:
|
||||
trace info & ": finalised number too low",
|
||||
B=b.bnStr, finalised=f.bnStr, delta=(b - f)
|
||||
|
||||
# One can fill/import/execute blocks by number from `(L,C]`
|
||||
if ctx.blk.topRequest < ctx.layout.coupler:
|
||||
# So there is some space
|
||||
trace info & ": updating block requests", L=latest.bnStr,
|
||||
topReq=ctx.blk.topRequest.bnStr, C=ctx.layout.coupler.bnStr
|
||||
ctx.target.reset
|
||||
|
||||
ctx.blocksUnprocCommit(
|
||||
0, max(latest, ctx.blk.topRequest) + 1, ctx.layout.coupler)
|
||||
ctx.blk.topRequest = ctx.layout.coupler
|
||||
else:
|
||||
ctx.target.final = f
|
||||
ctx.target.finalHash = finHash
|
||||
|
||||
# Activate running (unless done yet)
|
||||
if ctx.hibernate:
|
||||
ctx.hibernate = false
|
||||
trace info & ": activated syncer",
|
||||
finalised=f.bnStr, head=ctx.target.consHead.bnStr
|
||||
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -30,9 +30,6 @@ declareGauge beacon_coupler, "" &
|
||||
declareGauge beacon_dangling, "" &
|
||||
"Starting/min block number for higher up headers chain"
|
||||
|
||||
declareGauge beacon_final, "" &
|
||||
"Max number of finalised block in higher up headers chain"
|
||||
|
||||
declareGauge beacon_head, "" &
|
||||
"Ending/max block number of higher up headers chain"
|
||||
|
||||
@ -62,7 +59,6 @@ template updateMetricsImpl(ctx: BeaconCtxRef) =
|
||||
metrics.set(beacon_latest, ctx.chain.latestNumber().int64)
|
||||
metrics.set(beacon_coupler, ctx.layout.coupler.int64)
|
||||
metrics.set(beacon_dangling, ctx.layout.dangling.int64)
|
||||
metrics.set(beacon_final, ctx.layout.final.int64)
|
||||
metrics.set(beacon_head, ctx.layout.head.int64)
|
||||
metrics.set(beacon_target, ctx.target.consHead.number.int64)
|
||||
|
||||
|
@ -64,6 +64,12 @@ type
|
||||
|
||||
# -------------------
|
||||
|
||||
SyncLayoutState* = enum
|
||||
idleSyncState = 0 ## see clause *(8)*, *(12)* of `README.md`
|
||||
collectingHeaders ## see clauses *(5)*, *(9)* of `README.md`
|
||||
finishedHeaders ## see clause *(10)* of `README.md`
|
||||
processingBlocks ## see clause *(11)* of `README.md`
|
||||
|
||||
SyncStateTarget* = object
|
||||
## Beacon state to be implicitely updated by RPC method
|
||||
locked*: bool ## Don't update while fetching header
|
||||
@ -74,36 +80,33 @@ type
|
||||
|
||||
SyncStateLayout* = object
|
||||
## Layout of a linked header chains defined by the triple `(C,D,H)` as
|
||||
## described in the `README.md` text.
|
||||
## described in clause *(5)* of the `README.md` text.
|
||||
## ::
|
||||
## 0 B L C D F H
|
||||
## o----------o-----o-------o---------------------o------------o---o--->
|
||||
## | <- imported -> | | | |
|
||||
## | <------ linked ------> | <-- unprocessed --> | <-- linked --> |
|
||||
## 0 B L
|
||||
## o---------o----------o
|
||||
## | <--- imported ---> |
|
||||
## C D H
|
||||
## o---------------------o----------------o
|
||||
## | <-- unprocessed --> | <-- linked --> |
|
||||
##
|
||||
## Additional positions known but not declared in this descriptor:
|
||||
## * `B`: base state (from `forked_chain` importer)
|
||||
## * `L`: last imported block, canonical consensus head
|
||||
## * `F`: finalised head (from CL)
|
||||
## * `B`: `base` parameter from `FC` logic
|
||||
## * `L`: `latest` (aka cursor) parameter from `FC` logic
|
||||
##
|
||||
coupler*: BlockNumber ## Right end `C` of linked chain `[0,C]`
|
||||
couplerHash*: Hash32 ## Hash of `C`
|
||||
|
||||
coupler*: BlockNumber ## Bottom end `C` of full chain `(C,H]`
|
||||
dangling*: BlockNumber ## Left end `D` of linked chain `[D,H]`
|
||||
danglingParent*: Hash32 ## Parent hash of `D`
|
||||
head*: BlockNumber ## `H`, block num of some finalised block
|
||||
lastState*: SyncLayoutState ## Last known layout state
|
||||
|
||||
# Legacy entries, will be removed some time. This is currently needed
|
||||
# for importing blocks into `FC` the support of which will be deprecated.
|
||||
final*: BlockNumber ## Finalised block number `F`
|
||||
finalHash*: Hash32 ## Hash of `F`
|
||||
|
||||
head*: BlockNumber ## `H`, block num of some finalised block
|
||||
headHash*: Hash32 ## Hash of `H`
|
||||
headLocked*: bool ## No need to update `H` yet
|
||||
|
||||
SyncState* = object
|
||||
## Sync state for header and block chains
|
||||
target*: SyncStateTarget ## Consensus head, see `T` in `README.md`
|
||||
layout*: SyncStateLayout ## Current header chains layout
|
||||
lastLayout*: SyncStateLayout ## Previous layout (for delta update)
|
||||
|
||||
# -------------------
|
||||
|
||||
@ -117,7 +120,6 @@ type
|
||||
## Block sync staging area
|
||||
unprocessed*: BnRangeSet ## Blocks download requested
|
||||
borrowed*: uint64 ## Total of temp. fetched ranges
|
||||
topRequest*: BlockNumber ## Max requested block number
|
||||
staged*: StagedBlocksQueue ## Blocks ready for import
|
||||
|
||||
# -------------------
|
||||
|
@ -161,7 +161,7 @@ proc key(peer: Peer): ENode =
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc terminate[S,W](dsc: RunnerSyncRef[S,W]) =
|
||||
## Reqest termination and wait
|
||||
## Request termination and wait for sub-tasks to finish
|
||||
mixin runRelease
|
||||
|
||||
if dsc.runCtrl == running:
|
||||
@ -175,15 +175,20 @@ proc terminate[S,W](dsc: RunnerSyncRef[S,W]) =
|
||||
if w.data.isRunning:
|
||||
w.data.worker.ctrl.stopped = true
|
||||
# Activate async job so it can finish
|
||||
try: waitFor sleepAsync termWaitPollingTime
|
||||
except CancelledError: discard
|
||||
try:
|
||||
waitFor sleepAsync termWaitPollingTime
|
||||
except CancelledError:
|
||||
trace "Shutdown: peer timeout was cancelled",
|
||||
peer=w.data.worker.peer, nWorkers=dsc.buddies.len
|
||||
else:
|
||||
dsc.buddies.del w.key # this is OK to delete
|
||||
|
||||
while dsc.daemonRunning:
|
||||
# Activate async job so it can finish
|
||||
try: waitFor sleepAsync termWaitPollingTime
|
||||
except CancelledError: discard
|
||||
try:
|
||||
waitFor sleepAsync termWaitPollingTime
|
||||
except CancelledError:
|
||||
trace "Shutdown: daemon timeout was cancelled", nWorkers=dsc.buddies.len
|
||||
|
||||
# Final shutdown
|
||||
dsc.ctx.runRelease()
|
||||
@ -223,7 +228,11 @@ proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async: (raises: []).} =
|
||||
try:
|
||||
await sleepAsync suspend
|
||||
except CancelledError:
|
||||
break # stop on error (must not end up in busy-loop)
|
||||
# Stop on error (must not end up in busy-loop). If the activation flag
|
||||
# `dsc.ctx.daemon` remains `true`, the deamon will be re-started from
|
||||
# the worker loop in due time.
|
||||
trace "Deamon loop timeout was cancelled", nWorkers=dsc.buddies.len
|
||||
break
|
||||
# End while
|
||||
|
||||
dsc.daemonRunning = false
|
||||
@ -327,6 +336,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async: (raises: []).} =
|
||||
try:
|
||||
await sleepAsync suspend
|
||||
except CancelledError:
|
||||
trace "Peer loop timeout was cancelled", peer, nWorkers=dsc.buddies.len
|
||||
break # stop on error (must not end up in busy-loop)
|
||||
# End while
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user