diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim
index d755f7589..e69c3454d 100644
--- a/beacon_chain/beacon_node.nim
+++ b/beacon_chain/beacon_node.nim
@@ -995,6 +995,14 @@ when hasPrompt:
           balance += b
         formatGwei(balance)
 
+      of "sync_status":
+        if isNil(node.syncManager):
+          "pending"
+        else:
+          if node.syncManager.inProgress:
+            node.syncManager.syncStatus
+          else:
+            "synced"
       else:
         # We ignore typos for now and just render the expression
         # as it was written. TODO: come up with a good way to show
diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim
index 1d8f89043..26a0c2330 100644
--- a/beacon_chain/conf.nim
+++ b/beacon_chain/conf.nim
@@ -185,7 +185,8 @@ type
       defaultValue: "peers: $connected_peers;" &
                     "finalized: $finalized_root:$finalized_epoch;" &
                     "head: $head_root:$head_epoch:$head_epoch_slot;" &
-                    "time: $epoch:$epoch_slot ($slot)|"
+                    "time: $epoch:$epoch_slot ($slot);" &
+                    "sync: $sync_status|"
       desc: "Textual template for the contents of the status bar"
      name: "status-bar-contents" }: string
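
For context, the status bar expands each `$name` token in the template above through the node's expression resolver, and the new `sync_status` branch in beacon_node.nim is what feeds the added token. A minimal, self-contained sketch of that kind of token substitution — the `expandTemplate`/`resolver` names and the hard-coded values below are illustrative only, not the actual status_bar implementation:

```nim
# Illustrative sketch of "$token" substitution in a status-bar template.
# The real renderer lives in status_bar.nim; names and values here are made up.
import std/strutils

proc expandTemplate(templ: string,
                    resolve: proc (name: string): string): string =
  ## Expand "$token" placeholders; everything else is copied through verbatim.
  var i = 0
  while i < len(templ):
    if templ[i] == '$':
      var j = i + 1
      while j < len(templ) and (templ[j].isAlphaNumeric() or templ[j] == '_'):
        inc(j)
      result.add(resolve(templ[i + 1 ..< j]))
      i = j
    else:
      result.add(templ[i])
      inc(i)

when isMainModule:
  let resolver = proc (name: string): string =
    case name
    of "connected_peers": "12"
    of "sync_status": "DDPsss:3:2.5000 (12345)"  # shape of SyncManager.syncStatus
    else: "$" & name

  let contents = "peers: $connected_peers;sync: $sync_status|"
  echo expandTemplate(contents, resolver)
  # -> peers: 12;sync: DDPsss:3:2.5000 (12345)|
```
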
diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim
index e8a71d27f..eb453073c 100644
--- a/beacon_chain/sync_manager.nim
+++ b/beacon_chain/sync_manager.nim
@@ -29,6 +29,9 @@ const
   PeerScoreMissingBlocks* = -200
     ## Peer response contains too many empty blocks.
 
+  SyncWorkersCount* = 20
+    ## Number of sync workers to spawn
+
 type
   SyncFailureKind* = enum
     StatusInvalid,
@@ -69,14 +72,19 @@ type
     debtsQueue: HeapQueue[SyncRequest[T]]
     debtsCount: uint64
     readyQueue: HeapQueue[SyncResult[T]]
-    suspects: seq[SyncResult[T]]
     outQueue: AsyncQueue[BlockEntry]
 
+  SyncWorkerStatus* {.pure.} = enum
+    Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Processing
+
+  SyncWorker*[A, B] = object
+    future: Future[void]
+    status: SyncWorkerStatus
+
   SyncManager*[A, B] = ref object
     pool: PeerPool[A, B]
     responseTimeout: chronos.Duration
     sleepTime: chronos.Duration
-    maxWorkersCount: int
     maxStatusAge: uint64
     maxHeadAge: uint64
     maxRecurringFailures: int
@@ -84,12 +92,19 @@ type
     getLocalHeadSlot: GetSlotCallback
     getLocalWallSlot: GetSlotCallback
     getFinalizedSlot: GetSlotCallback
+    workers: array[SyncWorkersCount, SyncWorker[A, B]]
+    notInSyncEvent: AsyncEvent
+    rangeAge: uint64
+    inRangeEvent*: AsyncEvent
+    notInRangeEvent*: AsyncEvent
     chunkSize: uint64
     queue: SyncQueue[A]
     failures: seq[SyncFailure[A]]
     syncFut: Future[void]
     outQueue: AsyncQueue[BlockEntry]
     inProgress*: bool
+    syncSpeed*: float
+    syncStatus*: string
 
   SyncMoment* = object
     stamp*: chronos.Moment
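
The reworked manager coordinates its fixed pool of workers with chronos `AsyncEvent`s: each worker parks on `notInSyncEvent.wait()` and only runs while the main loop keeps that event fired. A standalone sketch of the gating pattern, assuming nothing beyond chronos itself (the `worker`/`controller` names are made up for illustration):

```nim
# Standalone sketch of the AsyncEvent gating pattern used by the sync workers:
# workers block on wait() until the controller fires the event, and park again
# once it has been cleared. Names and timings here are illustrative only.
import chronos

proc worker(gate: AsyncEvent, id: int) {.async.} =
  while true:
    await gate.wait()                  # parked here whenever the gate is cleared
    echo "worker ", id, " doing one unit of work"
    await sleepAsync(100.milliseconds)

proc controller(gate: AsyncEvent) {.async.} =
  gate.fire()                          # "not in sync": admit workers
  await sleepAsync(300.milliseconds)
  gate.clear()                         # "in sync": workers park on next wait()
  await sleepAsync(300.milliseconds)

when isMainModule:
  let gate = newAsyncEvent()
  for i in 0 ..< 3:
    asyncSpawn worker(gate, i)         # fire-and-forget, like the workers array
  waitFor controller(gate)
```
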
@@ -585,22 +600,21 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
                            getLocalWallSlotCb: GetSlotCallback,
                            getFinalizedSlotCb: GetSlotCallback,
                            outputQueue: AsyncQueue[BlockEntry],
-                           maxWorkers = 10,
                            maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
                            maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
                            sleepTime = (int(SLOTS_PER_EPOCH) *
                                         int(SECONDS_PER_SLOT)).seconds,
                            chunkSize = uint64(SLOTS_PER_EPOCH),
                            toleranceValue = uint64(1),
-                           maxRecurringFailures = 3
+                           maxRecurringFailures = 3,
+                           rangeAge = uint64(SLOTS_PER_EPOCH * 4)
                            ): SyncManager[A, B] =
-  let queue = SyncQueue.init(A, getFinalizedSlotCb(), getLocalWallSlotCb(),
+  let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(),
                              chunkSize, getFinalizedSlotCb, outputQueue, 1)
 
   result = SyncManager[A, B](
     pool: pool,
-    maxWorkersCount: maxWorkers,
     maxStatusAge: maxStatusAge,
     getLocalHeadSlot: getLocalHeadSlotCb,
     getLocalWallSlot: getLocalWallSlotCb,
@@ -610,7 +624,11 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
     sleepTime: sleepTime,
     chunkSize: chunkSize,
     queue: queue,
-    outQueue: outputQueue
+    outQueue: outputQueue,
+    notInSyncEvent: newAsyncEvent(),
+    inRangeEvent: newAsyncEvent(),
+    notInRangeEvent: newAsyncEvent(),
+    rangeAge: rangeAge
   )
 
 proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
@@ -642,6 +660,9 @@ template headAge(): uint64 =
 template peerAge(): uint64 =
   if peerSlot > wallSlot: 0'u64 else: wallSlot - peerSlot
 
+template queueAge(): uint64 =
+  wallSlot - man.queue.outSlot
+
 template checkPeerScore(peer, body: untyped): untyped =
   mixin getScore
   let currentScore = peer.getScore()
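
The `headAge`/`peerAge` templates above, together with the new `queueAge`, are just saturating slot distances measured against the wall clock, which the manager compares with `maxStatusAge`, `maxHeadAge` and the new `rangeAge`. A self-contained illustration of the arithmetic (the constants are stand-ins, not the real `SLOTS_PER_EPOCH`-derived defaults):

```nim
# Illustration of the slot-age arithmetic used by the sync manager.
# All constants and slot numbers below are stand-ins for the real defaults.
type Slot = uint64

func age(wallSlot, other: Slot): uint64 =
  ## Saturating distance in slots, mirroring the headAge/peerAge templates.
  if other > wallSlot: 0'u64 else: wallSlot - other

when isMainModule:
  const
    maxStatusAge = 32'u64    # stand-in for SLOTS_PER_EPOCH * 4
    maxHeadAge = 8'u64       # stand-in for SLOTS_PER_EPOCH * 1
    rangeAge = 32'u64        # stand-in for the new rangeAge parameter
  let
    wallSlot = Slot(1000)
    headSlot = Slot(990)     # our chain head
    peerSlot = Slot(1002)    # peer claims to be slightly ahead of the clock
    queueOutSlot = Slot(950) # SyncQueue.outSlot

  echo "headAge  = ", age(wallSlot, headSlot)      # 10 -> still behind
  echo "peerAge  = ", age(wallSlot, peerSlot)      # 0  -> peer status is fresh
  echo "queueAge = ", age(wallSlot, queueOutSlot)  # 50 -> outside rangeAge
  echo "needs status update: ", age(wallSlot, peerSlot) >= maxStatusAge
  let inSync = age(wallSlot, peerSlot) <= maxHeadAge and
               age(wallSlot, headSlot) <= maxHeadAge
  echo "in sync with peer:   ", inSync
  echo "within rangeAge:     ", age(wallSlot, queueOutSlot) <= rangeAge
```
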
@@ -656,33 +677,25 @@ template checkPeerScore(peer, body: untyped): untyped =
 func syncQueueLen*[A, B](man: SyncManager[A, B]): uint64 =
   man.queue.len
 
-proc syncWorker*[A, B](man: SyncManager[A, B],
-                       peer: A): Future[A] {.async.} =
-  # Sync worker is the lowest level loop which performs syncing with single
-  # peer.
-  #
-  # Logic here is pretty simple:
-  # 1. Obtain request from SyncQueue.
-  # 2. Send this request to a peer and obtain response.
-  # 3. Push response to the SyncQueue, (doesn't matter if it success or failure)
-  # 4. Update main SyncQueue last slot with wall time slot number.
-  # 5. From time to time we also requesting peer's status information.
-  # 6. If our current head slot is near equal to peer's head slot we are
-  #    exiting this loop and finishing that sync-worker task.
-  # 7. Repeat
-
+proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
   mixin getKey, getScore, getHeadSlot
 
-  debug "Starting syncing with peer", peer = peer,
-        peer_score = peer.getScore(),
-        peer_speed = peer.netKbps(),
-        topics = "syncman"
-  try:
-    while true:
-      var wallSlot = man.getLocalWallSlot()
-      var headSlot = man.getLocalHeadSlot()
+  debug "Starting syncing worker", index = index, topics = "syncman"
+
+  while true:
+    man.workers[index].status = SyncWorkerStatus.Sleeping
+    # This event is set for as long as we are not in sync with the network.
+    await man.notInSyncEvent.wait()
+
+    man.workers[index].status = SyncWorkerStatus.WaitingPeer
+    let peer = await man.pool.acquire()
+
+    try:
+      let wallSlot = man.getLocalWallSlot()
+      let headSlot = man.getLocalHeadSlot()
       var peerSlot = peer.getHeadSlot()
 
+      # We update the SyncQueue's last slot on every iteration.
       man.queue.updateLastSlot(wallSlot)
 
       debug "Peer's syncing status", wall_clock_slot = wallSlot,
@@ -690,26 +703,28 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
            peer_score = peer.getScore(), peer = peer,
            peer_speed = peer.netKbps(), topics = "syncman"
 
+      # Check if peer's head slot is bigger than our wall clock slot.
       if peerSlot > wallSlot + man.toleranceValue:
         # Our wall timer is broken, or peer's status information is invalid.
-        debug "Local timer is broken or peer's status information is invalid",
+        warn "Local timer is broken or peer's status information is invalid",
              wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
              local_head_slot = headSlot, peer = peer,
              tolerance_value = man.toleranceValue,
             peer_speed = peer.netKbps(), peer_score = peer.getScore(),
             topics = "syncman"
         let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
         man.failures.add(failure)
-        break
+        continue
 
+      # Check if we need to update peer's status information.
       if peerAge >= man.maxStatusAge:
-        # Peer's status information is very old, we going to update it.
+        # Peer's status information is very old, it's time to update it.
+        man.workers[index].status = SyncWorkerStatus.UpdatingStatus
         debug "Updating peer's status information", wall_clock_slot = wallSlot,
               remote_head_slot = peerSlot, local_head_slot = headSlot,
               peer = peer, peer_score = peer.getScore(),
               peer_speed = peer.netKbps(), topics = "syncman"
-        checkPeerScore peer:
-          let res = await peer.updateStatus()
+        let res = await peer.updateStatus()
 
         if not(res):
           peer.updateScore(PeerScoreNoStatus)
@@ -718,37 +733,43 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
                peer_speed = peer.netKbps(), topics = "syncman"
           let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
           man.failures.add(failure)
-          break
+          continue
 
         let newPeerSlot = peer.getHeadSlot()
         if peerSlot >= newPeerSlot:
           peer.updateScore(PeerScoreStaleStatus)
-          debug "Peer's status information is stale, exiting",
+          debug "Peer's status information is stale",
                 wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
                 local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
                 peer = peer, peer_score = peer.getScore(),
                 peer_speed = peer.netKbps(), topics = "syncman"
-          let failure = SyncFailure.init(SyncFailureKind.StatusStale, peer)
-          man.failures.add(failure)
-          break
-
-        debug "Peer's status information updated", wall_clock_slot = wallSlot,
-              remote_old_head_slot = peerSlot, local_head_slot = headSlot,
-              remote_new_head_slot = newPeerSlot, peer = peer,
-              peer_score = peer.getScore(), peer_speed = peer.netKbps(),
-              topics = "syncman"
-        peer.updateScore(PeerScoreGoodStatus)
-        peerSlot = newPeerSlot
+        else:
+          debug "Peer's status information updated", wall_clock_slot = wallSlot,
+                remote_old_head_slot = peerSlot, local_head_slot = headSlot,
+                remote_new_head_slot = newPeerSlot, peer = peer,
+                peer_score = peer.getScore(), peer_speed = peer.netKbps(),
+                topics = "syncman"
+          peer.updateScore(PeerScoreGoodStatus)
+          peerSlot = newPeerSlot
 
       if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge):
         debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot,
              remote_head_slot = peerSlot, local_head_slot = headSlot,
             peer = peer, peer_score = peer.getScore(),
             peer_speed = peer.netKbps(), topics = "syncman"
-        break
+        continue
 
+      man.workers[index].status = SyncWorkerStatus.Requesting
       let req = man.queue.pop(peerSlot, peer)
       if req.isEmpty():
+        # SyncQueue could return an empty request in 2 cases:
+        # 1. There are no more slots in SyncQueue to download (we are synced,
+        #    but our ``notInSyncEvent`` is not yet cleared).
+        # 2. Current peer's known head slot is too low to satisfy the request.
+        #
+        # To avoid an endless loop we wait here for RESP_TIMEOUT.
+        # This time is enough for all pending requests to finish and it is also
+        # enough for the main sync loop to clear ``notInSyncEvent``.
         debug "Empty request received from queue, exiting", peer = peer,
               local_head_slot = headSlot, remote_head_slot = peerSlot,
               queue_input_slot = man.queue.inpSlot,
@@ -756,25 +777,17 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
              queue_last_slot = man.queue.lastSlot, peer_speed = peer.netKbps(),
              peer_score = peer.getScore(), topics = "syncman"
-        # Sometimes when syncing is almost done but last requests are still
-        # pending, this can fall into endless cycle, when low number of peers
-        # are available in PeerPool. We going to wait for RESP_TIMEOUT time,
-        # so all pending requests should be finished at this moment.
-        checkPeerScore peer:
-          await sleepAsync(RESP_TIMEOUT)
-
-        let failure = SyncFailure.init(SyncFailureKind.EmptyProblem, peer)
-        man.failures.add(failure)
-        break
+        await sleepAsync(RESP_TIMEOUT)
+        continue
 
+      man.workers[index].status = SyncWorkerStatus.Downloading
       debug "Creating new request for peer", wall_clock_slot = wallSlot,
             remote_head_slot = peerSlot, local_head_slot = headSlot,
             request_slot = req.slot, request_count = req.count,
             request_step = req.step, peer = peer,
             peer_speed = peer.netKbps(), peer_score = peer.getScore(),
             topics = "syncman"
 
-      checkPeerScore peer:
-        let blocks = await man.getBlocks(peer, req)
+      let blocks = await man.getBlocks(peer, req)
 
       if blocks.isOk:
         let data = blocks.get()
@@ -795,11 +808,12 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
               topics = "syncman"
         let failure = SyncFailure.init(SyncFailureKind.BadResponse, peer)
         man.failures.add(failure)
-        break
+        continue
 
       # Scoring will happen in `syncUpdate`.
-      checkPeerScore peer,
-        await man.queue.push(req, data)
+      man.workers[index].status = SyncWorkerStatus.Processing
+      await man.queue.push(req, data)
+
       # Cleaning up failures.
       man.failures.setLen(0)
      else:
@@ -812,170 +826,140 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
               topics = "syncman"
         let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
         man.failures.add(failure)
-        break
 
-    result = peer
-  finally:
-    man.pool.release(peer)
+    except CatchableError as exc:
+      debug "Unexpected exception happened", topics = "syncman",
+            excName = $exc.name, excMsg = exc.msg
+    finally:
+      man.pool.release(peer)
+
+proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
+                                                          sleeping: int,
+                                                          waiting: int,
+                                                          pending: int] =
+  var map = newString(len(man.workers))
+  var sleeping, waiting, pending: int
+  for i in 0 ..< len(man.workers):
+    var ch: char
+    case man.workers[i].status
+    of SyncWorkerStatus.Sleeping:
+      ch = 's'
+      inc(sleeping)
+    of SyncWorkerStatus.WaitingPeer:
+      ch = 'w'
+      inc(waiting)
+    of SyncWorkerStatus.UpdatingStatus:
+      ch = 'U'
+      inc(pending)
+    of SyncWorkerStatus.Requesting:
+      ch = 'R'
+      inc(pending)
+    of SyncWorkerStatus.Downloading:
+      ch = 'D'
+      inc(pending)
+    of SyncWorkerStatus.Processing:
+      ch = 'P'
+      inc(pending)
+    map[i] = ch
+  (map, sleeping, waiting, pending)
 
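
`getWorkersStats` encodes every worker's state as one character ('s' sleeping, 'w' waiting for a peer, 'U' updating status, 'R' requesting, 'D' downloading, 'P' processing); the resulting map later shows up as `workers_map` in the logs and as the first field of `syncStatus`. A small illustrative decoder for such a map string (not part of the patch):

```nim
# Illustrative decoder for the one-character-per-worker map produced by
# getWorkersStats. Not part of the patch; just shows how to read the field.
proc decodeWorkersMap(map: string): tuple[sleeping, waiting, pending: int] =
  ## Inverse view of the encoding: 's'leeping, 'w'aiting for a peer, and any
  ## of 'U', 'R', 'D', 'P' counted as a pending (busy) worker.
  for ch in map:
    case ch
    of 's': inc(result.sleeping)
    of 'w': inc(result.waiting)
    of 'U', 'R', 'D', 'P': inc(result.pending)
    else: discard               # unknown marker; ignored in this sketch

when isMainModule:
  # 20 characters, one per worker, as SyncWorkersCount workers would produce.
  let stats = decodeWorkersMap("DDPUwwssssssssssssss")
  echo "sleeping=", stats.sleeping,
       " waiting=", stats.waiting,
       " pending=", stats.pending   # sleeping=14 waiting=2 pending=4
```
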
 proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
-  # This procedure manages main loop of SyncManager and in this loop it
-  # performs
-  # 1. It checks for current sync status, "are we synced?".
-  # 2. If we are in active syncing, it tries to acquire new peers from PeerPool
-  #    and spawns new sync-workers. Number of spawned sync-workers can be
-  #    controlled by `maxWorkersCount` value.
-  # 3. It stops spawning sync-workers when we are "in sync".
-  # 4. It calculates syncing performance.
   mixin getKey, getScore
-  var pending = newSeq[Future[A]]()
-  var acquireFut: Future[A]
-  var syncSpeed: float = 0.0
 
-  template workersCount(): int =
-    if isNil(acquireFut): len(pending) else: (len(pending) - 1)
+  # Starting all sync workers
+  for i in 0 ..< len(man.workers):
+    man.workers[i].future = syncWorker[A, B](man, i)
 
-  proc watchTask() {.async.} =
+  debug "Synchronization loop started", topics = "syncman"
+
+  proc watchAndSpeedTask() {.async.} =
     while true:
+      let wallSlot = man.getLocalWallSlot()
+      let headSlot = man.getLocalHeadSlot()
       let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
       await sleepAsync(chronos.seconds(int(SECONDS_PER_SLOT)))
       let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
-      if workersCount() == 0:
-        syncSpeed = 0.0
+
+      let (map, sleeping, waiting, pending) = man.getWorkersStats()
+      if pending == 0:
+        man.syncSpeed = 0.0
       else:
-        if (lsm2.slot - lsm1.slot == 0'u64) and (workersCount() > 1):
+        if (lsm2.slot - lsm1.slot == 0'u64) and (pending > 1):
           debug "Syncing process is not progressing, reset the queue",
-                workers_count = workersCount(),
+                pending_workers_count = pending,
                 to_slot = man.queue.outSlot,
                 local_head_slot = lsm1.slot, topics = "syncman"
           await man.queue.resetWait(none[Slot]())
         else:
-          syncSpeed = speed(lsm1, lsm2)
+          man.syncSpeed = speed(lsm1, lsm2)
 
-  debug "Synchronization loop started", topics = "syncman"
+      debug "Synchronization loop tick", wall_head_slot = wallSlot,
+            local_head_slot = headSlot, queue_start_slot = man.queue.startSlot,
+            queue_last_slot = man.queue.lastSlot,
+            sync_speed = man.syncSpeed, pending_workers_count = pending,
+            topics = "syncman"
 
-  traceAsyncErrors watchTask()
+  traceAsyncErrors watchAndSpeedTask()
 
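
`watchAndSpeedTask` above derives `syncSpeed` from two `SyncMoment` snapshots taken roughly one slot apart; the underlying calculation is just slots gained divided by elapsed seconds. A simplified, standalone stand-in for the module's `speed(SyncMoment, SyncMoment)` helper (the numbers are made up):

```nim
# Simplified stand-in for the module's speed(SyncMoment, SyncMoment): slots
# progressed per second between two snapshots. Uses chronos.Moment like the
# real SyncMoment; the slot numbers and interval below are made up.
import chronos

type SyncMoment = object
  stamp: Moment
  slot: uint64

proc speed(a, b: SyncMoment): float =
  ## Slots progressed per second between two snapshots.
  let dur = b.stamp - a.stamp
  if dur.nanoseconds() <= 0 or b.slot <= a.slot:
    0.0
  else:
    float(b.slot - a.slot) / (float(dur.nanoseconds()) / 1_000_000_000.0)

when isMainModule:
  let start = Moment.now()
  let lsm1 = SyncMoment(stamp: start, slot: 100'u64)
  let lsm2 = SyncMoment(stamp: start + 12.seconds, slot: 130'u64)
  echo speed(lsm1, lsm2), " slots/sec"   # 30 slots over 12 seconds -> 2.5
```
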
   while true:
     let wallSlot = man.getLocalWallSlot()
     let headSlot = man.getLocalHeadSlot()
 
-    let progress =
-      if headSlot <= man.queue.lastSlot:
-        man.queue.progress()
-      else:
-        100'u64
+    let (map, sleeping, waiting, pending) = man.getWorkersStats()
 
-    debug "Synchronization loop tick", wall_head_slot = wallSlot,
-          local_head_slot = headSlot, queue_status = progress,
-          queue_start_slot = man.queue.startSlot,
-          queue_last_slot = man.queue.lastSlot,
-          waiting_for_new_peer = $not(isNil(acquireFut)),
-          sync_speed = syncSpeed, workers_count = workersCount(),
+    debug "Current syncing state", workers_map = map,
+          sleeping_workers_count = sleeping,
+          waiting_workers_count = waiting,
+          pending_workers_count = pending,
+          wall_head_slot = wallSlot, local_head_slot = headSlot,
           topics = "syncman"
 
-    var temp = newSeqOfCap[Future[A]](len(pending))
-    for fut in pending:
-      if fut.finished():
-        if fut == acquireFut:
-          # We acquired new peer from PeerPool.
-          if acquireFut.failed():
-            debug "Synchronization loop failed to get new peer",
-                  wall_head_slot = wallSlot, local_head_slot = headSlot,
-                  workers_count = workersCount(),
-                  errMsg = acquireFut.readError().msg, topics = "syncman"
-          else:
-            let peer = acquireFut.read()
-            if headAge <= man.maxHeadAge:
-              # If we are already in sync, we going to release just acquired
-              # peer and do not acquire peers
-              debug "Synchronization loop reached sync barrier", peer = peer,
-                    wall_head_slot = wallSlot, local_head_slot = headSlot,
-                    peer_score = peer.getScore(), topics = "syncman"
-              man.pool.release(peer)
-            else:
-              if headSlot > man.queue.lastSlot:
-                debug "Synchronization lost, restoring",
-                      wall_head_slot = wallSlot, local_head_slot = headSlot,
-                      queue_last_slot = man.queue.lastSlot, topics = "syncman"
-                man.queue = SyncQueue.init(A, man.getFinalizedSlot(), wallSlot,
-                                           man.chunkSize, man.getFinalizedSlot,
-                                           man.outQueue, 1)
-
-              debug "Synchronization loop starting new worker", peer = peer,
-                    wall_head_slot = wallSlot, local_head_slot = headSlot,
-                    peer_score = peer.getScore(), peer_speed = peer.netKbps(),
-                    topics = "syncman"
-              temp.add(syncWorker(man, peer))
-
-          # We will create new `acquireFut` later.
-          acquireFut = nil
-        else:
-          # We got worker finished its work
-          if fut.failed():
-            debug "Synchronization loop got worker finished with an error",
-                  wall_head_slot = wallSlot, local_head_slot = headSlot,
-                  errMsg = fut.readError().msg, topics = "syncman"
-          else:
-            let peer = fut.read()
-            debug "Synchronization loop got worker finished",
-                  wall_head_slot = wallSlot, local_head_slot = headSlot,
-                  peer = peer, peer_score = peer.getScore(),
-                  peer_speed = peer.netKbps(),
-                  topics = "syncman"
-      else:
-        if fut == acquireFut:
-          # Task which waits for new peer from PeerPool is not yet finished.
-          if headAge <= man.maxHeadAge:
-            debug "Synchronization loop reached sync barrier",
-                  wall_head_slot = wallSlot, local_head_slot = headSlot,
-                  topics = "syncman"
-            acquireFut = nil
-            fut.cancel()
-          else:
-            temp.add(fut)
-        else:
-          temp.add(fut)
-
-    pending = temp
+    # Update status string
+    man.syncStatus = map & ":" & $pending & ":" &
+                     man.syncSpeed.formatBiggestFloat(ffDecimal, 4) &
+                     " (" & $man.queue.outSlot & ")"
 
     if headAge <= man.maxHeadAge:
-      debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
-            local_head_slot = headSlot, workers_count = workersCount(),
-            difference = (wallSlot - headSlot),
-            max_head_age = man.maxHeadAge, topics = "syncman"
-      if len(pending) == 0:
-        man.inProgress = false
-        await sleepAsync(man.sleepTime)
+      man.notInSyncEvent.clear()
+      # We are marking SyncManager as not working only when we are in sync and
+      # all sync workers are in `Sleeping` state.
+      if pending > 0:
+        debug "Synchronization loop waits for workers completion",
+              wall_head_slot = wallSlot, local_head_slot = headSlot,
+              difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge,
+              sleeping_workers_count = sleeping,
+              waiting_workers_count = waiting, pending_workers_count = pending,
+              topics = "syncman"
+        man.inProgress = true
       else:
-        debug "Synchronization loop waiting for workers completion",
-              workers_count = workersCount(), topics = "syncman"
-        discard await withTimeout(one(pending), man.sleepTime)
+        debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
+              local_head_slot = headSlot, difference = (wallSlot - headSlot),
+              max_head_age = man.maxHeadAge, topics = "syncman"
+        man.inProgress = false
     else:
+      man.notInSyncEvent.fire()
       man.inProgress = true
-      if isNil(acquireFut) and len(pending) < man.maxWorkersCount:
-        acquireFut = man.pool.acquire()
-        pending.add(acquireFut)
-        debug "Synchronization loop waiting for new peer",
-              wall_head_slot = wallSlot, local_head_slot = headSlot,
-              workers_count = workersCount(), topics = "syncman",
-              sleep_time = $man.sleepTime
-      else:
-        debug "Synchronization loop waiting for workers",
-              wall_head_slot = wallSlot, local_head_slot = headSlot,
-              workers_count = workersCount(), topics = "syncman",
-              sleep_time = $man.sleep_time
 
+    if queueAge <= man.rangeAge:
+      # We are within the requested range ``man.rangeAge``.
+      man.inRangeEvent.fire()
+      man.notInRangeEvent.clear()
+    else:
+      # We are no longer within the requested range ``man.rangeAge``.
+      man.inRangeEvent.clear()
+      man.notInRangeEvent.fire()
 
-    discard await withTimeout(one(pending), man.sleepTime)
-
-    if len(man.failures) > man.maxRecurringFailures and (workersCount() > 1):
+    if len(man.failures) > man.maxRecurringFailures and pending > 1:
       debug "Number of recurring failures exceeds limit, reseting queue",
-            workers_count = workers_count(), rec_failures = len(man.failures)
+            pending_workers_count = pending, sleeping_workers_count = sleeping,
+            waiting_workers_count = waiting, rec_failures = len(man.failures),
+            topics = "syncman"
       # Cleaning up failures.
       man.failures.setLen(0)
       await man.queue.resetWait(none[Slot]())
 
+    await sleepAsync(chronos.seconds(2))
+
 proc start*[A, B](man: SyncManager[A, B]) =
   ## Starts SyncManager's main loop.
   man.syncFut = man.syncLoop()
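
Taken together, the `$sync_status` token in the status bar ends up rendering the string assembled in `syncLoop`: the workers map, the pending-worker count, the measured speed and the queue's output slot. A standalone sketch of that formatting with mock values (the real code uses the live map, `man.syncSpeed` and `man.queue.outSlot`):

```nim
# Mock of the syncStatus formatting used in syncLoop:
# "<workers map>:<pending>:<slots per second> (<queue output slot>)".
# All values below are made up for illustration.
import std/strutils

proc formatSyncStatus(map: string, pending: int, speed: float,
                      outSlot: uint64): string =
  map & ":" & $pending & ":" & speed.formatBiggestFloat(ffDecimal, 4) &
    " (" & $outSlot & ")"

when isMainModule:
  echo formatSyncStatus("DDPsssssssssssssssss", 3, 2.5, 12345'u64)
  # -> DDPsssssssssssssssss:3:2.5000 (12345)
```
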