Syncing V2 (#1602)

* Syncing workers are no longer bound to peers (a simplified sketch of the new model appears just before the diff below).
Sync status is now printed in the status bar.

* Add `SyncQueue.outSlot` to the status bar as well.

* Add the `inRangeEvent` event and the `rangeAge` parameter.

* Fix `rangeAge` so that it does not depend on the SyncQueue's latest slot.
Fix SyncManager to start from the latest local head slot.

* Add a `notInRange` event.

* Remove the `suspects` field.
Eugene Kabanov 2020-09-11 15:46:01 +03:00 committed by GitHub
parent 775683cf2e
commit c7c9b9d5f1
3 changed files with 184 additions and 191 deletions
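
A minimal, hypothetical sketch of the worker model this change introduces: a fixed pool of workers sleeps on a shared notInSyncEvent and borrows peers from the PeerPool on demand, instead of spawning one worker per acquired peer. WorkerCount, worker() and the hard-coded syncing flag are placeholder names for illustration only, not code from this diff.

import chronos

const WorkerCount = 4                # the real code uses SyncWorkersCount = 20

let notInSyncEvent = newAsyncEvent()

proc worker(index: int) {.async.} =
  while true:
    # Workers park here while the node is in sync.
    await notInSyncEvent.wait()
    # Real workers acquire a peer from the PeerPool, pop a request from
    # the SyncQueue, download blocks and push the result back.
    await sleepAsync(100.milliseconds)

proc mainLoop() {.async.} =
  var workers: seq[Future[void]]
  for i in 0 ..< WorkerCount:
    workers.add(worker(i))           # start the fixed pool once
  while true:
    let syncing = true               # stands in for the headAge check
    if syncing:
      notInSyncEvent.fire()          # wake all workers
    else:
      notInSyncEvent.clear()         # let workers go back to sleep
    await sleepAsync(1.seconds)

when isMainModule:
  waitFor mainLoop()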

File 1 of 3

@@ -995,6 +995,14 @@ when hasPrompt:
             balance += b
           formatGwei(balance)
 
+        of "sync_status":
+          if isNil(node.syncManager):
+            "pending"
+          else:
+            if node.syncManager.inProgress:
+              node.syncManager.syncStatus
+            else:
+              "synced"
         else:
           # We ignore typos for now and just render the expression
           # as it was written. TODO: come up with a good way to show

File 2 of 3

@@ -185,7 +185,8 @@ type
       defaultValue: "peers: $connected_peers;" &
                     "finalized: $finalized_root:$finalized_epoch;" &
                     "head: $head_root:$head_epoch:$head_epoch_slot;" &
-                    "time: $epoch:$epoch_slot ($slot)|"
+                    "time: $epoch:$epoch_slot ($slot);" &
+                    "sync: $sync_status|"
       desc: "Textual template for the contents of the status bar"
       name: "status-bar-contents" }: string

File 3 of 3

@@ -29,6 +29,9 @@ const
   PeerScoreMissingBlocks* = -200
     ## Peer response contains too many empty blocks.
 
+  SyncWorkersCount* = 20
+    ## Number of sync workers to spawn
+
 type
   SyncFailureKind* = enum
     StatusInvalid,
@@ -69,14 +72,19 @@ type
     debtsQueue: HeapQueue[SyncRequest[T]]
     debtsCount: uint64
     readyQueue: HeapQueue[SyncResult[T]]
-    suspects: seq[SyncResult[T]]
     outQueue: AsyncQueue[BlockEntry]
 
+  SyncWorkerStatus* {.pure.} = enum
+    Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Processing
+
+  SyncWorker*[A, B] = object
+    future: Future[void]
+    status: SyncWorkerStatus
+
   SyncManager*[A, B] = ref object
     pool: PeerPool[A, B]
     responseTimeout: chronos.Duration
     sleepTime: chronos.Duration
-    maxWorkersCount: int
     maxStatusAge: uint64
     maxHeadAge: uint64
     maxRecurringFailures: int
@@ -84,12 +92,19 @@ type
     getLocalHeadSlot: GetSlotCallback
     getLocalWallSlot: GetSlotCallback
     getFinalizedSlot: GetSlotCallback
+    workers: array[SyncWorkersCount, SyncWorker[A, B]]
+    notInSyncEvent: AsyncEvent
+    rangeAge: uint64
+    inRangeEvent*: AsyncEvent
+    notInRangeEvent*: AsyncEvent
     chunkSize: uint64
     queue: SyncQueue[A]
     failures: seq[SyncFailure[A]]
     syncFut: Future[void]
     outQueue: AsyncQueue[BlockEntry]
     inProgress*: bool
+    syncSpeed*: float
+    syncStatus*: string
 
   SyncMoment* = object
     stamp*: chronos.Moment
@@ -585,22 +600,21 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
                            getLocalWallSlotCb: GetSlotCallback,
                            getFinalizedSlotCb: GetSlotCallback,
                            outputQueue: AsyncQueue[BlockEntry],
-                           maxWorkers = 10,
                            maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
                            maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
                            sleepTime = (int(SLOTS_PER_EPOCH) *
                                         int(SECONDS_PER_SLOT)).seconds,
                            chunkSize = uint64(SLOTS_PER_EPOCH),
                            toleranceValue = uint64(1),
-                           maxRecurringFailures = 3
+                           maxRecurringFailures = 3,
+                           rangeAge = uint64(SLOTS_PER_EPOCH * 4)
                            ): SyncManager[A, B] =
-  let queue = SyncQueue.init(A, getFinalizedSlotCb(), getLocalWallSlotCb(),
+  let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(),
                              chunkSize, getFinalizedSlotCb, outputQueue, 1)
 
   result = SyncManager[A, B](
     pool: pool,
-    maxWorkersCount: maxWorkers,
     maxStatusAge: maxStatusAge,
     getLocalHeadSlot: getLocalHeadSlotCb,
     getLocalWallSlot: getLocalWallSlotCb,
@@ -610,7 +624,11 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
     sleepTime: sleepTime,
     chunkSize: chunkSize,
     queue: queue,
-    outQueue: outputQueue
+    outQueue: outputQueue,
+    notInSyncEvent: newAsyncEvent(),
+    inRangeEvent: newAsyncEvent(),
+    notInRangeEvent: newAsyncEvent(),
+    rangeAge: rangeAge
   )
 
 proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
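
Note: a hypothetical call site (the Peer/PeerID type parameters, peerPool and the callback names are placeholders, not taken from this diff) would now drop the removed maxWorkers argument and may override the new rangeAge default:

let syncman = newSyncManager[Peer, PeerID](
  peerPool, getLocalHeadSlot, getLocalWallSlot, getFinalizedSlot, outputQueue,
  rangeAge = uint64(SLOTS_PER_EPOCH * 2))  # optional: tighter than the
                                           # default of SLOTS_PER_EPOCH * 4
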
@@ -642,6 +660,9 @@ template headAge(): uint64 =
 template peerAge(): uint64 =
   if peerSlot > wallSlot: 0'u64 else: wallSlot - peerSlot
 
+template queueAge(): uint64 =
+  wallSlot - man.queue.outSlot
+
 template checkPeerScore(peer, body: untyped): untyped =
   mixin getScore
   let currentScore = peer.getScore()
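
A small worked example with made-up numbers, assuming mainnet SLOTS_PER_EPOCH = 32 so that the default rangeAge is 32 * 4 = 128 slots:

let
  wallSlot = 378_997'u64
  outSlot  = 378_920'u64      # SyncQueue.outSlot
  rangeAge = 128'u64
let queueAge = wallSlot - outSlot   # 77 slots behind the wall clock
doAssert queueAge <= rangeAge       # within range, so inRangeEvent would fire
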
@@ -656,33 +677,25 @@ template checkPeerScore(peer, body: untyped): untyped =
 func syncQueueLen*[A, B](man: SyncManager[A, B]): uint64 =
   man.queue.len
 
-proc syncWorker*[A, B](man: SyncManager[A, B],
-                       peer: A): Future[A] {.async.} =
-  # Sync worker is the lowest level loop which performs syncing with single
-  # peer.
-  #
-  # Logic here is pretty simple:
-  # 1. Obtain request from SyncQueue.
-  # 2. Send this request to a peer and obtain response.
-  # 3. Push response to the SyncQueue, (doesn't matter if it success or failure)
-  # 4. Update main SyncQueue last slot with wall time slot number.
-  # 5. From time to time we also requesting peer's status information.
-  # 6. If our current head slot is near equal to peer's head slot we are
-  #    exiting this loop and finishing that sync-worker task.
-  # 7. Repeat
+proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
   mixin getKey, getScore, getHeadSlot
-  debug "Starting syncing with peer", peer = peer,
-        peer_score = peer.getScore(),
-        peer_speed = peer.netKbps(),
-        topics = "syncman"
-  try:
-    while true:
-      var wallSlot = man.getLocalWallSlot()
-      var headSlot = man.getLocalHeadSlot()
+
+  debug "Starting syncing worker", index = index, topics = "syncman"
+
+  while true:
+    man.workers[index].status = SyncWorkerStatus.Sleeping
+    # This event is going to be set until we are not in sync with network
+    await man.notInSyncEvent.wait()
+
+    man.workers[index].status = SyncWorkerStatus.WaitingPeer
+    let peer = await man.pool.acquire()
+    try:
+      let wallSlot = man.getLocalWallSlot()
+      let headSlot = man.getLocalHeadSlot()
       var peerSlot = peer.getHeadSlot()
-      # We updating SyncQueue's last slot all the time
       man.queue.updateLastSlot(wallSlot)
 
       debug "Peer's syncing status", wall_clock_slot = wallSlot,
@@ -690,26 +703,28 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
             peer_score = peer.getScore(), peer = peer,
             peer_speed = peer.netKbps(), topics = "syncman"
 
+      # Check if peer's head slot is bigger than our wall clock slot.
       if peerSlot > wallSlot + man.toleranceValue:
         # Our wall timer is broken, or peer's status information is invalid.
-        debug "Local timer is broken or peer's status information is invalid",
+        warn "Local timer is broken or peer's status information is invalid",
           wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
           local_head_slot = headSlot, peer = peer,
           tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(),
          peer_score = peer.getScore(), topics = "syncman"
         let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
         man.failures.add(failure)
-        break
+        continue
 
+      # Check if we need to update peer's status information
       if peerAge >= man.maxStatusAge:
-        # Peer's status information is very old, we going to update it.
+        # Peer's status information is very old, its time to update it
+        man.workers[index].status = SyncWorkerStatus.UpdatingStatus
         debug "Updating peer's status information", wall_clock_slot = wallSlot,
               remote_head_slot = peerSlot, local_head_slot = headSlot,
               peer = peer, peer_score = peer.getScore(),
              peer_speed = peer.netKbps(), topics = "syncman"
 
-        checkPeerScore peer:
-          let res = await peer.updateStatus()
+        let res = await peer.updateStatus()
 
         if not(res):
           peer.updateScore(PeerScoreNoStatus)
@@ -718,37 +733,43 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
                 peer_speed = peer.netKbps(), topics = "syncman"
           let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
           man.failures.add(failure)
-          break
+          continue
 
         let newPeerSlot = peer.getHeadSlot()
         if peerSlot >= newPeerSlot:
           peer.updateScore(PeerScoreStaleStatus)
-          debug "Peer's status information is stale, exiting",
+          debug "Peer's status information is stale",
                 wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
                 local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
                 peer = peer, peer_score = peer.getScore(),
                 peer_speed = peer.netKbps(), topics = "syncman"
-          let failure = SyncFailure.init(SyncFailureKind.StatusStale, peer)
-          man.failures.add(failure)
-          break
-
-        debug "Peer's status information updated", wall_clock_slot = wallSlot,
-              remote_old_head_slot = peerSlot, local_head_slot = headSlot,
-              remote_new_head_slot = newPeerSlot, peer = peer,
-              peer_score = peer.getScore(), peer_speed = peer.netKbps(),
-              topics = "syncman"
-        peer.updateScore(PeerScoreGoodStatus)
-        peerSlot = newPeerSlot
+        else:
+          debug "Peer's status information updated", wall_clock_slot = wallSlot,
+                remote_old_head_slot = peerSlot, local_head_slot = headSlot,
+                remote_new_head_slot = newPeerSlot, peer = peer,
+                peer_score = peer.getScore(), peer_speed = peer.netKbps(),
+                topics = "syncman"
+          peer.updateScore(PeerScoreGoodStatus)
+          peerSlot = newPeerSlot
 
       if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge):
         debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot,
               remote_head_slot = peerSlot, local_head_slot = headSlot,
               peer = peer, peer_score = peer.getScore(),
               peer_speed = peer.netKbps(), topics = "syncman"
-        break
+        continue
 
+      man.workers[index].status = SyncWorkerStatus.Requesting
       let req = man.queue.pop(peerSlot, peer)
       if req.isEmpty():
+        # SyncQueue could return empty request in 2 cases:
+        # 1. There no more slots in SyncQueue to download (we are synced, but
+        #    our ``notInSyncEvent`` is not yet cleared).
+        # 2. Current peer's known head slot is too low to satisfy request.
+        #
+        # To avoid endless loop we going to wait for RESP_TIMEOUT time here.
+        # This time is enough for all pending requests to finish and it is also
+        # enough for main sync loop to clear ``notInSyncEvent``.
         debug "Empty request received from queue, exiting", peer = peer,
               local_head_slot = headSlot, remote_head_slot = peerSlot,
               queue_input_slot = man.queue.inpSlot,
@@ -756,25 +777,17 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
               queue_last_slot = man.queue.lastSlot,
               peer_speed = peer.netKbps(), peer_score = peer.getScore(),
               topics = "syncman"
-        # Sometimes when syncing is almost done but last requests are still
-        # pending, this can fall into endless cycle, when low number of peers
-        # are available in PeerPool. We going to wait for RESP_TIMEOUT time,
-        # so all pending requests should be finished at this moment.
-        checkPeerScore peer:
-          await sleepAsync(RESP_TIMEOUT)
-        let failure = SyncFailure.init(SyncFailureKind.EmptyProblem, peer)
-        man.failures.add(failure)
-        break
+        await sleepAsync(RESP_TIMEOUT)
+        continue
 
+      man.workers[index].status = SyncWorkerStatus.Downloading
       debug "Creating new request for peer", wall_clock_slot = wallSlot,
             remote_head_slot = peerSlot, local_head_slot = headSlot,
             request_slot = req.slot, request_count = req.count,
             request_step = req.step, peer = peer, peer_speed = peer.netKbps(),
             peer_score = peer.getScore(), topics = "syncman"
 
-      checkPeerScore peer:
-        let blocks = await man.getBlocks(peer, req)
+      let blocks = await man.getBlocks(peer, req)
 
       if blocks.isOk:
         let data = blocks.get()
@@ -795,11 +808,12 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
               topics = "syncman"
         let failure = SyncFailure.init(SyncFailureKind.BadResponse, peer)
         man.failures.add(failure)
-        break
+        continue
 
       # Scoring will happen in `syncUpdate`.
-      checkPeerScore peer,
-        await man.queue.push(req, data)
+      man.workers[index].status = SyncWorkerStatus.Processing
+      await man.queue.push(req, data)
+
       # Cleaning up failures.
       man.failures.setLen(0)
     else:
@@ -812,170 +826,140 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
               topics = "syncman"
         let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
         man.failures.add(failure)
-        break
 
-    result = peer
-  finally:
-    man.pool.release(peer)
+    except CatchableError as exc:
+      debug "Unexpected exception happened", topics = "syncman",
+            excName = $exc.name, excMsg = exc.msg
+    finally:
+      man.pool.release(peer)
 
+proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
+                                                          sleeping: int,
+                                                          waiting: int,
+                                                          pending: int] =
+  var map = newString(len(man.workers))
+  var sleeping, waiting, pending: int
+  for i in 0 ..< len(man.workers):
+    var ch: char
+    case man.workers[i].status
+    of SyncWorkerStatus.Sleeping:
+      ch = 's'
+      inc(sleeping)
+    of SyncWorkerStatus.WaitingPeer:
+      ch = 'w'
+      inc(waiting)
+    of SyncWorkerStatus.UpdatingStatus:
+      ch = 'U'
+      inc(pending)
+    of SyncWorkerStatus.Requesting:
+      ch = 'R'
+      inc(pending)
+    of SyncWorkerStatus.Downloading:
+      ch = 'D'
+      inc(pending)
+    of SyncWorkerStatus.Processing:
+      ch = 'P'
+      inc(pending)
+    map[i] = ch
+  (map, sleeping, waiting, pending)
 
 proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
-  # This procedure manages main loop of SyncManager and in this loop it
-  # performs
-  # 1. It checks for current sync status, "are we synced?".
-  # 2. If we are in active syncing, it tries to acquire new peers from PeerPool
-  #    and spawns new sync-workers. Number of spawned sync-workers can be
-  #    controlled by `maxWorkersCount` value.
-  # 3. It stops spawning sync-workers when we are "in sync".
-  # 4. It calculates syncing performance.
   mixin getKey, getScore
-  var pending = newSeq[Future[A]]()
-  var acquireFut: Future[A]
-  var syncSpeed: float = 0.0
 
-  template workersCount(): int =
-    if isNil(acquireFut): len(pending) else: (len(pending) - 1)
+  # Starting all sync workers
+  for i in 0 ..< len(man.workers):
+    man.workers[i].future = syncWorker[A, B](man, i)
 
-  proc watchTask() {.async.} =
+  debug "Synchronization loop started", topics = "syncman"
+
+  proc watchAndSpeedTask() {.async.} =
     while true:
+      let wallSlot = man.getLocalWallSlot()
+      let headSlot = man.getLocalHeadSlot()
+
       let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
       await sleepAsync(chronos.seconds(int(SECONDS_PER_SLOT)))
       let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
-      if workersCount() == 0:
-        syncSpeed = 0.0
+
+      let (map, sleeping, waiting, pending) = man.getWorkersStats()
+      if pending == 0:
+        man.syncSpeed = 0.0
       else:
-        if (lsm2.slot - lsm1.slot == 0'u64) and (workersCount() > 1):
+        if (lsm2.slot - lsm1.slot == 0'u64) and (pending > 1):
           debug "Syncing process is not progressing, reset the queue",
-                workers_count = workersCount(),
+                pending_workers_count = pending,
                 to_slot = man.queue.outSlot,
                 local_head_slot = lsm1.slot, topics = "syncman"
           await man.queue.resetWait(none[Slot]())
         else:
-          syncSpeed = speed(lsm1, lsm2)
+          man.syncSpeed = speed(lsm1, lsm2)
 
-  debug "Synchronization loop started", topics = "syncman"
+      debug "Synchronization loop tick", wall_head_slot = wallSlot,
+            local_head_slot = headSlot, queue_start_slot = man.queue.startSlot,
+            queue_last_slot = man.queue.lastSlot,
+            sync_speed = man.syncSpeed, pending_workers_count = pending,
+            topics = "syncman"
 
-  traceAsyncErrors watchTask()
+  traceAsyncErrors watchAndSpeedTask()
 
   while true:
     let wallSlot = man.getLocalWallSlot()
     let headSlot = man.getLocalHeadSlot()
-    let progress =
-      if headSlot <= man.queue.lastSlot:
-        man.queue.progress()
-      else:
-        100'u64
-
-    debug "Synchronization loop tick", wall_head_slot = wallSlot,
-          local_head_slot = headSlot, queue_status = progress,
-          queue_start_slot = man.queue.startSlot,
-          queue_last_slot = man.queue.lastSlot,
-          waiting_for_new_peer = $not(isNil(acquireFut)),
-          sync_speed = syncSpeed, workers_count = workersCount(),
+    let (map, sleeping, waiting, pending) = man.getWorkersStats()
+
+    debug "Current syncing state", workers_map = map,
+          sleeping_workers_count = sleeping,
+          waiting_workers_count = waiting,
+          pending_workers_count = pending,
+          wall_head_slot = wallSlot, local_head_slot = headSlot,
           topics = "syncman"
 
-    var temp = newSeqOfCap[Future[A]](len(pending))
-    for fut in pending:
-      if fut.finished():
-        if fut == acquireFut:
+    # Update status string
+    man.syncStatus = map & ":" & $pending & ":" &
+                     man.syncSpeed.formatBiggestFloat(ffDecimal, 4) &
+                     " (" & $man.queue.outSlot & ")"
-          # We acquired new peer from PeerPool.
-          if acquireFut.failed():
-            debug "Synchronization loop failed to get new peer",
-                  wall_head_slot = wallSlot, local_head_slot = headSlot,
-                  workers_count = workersCount(),
-                  errMsg = acquireFut.readError().msg, topics = "syncman"
-          else:
-            let peer = acquireFut.read()
-            if headAge <= man.maxHeadAge:
-              # If we are already in sync, we going to release just acquired
-              # peer and do not acquire peers
-              debug "Synchronization loop reached sync barrier", peer = peer,
-                    wall_head_slot = wallSlot, local_head_slot = headSlot,
-                    peer_score = peer.getScore(), topics = "syncman"
-              man.pool.release(peer)
-            else:
-              if headSlot > man.queue.lastSlot:
-                debug "Synchronization lost, restoring",
-                      wall_head_slot = wallSlot, local_head_slot = headSlot,
-                      queue_last_slot = man.queue.lastSlot, topics = "syncman"
-                man.queue = SyncQueue.init(A, man.getFinalizedSlot(), wallSlot,
-                                           man.chunkSize, man.getFinalizedSlot,
-                                           man.outQueue, 1)
-              debug "Synchronization loop starting new worker", peer = peer,
-                    wall_head_slot = wallSlot, local_head_slot = headSlot,
-                    peer_score = peer.getScore(), peer_speed = peer.netKbps(),
-                    topics = "syncman"
-              temp.add(syncWorker(man, peer))
-          # We will create new `acquireFut` later.
-          acquireFut = nil
-        else:
-          # We got worker finished its work
-          if fut.failed():
-            debug "Synchronization loop got worker finished with an error",
-                  wall_head_slot = wallSlot, local_head_slot = headSlot,
-                  errMsg = fut.readError().msg, topics = "syncman"
-          else:
-            let peer = fut.read()
-            debug "Synchronization loop got worker finished",
-                  wall_head_slot = wallSlot, local_head_slot = headSlot,
-                  peer = peer, peer_score = peer.getScore(),
-                  peer_speed = peer.netKbps(),
-                  topics = "syncman"
-      else:
-        if fut == acquireFut:
-          # Task which waits for new peer from PeerPool is not yet finished.
-          if headAge <= man.maxHeadAge:
-            debug "Synchronization loop reached sync barrier",
-                  wall_head_slot = wallSlot, local_head_slot = headSlot,
-                  topics = "syncman"
-            acquireFut = nil
-            fut.cancel()
-          else:
-            temp.add(fut)
-        else:
-          temp.add(fut)
-    pending = temp
 
     if headAge <= man.maxHeadAge:
-      debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
-            local_head_slot = headSlot, workers_count = workersCount(),
-            difference = (wallSlot - headSlot),
-            max_head_age = man.maxHeadAge, topics = "syncman"
-      if len(pending) == 0:
-        man.inProgress = false
-        await sleepAsync(man.sleepTime)
+      man.notInSyncEvent.clear()
+      # We are marking SyncManager as not working only when we are in sync and
+      # all sync workers are in `Sleeping` state.
+      if pending > 0:
+        debug "Synchronization loop waits for workers completion",
+              wall_head_slot = wallSlot, local_head_slot = headSlot,
+              difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge,
+              sleeping_workers_count = sleeping,
+              waiting_workers_count = waiting, pending_workers_count = pending,
+              topics = "syncman"
+        man.inProgress = true
       else:
-        debug "Synchronization loop waiting for workers completion",
-              workers_count = workersCount(), topics = "syncman"
-        discard await withTimeout(one(pending), man.sleepTime)
+        debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
+              local_head_slot = headSlot, difference = (wallSlot - headSlot),
+              max_head_age = man.maxHeadAge, topics = "syncman"
+        man.inProgress = false
     else:
+      man.notInSyncEvent.fire()
       man.inProgress = true
-      if isNil(acquireFut) and len(pending) < man.maxWorkersCount:
-        acquireFut = man.pool.acquire()
-        pending.add(acquireFut)
-        debug "Synchronization loop waiting for new peer",
-              wall_head_slot = wallSlot, local_head_slot = headSlot,
-              workers_count = workersCount(), topics = "syncman",
-              sleep_time = $man.sleepTime
-      else:
-        debug "Synchronization loop waiting for workers",
-              wall_head_slot = wallSlot, local_head_slot = headSlot,
-              workers_count = workersCount(), topics = "syncman",
-              sleep_time = $man.sleep_time
-      discard await withTimeout(one(pending), man.sleepTime)
-
-    if len(man.failures) > man.maxRecurringFailures and (workersCount() > 1):
+
+    if queueAge <= man.rangeAge:
+      # We are in requested range ``man.rangeAge``.
+      man.inRangeEvent.fire()
+      man.notInRangeEvent.clear()
+    else:
+      # We are not in requested range anymore ``man.rangeAge``.
+      man.inRangeEvent.clear()
+      man.notInRangeEvent.fire()
+
+    if len(man.failures) > man.maxRecurringFailures and pending > 1:
       debug "Number of recurring failures exceeds limit, reseting queue",
-            workers_count = workers_count(), rec_failures = len(man.failures)
+            pending_workers_count = pending, sleeping_workers_count = sleeping,
+            waiting_workers_count = waiting, rec_failures = len(man.failures),
+            topics = "syncman"
       # Cleaning up failures.
       man.failures.setLen(0)
       await man.queue.resetWait(none[Slot]())
+
+    await sleepAsync(chronos.seconds(2))
 
 proc start*[A, B](man: SyncManager[A, B]) =
   ## Starts SyncManager's main loop.
   man.syncFut = man.syncLoop()
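
Reading the syncStatus string assembled in syncLoop above (the value here is made up for illustration): each character of the workers map encodes one worker's state as reported by getWorkersStats (s sleeping, w waiting for a peer, U updating status, R requesting, D downloading, P processing), followed by the pending-worker count, the sync speed in slots per second with four decimals, and SyncQueue.outSlot in parentheses:

  DDPwUsssssssssssssss:4:2.3145 (378920)

Here two workers are downloading, one is processing, one is updating a peer's status, one is waiting for a peer and fifteen are sleeping; the node is importing roughly 2.3 slots per second and the queue's output slot is 378920.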