2021-03-26 07:52:01 +01:00
|
|
|
# beacon_chain
|
2024-01-06 15:26:56 +01:00
|
|
|
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
2021-03-26 07:52:01 +01:00
|
|
|
# Licensed and distributed under either of
|
|
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
|
2023-01-20 14:14:37 +00:00
|
|
|
{.push raises: [].}
|
2021-03-26 07:52:01 +01:00
|
|
|
|
2024-01-13 10:54:24 +01:00
|
|
|
import std/[strutils, sequtils, algorithm]
|
2022-01-21 11:35:54 +02:00
|
|
|
import stew/[results, base10], chronos, chronicles
|
2021-08-12 15:08:20 +02:00
|
|
|
import
|
2022-01-18 13:36:52 +00:00
|
|
|
../spec/datatypes/[phase0, altair],
|
2022-06-02 12:39:08 +03:00
|
|
|
../spec/eth2_apis/rest_types,
|
2024-07-01 17:42:29 +05:30
|
|
|
../spec/[helpers, forks, network, eip7594_helpers],
|
2022-01-26 13:20:08 +01:00
|
|
|
../networking/[peer_pool, peer_scores, eth2_network],
|
2023-03-07 21:19:17 +01:00
|
|
|
../gossip_processing/block_processor,
|
2021-12-16 15:57:16 +01:00
|
|
|
../beacon_clock,
|
2022-02-11 21:40:49 +01:00
|
|
|
"."/[sync_protocol, sync_queue]
|
2021-08-12 15:08:20 +02:00
|
|
|
|
2021-12-16 15:57:16 +01:00
|
|
|
export phase0, altair, merge, chronos, chronicles, results,
|
2022-02-11 21:40:49 +01:00
|
|
|
helpers, peer_scores, sync_queue, forks, sync_protocol
|
2020-01-21 20:30:21 +02:00
|
|
|
|
2020-01-22 14:47:55 +02:00
|
|
|
logScope:
|
|
|
|
topics = "syncman"
|
2020-01-21 20:30:21 +02:00
|
|
|
|
2020-04-23 18:31:00 +03:00
|
|
|
const
|
2024-07-04 16:49:55 +05:30
|
|
|
SyncWorkersCount* = 20
|
2020-09-11 15:46:01 +03:00
|
|
|
## Number of sync workers to spawn
|
|
|
|
|
2020-09-23 18:58:02 +03:00
|
|
|
StatusUpdateInterval* = chronos.minutes(1)
|
|
|
|
## Minimum time between two subsequent calls to update peer's status
|
|
|
|
|
2020-10-08 15:50:48 +03:00
|
|
|
StatusExpirationTime* = chronos.minutes(2)
|
|
|
|
## Time time it takes for the peer's status information to expire.
|
|
|
|
|
2020-01-21 20:30:21 +02:00
|
|
|
type
|
2020-09-11 15:46:01 +03:00
|
|
|
SyncWorkerStatus* {.pure.} = enum
|
2021-12-16 15:57:16 +01:00
|
|
|
Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Queueing,
|
|
|
|
Processing
|
2020-09-11 15:46:01 +03:00
|
|
|
|
2022-04-14 16:17:44 +03:00
|
|
|
SyncManagerFlag* {.pure.} = enum
|
|
|
|
NoMonitor
|
|
|
|
|
2020-09-11 15:46:01 +03:00
|
|
|
SyncWorker*[A, B] = object
|
2024-02-09 08:35:41 +00:00
|
|
|
future: Future[void].Raising([CancelledError])
|
2020-09-11 15:46:01 +03:00
|
|
|
status: SyncWorkerStatus
|
|
|
|
|
2020-04-20 17:59:18 +03:00
|
|
|
SyncManager*[A, B] = ref object
|
|
|
|
pool: PeerPool[A, B]
|
2023-02-15 16:44:09 +02:00
|
|
|
DENEB_FORK_EPOCH: Epoch
|
2023-11-09 21:41:17 +01:00
|
|
|
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: uint64
|
2024-07-02 03:09:49 +05:30
|
|
|
MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS: uint64
|
2020-04-20 17:59:18 +03:00
|
|
|
responseTimeout: chronos.Duration
|
2024-07-22 19:04:04 +05:30
|
|
|
supernode: bool
|
2020-04-20 17:59:18 +03:00
|
|
|
maxHeadAge: uint64
|
|
|
|
getLocalHeadSlot: GetSlotCallback
|
|
|
|
getLocalWallSlot: GetSlotCallback
|
2021-12-16 15:57:16 +01:00
|
|
|
getSafeSlot: GetSlotCallback
|
|
|
|
getFirstSlot: GetSlotCallback
|
|
|
|
getLastSlot: GetSlotCallback
|
2022-01-13 10:37:53 +01:00
|
|
|
progressPivot: Slot
|
2020-09-11 15:46:01 +03:00
|
|
|
workers: array[SyncWorkersCount, SyncWorker[A, B]]
|
|
|
|
notInSyncEvent: AsyncEvent
|
|
|
|
rangeAge: uint64
|
2020-04-23 18:31:00 +03:00
|
|
|
chunkSize: uint64
|
2020-05-19 15:08:50 +03:00
|
|
|
queue: SyncQueue[A]
|
2020-08-10 10:15:50 +03:00
|
|
|
syncFut: Future[void]
|
2021-12-16 15:57:16 +01:00
|
|
|
blockVerifier: BlockVerifier
|
2020-06-03 11:46:29 +03:00
|
|
|
inProgress*: bool
|
2020-10-27 11:25:28 +02:00
|
|
|
insSyncSpeed*: float
|
|
|
|
avgSyncSpeed*: float
|
2020-09-11 15:46:01 +03:00
|
|
|
syncStatus*: string
|
2021-12-16 15:57:16 +01:00
|
|
|
direction: SyncQueueKind
|
2022-03-03 09:05:33 +01:00
|
|
|
ident*: string
|
2022-04-14 16:17:44 +03:00
|
|
|
flags: set[SyncManagerFlag]
|
2020-05-19 15:08:50 +03:00
|
|
|
|
|
|
|
SyncMoment* = object
|
|
|
|
stamp*: chronos.Moment
|
2021-12-16 15:57:16 +01:00
|
|
|
slots*: uint64
|
2020-05-28 08:02:28 +03:00
|
|
|
|
2023-08-12 03:10:12 +00:00
|
|
|
BeaconBlocksRes =
|
|
|
|
NetRes[List[ref ForkedSignedBeaconBlock, Limit MAX_REQUEST_BLOCKS]]
|
2023-04-17 19:18:54 +02:00
|
|
|
BlobSidecarsRes = NetRes[List[ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)]]
|
2024-06-21 14:51:54 +05:30
|
|
|
DataColumnSidecarsRes =
|
|
|
|
NetRes[List[ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMNS)]]
|
2020-04-20 17:59:18 +03:00
|
|
|
|
2021-12-16 15:57:16 +01:00
|
|
|
proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
|
|
|
|
SyncMoment(stamp: now(chronos.Moment), slots: slots)
|
2020-05-19 15:08:50 +03:00
|
|
|
|
|
|
|
proc speed*(start, finish: SyncMoment): float {.inline.} =
|
|
|
|
## Returns number of slots per second.
|
2021-12-16 15:57:16 +01:00
|
|
|
if finish.slots <= start.slots or finish.stamp <= start.stamp:
|
|
|
|
0.0 # replays for example
|
2020-05-19 15:08:50 +03:00
|
|
|
else:
|
2021-12-16 15:57:16 +01:00
|
|
|
let
|
|
|
|
slots = float(finish.slots - start.slots)
|
|
|
|
dur = toFloatSeconds(finish.stamp - start.stamp)
|
|
|
|
slots / dur
|
|
|
|
|
|
|
|
proc initQueue[A, B](man: SyncManager[A, B]) =
|
2022-01-20 08:25:45 +01:00
|
|
|
case man.direction
|
|
|
|
of SyncQueueKind.Forward:
|
|
|
|
man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(),
|
|
|
|
man.getLastSlot(), man.chunkSize,
|
2023-02-11 21:48:35 +01:00
|
|
|
man.getSafeSlot, man.blockVerifier,
|
2023-04-18 02:12:57 +02:00
|
|
|
1, man.ident)
|
2022-01-20 08:25:45 +01:00
|
|
|
of SyncQueueKind.Backward:
|
|
|
|
let
|
|
|
|
firstSlot = man.getFirstSlot()
|
|
|
|
lastSlot = man.getLastSlot()
|
|
|
|
startSlot = if firstSlot == lastSlot:
|
|
|
|
# This case should never be happened in real life because
|
|
|
|
# there is present check `needsBackfill().
|
|
|
|
firstSlot
|
|
|
|
else:
|
2024-05-22 11:56:37 +00:00
|
|
|
firstSlot - 1'u64
|
2022-02-13 19:13:18 +01:00
|
|
|
man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot,
|
2022-01-20 08:25:45 +01:00
|
|
|
man.chunkSize, man.getSafeSlot,
|
2023-04-18 02:12:57 +02:00
|
|
|
man.blockVerifier, 1, man.ident)
|
2020-05-19 15:08:50 +03:00
|
|
|
|
2020-01-21 20:30:21 +02:00
|
|
|
proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
2023-03-11 01:28:19 +01:00
|
|
|
denebEpoch: Epoch,
|
2023-11-09 21:41:17 +01:00
|
|
|
minEpochsForBlobSidecarsRequests: uint64,
|
2024-07-22 19:04:04 +05:30
|
|
|
supernode: bool,
|
2021-12-16 15:57:16 +01:00
|
|
|
direction: SyncQueueKind,
|
2020-04-20 17:59:18 +03:00
|
|
|
getLocalHeadSlotCb: GetSlotCallback,
|
|
|
|
getLocalWallSlotCb: GetSlotCallback,
|
2020-07-06 15:53:48 +03:00
|
|
|
getFinalizedSlotCb: GetSlotCallback,
|
2021-12-16 15:57:16 +01:00
|
|
|
getBackfillSlotCb: GetSlotCallback,
|
era: load blocks and states (#3394)
* era: load blocks and states
Era files contain finalized history and can be thought of as an
alternative source for block and state data that allows clients to avoid
syncing this information from the P2P network - the P2P network is then
used to "top up" the client with the most recent data. They can be
freely shared in the community via whatever means (http, torrent, etc)
and serve as a permanent cold store of consensus data (and, after the
merge, execution data) for history buffs and bean counters alike.
This PR gently introduces support for loading blocks and states in two
cases: block requests from rest/p2p and frontfilling when doing
checkpoint sync.
The era files are used as a secondary source if the information is not
found in the database - compared to the database, there are a few key
differences:
* the database stores the block indexed by block root while the era file
indexes by slot - the former is used only in rest, while the latter is
used both by p2p and rest.
* when loading blocks from era files, the root is no longer trivially
available - if it is needed, it must either be computed (slow) or cached
(messy) - the good news is that for p2p requests, it is not needed
* in era files, "framed" snappy encoding is used while in the database
we store unframed snappy - for p2p2 requests, the latter requires
recompression while the former could avoid it
* front-filling is the process of using era files to replace backfilling
- in theory this front-filling could happen from any block and
front-fills with gaps could also be entertained, but our backfilling
algorithm cannot take advantage of this because there's no (simple) way
to tell it to "skip" a range.
* front-filling, as implemented, is a bit slow (10s to load mainnet): we
load the full BeaconState for every era to grab the roots of the blocks
- it would be better to partially load the state - as such, it would
also be good to be able to partially decompress snappy blobs
* lookups from REST via root are served by first looking up a block
summary in the database, then using the slot to load the block data from
the era file - however, there needs to be an option to create the
summary table from era files to fully support historical queries
To test this, `ncli_db` has an era file exporter: the files it creates
should be placed in an `era` folder next to `db` in the data directory.
What's interesting in particular about this setup is that `db` remains
as the source of truth for security purposes - it stores the latest
synced head root which in turn determines where a node "starts" its
consensus participation - the era directory however can be freely shared
between nodes / people without any (significant) security implications,
assuming the era files are consistent / not broken.
There's lots of future improvements to be had:
* we can drop the in-memory `BlockRef` index almost entirely - at this
point, resident memory usage of Nimbus should drop to a cool 500-600 mb
* we could serve era files via REST trivially: this would drop backfill
times to whatever time it takes to download the files - unlike the
current implementation that downloads block by block, downloading an era
at a time almost entirely cuts out request overhead
* we can "reasonably" recreate detailed state history from almost any
point in time, turning an O(slot) process into O(1) effectively - we'll
still need caches and indices to do this with sufficient efficiency for
the rest api, but at least it cuts the whole process down to minutes
instead of hours, for arbitrary points in time
* CI: ignore failures with Nim-1.6 (temporary)
* test fixes
Co-authored-by: Ștefan Talpalaru <stefantalpalaru@yahoo.com>
2022-03-23 09:58:17 +01:00
|
|
|
getFrontfillSlotCb: GetSlotCallback,
|
2022-01-13 10:37:53 +01:00
|
|
|
progressPivot: Slot,
|
2021-12-16 15:57:16 +01:00
|
|
|
blockVerifier: BlockVerifier,
|
2020-06-07 18:36:24 +03:00
|
|
|
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
|
2020-04-20 17:59:18 +03:00
|
|
|
chunkSize = uint64(SLOTS_PER_EPOCH),
|
2022-04-14 16:17:44 +03:00
|
|
|
flags: set[SyncManagerFlag] = {},
|
2022-03-03 09:05:33 +01:00
|
|
|
ident = "main"
|
2020-04-20 17:59:18 +03:00
|
|
|
): SyncManager[A, B] =
|
2021-12-16 15:57:16 +01:00
|
|
|
let (getFirstSlot, getLastSlot, getSafeSlot) = case direction
|
|
|
|
of SyncQueueKind.Forward:
|
|
|
|
(getLocalHeadSlotCb, getLocalWallSlotCb, getFinalizedSlotCb)
|
|
|
|
of SyncQueueKind.Backward:
|
era: load blocks and states (#3394)
* era: load blocks and states
Era files contain finalized history and can be thought of as an
alternative source for block and state data that allows clients to avoid
syncing this information from the P2P network - the P2P network is then
used to "top up" the client with the most recent data. They can be
freely shared in the community via whatever means (http, torrent, etc)
and serve as a permanent cold store of consensus data (and, after the
merge, execution data) for history buffs and bean counters alike.
This PR gently introduces support for loading blocks and states in two
cases: block requests from rest/p2p and frontfilling when doing
checkpoint sync.
The era files are used as a secondary source if the information is not
found in the database - compared to the database, there are a few key
differences:
* the database stores the block indexed by block root while the era file
indexes by slot - the former is used only in rest, while the latter is
used both by p2p and rest.
* when loading blocks from era files, the root is no longer trivially
available - if it is needed, it must either be computed (slow) or cached
(messy) - the good news is that for p2p requests, it is not needed
* in era files, "framed" snappy encoding is used while in the database
we store unframed snappy - for p2p2 requests, the latter requires
recompression while the former could avoid it
* front-filling is the process of using era files to replace backfilling
- in theory this front-filling could happen from any block and
front-fills with gaps could also be entertained, but our backfilling
algorithm cannot take advantage of this because there's no (simple) way
to tell it to "skip" a range.
* front-filling, as implemented, is a bit slow (10s to load mainnet): we
load the full BeaconState for every era to grab the roots of the blocks
- it would be better to partially load the state - as such, it would
also be good to be able to partially decompress snappy blobs
* lookups from REST via root are served by first looking up a block
summary in the database, then using the slot to load the block data from
the era file - however, there needs to be an option to create the
summary table from era files to fully support historical queries
To test this, `ncli_db` has an era file exporter: the files it creates
should be placed in an `era` folder next to `db` in the data directory.
What's interesting in particular about this setup is that `db` remains
as the source of truth for security purposes - it stores the latest
synced head root which in turn determines where a node "starts" its
consensus participation - the era directory however can be freely shared
between nodes / people without any (significant) security implications,
assuming the era files are consistent / not broken.
There's lots of future improvements to be had:
* we can drop the in-memory `BlockRef` index almost entirely - at this
point, resident memory usage of Nimbus should drop to a cool 500-600 mb
* we could serve era files via REST trivially: this would drop backfill
times to whatever time it takes to download the files - unlike the
current implementation that downloads block by block, downloading an era
at a time almost entirely cuts out request overhead
* we can "reasonably" recreate detailed state history from almost any
point in time, turning an O(slot) process into O(1) effectively - we'll
still need caches and indices to do this with sufficient efficiency for
the rest api, but at least it cuts the whole process down to minutes
instead of hours, for arbitrary points in time
* CI: ignore failures with Nim-1.6 (temporary)
* test fixes
Co-authored-by: Ștefan Talpalaru <stefantalpalaru@yahoo.com>
2022-03-23 09:58:17 +01:00
|
|
|
(getBackfillSlotCb, getFrontfillSlotCb, getBackfillSlotCb)
|
2020-05-19 15:08:50 +03:00
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
var res = SyncManager[A, B](
|
2020-04-20 17:59:18 +03:00
|
|
|
pool: pool,
|
2023-03-11 01:28:19 +01:00
|
|
|
DENEB_FORK_EPOCH: denebEpoch,
|
2023-11-09 21:41:17 +01:00
|
|
|
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: minEpochsForBlobSidecarsRequests,
|
2024-07-22 19:04:04 +05:30
|
|
|
supernode: supernode,
|
2020-04-20 17:59:18 +03:00
|
|
|
getLocalHeadSlot: getLocalHeadSlotCb,
|
|
|
|
getLocalWallSlot: getLocalWallSlotCb,
|
2021-12-16 15:57:16 +01:00
|
|
|
getSafeSlot: getSafeSlot,
|
|
|
|
getFirstSlot: getFirstSlot,
|
|
|
|
getLastSlot: getLastSlot,
|
2022-01-13 10:37:53 +01:00
|
|
|
progressPivot: progressPivot,
|
2020-04-20 17:59:18 +03:00
|
|
|
maxHeadAge: maxHeadAge,
|
2020-04-23 18:31:00 +03:00
|
|
|
chunkSize: chunkSize,
|
2021-12-16 15:57:16 +01:00
|
|
|
blockVerifier: blockVerifier,
|
2020-09-11 15:46:01 +03:00
|
|
|
notInSyncEvent: newAsyncEvent(),
|
2022-03-03 09:05:33 +01:00
|
|
|
direction: direction,
|
2022-04-14 16:17:44 +03:00
|
|
|
ident: ident,
|
|
|
|
flags: flags
|
2020-04-20 17:59:18 +03:00
|
|
|
)
|
2022-01-20 08:25:45 +01:00
|
|
|
res.initQueue()
|
|
|
|
res
|
2020-04-20 17:59:18 +03:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
proc getBlocks[A, B](man: SyncManager[A, B], peer: A,
|
|
|
|
req: SyncRequest): Future[BeaconBlocksRes] {.
|
|
|
|
async: (raises: [CancelledError], raw: true).} =
|
2023-06-05 20:52:16 +02:00
|
|
|
mixin getScore, `==`
|
2022-03-03 09:05:33 +01:00
|
|
|
|
|
|
|
logScope:
|
|
|
|
peer_score = peer.getScore()
|
|
|
|
peer_speed = peer.netKbps()
|
|
|
|
sync_ident = man.ident
|
|
|
|
direction = man.direction
|
|
|
|
topics = "syncman"
|
|
|
|
|
2020-04-20 17:59:18 +03:00
|
|
|
doAssert(not(req.isEmpty()), "Request must not be empty!")
|
2022-03-03 09:05:33 +01:00
|
|
|
debug "Requesting blocks from peer", request = req
|
2024-02-09 08:35:41 +00:00
|
|
|
|
|
|
|
beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
|
2022-01-20 08:25:45 +01:00
|
|
|
|
2023-02-11 21:48:35 +01:00
|
|
|
proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
|
|
|
|
let wallEpoch = man.getLocalWallSlot().epoch
|
2023-02-15 16:44:09 +02:00
|
|
|
e >= man.DENEB_FORK_EPOCH and
|
2023-11-09 21:41:17 +01:00
|
|
|
(wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or
|
|
|
|
e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
|
2023-02-11 21:48:35 +01:00
|
|
|
|
2024-07-01 17:42:29 +05:30
|
|
|
proc shouldGetDataColumns[A, B](man: SyncManager[A, B], e: Epoch): bool =
|
|
|
|
let wallEpoch = man.getLocalWallSlot().epoch
|
|
|
|
e >= man.DENEB_FORK_EPOCH and
|
2024-07-02 03:09:49 +05:30
|
|
|
(wallEpoch < man.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS or
|
2024-07-01 17:42:29 +05:30
|
|
|
e >= wallEpoch - man.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS)
|
|
|
|
|
2024-01-18 15:45:10 +01:00
|
|
|
proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
|
2024-02-09 08:35:41 +00:00
|
|
|
req: SyncRequest): Future[BlobSidecarsRes]
|
|
|
|
{.async: (raises: [CancelledError], raw: true).} =
|
2023-02-11 21:48:35 +01:00
|
|
|
mixin getScore, `==`
|
|
|
|
|
|
|
|
logScope:
|
|
|
|
peer_score = peer.getScore()
|
|
|
|
peer_speed = peer.netKbps()
|
|
|
|
sync_ident = man.ident
|
|
|
|
direction = man.direction
|
|
|
|
topics = "syncman"
|
|
|
|
|
|
|
|
doAssert(not(req.isEmpty()), "Request must not be empty!")
|
|
|
|
debug "Requesting blobs sidecars from peer", request = req
|
2024-02-09 08:35:41 +00:00
|
|
|
blobSidecarsByRange(peer, req.slot, req.count)
|
2023-02-11 21:48:35 +01:00
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
proc remainingSlots(man: SyncManager): uint64 =
|
2022-12-23 08:42:55 +01:00
|
|
|
let
|
|
|
|
first = man.getFirstSlot()
|
|
|
|
last = man.getLastSlot()
|
2022-01-20 08:25:45 +01:00
|
|
|
if man.direction == SyncQueueKind.Forward:
|
2022-12-23 08:42:55 +01:00
|
|
|
if last > first:
|
|
|
|
man.getLastSlot() - man.getFirstSlot()
|
|
|
|
else:
|
|
|
|
0'u64
|
2022-01-20 08:25:45 +01:00
|
|
|
else:
|
2022-12-23 08:42:55 +01:00
|
|
|
if first > last:
|
|
|
|
man.getFirstSlot() - man.getLastSlot()
|
|
|
|
else:
|
|
|
|
0'u64
|
2020-09-23 18:58:02 +03:00
|
|
|
|
2023-05-06 10:58:50 +02:00
|
|
|
func groupBlobs*[T](req: SyncRequest[T],
|
|
|
|
blocks: seq[ref ForkedSignedBeaconBlock],
|
|
|
|
blobs: seq[ref BlobSidecar]):
|
|
|
|
Result[seq[BlobSidecars], string] =
|
2024-01-18 15:45:10 +01:00
|
|
|
var
|
|
|
|
grouped = newSeq[BlobSidecars](len(blocks))
|
|
|
|
blob_cursor = 0
|
|
|
|
for block_idx, blck in blocks:
|
|
|
|
withBlck(blck[]):
|
|
|
|
when consensusFork >= ConsensusFork.Deneb:
|
|
|
|
template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments
|
|
|
|
if kzgs.len == 0:
|
|
|
|
continue
|
|
|
|
# Clients MUST include all blob sidecars of each block from which they include blob sidecars.
|
|
|
|
# The following blob sidecars, where they exist, MUST be sent in consecutive (slot, index) order.
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blobsidecarsbyrange-v1
|
|
|
|
let header = forkyBlck.toSignedBeaconBlockHeader()
|
|
|
|
for blob_idx, kzg_commitment in kzgs:
|
|
|
|
if blob_cursor >= blobs.len:
|
|
|
|
return err("BlobSidecar: response too short")
|
|
|
|
let blob_sidecar = blobs[blob_cursor]
|
|
|
|
if blob_sidecar.index != BlobIndex blob_idx:
|
|
|
|
return err("BlobSidecar: unexpected index")
|
|
|
|
if blob_sidecar.kzg_commitment != kzg_commitment:
|
|
|
|
return err("BlobSidecar: unexpected kzg_commitment")
|
|
|
|
if blob_sidecar.signed_block_header != header:
|
|
|
|
return err("BlobSidecar: unexpected signed_block_header")
|
|
|
|
grouped[block_idx].add(blob_sidecar)
|
|
|
|
inc blob_cursor
|
|
|
|
|
|
|
|
if blob_cursor != len(blobs):
|
2023-05-06 10:58:50 +02:00
|
|
|
# we reached end of blocks without consuming all blobs so either
|
|
|
|
# the peer we got too few blocks in the paired request, or the
|
|
|
|
# peer is sending us spurious blobs.
|
|
|
|
Result[seq[BlobSidecars], string].err "invalid block or blob sequence"
|
|
|
|
else:
|
|
|
|
Result[seq[BlobSidecars], string].ok grouped
|
2023-03-07 21:19:17 +01:00
|
|
|
|
2024-01-18 15:45:10 +01:00
|
|
|
func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =
|
|
|
|
for blob_sidecars in blobs:
|
|
|
|
for blob_sidecar in blob_sidecars:
|
|
|
|
? blob_sidecar[].verify_blob_sidecar_inclusion_proof()
|
|
|
|
ok()
|
|
|
|
|
2024-07-01 17:42:29 +05:30
|
|
|
proc getDataColumnSidecars[A, B](man: SyncManager[A, B], peer: A,
|
|
|
|
req: SyncRequest): Future[DataColumnSidecarsRes]
|
|
|
|
{.async: (raises: [CancelledError], raw: true).} =
|
|
|
|
mixin getScore, `==`
|
|
|
|
|
|
|
|
logScope:
|
|
|
|
peer_score = peer.getScore()
|
|
|
|
peer_speed = peer.netKbps()
|
2024-07-02 00:14:58 +05:30
|
|
|
sync_ident = man.ident
|
2024-07-01 17:42:29 +05:30
|
|
|
direction = man.direction
|
|
|
|
topics = "syncman"
|
|
|
|
|
2024-08-18 21:42:10 +05:30
|
|
|
let
|
2024-08-23 12:31:54 +05:30
|
|
|
remoteCustodySubnetCount = peer.lookupCscFromPeer()
|
2024-08-18 21:42:10 +05:30
|
|
|
|
|
|
|
let
|
|
|
|
remoteNodeId = getNodeIdFromPeer(peer)
|
|
|
|
remoteCustodyColumns =
|
2024-09-09 02:09:24 +05:30
|
|
|
remoteNodeId.get_custody_column_list(remoteCustodySubnetCount)
|
2024-08-18 21:42:10 +05:30
|
|
|
|
2024-07-01 17:42:29 +05:30
|
|
|
doAssert(not(req.isEmpty()), "Request must not be empty!")
|
|
|
|
debug "Requesting data column sidecars from peer", request = req
|
2024-08-18 21:42:10 +05:30
|
|
|
dataColumnSidecarsByRange(peer, req.slot, req.count, remoteCustodyColumns)
|
2024-07-01 17:42:29 +05:30
|
|
|
|
|
|
|
func groupDataColumns*[T](req: SyncRequest[T],
|
|
|
|
blocks: seq[ref ForkedSignedBeaconBlock],
|
|
|
|
data_columns: seq[ref DataColumnSidecar]):
|
|
|
|
Result[seq[DataColumnSidecars], string] =
|
|
|
|
var
|
|
|
|
grouped = newSeq[DataColumnSidecars](len(blocks))
|
|
|
|
column_cursor = 0
|
2024-07-02 00:14:58 +05:30
|
|
|
for block_idx, blck in blocks:
|
2024-07-01 17:42:29 +05:30
|
|
|
withBlck(blck[]):
|
2024-07-02 00:14:58 +05:30
|
|
|
when consensusFork >= ConsensusFork.Deneb:
|
2024-07-01 17:42:29 +05:30
|
|
|
template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments
|
|
|
|
if kzgs.len == 0:
|
|
|
|
continue
|
2024-07-15 13:11:09 +05:30
|
|
|
# Clients MUST include all data column sidecars of each block from which they include data column sidecars.
|
|
|
|
# The following data column sidecars, where they exist, MUST be sent in consecutive (slot, index) order.
|
2024-07-01 17:42:29 +05:30
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/specs/_features/eip7594/p2p-interface.md
|
|
|
|
let header = forkyBlck.toSignedBeaconBlockHeader()
|
|
|
|
for column_idx, kzg_commitment in kzgs:
|
|
|
|
if column_cursor >= data_columns.len:
|
|
|
|
return err("DataColumnSidecar: response too short")
|
|
|
|
let data_column_sidecar = data_columns[column_cursor]
|
|
|
|
if kzg_commitment notin data_column_sidecar.kzg_commitments:
|
|
|
|
return err("DataColumnSidecar: unexpected kzg_commitment")
|
|
|
|
if data_column_sidecar.signed_block_header != header:
|
|
|
|
return err("DataColumnSidecar: unexpected signed_block_header")
|
|
|
|
grouped[block_idx].add(data_column_sidecar)
|
|
|
|
inc column_cursor
|
|
|
|
|
|
|
|
if column_cursor != len(data_columns):
|
|
|
|
# we reached end of blocks without consuming all data columns so either
|
|
|
|
# the peer we got too few blocks in the paired request, or the
|
|
|
|
# peer is sending us spurious data columns.
|
|
|
|
Result[seq[DataColumnSidecars], string].err "invalid block or data column sequence"
|
|
|
|
else:
|
|
|
|
Result[seq[DataColumnSidecars], string].ok grouped
|
|
|
|
|
|
|
|
func checkDataColumns(data_columns: seq[DataColumnSidecars]): Result[void, string] =
|
|
|
|
for data_column_sidecars in data_columns:
|
|
|
|
for data_column_sidecar in data_column_sidecars:
|
|
|
|
? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof()
|
|
|
|
ok()
|
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
|
|
|
|
{.async: (raises: [CancelledError]).} =
|
2022-03-03 09:05:33 +01:00
|
|
|
logScope:
|
|
|
|
peer_score = peer.getScore()
|
|
|
|
peer_speed = peer.netKbps()
|
|
|
|
index = index
|
|
|
|
sync_ident = man.ident
|
|
|
|
topics = "syncman"
|
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
var
|
|
|
|
headSlot = man.getLocalHeadSlot()
|
|
|
|
wallSlot = man.getLocalWallSlot()
|
|
|
|
peerSlot = peer.getHeadSlot()
|
2021-12-16 15:57:16 +01:00
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
block: # Check that peer status is recent and relevant
|
2022-03-03 09:05:33 +01:00
|
|
|
logScope:
|
|
|
|
peer = peer
|
|
|
|
direction = man.direction
|
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
debug "Peer's syncing status", wall_clock_slot = wallSlot,
|
2022-03-03 09:05:33 +01:00
|
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot
|
2020-09-11 15:46:01 +03:00
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
let
|
2024-01-13 10:54:24 +01:00
|
|
|
peerStatusAge = Moment.now() - peer.getStatusLastTime()
|
2022-01-20 08:25:45 +01:00
|
|
|
needsUpdate =
|
|
|
|
# Latest status we got is old
|
|
|
|
peerStatusAge >= StatusExpirationTime or
|
|
|
|
# The point we need to sync is close to where the peer is
|
|
|
|
man.getFirstSlot() >= peerSlot
|
|
|
|
|
|
|
|
if needsUpdate:
|
|
|
|
man.workers[index].status = SyncWorkerStatus.UpdatingStatus
|
|
|
|
|
|
|
|
# Avoid a stampede of requests, but make them more frequent in case the
|
|
|
|
# peer is "close" to the slot range of interest
|
|
|
|
if peerStatusAge < StatusExpirationTime div 2:
|
|
|
|
await sleepAsync(StatusExpirationTime div 2 - peerStatusAge)
|
|
|
|
|
|
|
|
trace "Updating peer's status information", wall_clock_slot = wallSlot,
|
2022-03-03 09:05:33 +01:00
|
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot
|
2020-09-24 15:14:29 +03:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
if not(await peer.updateStatus()):
|
|
|
|
peer.updateScore(PeerScoreNoStatus)
|
|
|
|
debug "Failed to get remote peer's status, exiting",
|
|
|
|
peer_head_slot = peerSlot
|
2022-03-03 09:05:33 +01:00
|
|
|
|
2020-09-23 18:58:02 +03:00
|
|
|
return
|
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
let newPeerSlot = peer.getHeadSlot()
|
|
|
|
if peerSlot >= newPeerSlot:
|
|
|
|
peer.updateScore(PeerScoreStaleStatus)
|
|
|
|
debug "Peer's status information is stale",
|
2020-10-30 14:33:52 +02:00
|
|
|
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
|
2022-03-03 09:05:33 +01:00
|
|
|
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot
|
2020-10-30 14:33:52 +02:00
|
|
|
else:
|
|
|
|
debug "Peer's status information updated", wall_clock_slot = wallSlot,
|
|
|
|
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
|
2022-03-03 09:05:33 +01:00
|
|
|
remote_new_head_slot = newPeerSlot
|
2020-10-30 14:33:52 +02:00
|
|
|
peer.updateScore(PeerScoreGoodStatus)
|
|
|
|
peerSlot = newPeerSlot
|
2020-09-23 18:58:02 +03:00
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
# Time passed - enough to move slots, if sleep happened
|
|
|
|
headSlot = man.getLocalHeadSlot()
|
|
|
|
wallSlot = man.getLocalWallSlot()
|
|
|
|
|
|
|
|
if man.remainingSlots() <= man.maxHeadAge:
|
2022-03-03 09:05:33 +01:00
|
|
|
logScope:
|
|
|
|
peer = peer
|
|
|
|
direction = man.direction
|
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
case man.direction
|
|
|
|
of SyncQueueKind.Forward:
|
|
|
|
info "We are in sync with network", wall_clock_slot = wallSlot,
|
2022-03-03 09:05:33 +01:00
|
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot
|
2022-01-20 08:25:45 +01:00
|
|
|
of SyncQueueKind.Backward:
|
|
|
|
info "Backfill complete", wall_clock_slot = wallSlot,
|
2022-03-03 09:05:33 +01:00
|
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot
|
2022-01-20 08:25:45 +01:00
|
|
|
|
|
|
|
# We clear SyncManager's `notInSyncEvent` so all the workers will become
|
|
|
|
# sleeping soon.
|
|
|
|
man.notInSyncEvent.clear()
|
|
|
|
return
|
|
|
|
|
|
|
|
# Find out if the peer potentially can give useful blocks - in the case of
|
|
|
|
# forward sync, they can be useful if they have blocks newer than our head -
|
|
|
|
# in the case of backwards sync, they're useful if they have blocks newer than
|
|
|
|
# the backfill point
|
|
|
|
if man.getFirstSlot() >= peerSlot:
|
|
|
|
# This is not very good solution because we should not discriminate and/or
|
|
|
|
# penalize peers which are in sync process too, but their latest head is
|
|
|
|
# lower then our latest head. We should keep connections with such peers
|
|
|
|
# (so this peers are able to get in sync using our data), but we should
|
|
|
|
# not use this peers for syncing because this peers are useless for us.
|
|
|
|
# Right now we decreasing peer's score a bit, so it will not be
|
|
|
|
# disconnected due to low peer's score, but new fresh peers could replace
|
|
|
|
# peers with low latest head.
|
2022-03-03 09:05:33 +01:00
|
|
|
debug "Peer's head slot is lower then local head slot", peer = peer,
|
2022-01-20 08:25:45 +01:00
|
|
|
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
|
|
|
|
local_last_slot = man.getLastSlot(),
|
2022-03-03 09:05:33 +01:00
|
|
|
local_first_slot = man.getFirstSlot(),
|
|
|
|
direction = man.direction
|
2022-01-20 08:25:45 +01:00
|
|
|
peer.updateScore(PeerScoreUseless)
|
2020-09-21 19:02:27 +03:00
|
|
|
return
|
2020-04-20 17:59:18 +03:00
|
|
|
|
2022-12-23 08:42:55 +01:00
|
|
|
# Wall clock keeps ticking, so we need to update the queue
|
|
|
|
man.queue.updateLastSlot(man.getLastSlot())
|
2021-12-16 15:57:16 +01:00
|
|
|
|
2020-09-21 19:02:27 +03:00
|
|
|
man.workers[index].status = SyncWorkerStatus.Requesting
|
|
|
|
let req = man.queue.pop(peerSlot, peer)
|
|
|
|
if req.isEmpty():
|
|
|
|
# SyncQueue could return empty request in 2 cases:
|
|
|
|
# 1. There no more slots in SyncQueue to download (we are synced, but
|
|
|
|
# our ``notInSyncEvent`` is not yet cleared).
|
|
|
|
# 2. Current peer's known head slot is too low to satisfy request.
|
|
|
|
#
|
|
|
|
# To avoid endless loop we going to wait for RESP_TIMEOUT time here.
|
|
|
|
# This time is enough for all pending requests to finish and it is also
|
|
|
|
# enough for main sync loop to clear ``notInSyncEvent``.
|
|
|
|
debug "Empty request received from queue, exiting", peer = peer,
|
|
|
|
local_head_slot = headSlot, remote_head_slot = peerSlot,
|
|
|
|
queue_input_slot = man.queue.inpSlot,
|
|
|
|
queue_output_slot = man.queue.outSlot,
|
2022-03-03 09:05:33 +01:00
|
|
|
queue_last_slot = man.queue.finalSlot, direction = man.direction
|
2023-08-12 03:10:12 +00:00
|
|
|
await sleepAsync(RESP_TIMEOUT_DUR)
|
2020-09-21 19:02:27 +03:00
|
|
|
return
|
2020-04-23 18:31:00 +03:00
|
|
|
|
2020-09-21 19:02:27 +03:00
|
|
|
debug "Creating new request for peer", wall_clock_slot = wallSlot,
|
|
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
2022-03-03 09:05:33 +01:00
|
|
|
request = req
|
2020-09-21 19:02:27 +03:00
|
|
|
|
|
|
|
man.workers[index].status = SyncWorkerStatus.Downloading
|
2020-04-20 17:59:18 +03:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
let blocks = await man.getBlocks(peer, req)
|
|
|
|
if blocks.isErr():
|
|
|
|
peer.updateScore(PeerScoreNoValues)
|
|
|
|
man.queue.push(req)
|
2024-03-22 01:26:50 +01:00
|
|
|
debug "Failed to receive blocks on request",
|
|
|
|
request = req, err = blocks.error
|
2024-02-09 08:35:41 +00:00
|
|
|
return
|
|
|
|
let blockData = blocks.get().asSeq()
|
|
|
|
let blockSmap = getShortMap(req, blockData)
|
|
|
|
debug "Received blocks on request", blocks_count = len(blockData),
|
|
|
|
blocks_map = blockSmap, request = req
|
|
|
|
|
|
|
|
let slots = mapIt(blockData, it[].slot)
|
|
|
|
if not(checkResponse(req, slots)):
|
|
|
|
peer.updateScore(PeerScoreBadResponse)
|
|
|
|
man.queue.push(req)
|
|
|
|
warn "Received blocks sequence is not in requested range",
|
|
|
|
blocks_count = len(blockData), blocks_map = blockSmap,
|
|
|
|
request = req
|
|
|
|
return
|
|
|
|
|
2024-08-23 12:31:54 +05:30
|
|
|
# let shouldGetBlobs =
|
|
|
|
# if not man.shouldGetBlobs(req.slot.epoch):
|
|
|
|
# false
|
|
|
|
# else:
|
|
|
|
# var hasBlobs = false
|
|
|
|
# for blck in blockData:
|
|
|
|
# withBlck(blck[]):
|
|
|
|
# when consensusFork >= ConsensusFork.Deneb:
|
|
|
|
# if forkyBlck.message.body.blob_kzg_commitments.len > 0:
|
|
|
|
# hasBlobs = true
|
|
|
|
# break
|
|
|
|
# hasBlobs
|
2024-03-22 03:27:02 +01:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
func combine(acc: seq[Slot], cur: Slot): seq[Slot] =
|
|
|
|
var copy = acc
|
|
|
|
if copy[copy.len-1] != cur:
|
|
|
|
copy.add(cur)
|
|
|
|
copy
|
|
|
|
|
2024-08-23 12:31:54 +05:30
|
|
|
# let blobData =
|
|
|
|
# if shouldGetBlobs:
|
|
|
|
# let blobs = await man.getBlobSidecars(peer, req)
|
|
|
|
# if blobs.isErr():
|
|
|
|
# peer.updateScore(PeerScoreNoValues)
|
|
|
|
# man.queue.push(req)
|
|
|
|
# debug "Failed to receive blobs on request",
|
|
|
|
# request = req, err = blobs.error
|
|
|
|
# return
|
|
|
|
# let blobData = blobs.get().asSeq()
|
|
|
|
# let blobSmap = getShortMap(req, blobData)
|
|
|
|
# debug "Received blobs on request", blobs_count = len(blobData),
|
|
|
|
# blobs_map = blobSmap, request = req
|
|
|
|
|
|
|
|
# if len(blobData) > 0:
|
|
|
|
# let slots = mapIt(blobData, it[].signed_block_header.message.slot)
|
|
|
|
# let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
|
|
|
# if not(checkResponse(req, uniqueSlots)):
|
|
|
|
# peer.updateScore(PeerScoreBadResponse)
|
|
|
|
# man.queue.push(req)
|
|
|
|
# warn "Received blobs sequence is not in requested range",
|
|
|
|
# blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
|
|
|
|
# request = req
|
|
|
|
# return
|
|
|
|
# let groupedBlobs = groupBlobs(req, blockData, blobData)
|
|
|
|
# if groupedBlobs.isErr():
|
|
|
|
# peer.updateScore(PeerScoreNoValues)
|
|
|
|
# man.queue.push(req)
|
|
|
|
# info "Received blobs sequence is inconsistent",
|
|
|
|
# blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
|
|
|
|
# return
|
|
|
|
# if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
|
|
|
|
# peer.updateScore(PeerScoreBadResponse)
|
|
|
|
# man.queue.push(req)
|
|
|
|
# warn "Received blobs sequence is invalid",
|
|
|
|
# blobs_count = len(blobData),
|
|
|
|
# blobs_map = getShortMap(req, blobData),
|
|
|
|
# request = req,
|
|
|
|
# msg = checkRes.error
|
|
|
|
# return
|
|
|
|
# Opt.some(groupedBlobs.get())
|
|
|
|
# else:
|
|
|
|
# Opt.none(seq[BlobSidecars])
|
2024-01-28 22:45:52 +00:00
|
|
|
|
2024-07-02 00:14:58 +05:30
|
|
|
let shouldGetDataColumns =
|
2024-07-02 03:09:49 +05:30
|
|
|
if not man.shouldGetDataColumns(req.slot.epoch):
|
2024-07-02 00:14:58 +05:30
|
|
|
false
|
|
|
|
else:
|
|
|
|
var hasColumns = false
|
|
|
|
for blck in blockData:
|
|
|
|
withBlck(blck[]):
|
|
|
|
when consensusFork >= ConsensusFork.Deneb:
|
|
|
|
if forkyBlck.message.body.blob_kzg_commitments.len > 0:
|
2024-07-22 19:04:04 +05:30
|
|
|
hasColumns = true
|
2024-07-02 00:14:58 +05:30
|
|
|
break
|
|
|
|
hasColumns
|
|
|
|
|
|
|
|
let dataColumnData =
|
|
|
|
if shouldGetDataColumns:
|
2024-08-17 20:51:36 +05:30
|
|
|
let data_columns = await man.getDataColumnSidecars(peer, req)
|
|
|
|
if data_columns.isErr():
|
|
|
|
# peer.updateScore(PeerScoreNoValues)
|
|
|
|
man.queue.push(req)
|
|
|
|
debug "Failed to receive data columns on request",
|
|
|
|
request = req, err = data_columns.error
|
|
|
|
return
|
|
|
|
let dataColumnData = data_columns.get().asSeq()
|
|
|
|
let dataColumnSmap = getShortMap(req, dataColumnData)
|
|
|
|
debug "Received data columns on request", data_columns_count = len(dataColumnData),
|
|
|
|
data_columns_map = dataColumnSmap, request = req
|
|
|
|
|
|
|
|
if len(dataColumnData) > 0:
|
|
|
|
let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot)
|
|
|
|
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
|
|
|
if not(checkResponse(req, uniqueSlots)):
|
2024-07-02 22:37:09 +05:30
|
|
|
# peer.updateScore(PeerScoreBadResponse)
|
2024-07-02 00:14:58 +05:30
|
|
|
man.queue.push(req)
|
2024-08-17 20:51:36 +05:30
|
|
|
warn "Received data columns sequence is not in requested range",
|
|
|
|
data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData),
|
|
|
|
request = req
|
2024-07-02 00:14:58 +05:30
|
|
|
return
|
2024-08-17 20:51:36 +05:30
|
|
|
let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData)
|
|
|
|
if groupedDataColumns.isErr():
|
|
|
|
# peer.updateScore(PeerScoreNoValues)
|
|
|
|
man.queue.push(req)
|
|
|
|
# warn "Received data columns is inconsistent",
|
|
|
|
# data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error()
|
|
|
|
return
|
|
|
|
if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr):
|
|
|
|
# peer.updateScore(PeerScoreBadResponse)
|
|
|
|
man.queue.push(req)
|
|
|
|
warn "Received data columns is invalid",
|
|
|
|
data_columns_count = len(dataColumnData),
|
|
|
|
data_columns_map = getShortMap(req, dataColumnData),
|
|
|
|
request = req,
|
|
|
|
msg = checkRes.error
|
|
|
|
return
|
|
|
|
Opt.some(groupedDataColumns.get())
|
|
|
|
|
2024-07-02 00:14:58 +05:30
|
|
|
else:
|
|
|
|
Opt.none(seq[DataColumnSidecars])
|
|
|
|
|
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
|
|
|
|
req.contains(man.getSafeSlot()):
|
|
|
|
# The sync protocol does not distinguish between:
|
|
|
|
# - All requested slots are empty
|
|
|
|
# - Peer does not have data available about requested range
|
|
|
|
#
|
|
|
|
# However, we include the `backfill` slot in backward sync requests.
|
|
|
|
# If we receive an empty response to a request covering that slot,
|
|
|
|
# we know that the response is incomplete and can descore.
|
|
|
|
peer.updateScore(PeerScoreNoValues)
|
2023-11-22 09:01:51 +01:00
|
|
|
man.queue.push(req)
|
2024-02-09 08:35:41 +00:00
|
|
|
debug "Response does not include known-to-exist block", request = req
|
2020-09-21 19:02:27 +03:00
|
|
|
return
|
2020-01-21 20:30:21 +02:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
# Scoring will happen in `syncUpdate`.
|
|
|
|
man.workers[index].status = SyncWorkerStatus.Queueing
|
|
|
|
let
|
|
|
|
peerFinalized = peer.getFinalizedEpoch().start_slot()
|
|
|
|
lastSlot = req.slot + req.count
|
|
|
|
# The peer claims the block is finalized - our own block processing will
|
|
|
|
# verify this point down the line
|
|
|
|
# TODO descore peers that lie
|
|
|
|
maybeFinalized = lastSlot < peerFinalized
|
|
|
|
|
2024-07-02 14:36:44 +05:30
|
|
|
await man.queue.push(req, blockData, Opt.none(seq[BlobSidecars]), dataColumnData, maybeFinalized, proc() =
|
2024-02-09 08:35:41 +00:00
|
|
|
man.workers[index].status = SyncWorkerStatus.Processing)
|
|
|
|
|
|
|
|
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} =
|
2020-09-21 19:02:27 +03:00
|
|
|
mixin getKey, getScore, getHeadSlot
|
|
|
|
|
2022-03-03 09:05:33 +01:00
|
|
|
logScope:
|
|
|
|
index = index
|
|
|
|
sync_ident = man.ident
|
|
|
|
direction = man.direction
|
|
|
|
topics = "syncman"
|
|
|
|
|
|
|
|
debug "Starting syncing worker"
|
2020-09-21 19:02:27 +03:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
var peer: A = nil
|
|
|
|
|
|
|
|
try:
|
|
|
|
while true:
|
|
|
|
man.workers[index].status = SyncWorkerStatus.Sleeping
|
|
|
|
# This event is going to be set until we are not in sync with network
|
|
|
|
await man.notInSyncEvent.wait()
|
|
|
|
man.workers[index].status = SyncWorkerStatus.WaitingPeer
|
|
|
|
peer = await man.pool.acquire()
|
|
|
|
await man.syncStep(index, peer)
|
|
|
|
man.pool.release(peer)
|
|
|
|
peer = nil
|
|
|
|
finally:
|
|
|
|
if not(isNil(peer)):
|
|
|
|
man.pool.release(peer)
|
2022-01-20 08:25:45 +01:00
|
|
|
|
2022-03-03 09:05:33 +01:00
|
|
|
debug "Sync worker stopped"
|
2020-09-11 15:46:01 +03:00
|
|
|
|
|
|
|
proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
|
|
|
|
sleeping: int,
|
|
|
|
waiting: int,
|
|
|
|
pending: int] =
|
|
|
|
var map = newString(len(man.workers))
|
|
|
|
var sleeping, waiting, pending: int
|
|
|
|
for i in 0 ..< len(man.workers):
|
|
|
|
var ch: char
|
|
|
|
case man.workers[i].status
|
|
|
|
of SyncWorkerStatus.Sleeping:
|
|
|
|
ch = 's'
|
|
|
|
inc(sleeping)
|
|
|
|
of SyncWorkerStatus.WaitingPeer:
|
|
|
|
ch = 'w'
|
|
|
|
inc(waiting)
|
|
|
|
of SyncWorkerStatus.UpdatingStatus:
|
|
|
|
ch = 'U'
|
|
|
|
inc(pending)
|
|
|
|
of SyncWorkerStatus.Requesting:
|
|
|
|
ch = 'R'
|
|
|
|
inc(pending)
|
|
|
|
of SyncWorkerStatus.Downloading:
|
|
|
|
ch = 'D'
|
|
|
|
inc(pending)
|
2021-12-16 15:57:16 +01:00
|
|
|
of SyncWorkerStatus.Queueing:
|
|
|
|
ch = 'Q'
|
|
|
|
inc(pending)
|
2020-09-11 15:46:01 +03:00
|
|
|
of SyncWorkerStatus.Processing:
|
|
|
|
ch = 'P'
|
|
|
|
inc(pending)
|
|
|
|
map[i] = ch
|
|
|
|
(map, sleeping, waiting, pending)
|
2020-01-21 20:30:21 +02:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
proc startWorkers[A, B](man: SyncManager[A, B]) =
|
2020-11-10 15:47:26 +02:00
|
|
|
# Starting all the synchronization workers.
|
|
|
|
for i in 0 ..< len(man.workers):
|
2024-02-09 08:35:41 +00:00
|
|
|
man.workers[i].future = syncWorker[A, B](man, i)
|
2020-11-10 15:47:26 +02:00
|
|
|
|
2022-01-17 10:27:08 +01:00
|
|
|
proc toTimeLeftString*(d: Duration): string =
|
2021-12-16 15:57:16 +01:00
|
|
|
if d == InfiniteDuration:
|
|
|
|
"--h--m"
|
2020-11-12 13:16:51 +02:00
|
|
|
else:
|
2021-12-16 15:57:16 +01:00
|
|
|
var v = d
|
|
|
|
var res = ""
|
|
|
|
let ndays = chronos.days(v)
|
|
|
|
if ndays > 0:
|
|
|
|
res = res & (if ndays < 10: "0" & $ndays else: $ndays) & "d"
|
|
|
|
v = v - chronos.days(ndays)
|
|
|
|
|
|
|
|
let nhours = chronos.hours(v)
|
|
|
|
if nhours > 0:
|
|
|
|
res = res & (if nhours < 10: "0" & $nhours else: $nhours) & "h"
|
|
|
|
v = v - chronos.hours(nhours)
|
|
|
|
else:
|
|
|
|
res = res & "00h"
|
2020-11-12 13:16:51 +02:00
|
|
|
|
2021-12-16 15:57:16 +01:00
|
|
|
let nmins = chronos.minutes(v)
|
|
|
|
if nmins > 0:
|
|
|
|
res = res & (if nmins < 10: "0" & $nmins else: $nmins) & "m"
|
|
|
|
v = v - chronos.minutes(nmins)
|
|
|
|
else:
|
|
|
|
res = res & "00m"
|
|
|
|
res
|
2020-11-12 13:16:51 +02:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
proc syncClose[A, B](man: SyncManager[A, B],
|
2022-04-14 16:17:44 +03:00
|
|
|
speedTaskFut: Future[void]) {.async.} =
|
2023-09-22 14:06:27 +03:00
|
|
|
var pending: seq[FutureBase]
|
|
|
|
if not(speedTaskFut.finished()):
|
|
|
|
pending.add(speedTaskFut.cancelAndWait())
|
|
|
|
for worker in man.workers:
|
|
|
|
doAssert(worker.status in {Sleeping, WaitingPeer})
|
|
|
|
pending.add(worker.future.cancelAndWait())
|
|
|
|
await noCancel allFutures(pending)
|
2022-04-14 16:17:44 +03:00
|
|
|
|
2020-08-10 10:15:50 +03:00
|
|
|
proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
|
2022-03-03 09:05:33 +01:00
|
|
|
logScope:
|
|
|
|
sync_ident = man.ident
|
|
|
|
direction = man.direction
|
|
|
|
topics = "syncman"
|
|
|
|
|
2020-04-20 17:59:18 +03:00
|
|
|
mixin getKey, getScore
|
2020-10-27 11:25:28 +02:00
|
|
|
var pauseTime = 0
|
2020-01-21 20:30:21 +02:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
man.startWorkers()
|
2020-01-21 20:30:21 +02:00
|
|
|
|
2022-03-03 09:05:33 +01:00
|
|
|
debug "Synchronization loop started"
|
2020-09-11 15:46:01 +03:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
proc averageSpeedTask() {.async: (raises: [CancelledError]).} =
|
2020-10-27 11:25:28 +02:00
|
|
|
while true:
|
2021-12-16 15:57:16 +01:00
|
|
|
# Reset sync speeds between each loss-of-sync event
|
|
|
|
man.avgSyncSpeed = 0
|
|
|
|
man.insSyncSpeed = 0
|
|
|
|
|
|
|
|
await man.notInSyncEvent.wait()
|
|
|
|
|
|
|
|
# Give the node time to connect to peers and get the sync process started
|
|
|
|
await sleepAsync(seconds(SECONDS_PER_SLOT.int64))
|
|
|
|
|
|
|
|
var
|
|
|
|
stamp = SyncMoment.now(man.queue.progress())
|
|
|
|
syncCount = 0
|
|
|
|
|
|
|
|
while man.inProgress:
|
|
|
|
await sleepAsync(seconds(SECONDS_PER_SLOT.int64))
|
|
|
|
|
|
|
|
let
|
|
|
|
newStamp = SyncMoment.now(man.queue.progress())
|
|
|
|
slotsPerSec = speed(stamp, newStamp)
|
|
|
|
|
|
|
|
syncCount += 1
|
|
|
|
|
|
|
|
man.insSyncSpeed = slotsPerSec
|
|
|
|
man.avgSyncSpeed =
|
|
|
|
man.avgSyncSpeed + (slotsPerSec - man.avgSyncSpeed) / float(syncCount)
|
|
|
|
|
|
|
|
stamp = newStamp
|
2020-10-27 11:25:28 +02:00
|
|
|
|
2024-02-09 08:35:41 +00:00
|
|
|
let averageSpeedTaskFut = averageSpeedTask()
|
2020-05-19 15:08:50 +03:00
|
|
|
|
2020-04-20 17:59:18 +03:00
|
|
|
while true:
|
2020-06-14 12:45:53 +03:00
|
|
|
let wallSlot = man.getLocalWallSlot()
|
|
|
|
let headSlot = man.getLocalHeadSlot()
|
2020-04-20 17:59:18 +03:00
|
|
|
|
2020-09-11 15:46:01 +03:00
|
|
|
let (map, sleeping, waiting, pending) = man.getWorkersStats()
|
2020-04-23 18:31:00 +03:00
|
|
|
|
2020-10-27 11:25:28 +02:00
|
|
|
debug "Current syncing state", workers_map = map,
|
2020-09-11 15:46:01 +03:00
|
|
|
sleeping_workers_count = sleeping,
|
|
|
|
waiting_workers_count = waiting,
|
|
|
|
pending_workers_count = pending,
|
|
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
2020-10-27 11:25:28 +02:00
|
|
|
pause_time = $chronos.seconds(pauseTime),
|
2022-03-03 09:05:33 +01:00
|
|
|
avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed
|
2021-12-16 15:57:16 +01:00
|
|
|
|
|
|
|
let
|
2022-01-13 10:37:53 +01:00
|
|
|
pivot = man.progressPivot
|
2022-06-13 19:00:36 +02:00
|
|
|
progress =
|
|
|
|
case man.queue.kind
|
|
|
|
of SyncQueueKind.Forward:
|
2022-12-23 08:42:55 +01:00
|
|
|
if man.queue.outSlot >= pivot:
|
|
|
|
man.queue.outSlot - pivot
|
|
|
|
else:
|
|
|
|
0'u64
|
2022-06-13 19:00:36 +02:00
|
|
|
of SyncQueueKind.Backward:
|
2022-12-23 08:42:55 +01:00
|
|
|
if pivot >= man.queue.outSlot:
|
|
|
|
pivot - man.queue.outSlot
|
|
|
|
else:
|
|
|
|
0'u64
|
2022-06-13 19:00:36 +02:00
|
|
|
total =
|
|
|
|
case man.queue.kind
|
|
|
|
of SyncQueueKind.Forward:
|
2022-12-23 08:42:55 +01:00
|
|
|
if man.queue.finalSlot >= pivot:
|
|
|
|
man.queue.finalSlot + 1'u64 - pivot
|
|
|
|
else:
|
|
|
|
0'u64
|
2022-06-13 19:00:36 +02:00
|
|
|
of SyncQueueKind.Backward:
|
2022-12-23 08:42:55 +01:00
|
|
|
if pivot >= man.queue.finalSlot:
|
|
|
|
pivot + 1'u64 - man.queue.finalSlot
|
|
|
|
else:
|
|
|
|
0'u64
|
2021-12-16 15:57:16 +01:00
|
|
|
remaining = total - progress
|
2022-06-13 19:00:36 +02:00
|
|
|
done =
|
|
|
|
if total > 0:
|
|
|
|
progress.float / total.float
|
|
|
|
else:
|
|
|
|
1.0
|
2021-12-16 15:57:16 +01:00
|
|
|
timeleft =
|
|
|
|
if man.avgSyncSpeed >= 0.001:
|
2022-06-13 19:00:36 +02:00
|
|
|
Duration.fromFloatSeconds(remaining.float / man.avgSyncSpeed)
|
|
|
|
else:
|
|
|
|
InfiniteDuration
|
2022-01-21 11:35:54 +02:00
|
|
|
currentSlot = Base10.toString(
|
|
|
|
if man.queue.kind == SyncQueueKind.Forward:
|
2022-02-14 12:04:04 +01:00
|
|
|
max(uint64(man.queue.outSlot), 1'u64) - 1'u64
|
2022-01-21 11:35:54 +02:00
|
|
|
else:
|
|
|
|
uint64(man.queue.outSlot) + 1'u64
|
|
|
|
)
|
2020-04-20 17:59:18 +03:00
|
|
|
|
2020-09-11 15:46:01 +03:00
|
|
|
# Update status string
|
2022-04-08 18:22:49 +02:00
|
|
|
man.syncStatus = timeleft.toTimeLeftString() & " (" &
|
2021-12-16 15:57:16 +01:00
|
|
|
(done * 100).formatBiggestFloat(ffDecimal, 2) & "%) " &
|
|
|
|
man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) &
|
2022-01-21 11:35:54 +02:00
|
|
|
"slots/s (" & map & ":" & currentSlot & ")"
|
2020-05-19 15:08:50 +03:00
|
|
|
|
2022-01-20 08:25:45 +01:00
|
|
|
if man.remainingSlots() <= man.maxHeadAge:
|
2020-09-11 15:46:01 +03:00
|
|
|
man.notInSyncEvent.clear()
|
|
|
|
# We are marking SyncManager as not working only when we are in sync and
|
|
|
|
# all sync workers are in `Sleeping` state.
|
|
|
|
if pending > 0:
|
|
|
|
debug "Synchronization loop waits for workers completion",
|
|
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
|
|
difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge,
|
|
|
|
sleeping_workers_count = sleeping,
|
2022-03-03 09:05:33 +01:00
|
|
|
waiting_workers_count = waiting, pending_workers_count = pending
|
2022-01-20 08:25:45 +01:00
|
|
|
# We already synced, so we should reset all the pending workers from
|
|
|
|
# any state they have.
|
|
|
|
man.queue.clearAndWakeup()
|
2020-09-11 15:46:01 +03:00
|
|
|
man.inProgress = true
|
|
|
|
else:
|
2022-01-20 08:25:45 +01:00
|
|
|
case man.direction
|
|
|
|
of SyncQueueKind.Forward:
|
|
|
|
if man.inProgress:
|
2022-04-14 16:17:44 +03:00
|
|
|
if SyncManagerFlag.NoMonitor in man.flags:
|
2024-02-09 08:35:41 +00:00
|
|
|
await man.syncClose(averageSpeedTaskFut)
|
2022-04-14 16:17:44 +03:00
|
|
|
man.inProgress = false
|
|
|
|
debug "Forward synchronization process finished, exiting",
|
|
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
|
|
difference = (wallSlot - headSlot),
|
|
|
|
max_head_age = man.maxHeadAge
|
|
|
|
break
|
|
|
|
else:
|
|
|
|
man.inProgress = false
|
|
|
|
debug "Forward synchronization process finished, sleeping",
|
|
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
|
|
difference = (wallSlot - headSlot),
|
|
|
|
max_head_age = man.maxHeadAge
|
2022-01-20 08:25:45 +01:00
|
|
|
else:
|
|
|
|
debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
|
|
|
|
local_head_slot = headSlot,
|
|
|
|
difference = (wallSlot - headSlot),
|
2022-03-03 09:05:33 +01:00
|
|
|
max_head_age = man.maxHeadAge
|
2022-01-20 08:25:45 +01:00
|
|
|
of SyncQueueKind.Backward:
|
|
|
|
# Backward syncing is going to be executed only once, so we exit loop
|
|
|
|
# and stop all pending tasks which belongs to this instance (sync
|
2024-02-09 08:35:41 +00:00
|
|
|
# workers, speed calculation task).
|
|
|
|
await man.syncClose(averageSpeedTaskFut)
|
2022-01-21 11:35:54 +02:00
|
|
|
man.inProgress = false
|
2022-01-20 08:25:45 +01:00
|
|
|
debug "Backward synchronization process finished, exiting",
|
|
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
|
|
backfill_slot = man.getLastSlot(),
|
2022-03-03 09:05:33 +01:00
|
|
|
max_head_age = man.maxHeadAge
|
2022-01-20 08:25:45 +01:00
|
|
|
break
|
2020-06-14 12:45:53 +03:00
|
|
|
else:
|
2020-10-30 14:33:52 +02:00
|
|
|
if not(man.notInSyncEvent.isSet()):
|
|
|
|
# We get here only if we lost sync for more then `maxHeadAge` period.
|
|
|
|
if pending == 0:
|
2021-12-16 15:57:16 +01:00
|
|
|
man.initQueue()
|
2020-10-30 14:33:52 +02:00
|
|
|
man.notInSyncEvent.fire()
|
|
|
|
man.inProgress = true
|
2022-01-20 08:25:45 +01:00
|
|
|
debug "Node lost sync for more then preset period",
|
|
|
|
period = man.maxHeadAge, wall_head_slot = wallSlot,
|
|
|
|
local_head_slot = headSlot,
|
|
|
|
missing_slots = man.remainingSlots(),
|
2022-03-03 09:05:33 +01:00
|
|
|
progress = float(man.queue.progress())
|
2020-10-30 14:33:52 +02:00
|
|
|
else:
|
|
|
|
man.notInSyncEvent.fire()
|
|
|
|
man.inProgress = true
|
2020-06-14 12:45:53 +03:00
|
|
|
|
2020-09-11 15:46:01 +03:00
|
|
|
await sleepAsync(chronos.seconds(2))
|
|
|
|
|
2020-08-10 10:15:50 +03:00
|
|
|
proc start*[A, B](man: SyncManager[A, B]) =
|
|
|
|
## Starts SyncManager's main loop.
|
|
|
|
man.syncFut = man.syncLoop()
|