From c82ff24b5c6564f35b5e2336bdba8e1f0c44e07e Mon Sep 17 00:00:00 2001
From: Eugene Kabanov
Date: Tue, 27 Oct 2020 11:25:28 +0200
Subject: [PATCH] Syncing fixes (#1909)

* Fix continuous sync queue rewinds on slow PCs.
  Fix recurring disconnects on low peer score.

* Calculate average syncing speed, not the current one.
  Move speed calculation to different task.

* Address review comments.
---
 beacon_chain/beacon_node.nim  | 25 ++++++----
 beacon_chain/peer_pool.nim    | 25 ++++++++--
 beacon_chain/sync_manager.nim | 94 +++++++++++++++++++++++++----------
 3 files changed, 105 insertions(+), 39 deletions(-)

diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim
index 5bc34ab76..446be5a6e 100644
--- a/beacon_chain/beacon_node.nim
+++ b/beacon_chain/beacon_node.nim
@@ -658,22 +658,25 @@ proc startSyncManager(node: BeaconNode) =
 
   proc scoreCheck(peer: Peer): bool =
     if peer.score < PeerScoreLowLimit:
-      try:
-        debug "Peer score is too low, disconnecting", peer = peer,
-              peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
-              score_high_limit = PeerScoreHighLimit
-        # We do not care about result of this operation, because even if
-        # disconnect operation fails, peer will still be added to SeenTable
-        # and removed from PeerPool. So it will be not reused for syncing for
-        # `SeenTablePenaltyError` time.
-        asyncSpawn peer.disconnect(PeerScoreLow)
-      except:
-        discard
       false
     else:
       true
 
+  proc onDeletePeer(peer: Peer) =
+    if peer.connectionState notin {Disconnecting, Disconnected}:
+      if peer.score < PeerScoreLowLimit:
+        debug "Peer was removed from PeerPool due to low score", peer = peer,
+              peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
+              score_high_limit = PeerScoreHighLimit
+        asyncSpawn peer.disconnect(PeerScoreLow)
+      else:
+        debug "Peer was removed from PeerPool", peer = peer,
+              peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
+              score_high_limit = PeerScoreHighLimit
+        asyncSpawn peer.disconnect(FaultOrError)
+
   node.network.peerPool.setScoreCheck(scoreCheck)
+  node.network.peerPool.setOnDeletePeer(onDeletePeer)
 
   node.syncManager = newSyncManager[Peer, PeerID](
     node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
diff --git a/beacon_chain/peer_pool.nim b/beacon_chain/peer_pool.nim
index 9f940e076..f7af4ca40 100644
--- a/beacon_chain/peer_pool.nim
+++ b/beacon_chain/peer_pool.nim
@@ -34,6 +34,8 @@ type
 
   PeerCounterCallback* = proc() {.gcsafe, raises: [Defect].}
 
+  PeerOnDeleteCallback*[T] = proc(peer: T) {.gcsafe.}
+
   PeerPool*[A, B] = ref object
     incNotEmptyEvent*: AsyncEvent
     outNotEmptyEvent*: AsyncEvent
@@ -45,6 +47,7 @@ type
     storage: seq[PeerItem[A]]
     cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
     scoreCheck: PeerScoreCheckCallback[A]
+    onDeletePeer: PeerOnDeleteCallback[A]
     peerCounter: PeerCounterCallback
     maxPeersCount: int
     maxIncPeersCount: int
@@ -134,7 +137,8 @@ proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
 proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
                         maxOutgoingPeers = -1,
                         scoreCheckCb: PeerScoreCheckCallback[A] = nil,
-                        peerCounterCb: PeerCounterCallback = nil): PeerPool[A, B] =
+                        peerCounterCb: PeerCounterCallback = nil,
+                        onDeleteCb: PeerOnDeleteCallback[A] = nil): PeerPool[A, B] =
   ## Create new PeerPool.
   ##
   ## ``maxPeers`` - maximum number of peers allowed. All the peers which
@@ -156,6 +160,8 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
   ## ``peerCountCb`` - callback to be called when number of peers in PeerPool
   ## has been changed.
   ##
+  ## ``onDeleteCb`` - callback to be called when peer is leaving PeerPool.
+  ##
   ## Please note, that if ``maxPeers`` is positive non-zero value, then equation
   ## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``.
   var res = PeerPool[A, B]()
@@ -183,6 +189,7 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
   res.registry = initTable[B, PeerIndex]()
   res.scoreCheck = scoreCheckCb
   res.peerCounter = peerCounterCb
+  res.onDeletePeer = onDeleteCb
   res.storage = newSeq[PeerItem[A]]()
 
   proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} =
@@ -267,6 +274,11 @@ proc peerCountChanged[A, B](pool: PeerPool[A, B]) {.inline.} =
   if not(isNil(pool.peerCounter)):
     pool.peerCounter()
 
+proc peerDeleted[A, B](pool: PeerPool[A, B], peer: A) {.inline.} =
+  ## Call callback when peer is leaving PeerPool.
+  if not(isNil(pool.onDeletePeer)):
+    pool.onDeletePeer(peer)
+
 proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
   ## Remove ``peer`` from PeerPool ``pool``.
   ##
@@ -294,6 +306,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
         # Cleanup storage with default item, and removing key from hashtable.
         pool.storage[pindex] = PeerItem[A]()
         pool.registry.del(key)
+        pool.peerDeleted(peer)
         pool.peerCountChanged()
     else:
       if item[].peerType == PeerType.Incoming:
@@ -318,6 +331,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
       # Cleanup storage with default item, and removing key from hashtable.
       pool.storage[pindex] = PeerItem[A]()
       pool.registry.del(key)
+      pool.peerDeleted(peer)
       pool.peerCountChanged()
     true
   else:
@@ -720,10 +734,15 @@ proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
 
 proc setScoreCheck*[A, B](pool: PeerPool[A, B],
                           scoreCheckCb: PeerScoreCheckCallback[A]) =
-  ## Add ScoreCheck callback.
+  ## Sets ScoreCheck callback.
   pool.scoreCheck = scoreCheckCb
 
+proc setOnDeletePeer*[A, B](pool: PeerPool[A, B],
+                            deletePeerCb: PeerOnDeleteCallback[A]) =
+  ## Sets DeletePeer callback.
+  pool.onDeletePeer = deletePeerCb
+
 proc setPeerCounter*[A, B](pool: PeerPool[A, B],
                            peerCounterCb: PeerCounterCallback) =
-  ## Add PeerCounter callback.
+  ## Sets PeerCounter callback.
   pool.peerCounter = peerCounterCb
diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim
index 08fdd65c1..e41cb450c 100644
--- a/beacon_chain/sync_manager.nim
+++ b/beacon_chain/sync_manager.nim
@@ -2,7 +2,6 @@ import chronicles
 import options, deques, heapqueue, tables, strutils, sequtils, math, algorithm
 import stew/results, chronos, chronicles
 import spec/[datatypes, digest], peer_pool, eth2_network
-import eth/async_utils
 
 import ./eth2_processor
 import block_pools/block_pools_types
@@ -109,7 +108,9 @@ type
     syncFut: Future[void]
    outQueue: AsyncQueue[BlockEntry]
     inProgress*: bool
-    syncSpeed*: float
+    insSyncSpeed*: float
+    avgSyncSpeed*: float
+    syncCount*: uint64
     syncStatus*: string
 
   SyncMoment* = object
@@ -927,6 +928,7 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
 
 proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
   mixin getKey, getScore
+  var pauseTime = 0
 
   # Starting all sync workers
   for i in 0 ..< len(man.workers):
@@ -934,34 +936,73 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
 
   debug "Synchronization loop started", topics = "syncman"
 
-  proc watchAndSpeedTask() {.async.} =
+  proc watchTask() {.async.} =
+    const
+      MaxPauseTime = int(SECONDS_PER_SLOT) * int(SLOTS_PER_EPOCH)
+      MinPauseTime = int(SECONDS_PER_SLOT)
+
+    pauseTime = MinPauseTime
+
+    while true:
+      let wallSlot = man.getLocalWallSlot()
+      let headSlot = man.getLocalHeadSlot()
+
+      let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
+      await sleepAsync(chronos.seconds(pauseTime))
+      let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
+
+      let (map, sleeping, waiting, pending) = man.getWorkersStats()
+      if pending == 0:
+        pauseTime = MinPauseTime
+      else:
+        if (lsm2.slot - lsm1.slot == 0'u64) and (pending > 1):
+          # Syncing is NOT progressing, so we double the `pauseTime` value,
+          # but it cannot be bigger than `MaxPauseTime`.
+          if (pauseTime shl 1) > MaxPauseTime:
+            pauseTime = MaxPauseTime
+          else:
+            pauseTime = pauseTime shl 1
+          info "Syncing process is not progressing, reset the queue",
+               pending_workers_count = pending,
+               to_slot = man.queue.outSlot,
+               pause_time = $(chronos.seconds(pauseTime)),
+               local_head_slot = lsm1.slot, topics = "syncman"
+          await man.queue.resetWait(none[Slot]())
+        else:
+          # Syncing is progressing, so we reduce the `pauseTime` value in half,
+          # but it cannot be less than `MinPauseTime`.
+          if (pauseTime shr 1) < MinPauseTime:
+            pauseTime = MinPauseTime
+          else:
+            pauseTime = pauseTime shr 1
+
+      debug "Synchronization watch loop tick", wall_head_slot = wallSlot,
+            local_head_slot = headSlot, queue_start_slot = man.queue.startSlot,
+            queue_last_slot = man.queue.lastSlot,
+            pause_time = $(chronos.seconds(pauseTime)),
+            avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed,
+            pending_workers_count = pending,
+            topics = "syncman"
+
+  proc averageSpeedTask() {.async.} =
     while true:
       let wallSlot = man.getLocalWallSlot()
       let headSlot = man.getLocalHeadSlot()
       let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
       await sleepAsync(chronos.seconds(int(SECONDS_PER_SLOT)))
       let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
-
-      let (map, sleeping, waiting, pending) = man.getWorkersStats()
-      if pending == 0:
-        man.syncSpeed = 0.0
-      else:
-        if (lsm2.slot - lsm1.slot == 0'u64) and (pending > 1):
-          info "Syncing process is not progressing, reset the queue",
-               pending_workers_count = pending,
-               to_slot = man.queue.outSlot,
-               local_head_slot = lsm1.slot, topics = "syncman"
-          await man.queue.resetWait(none[Slot]())
+      let bps =
+        if lsm2.slot - lsm1.slot == 0'u64:
+          0.0
         else:
-          man.syncSpeed = speed(lsm1, lsm2)
+          speed(lsm1, lsm2)
+      inc(man.syncCount)
+      man.insSyncSpeed = bps
+      man.avgSyncSpeed = man.avgSyncSpeed +
+                         (bps - man.avgSyncSpeed) / float(man.syncCount)
 
-      trace "Synchronization loop tick", wall_head_slot = wallSlot,
-            local_head_slot = headSlot, queue_start_slot = man.queue.startSlot,
-            queue_last_slot = man.queue.lastSlot,
-            sync_speed = man.syncSpeed, pending_workers_count = pending,
-            topics = "syncman"
-
-  traceAsyncErrors watchAndSpeedTask()
+  asyncSpawn watchTask()
+  asyncSpawn averageSpeedTask()
 
   while true:
     let wallSlot = man.getLocalWallSlot()
@@ -969,17 +1010,20 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
     let headSlot = man.getLocalHeadSlot()
 
     let (map, sleeping, waiting, pending) = man.getWorkersStats()
 
-    trace "Current syncing state", workers_map = map,
+    debug "Current syncing state", workers_map = map,
           sleeping_workers_count = sleeping, waiting_workers_count = waiting,
          pending_workers_count = pending, wall_head_slot = wallSlot,
          local_head_slot = headSlot,
+         pause_time = $chronos.seconds(pauseTime),
+         avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed,
          topics = "syncman"
 
     # Update status string
     man.syncStatus = map & ":" & $pending & ":" &
-                     man.syncSpeed.formatBiggestFloat(ffDecimal, 4) &
-                     " (" & $man.queue.outSlot & ")"
+                     man.insSyncSpeed.formatBiggestFloat(ffDecimal, 4) & ":" &
+                     man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) &
+                     " (" & $man.queue.outSlot & ")"
 
     if headAge <= man.maxHeadAge:
       man.notInSyncEvent.clear()
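
The two calculations this patch adds to syncLoop can be read in isolation: watchTask doubles or halves pauseTime inside the [MinPauseTime, MaxPauseTime] band depending on whether the local head slot moved during the last pause, and averageSpeedTask folds each new blocks-per-second sample into avgSyncSpeed as a cumulative moving average. The standalone Nim sketch below is illustrative only: backoff and updateAverage are hypothetical helper names, the constants assume mainnet values (12-second slots, 32-slot epochs), and the real watchTask additionally resets pauseTime when no workers are pending and rewinds the queue with resetWait, which the sketch omits.

# Standalone sketch (not part of the patch): bounded back-off and the
# cumulative moving average, with mainnet constants assumed
# (SECONDS_PER_SLOT = 12, SLOTS_PER_EPOCH = 32).
const
  MinPauseTime = 12        # assumed SECONDS_PER_SLOT
  MaxPauseTime = 12 * 32   # assumed SECONDS_PER_SLOT * SLOTS_PER_EPOCH

proc backoff(pauseTime: int, progressing: bool): int =
  ## Double the pause when syncing stalls, halve it when it progresses,
  ## clamped to [MinPauseTime, MaxPauseTime] (hypothetical helper).
  if progressing:
    max(pauseTime shr 1, MinPauseTime)
  else:
    min(pauseTime shl 1, MaxPauseTime)

proc updateAverage(avg: float, count: uint64, sample: float): float =
  ## Cumulative moving average, the same formula used for avgSyncSpeed:
  ## avg_n = avg_(n-1) + (x_n - avg_(n-1)) / n.
  avg + (sample - avg) / float(count)

when isMainModule:
  var pause = MinPauseTime
  pause = backoff(pause, progressing = false)  # stalled: 12 -> 24
  pause = backoff(pause, progressing = false)  # stalled: 24 -> 48
  pause = backoff(pause, progressing = true)   # progressing: 48 -> 24
  echo pause                                   # 24

  var avg = 0.0
  var n = 0'u64
  for bps in [32.0, 28.0, 36.0]:               # blocks-per-second samples
    inc(n)
    avg = updateAverage(avg, n, bps)
  echo avg                                     # 32.0

Folding samples in with (sample - avg) / n keeps the reported speed stable across individual slow intervals, which is why the status string now reports both the instantaneous insSyncSpeed and the averaged avgSyncSpeed.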