diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 6fabdd0d2..7e8eb83e5 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -60,6 +60,7 @@ type vcProcess*: Process requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerID] + backfiller*: SyncManager[Peer, PeerID] genesisSnapshotContent*: string actionTracker*: ActionTracker processor*: ref Eth2Processor diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index c3217998b..9fb3df5ac 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -233,12 +233,10 @@ type name: "weak-subjectivity-checkpoint" }: Option[Checkpoint] finalizedCheckpointState* {. - hidden # TODO unhide when backfilling is done desc: "SSZ file specifying a recent finalized state" name: "finalized-checkpoint-state" }: Option[InputFile] finalizedCheckpointBlock* {. - hidden # TODO unhide when backfilling is done desc: "SSZ file specifying a recent finalized block" name: "finalized-checkpoint-block" }: Option[InputFile] diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 4c9a76a06..a8246bf74 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -1664,3 +1664,6 @@ proc getBlockSSZ*(dag: ChainDAGRef, id: BlockId, bytes: var seq[byte]): bool = dag.db.getAltairBlockSSZ(id.root, bytes) of BeaconBlockFork.Bellatrix: dag.db.getMergeBlockSSZ(id.root, bytes) + +func needsBackfill*(dag: ChainDAGRef): bool = + dag.backfill.slot > dag.genesis.slot diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index c2ea8eacb..9ad2babdb 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -467,9 +467,13 @@ proc init*(T: type BeaconNode, validatorPool, syncCommitteeMsgPool, quarantine, rng, getBeaconTime, taskpool) syncManager = newSyncManager[Peer, PeerID]( - network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, - getFirstSlotAtFinalizedEpoch, getBackfillSlot, dag.tail.slot, - blockVerifier) + network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, + getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, + dag.tail.slot, blockVerifier) + backfiller = newSyncManager[Peer, PeerID]( + network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot, + getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, + dag.backfill.slot, blockVerifier, maxHeadAge = 0) let stateTtlCache = if config.restCacheSize > 0: StateTtlCache.init( @@ -500,6 +504,7 @@ proc init*(T: type BeaconNode, eventBus: eventBus, requestManager: RequestManager.init(network, blockVerifier), syncManager: syncManager, + backfiller: backfiller, actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets), processor: processor, blockProcessor: blockProcessor, @@ -917,6 +922,11 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = # above, this will be done just before the next slot starts await node.updateGossipStatus(slot + 1) +proc syncStatus(node: BeaconNode): string = + if node.syncManager.inProgress: node.syncManager.syncStatus + elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus + else: "synced" + proc onSlotStart( node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} = ## Called at the beginning of a slot - usually every slot, but sometimes might @@ -936,9 +946,7 @@ proc onSlotStart( info "Slot start", slot = shortLog(wallSlot), epoch = shortLog(wallSlot.epoch), - sync = - if node.syncManager.inProgress: node.syncManager.syncStatus - else: "synced", + sync = node.syncStatus(), peers = len(node.network.peerPool), head = shortLog(node.dag.head), finalized = shortLog(getStateField( @@ -1127,6 +1135,18 @@ proc stop(node: BeaconNode) = node.db.close() notice "Databases closed" +proc startBackfillTask(node: BeaconNode) {.async.} = + while node.dag.needsBackfill: + if not node.syncManager.inProgress: + # Only start the backfiller if it's needed _and_ head sync has completed - + # if we lose sync after having synced head, we could stop the backfilller, + # but this should be a fringe case - might as well keep the logic simple for + # now + node.backfiller.start() + return + + await sleepAsync(chronos.seconds(2)) + proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} = bnStatus = BeaconNodeStatus.Running @@ -1150,6 +1170,8 @@ proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} = node.requestManager.start() node.syncManager.start() + if node.dag.needsBackfill(): asyncSpawn node.startBackfillTask() + waitFor node.updateGossipStatus(wallSlot) asyncSpawn runSlotLoop(node, wallTime, onSlotStart) @@ -1327,13 +1349,7 @@ proc initStatusBar(node: BeaconNode) {.raises: [Defect, ValueError].} = formatGwei(node.attachedValidatorBalanceTotal) of "sync_status": - if isNil(node.syncManager): - "pending" - else: - if node.syncManager.inProgress: - node.syncManager.syncStatus - else: - "synced" + node.syncStatus() else: # We ignore typos for now and just render the expression # as it was written. TODO: come up with a good way to show diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 070e7ad9b..b7c652780 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -89,9 +89,24 @@ proc speed*(start, finish: SyncMoment): float {.inline.} = slots / dur proc initQueue[A, B](man: SyncManager[A, B]) = - man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(), - man.getLastSlot(), man.chunkSize, - man.getSafeSlot, man.blockVerifier, 1) + case man.direction + of SyncQueueKind.Forward: + man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(), + man.getLastSlot(), man.chunkSize, + man.getSafeSlot, man.blockVerifier, 1) + of SyncQueueKind.Backward: + let + firstSlot = man.getFirstSlot() + lastSlot = man.getLastSlot() + startSlot = if firstSlot == lastSlot: + # This case should never be happened in real life because + # there is present check `needsBackfill(). + firstSlot + else: + Slot(firstSlot - 1'u64) + man.queue = SyncQueue.init(A, man.direction, firstSlot, lastSlot, + man.chunkSize, man.getSafeSlot, + man.blockVerifier, 1) proc newSyncManager*[A, B](pool: PeerPool[A, B], direction: SyncQueueKind, @@ -112,9 +127,10 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], of SyncQueueKind.Forward: (getLocalHeadSlotCb, getLocalWallSlotCb, getFinalizedSlotCb) of SyncQueueKind.Backward: - (getBackfillSlotCb, GetSlotCallback(proc(): Slot = Slot(0)), getBackfillSlotCb) + (getBackfillSlotCb, GetSlotCallback(proc(): Slot = Slot(0)), + getBackfillSlotCb) - result = SyncManager[A, B]( + var res = SyncManager[A, B]( pool: pool, maxStatusAge: maxStatusAge, getLocalHeadSlot: getLocalHeadSlotCb, @@ -130,7 +146,8 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], notInSyncEvent: newAsyncEvent(), direction: direction ) - result.initQueue() + res.initQueue() + res proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, req: SyncRequest): Future[BeaconBlocksRes] {.async.} = @@ -141,176 +158,124 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, peer_score = peer.getScore(), peer_speed = peer.netKbps(), direction = man.direction, topics = "syncman" if peer.useSyncV2(): - var workFut = awaitne beaconBlocksByRange_v2(peer, req.slot, req.count, req.step) - if workFut.failed(): - debug "Error, while waiting getBlocks response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, - errMsg = workFut.readError().msg, peer_speed = peer.netKbps(), - direction = man.direction, topics = "syncman" - else: - let res = workFut.read() - if res.isErr: - debug "Error, while reading getBlocks response", + let res = + try: + await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step) + except CancelledError: + debug "Interrupt, while waiting getBlocks response", peer = peer, + slot = req.slot, slot_count = req.count, step = req.step, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + return + except CatchableError as exc: + debug "Error, while waiting getBlocks response", peer = peer, + slot = req.slot, slot_count = req.count, step = req.step, + errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(), + direction = man.direction, topics = "syncman" + return + if res.isErr(): + debug "Error, while reading getBlocks response", peer = peer, slot = req.slot, count = req.count, step = req.step, peer_speed = peer.netKbps(), - direction = man.direction, topics = "syncman", error = $res.error() - result = res + direction = man.direction, topics = "syncman", + error = $res.error() + return + return res else: - var workFut = awaitne beaconBlocksByRange(peer, req.slot, req.count, req.step) - if workFut.failed(): - debug "Error, while waiting getBlocks response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, - errMsg = workFut.readError().msg, peer_speed = peer.netKbps(), - direction = man.direction, topics = "syncman" - else: - let res = workFut.read() - if res.isErr: - debug "Error, while reading getBlocks response", - peer = peer, slot = req.slot, count = req.count, - step = req.step, peer_speed = peer.netKbps(), - topics = "syncman", error = $res.error() - result = res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto: blcks.mapIt(ForkedSignedBeaconBlock.init(it)) + let res = + try: + await beaconBlocksByRange(peer, req.slot, req.count, req.step) + except CancelledError: + debug "Interrupt, while waiting getBlocks response", peer = peer, + slot = req.slot, slot_count = req.count, step = req.step, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + return + except CatchableError as exc: + debug "Error, while waiting getBlocks response", peer = peer, + slot = req.slot, slot_count = req.count, step = req.step, + errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(), + direction = man.direction, topics = "syncman" + return + if res.isErr(): + debug "Error, while reading getBlocks response", + peer = peer, slot = req.slot, count = req.count, + step = req.step, peer_speed = peer.netKbps(), + direction = man.direction, error = $res.error(), + topics = "syncman" + return + let forked = + res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto: + blcks.mapIt(ForkedSignedBeaconBlock.init(it)) + return forked -template headAge(): uint64 = - wallSlot - headSlot - -template peerStatusAge(): Duration = - Moment.now() - peer.state(BeaconSync).statusLastTime +proc remainingSlots(man: SyncManager): uint64 = + if man.direction == SyncQueueKind.Forward: + man.getLastSlot() - man.getFirstSlot() + else: + man.getFirstSlot() - man.getLastSlot() proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = - let wallSlot = man.getLocalWallSlot() - let headSlot = man.getLocalHeadSlot() - var peerSlot = peer.getHeadSlot() + var + headSlot = man.getLocalHeadSlot() + wallSlot = man.getLocalWallSlot() + peerSlot = peer.getHeadSlot() - debug "Peer's syncing status", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, local_head_slot = headSlot, - peer_score = peer.getScore(), peer = peer, index = index, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" - - # Check if peer's head slot is bigger than our wall clock slot. - if peerSlot > wallSlot + man.toleranceValue: - peer.updateScore(PeerScoreHeadTooNew) - - warn "Peer reports a head newer than our wall clock - clock out of sync?", - wall_clock_slot = wallSlot, remote_head_slot = peerSlot, - local_head_slot = headSlot, peer = peer, index = index, - tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(), - peer_score = peer.getScore(), direction = man.direction, - topics = "syncman" - return - - # Check if we need to update peer's status information - if peerStatusAge >= StatusExpirationTime: - # Peer's status information is very old, its time to update it - man.workers[index].status = SyncWorkerStatus.UpdatingStatus - trace "Updating peer's status information", wall_clock_slot = wallSlot, + block: # Check that peer status is recent and relevant + debug "Peer's syncing status", wall_clock_slot = wallSlot, remote_head_slot = peerSlot, local_head_slot = headSlot, - peer = peer, peer_score = peer.getScore(), index = index, + peer_score = peer.getScore(), peer = peer, index = index, peer_speed = peer.netKbps(), direction = man.direction, topics = "syncman" - try: - let res = await peer.updateStatus() - if not(res): - peer.updateScore(PeerScoreNoStatus) - debug "Failed to get remote peer's status, exiting", peer = peer, - peer_score = peer.getScore(), peer_head_slot = peerSlot, - peer_speed = peer.netKbps(), index = index, - direction = man.direction, topics = "syncman" - return - except CatchableError as exc: - debug "Unexpected exception while updating peer's status", - peer = peer, peer_score = peer.getScore(), - peer_head_slot = peerSlot, peer_speed = peer.netKbps(), - index = index, errMsg = exc.msg, direction = man.direction, - topics = "syncman" - return + let + peerStatusAge = Moment.now() - peer.state(BeaconSync).statusLastTime + needsUpdate = + # Latest status we got is old + peerStatusAge >= StatusExpirationTime or + # The point we need to sync is close to where the peer is + man.getFirstSlot() >= peerSlot - let newPeerSlot = peer.getHeadSlot() - if peerSlot >= newPeerSlot: - peer.updateScore(PeerScoreStaleStatus) - debug "Peer's status information is stale", - wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, - local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, + if needsUpdate: + man.workers[index].status = SyncWorkerStatus.UpdatingStatus + + # Avoid a stampede of requests, but make them more frequent in case the + # peer is "close" to the slot range of interest + if peerStatusAge < StatusExpirationTime div 2: + await sleepAsync(StatusExpirationTime div 2 - peerStatusAge) + + trace "Updating peer's status information", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, peer = peer, peer_score = peer.getScore(), index = index, peer_speed = peer.netKbps(), direction = man.direction, topics = "syncman" - else: - debug "Peer's status information updated", wall_clock_slot = wallSlot, - remote_old_head_slot = peerSlot, local_head_slot = headSlot, - remote_new_head_slot = newPeerSlot, peer = peer, - peer_score = peer.getScore(), peer_speed = peer.netKbps(), - index = index, direction = man.direction, topics = "syncman" - peer.updateScore(PeerScoreGoodStatus) - peerSlot = newPeerSlot - if headAge <= man.maxHeadAge: - info "We are in sync with network", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, local_head_slot = headSlot, - peer = peer, peer_score = peer.getScore(), index = index, - peer_speed = peer.netKbps(), topics = "syncman" - # We clear SyncManager's `notInSyncEvent` so all the workers will become - # sleeping soon. - man.notInSyncEvent.clear() - return - - if headSlot >= peerSlot - man.maxHeadAge: - debug "We are in sync with peer; refreshing peer's status information", - wall_clock_slot = wallSlot, remote_head_slot = peerSlot, - local_head_slot = headSlot, peer = peer, peer_score = peer.getScore(), - index = index, peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" - - man.workers[index].status = SyncWorkerStatus.UpdatingStatus - - if peerStatusAge <= StatusUpdateInterval: - await sleepAsync(StatusUpdateInterval - peerStatusAge) - - try: - let res = await peer.updateStatus() - if not(res): - peer.updateScore(PeerScoreNoStatus) - debug "Failed to get remote peer's status, exiting", peer = peer, - peer_score = peer.getScore(), peer_head_slot = peerSlot, - peer_speed = peer.netKbps(), index = index, - direction = man.direction, topics = "syncman" + try: + let res = await peer.updateStatus() + if not(res): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, exiting", peer = peer, + peer_score = peer.getScore(), peer_head_slot = peerSlot, + peer_speed = peer.netKbps(), index = index, + direction = man.direction, topics = "syncman" + return + except CatchableError as exc: + debug "Unexpected exception while updating peer's status", + peer = peer, peer_score = peer.getScore(), + peer_head_slot = peerSlot, peer_speed = peer.netKbps(), + index = index, errMsg = exc.msg, direction = man.direction, + topics = "syncman" return - except CatchableError as exc: - debug "Unexpected exception while updating peer's status", - peer = peer, peer_score = peer.getScore(), - peer_head_slot = peerSlot, peer_speed = peer.netKbps(), - index = index, errMsg = exc.msg, direction = man.direction, - topics = "syncman" - return - let newPeerSlot = peer.getHeadSlot() - if peerSlot >= newPeerSlot: - peer.updateScore(PeerScoreStaleStatus) - debug "Peer's status information is stale", - wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, - local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, - peer = peer, peer_score = peer.getScore(), index = index, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" - else: - # This is not very good solution because we should not discriminate and/or - # penalize peers which are in sync process too, but their latest head is - # lower then our latest head. We should keep connections with such peers - # (so this peers are able to get in sync using our data), but we should - # not use this peers for syncing because this peers are useless for us. - # Right now we decreasing peer's score a bit, so it will not be - # disconnected due to low peer's score, but new fresh peers could replace - # peers with low latest head. - if headSlot >= newPeerSlot - man.maxHeadAge: - # Peer's head slot is still lower then ours. - debug "Peer's head slot is lower then local head slot", + let newPeerSlot = peer.getHeadSlot() + if peerSlot >= newPeerSlot: + peer.updateScore(PeerScoreStaleStatus) + debug "Peer's status information is stale", wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, - peer = peer, peer_score = peer.getScore(), - peer_speed = peer.netKbps(), index = index, - direction = man.direction, topics = "syncman" - peer.updateScore(PeerScoreUseless) + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" else: debug "Peer's status information updated", wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, local_head_slot = headSlot, @@ -320,10 +285,70 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = peer.updateScore(PeerScoreGoodStatus) peerSlot = newPeerSlot + # Time passed - enough to move slots, if sleep happened + headSlot = man.getLocalHeadSlot() + wallSlot = man.getLocalWallSlot() + + if peerSlot > wallSlot + man.toleranceValue: + # If the peer reports a head slot higher than our wall slot, something is + # wrong: our clock is off or the peer is on a different network (or + # dishonest) + peer.updateScore(PeerScoreHeadTooNew) + + warn "Peer reports a head newer than our wall clock - clock out of sync?", + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_head_slot = headSlot, peer = peer, index = index, + tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(), + peer_score = peer.getScore(), direction = man.direction, + topics = "syncman" + return + + if man.remainingSlots() <= man.maxHeadAge: + case man.direction + of SyncQueueKind.Forward: + info "We are in sync with network", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + of SyncQueueKind.Backward: + info "Backfill complete", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + + # We clear SyncManager's `notInSyncEvent` so all the workers will become + # sleeping soon. + man.notInSyncEvent.clear() return - # We updating SyncQueue's last slot all the time - man.queue.updateLastSlot(man.getLastSlot()) + # Find out if the peer potentially can give useful blocks - in the case of + # forward sync, they can be useful if they have blocks newer than our head - + # in the case of backwards sync, they're useful if they have blocks newer than + # the backfill point + if man.getFirstSlot() >= peerSlot: + # This is not very good solution because we should not discriminate and/or + # penalize peers which are in sync process too, but their latest head is + # lower then our latest head. We should keep connections with such peers + # (so this peers are able to get in sync using our data), but we should + # not use this peers for syncing because this peers are useless for us. + # Right now we decreasing peer's score a bit, so it will not be + # disconnected due to low peer's score, but new fresh peers could replace + # peers with low latest head. + debug "Peer's head slot is lower then local head slot", + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_last_slot = man.getLastSlot(), + local_first_slot = man.getFirstSlot(), peer = peer, + peer_score = peer.getScore(), + peer_speed = peer.netKbps(), index = index, + direction = man.direction, topics = "syncman" + peer.updateScore(PeerScoreUseless) + return + + if man.direction == SyncQueueKind.Forward: + # Wall clock keeps ticking, so we need to update the queue + man.queue.updateLastSlot(man.getLastSlot()) man.workers[index].status = SyncWorkerStatus.Requesting let req = man.queue.pop(peerSlot, peer) @@ -357,7 +382,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = try: let blocks = await man.getBlocks(peer, req) - if blocks.isOk: + if blocks.isOk(): let data = blocks.get() let smap = getShortMap(req, data) debug "Received blocks on request", blocks_count = len(data), @@ -393,26 +418,47 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = except CatchableError as exc: debug "Unexpected exception while receiving blocks", - request_slot = req.slot, request_count = req.count, - request_step = req.step, peer = peer, index = index, - peer_score = peer.getScore(), peer_speed = peer.netKbps(), - errMsg = exc.msg, direction = man.direction, topics = "syncman" + request_slot = req.slot, request_count = req.count, + request_step = req.step, peer = peer, index = index, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + errName = exc.name, errMsg = exc.msg, direction = man.direction, + topics = "syncman" return proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = mixin getKey, getScore, getHeadSlot - debug "Starting syncing worker", - index = index, direction = man.direction, topics = "syncman" + debug "Starting syncing worker", index = index, direction = man.direction, + topics = "syncman" while true: - man.workers[index].status = SyncWorkerStatus.Sleeping - # This event is going to be set until we are not in sync with network - await man.notInSyncEvent.wait() - man.workers[index].status = SyncWorkerStatus.WaitingPeer - let peer = await man.pool.acquire() - await man.syncStep(index, peer) - man.pool.release(peer) + var peer: A = nil + let doBreak = + try: + man.workers[index].status = SyncWorkerStatus.Sleeping + # This event is going to be set until we are not in sync with network + await man.notInSyncEvent.wait() + man.workers[index].status = SyncWorkerStatus.WaitingPeer + peer = await man.pool.acquire() + await man.syncStep(index, peer) + man.pool.release(peer) + false + except CancelledError: + if not(isNil(peer)): + man.pool.release(peer) + true + except CatchableError as exc: + debug "Unexpected exception in sync worker", + peer = peer, index = index, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + errName = exc.name, errMsg = exc.msg, direction = man.direction, + topics = "syncman" + true + if doBreak: + break + + debug "Sync worker stopped", index = index, direction = man.direction, + topics = "syncman" proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string, sleeping: int, @@ -462,7 +508,8 @@ proc guardTask[A, B](man: SyncManager[A, B]) {.async.} = let index = pending.find(failFuture) if failFuture.failed(): warn "Synchronization worker stopped working unexpectedly with an error", - index = index, errMsg = failFuture.error.msg, direction = man.direction + index = index, errMsg = failFuture.error.msg, + direction = man.direction else: warn "Synchronization worker stopped working unexpectedly without error", index = index, direction = man.direction @@ -501,9 +548,10 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = mixin getKey, getScore var pauseTime = 0 - asyncSpawn man.guardTask() + var guardTaskFut = man.guardTask() - debug "Synchronization loop started", topics = "syncman" + debug "Synchronization loop started", topics = "syncman", + direction = man.direction proc averageSpeedTask() {.async.} = while true: @@ -535,7 +583,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = stamp = newStamp - asyncSpawn averageSpeedTask() + var averageSpeedTaskFut = averageSpeedTask() while true: let wallSlot = man.getLocalWallSlot() @@ -573,7 +621,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) & "slots/s (" & map & ":" & $man.queue.outSlot & ")" - if headAge <= man.maxHeadAge: + if man.remainingSlots() <= man.maxHeadAge: man.notInSyncEvent.clear() # We are marking SyncManager as not working only when we are in sync and # all sync workers are in `Sleeping` state. @@ -584,13 +632,51 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = sleeping_workers_count = sleeping, waiting_workers_count = waiting, pending_workers_count = pending, direction = man.direction, topics = "syncman" + # We already synced, so we should reset all the pending workers from + # any state they have. + man.queue.clearAndWakeup() man.inProgress = true else: - debug "Synchronization loop sleeping", wall_head_slot = wallSlot, - local_head_slot = headSlot, difference = (wallSlot - headSlot), - max_head_age = man.maxHeadAge, direction = man.direction, - topics = "syncman" - man.inProgress = false + case man.direction + of SyncQueueKind.Forward: + if man.inProgress: + man.inProgress = false + debug "Forward synchronization process finished, sleeping", + wall_head_slot = wallSlot, local_head_slot = headSlot, + difference = (wallSlot - headSlot), + max_head_age = man.maxHeadAge, direction = man.direction, + topics = "syncman" + else: + debug "Synchronization loop sleeping", wall_head_slot = wallSlot, + local_head_slot = headSlot, + difference = (wallSlot - headSlot), + max_head_age = man.maxHeadAge, direction = man.direction, + topics = "syncman" + of SyncQueueKind.Backward: + # Backward syncing is going to be executed only once, so we exit loop + # and stop all pending tasks which belongs to this instance (sync + # workers, guard task and speed calculation task). + # We first need to cancel and wait for guard task, because otherwise + # it will be able to restore cancelled workers. + guardTaskFut.cancel() + averageSpeedTaskFut.cancel() + await allFutures(guardTaskFut, averageSpeedTaskFut) + let pendingTasks = + block: + var res: seq[Future[void]] + for worker in man.workers: + # Because `pending == 0` there should be no active workers. + doAssert(worker.status in {Sleeping, WaitingPeer}) + worker.future.cancel() + res.add(worker.future) + res + await allFutures(pendingTasks) + debug "Backward synchronization process finished, exiting", + wall_head_slot = wallSlot, local_head_slot = headSlot, + backfill_slot = man.getLastSlot(), + max_head_age = man.maxHeadAge, direction = man.direction, + topics = "syncman" + break else: if not(man.notInSyncEvent.isSet()): # We get here only if we lost sync for more then `maxHeadAge` period. @@ -598,6 +684,12 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = man.initQueue() man.notInSyncEvent.fire() man.inProgress = true + debug "Node lost sync for more then preset period", + period = man.maxHeadAge, wall_head_slot = wallSlot, + local_head_slot = headSlot, + missing_slots = man.remainingSlots(), + progress = float(man.queue.progress()), + topics = "syncman" else: man.notInSyncEvent.fire() man.inProgress = true diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 318ddf0e3..7bf1233ce 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -76,6 +76,8 @@ type SyncManagerError* = object of CatchableError BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]] +chronicles.formatIt SyncQueueKind: $it + proc getShortMap*[T](req: SyncRequest[T], data: openArray[ForkedSignedBeaconBlock]): string = ## Returns all slot numbers in ``data`` as placement map. @@ -303,6 +305,10 @@ proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} = sq.wakeupWaiters(true) discard await waitChanges +proc clearAndWakeup*[T](sq: SyncQueue[T]) = + sq.pending.clear() + sq.wakeupWaiters(true) + proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = ## Perform reset of all the blocked waiters in SyncQueue. ## @@ -409,7 +415,8 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, finalized_slot = safeSlot, fail_slot = failSlot, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, rewind_epoch_count = rewind.epochCount, - finalized_epoch = finalizedEpoch + finalized_epoch = finalizedEpoch, direction = sq.kind, + topics = "syncman" 0'u64 else: # `MissingParent` happened at different slot so we going to rewind for @@ -419,7 +426,8 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, finalized_slot = safeSlot, fail_slot = failSlot, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, rewind_epoch_count = rewind.epochCount, - finalized_epoch = finalizedEpoch + finalized_epoch = finalizedEpoch, direction = sq.kind, + topics = "syncman" 0'u64 else: 1'u64 @@ -429,7 +437,8 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, warn "Сould not rewind further than the last finalized epoch", finalized_slot = safeSlot, fail_slot = failSlot, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, - finalized_epoch = finalizedEpoch + finalized_epoch = finalizedEpoch, direction = sq.kind, + topics = "syncman" 0'u64 else: 1'u64 @@ -438,7 +447,8 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, warn "Unable to continue syncing, please restart the node", finalized_slot = safeSlot, fail_slot = failSlot, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, - finalized_epoch = finalizedEpoch + finalized_epoch = finalizedEpoch, direction = sq.kind, + topics = "syncman" # Calculate the rewind epoch, which will be equal to last rewind point or # finalizedEpoch let rewindEpoch = @@ -459,7 +469,8 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, # latest stored block. if failSlot == safeSlot: warn "Unable to continue syncing, please restart the node", - safe_slot = safeSlot, fail_slot = failSlot + safe_slot = safeSlot, fail_slot = failSlot, direction = sq.kind, + topics = "syncman" safeSlot iterator blocks*[T](sq: SyncQueue[T], @@ -552,7 +563,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], blocks_count = len(sq.readyQueue[0].data), output_slot = sq.outSlot, input_slot = sq.inpSlot, peer = sq.readyQueue[0].request.item, rewind_to_slot = rewindSlot, - topics = "syncman" + direction = sq.readyQueue[0].request.kind, topics = "syncman" await sq.resetWait(some(rewindSlot)) break @@ -594,7 +605,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], request_step = item.request.step, blocks_map = getShortMap(item.request, item.data), blocks_count = len(item.data), errCode = res.error, - topics = "syncman" + direction = item.request.kind, topics = "syncman" var resetSlot: Option[Slot] @@ -617,7 +628,8 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], finalized_slot = safeSlot, request_slot = req.slot, request_count = req.count, request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), topics = "syncman" + blocks_map = getShortMap(req, item.data), + direction = req.kind, topics = "syncman" resetSlot = some(rewindSlot) req.item.updateScore(PeerScoreMissingBlocks) else: @@ -625,18 +637,21 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], peer = req.item, to_slot = safeSlot, request_slot = req.slot, request_count = req.count, request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), topics = "syncman" + blocks_map = getShortMap(req, item.data), + direction = req.kind, topics = "syncman" req.item.updateScore(PeerScoreBadBlocks) of SyncQueueKind.Backward: if safeSlot > req.slot: let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot) - warn "Unexpected missing parent, rewind happens", + # It's quite common peers give us fewer blocks than we ask for + info "Gap in block range response, rewinding", peer = req.item, rewind_to_slot = rewindSlot, rewind_fail_slot = failSlot.get(), finalized_slot = safeSlot, request_slot = req.slot, request_count = req.count, request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), topics = "syncman" + blocks_map = getShortMap(req, item.data), + direction = req.kind, topics = "syncman" resetSlot = some(rewindSlot) req.item.updateScore(PeerScoreMissingBlocks) else: @@ -644,14 +659,16 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], peer = req.item, to_slot = safeSlot, request_slot = req.slot, request_count = req.count, request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), topics = "syncman" + blocks_map = getShortMap(req, item.data), + direction = req.kind, topics = "syncman" req.item.updateScore(PeerScoreBadBlocks) of BlockError.Invalid: let req = item.request warn "Received invalid sequence of blocks", peer = req.item, request_slot = req.slot, request_count = req.count, request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), topics = "syncman" + blocks_map = getShortMap(req, item.data), + direction = req.kind, topics = "syncman" req.item.updateScore(PeerScoreBadBlocks) of BlockError.Duplicate, BlockError.UnviableFork: raiseAssert "Handled above" @@ -666,11 +683,11 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, rewind_epoch_count = sq.rewind.get().epochCount, rewind_fail_slot = sq.rewind.get().failSlot, - reset_slot = resetSlot, topics = "syncman" + reset_slot = resetSlot, direction = sq.kind, topics = "syncman" of SyncQueueKind.Backward: debug "Rewind to slot was happened", reset_slot = reset_slot.get(), queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, - reset_slot = resetSlot, topics = "syncman" + reset_slot = resetSlot, direction = sq.kind, topics = "syncman" break proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =