Syncing fixes (#1909)

* Fix continuous sync queue rewinds on slow PCs.
Fix recurring disconnects on low peer score.

* Calculate average syncing speed, not the current one.
Move the speed calculation to a separate task.

* Address review comments.
Eugene Kabanov, 2020-10-27 11:25:28 +02:00, committed by GitHub
parent ee2ebc96a8
commit c82ff24b5c
3 changed files with 105 additions and 39 deletions

@@ -658,22 +658,25 @@ proc startSyncManager(node: BeaconNode) =
proc scoreCheck(peer: Peer): bool =
if peer.score < PeerScoreLowLimit:
try:
debug "Peer score is too low, disconnecting", peer = peer,
peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
score_high_limit = PeerScoreHighLimit
# We do not care about the result of this operation, because even if the
# disconnect operation fails, the peer will still be added to the SeenTable
# and removed from the PeerPool, so it will not be reused for syncing for
# the `SeenTablePenaltyError` period.
asyncSpawn peer.disconnect(PeerScoreLow)
except:
discard
false
else:
true
proc onDeletePeer(peer: Peer) =
if peer.connectionState notin {Disconnecting, Disconnected}:
if peer.score < PeerScoreLowLimit:
debug "Peer was removed from PeerPool due to low score", peer = peer,
peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
score_high_limit = PeerScoreHighLimit
asyncSpawn peer.disconnect(PeerScoreLow)
else:
debug "Peer was removed from PeerPool", peer = peer,
peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
score_high_limit = PeerScoreHighLimit
asyncSpawn peer.disconnect(FaultOrError)
node.network.peerPool.setScoreCheck(scoreCheck)
node.network.peerPool.setOnDeletePeer(onDeletePeer)
node.syncManager = newSyncManager[Peer, PeerID](
node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,

@@ -34,6 +34,8 @@ type
PeerCounterCallback* = proc() {.gcsafe, raises: [Defect].}
PeerOnDeleteCallback*[T] = proc(peer: T) {.gcsafe.}
PeerPool*[A, B] = ref object
incNotEmptyEvent*: AsyncEvent
outNotEmptyEvent*: AsyncEvent
@@ -45,6 +47,7 @@ type
storage: seq[PeerItem[A]]
cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
scoreCheck: PeerScoreCheckCallback[A]
onDeletePeer: PeerOnDeleteCallback[A]
peerCounter: PeerCounterCallback
maxPeersCount: int
maxIncPeersCount: int
@@ -134,7 +137,8 @@ proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
maxOutgoingPeers = -1,
scoreCheckCb: PeerScoreCheckCallback[A] = nil,
-  peerCounterCb: PeerCounterCallback = nil): PeerPool[A, B] =
+  peerCounterCb: PeerCounterCallback = nil,
+  onDeleteCb: PeerOnDeleteCallback[A] = nil): PeerPool[A, B] =
## Create new PeerPool.
##
## ``maxPeers`` - maximum number of peers allowed. All the peers which
@@ -156,6 +160,8 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
## ``peerCounterCb`` - callback to be called when the number of peers in the
## PeerPool has changed.
##
## ``onDeleteCb`` - callback to be called when a peer is leaving the PeerPool.
##
## Please note, that if ``maxPeers`` is positive non-zero value, then equation
## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``.
var res = PeerPool[A, B]()
@@ -183,6 +189,7 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
res.registry = initTable[B, PeerIndex]()
res.scoreCheck = scoreCheckCb
res.peerCounter = peerCounterCb
res.onDeletePeer = onDeleteCb
res.storage = newSeq[PeerItem[A]]()
proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} =
@@ -267,6 +274,11 @@ proc peerCountChanged[A, B](pool: PeerPool[A, B]) {.inline.} =
if not(isNil(pool.peerCounter)):
pool.peerCounter()
proc peerDeleted[A, B](pool: PeerPool[A, B], peer: A) {.inline.} =
## Call the ``onDeletePeer`` callback when a peer is leaving the PeerPool.
if not(isNil(pool.onDeletePeer)):
pool.onDeletePeer(peer)
proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
## Remove ``peer`` from PeerPool ``pool``.
##
@@ -294,6 +306,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
# Cleanup storage with default item, and removing key from hashtable.
pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key)
pool.peerDeleted(peer)
pool.peerCountChanged()
else:
if item[].peerType == PeerType.Incoming:
@@ -318,6 +331,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
# Cleanup storage with default item, and removing key from hashtable.
pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key)
pool.peerDeleted(peer)
pool.peerCountChanged()
true
else:
@@ -720,10 +734,15 @@ proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
proc setScoreCheck*[A, B](pool: PeerPool[A, B],
scoreCheckCb: PeerScoreCheckCallback[A]) =
-  ## Add ScoreCheck callback.
+  ## Sets ScoreCheck callback.
pool.scoreCheck = scoreCheckCb
proc setOnDeletePeer*[A, B](pool: PeerPool[A, B],
deletePeerCb: PeerOnDeleteCallback[A]) =
## Sets DeletePeer callback.
pool.onDeletePeer = deletePeerCb
proc setPeerCounter*[A, B](pool: PeerPool[A, B],
peerCounterCb: PeerCounterCallback) =
-  ## Add PeerCounter callback.
+  ## Sets PeerCounter callback.
pool.peerCounter = peerCounterCb

