Add maximum number of workers (peers used) by SyncManager (default: 10) (#1172)

Refactor and simplification of `sync` procedure.
Fix aggressive looping on excessive recurring failures.
This commit is contained in:
Eugene Kabanov 2020-06-14 12:45:53 +03:00 committed by GitHub
parent 360ebd705f
commit 50c5d47250
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 58 additions and 55 deletions

View File

@ -85,6 +85,7 @@ type
pool: PeerPool[A, B]
responseTimeout: chronos.Duration
sleepTime: chronos.Duration
maxWorkersCount: int
maxStatusAge: uint64
maxHeadAge: uint64
maxRecurringFailures: int
@ -680,6 +681,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
getLocalWallSlotCb: GetSlotCallback,
getFSAFECb: GetSlotCallback,
updateLocalBlocksCb: UpdateLocalBlocksCallback,
maxWorkers = 10,
maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
sleepTime = (int(SLOTS_PER_EPOCH) *
@ -702,6 +704,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
result = SyncManager[A, B](
pool: pool,
maxWorkersCount: maxWorkers,
maxStatusAge: maxStatusAge,
getLocalHeadSlot: getLocalHeadSlotCb,
syncUpdate: syncUpdate,
@ -909,14 +912,14 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
# This procedure manages main loop of SyncManager and in this loop it
# performs
# 1. It checks for current sync status, "are we synced?".
# 2. If we are in active syncing, it tries to acquire peers from PeerPool and
# spawns new sync-workers.
# 2. If we are in active syncing, it tries to acquire new peers from PeerPool
# and spawns new sync-workers. Number of spawned sync-workers can be
# controlled by `maxWorkersCount` value.
# 3. It stops spawning sync-workers when we are "in sync".
# 4. It calculates syncing performance.
mixin getKey, getScore
var pending = newSeq[Future[A]]()
var acquireFut: Future[A]
var wallSlot, headSlot: Slot
var syncSpeed: float = 0.0
template workersCount(): int =
@ -944,53 +947,24 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
traceAsyncErrors watchTask()
while true:
wallSlot = man.getLocalWallSlot()
headSlot = man.getLocalHeadSlot()
let wallSlot = man.getLocalWallSlot()
let headSlot = man.getLocalHeadSlot()
var progress: uint64
if headSlot <= man.queue.lastSlot:
progress = man.queue.progress()
else:
progress = 100'u64
let progress =
if headSlot <= man.queue.lastSlot:
man.queue.progress()
else:
100'u64
debug "Synchronization loop start tick", wall_head_slot = wallSlot,
debug "Synchronization loop tick", wall_head_slot = wallSlot,
local_head_slot = headSlot, queue_status = progress,
queue_start_slot = man.queue.startSlot,
queue_last_slot = man.queue.lastSlot,
workers_count = workersCount(), topics = "syncman"
if headAge <= man.maxHeadAge:
debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
local_head_slot = headSlot, workers_count = workersCount(),
difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge, topics = "syncman"
if len(pending) == 0:
man.inProgress = false
await sleepAsync(man.sleepTime)
else:
var peerFut = one(pending)
# We do not care about result here because we going to check peerFut
# later.
discard await withTimeout(peerFut, man.sleepTime)
else:
man.inProgress = true
if isNil(acquireFut):
acquireFut = man.pool.acquire()
pending.add(acquireFut)
debug "Synchronization loop waiting for new peer",
wall_head_slot = wallSlot, local_head_slot = headSlot,
workers_count = workersCount(), topics = "syncman"
var peerFut = one(pending)
# We do not care about result here, because we going to check peerFut
# later.
discard await withTimeout(peerFut, man.sleepTime)
waiting_for_new_peer = $not(isNil(acquireFut)),
sync_speed = syncSpeed, workers_count = workersCount(),
topics = "syncman"
var temp = newSeqOfCap[Future[A]](len(pending))
# Update slots to with more recent data
wallSlot = man.getLocalWallSlot()
headSlot = man.getLocalHeadSlot()
for fut in pending:
if fut.finished():
if fut == acquireFut:
@ -1001,7 +975,7 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
workers_count = workersCount(),
errMsg = acquireFut.readError().msg, topics = "syncman"
else:
var peer = acquireFut.read()
let peer = acquireFut.read()
if headAge <= man.maxHeadAge:
# If we are already in sync, we going to release just acquired
# peer and do not acquire peers
@ -1011,20 +985,22 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
man.pool.release(peer)
else:
if headSlot > man.queue.lastSlot:
debug "Synchronization lost, restoring",
wall_head_slot = wallSlot, local_head_slot = headSlot,
queue_last_slot = man.queue.lastSlot, topics = "syncman"
man.queue = SyncQueue.init(A, headSlot, wallSlot,
man.chunkSize, man.syncUpdate,
man.getFirstSlotAFE, 2)
debug "Synchronization loop starting new worker", peer = peer,
wall_head_slot = wallSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), topics = "syncman"
temp.add(syncWorker(man, peer))
acquireFut = nil
if headAge > man.maxHeadAge:
acquireFut = man.pool.acquire()
temp.add(acquireFut)
# We will create new `acquireFut` later.
acquireFut = nil
else:
# Worker finished its work
# We got worker finished its work
if fut.failed():
debug "Synchronization loop got worker finished with an error",
wall_head_slot = wallSlot, local_head_slot = headSlot,
@ -1037,6 +1013,7 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
topics = "syncman"
else:
if fut == acquireFut:
# Task which waits for new peer from PeerPool is not yet finished.
if headAge <= man.maxHeadAge:
debug "Synchronization loop reached sync barrier",
wall_head_slot = wallSlot, local_head_slot = headSlot,
@ -1050,13 +1027,39 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
pending = temp
if headAge <= man.maxHeadAge:
debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
local_head_slot = headSlot, workers_count = workersCount(),
difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge, topics = "syncman"
if len(pending) == 0:
man.inProgress = false
await sleepAsync(man.sleepTime)
else:
debug "Synchronization loop waiting for workers completion",
workers_count = workersCount()
discard await withTimeout(one(pending), man.sleepTime)
else:
man.inProgress = true
if isNil(acquireFut) and len(pending) < man.maxWorkersCount:
acquireFut = man.pool.acquire()
pending.add(acquireFut)
debug "Synchronization loop waiting for new peer",
wall_head_slot = wallSlot, local_head_slot = headSlot,
workers_count = workersCount(), topics = "syncman",
sleep_time = $man.sleepTime
else:
debug "Synchronization loop waiting for workers",
wall_head_slot = wallSlot, local_head_slot = headSlot,
workers_count = workersCount(), topics = "syncman",
sleep_time = $man.sleep_time
discard await withTimeout(one(pending), man.sleepTime)
if len(man.failures) > man.maxRecurringFailures and (workersCount() > 1):
debug "Number of recurring failures exceeds limit, reseting queue",
workers_count = workers_count(), rec_failures = len(man.failures)
# Cleaning up failures.
man.failures.setLen(0)
await man.queue.resetWait(none[Slot]())
debug "Synchronization loop end tick", wall_head_slot = wallSlot,
local_head_slot = headSlot, workers_count = workersCount(),
waiting_for_new_peer = $not(isNil(acquireFut)),
sync_speed = syncSpeed, queue_slot = man.queue.outSlot,
topics = "syncman"