diff --git a/beacon_chain/networking/peer_scores.nim b/beacon_chain/networking/peer_scores.nim index a356bb74d..b4c27dda9 100644 --- a/beacon_chain/networking/peer_scores.nim +++ b/beacon_chain/networking/peer_scores.nim @@ -17,8 +17,6 @@ const PeerScoreInvalidRequest* = -500 ## This peer is sending malformed or nonsensical data - PeerScoreHeadTooNew* = -100 - ## The peer reports a head newer than our wall clock slot PeerScoreNoStatus* = -100 ## Peer did not answer `status` request. PeerScoreStaleStatus* = -50 diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 2bd94be4d..d51022e00 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -38,6 +38,9 @@ type Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Queueing, Processing + SyncManagerFlag* {.pure.} = enum + NoMonitor + SyncWorker*[A, B] = object future: Future[void] status: SyncWorkerStatus @@ -46,7 +49,6 @@ type pool: PeerPool[A, B] responseTimeout: chronos.Duration maxHeadAge: uint64 - toleranceValue: uint64 getLocalHeadSlot: GetSlotCallback getLocalWallSlot: GetSlotCallback getSafeSlot: GetSlotCallback @@ -66,6 +68,7 @@ type syncStatus*: string direction: SyncQueueKind ident*: string + flags: set[SyncManagerFlag] SyncMoment* = object stamp*: chronos.Moment @@ -118,7 +121,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], blockVerifier: BlockVerifier, maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), chunkSize = uint64(SLOTS_PER_EPOCH), - toleranceValue = uint64(1), + flags: set[SyncManagerFlag] = {}, ident = "main" ): SyncManager[A, B] = let (getFirstSlot, getLastSlot, getSafeSlot) = case direction @@ -140,7 +143,8 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], blockVerifier: blockVerifier, notInSyncEvent: newAsyncEvent(), direction: direction, - ident: ident + ident: ident, + flags: flags ) res.initQueue() res @@ -256,17 +260,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = headSlot = man.getLocalHeadSlot() wallSlot = man.getLocalWallSlot() - if peerSlot > wallSlot + man.toleranceValue: - # If the peer reports a head slot higher than our wall slot, something is - # wrong: our clock is off or the peer is on a different network (or - # dishonest) - peer.updateScore(PeerScoreHeadTooNew) - - warn "Peer reports a head newer than our wall clock - clock out of sync?", - wall_clock_slot = wallSlot, remote_head_slot = peerSlot, - local_head_slot = headSlot, tolerance_value = man.toleranceValue - return - if man.remainingSlots() <= man.maxHeadAge: logScope: peer = peer @@ -491,6 +484,21 @@ proc toTimeLeftString*(d: Duration): string = res = res & "00m" res +proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void], + speedTaskFut: Future[void]) {.async.} = + guardTaskFut.cancel() + speedTaskFut.cancel() + await allFutures(guardTaskFut, speedTaskFut) + let pendingTasks = + block: + var res: seq[Future[void]] + for worker in man.workers: + doAssert(worker.status in {Sleeping, WaitingPeer}) + worker.future.cancel() + res.add(worker.future) + res + await allFutures(pendingTasks) + proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = logScope: sync_ident = man.ident @@ -595,11 +603,20 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = case man.direction of SyncQueueKind.Forward: if man.inProgress: - man.inProgress = false - debug "Forward synchronization process finished, sleeping", - wall_head_slot = wallSlot, local_head_slot = headSlot, - difference = (wallSlot - headSlot), - max_head_age = man.maxHeadAge + if SyncManagerFlag.NoMonitor in man.flags: + await man.syncClose(guardTaskFut, averageSpeedTaskFut) + man.inProgress = false + debug "Forward synchronization process finished, exiting", + wall_head_slot = wallSlot, local_head_slot = headSlot, + difference = (wallSlot - headSlot), + max_head_age = man.maxHeadAge + break + else: + man.inProgress = false + debug "Forward synchronization process finished, sleeping", + wall_head_slot = wallSlot, local_head_slot = headSlot, + difference = (wallSlot - headSlot), + max_head_age = man.maxHeadAge else: debug "Synchronization loop sleeping", wall_head_slot = wallSlot, local_head_slot = headSlot, @@ -611,19 +628,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = # workers, guard task and speed calculation task). # We first need to cancel and wait for guard task, because otherwise # it will be able to restore cancelled workers. - guardTaskFut.cancel() - averageSpeedTaskFut.cancel() - await allFutures(guardTaskFut, averageSpeedTaskFut) - let pendingTasks = - block: - var res: seq[Future[void]] - for worker in man.workers: - # Because `pending == 0` there should be no active workers. - doAssert(worker.status in {Sleeping, WaitingPeer}) - worker.future.cancel() - res.add(worker.future) - res - await allFutures(pendingTasks) + await man.syncClose(guardTaskFut, averageSpeedTaskFut) man.inProgress = false debug "Backward synchronization process finished, exiting", wall_head_slot = wallSlot, local_head_slot = headSlot,