mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-02-28 20:00:43 +00:00
Beacon sync docu todo and async prototype update v2 (#2832)
* Annotate `async` functions for non-exception tracking at compile time details: This also requires some additional try/except catching in the function bodies. * Update sync logic docu to what is to be updated why: The understanding of details of how to accommodate for merging sub-chains of blocks or headers have changed. Some previous set-ups are outright wrong.
This commit is contained in:
parent
226d084e90
commit
7fe4023d1f
@ -33,7 +33,7 @@ proc runSetup(ctx: BeaconCtxRef): bool =
|
|||||||
proc runRelease(ctx: BeaconCtxRef) =
|
proc runRelease(ctx: BeaconCtxRef) =
|
||||||
worker.release(ctx, "RunRelease")
|
worker.release(ctx, "RunRelease")
|
||||||
|
|
||||||
proc runDaemon(ctx: BeaconCtxRef) {.async.} =
|
proc runDaemon(ctx: BeaconCtxRef) {.async: (raises: []).} =
|
||||||
await worker.runDaemon(ctx, "RunDaemon")
|
await worker.runDaemon(ctx, "RunDaemon")
|
||||||
|
|
||||||
proc runStart(buddy: BeaconBuddyRef): bool =
|
proc runStart(buddy: BeaconBuddyRef): bool =
|
||||||
@ -45,7 +45,7 @@ proc runStop(buddy: BeaconBuddyRef) =
|
|||||||
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
|
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
|
||||||
worker.runPool(buddy, last, laps, "RunPool")
|
worker.runPool(buddy, last, laps, "RunPool")
|
||||||
|
|
||||||
proc runPeer(buddy: BeaconBuddyRef) {.async.} =
|
proc runPeer(buddy: BeaconBuddyRef) {.async: (raises: []).} =
|
||||||
await worker.runPeer(buddy, "RunPeer")
|
await worker.runPeer(buddy, "RunPeer")
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
@ -1,169 +1,199 @@
|
|||||||
Beacon Sync
|
Beacon Sync
|
||||||
===========
|
===========
|
||||||
|
|
||||||
According to the merge-first
|
Some definition of terms, and a suggestion of how a beacon sync can be encoded
|
||||||
[glossary](https://notes.status.im/nimbus-merge-first-el?both=#Glossary),
|
providing pseudo code is provided by
|
||||||
a beacon sync is a "*Sync method that relies on devp2p and eth/6x to fetch
|
[Beacon Sync](https://notes.status.im/nimbus-merge-first-el?both=#Beacon-sync).
|
||||||
headers and bodies backwards then apply these in the forward direction to the
|
|
||||||
head state*".
|
|
||||||
|
|
||||||
This [glossary](https://notes.status.im/nimbus-merge-first-el?both=#Glossary)
|
In the following, the data domain the Beacon Sync acts upon is explored and
|
||||||
is used as a naming template for relevant entities described here. When
|
presented. This leads to an implementation description without the help of
|
||||||
referred to, names from the glossary are printed **bold**.
|
pseudo code but rather provides a definition of the sync and domain state
|
||||||
|
at critical moments.
|
||||||
|
|
||||||
Syncing blocks is performed in two overlapping phases
|
For handling block chain imports and related actions, abstraction methods
|
||||||
|
from the `forked_chain` module will be used (abbreviated **FC**.) The **FC**
|
||||||
* loading header chains and stashing them into a separate database table,
|
entities **base** and **latest** from this module are always printed **bold**.
|
||||||
* removing headers from the stashed headers chain, fetching the block bodies
|
|
||||||
the headers refer to and importing/executing them via `persistentBlocks()`.
|
|
||||||
|
|
||||||
So this beacon syncer slightly differs from the definition in the
|
|
||||||
[glossary](https://notes.status.im/nimbus-merge-first-el?both=#Glossary) in
|
|
||||||
that only headers are stashed on the database table and the block bodies are
|
|
||||||
fetched in the *forward* direction.
|
|
||||||
|
|
||||||
The reason for that behavioural change is that the block bodies are addressed
|
|
||||||
by the hash of the block headers for fetching. They cannot be fully verified
|
|
||||||
upon arrival on the cheap (e.g. by a payload hash.) They will be validated not
|
|
||||||
before imported/executed. So potentially corrupt blocks will be discarded.
|
|
||||||
They will automatically be re-fetched with other missing blocks in the
|
|
||||||
*forward* direction.
|
|
||||||
|
|
||||||
|
|
||||||
Header chains
|
Sync Logic Outline
|
||||||
-------------
|
------------------
|
||||||
|
|
||||||
The header chains are the triple of
|
Here is a simplification of the sync process intended to provide a mental
|
||||||
|
outline of how it works.
|
||||||
|
|
||||||
* a consecutively linked chain of headers starting starting at Genesis
|
In the following block chain layouts, a left position always stands for an
|
||||||
* followed by a sequence of missing headers
|
ancestor of a right one.
|
||||||
* followed by a consecutively linked chain of headers ending up at a
|
|
||||||
finalised block header (earlier received from the consensus layer)
|
|
||||||
|
|
||||||
A sequence *@[h(1),h(2),..]* of block headers is called a *linked chain* if
|
0------C1 (1)
|
||||||
|
|
||||||
* block numbers join without gaps, i.e. *h(n).number+1 == h(n+1).number*
|
0--------L1 (2)
|
||||||
* parent hashes match, i.e. *h(n).hash == h(n+1).parentHash*
|
\_______H1
|
||||||
|
|
||||||
General header linked chains layout diagram
|
0------------------C2 (3)
|
||||||
|
|
||||||
0 C D H (1)
|
0--------------------L2 (4)
|
||||||
o----------------o---------------------o----------------o--->
|
\________H2
|
||||||
| <-- linked --> | <-- unprocessed --> | <-- linked --> |
|
|
||||||
|
|
||||||
Here, the single upper letter symbols *0*, *C*, *D*, *H* denote block numbers.
|
|
||||||
For convenience, these letters are also identified with its associated block
|
|
||||||
header or the full blocks. Saying *"the header 0"* is short for *"the header
|
|
||||||
with block number 0"*.
|
|
||||||
|
|
||||||
Meaning of *0*, *C*, *D*, *H*:
|
|
||||||
|
|
||||||
* *0* -- Genesis, block number number *0*
|
|
||||||
* *C* -- coupler, maximal block number of linked chain starting at *0*
|
|
||||||
* *D* -- dangling, minimal block number of linked chain ending at *H*
|
|
||||||
with *C <= D*
|
|
||||||
* *H* -- head, end block number of **consensus head** (not necessarily the
|
|
||||||
latest one as this is moving while processing)
|
|
||||||
|
|
||||||
This definition implies *0 <= C <= D <= H* and the state of the header linked
|
|
||||||
chains can uniquely be described by the triple of block numbers *(C,D,H)*.
|
|
||||||
|
|
||||||
|
|
||||||
### Storage of header chains:
|
|
||||||
|
|
||||||
Some block numbers from the closed interval (including end points) *[0,C]* may
|
|
||||||
correspond to finalised blocks, e.g. the sub-interval *[0,**base**]* where
|
|
||||||
**base** is the block number of the ledger state. The headers for
|
|
||||||
*[0,**base**]* are stored in the persistent state database. The headers for the
|
|
||||||
half open interval *(**base**,C]* are always stored on the *beaconHeader*
|
|
||||||
column of the *KVT* database.
|
|
||||||
|
|
||||||
The block numbers from the interval *[D,H]* also reside on the *beaconHeader*
|
|
||||||
column of the *KVT* database table.
|
|
||||||
|
|
||||||
|
|
||||||
### Header linked chains initialisation:
|
|
||||||
|
|
||||||
Minimal layout on a pristine system
|
|
||||||
|
|
||||||
0 (2)
|
|
||||||
C
|
|
||||||
D
|
|
||||||
H
|
|
||||||
o--->
|
|
||||||
|
|
||||||
When first initialised, the header linked chains are set to *(0,0,0)*.
|
|
||||||
|
|
||||||
|
|
||||||
### Updating a header linked chains:
|
|
||||||
|
|
||||||
A header chain with an non empty open interval *(C,D)* can be updated only by
|
|
||||||
increasing *C* or decreasing *D* by adding/prepending headers so that the
|
|
||||||
linked chain condition is not violated.
|
|
||||||
|
|
||||||
Only when the gap open interval *(C,D)* vanishes, the right end *H* can be
|
|
||||||
increased to a larger target block number *T*, say. This block number will
|
|
||||||
typically be the **consensus head**. Then
|
|
||||||
|
|
||||||
* *C==D* beacuse the open interval *(C,D)* is empty
|
|
||||||
* *C==H* because *C* is maximal (see definition of `C` above)
|
|
||||||
|
|
||||||
and the header chains *(H,H,H)* (depicted in *(3)* below) can be set to
|
|
||||||
*(C,T,T)* as depicted in *(4)* below.
|
|
||||||
|
|
||||||
Layout before updating of *H*
|
|
||||||
|
|
||||||
C (3)
|
|
||||||
D
|
|
||||||
0 H T
|
|
||||||
o----------------o---------------------o---->
|
|
||||||
| <-- linked --> |
|
|
||||||
|
|
||||||
New layout with moving *D* and *H* to *T*
|
|
||||||
|
|
||||||
D' (4)
|
|
||||||
0 C H'
|
|
||||||
o----------------o---------------------o---->
|
|
||||||
| <-- linked --> | <-- unprocessed --> |
|
|
||||||
|
|
||||||
with *D'=T* and *H'=T*.
|
|
||||||
|
|
||||||
Note that diagram *(3)* is a generalisation of *(2)*.
|
|
||||||
|
|
||||||
|
|
||||||
### Complete a header linked chain:
|
|
||||||
|
|
||||||
The header chain is *relatively complete* if it satisfies clause *(3)* above
|
|
||||||
for *0 < C*. It is *fully complete* if *H==T*. It should be obvious that the
|
|
||||||
latter condition is temporary only on a live system (as *T* is contiuously
|
|
||||||
updated.)
|
|
||||||
|
|
||||||
If a *relatively complete* header chain is reached for the first time, the
|
|
||||||
execution layer can start running an importer in the background
|
|
||||||
compiling/executing blocks (starting from block number *#1*.) So the ledger
|
|
||||||
database state will be updated incrementally.
|
|
||||||
|
|
||||||
Block chain import/execution
|
|
||||||
-----------------------------
|
|
||||||
|
|
||||||
The following diagram with a partially imported/executed block chain amends the
|
|
||||||
layout *(1)*:
|
|
||||||
|
|
||||||
0 B L C D H (5)
|
|
||||||
o------------o-----o-------o---------------------o----------------o-->
|
|
||||||
| <-- imported --> | | | |
|
|
||||||
| <------- linked ------> | <-- unprocessed --> | <-- linked --> |
|
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
* *B* is the **base state** stored on the persistent state database. *B* is
|
* *0* is genesis
|
||||||
not addressed directly except upon start up or resuming sync when *B == L*.
|
* *C1*, *C2* are the *latest* (aka cursor) entities from the **FC** module
|
||||||
* *L* is the last imported/executed block, typically up to the **canonical
|
* *L1*, *L2*, are updated *latest* entities from the **FC** module
|
||||||
consensus head**.
|
* *H1*, *H2* are block headers (or blocks) that are used as sync targets
|
||||||
|
|
||||||
The headers corresponding to the half open interval `(L,C]` will be completed
|
At stage *(1)*, there is a chain of imported blocks *[0,C1]* (written as
|
||||||
by fetching block bodies and then import/execute them together with the already
|
compact interval of block numbers.)
|
||||||
cached headers.
|
|
||||||
|
At stage *(2)*, there is a sync request to advance up until block *H1* which
|
||||||
|
is then fetched from the network along with its ancestors way back until there
|
||||||
|
is an ancestor within the chain of imported blocks *[0,L1]*. The chain *[0,L1]*
|
||||||
|
is what the *[0,C1]* has morphed into when the chain of blocks ending at *H1*
|
||||||
|
finds its ancestor.
|
||||||
|
|
||||||
|
At stage *(3)* all blocks recently fetched have now been imported via **FC**.
|
||||||
|
In addition to that, there might have been additional imports from other
|
||||||
|
entities (e.g. `newPayload`) which has advanced *H1* further to *C2*.
|
||||||
|
|
||||||
|
Stage *(3)* has become similar to stage *(1)* with *C1* renamed as *C2*, ditto
|
||||||
|
for the symbols *L2* and *H2* for stage *(4)*.
|
||||||
|
|
||||||
|
|
||||||
|
Implementation, The Gory Details
|
||||||
|
--------------------------------
|
||||||
|
|
||||||
|
### Description of Sync State
|
||||||
|
|
||||||
|
The following diagram depicts a most general state view of the sync and the
|
||||||
|
*FC* modules and at a given point of time
|
||||||
|
|
||||||
|
0 C L (5)
|
||||||
|
o------------o-------o
|
||||||
|
| <--- imported ---> |
|
||||||
|
Y D H
|
||||||
|
o---------------------o----------------o
|
||||||
|
| <-- unprocessed --> | <-- linked --> |
|
||||||
|
|
||||||
|
where
|
||||||
|
|
||||||
|
* *C* -- coupler, cached **base** entity of the **FC** module, reported at
|
||||||
|
the time when *H* was set. This determines the maximal way back length
|
||||||
|
of the *linked* ancestor chain starting at *H*.
|
||||||
|
|
||||||
|
* *Y* -- has the same block number as *C* and is often, but not necessarily
|
||||||
|
equal to *C* (for notation *C~Y* see clause *(6)* below.)
|
||||||
|
|
||||||
|
* *L* -- **latest**, current value of this entity of the **FC** module (i.e.
|
||||||
|
now, when looked up)
|
||||||
|
|
||||||
|
* *D* -- dangling, least block number of the linked chain in progress ending
|
||||||
|
at *H*. This variable is used to record the download state eventually
|
||||||
|
reaching *Y* (for notation *D<<H* see clause *(6)* below.)
|
||||||
|
|
||||||
|
* *H* -- head, sync target which typically is the value of a *sync to new head*
|
||||||
|
request (via RPC)
|
||||||
|
|
||||||
|
The internal sync state (as opposed to the general state also including **FC**)
|
||||||
|
is defined by the triple *(C,D,H)*. Other parameters *L* and *Y* mentioned in
|
||||||
|
*(5)* are considered ephemeral to the sync state. They are always used by its
|
||||||
|
latest value and are not cached by the syncer.
|
||||||
|
|
||||||
|
There are two order releations and some derivatives used to describe relations
|
||||||
|
beween headers or blocks.
|
||||||
|
|
||||||
|
For blocks or headers A and B, A is said less or equal B if the (6)
|
||||||
|
block numbers are less or equal. Notation: A <= B.
|
||||||
|
|
||||||
|
For blocks or headers A and B, A is said ancestor of, or equal to
|
||||||
|
B if B is linked to A following up the lineage of parentHash fields
|
||||||
|
of the block headers. Notation: A << B.
|
||||||
|
|
||||||
|
The relate notation A ~ B stands for A <= B <= A which is posh for
|
||||||
|
saying that A and B have the same block numer.
|
||||||
|
|
||||||
|
The compact interval notation [A,B] stands for the set {X|A<<X<<B}
|
||||||
|
and the half open interval notation stands for [A,B]-{A} (i.e. the
|
||||||
|
interval without the left end point.)
|
||||||
|
|
||||||
|
Note that *A<<B* implies *A<=B*. Boundary conditions that hold for the
|
||||||
|
clause *(5)* diagram are
|
||||||
|
|
||||||
|
C ~ Y, C in [0,L], D in [Y,H] (7)
|
||||||
|
|
||||||
|
|
||||||
|
### Sync Processing
|
||||||
|
|
||||||
|
Sync starts at an idle state
|
||||||
|
|
||||||
|
0 H L (8)
|
||||||
|
o-----------------o--o
|
||||||
|
| <--- imported ---> |
|
||||||
|
|
||||||
|
where *H<=L* (*H* needs only be known by its block number.) The state
|
||||||
|
parameters *C* and *D* are irrelevant here.
|
||||||
|
|
||||||
|
Following, there will be a request to advance *H* to a new position as
|
||||||
|
indicated in the diagram below
|
||||||
|
|
||||||
|
0 C (9)
|
||||||
|
o------------o-------o
|
||||||
|
| <--- imported ---> | D
|
||||||
|
Y H
|
||||||
|
o--------------------------------------o
|
||||||
|
| <----------- unprocessed ----------> |
|
||||||
|
|
||||||
|
with a new sync state *(C,D,H)*. The parameter *C* in clause *(9)* is set
|
||||||
|
as the **base** entity of the **FC** module. *Y* is only known by its block
|
||||||
|
number, *Y~C*. The parameter *D* is set to the download start position *H*.
|
||||||
|
|
||||||
|
The syncer then fetches the header chain *(Y,H]* from the network. For the
|
||||||
|
syncer state *(C,D,H)*, while iteratively fetching headers, only the parameter
|
||||||
|
*D* will change each time a new header was fetched.
|
||||||
|
|
||||||
|
Having finished dowlnoading *(Y,H]* one might end up with a situation
|
||||||
|
|
||||||
|
0 B Z L (10)
|
||||||
|
o-------------o--o---o
|
||||||
|
| <--- imported ---> |
|
||||||
|
Y Z H
|
||||||
|
o---o----------------------------------o
|
||||||
|
| <-------------- linked ------------> |
|
||||||
|
|
||||||
|
where *Z* is in the intersection of *[B,L]\*(Y,H]* with *B* the current
|
||||||
|
**base** entity of the **FC** logic. It is only known that *0<<B<<L*
|
||||||
|
although in many cases *B==C* holds.
|
||||||
|
|
||||||
|
If there is no such *Z* then *(Y,H]* is discarded and sync processing restarts
|
||||||
|
at clause *(8)* by resetting the sync state (e.g. to *(0,0,0)*.)
|
||||||
|
|
||||||
|
Otherwise assume *Z* is the one with the largest block number of the
|
||||||
|
intersection *[B,L]\*(Y,H]*. Then the headers *(Z,H]* will be completed to
|
||||||
|
a lineage of blocks by downloading block bodies.
|
||||||
|
|
||||||
|
0 Z (11)
|
||||||
|
o----------------o---o
|
||||||
|
| <--- imported ---> |
|
||||||
|
Z H
|
||||||
|
o----------------------------------o
|
||||||
|
| <------------ blocks ----------> |
|
||||||
|
|
||||||
|
The blocks *(Z,H]* will then be imported. While this happens, the internal
|
||||||
|
state of the **FC** might change/reset so that further import becomes
|
||||||
|
impossible. Even when starting import, the block *Z* might not be in *[0,L]*
|
||||||
|
anymore due to some internal reset of the **FC** logic. In any of those
|
||||||
|
cases, sync processing restarts at clause *(8)* by resetting the sync state.
|
||||||
|
|
||||||
|
Otherwise the block import will end up at
|
||||||
|
|
||||||
|
0 Z H L (12)
|
||||||
|
o----------------o----------------------------------o---o
|
||||||
|
| <--- imported --------------------------------------> |
|
||||||
|
|
||||||
|
with *H<<L* for *L* the current value of the **latest** entity of the **FC**
|
||||||
|
module. In many cases, *H==L* but there are other actors running that might
|
||||||
|
import blocks quickly after importing *H* so that *H* is seen as ancestor,
|
||||||
|
different from *L* when this stage is formally done with.
|
||||||
|
|
||||||
|
Now clause *(12)* is equivalent to clause *(8)*.
|
||||||
|
|
||||||
|
|
||||||
Running the sync process for *MainNet*
|
Running the sync process for *MainNet*
|
||||||
|
@ -1,3 +1,9 @@
|
|||||||
|
## Update sync state management to what is described in *README.md*
|
||||||
|
|
||||||
|
1. For the moment, the events in *update.nim* need to be adjusted. This will fix an error where the CL forces the EL to fork internally by sending different head request headers with the same bock number.
|
||||||
|
|
||||||
|
2. General scenario update. This is mostly error handling.
|
||||||
|
|
||||||
## General TODO items
|
## General TODO items
|
||||||
|
|
||||||
* Update/resolve code fragments which are tagged FIXME
|
* Update/resolve code fragments which are tagged FIXME
|
||||||
@ -19,17 +25,3 @@ which happened on several `holesky` tests immediately after loging somehing like
|
|||||||
or from another machine with literally the same exception text (but the stack-trace differs)
|
or from another machine with literally the same exception text (but the stack-trace differs)
|
||||||
|
|
||||||
NTC 2024-10-31 21:58:07.616 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=9cbcc52953a8 baseNumber=2646857 baseHash=9db5c2ac537b
|
NTC 2024-10-31 21:58:07.616 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=9cbcc52953a8 baseNumber=2646857 baseHash=9db5c2ac537b
|
||||||
|
|
||||||
|
|
||||||
### 3. Some assert
|
|
||||||
|
|
||||||
Seen on `holesky`, sometimes the header chain cannot not be joined with its
|
|
||||||
lower end after completing due to different hashes leading to an assert failure
|
|
||||||
|
|
||||||
Error: unhandled exception: header chains C-D joining hashes do not match L=#2646126 lHash=3bc2beb1b565 C=#2646126 cHash=3bc2beb1b565 D=#2646127 dParent=671c7c6cb904
|
|
||||||
|
|
||||||
which was preceeded somewhat earlier by log entries
|
|
||||||
|
|
||||||
INF 2024-10-31 18:21:16.464 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=3bc2beb1b565
|
|
||||||
[..]
|
|
||||||
INF 2024-10-31 18:21:25.872 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=671c7c6cb904
|
|
||||||
|
@ -32,13 +32,18 @@ proc bodiesToFetchOk(buddy: BeaconBuddyRef): bool =
|
|||||||
buddy.ctrl.running and
|
buddy.ctrl.running and
|
||||||
not buddy.ctx.poolMode
|
not buddy.ctx.poolMode
|
||||||
|
|
||||||
proc napUnlessSomethingToFetch(buddy: BeaconBuddyRef): Future[bool] {.async.} =
|
proc napUnlessSomethingToFetch(
|
||||||
|
buddy: BeaconBuddyRef;
|
||||||
|
): Future[bool] {.async: (raises: []).} =
|
||||||
## When idle, save cpu cycles waiting for something to do.
|
## When idle, save cpu cycles waiting for something to do.
|
||||||
if buddy.ctx.pool.blockImportOk or # currently importing blocks
|
if buddy.ctx.pool.blockImportOk or # currently importing blocks
|
||||||
buddy.ctx.hibernate or # not activated yet?
|
buddy.ctx.hibernate or # not activated yet?
|
||||||
not (buddy.headersToFetchOk() or # something on TODO list
|
not (buddy.headersToFetchOk() or # something on TODO list
|
||||||
buddy.bodiesToFetchOk()):
|
buddy.bodiesToFetchOk()):
|
||||||
await sleepAsync workerIdleWaitInterval
|
try:
|
||||||
|
await sleepAsync workerIdleWaitInterval
|
||||||
|
except CancelledError:
|
||||||
|
buddy.ctrl.zombie = true
|
||||||
return true
|
return true
|
||||||
else:
|
else:
|
||||||
return false
|
return false
|
||||||
@ -90,7 +95,10 @@ proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
|
|||||||
# Public functions
|
# Public functions
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
|
proc runDaemon*(
|
||||||
|
ctx: BeaconCtxRef;
|
||||||
|
info: static[string];
|
||||||
|
) {.async: (raises: []).} =
|
||||||
## Global background job that will be re-started as long as the variable
|
## Global background job that will be re-started as long as the variable
|
||||||
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
|
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
|
||||||
## `ctx.daemon` to `false`, it will be restarted next after it was reset
|
## `ctx.daemon` to `false`, it will be restarted next after it was reset
|
||||||
@ -125,7 +133,8 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
|
|||||||
return
|
return
|
||||||
|
|
||||||
# At the end of the cycle, leave time to trigger refill headers/blocks
|
# At the end of the cycle, leave time to trigger refill headers/blocks
|
||||||
await sleepAsync daemonWaitInterval
|
try: await sleepAsync daemonWaitInterval
|
||||||
|
except CancelledError: discard
|
||||||
|
|
||||||
|
|
||||||
proc runPool*(
|
proc runPool*(
|
||||||
@ -152,7 +161,10 @@ proc runPool*(
|
|||||||
true # stop
|
true # stop
|
||||||
|
|
||||||
|
|
||||||
proc runPeer*(buddy: BeaconBuddyRef; info: static[string]) {.async.} =
|
proc runPeer*(
|
||||||
|
buddy: BeaconBuddyRef;
|
||||||
|
info: static[string];
|
||||||
|
) {.async: (raises: []).} =
|
||||||
## This peer worker method is repeatedly invoked (exactly one per peer) while
|
## This peer worker method is repeatedly invoked (exactly one per peer) while
|
||||||
## the `buddy.ctrl.poolMode` flag is set `false`.
|
## the `buddy.ctrl.poolMode` flag is set `false`.
|
||||||
##
|
##
|
||||||
|
@ -43,7 +43,7 @@ proc fetchAndCheck(
|
|||||||
ivReq: BnRange;
|
ivReq: BnRange;
|
||||||
blk: ref BlocksForImport; # update in place
|
blk: ref BlocksForImport; # update in place
|
||||||
info: static[string];
|
info: static[string];
|
||||||
): Future[bool] {.async.} =
|
): Future[bool] {.async: (raises: []).} =
|
||||||
|
|
||||||
let
|
let
|
||||||
ctx = buddy.ctx
|
ctx = buddy.ctx
|
||||||
@ -148,7 +148,7 @@ func blocksStagedFetchOk*(ctx: BeaconCtxRef): bool =
|
|||||||
proc blocksStagedCollect*(
|
proc blocksStagedCollect*(
|
||||||
buddy: BeaconBuddyRef;
|
buddy: BeaconBuddyRef;
|
||||||
info: static[string];
|
info: static[string];
|
||||||
): Future[bool] {.async.} =
|
): Future[bool] {.async: (raises: []).} =
|
||||||
## Collect bodies and stage them.
|
## Collect bodies and stage them.
|
||||||
##
|
##
|
||||||
if buddy.ctx.blocksUnprocIsEmpty():
|
if buddy.ctx.blocksUnprocIsEmpty():
|
||||||
@ -204,7 +204,8 @@ proc blocksStagedCollect*(
|
|||||||
ctx.blocksUnprocCommit(iv.len, iv)
|
ctx.blocksUnprocCommit(iv.len, iv)
|
||||||
# At this stage allow a task switch so that some other peer might try
|
# At this stage allow a task switch so that some other peer might try
|
||||||
# to work on the currently returned interval.
|
# to work on the currently returned interval.
|
||||||
await sleepAsync asyncThreadSwitchTimeSlot
|
try: await sleepAsync asyncThreadSwitchTimeSlot
|
||||||
|
except CancelledError: discard
|
||||||
return false
|
return false
|
||||||
|
|
||||||
# So there were some bodies downloaded already. Turn back unused data
|
# So there were some bodies downloaded already. Turn back unused data
|
||||||
@ -245,7 +246,7 @@ proc blocksStagedImport*(
|
|||||||
ctx: BeaconCtxRef;
|
ctx: BeaconCtxRef;
|
||||||
info: static[string];
|
info: static[string];
|
||||||
): Future[bool]
|
): Future[bool]
|
||||||
{.async.} =
|
{.async: (raises: []).} =
|
||||||
## Import/execute blocks record from staged queue
|
## Import/execute blocks record from staged queue
|
||||||
##
|
##
|
||||||
let qItem = ctx.blk.staged.ge(0).valueOr:
|
let qItem = ctx.blk.staged.ge(0).valueOr:
|
||||||
@ -284,7 +285,8 @@ proc blocksStagedImport*(
|
|||||||
break importLoop
|
break importLoop
|
||||||
|
|
||||||
# Allow pseudo/async thread switch.
|
# Allow pseudo/async thread switch.
|
||||||
await sleepAsync asyncThreadSwitchTimeSlot
|
try: await sleepAsync asyncThreadSwitchTimeSlot
|
||||||
|
except CancelledError: discard
|
||||||
if not ctx.daemon:
|
if not ctx.daemon:
|
||||||
# Shutdown?
|
# Shutdown?
|
||||||
maxImport = ctx.chain.latestNumber()
|
maxImport = ctx.chain.latestNumber()
|
||||||
@ -311,7 +313,8 @@ proc blocksStagedImport*(
|
|||||||
break importLoop
|
break importLoop
|
||||||
|
|
||||||
# Allow pseudo/async thread switch.
|
# Allow pseudo/async thread switch.
|
||||||
await sleepAsync asyncThreadSwitchTimeSlot
|
try: await sleepAsync asyncThreadSwitchTimeSlot
|
||||||
|
except CancelledError: discard
|
||||||
if not ctx.daemon:
|
if not ctx.daemon:
|
||||||
maxImport = ctx.chain.latestNumber()
|
maxImport = ctx.chain.latestNumber()
|
||||||
break importLoop
|
break importLoop
|
||||||
|
@ -32,7 +32,7 @@ proc bodiesFetch*(
|
|||||||
blockHashes: seq[Hash32];
|
blockHashes: seq[Hash32];
|
||||||
info: static[string];
|
info: static[string];
|
||||||
): Future[Result[seq[BlockBody],void]]
|
): Future[Result[seq[BlockBody],void]]
|
||||||
{.async.} =
|
{.async: (raises: []).} =
|
||||||
## Fetch bodies from the network.
|
## Fetch bodies from the network.
|
||||||
let
|
let
|
||||||
peer = buddy.peer
|
peer = buddy.peer
|
||||||
@ -45,7 +45,7 @@ proc bodiesFetch*(
|
|||||||
var resp: Option[blockBodiesObj]
|
var resp: Option[blockBodiesObj]
|
||||||
try:
|
try:
|
||||||
resp = await peer.getBlockBodies(blockHashes)
|
resp = await peer.getBlockBodies(blockHashes)
|
||||||
except TransportError as e:
|
except CatchableError as e:
|
||||||
buddy.fetchRegisterError()
|
buddy.fetchRegisterError()
|
||||||
`info` info & " error", peer, nReq, elapsed=(Moment.now() - start).toStr,
|
`info` info & " error", peer, nReq, elapsed=(Moment.now() - start).toStr,
|
||||||
error=($e.name), msg=e.msg, nRespErrors=buddy.only.nBdyRespErrors
|
error=($e.name), msg=e.msg, nRespErrors=buddy.only.nBdyRespErrors
|
||||||
|
@ -30,7 +30,7 @@ proc fetchAndCheck(
|
|||||||
ivReq: BnRange;
|
ivReq: BnRange;
|
||||||
lhc: ref LinkedHChain; # update in place
|
lhc: ref LinkedHChain; # update in place
|
||||||
info: static[string];
|
info: static[string];
|
||||||
): Future[bool] {.async.} =
|
): Future[bool] {.async: (raises: []).} =
|
||||||
## Collect single header chain from the peer and stash it on the `staged`
|
## Collect single header chain from the peer and stash it on the `staged`
|
||||||
## queue. Returns the length of the stashed chain of headers.
|
## queue. Returns the length of the stashed chain of headers.
|
||||||
##
|
##
|
||||||
@ -55,7 +55,7 @@ proc fetchAndCheck(
|
|||||||
proc headerStagedUpdateTarget*(
|
proc headerStagedUpdateTarget*(
|
||||||
buddy: BeaconBuddyRef;
|
buddy: BeaconBuddyRef;
|
||||||
info: static[string];
|
info: static[string];
|
||||||
) {.async.} =
|
) {.async: (raises: []).} =
|
||||||
## Fetch finalised beacon header if there is an update available
|
## Fetch finalised beacon header if there is an update available
|
||||||
let
|
let
|
||||||
ctx = buddy.ctx
|
ctx = buddy.ctx
|
||||||
@ -100,7 +100,7 @@ proc headerStagedUpdateTarget*(
|
|||||||
proc headersStagedCollect*(
|
proc headersStagedCollect*(
|
||||||
buddy: BeaconBuddyRef;
|
buddy: BeaconBuddyRef;
|
||||||
info: static[string];
|
info: static[string];
|
||||||
): Future[bool] {.async.} =
|
): Future[bool] {.async: (raises: []).} =
|
||||||
## Collect a batch of chained headers totalling to at most `nHeaders`
|
## Collect a batch of chained headers totalling to at most `nHeaders`
|
||||||
## headers. Fetch the headers from the the peer and stash it blockwise on
|
## headers. Fetch the headers from the the peer and stash it blockwise on
|
||||||
## the `staged` queue. The function returns `true` it stashed a header
|
## the `staged` queue. The function returns `true` it stashed a header
|
||||||
@ -178,7 +178,8 @@ proc headersStagedCollect*(
|
|||||||
ctx.headersUnprocCommit(iv.len, iv)
|
ctx.headersUnprocCommit(iv.len, iv)
|
||||||
# At this stage allow a task switch so that some other peer might try
|
# At this stage allow a task switch so that some other peer might try
|
||||||
# to work on the currently returned interval.
|
# to work on the currently returned interval.
|
||||||
await sleepAsync asyncThreadSwitchTimeSlot
|
try: await sleepAsync asyncThreadSwitchTimeSlot
|
||||||
|
except CancelledError: discard
|
||||||
return false
|
return false
|
||||||
|
|
||||||
# So it is deterministic and there were some headers downloaded already.
|
# So it is deterministic and there were some headers downloaded already.
|
||||||
|
@ -38,7 +38,7 @@ proc headersFetchReversed*(
|
|||||||
topHash: Hash32;
|
topHash: Hash32;
|
||||||
info: static[string];
|
info: static[string];
|
||||||
): Future[Result[seq[Header],void]]
|
): Future[Result[seq[Header],void]]
|
||||||
{.async.} =
|
{.async: (raises: []).} =
|
||||||
## Get a list of headers in reverse order.
|
## Get a list of headers in reverse order.
|
||||||
let
|
let
|
||||||
peer = buddy.peer
|
peer = buddy.peer
|
||||||
@ -74,7 +74,7 @@ proc headersFetchReversed*(
|
|||||||
# reliably be used in a `withTimeout()` directive. It would rather crash
|
# reliably be used in a `withTimeout()` directive. It would rather crash
|
||||||
# in `rplx` with a violated `req.timeoutAt <= Moment.now()` assertion.
|
# in `rplx` with a violated `req.timeoutAt <= Moment.now()` assertion.
|
||||||
resp = await peer.getBlockHeaders(req)
|
resp = await peer.getBlockHeaders(req)
|
||||||
except TransportError as e:
|
except CatchableError as e:
|
||||||
buddy.registerError()
|
buddy.registerError()
|
||||||
`info` info & " error", peer, ivReq, nReq=req.maxResults, useHash,
|
`info` info & " error", peer, ivReq, nReq=req.maxResults, useHash,
|
||||||
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg,
|
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg,
|
||||||
|
@ -23,7 +23,7 @@
|
|||||||
## *runRelease(ctx: CtxRef[S])*
|
## *runRelease(ctx: CtxRef[S])*
|
||||||
## Global clean up, done with all the worker peers.
|
## Global clean up, done with all the worker peers.
|
||||||
##
|
##
|
||||||
## *runDaemon(ctx: CtxRef[S]) {.async.}*
|
## *runDaemon(ctx: CtxRef[S]) {.async: (raises: []).}*
|
||||||
## Global background job that will be re-started as long as the variable
|
## Global background job that will be re-started as long as the variable
|
||||||
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
|
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
|
||||||
## `ctx.daemon` to `false`, it will be restarted next after it was reset
|
## `ctx.daemon` to `false`, it will be restarted next after it was reset
|
||||||
@ -56,7 +56,7 @@
|
|||||||
## Note that this function does *not* run in `async` mode.
|
## Note that this function does *not* run in `async` mode.
|
||||||
##
|
##
|
||||||
##
|
##
|
||||||
## *runPeer(buddy: BuddyRef[S,W]) {.async.}*
|
## *runPeer(buddy: BuddyRef[S,W]) {.async: (raises: []).}*
|
||||||
## This peer worker method is repeatedly invoked (exactly one per peer) while
|
## This peer worker method is repeatedly invoked (exactly one per peer) while
|
||||||
## the `buddy.ctrl.poolMode` flag is set `false`.
|
## the `buddy.ctrl.poolMode` flag is set `false`.
|
||||||
##
|
##
|
||||||
@ -198,7 +198,7 @@ proc terminate[S,W](dsc: RunnerSyncRef[S,W]) =
|
|||||||
dsc.runCtrl = terminated
|
dsc.runCtrl = terminated
|
||||||
|
|
||||||
|
|
||||||
proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} =
|
proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async: (raises: []).} =
|
||||||
mixin runDaemon
|
mixin runDaemon
|
||||||
|
|
||||||
if dsc.ctx.daemon and dsc.runCtrl == running:
|
if dsc.ctx.daemon and dsc.runCtrl == running:
|
||||||
@ -220,13 +220,16 @@ proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} =
|
|||||||
elapsed = Moment.now() - startMoment
|
elapsed = Moment.now() - startMoment
|
||||||
suspend = if execLoopTimeElapsedMin <= elapsed: execLoopTaskSwitcher
|
suspend = if execLoopTimeElapsedMin <= elapsed: execLoopTaskSwitcher
|
||||||
else: execLoopTimeElapsedMin - elapsed
|
else: execLoopTimeElapsedMin - elapsed
|
||||||
await sleepAsync suspend
|
try:
|
||||||
|
await sleepAsync suspend
|
||||||
|
except CancelledError:
|
||||||
|
break # stop on error (must not end up in busy-loop)
|
||||||
# End while
|
# End while
|
||||||
|
|
||||||
dsc.daemonRunning = false
|
dsc.daemonRunning = false
|
||||||
|
|
||||||
|
|
||||||
proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
|
proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async: (raises: []).} =
|
||||||
mixin runPeer, runPool, runStop
|
mixin runPeer, runPool, runStop
|
||||||
let
|
let
|
||||||
dsc = buddy.dsc
|
dsc = buddy.dsc
|
||||||
@ -257,7 +260,12 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
|
|||||||
# clear to run as the only activated instance.
|
# clear to run as the only activated instance.
|
||||||
dsc.monitorLock = true
|
dsc.monitorLock = true
|
||||||
while 0 < dsc.activeMulti:
|
while 0 < dsc.activeMulti:
|
||||||
await sleepAsync execLoopPollingTime
|
try:
|
||||||
|
await sleepAsync execLoopPollingTime
|
||||||
|
except CancelledError:
|
||||||
|
# must not end up in busy-loop
|
||||||
|
dsc.monitorLock = false
|
||||||
|
break taskExecLoop
|
||||||
if not isActive():
|
if not isActive():
|
||||||
dsc.monitorLock = false
|
dsc.monitorLock = false
|
||||||
break taskExecLoop
|
break taskExecLoop
|
||||||
@ -316,7 +324,10 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
|
|||||||
elapsed = Moment.now() - startMoment
|
elapsed = Moment.now() - startMoment
|
||||||
suspend = if execLoopTimeElapsedMin <= elapsed: execLoopTaskSwitcher
|
suspend = if execLoopTimeElapsedMin <= elapsed: execLoopTaskSwitcher
|
||||||
else: execLoopTimeElapsedMin - elapsed
|
else: execLoopTimeElapsedMin - elapsed
|
||||||
await sleepAsync suspend
|
try:
|
||||||
|
await sleepAsync suspend
|
||||||
|
except CancelledError:
|
||||||
|
break # stop on error (must not end up in busy-loop)
|
||||||
# End while
|
# End while
|
||||||
|
|
||||||
# Note that `runStart()` was dispatched in `onPeerConnected()`
|
# Note that `runStart()` was dispatched in `onPeerConnected()`
|
||||||
|
Loading…
x
Reference in New Issue
Block a user