NoMonitor and removed clock check for SyncManager. (#3420)

* Add `NoMonitor` flag to stop SyncManager from monitoring sync situation.

* Remove `toleranceValue` and `PeerScoreHeadTooNew`.

Co-authored-by: Etan Kissling <etan@status.im>
This commit is contained in:
Eugene Kabanov 2022-04-14 16:17:44 +03:00 committed by GitHub
parent 28ba2d5544
commit 5592c7c674
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 34 deletions

View File

@ -17,8 +17,6 @@ const
PeerScoreInvalidRequest* = -500
## This peer is sending malformed or nonsensical data
PeerScoreHeadTooNew* = -100
## The peer reports a head newer than our wall clock slot
PeerScoreNoStatus* = -100
## Peer did not answer `status` request.
PeerScoreStaleStatus* = -50

View File

@ -38,6 +38,9 @@ type
Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Queueing,
Processing
SyncManagerFlag* {.pure.} = enum
NoMonitor
SyncWorker*[A, B] = object
future: Future[void]
status: SyncWorkerStatus
@ -46,7 +49,6 @@ type
pool: PeerPool[A, B]
responseTimeout: chronos.Duration
maxHeadAge: uint64
toleranceValue: uint64
getLocalHeadSlot: GetSlotCallback
getLocalWallSlot: GetSlotCallback
getSafeSlot: GetSlotCallback
@ -66,6 +68,7 @@ type
syncStatus*: string
direction: SyncQueueKind
ident*: string
flags: set[SyncManagerFlag]
SyncMoment* = object
stamp*: chronos.Moment
@ -118,7 +121,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
blockVerifier: BlockVerifier,
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
chunkSize = uint64(SLOTS_PER_EPOCH),
toleranceValue = uint64(1),
flags: set[SyncManagerFlag] = {},
ident = "main"
): SyncManager[A, B] =
let (getFirstSlot, getLastSlot, getSafeSlot) = case direction
@ -140,7 +143,8 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
blockVerifier: blockVerifier,
notInSyncEvent: newAsyncEvent(),
direction: direction,
ident: ident
ident: ident,
flags: flags
)
res.initQueue()
res
@ -256,17 +260,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
headSlot = man.getLocalHeadSlot()
wallSlot = man.getLocalWallSlot()
if peerSlot > wallSlot + man.toleranceValue:
# If the peer reports a head slot higher than our wall slot, something is
# wrong: our clock is off or the peer is on a different network (or
# dishonest)
peer.updateScore(PeerScoreHeadTooNew)
warn "Peer reports a head newer than our wall clock - clock out of sync?",
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
local_head_slot = headSlot, tolerance_value = man.toleranceValue
return
if man.remainingSlots() <= man.maxHeadAge:
logScope:
peer = peer
@ -491,6 +484,21 @@ proc toTimeLeftString*(d: Duration): string =
res = res & "00m"
res
proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void],
speedTaskFut: Future[void]) {.async.} =
guardTaskFut.cancel()
speedTaskFut.cancel()
await allFutures(guardTaskFut, speedTaskFut)
let pendingTasks =
block:
var res: seq[Future[void]]
for worker in man.workers:
doAssert(worker.status in {Sleeping, WaitingPeer})
worker.future.cancel()
res.add(worker.future)
res
await allFutures(pendingTasks)
proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
logScope:
sync_ident = man.ident
@ -595,11 +603,20 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
case man.direction
of SyncQueueKind.Forward:
if man.inProgress:
man.inProgress = false
debug "Forward synchronization process finished, sleeping",
wall_head_slot = wallSlot, local_head_slot = headSlot,
difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge
if SyncManagerFlag.NoMonitor in man.flags:
await man.syncClose(guardTaskFut, averageSpeedTaskFut)
man.inProgress = false
debug "Forward synchronization process finished, exiting",
wall_head_slot = wallSlot, local_head_slot = headSlot,
difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge
break
else:
man.inProgress = false
debug "Forward synchronization process finished, sleeping",
wall_head_slot = wallSlot, local_head_slot = headSlot,
difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge
else:
debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
local_head_slot = headSlot,
@ -611,19 +628,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
# workers, guard task and speed calculation task).
# We first need to cancel and wait for guard task, because otherwise
# it will be able to restore cancelled workers.
guardTaskFut.cancel()
averageSpeedTaskFut.cancel()
await allFutures(guardTaskFut, averageSpeedTaskFut)
let pendingTasks =
block:
var res: seq[Future[void]]
for worker in man.workers:
# Because `pending == 0` there should be no active workers.
doAssert(worker.status in {Sleeping, WaitingPeer})
worker.future.cancel()
res.add(worker.future)
res
await allFutures(pendingTasks)
await man.syncClose(guardTaskFut, averageSpeedTaskFut)
man.inProgress = false
debug "Backward synchronization process finished, exiting",
wall_head_slot = wallSlot, local_head_slot = headSlot,