Some syncing fixes (#1919)
* Add exponential rewind on MissingParent.
* Try to avoid peers which are useless for syncing. Fix forward sync to restart at the proper point. Fix getLocalWallSlot() to not return slots from the future.
* Fix incorrect logs.
* Fix logging; enable peers' status messages in the log at DEBUG level.
* Fix the watch task to monitor operation progress rather than local head progress.
* Add more logging information. Remove the recurring-failures detection mechanism.
parent 81f4fe0783
commit 3f5c7c36bc
@@ -639,9 +639,7 @@ proc startSyncManager(node: BeaconNode) =
     node.chainDag.head.slot

   proc getLocalWallSlot(): Slot =
-    let epoch = node.beaconClock.now().slotOrZero.compute_epoch_at_slot() +
-                1'u64
-    epoch.compute_start_slot_at_epoch()
+    node.beaconClock.now().slotOrZero

   func getFirstSlotAtFinalizedEpoch(): Slot =
     node.chainDag.finalizedHead.slot
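The old getLocalWallSlot computed the first slot of the *next* epoch, so the reported wall slot could run up to SLOTS_PER_EPOCH - 1 slots ahead of the actual clock, and the sync manager, which uses this callback as its sync target, could request slots that do not exist yet. A standalone sketch of the difference, with plain uint64 standing in for the Slot/Epoch distinct types and SLOTS_PER_EPOCH assumed to be 32 (the mainnet value):

const SLOTS_PER_EPOCH = 32'u64

func oldGetLocalWallSlot(wallSlot: uint64): uint64 =
  # old behaviour: first slot of (current epoch + 1), i.e. up to a full
  # epoch ahead of the wall clock
  (wallSlot div SLOTS_PER_EPOCH + 1'u64) * SLOTS_PER_EPOCH

func newGetLocalWallSlot(wallSlot: uint64): uint64 =
  # fixed behaviour: the current wall-clock slot itself
  wallSlot

when isMainModule:
  let wall = 100'u64                  # epoch 3 (slots 96..127)
  echo oldGetLocalWallSlot(wall)      # 128 -> 28 slots in the future
  echo newGetLocalWallSlot(wall)      # 100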
@@ -1,7 +1,7 @@
 import chronicles
 import options, deques, heapqueue, tables, strutils, sequtils, math, algorithm
 import stew/results, chronos, chronicles
-import spec/[datatypes, digest], peer_pool, eth2_network
+import spec/[datatypes, digest, helpers], peer_pool, eth2_network

 import ./eth2_processor
 import block_pools/block_pools_types
@@ -15,6 +15,8 @@ const
     ## Peer did not answer `status` request.
   PeerScoreStaleStatus* = -50
     ## Peer's `status` answer do not progress in time.
+  PeerScoreUseless* = -10
+    ## Peer's latest head is lower then ours.
   PeerScoreGoodStatus* = 50
     ## Peer's `status` answer is fine.
   PeerScoreNoBlocks* = -100
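The new PeerScoreUseless penalty is deliberately mild next to the other constants quoted above. A toy trajectory (assuming scores start at 0 and simply accumulate, which is a simplification of the real peer-pool scoring) illustrates the intent:

const
  PeerScoreUseless = -10     # values taken from the hunk above
  PeerScoreNoBlocks = -100

var score = 0
for round in 1 .. 3:
  score += PeerScoreUseless  # peer keeps lagging behind our head
echo score                   # -30: deprioritized, but not disconnected
score += PeerScoreNoBlocks   # failing to serve blocks costs 10x more
echo score                   # -130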
@@ -63,6 +65,10 @@ type
     future: Future[bool]
     request: SyncRequest[T]

+  RewindPoint = object
+    failSlot: Slot
+    epochCount: uint64
+
   SyncQueue*[T] = ref object
     inpSlot*: Slot
     outSlot*: Slot
@@ -71,12 +77,14 @@ type
     chunkSize*: uint64
     queueSize*: int
     counter*: uint64
+    opcounter*: uint64
     pending*: Table[uint64, SyncRequest[T]]
     waiters: seq[SyncWaiter[T]]
     getFinalizedSlot*: GetSlotCallback
     debtsQueue: HeapQueue[SyncRequest[T]]
     debtsCount: uint64
     readyQueue: HeapQueue[SyncResult[T]]
+    rewind: Option[RewindPoint]
     outQueue: AsyncQueue[BlockEntry]

   SyncWorkerStatus* {.pure.} = enum
@@ -92,7 +100,6 @@ type
     sleepTime: chronos.Duration
     maxStatusAge: uint64
     maxHeadAge: uint64
-    maxRecurringFailures: int
     toleranceValue: uint64
     getLocalHeadSlot: GetSlotCallback
     getLocalWallSlot: GetSlotCallback
@@ -104,7 +111,6 @@ type
     notInRangeEvent*: AsyncEvent
     chunkSize: uint64
     queue: SyncQueue[A]
-    failures: seq[SyncFailure[A]]
     syncFut: Future[void]
     outQueue: AsyncQueue[BlockEntry]
     inProgress*: bool
@@ -413,6 +419,43 @@ proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) {.inline.} =
   sq.debtsQueue.push(sr)
   sq.debtsCount = sq.debtsCount + sr.count

+proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
+                        finalizedSlot: Slot): Slot =
+  # Calculate exponential rewind point in number of epochs.
+  let epochCount =
+    if sq.rewind.isSome():
+      let rewind = sq.rewind.get()
+      if failSlot == rewind.failSlot:
+        # `MissingParent` happened at same slot so we increase rewind point by
+        # factor of 2.
+        let epochs = rewind.epochCount * 2
+        sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochs))
+        epochs
+      else:
+        # `MissingParent` happened at different slot so we going to rewind for
+        # 1 epoch only.
+        sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: 1'u64))
+        1'u64
+    else:
+      # `MissingParent` happened first time.
+      sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: 1'u64))
+      1'u64
+
+  # Calculate the latest finalized epoch.
+  let finalizedEpoch = compute_epoch_at_slot(finalizedSlot)
+
+  # Calculate the rewind epoch, which should not be less than the latest
+  # finalized epoch.
+  let rewindEpoch =
+    block:
+      let failEpoch = compute_epoch_at_slot(failSlot)
+      if failEpoch < finalizedEpoch + epochCount:
+        finalizedEpoch
+      else:
+        failEpoch - epochCount
+
+  compute_start_slot_at_epoch(rewindEpoch)
+
 proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
               data: seq[SignedBeaconBlock]) {.async, gcsafe.} =
   ## Push successfull result to queue ``sq``.
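To see the doubling behaviour in isolation, here is a self-contained sketch of the same rule, with plain uint64 in place of Slot/Epoch, the spec helpers compute_epoch_at_slot and compute_start_slot_at_epoch (the reason for the new `helpers` import in the first sync_manager hunk) inlined as div/mul, and SLOTS_PER_EPOCH assumed to be 32:

import options

const SLOTS_PER_EPOCH = 32'u64

type Rewind = object
  failSlot: uint64
  epochCount: uint64

var rewind: Option[Rewind]

proc rewindPoint(failSlot, finalizedSlot: uint64): uint64 =
  ## Same shape as getRewindPoint: double the epoch distance on a repeated
  ## failure at the same slot, start over at one epoch otherwise.
  let epochCount =
    if rewind.isSome() and rewind.get().failSlot == failSlot:
      rewind.get().epochCount * 2
    else:
      1'u64
  rewind = some(Rewind(failSlot: failSlot, epochCount: epochCount))
  let finalizedEpoch = finalizedSlot div SLOTS_PER_EPOCH
  let failEpoch = failSlot div SLOTS_PER_EPOCH
  # never rewind past the finalized epoch
  let rewindEpoch =
    if failEpoch < finalizedEpoch + epochCount: finalizedEpoch
    else: failEpoch - epochCount
  rewindEpoch * SLOTS_PER_EPOCH

when isMainModule:
  # repeated MissingParent at slot 3200 (epoch 100), finalized at epoch 50:
  echo rewindPoint(3200, 1600)   # 1 epoch back  -> slot 3168 (epoch 99)
  echo rewindPoint(3200, 1600)   # 2 epochs back -> slot 3136 (epoch 98)
  echo rewindPoint(3200, 1600)   # 4 epochs back -> slot 3072 (epoch 96)

Repeated MissingParent failures at the same slot thus back off exponentially toward, but never past, the latest finalized epoch.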
@@ -457,16 +500,22 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],

       # Validating received blocks one by one
       var res: Result[void, BlockError]
+      var failSlot: Option[Slot]
       if len(item.data) > 0:
         for blk in item.data:
           trace "Pushing block", block_root = blk.root,
                 block_slot = blk.message.slot
           res = await sq.validate(blk)
           if not(res.isOk):
+            failSlot = some(blk.message.slot)
             break
       else:
         res = Result[void, BlockError].ok()

+      # Increase progress counter, so watch task will be able to know that we
+      # are not stuck.
+      inc(sq.opcounter)
+
       if res.isOk:
         sq.outSlot = sq.outSlot + item.request.count
         if len(item.data) > 0:
@@ -492,12 +541,16 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
           let req = item.request
           let finalizedSlot = sq.getFinalizedSlot()
           if finalizedSlot < req.slot:
+            let rewindSlot = sq.getRewindPoint(failSlot.get(), finalizedSlot)
             warn "Unexpected missing parent, rewind happens",
-                 peer = req.item, rewind_to_slot = finalizedSlot,
+                 peer = req.item, rewind_to_slot = rewindSlot,
+                 rewind_epoch_count = sq.rewind.get().epochCount,
+                 rewind_fail_slot = failSlot.get(),
+                 finalized_slot = finalized_slot,
                  request_slot = req.slot, request_count = req.count,
                  request_step = req.step, blocks_count = len(item.data),
                  blocks_map = getShortMap(req, item.data), topics = "syncman"
-            resetSlot = some(finalizedSlot)
+            resetSlot = some(rewindSlot)
             req.item.updateScore(PeerScoreMissingBlocks)
           else:
             error "Unexpected missing parent at finalized epoch slot",
@@ -527,9 +580,10 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
       if resetSlot.isSome():
         await sq.resetWait(resetSlot)
         debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
-              queue_input_slot = sq.inpSlot,
-              queue_output_slot = sq.outSlot,
-              topics = "syncman"
+              queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
+              rewind_epoch_count = sq.rewind.get().epochCount,
+              rewind_fail_slot = sq.rewind.get().failSlot,
+              reset_slot = resetSlot, topics = "syncman"
       break

 proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
@@ -617,7 +671,6 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
                            int(SECONDS_PER_SLOT)).seconds,
                            chunkSize = uint64(SLOTS_PER_EPOCH),
                            toleranceValue = uint64(1),
-                           maxRecurringFailures = 3,
                            rangeAge = uint64(SLOTS_PER_EPOCH * 4)
                            ): SyncManager[A, B] =

@@ -631,7 +684,6 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
     getLocalWallSlot: getLocalWallSlotCb,
     getFinalizedSlot: getFinalizedSlotCb,
     maxHeadAge: maxHeadAge,
-    maxRecurringFailures: maxRecurringFailures,
     sleepTime: sleepTime,
     chunkSize: chunkSize,
     queue: queue,
@@ -701,7 +753,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
           tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(),
           peer_score = peer.getScore(), topics = "syncman"
     let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
-    man.failures.add(failure)
     return

   # Check if we need to update peer's status information
@@ -721,7 +772,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
             peer_score = peer.getScore(), peer_head_slot = peerSlot,
             peer_speed = peer.netKbps(), index = index, topics = "syncman"
       let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
-      man.failures.add(failure)
       return
   except CatchableError as exc:
     debug "Unexpected exception while updating peer's status",
@@ -776,7 +826,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
             peer_score = peer.getScore(), peer_head_slot = peerSlot,
             peer_speed = peer.netKbps(), index = index, topics = "syncman"
       let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
-      man.failures.add(failure)
       return
   except CatchableError as exc:
     debug "Unexpected exception while updating peer's status",
@@ -794,13 +843,30 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
           peer = peer, peer_score = peer.getScore(), index = index,
           peer_speed = peer.netKbps(), topics = "syncman"
   else:
-    debug "Peer's status information updated", wall_clock_slot = wallSlot,
-          remote_old_head_slot = peerSlot, local_head_slot = headSlot,
-          remote_new_head_slot = newPeerSlot, peer = peer,
-          peer_score = peer.getScore(), peer_speed = peer.netKbps(),
-          index = index, topics = "syncman"
-    peer.updateScore(PeerScoreGoodStatus)
-    peerSlot = newPeerSlot
+    # This is not very good solution because we should not discriminate and/or
+    # penalize peers which are in sync process too, but their latest head is
+    # lower then our latest head. We should keep connections with such peers
+    # (so this peers are able to get in sync using our data), but we should
+    # not use this peers for syncing because this peers are useless for us.
+    # Right now we decreasing peer's score a bit, so it will not be
+    # disconnected due to low peer's score, but new fresh peers could replace
+    # peers with low latest head.
+    if headSlot >= newPeerSlot - man.maxHeadAge:
+      # Peer's head slot is still lower then ours.
+      debug "Peer's head slot is lower then local head slot",
+            wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
+            local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
+            peer = peer, peer_score = peer.getScore(),
+            peer_speed = peer.netKbps(), index = index, topics = "syncman"
+      peer.updateScore(PeerScoreUseless)
+    else:
+      debug "Peer's status information updated", wall_clock_slot = wallSlot,
+            remote_old_head_slot = peerSlot, local_head_slot = headSlot,
+            remote_new_head_slot = newPeerSlot, peer = peer,
+            peer_score = peer.getScore(), peer_speed = peer.netKbps(),
+            index = index, topics = "syncman"
+      peer.updateScore(PeerScoreGoodStatus)
+      peerSlot = newPeerSlot

   return

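Rearranged to avoid the unsigned subtraction, the new rule says a peer is only useful as a block source when its advertised head is more than maxHeadAge slots ahead of ours. A minimal sketch of the classification (maxHeadAge value assumed; plain uint64 for Slot):

const maxHeadAge = 128'u64   # assumed value; the real one is a SyncManager field

func usefulForSync(localHead, peerHead: uint64): bool =
  # equivalent to `not (localHead >= peerHead - maxHeadAge)`, rewritten so
  # small peerHead values cannot underflow
  peerHead > localHead + maxHeadAge

when isMainModule:
  echo usefulForSync(1000, 1050)   # false -> scored PeerScoreUseless (-10)
  echo usefulForSync(1000, 2000)   # true  -> scored PeerScoreGoodStatus (+50)

As the added comment explains, the penalty is kept small on purpose: such peers stay connected (they may well be syncing from us), they are just not selected as sources for forward sync.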
@@ -853,15 +919,11 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
             peer_score = peer.getScore(), peer_speed = peer.netKbps(),
             index = index, topics = "syncman"
       let failure = SyncFailure.init(SyncFailureKind.BadResponse, peer)
-      man.failures.add(failure)
       return

     # Scoring will happen in `syncUpdate`.
     man.workers[index].status = SyncWorkerStatus.Processing
     await man.queue.push(req, data)
-
-    # Cleaning up failures.
-    man.failures.setLen(0)
   else:
     peer.updateScore(PeerScoreNoBlocks)
     man.queue.push(req)
@@ -871,7 +933,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
           peer_score = peer.getScore(), peer_speed = peer.netKbps(),
           topics = "syncman"
     let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
-    man.failures.add(failure)
     return

 except CatchableError as exc:
@@ -944,18 +1005,14 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
       pauseTime = MinPauseTime

     while true:
-      let wallSlot = man.getLocalWallSlot()
-      let headSlot = man.getLocalHeadSlot()
-
-      let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
+      let op1 = man.queue.opcounter
       await sleepAsync(chronos.seconds(pauseTime))
-      let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
-
+      let op2 = man.queue.opcounter
       let (map, sleeping, waiting, pending) = man.getWorkersStats()
       if pending == 0:
         pauseTime = MinPauseTime
       else:
-        if (lsm2.slot - lsm1.slot == 0'u64) and (pending > 1):
+        if (op2 - op1 == 0'u64) and (pending > 1):
           # Syncing is NOT progressing, we double `pauseTime` value, but value
           # could not be bigger then `MaxPauseTime`.
           if (pauseTime shl 1) > MaxPauseTime:
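The watch task now samples sq.opcounter around the sleep instead of comparing head slots: the counter is bumped on every processed queue push, successful or not, so after a rewind the head slot can legitimately stand still for a long stretch while workers re-download and re-validate blocks without the watcher declaring a stall. A reduced sketch of the new stall test (stand-in global instead of the real sq.opcounter):

import chronos

var opcounter: uint64   # bumped by every processed sync request (sketch)

proc stalled(pauseTime: int): Future[bool] {.async.} =
  ## True when no sync operation completed during the pause -- the
  ## condition under which the loop doubles pauseTime and resets the queue.
  let op1 = opcounter
  await sleepAsync(chronos.seconds(pauseTime))
  return opcounter == op1

when isMainModule:
  echo waitFor stalled(1)   # true here: nothing bumps the counter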
@@ -963,10 +1020,11 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
           else:
             pauseTime = pauseTime shl 1
           info "Syncing process is not progressing, reset the queue",
+               start_op = op1, end_op = op2,
                pending_workers_count = pending,
-               to_slot = man.queue.outSlot,
+               reset_to_slot = man.queue.outSlot,
                pause_time = $(chronos.seconds(pauseTime)),
-               local_head_slot = lsm1.slot, topics = "syncman"
+               local_head_slot = man.getLocalHeadSlot(), topics = "syncman"
           await man.queue.resetWait(none[Slot]())
         else:
           # Syncing progressing, so reduce `pauseTime` value in half, but value
@@ -976,13 +1034,15 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
           else:
             pauseTime = pauseTime shr 1

-      debug "Synchronization watch loop tick", wall_head_slot = wallSlot,
-            local_head_slot = headSlot, queue_start_slot = man.queue.startSlot,
-            queue_last_slot = man.queue.lastSlot,
-            pause_time = $(chronos.seconds(pauseTime)),
-            avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed,
-            pending_workers_count = pending,
-            topics = "syncman"
+      debug "Synchronization watch loop tick",
+            wall_head_slot = man.getLocalWallSlot(),
+            local_head_slot = man.getLocalHeadSlot(),
+            queue_start_slot = man.queue.startSlot,
+            queue_last_slot = man.queue.lastSlot,
+            pause_time = $(chronos.seconds(pauseTime)),
+            avg_sync_speed = man.avgSyncSpeed,
+            ins_sync_speed = man.insSyncSpeed,
+            pending_workers_count = pending, topics = "syncman"

   proc averageSpeedTask() {.async.} =
     while true:
@@ -1043,8 +1103,18 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
             max_head_age = man.maxHeadAge, topics = "syncman"
       man.inProgress = false
     else:
-      man.notInSyncEvent.fire()
-      man.inProgress = true
+      if not(man.notInSyncEvent.isSet()):
+        # We get here only if we lost sync for more then `maxHeadAge` period.
+        if pending == 0:
+          man.queue = SyncQueue.init(A, man.getLocalHeadSlot(),
+                                     man.getLocalWallSlot(),
+                                     man.chunkSize, man.getFinalizedSlot,
+                                     man.outQueue, 1)
+          man.notInSyncEvent.fire()
+          man.inProgress = true
+      else:
+        man.notInSyncEvent.fire()
+        man.inProgress = true

     if queueAge <= man.rangeAge:
       # We are in requested range ``man.rangeAge``.
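The replacement rebuilds the queue at most once per loss of sync, and only when it is safe to do so: the rebuild waits for a tick where pending == 0, so no worker still holds a request against the old queue, and firing notInSyncEvent afterwards latches the branch shut. A stand-alone sketch of that latch, with stand-in variables rather than the real SyncManager fields:

import chronos

var notInSyncEvent = newAsyncEvent()
var pendingWorkers = 0
var queueStartSlot = 0'u64

proc onWatchTick(currentHeadSlot: uint64) =
  if not notInSyncEvent.isSet():
    if pendingWorkers == 0:
      queueStartSlot = currentHeadSlot   # safe: no one references the old queue
      notInSyncEvent.fire()              # latch; later ticks skip the rebuild

when isMainModule:
  onWatchTick(12345'u64)
  echo queueStartSlot   # 12345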
@@ -1055,15 +1125,6 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
       man.inRangeEvent.clear()
       man.notInRangeEvent.fire()

-    if len(man.failures) > man.maxRecurringFailures and pending > 1:
-      debug "Number of recurring failures exceeds limit, reseting queue",
-            pending_workers_count = pending, sleeping_workers_count = sleeping,
-            waiting_workers_count = waiting, rec_failures = len(man.failures),
-            topics = "syncman"
-      # Cleaning up failures.
-      man.failures.setLen(0)
-      await man.queue.resetWait(none[Slot]())
-
     await sleepAsync(chronos.seconds(2))

 proc start*[A, B](man: SyncManager[A, B]) =
@@ -206,7 +206,7 @@ p2pProtocol BeaconSync(version = 1,
     debug "Received Goodbye message", reason = disconnectReasonName(reason), peer

 proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
-  trace "Peer status", peer, statusMsg
+  debug "Peer status", peer, statusMsg
   peer.state(BeaconSync).statusMsg = statusMsg
   peer.state(BeaconSync).statusLastTime = Moment.now()