@@ -2,7 +2,6 @@ import chronicles
import options, deques, heapqueue, tables, strutils, sequtils, math, algorithm
import stew/results, chronos, chronicles
import spec/[datatypes, digest], peer_pool, eth2_network
-import eth/async_utils
import ./eth2_processor
import block_pools/block_pools_types
@@ -109,7 +108,9 @@ type
syncFut: Future[void]
outQueue: AsyncQueue[BlockEntry]
inProgress*: bool
-    syncSpeed*: float
+    insSyncSpeed*: float
+    avgSyncSpeed*: float
+    syncCount*: uint64
syncStatus*: string
SyncMoment* = object
@@ -927,6 +928,7 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
mixin getKey, getScore
var pauseTime = 0
# Starting all sync workers
for i in 0 ..< len(man.workers):
@@ -934,34 +936,73 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
debug "Synchronization loop started", topics = "syncman"
-  proc watchAndSpeedTask() {.async.} =
+  proc watchTask() {.async.} =
const
MaxPauseTime = int(SECONDS_PER_SLOT) * int(SLOTS_PER_EPOCH)
MinPauseTime = int(SECONDS_PER_SLOT)
pauseTime = MinPauseTime
while true:
let wallSlot = man.getLocalWallSlot()
let headSlot = man.getLocalHeadSlot()
let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
await sleepAsync(chronos.seconds(pauseTime))
let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
let (map, sleeping, waiting, pending) = man.getWorkersStats()
if pending == 0:
pauseTime = MinPauseTime
else:
if (lsm2.slot - lsm1.slot == 0'u64) and (pending > 1):
# Syncing is NOT progressing, so we double the `pauseTime` value, which
# cannot exceed `MaxPauseTime`.
if (pauseTime shl 1) > MaxPauseTime:
pauseTime = MaxPauseTime
else:
pauseTime = pauseTime shl 1
info "Syncing process is not progressing, reset the queue",
pending_workers_count = pending,
to_slot = man.queue.outSlot,
pause_time = $(chronos.seconds(pauseTime)),
local_head_slot = lsm1.slot, topics = "syncman"
await man.queue.resetWait(none[Slot]())
else:
# Syncing is progressing, so we halve the `pauseTime` value, which
# cannot drop below `MinPauseTime`.
if (pauseTime shr 1) < MinPauseTime:
pauseTime = MinPauseTime
else:
pauseTime = pauseTime shr 1
debug "Synchronization watch loop tick", wall_head_slot = wallSlot,
local_head_slot = headSlot, queue_start_slot = man.queue.startSlot,
queue_last_slot = man.queue.lastSlot,
pause_time = $(chronos.seconds(pauseTime)),
avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed,
pending_workers_count = pending,
topics = "syncman"
proc averageSpeedTask() {.async.} =
while true:
let wallSlot = man.getLocalWallSlot()
let headSlot = man.getLocalHeadSlot()
let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
await sleepAsync(chronos.seconds(int(SECONDS_PER_SLOT)))
let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
let (map, sleeping, waiting, pending) = man.getWorkersStats()
if pending == 0:
man.syncSpeed = 0.0
else:
if (lsm2.slot - lsm1.slot == 0'u64) and (pending > 1):
info "Syncing process is not progressing, reset the queue",
pending_workers_count = pending,
to_slot = man.queue.outSlot,
local_head_slot = lsm1.slot, topics = "syncman"
await man.queue.resetWait(none[Slot]())
let bps =
if lsm2.slot - lsm1.slot == 0'u64:
0.0
else:
-          man.syncSpeed = speed(lsm1, lsm2)
+          speed(lsm1, lsm2)
inc(man.syncCount)
man.insSyncSpeed = bps
man.avgSyncSpeed = man.avgSyncSpeed +
(bps - man.avgSyncSpeed) / float(man.syncCount)
trace "Synchronization loop tick", wall_head_slot = wallSlot,
local_head_slot = headSlot, queue_start_slot = man.queue.startSlot,
queue_last_slot = man.queue.lastSlot,
sync_speed = man.syncSpeed, pending_workers_count = pending,
topics = "syncman"
-  traceAsyncErrors watchAndSpeedTask()
+  asyncSpawn watchTask()
+  asyncSpawn averageSpeedTask()
while true:
let wallSlot = man.getLocalWallSlot()
@@ -969,17 +1010,20 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
let (map, sleeping, waiting, pending) = man.getWorkersStats()
trace "Current syncing state", workers_map = map,
debug "Current syncing state", workers_map = map,
sleeping_workers_count = sleeping,
waiting_workers_count = waiting,
pending_workers_count = pending,
wall_head_slot = wallSlot, local_head_slot = headSlot,
pause_time = $chronos.seconds(pauseTime),
avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed,
topics = "syncman"
# Update status string
man.syncStatus = map & ":" & $pending & ":" &
-      man.syncSpeed.formatBiggestFloat(ffDecimal, 4) &
-      " (" & $man.queue.outSlot & ")"
+      man.insSyncSpeed.formatBiggestFloat(ffDecimal, 4) & ":" &
+      man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) &
+      " (" & $man.queue.outSlot & ")"
if headAge <= man.maxHeadAge:
man.notInSyncEvent.clear()