mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-27 06:47:13 +00:00
2eb56f6e1b
The various `PeerScore` constants are used for both beacon blocks and LC objects, and will likely also find use for EIP4844 blob sidecars. Renaming them to use more generically applicable names not referring to blocks explicitly aymore.
682 lines
24 KiB
Nim
682 lines
24 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2018-2022 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
when (NimMajor, NimMinor) < (1, 4):
|
|
{.push raises: [Defect].}
|
|
else:
|
|
{.push raises: [].}
|
|
|
|
import std/[options, heapqueue, tables, strutils, sequtils, algorithm]
|
|
import stew/[results, base10], chronos, chronicles
|
|
import
|
|
../spec/datatypes/[phase0, altair],
|
|
../spec/eth2_apis/rest_types,
|
|
../spec/[helpers, forks, network],
|
|
../networking/[peer_pool, peer_scores, eth2_network],
|
|
../beacon_clock,
|
|
"."/[sync_protocol, sync_queue]
|
|
|
|
export phase0, altair, merge, chronos, chronicles, results,
|
|
helpers, peer_scores, sync_queue, forks, sync_protocol
|
|
|
|
logScope:
|
|
topics = "syncman"
|
|
|
|
const
|
|
SyncWorkersCount* = 10
|
|
## Number of sync workers to spawn
|
|
|
|
StatusUpdateInterval* = chronos.minutes(1)
|
|
## Minimum time between two subsequent calls to update peer's status
|
|
|
|
StatusExpirationTime* = chronos.minutes(2)
|
|
## Time time it takes for the peer's status information to expire.
|
|
|
|
type
|
|
SyncWorkerStatus* {.pure.} = enum
|
|
Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Queueing,
|
|
Processing
|
|
|
|
SyncManagerFlag* {.pure.} = enum
|
|
NoMonitor
|
|
|
|
SyncWorker*[A, B] = object
|
|
future: Future[void]
|
|
status: SyncWorkerStatus
|
|
|
|
SyncManager*[A, B] = ref object
|
|
pool: PeerPool[A, B]
|
|
responseTimeout: chronos.Duration
|
|
maxHeadAge: uint64
|
|
getLocalHeadSlot: GetSlotCallback
|
|
getLocalWallSlot: GetSlotCallback
|
|
getSafeSlot: GetSlotCallback
|
|
getFirstSlot: GetSlotCallback
|
|
getLastSlot: GetSlotCallback
|
|
progressPivot: Slot
|
|
workers: array[SyncWorkersCount, SyncWorker[A, B]]
|
|
notInSyncEvent: AsyncEvent
|
|
rangeAge: uint64
|
|
chunkSize: uint64
|
|
queue: SyncQueue[A]
|
|
syncFut: Future[void]
|
|
blockVerifier: BlockVerifier
|
|
inProgress*: bool
|
|
insSyncSpeed*: float
|
|
avgSyncSpeed*: float
|
|
syncStatus*: string
|
|
direction: SyncQueueKind
|
|
ident*: string
|
|
flags: set[SyncManagerFlag]
|
|
|
|
SyncMoment* = object
|
|
stamp*: chronos.Moment
|
|
slots*: uint64
|
|
|
|
BeaconBlocksRes = NetRes[List[ref ForkedSignedBeaconBlock, MAX_REQUEST_BLOCKS]]
|
|
|
|
proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
|
|
SyncMoment(stamp: now(chronos.Moment), slots: slots)
|
|
|
|
proc speed*(start, finish: SyncMoment): float {.inline.} =
|
|
## Returns number of slots per second.
|
|
if finish.slots <= start.slots or finish.stamp <= start.stamp:
|
|
0.0 # replays for example
|
|
else:
|
|
let
|
|
slots = float(finish.slots - start.slots)
|
|
dur = toFloatSeconds(finish.stamp - start.stamp)
|
|
slots / dur
|
|
|
|
proc initQueue[A, B](man: SyncManager[A, B]) =
|
|
case man.direction
|
|
of SyncQueueKind.Forward:
|
|
man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(),
|
|
man.getLastSlot(), man.chunkSize,
|
|
man.getSafeSlot, man.blockVerifier, 1,
|
|
man.ident)
|
|
of SyncQueueKind.Backward:
|
|
let
|
|
firstSlot = man.getFirstSlot()
|
|
lastSlot = man.getLastSlot()
|
|
startSlot = if firstSlot == lastSlot:
|
|
# This case should never be happened in real life because
|
|
# there is present check `needsBackfill().
|
|
firstSlot
|
|
else:
|
|
Slot(firstSlot - 1'u64)
|
|
man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot,
|
|
man.chunkSize, man.getSafeSlot,
|
|
man.blockVerifier, 1,
|
|
man.ident)
|
|
|
|
proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
|
direction: SyncQueueKind,
|
|
getLocalHeadSlotCb: GetSlotCallback,
|
|
getLocalWallSlotCb: GetSlotCallback,
|
|
getFinalizedSlotCb: GetSlotCallback,
|
|
getBackfillSlotCb: GetSlotCallback,
|
|
getFrontfillSlotCb: GetSlotCallback,
|
|
progressPivot: Slot,
|
|
blockVerifier: BlockVerifier,
|
|
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
|
|
chunkSize = uint64(SLOTS_PER_EPOCH),
|
|
flags: set[SyncManagerFlag] = {},
|
|
ident = "main"
|
|
): SyncManager[A, B] =
|
|
let (getFirstSlot, getLastSlot, getSafeSlot) = case direction
|
|
of SyncQueueKind.Forward:
|
|
(getLocalHeadSlotCb, getLocalWallSlotCb, getFinalizedSlotCb)
|
|
of SyncQueueKind.Backward:
|
|
(getBackfillSlotCb, getFrontfillSlotCb, getBackfillSlotCb)
|
|
|
|
var res = SyncManager[A, B](
|
|
pool: pool,
|
|
getLocalHeadSlot: getLocalHeadSlotCb,
|
|
getLocalWallSlot: getLocalWallSlotCb,
|
|
getSafeSlot: getSafeSlot,
|
|
getFirstSlot: getFirstSlot,
|
|
getLastSlot: getLastSlot,
|
|
progressPivot: progressPivot,
|
|
maxHeadAge: maxHeadAge,
|
|
chunkSize: chunkSize,
|
|
blockVerifier: blockVerifier,
|
|
notInSyncEvent: newAsyncEvent(),
|
|
direction: direction,
|
|
ident: ident,
|
|
flags: flags
|
|
)
|
|
res.initQueue()
|
|
res
|
|
|
|
proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
|
|
req: SyncRequest): Future[BeaconBlocksRes] {.async.} =
|
|
mixin beaconBlocksByRange, getScore, `==`
|
|
|
|
logScope:
|
|
peer_score = peer.getScore()
|
|
peer_speed = peer.netKbps()
|
|
sync_ident = man.ident
|
|
direction = man.direction
|
|
topics = "syncman"
|
|
|
|
doAssert(not(req.isEmpty()), "Request must not be empty!")
|
|
debug "Requesting blocks from peer", request = req
|
|
try:
|
|
let res = await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
|
|
|
|
if res.isErr():
|
|
debug "Error, while reading getBlocks response", request = req,
|
|
error = $res.error()
|
|
return
|
|
return res
|
|
except CancelledError:
|
|
debug "Interrupt, while waiting getBlocks response", request = req
|
|
return
|
|
except CatchableError as exc:
|
|
debug "Error, while waiting getBlocks response", request = req,
|
|
errName = exc.name, errMsg = exc.msg
|
|
return
|
|
|
|
proc remainingSlots(man: SyncManager): uint64 =
|
|
if man.direction == SyncQueueKind.Forward:
|
|
man.getLastSlot() - man.getFirstSlot()
|
|
else:
|
|
man.getFirstSlot() - man.getLastSlot()
|
|
|
|
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
|
logScope:
|
|
peer_score = peer.getScore()
|
|
peer_speed = peer.netKbps()
|
|
index = index
|
|
sync_ident = man.ident
|
|
topics = "syncman"
|
|
|
|
var
|
|
headSlot = man.getLocalHeadSlot()
|
|
wallSlot = man.getLocalWallSlot()
|
|
peerSlot = peer.getHeadSlot()
|
|
|
|
block: # Check that peer status is recent and relevant
|
|
logScope:
|
|
peer = peer
|
|
direction = man.direction
|
|
|
|
debug "Peer's syncing status", wall_clock_slot = wallSlot,
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot
|
|
|
|
let
|
|
peerStatusAge = Moment.now() - peer.state(BeaconSync).statusLastTime
|
|
needsUpdate =
|
|
# Latest status we got is old
|
|
peerStatusAge >= StatusExpirationTime or
|
|
# The point we need to sync is close to where the peer is
|
|
man.getFirstSlot() >= peerSlot
|
|
|
|
if needsUpdate:
|
|
man.workers[index].status = SyncWorkerStatus.UpdatingStatus
|
|
|
|
# Avoid a stampede of requests, but make them more frequent in case the
|
|
# peer is "close" to the slot range of interest
|
|
if peerStatusAge < StatusExpirationTime div 2:
|
|
await sleepAsync(StatusExpirationTime div 2 - peerStatusAge)
|
|
|
|
trace "Updating peer's status information", wall_clock_slot = wallSlot,
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot
|
|
|
|
try:
|
|
let res = await peer.updateStatus()
|
|
if not(res):
|
|
peer.updateScore(PeerScoreNoStatus)
|
|
debug "Failed to get remote peer's status, exiting",
|
|
peer_head_slot = peerSlot
|
|
|
|
return
|
|
except CatchableError as exc:
|
|
debug "Unexpected exception while updating peer's status",
|
|
peer_head_slot = peerSlot, errName = exc.name, errMsg = exc.msg
|
|
return
|
|
|
|
let newPeerSlot = peer.getHeadSlot()
|
|
if peerSlot >= newPeerSlot:
|
|
peer.updateScore(PeerScoreStaleStatus)
|
|
debug "Peer's status information is stale",
|
|
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
|
|
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot
|
|
else:
|
|
debug "Peer's status information updated", wall_clock_slot = wallSlot,
|
|
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
|
|
remote_new_head_slot = newPeerSlot
|
|
peer.updateScore(PeerScoreGoodStatus)
|
|
peerSlot = newPeerSlot
|
|
|
|
# Time passed - enough to move slots, if sleep happened
|
|
headSlot = man.getLocalHeadSlot()
|
|
wallSlot = man.getLocalWallSlot()
|
|
|
|
if man.remainingSlots() <= man.maxHeadAge:
|
|
logScope:
|
|
peer = peer
|
|
direction = man.direction
|
|
|
|
case man.direction
|
|
of SyncQueueKind.Forward:
|
|
info "We are in sync with network", wall_clock_slot = wallSlot,
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot
|
|
of SyncQueueKind.Backward:
|
|
info "Backfill complete", wall_clock_slot = wallSlot,
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot
|
|
|
|
# We clear SyncManager's `notInSyncEvent` so all the workers will become
|
|
# sleeping soon.
|
|
man.notInSyncEvent.clear()
|
|
return
|
|
|
|
# Find out if the peer potentially can give useful blocks - in the case of
|
|
# forward sync, they can be useful if they have blocks newer than our head -
|
|
# in the case of backwards sync, they're useful if they have blocks newer than
|
|
# the backfill point
|
|
if man.getFirstSlot() >= peerSlot:
|
|
# This is not very good solution because we should not discriminate and/or
|
|
# penalize peers which are in sync process too, but their latest head is
|
|
# lower then our latest head. We should keep connections with such peers
|
|
# (so this peers are able to get in sync using our data), but we should
|
|
# not use this peers for syncing because this peers are useless for us.
|
|
# Right now we decreasing peer's score a bit, so it will not be
|
|
# disconnected due to low peer's score, but new fresh peers could replace
|
|
# peers with low latest head.
|
|
debug "Peer's head slot is lower then local head slot", peer = peer,
|
|
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
|
|
local_last_slot = man.getLastSlot(),
|
|
local_first_slot = man.getFirstSlot(),
|
|
direction = man.direction
|
|
peer.updateScore(PeerScoreUseless)
|
|
return
|
|
|
|
if man.direction == SyncQueueKind.Forward:
|
|
# Wall clock keeps ticking, so we need to update the queue
|
|
man.queue.updateLastSlot(man.getLastSlot())
|
|
|
|
man.workers[index].status = SyncWorkerStatus.Requesting
|
|
let req = man.queue.pop(peerSlot, peer)
|
|
if req.isEmpty():
|
|
# SyncQueue could return empty request in 2 cases:
|
|
# 1. There no more slots in SyncQueue to download (we are synced, but
|
|
# our ``notInSyncEvent`` is not yet cleared).
|
|
# 2. Current peer's known head slot is too low to satisfy request.
|
|
#
|
|
# To avoid endless loop we going to wait for RESP_TIMEOUT time here.
|
|
# This time is enough for all pending requests to finish and it is also
|
|
# enough for main sync loop to clear ``notInSyncEvent``.
|
|
debug "Empty request received from queue, exiting", peer = peer,
|
|
local_head_slot = headSlot, remote_head_slot = peerSlot,
|
|
queue_input_slot = man.queue.inpSlot,
|
|
queue_output_slot = man.queue.outSlot,
|
|
queue_last_slot = man.queue.finalSlot, direction = man.direction
|
|
await sleepAsync(RESP_TIMEOUT)
|
|
return
|
|
|
|
debug "Creating new request for peer", wall_clock_slot = wallSlot,
|
|
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
|
request = req
|
|
|
|
man.workers[index].status = SyncWorkerStatus.Downloading
|
|
|
|
try:
|
|
let blocks = await man.getBlocks(peer, req)
|
|
if blocks.isOk():
|
|
let data = blocks.get().asSeq()
|
|
let smap = getShortMap(req, data)
|
|
debug "Received blocks on request", blocks_count = len(data),
|
|
blocks_map = smap, request = req
|
|
|
|
if not(checkResponse(req, data)):
|
|
peer.updateScore(PeerScoreBadResponse)
|
|
warn "Received blocks sequence is not in requested range",
|
|
blocks_count = len(data), blocks_map = smap,
|
|
request = req
|
|
return
|
|
|
|
if len(data) == 0 and man.direction == SyncQueueKind.Backward and
|
|
req.contains(man.getSafeSlot()):
|
|
# The sync protocol does not distinguish between:
|
|
# - All requested slots are empty
|
|
# - Peer does not have data available about requested range
|
|
#
|
|
# However, we include the `backfill` slot in backward sync requests.
|
|
# If we receive an empty response to a request covering that slot,
|
|
# we know that the response is incomplete and can descore.
|
|
peer.updateScore(PeerScoreNoValues)
|
|
man.queue.push(req)
|
|
debug "Response does not include known-to-exist block", request = req
|
|
return
|
|
|
|
# Scoring will happen in `syncUpdate`.
|
|
man.workers[index].status = SyncWorkerStatus.Queueing
|
|
await man.queue.push(req, data, proc() =
|
|
man.workers[index].status = SyncWorkerStatus.Processing)
|
|
else:
|
|
peer.updateScore(PeerScoreNoValues)
|
|
man.queue.push(req)
|
|
debug "Failed to receive blocks on request", request = req
|
|
return
|
|
|
|
except CatchableError as exc:
|
|
debug "Unexpected exception while receiving blocks", request = req,
|
|
errName = exc.name, errMsg = exc.msg
|
|
return
|
|
|
|
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
|
|
mixin getKey, getScore, getHeadSlot
|
|
|
|
logScope:
|
|
index = index
|
|
sync_ident = man.ident
|
|
direction = man.direction
|
|
topics = "syncman"
|
|
|
|
debug "Starting syncing worker"
|
|
|
|
while true:
|
|
var peer: A = nil
|
|
let doBreak =
|
|
try:
|
|
man.workers[index].status = SyncWorkerStatus.Sleeping
|
|
# This event is going to be set until we are not in sync with network
|
|
await man.notInSyncEvent.wait()
|
|
man.workers[index].status = SyncWorkerStatus.WaitingPeer
|
|
peer = await man.pool.acquire()
|
|
await man.syncStep(index, peer)
|
|
man.pool.release(peer)
|
|
false
|
|
except CancelledError:
|
|
if not(isNil(peer)):
|
|
man.pool.release(peer)
|
|
true
|
|
except CatchableError as exc:
|
|
debug "Unexpected exception in sync worker",
|
|
peer = peer, peer_score = peer.getScore(),
|
|
peer_speed = peer.netKbps(),
|
|
errName = exc.name, errMsg = exc.msg
|
|
true
|
|
if doBreak:
|
|
break
|
|
|
|
debug "Sync worker stopped"
|
|
|
|
proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
|
|
sleeping: int,
|
|
waiting: int,
|
|
pending: int] =
|
|
var map = newString(len(man.workers))
|
|
var sleeping, waiting, pending: int
|
|
for i in 0 ..< len(man.workers):
|
|
var ch: char
|
|
case man.workers[i].status
|
|
of SyncWorkerStatus.Sleeping:
|
|
ch = 's'
|
|
inc(sleeping)
|
|
of SyncWorkerStatus.WaitingPeer:
|
|
ch = 'w'
|
|
inc(waiting)
|
|
of SyncWorkerStatus.UpdatingStatus:
|
|
ch = 'U'
|
|
inc(pending)
|
|
of SyncWorkerStatus.Requesting:
|
|
ch = 'R'
|
|
inc(pending)
|
|
of SyncWorkerStatus.Downloading:
|
|
ch = 'D'
|
|
inc(pending)
|
|
of SyncWorkerStatus.Queueing:
|
|
ch = 'Q'
|
|
inc(pending)
|
|
of SyncWorkerStatus.Processing:
|
|
ch = 'P'
|
|
inc(pending)
|
|
map[i] = ch
|
|
(map, sleeping, waiting, pending)
|
|
|
|
proc guardTask[A, B](man: SyncManager[A, B]) {.async.} =
|
|
logScope:
|
|
index = index
|
|
sync_ident = man.ident
|
|
direction = man.direction
|
|
topics = "syncman"
|
|
|
|
var pending: array[SyncWorkersCount, Future[void]]
|
|
|
|
# Starting all the synchronization workers.
|
|
for i in 0 ..< len(man.workers):
|
|
let future = syncWorker[A, B](man, i)
|
|
man.workers[i].future = future
|
|
pending[i] = future
|
|
|
|
# Wait for synchronization worker's failure and replace it with new one.
|
|
while true:
|
|
let failFuture = await one(pending)
|
|
let index = pending.find(failFuture)
|
|
if failFuture.failed():
|
|
warn "Synchronization worker stopped working unexpectedly with an error",
|
|
errName = failFuture.error.name, errMsg = failFuture.error.msg
|
|
else:
|
|
warn "Synchronization worker stopped working unexpectedly without error"
|
|
|
|
let future = syncWorker[A, B](man, index)
|
|
man.workers[index].future = future
|
|
pending[index] = future
|
|
|
|
proc toTimeLeftString*(d: Duration): string =
|
|
if d == InfiniteDuration:
|
|
"--h--m"
|
|
else:
|
|
var v = d
|
|
var res = ""
|
|
let ndays = chronos.days(v)
|
|
if ndays > 0:
|
|
res = res & (if ndays < 10: "0" & $ndays else: $ndays) & "d"
|
|
v = v - chronos.days(ndays)
|
|
|
|
let nhours = chronos.hours(v)
|
|
if nhours > 0:
|
|
res = res & (if nhours < 10: "0" & $nhours else: $nhours) & "h"
|
|
v = v - chronos.hours(nhours)
|
|
else:
|
|
res = res & "00h"
|
|
|
|
let nmins = chronos.minutes(v)
|
|
if nmins > 0:
|
|
res = res & (if nmins < 10: "0" & $nmins else: $nmins) & "m"
|
|
v = v - chronos.minutes(nmins)
|
|
else:
|
|
res = res & "00m"
|
|
res
|
|
|
|
proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void],
|
|
speedTaskFut: Future[void]) {.async.} =
|
|
guardTaskFut.cancel()
|
|
speedTaskFut.cancel()
|
|
await allFutures(guardTaskFut, speedTaskFut)
|
|
let pendingTasks =
|
|
block:
|
|
var res: seq[Future[void]]
|
|
for worker in man.workers:
|
|
doAssert(worker.status in {Sleeping, WaitingPeer})
|
|
worker.future.cancel()
|
|
res.add(worker.future)
|
|
res
|
|
await allFutures(pendingTasks)
|
|
|
|
proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
|
|
logScope:
|
|
sync_ident = man.ident
|
|
direction = man.direction
|
|
topics = "syncman"
|
|
|
|
mixin getKey, getScore
|
|
var pauseTime = 0
|
|
|
|
var guardTaskFut = man.guardTask()
|
|
|
|
debug "Synchronization loop started"
|
|
|
|
proc averageSpeedTask() {.async.} =
|
|
while true:
|
|
# Reset sync speeds between each loss-of-sync event
|
|
man.avgSyncSpeed = 0
|
|
man.insSyncSpeed = 0
|
|
|
|
await man.notInSyncEvent.wait()
|
|
|
|
# Give the node time to connect to peers and get the sync process started
|
|
await sleepAsync(seconds(SECONDS_PER_SLOT.int64))
|
|
|
|
var
|
|
stamp = SyncMoment.now(man.queue.progress())
|
|
syncCount = 0
|
|
|
|
while man.inProgress:
|
|
await sleepAsync(seconds(SECONDS_PER_SLOT.int64))
|
|
|
|
let
|
|
newStamp = SyncMoment.now(man.queue.progress())
|
|
slotsPerSec = speed(stamp, newStamp)
|
|
|
|
syncCount += 1
|
|
|
|
man.insSyncSpeed = slotsPerSec
|
|
man.avgSyncSpeed =
|
|
man.avgSyncSpeed + (slotsPerSec - man.avgSyncSpeed) / float(syncCount)
|
|
|
|
stamp = newStamp
|
|
|
|
var averageSpeedTaskFut = averageSpeedTask()
|
|
|
|
while true:
|
|
let wallSlot = man.getLocalWallSlot()
|
|
let headSlot = man.getLocalHeadSlot()
|
|
|
|
let (map, sleeping, waiting, pending) = man.getWorkersStats()
|
|
|
|
debug "Current syncing state", workers_map = map,
|
|
sleeping_workers_count = sleeping,
|
|
waiting_workers_count = waiting,
|
|
pending_workers_count = pending,
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
pause_time = $chronos.seconds(pauseTime),
|
|
avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed
|
|
|
|
let
|
|
pivot = man.progressPivot
|
|
progress =
|
|
case man.queue.kind
|
|
of SyncQueueKind.Forward:
|
|
man.queue.outSlot - pivot
|
|
of SyncQueueKind.Backward:
|
|
pivot - man.queue.outSlot
|
|
total =
|
|
case man.queue.kind
|
|
of SyncQueueKind.Forward:
|
|
man.queue.finalSlot + 1'u64 - pivot
|
|
of SyncQueueKind.Backward:
|
|
pivot + 1'u64 - man.queue.finalSlot
|
|
remaining = total - progress
|
|
done =
|
|
if total > 0:
|
|
progress.float / total.float
|
|
else:
|
|
1.0
|
|
timeleft =
|
|
if man.avgSyncSpeed >= 0.001:
|
|
Duration.fromFloatSeconds(remaining.float / man.avgSyncSpeed)
|
|
else:
|
|
InfiniteDuration
|
|
currentSlot = Base10.toString(
|
|
if man.queue.kind == SyncQueueKind.Forward:
|
|
max(uint64(man.queue.outSlot), 1'u64) - 1'u64
|
|
else:
|
|
uint64(man.queue.outSlot) + 1'u64
|
|
)
|
|
|
|
# Update status string
|
|
man.syncStatus = timeleft.toTimeLeftString() & " (" &
|
|
(done * 100).formatBiggestFloat(ffDecimal, 2) & "%) " &
|
|
man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) &
|
|
"slots/s (" & map & ":" & currentSlot & ")"
|
|
|
|
if man.remainingSlots() <= man.maxHeadAge:
|
|
man.notInSyncEvent.clear()
|
|
# We are marking SyncManager as not working only when we are in sync and
|
|
# all sync workers are in `Sleeping` state.
|
|
if pending > 0:
|
|
debug "Synchronization loop waits for workers completion",
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge,
|
|
sleeping_workers_count = sleeping,
|
|
waiting_workers_count = waiting, pending_workers_count = pending
|
|
# We already synced, so we should reset all the pending workers from
|
|
# any state they have.
|
|
man.queue.clearAndWakeup()
|
|
man.inProgress = true
|
|
else:
|
|
case man.direction
|
|
of SyncQueueKind.Forward:
|
|
if man.inProgress:
|
|
if SyncManagerFlag.NoMonitor in man.flags:
|
|
await man.syncClose(guardTaskFut, averageSpeedTaskFut)
|
|
man.inProgress = false
|
|
debug "Forward synchronization process finished, exiting",
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
difference = (wallSlot - headSlot),
|
|
max_head_age = man.maxHeadAge
|
|
break
|
|
else:
|
|
man.inProgress = false
|
|
debug "Forward synchronization process finished, sleeping",
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
difference = (wallSlot - headSlot),
|
|
max_head_age = man.maxHeadAge
|
|
else:
|
|
debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
|
|
local_head_slot = headSlot,
|
|
difference = (wallSlot - headSlot),
|
|
max_head_age = man.maxHeadAge
|
|
of SyncQueueKind.Backward:
|
|
# Backward syncing is going to be executed only once, so we exit loop
|
|
# and stop all pending tasks which belongs to this instance (sync
|
|
# workers, guard task and speed calculation task).
|
|
# We first need to cancel and wait for guard task, because otherwise
|
|
# it will be able to restore cancelled workers.
|
|
await man.syncClose(guardTaskFut, averageSpeedTaskFut)
|
|
man.inProgress = false
|
|
debug "Backward synchronization process finished, exiting",
|
|
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
|
backfill_slot = man.getLastSlot(),
|
|
max_head_age = man.maxHeadAge
|
|
break
|
|
else:
|
|
if not(man.notInSyncEvent.isSet()):
|
|
# We get here only if we lost sync for more then `maxHeadAge` period.
|
|
if pending == 0:
|
|
man.initQueue()
|
|
man.notInSyncEvent.fire()
|
|
man.inProgress = true
|
|
debug "Node lost sync for more then preset period",
|
|
period = man.maxHeadAge, wall_head_slot = wallSlot,
|
|
local_head_slot = headSlot,
|
|
missing_slots = man.remainingSlots(),
|
|
progress = float(man.queue.progress())
|
|
else:
|
|
man.notInSyncEvent.fire()
|
|
man.inProgress = true
|
|
|
|
await sleepAsync(chronos.seconds(2))
|
|
|
|
proc start*[A, B](man: SyncManager[A, B]) =
|
|
## Starts SyncManager's main loop.
|
|
man.syncFut = man.syncLoop()
|