Beacon sync align internal names and docu update (#2690)
* Rename `base` -> `coupler`, `B` -> `C` why: Glossary: The jargon `base` is used for the `base state` block number which can be smaller than what is now the `coupler`. * Rename `global state` -> `base`, `T` -> `B` why: See glossary * Rename `final` -> `end`, `F` -> `E` why: See glossary. Previously, `final` denoted some finalised block but not `the finalised` block from the glossary (which is maximal.) * Properly name finalised block as such, rename `Z` -> `F` why: See glossary * Rename `least` -> `dangling`, `L` -> `D` * Metrics update (variables not covered yet) * Docu update and corrections * Logger updates * Remove obsolete `skeleton*Key` kvt columns from `storage_types` module
This commit is contained in:
parent
0b81a8c783
commit
d6eb8c36f5
|
@ -24,12 +24,8 @@ type
|
||||||
transitionStatus = 7
|
transitionStatus = 7
|
||||||
safeHash = 8
|
safeHash = 8
|
||||||
finalizedHash = 9
|
finalizedHash = 9
|
||||||
skeletonProgress = 10
|
beaconState = 10
|
||||||
skeletonBlockHashToNumber = 11
|
beaconHeader = 11
|
||||||
skeletonHeader = 12
|
|
||||||
skeletonBody = 13
|
|
||||||
beaconState = 14
|
|
||||||
beaconHeader = 15
|
|
||||||
|
|
||||||
DbKey* = object
|
DbKey* = object
|
||||||
# The first byte stores the key type. The rest are key-specific values
|
# The first byte stores the key type. The rest are key-specific values
|
||||||
|
@ -87,26 +83,6 @@ func finalizedHashKey*(): DbKey {.inline.} =
|
||||||
result.data[0] = byte ord(finalizedHash)
|
result.data[0] = byte ord(finalizedHash)
|
||||||
result.dataEndPos = uint8 1
|
result.dataEndPos = uint8 1
|
||||||
|
|
||||||
func skeletonProgressKey*(): DbKey {.inline.} =
|
|
||||||
result.data[0] = byte ord(skeletonProgress)
|
|
||||||
result.dataEndPos = 1
|
|
||||||
|
|
||||||
func skeletonBlockHashToNumberKey*(h: Hash32): DbKey {.inline.} =
|
|
||||||
result.data[0] = byte ord(skeletonBlockHashToNumber)
|
|
||||||
result.data[1 .. 32] = h.data
|
|
||||||
result.dataEndPos = uint8 32
|
|
||||||
|
|
||||||
func skeletonHeaderKey*(u: BlockNumber): DbKey {.inline.} =
|
|
||||||
result.data[0] = byte ord(skeletonHeader)
|
|
||||||
doAssert sizeof(u) <= 32
|
|
||||||
copyMem(addr result.data[1], unsafeAddr u, sizeof(u))
|
|
||||||
result.dataEndPos = uint8 sizeof(u)
|
|
||||||
|
|
||||||
func skeletonBodyKey*(h: Hash32): DbKey {.inline.} =
|
|
||||||
result.data[0] = byte ord(skeletonBody)
|
|
||||||
result.data[1 .. 32] = h.data
|
|
||||||
result.dataEndPos = uint8 32
|
|
||||||
|
|
||||||
func hashIndexKey*(hash: Hash32, index: uint16): HashIndexKey =
|
func hashIndexKey*(hash: Hash32, index: uint16): HashIndexKey =
|
||||||
result[0..31] = hash.data
|
result[0..31] = hash.data
|
||||||
result[32] = byte(index and 0xFF)
|
result[32] = byte(index and 0xFF)
|
||||||
|
|
|
@ -1,11 +1,34 @@
|
||||||
Syncing
|
Beacon Sync
|
||||||
=======
|
===========
|
||||||
|
|
||||||
Syncing blocks is performed in two partially overlapping phases
|
According to the merge-first
|
||||||
|
[glossary](https://notes.status.im/nimbus-merge-first-el?both=#Glossary),
|
||||||
|
a beacon sync is a "*Sync method that relies on devp2p and eth/6x to fetch
|
||||||
|
headers and bodies backwards then apply these in the forward direction to the
|
||||||
|
head state*".
|
||||||
|
|
||||||
|
This [glossary](https://notes.status.im/nimbus-merge-first-el?both=#Glossary)
|
||||||
|
is used as a naming template for relevant entities described here. When
|
||||||
|
referred to, names from the glossary are printed **bold**.
|
||||||
|
|
||||||
|
Syncing blocks is performed in two overlapping phases
|
||||||
|
|
||||||
|
* loading header chains and stashing them into a separate database table,
|
||||||
|
* removing headers from the stashed headers chain, fetching the block bodies
|
||||||
|
the headers refer to and importing/executing them via `persistentBlocks()`.
|
||||||
|
|
||||||
|
So this beacon syncer slightly differs from the definition in the
|
||||||
|
[glossary](https://notes.status.im/nimbus-merge-first-el?both=#Glossary) in
|
||||||
|
that only headers are stashed on the database table and the block bodies are
|
||||||
|
fetched in the *forward* direction.
|
||||||
|
|
||||||
|
The reason for that behavioural change is that the block bodies are addressed
|
||||||
|
by the hash of the block headers for fetching. They cannot be fully verified
|
||||||
|
upon arrival on the cheap (e.g. by a payload hash.) They will be validated not
|
||||||
|
before imported/executed. So potentially corrupt blocks will be discarded.
|
||||||
|
They will automatically be re-fetched with other missing blocks in the
|
||||||
|
*forward* direction.
|
||||||
|
|
||||||
* loading the header chains into separate database tables
|
|
||||||
* removing headers from the headers chain, fetching the rest of the
|
|
||||||
block the header belongs to and executing it
|
|
||||||
|
|
||||||
Header chains
|
Header chains
|
||||||
-------------
|
-------------
|
||||||
|
@ -15,121 +38,130 @@ The header chains are the triple of
|
||||||
* a consecutively linked chain of headers starting starting at Genesis
|
* a consecutively linked chain of headers starting starting at Genesis
|
||||||
* followed by a sequence of missing headers
|
* followed by a sequence of missing headers
|
||||||
* followed by a consecutively linked chain of headers ending up at a
|
* followed by a consecutively linked chain of headers ending up at a
|
||||||
finalised block header received from the consensus layer
|
finalised block header (earlier received from the consensus layer)
|
||||||
|
|
||||||
A sequence *@[h(1),h(2),..]* of block headers is called a consecutively
|
A sequence *@[h(1),h(2),..]* of block headers is called a *linked chain* if
|
||||||
linked chain if
|
|
||||||
|
|
||||||
* block numbers join without gaps, i.e. *h(n).number+1 == h(n+1).number*
|
* block numbers join without gaps, i.e. *h(n).number+1 == h(n+1).number*
|
||||||
* parent hashes match, i.e. *h(n).hash == h(n+1).parentHash*
|
* parent hashes match, i.e. *h(n).hash == h(n+1).parentHash*
|
||||||
|
|
||||||
General header chains layout diagram
|
General header linked chains layout diagram
|
||||||
|
|
||||||
G B L F (1)
|
0 C D E (1)
|
||||||
o----------------o---------------------o----------------o--->
|
o----------------o---------------------o----------------o--->
|
||||||
| <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
| <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
||||||
|
|
||||||
Here, the single upper letter symbols *G*, *B*, *L*, *F* denote block numbers.
|
Here, the single upper letter symbols *0*, *C*, *D*, *E* denote block numbers.
|
||||||
For convenience, these letters are also identified with its associated block
|
For convenience, these letters are also identified with its associated block
|
||||||
header or the full block. Saying *"the header G"* is short for *"the header
|
header or the full blocks. Saying *"the header 0"* is short for *"the header
|
||||||
with block number G"*.
|
with block number 0"*.
|
||||||
|
|
||||||
Meaning of *G*, *B*, *L*, *F*:
|
Meaning of *0*, *C*, *D*, *E*:
|
||||||
|
|
||||||
* *G* -- Genesis block number *#0*
|
* *0* -- Genesis, block number number *0*
|
||||||
* *B* -- base, maximal block number of linked chain starting at *G*
|
* *C* -- coupler, maximal block number of linked chain starting at *0*
|
||||||
* *L* -- least, minimal block number of linked chain ending at *F* with *B <= L*
|
* *D* -- dangling, minimal block number of linked chain ending at *E*
|
||||||
* *F* -- final, some finalised block
|
with *C <= D*
|
||||||
|
* *E* -- end, block number of some finalised block (not necessarily the latest
|
||||||
|
one)
|
||||||
|
|
||||||
|
This definition implies *0 <= C <= D <= E* and the state of the header linked
|
||||||
|
chains can uniquely be described by the triple of block numbers *(C,D,E)*.
|
||||||
|
|
||||||
This definition implies *G <= B <= L <= F* and the header chains can uniquely
|
|
||||||
be described by the triple of block numbers *(B,L,F)*.
|
|
||||||
|
|
||||||
### Storage of header chains:
|
### Storage of header chains:
|
||||||
|
|
||||||
Some block numbers from the set *{w|G<=w<=B}* may correspond to finalised
|
Some block numbers from the closed interval (including end points) *[0,C]* may
|
||||||
blocks which may be stored anywhere. If some block numbers do not correspond
|
correspond to finalised blocks, e.g. the sub-interval *[0,**base**]* where
|
||||||
to finalised blocks, then the headers must reside in the *beaconHeader*
|
**base** is the block number of the ledger state. The headers for
|
||||||
database table. Of course, due to being finalised such block numbers constitute
|
*[0,**base**]* are stored in the persistent state database. The headers for the
|
||||||
a sub-chain starting at *G*.
|
half open interval *(**base**,C]* are always stored on the *beaconHeader*
|
||||||
|
column of the *KVT* database.
|
||||||
|
|
||||||
The block numbers from the set *{w|L<=w<=F}* must reside in the *beaconHeader*
|
The block numbers from the interval *[D,E]* also reside on the *beaconHeader*
|
||||||
database table. They do not correspond to finalised blocks.
|
column of the *KVT* database table.
|
||||||
|
|
||||||
### Header chains initialisation:
|
|
||||||
|
### Header linked chains initialisation:
|
||||||
|
|
||||||
Minimal layout on a pristine system
|
Minimal layout on a pristine system
|
||||||
|
|
||||||
G (2)
|
0 (2)
|
||||||
B
|
C
|
||||||
L
|
D
|
||||||
F
|
E
|
||||||
o--->
|
o--->
|
||||||
|
|
||||||
When first initialised, the header chains are set to *(G,G,G)*.
|
When first initialised, the header linked chains are set to *(0,0,0)*.
|
||||||
|
|
||||||
### Updating header chains:
|
|
||||||
|
|
||||||
A header chain with an non empty open interval *(B,L)* can be updated only by
|
### Updating a header linked chains:
|
||||||
increasing *B* or decreasing *L* by adding headers so that the linked chain
|
|
||||||
condition is not violated.
|
|
||||||
|
|
||||||
Only when the open interval *(B,L)* vanishes the right end *F* can be increased
|
A header chain with an non empty open interval *(C,D)* can be updated only by
|
||||||
by *Z* say. Then
|
increasing *C* or decreasing *D* by adding/prepending headers so that the
|
||||||
|
linked chain condition is not violated.
|
||||||
|
|
||||||
* *B==L* beacuse interval *(B,L)* is empty
|
Only when the gap open interval *(C,D)* vanishes, the right end *E* can be
|
||||||
* *B==F* because *B* is maximal
|
increased to a larger **finalised** block number *F* say. Then
|
||||||
|
|
||||||
and the header chains *(F,F,F)* (depicted in *(3)*) can be set to *(B,Z,Z)*
|
* *C==D* beacuse the open interval *(C,D)* is empty
|
||||||
(as depicted in *(4)*.)
|
* *C==E* because *C* is maximal (see definition of `C` above)
|
||||||
|
|
||||||
Layout before updating of *F*
|
and the header chains *(E,E,E)* (depicted in *(3)* below) can be set to
|
||||||
|
*(C,F,F)* as depicted in *(4)* below.
|
||||||
|
|
||||||
B (3)
|
Layout before updating of *E*
|
||||||
L
|
|
||||||
G F Z
|
C (3)
|
||||||
|
D
|
||||||
|
0 E F
|
||||||
o----------------o---------------------o---->
|
o----------------o---------------------o---->
|
||||||
| <-- linked --> |
|
| <-- linked --> |
|
||||||
|
|
||||||
New layout with *Z*
|
New layout with moving *D* and *E* to *F*
|
||||||
|
|
||||||
L' (4)
|
D' (4)
|
||||||
G B F'
|
0 C E'
|
||||||
o----------------o---------------------o---->
|
o----------------o---------------------o---->
|
||||||
| <-- linked --> | <-- unprocessed --> |
|
| <-- linked --> | <-- unprocessed --> |
|
||||||
|
|
||||||
with *L'=Z* and *F'=Z*.
|
with *D'=F* and *E'=F*.
|
||||||
|
|
||||||
Note that diagram *(3)* is a generalisation of *(2)*.
|
Note that diagram *(3)* is a generalisation of *(2)*.
|
||||||
|
|
||||||
|
|
||||||
### Complete header chain:
|
### Complete a header linked chain:
|
||||||
|
|
||||||
The header chain is *relatively complete* if it satisfies clause *(3)* above
|
The header chain is *relatively complete* if it satisfies clause *(3)* above
|
||||||
for *G < B*. It is *fully complete* if *F==Z*. It should be obvious that the
|
for *0 < C*. It is *fully complete* if *E==F*. It should be obvious that the
|
||||||
latter condition is temporary only on a live system (as *Z* is permanently
|
latter condition is temporary only on a live system (as *F* is contiuously
|
||||||
updated.)
|
updated.)
|
||||||
|
|
||||||
If a *relatively complete* header chain is reached for the first time, the
|
If a *relatively complete* header chain is reached for the first time, the
|
||||||
execution layer can start running an importer in the background compiling
|
execution layer can start running an importer in the background
|
||||||
or executing blocks (starting from block number *#1*.) So the ledger database
|
compiling/executing blocks (starting from block number *#1*.) So the ledger
|
||||||
state will be updated incrementally.
|
database state will be updated incrementally.
|
||||||
|
|
||||||
Imported block chain
|
Block chain import/execution
|
||||||
--------------------
|
-----------------------------
|
||||||
|
|
||||||
The following imported block chain diagram amends the layout *(1)*:
|
The following diagram with a parially imported/executed block chain amends the
|
||||||
|
layout *(1)*:
|
||||||
|
|
||||||
G T B L F (5)
|
0 B C D E (5)
|
||||||
o------------------o-------o---------------------o----------------o-->
|
o------------------o-------o---------------------o----------------o-->
|
||||||
| <-- imported --> | | | |
|
| <-- imported --> | | | |
|
||||||
| <------- linked ------> | <-- unprocessed --> | <-- linked --> |
|
| <------- linked ------> | <-- unprocessed --> | <-- linked --> |
|
||||||
|
|
||||||
|
|
||||||
where *T* is the number of the last imported and executed block. Coincidentally,
|
where *B* is the **base**, i.e. the **base state** block number of the last
|
||||||
*T* also refers to the global state of the ledger database.
|
imported/executed block. It also refers to the global state block number of
|
||||||
|
the ledger database.
|
||||||
|
|
||||||
|
The headers corresponding to the half open interval `(B,C]` will be completed
|
||||||
|
by fetching block bodies and then import/execute them together with the already
|
||||||
|
cached headers.
|
||||||
|
|
||||||
The headers corresponding to the half open interval `(T,B]` can be completed by
|
|
||||||
fetching block bodies and then imported/executed.
|
|
||||||
|
|
||||||
Running the sync process for *MainNet*
|
Running the sync process for *MainNet*
|
||||||
--------------------------------------
|
--------------------------------------
|
||||||
|
@ -138,7 +170,8 @@ For syncing, a beacon node is needed that regularly informs via *RPC* of a
|
||||||
recently finalised block header.
|
recently finalised block header.
|
||||||
|
|
||||||
The beacon node program used here is the *nimbus_beacon_node* binary from the
|
The beacon node program used here is the *nimbus_beacon_node* binary from the
|
||||||
*nimbus-eth2* project (any other will do.) *Nimbus_beacon_node* is started as
|
*nimbus-eth2* project (any other, e.g.the *light client* will do.)
|
||||||
|
*Nimbus_beacon_node* is started as
|
||||||
|
|
||||||
./run-mainnet-beacon-node.sh \
|
./run-mainnet-beacon-node.sh \
|
||||||
--web3-url=http://127.0.0.1:8551 \
|
--web3-url=http://127.0.0.1:8551 \
|
||||||
|
@ -153,19 +186,20 @@ It will take a while for *nimbus_beacon_node* to catch up (see the
|
||||||
|
|
||||||
### Starting `nimbus` for syncing
|
### Starting `nimbus` for syncing
|
||||||
|
|
||||||
As the sync process is quite slow, it makes sense to pre-load the database
|
As the syncing process is quite slow, it makes sense to pre-load the database
|
||||||
with data from an `Era1` archive (if available) before starting the real
|
from an *Era1* archive (if available) before starting the real sync process.
|
||||||
sync process. The command would be something like
|
The command for importing an *Era1* reproitory would be something like
|
||||||
|
|
||||||
./build/nimbus import \
|
./build/nimbus import \
|
||||||
--era1-dir:/path/to/main-era1/repo \
|
--era1-dir:/path/to/main-era1/repo \
|
||||||
...
|
...
|
||||||
|
|
||||||
which will take a while for the full *MainNet* era1 repository (but way faster
|
which will take its time for the full *MainNet* Era1 repository (but way faster
|
||||||
than the sync.)
|
than the beacon sync.)
|
||||||
|
|
||||||
|
On a system with memory considerably larger than *8GiB* the *nimbus* binary is
|
||||||
|
started on the same machine where the beacon node runs with the command
|
||||||
|
|
||||||
On a system with memory considerably larger than *8GiB* the *nimbus*
|
|
||||||
binary is started on the same machine where the beacon node runs as
|
|
||||||
|
|
||||||
./build/nimbus \
|
./build/nimbus \
|
||||||
--network=mainnet \
|
--network=mainnet \
|
||||||
|
@ -200,8 +234,9 @@ To start syncing, the following additional options apply to *nimbus*:
|
||||||
--debug-rdb-vtx-cache-size=268435456
|
--debug-rdb-vtx-cache-size=268435456
|
||||||
|
|
||||||
Also, to reduce the backlog for *nimbus-eth2* stored on disk, the following
|
Also, to reduce the backlog for *nimbus-eth2* stored on disk, the following
|
||||||
changes might be considered. For file
|
changes might be considered. In the file
|
||||||
*nimbus-eth2/vendor/mainnet/metadata/config.yaml* change setting constants:
|
*nimbus-eth2/vendor/mainnet/metadata/config.yaml* change the folloing
|
||||||
|
settings
|
||||||
|
|
||||||
MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024
|
MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024
|
||||||
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 4096
|
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 4096
|
||||||
|
@ -221,17 +256,17 @@ be available if *nimbus* is compiled with the additional make flags
|
||||||
*NIMFLAGS="-d:metrics \-\-threads:on"*:
|
*NIMFLAGS="-d:metrics \-\-threads:on"*:
|
||||||
|
|
||||||
| *Variable* | *Logic type* | *Short description* |
|
| *Variable* | *Logic type* | *Short description* |
|
||||||
|:-------------------------------|:------------:|:--------------------|
|
|:-------------------|:------------:|:--------------------|
|
||||||
| | | |
|
| | | |
|
||||||
| beacon_state_block_number | block height | **T**, *increasing* |
|
| beacon_base | block height | **B**, *increasing* |
|
||||||
| beacon_base_block_number | block height | **B**, *increasing* |
|
| beacon_coupler | block height | **C**, *increasing* |
|
||||||
| beacon_least_block_number | block height | **L** |
|
| beacon_dangling | block height | **D** |
|
||||||
| beacon_final_block_number | block height | **F**, *increasing* |
|
| beacon_end | block height | **E**, *increasing* |
|
||||||
| beacon_beacon_block_number | block height | **Z**, *increasing* |
|
| beacon_final | block height | **F**, *increasing* |
|
||||||
| | | |
|
| | | |
|
||||||
| beacon_headers_staged_queue_len| size | # of staged header list records |
|
| beacon_header_lists_staged | size | # of staged header list records |
|
||||||
| beacon_headers_unprocessed | size | # of accumulated header block numbers|
|
| beacon_headers_unprocessed | size | # of accumulated header block numbers|
|
||||||
| beacon_blocks_staged_queue_len | size | # of staged block list records |
|
| beacon_block_lists_staged | size | # of staged block list records |
|
||||||
| beacon_blocks_unprocessed | size | # of accumulated body block numbers |
|
| beacon_blocks_unprocessed | size | # of accumulated body block numbers |
|
||||||
| | | |
|
| | | |
|
||||||
| beacon_number_of_buddies | size | # of working peers |
|
| beacon_buddies | size | # of peers working concurrently |
|
||||||
|
|
|
@ -1,4 +1 @@
|
||||||
* Update/resolve code fragments which are tagged FIXME
|
* Update/resolve code fragments which are tagged FIXME
|
||||||
* Update docu:
|
|
||||||
+ beacon_beacon_block_number -> beacon_finalized ??
|
|
||||||
+ in general, check agaist glossary https://notes.status.im/nimbus-merge-first-el?both=#Glossary
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ import
|
||||||
pkg/eth/[common, p2p],
|
pkg/eth/[common, p2p],
|
||||||
pkg/stew/[interval_set, sorted_set],
|
pkg/stew/[interval_set, sorted_set],
|
||||||
../../common,
|
../../common,
|
||||||
./worker/[blocks_staged, db, headers_staged, headers_unproc, helpers,
|
./worker/[blocks_staged, db, headers_staged, headers_unproc,
|
||||||
start_stop, update],
|
start_stop, update],
|
||||||
./worker_desc
|
./worker_desc
|
||||||
|
|
||||||
|
@ -112,8 +112,8 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
|
||||||
debug info
|
debug info
|
||||||
|
|
||||||
# Check for a possible header layout and body request changes
|
# Check for a possible header layout and body request changes
|
||||||
discard ctx.updateLinkedHChainsLayout()
|
discard ctx.updateLinkedHChainsLayout info
|
||||||
discard ctx.updateBlockRequests()
|
discard ctx.updateBlockRequests info
|
||||||
|
|
||||||
# Execute staged block records.
|
# Execute staged block records.
|
||||||
if ctx.blocksStagedCanImportOk():
|
if ctx.blocksStagedCanImportOk():
|
||||||
|
@ -132,7 +132,7 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
|
||||||
# Allow pseudo/async thread switch
|
# Allow pseudo/async thread switch
|
||||||
await sleepAsync asyncThreadSwitchTimeSlot
|
await sleepAsync asyncThreadSwitchTimeSlot
|
||||||
|
|
||||||
# At the end of the cycle, leave time to refill
|
# At the end of the cycle, leave time to refill headers/blocks
|
||||||
await sleepAsync daemonWaitInterval
|
await sleepAsync daemonWaitInterval
|
||||||
|
|
||||||
ctx.updateMetrics()
|
ctx.updateMetrics()
|
||||||
|
@ -182,8 +182,8 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
|
||||||
#
|
#
|
||||||
# Layout of a triple of linked header chains (see `README.md`)
|
# Layout of a triple of linked header chains (see `README.md`)
|
||||||
# ::
|
# ::
|
||||||
# G B L F
|
# 0 C D E
|
||||||
# | <--- [G,B] --> | <----- (B,L) -----> | <-- [L,F] ---> |
|
# | <--- [0,C] --> | <----- (C,D) -----> | <-- [D,E] ---> |
|
||||||
# o----------------o---------------------o----------------o--->
|
# o----------------o---------------------o----------------o--->
|
||||||
# | <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
# | <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
||||||
#
|
#
|
||||||
|
@ -191,17 +191,17 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
|
||||||
# headers and stashing them on the database. Each concurrently running
|
# headers and stashing them on the database. Each concurrently running
|
||||||
# actor works as follows:
|
# actor works as follows:
|
||||||
#
|
#
|
||||||
# * Get a range of block numbers from the `unprocessed` range `(B,L)`.
|
# * Get a range of block numbers from the `unprocessed` range `(C,D)`.
|
||||||
# * Fetch headers for this range (as much as one can get).
|
# * Fetch headers for this range (as much as one can get).
|
||||||
# * Stash then on the database.
|
# * Stash then on the database.
|
||||||
# * Rinse and repeat.
|
# * Rinse and repeat.
|
||||||
#
|
#
|
||||||
# The block numbers range concurrently taken from `(B,L)` are chosen
|
# The block numbers range concurrently taken from `(C,D)` are chosen
|
||||||
# from the upper range. So exactly one of the actors has a range
|
# from the upper range. So exactly one of the actors has a range
|
||||||
# `[whatever,L-1]` adjacent to `[L,F]`. Call this actor the lead actor.
|
# `[whatever,D-1]` adjacent to `[D,E]`. Call this actor the lead actor.
|
||||||
#
|
#
|
||||||
# For the lead actor, headers can be downloaded all by the hashes as
|
# For the lead actor, headers can be downloaded all by the hashes as
|
||||||
# the parent hash for the header with block number `L` is known. All
|
# the parent hash for the header with block number `D` is known. All
|
||||||
# other non-lead actors will download headers by the block number only
|
# other non-lead actors will download headers by the block number only
|
||||||
# and stage it to be re-ordered and stashed on the database when ready.
|
# and stage it to be re-ordered and stashed on the database when ready.
|
||||||
#
|
#
|
||||||
|
@ -221,7 +221,7 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
|
||||||
if await buddy.headersStagedCollect info:
|
if await buddy.headersStagedCollect info:
|
||||||
|
|
||||||
# * Save updated state and headers
|
# * Save updated state and headers
|
||||||
# * Decrease the left boundary `L` of the trusted range `[L,F]`
|
# * Decrease the dangling left boundary `D` of the trusted range `[D,E]`
|
||||||
discard buddy.ctx.headersStagedProcess info
|
discard buddy.ctx.headersStagedProcess info
|
||||||
|
|
||||||
# Fetch bodies and combine them with headers to blocks to be staged. These
|
# Fetch bodies and combine them with headers to blocks to be staged. These
|
||||||
|
|
|
@ -225,9 +225,9 @@ proc blocksStagedImport*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||||
return false
|
return false
|
||||||
|
|
||||||
# Fetch least record, accept only if it matches the global ledger state
|
# Fetch least record, accept only if it matches the global ledger state
|
||||||
let t = ctx.dbStateBlockNumber()
|
let base = ctx.dbStateBlockNumber()
|
||||||
if qItem.key != t + 1:
|
if qItem.key != base + 1:
|
||||||
trace info & ": there is a gap", T=t.bnStr, stagedBottom=qItem.key.bnStr
|
trace info & ": there is a gap", B=base.bnStr, stagedBottom=qItem.key.bnStr
|
||||||
return false
|
return false
|
||||||
|
|
||||||
# Remove from queue
|
# Remove from queue
|
||||||
|
@ -236,12 +236,12 @@ proc blocksStagedImport*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||||
# Execute blocks
|
# Execute blocks
|
||||||
let stats = ctx.pool.chain.persistBlocks(qItem.data.blocks).valueOr:
|
let stats = ctx.pool.chain.persistBlocks(qItem.data.blocks).valueOr:
|
||||||
# FIXME: should that be rather an `raiseAssert` here?
|
# FIXME: should that be rather an `raiseAssert` here?
|
||||||
warn info & ": block exec error", T=t.bnStr,
|
warn info & ": block exec error", B=base.bnStr,
|
||||||
iv=BnRange.new(qItem.key,qItem.key+qItem.data.blocks.len.uint64-1), error
|
iv=BnRange.new(qItem.key,qItem.key+qItem.data.blocks.len.uint64-1), error
|
||||||
doAssert t == ctx.dbStateBlockNumber()
|
doAssert base == ctx.dbStateBlockNumber()
|
||||||
return false
|
return false
|
||||||
|
|
||||||
trace info & ": imported staged blocks", T=ctx.dbStateBlockNumber.bnStr,
|
trace info & ": imported staged blocks", B=ctx.dbStateBlockNumber.bnStr,
|
||||||
first=qItem.key.bnStr, stats
|
first=qItem.key.bnStr, stats
|
||||||
|
|
||||||
# Remove stashed headers
|
# Remove stashed headers
|
||||||
|
|
|
@ -16,8 +16,7 @@ import
|
||||||
pkg/eth/[common, p2p],
|
pkg/eth/[common, p2p],
|
||||||
pkg/stew/interval_set,
|
pkg/stew/interval_set,
|
||||||
../../../protocol,
|
../../../protocol,
|
||||||
../../worker_desc,
|
../../worker_desc
|
||||||
../helpers
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "beacon bodies"
|
topics = "beacon bodies"
|
||||||
|
|
|
@ -65,7 +65,6 @@ proc fetchSavedState(ctx: BeaconCtxRef): Opt[SavedDbStateSpecs] =
|
||||||
|
|
||||||
err()
|
err()
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Public functions
|
# Public functions
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
@ -103,21 +102,23 @@ proc dbLoadLinkedHChainsLayout*(ctx: BeaconCtxRef) =
|
||||||
let rc = ctx.fetchLinkedHChainsLayout()
|
let rc = ctx.fetchLinkedHChainsLayout()
|
||||||
if rc.isOk:
|
if rc.isOk:
|
||||||
ctx.lhc.layout = rc.value
|
ctx.lhc.layout = rc.value
|
||||||
let (uMin,uMax) = (rc.value.base+1, rc.value.least-1)
|
let (uMin,uMax) = (rc.value.coupler+1, rc.value.dangling-1)
|
||||||
if uMin <= uMax:
|
if uMin <= uMax:
|
||||||
# Add interval of unprocessed block range `(B,L)` from README
|
# Add interval of unprocessed block range `(C,D)` from `README.md`
|
||||||
ctx.headersUnprocSet(uMin, uMax)
|
ctx.headersUnprocSet(uMin, uMax)
|
||||||
trace info & ": restored layout from DB"
|
trace info & ": restored layout", C=rc.value.coupler.bnStr,
|
||||||
|
D=rc.value.dangling.bnStr, E=rc.value.endBn.bnStr
|
||||||
else:
|
else:
|
||||||
let val = ctx.fetchSavedState().expect "saved states"
|
let val = ctx.fetchSavedState().expect "saved states"
|
||||||
ctx.lhc.layout = LinkedHChainsLayout(
|
ctx.lhc.layout = LinkedHChainsLayout(
|
||||||
base: val.number,
|
coupler: val.number,
|
||||||
baseHash: val.hash,
|
couplerHash: val.hash,
|
||||||
least: val.number,
|
dangling: val.number,
|
||||||
leastParent: val.parent,
|
danglingParent: val.parent,
|
||||||
final: val.number,
|
endBn: val.number,
|
||||||
finalHash: val.hash)
|
endHash: val.hash)
|
||||||
trace info & ": new layout"
|
trace info & ": new layout", B=val.number, C=rc.value.coupler.bnStr,
|
||||||
|
D=rc.value.dangling.bnStr, E=rc.value.endBn.bnStr
|
||||||
|
|
||||||
ctx.lhc.lastLayout = ctx.layout
|
ctx.lhc.lastLayout = ctx.layout
|
||||||
|
|
||||||
|
|
|
@ -60,14 +60,13 @@ proc headerStagedUpdateBeacon*(
|
||||||
) {.async.} =
|
) {.async.} =
|
||||||
## Fetch beacon header if there is an update available
|
## Fetch beacon header if there is an update available
|
||||||
let ctx = buddy.ctx
|
let ctx = buddy.ctx
|
||||||
if ctx.lhc.beacon.finalised != zeroHash32:
|
if ctx.lhc.final.hash != zeroHash32:
|
||||||
const iv = BnRange.new(1u,1u) # dummy interval
|
const iv = BnRange.new(1u,1u) # dummy interval
|
||||||
let finHash = ctx.lhc.beacon.finalised
|
let rc = await buddy.headersFetchReversed(iv, ctx.lhc.final.hash, info)
|
||||||
let rc = await buddy.headersFetchReversed(iv, finHash, info)
|
if rc.isOk and ctx.lhc.final.header.number < rc.value[0].number:
|
||||||
if rc.isOk and ctx.lhc.beacon.header.number < rc.value[0].number:
|
ctx.lhc.final.header = rc.value[0]
|
||||||
ctx.lhc.beacon.header = rc.value[0]
|
ctx.lhc.final.changed = true
|
||||||
ctx.lhc.beacon.changed = true
|
ctx.lhc.final.hash = zeroHash32
|
||||||
ctx.lhc.beacon.finalised = zeroHash32
|
|
||||||
|
|
||||||
|
|
||||||
proc headersStagedCollect*(
|
proc headersStagedCollect*(
|
||||||
|
@ -102,14 +101,14 @@ proc headersStagedCollect*(
|
||||||
iv = ctx.headersUnprocFetch(nFetchHeadersBatch).expect "valid interval"
|
iv = ctx.headersUnprocFetch(nFetchHeadersBatch).expect "valid interval"
|
||||||
|
|
||||||
# Check for top header hash. If the range to fetch directly joins below
|
# Check for top header hash. If the range to fetch directly joins below
|
||||||
# the top level linked chain `L..F`, then there is the hash available for
|
# the top level linked chain `[D,E]`, then there is the hash available for
|
||||||
# the top level header to fetch. Otherwise -- with multi-peer mode -- the
|
# the top level header to fetch. Otherwise -- with multi-peer mode -- the
|
||||||
# range of headers is fetched opportunistically using block numbers only.
|
# range of headers is fetched opportunistically using block numbers only.
|
||||||
isOpportunistic = uTop + 1 != ctx.layout.least
|
isOpportunistic = uTop + 1 != ctx.layout.dangling
|
||||||
|
|
||||||
# Parent hash for `lhc` below
|
# Parent hash for `lhc` below
|
||||||
topLink = (if isOpportunistic: EMPTY_ROOT_HASH else: ctx.layout.leastParent)
|
topLink = (if isOpportunistic: EMPTY_ROOT_HASH
|
||||||
|
else: ctx.layout.danglingParent)
|
||||||
var
|
var
|
||||||
# This value is used for splitting the interval `iv` into
|
# This value is used for splitting the interval `iv` into
|
||||||
# `[iv.minPt, somePt] + [somePt+1, ivTop] + already-collected` where the
|
# `[iv.minPt, somePt] + [somePt+1, ivTop] + already-collected` where the
|
||||||
|
@ -196,30 +195,31 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
|
||||||
break # all done
|
break # all done
|
||||||
|
|
||||||
let
|
let
|
||||||
least = ctx.layout.least # `L` from `README.md` (1) or `worker_desc`
|
dangling = ctx.layout.dangling
|
||||||
iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint64 + 1, qItem.key)
|
iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint64 + 1, qItem.key)
|
||||||
if iv.maxPt+1 < least:
|
if iv.maxPt+1 < dangling:
|
||||||
trace info & ": there is a gap", iv, L=least.bnStr, nSaved=result
|
trace info & ": there is a gap", iv, D=dangling.bnStr, nSaved=result
|
||||||
break # there is a gap -- come back later
|
break # there is a gap -- come back later
|
||||||
|
|
||||||
# Overlap must not happen
|
# Overlap must not happen
|
||||||
if iv.maxPt+1 != least:
|
if iv.maxPt+1 != dangling:
|
||||||
raiseAssert info & ": Overlap iv=" & $iv & " L=" & least.bnStr
|
raiseAssert info & ": Overlap iv=" & $iv & " D=" & dangling.bnStr
|
||||||
|
|
||||||
# Process item from `staged` queue. So it is not needed in the list,
|
# Process item from `staged` queue. So it is not needed in the list,
|
||||||
# anymore.
|
# anymore.
|
||||||
discard ctx.lhc.staged.delete(iv.maxPt)
|
discard ctx.lhc.staged.delete(iv.maxPt)
|
||||||
|
|
||||||
if qItem.data.hash != ctx.layout.leastParent:
|
if qItem.data.hash != ctx.layout.danglingParent:
|
||||||
# Discard wrong chain and merge back the range into the `unproc` list.
|
# Discard wrong chain and merge back the range into the `unproc` list.
|
||||||
ctx.headersUnprocCommit(0,iv)
|
ctx.headersUnprocCommit(0,iv)
|
||||||
trace info & ": discarding staged record", iv, L=least.bnStr, lap=result
|
trace info & ": discarding staged record",
|
||||||
|
iv, D=dangling.bnStr, lap=result
|
||||||
break
|
break
|
||||||
|
|
||||||
# Store headers on database
|
# Store headers on database
|
||||||
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs)
|
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs)
|
||||||
ctx.layout.least = iv.minPt
|
ctx.layout.dangling = iv.minPt
|
||||||
ctx.layout.leastParent = qItem.data.parentHash
|
ctx.layout.danglingParent = qItem.data.parentHash
|
||||||
discard ctx.dbStoreLinkedHChainsLayout()
|
discard ctx.dbStoreLinkedHChainsLayout()
|
||||||
|
|
||||||
result.inc # count records
|
result.inc # count records
|
||||||
|
|
|
@ -17,8 +17,7 @@ import
|
||||||
pkg/stew/interval_set,
|
pkg/stew/interval_set,
|
||||||
../../../protocol,
|
../../../protocol,
|
||||||
../../../protocol/eth/eth_types,
|
../../../protocol/eth/eth_types,
|
||||||
../../worker_desc,
|
../../worker_desc
|
||||||
../helpers
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "beacon headers"
|
topics = "beacon headers"
|
||||||
|
|
|
@ -10,6 +10,8 @@
|
||||||
|
|
||||||
{.push raises:[].}
|
{.push raises:[].}
|
||||||
|
|
||||||
|
## Extracted helpers from `worker_desc` (avoids circular import)
|
||||||
|
|
||||||
import
|
import
|
||||||
pkg/chronos,
|
pkg/chronos,
|
||||||
pkg/eth/common
|
pkg/eth/common
|
||||||
|
|
|
@ -29,11 +29,12 @@ when enableTicker:
|
||||||
## Legacy stuff, will be probably be superseded by `metrics`
|
## Legacy stuff, will be probably be superseded by `metrics`
|
||||||
return proc: auto =
|
return proc: auto =
|
||||||
TickerStats(
|
TickerStats(
|
||||||
stateTop: ctx.dbStateBlockNumber(),
|
base: ctx.dbStateBlockNumber(),
|
||||||
base: ctx.layout.base,
|
coupler: ctx.layout.coupler,
|
||||||
least: ctx.layout.least,
|
dangling: ctx.layout.dangling,
|
||||||
final: ctx.layout.final,
|
endBn: ctx.layout.endBn,
|
||||||
beacon: ctx.lhc.beacon.header.number,
|
final: ctx.lhc.final.header.number,
|
||||||
|
finalUpdateOk: ctx.lhc.final.hash != zeroHash32,
|
||||||
|
|
||||||
nHdrStaged: ctx.headersStagedQueueLen(),
|
nHdrStaged: ctx.headersStagedQueueLen(),
|
||||||
hdrStagedTop: ctx.headersStagedTopKey(),
|
hdrStagedTop: ctx.headersStagedTopKey(),
|
||||||
|
@ -47,15 +48,16 @@ when enableTicker:
|
||||||
nBlkUnprocessed: ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed(),
|
nBlkUnprocessed: ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed(),
|
||||||
nBlkUnprocFragm: ctx.blocksUnprocChunks(),
|
nBlkUnprocFragm: ctx.blocksUnprocChunks(),
|
||||||
|
|
||||||
reorg: ctx.pool.nReorg)
|
reorg: ctx.pool.nReorg,
|
||||||
|
nBuddies: ctx.pool.nBuddies)
|
||||||
|
|
||||||
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): SyncFinalisedBlockHashCB =
|
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): SyncFinalisedBlockHashCB =
|
||||||
## Update beacon header. This function is intended as a call back function
|
## Update beacon header. This function is intended as a call back function
|
||||||
## for the RPC module.
|
## for the RPC module.
|
||||||
return proc(h: Hash32) {.gcsafe, raises: [].} =
|
return proc(h: Hash32) {.gcsafe, raises: [].} =
|
||||||
# Rpc checks empty header against a zero hash rather than `emptyRoot`
|
# Rpc checks empty header against a zero hash rather than `emptyRoot`
|
||||||
if ctx.lhc.beacon.finalised == zeroHash32:
|
if ctx.lhc.final.hash == zeroHash32:
|
||||||
ctx.lhc.beacon.finalised = h
|
ctx.lhc.final.hash = h
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Public functions
|
# Public functions
|
||||||
|
@ -122,14 +124,10 @@ proc startBuddy*(buddy: BeaconBuddyRef): bool =
|
||||||
peer = buddy.peer
|
peer = buddy.peer
|
||||||
if peer.supports(protocol.eth) and peer.state(protocol.eth).initialized:
|
if peer.supports(protocol.eth) and peer.state(protocol.eth).initialized:
|
||||||
ctx.pool.nBuddies.inc # for metrics
|
ctx.pool.nBuddies.inc # for metrics
|
||||||
when enableTicker:
|
|
||||||
ctx.pool.ticker.startBuddy()
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
proc stopBuddy*(buddy: BeaconBuddyRef) =
|
proc stopBuddy*(buddy: BeaconBuddyRef) =
|
||||||
buddy.ctx.pool.nBuddies.dec # for metrics
|
buddy.ctx.pool.nBuddies.dec # for metrics
|
||||||
when enableTicker:
|
|
||||||
buddy.ctx.pool.ticker.stopBuddy()
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# End
|
# End
|
||||||
|
|
|
@ -26,11 +26,12 @@ type
|
||||||
|
|
||||||
TickerStats* = object
|
TickerStats* = object
|
||||||
## Full sync state (see `TickerFullStatsUpdater`)
|
## Full sync state (see `TickerFullStatsUpdater`)
|
||||||
stateTop*: BlockNumber
|
|
||||||
base*: BlockNumber
|
base*: BlockNumber
|
||||||
least*: BlockNumber
|
coupler*: BlockNumber
|
||||||
|
dangling*: BlockNumber
|
||||||
|
endBn*: BlockNumber
|
||||||
final*: BlockNumber
|
final*: BlockNumber
|
||||||
beacon*: BlockNumber
|
finalUpdateOk*: bool
|
||||||
|
|
||||||
hdrUnprocTop*: BlockNumber
|
hdrUnprocTop*: BlockNumber
|
||||||
nHdrUnprocessed*: uint64
|
nHdrUnprocessed*: uint64
|
||||||
|
@ -45,10 +46,10 @@ type
|
||||||
blkStagedBottom*: BlockNumber
|
blkStagedBottom*: BlockNumber
|
||||||
|
|
||||||
reorg*: int
|
reorg*: int
|
||||||
|
nBuddies*: int
|
||||||
|
|
||||||
TickerRef* = ref object
|
TickerRef* = ref object
|
||||||
## Ticker descriptor object
|
## Ticker descriptor object
|
||||||
nBuddies: int
|
|
||||||
started: Moment
|
started: Moment
|
||||||
visited: Moment
|
visited: Moment
|
||||||
prettyPrint: proc(t: TickerRef) {.gcsafe, raises: [].}
|
prettyPrint: proc(t: TickerRef) {.gcsafe, raises: [].}
|
||||||
|
@ -72,11 +73,11 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||||
if data != t.lastStats or
|
if data != t.lastStats or
|
||||||
tickerLogSuppressMax < (now - t.visited):
|
tickerLogSuppressMax < (now - t.visited):
|
||||||
let
|
let
|
||||||
T = data.stateTop.bnStr
|
|
||||||
B = data.base.bnStr
|
B = data.base.bnStr
|
||||||
L = data.least.bnStr
|
C = if data.base == data.coupler: "B" else: data.coupler.bnStr
|
||||||
F = data.final.bnStr
|
D = if data.coupler == data.dangling: "C" else: data.dangling.bnStr
|
||||||
Z = data.beacon.bnStr
|
E = if data.dangling == data.endBn: "D" else: data.endBn.bnStr
|
||||||
|
F = if data.finalUpdateOk: "?" & $data.final else: data.final.bnStr
|
||||||
|
|
||||||
hS = if data.nHdrStaged == 0: "n/a"
|
hS = if data.nHdrStaged == 0: "n/a"
|
||||||
else: data.hdrStagedTop.bnStr & "(" & $data.nHdrStaged & ")"
|
else: data.hdrStagedTop.bnStr & "(" & $data.nHdrStaged & ")"
|
||||||
|
@ -91,7 +92,7 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||||
data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")"
|
data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")"
|
||||||
|
|
||||||
reorg = data.reorg
|
reorg = data.reorg
|
||||||
peers = t.nBuddies
|
peers = data.nBuddies
|
||||||
|
|
||||||
# With `int64`, there are more than 29*10^10 years range for seconds
|
# With `int64`, there are more than 29*10^10 years range for seconds
|
||||||
up = (now - t.started).seconds.uint64.toSI
|
up = (now - t.started).seconds.uint64.toSI
|
||||||
|
@ -100,7 +101,7 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
|
||||||
t.lastStats = data
|
t.lastStats = data
|
||||||
t.visited = now
|
t.visited = now
|
||||||
|
|
||||||
info "State", up, peers, T, B, L, F, Z, hS, hU, bS, bU, reorg, mem
|
info "Sync state", up, peers, B, C, D, E, F, hS, hU, bS, bU, reorg, mem
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Private functions: ticking log messages
|
# Private functions: ticking log messages
|
||||||
|
@ -114,7 +115,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
|
||||||
|
|
||||||
proc setLogTicker(t: TickerRef; at: Moment) =
|
proc setLogTicker(t: TickerRef; at: Moment) =
|
||||||
if t.statsCb.isNil:
|
if t.statsCb.isNil:
|
||||||
debug "Stopped", nBuddies=t.nBuddies
|
debug "Stopped", nBuddies=t.lastStats.nBuddies
|
||||||
else:
|
else:
|
||||||
# Store the `runLogTicker()` in a closure to avoid some garbage collection
|
# Store the `runLogTicker()` in a closure to avoid some garbage collection
|
||||||
# memory corruption issues that might occur otherwise.
|
# memory corruption issues that might occur otherwise.
|
||||||
|
@ -137,27 +138,6 @@ proc destroy*(t: TickerRef) =
|
||||||
if not t.isNil:
|
if not t.isNil:
|
||||||
t.statsCb = TickerStatsUpdater(nil)
|
t.statsCb = TickerStatsUpdater(nil)
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
# Public functions
|
|
||||||
# ------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
proc startBuddy*(t: TickerRef) =
|
|
||||||
## Increment buddies counter and start ticker unless running.
|
|
||||||
if not t.isNil:
|
|
||||||
if t.nBuddies <= 0:
|
|
||||||
t.nBuddies = 1
|
|
||||||
else:
|
|
||||||
t.nBuddies.inc
|
|
||||||
debug "Start buddy", nBuddies=t.nBuddies
|
|
||||||
|
|
||||||
proc stopBuddy*(t: TickerRef) =
|
|
||||||
## Decrement buddies counter and stop ticker if there are no more registered
|
|
||||||
## buddies.
|
|
||||||
if not t.isNil:
|
|
||||||
if 0 < t.nBuddies:
|
|
||||||
t.nBuddies.dec
|
|
||||||
debug "Stop buddy", nBuddies=t.nBuddies
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# End
|
# End
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
|
|
@ -16,7 +16,7 @@ import
|
||||||
pkg/stew/sorted_set,
|
pkg/stew/sorted_set,
|
||||||
../worker_desc,
|
../worker_desc,
|
||||||
./update/metrics,
|
./update/metrics,
|
||||||
"."/[blocks_unproc, db, headers_staged, headers_unproc, helpers]
|
"."/[blocks_unproc, db, headers_staged, headers_unproc]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "beacon update"
|
topics = "beacon update"
|
||||||
|
@ -25,58 +25,56 @@ logScope:
|
||||||
# Private functions
|
# Private functions
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
proc updateBeaconChange(ctx: BeaconCtxRef): bool =
|
proc updateFinalisedChange(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||||
##
|
##
|
||||||
## Layout (see (3) in README):
|
## Layout (see (3) in README):
|
||||||
## ::
|
## ::
|
||||||
## G B==L==F Z
|
## 0 C==D==E F
|
||||||
## o----------------o---------------------o---->
|
## o----------------o---------------------o---->
|
||||||
## | <-- linked --> |
|
## | <-- linked --> |
|
||||||
##
|
##
|
||||||
## or
|
## or
|
||||||
## ::
|
## ::
|
||||||
## G==Z B==L==F
|
## 0==F C==D==E
|
||||||
## o----------------o-------------------------->
|
## o----------------o-------------------------->
|
||||||
## | <-- linked --> |
|
## | <-- linked --> |
|
||||||
##
|
##
|
||||||
## with `Z == beacon.header.number` or `Z == 0`
|
## with `F == final.header.number` or `F == 0`
|
||||||
##
|
##
|
||||||
## to be updated to
|
## to be updated to
|
||||||
## ::
|
## ::
|
||||||
## G B==L L'==F'
|
## 0 C==D D'==E'
|
||||||
## o----------------o---------------------o---->
|
## o----------------o---------------------o---->
|
||||||
## | <-- linked --> | <-- unprocessed --> |
|
## | <-- linked --> | <-- unprocessed --> |
|
||||||
##
|
##
|
||||||
const info = "updateBeaconChange"
|
var finBn = ctx.lhc.final.header.number
|
||||||
|
|
||||||
var z = ctx.lhc.beacon.header.number
|
# Need: `E < F` and `C == D`
|
||||||
|
if finBn != 0 and finBn <= ctx.layout.endBn: # violates `E < F`
|
||||||
# Need: `F < Z` and `B == L`
|
trace info & ": not applicable", E=ctx.layout.endBn.bnStr, F=finBn.bnStr
|
||||||
if z != 0 and z <= ctx.layout.final: # violates `F < Z`
|
|
||||||
trace info & ": not applicable", Z=z.bnStr, F=ctx.layout.final.bnStr
|
|
||||||
return false
|
return false
|
||||||
|
|
||||||
if ctx.layout.base != ctx.layout.least: # violates `B == L`
|
if ctx.layout.coupler != ctx.layout.dangling: # violates `C == D`
|
||||||
trace info & ": not applicable",
|
trace info & ": not applicable",
|
||||||
B=ctx.layout.base.bnStr, L=ctx.layout.least.bnStr
|
C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr
|
||||||
return false
|
return false
|
||||||
|
|
||||||
# Check consistency: `B == L <= F` for maximal `B` => `L == F`
|
# Check consistency: `C == D <= E` for maximal `C` => `D == E`
|
||||||
doAssert ctx.layout.least == ctx.layout.final
|
doAssert ctx.layout.dangling == ctx.layout.endBn
|
||||||
|
|
||||||
let rlpHeader = rlp.encode(ctx.lhc.beacon.header)
|
let rlpHeader = rlp.encode(ctx.lhc.final.header)
|
||||||
|
|
||||||
ctx.lhc.layout = LinkedHChainsLayout(
|
ctx.lhc.layout = LinkedHChainsLayout(
|
||||||
base: ctx.layout.base,
|
coupler: ctx.layout.coupler,
|
||||||
baseHash: ctx.layout.baseHash,
|
couplerHash: ctx.layout.couplerHash,
|
||||||
least: z,
|
dangling: finBn,
|
||||||
leastParent: ctx.lhc.beacon.header.parentHash,
|
danglingParent: ctx.lhc.final.header.parentHash,
|
||||||
final: z,
|
endBn: finBn,
|
||||||
finalHash: rlpHeader.keccak256)
|
endHash: rlpHeader.keccak256)
|
||||||
|
|
||||||
# Save this header on the database so it needs not be fetched again from
|
# Save this header on the database so it needs not be fetched again from
|
||||||
# somewhere else.
|
# somewhere else.
|
||||||
ctx.dbStashHeaders(z, @[rlpHeader])
|
ctx.dbStashHeaders(finBn, @[rlpHeader])
|
||||||
|
|
||||||
# Save state
|
# Save state
|
||||||
discard ctx.dbStoreLinkedHChainsLayout()
|
discard ctx.dbStoreLinkedHChainsLayout()
|
||||||
|
@ -85,39 +83,39 @@ proc updateBeaconChange(ctx: BeaconCtxRef): bool =
|
||||||
doAssert ctx.headersUnprocTotal() == 0
|
doAssert ctx.headersUnprocTotal() == 0
|
||||||
doAssert ctx.headersUnprocBorrowed() == 0
|
doAssert ctx.headersUnprocBorrowed() == 0
|
||||||
doAssert ctx.headersStagedQueueIsEmpty()
|
doAssert ctx.headersStagedQueueIsEmpty()
|
||||||
ctx.headersUnprocSet(ctx.layout.base+1, ctx.layout.least-1)
|
ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1)
|
||||||
|
|
||||||
trace info & ": updated"
|
trace info & ": updated", C=ctx.layout.coupler.bnStr,
|
||||||
|
D=ctx.layout.dangling.bnStr, E=ctx.layout.endBn.bnStr, F=finBn.bnStr
|
||||||
true
|
true
|
||||||
|
|
||||||
|
|
||||||
proc mergeAdjacentChains(ctx: BeaconCtxRef): bool =
|
proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||||
const info = "mergeAdjacentChains"
|
if ctx.lhc.layout.coupler+1 < ctx.lhc.layout.dangling or # gap btw. `C` & `D`
|
||||||
|
ctx.lhc.layout.coupler == ctx.lhc.layout.dangling: # merged already
|
||||||
if ctx.lhc.layout.base + 1 < ctx.lhc.layout.least or # gap betw `B` and `L`
|
|
||||||
ctx.lhc.layout.base == ctx.lhc.layout.least: # merged already
|
|
||||||
return false
|
return false
|
||||||
|
|
||||||
# No overlap allowed!
|
# No overlap allowed!
|
||||||
doAssert ctx.lhc.layout.base + 1 == ctx.lhc.layout.least
|
doAssert ctx.lhc.layout.coupler+1 == ctx.lhc.layout.dangling
|
||||||
|
|
||||||
# Verify adjacent chains
|
# Verify adjacent chains
|
||||||
if ctx.lhc.layout.baseHash != ctx.lhc.layout.leastParent:
|
if ctx.lhc.layout.couplerHash != ctx.lhc.layout.danglingParent:
|
||||||
# FIXME: Oops -- any better idea than to defect?
|
# FIXME: Oops -- any better idea than to defect?
|
||||||
raiseAssert info & ": hashes do not match" &
|
raiseAssert info & ": hashes do not match" &
|
||||||
" B=" & ctx.lhc.layout.base.bnStr & " L=" & $ctx.lhc.layout.least.bnStr
|
" C=" & ctx.lhc.layout.coupler.bnStr &
|
||||||
|
" D=" & $ctx.lhc.layout.dangling.bnStr
|
||||||
|
|
||||||
trace info & ": merging", B=ctx.lhc.layout.base.bnStr,
|
trace info & ": merging", C=ctx.lhc.layout.coupler.bnStr,
|
||||||
L=ctx.lhc.layout.least.bnStr
|
D=ctx.lhc.layout.dangling.bnStr
|
||||||
|
|
||||||
# Merge adjacent linked chains
|
# Merge adjacent linked chains
|
||||||
ctx.lhc.layout = LinkedHChainsLayout(
|
ctx.lhc.layout = LinkedHChainsLayout(
|
||||||
base: ctx.layout.final, # `B`
|
coupler: ctx.layout.endBn, # `C`
|
||||||
baseHash: ctx.layout.finalHash,
|
couplerHash: ctx.layout.endHash,
|
||||||
least: ctx.layout.final, # `L`
|
dangling: ctx.layout.endBn, # `D`
|
||||||
leastParent: ctx.dbPeekParentHash(ctx.layout.final).expect "Hash32",
|
danglingParent: ctx.dbPeekParentHash(ctx.layout.endBn).expect "Hash32",
|
||||||
final: ctx.layout.final, # `F`
|
endBn: ctx.layout.endBn, # `E`
|
||||||
finalHash: ctx.layout.finalHash)
|
endHash: ctx.layout.endHash)
|
||||||
|
|
||||||
# Save state
|
# Save state
|
||||||
discard ctx.dbStoreLinkedHChainsLayout()
|
discard ctx.dbStoreLinkedHChainsLayout()
|
||||||
|
@ -128,34 +126,33 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef): bool =
|
||||||
# Public functions
|
# Public functions
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
proc updateLinkedHChainsLayout*(ctx: BeaconCtxRef): bool =
|
proc updateLinkedHChainsLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||||
## Update layout
|
## Update layout
|
||||||
|
|
||||||
# Check whether there is something to do regarding beacon node change
|
# Check whether there is something to do regarding beacon node change
|
||||||
if ctx.lhc.beacon.changed:
|
if ctx.lhc.final.changed:
|
||||||
ctx.lhc.beacon.changed = false
|
ctx.lhc.final.changed = false
|
||||||
result = ctx.updateBeaconChange()
|
result = ctx.updateFinalisedChange info
|
||||||
|
|
||||||
# Check whether header downloading is done
|
# Check whether header downloading is done
|
||||||
if ctx.mergeAdjacentChains():
|
if ctx.mergeAdjacentChains info:
|
||||||
result = true
|
result = true
|
||||||
|
|
||||||
|
|
||||||
proc updateBlockRequests*(ctx: BeaconCtxRef): bool =
|
proc updateBlockRequests*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||||
## Update block requests if there staged block queue is empty
|
## Update block requests if there staged block queue is empty
|
||||||
const info = "updateBlockRequests"
|
let base = ctx.dbStateBlockNumber()
|
||||||
|
if base < ctx.layout.coupler: # so half open interval `(B,C]` is not empty
|
||||||
|
|
||||||
let t = ctx.dbStateBlockNumber()
|
# One can fill/import/execute blocks by number from `(B,C]`
|
||||||
if t < ctx.layout.base: # so the half open interval `(T,B]` is not empty
|
if ctx.blk.topRequest < ctx.layout.coupler:
|
||||||
|
|
||||||
# One can fill/import/execute blocks by number from `(T,B]`
|
|
||||||
if ctx.blk.topRequest < ctx.layout.base:
|
|
||||||
# So there is some space
|
# So there is some space
|
||||||
trace info & ": extending", T=t.bnStr, topReq=ctx.blk.topRequest.bnStr,
|
trace info & ": updating", B=base.bnStr, topReq=ctx.blk.topRequest.bnStr,
|
||||||
B=ctx.layout.base.bnStr
|
C=ctx.layout.coupler.bnStr
|
||||||
|
|
||||||
ctx.blocksUnprocCommit(0, max(t,ctx.blk.topRequest) + 1, ctx.layout.base)
|
ctx.blocksUnprocCommit(
|
||||||
ctx.blk.topRequest = ctx.layout.base
|
0, max(base, ctx.blk.topRequest) + 1, ctx.layout.coupler)
|
||||||
|
ctx.blk.topRequest = ctx.layout.coupler
|
||||||
return true
|
return true
|
||||||
|
|
||||||
false
|
false
|
||||||
|
|
|
@ -15,53 +15,55 @@ import
|
||||||
../../worker_desc,
|
../../worker_desc,
|
||||||
".."/[db, blocks_staged, headers_staged]
|
".."/[db, blocks_staged, headers_staged]
|
||||||
|
|
||||||
declareGauge beacon_beacon_block_number, "" &
|
|
||||||
"Block number of latest known finalised header"
|
|
||||||
|
|
||||||
declareGauge beacon_state_block_number, "" &
|
declareGauge beacon_base, "" &
|
||||||
"Max block number of imported/executed blocks"
|
"Max block number of imported/executed blocks"
|
||||||
|
|
||||||
declareGauge beacon_base_block_number, "" &
|
declareGauge beacon_coupler, "" &
|
||||||
"Max block number initial header chain starting at genesis"
|
"Max block number for header chain starting at genesis"
|
||||||
|
|
||||||
declareGauge beacon_least_block_number, "" &
|
declareGauge beacon_dangling, "" &
|
||||||
"Starting/min block number for higher up headers chain"
|
"Starting/min block number for higher up headers chain"
|
||||||
|
|
||||||
declareGauge beacon_final_block_number, "" &
|
declareGauge beacon_end, "" &
|
||||||
"Ending/max block number of higher up headers chain"
|
"Ending/max block number of higher up headers chain"
|
||||||
|
|
||||||
declareGauge beacon_headers_staged_queue_len, "" &
|
declareGauge beacon_final, "" &
|
||||||
|
"Block number of latest known finalised header"
|
||||||
|
|
||||||
|
|
||||||
|
declareGauge beacon_header_lists_staged, "" &
|
||||||
"Number of header list records staged for serialised processing"
|
"Number of header list records staged for serialised processing"
|
||||||
|
|
||||||
declareGauge beacon_headers_unprocessed, "" &
|
declareGauge beacon_headers_unprocessed, "" &
|
||||||
"Number of block numbers ready to fetch and stage headers"
|
"Number of block numbers ready to fetch and stage headers"
|
||||||
|
|
||||||
declareGauge beacon_blocks_staged_queue_len, "" &
|
declareGauge beacon_block_lists_staged, "" &
|
||||||
"Number of block list records staged for importing"
|
"Number of block list records staged for importing"
|
||||||
|
|
||||||
declareGauge beacon_blocks_unprocessed, "" &
|
declareGauge beacon_blocks_unprocessed, "" &
|
||||||
"Number of block numbers ready to fetch and stage block data"
|
"Number of block numbers ready to fetch and stage block data"
|
||||||
|
|
||||||
declareGauge beacon_number_of_buddies, "" &
|
|
||||||
|
declareGauge beacon_buddies, "" &
|
||||||
"Number of currently active worker instances"
|
"Number of currently active worker instances"
|
||||||
|
|
||||||
|
|
||||||
template updateMetricsImpl*(ctx: BeaconCtxRef) =
|
template updateMetricsImpl*(ctx: BeaconCtxRef) =
|
||||||
metrics.set(beacon_beacon_block_number, ctx.lhc.beacon.header.number.int64)
|
metrics.set(beacon_base, ctx.dbStateBlockNumber().int64)
|
||||||
|
metrics.set(beacon_coupler, ctx.layout.coupler.int64)
|
||||||
|
metrics.set(beacon_dangling, ctx.layout.dangling.int64)
|
||||||
|
metrics.set(beacon_end, ctx.layout.endBn.int64)
|
||||||
|
metrics.set(beacon_final, ctx.lhc.final.header.number.int64)
|
||||||
|
|
||||||
metrics.set(beacon_state_block_number, ctx.dbStateBlockNumber().int64)
|
metrics.set(beacon_header_lists_staged, ctx.headersStagedQueueLen())
|
||||||
metrics.set(beacon_base_block_number, ctx.layout.base.int64)
|
|
||||||
metrics.set(beacon_least_block_number, ctx.layout.least.int64)
|
|
||||||
metrics.set(beacon_final_block_number, ctx.layout.final.int64)
|
|
||||||
|
|
||||||
metrics.set(beacon_headers_staged_queue_len, ctx.headersStagedQueueLen())
|
|
||||||
metrics.set(beacon_headers_unprocessed,
|
metrics.set(beacon_headers_unprocessed,
|
||||||
(ctx.headersUnprocTotal() + ctx.headersUnprocBorrowed()).int64)
|
(ctx.headersUnprocTotal() + ctx.headersUnprocBorrowed()).int64)
|
||||||
|
|
||||||
metrics.set(beacon_blocks_staged_queue_len, ctx.blocksStagedQueueLen())
|
metrics.set(beacon_block_lists_staged, ctx.blocksStagedQueueLen())
|
||||||
metrics.set(beacon_blocks_unprocessed,
|
metrics.set(beacon_blocks_unprocessed,
|
||||||
(ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed()).int64)
|
(ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed()).int64)
|
||||||
|
|
||||||
metrics.set(beacon_number_of_buddies, ctx.pool.nBuddies)
|
metrics.set(beacon_buddies, ctx.pool.nBuddies)
|
||||||
|
|
||||||
# End
|
# End
|
||||||
|
|
|
@ -16,10 +16,11 @@ import
|
||||||
pkg/stew/[interval_set, sorted_set],
|
pkg/stew/[interval_set, sorted_set],
|
||||||
../../core/chain,
|
../../core/chain,
|
||||||
../sync_desc,
|
../sync_desc,
|
||||||
|
./worker/helpers,
|
||||||
./worker_config
|
./worker_config
|
||||||
|
|
||||||
export
|
export
|
||||||
sync_desc, worker_config
|
helpers, sync_desc, worker_config
|
||||||
|
|
||||||
when enableTicker:
|
when enableTicker:
|
||||||
import ./worker/start_stop/ticker
|
import ./worker/start_stop/ticker
|
||||||
|
@ -54,38 +55,31 @@ type
|
||||||
# -------------------
|
# -------------------
|
||||||
|
|
||||||
LinkedHChainsLayout* = object
|
LinkedHChainsLayout* = object
|
||||||
## Layout of a triple of linked header chains
|
## Layout of a linked header chains defined by the triple `(C,D,E)` as
|
||||||
|
## described in the `README.md` text.
|
||||||
## ::
|
## ::
|
||||||
## G B L F
|
## 0 C D E
|
||||||
## o----------------o---------------------o----------------o--->
|
## o----------------o---------------------o----------------o--->
|
||||||
## | <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
## | <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
||||||
##
|
##
|
||||||
## see `README.md` for details and explanations
|
coupler*: BlockNumber ## Right end `C` of linked chain `[0,C]`
|
||||||
##
|
couplerHash*: Hash32 ## Hash of `C`
|
||||||
base*: BlockNumber
|
|
||||||
## `B`, maximal block number of linked chain starting at Genesis `G`
|
|
||||||
baseHash*: Hash32
|
|
||||||
## Hash of `B`
|
|
||||||
|
|
||||||
least*: BlockNumber
|
dangling*: BlockNumber ## Left end `D` of linked chain `[D,E]`
|
||||||
## `L`, minimal block number of linked chain ending at `F` with `B <= L`
|
danglingParent*: Hash32 ## Parent hash of `D`
|
||||||
leastParent*: Hash32
|
|
||||||
## Parent hash of `L` (similar to `parentHash` in `HeaderChainItemRef`)
|
|
||||||
|
|
||||||
final*: BlockNumber
|
endBn*: BlockNumber ## `E`, block num of some finalised block
|
||||||
## `F`, some finalised block
|
endHash*: Hash32 ## Hash of `E`
|
||||||
finalHash*: Hash32
|
|
||||||
## Hash of `F` (similar to `hash` in `HeaderChainItemRef`)
|
|
||||||
|
|
||||||
BeaconHeader* = object
|
BeaconHeader* = object
|
||||||
## Beacon state to be implicitely updated by RPC method
|
## Beacon state to be implicitely updated by RPC method
|
||||||
changed*: bool ## Set a marker if something has changed
|
changed*: bool ## Set a marker if something has changed
|
||||||
header*: Header ## Beacon chain, finalised header
|
header*: Header ## Beacon chain, finalised header
|
||||||
finalised*: Hash32 ## From RPC, ghash of finalised header
|
hash*: Hash32 ## From RPC, hash of finalised header
|
||||||
|
|
||||||
LinkedHChainsSync* = object
|
LinkedHChainsSync* = object
|
||||||
## Sync state for linked header chains
|
## Sync state for linked header chains
|
||||||
beacon*: BeaconHeader ## See `Z` in README
|
final*: BeaconHeader ## Finalised block, see `F` in `README.md`
|
||||||
unprocessed*: BnRangeSet ## Block or header ranges to fetch
|
unprocessed*: BnRangeSet ## Block or header ranges to fetch
|
||||||
borrowed*: uint64 ## Total of temp. fetched ranges
|
borrowed*: uint64 ## Total of temp. fetched ranges
|
||||||
staged*: LinkedHChainQueue ## Blocks fetched but not stored yet
|
staged*: LinkedHChainQueue ## Blocks fetched but not stored yet
|
||||||
|
|
Loading…
Reference in New Issue