Mirror of https://github.com/status-im/nimbus-eth2.git
Sync freeze fixes. (#1072)
* Add the ability to reset the state of the sync manager. Fix a bug where sync got stuck on a `zero-point` reset. Fix a bug where sync got stuck while some of the workers were waiting for a failing one.
* Remove debugging comments and imports.
* Remove unused pendingLock.
This commit is contained in:
parent e238e8186c
commit 21131e629b
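For orientation before reading the diff: the freeze is addressed by letting blocked `push` calls park on per-call futures and having a reset wake them all with `false`, so workers drop stale work and ask the queue for fresh requests. Below is a minimal, self-contained sketch of that waiter/reset pattern using chronos; it is not the code from this commit, and `MiniQueue`, `demo` and its field names are illustrative only.

import chronos

type
  MiniQueue = ref object
    waiters: seq[Future[bool]]    # one future per blocked caller

proc wakeupWaiters(q: MiniQueue, flag = true) =
  # Complete every still-pending waiter with `flag`.
  for waiter in q.waiters:
    if not(waiter.finished()):
      waiter.complete(flag)

proc waitForChanges(q: MiniQueue): Future[bool] {.async.} =
  # Park the caller until somebody calls wakeupWaiters().
  var waiter = newFuture[bool]("miniQueue.waitForChanges")
  q.waiters.add(waiter)
  try:
    result = await waiter
  finally:
    q.waiters.delete(q.waiters.find(waiter))

proc resetWait(q: MiniQueue) {.async.} =
  # Wake every blocked caller with `false` and wait until the last one
  # has actually resumed, so the reset cannot race with old pushes.
  var changes = q.waitForChanges()
  q.wakeupWaiters(false)
  discard await changes

when isMainModule:
  proc demo() {.async.} =
    let q = MiniQueue()
    let blocked = q.waitForChanges()   # simulates a push stuck on backpressure
    await q.resetWait()
    doAssert blocked.finished() and not blocked.read()
  waitFor demo()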
@@ -398,13 +398,17 @@ proc runSyncLoop(node: BeaconNode) {.async.} =
let epoch = node.beaconClock.now().toSlot().slot.compute_epoch_at_slot() + 1'u64
result = epoch.compute_start_slot_at_epoch()

proc updateLocalBlocks(list: openarray[SignedBeaconBlock]): bool =
proc updateLocalBlocks(list: openarray[SignedBeaconBlock]): Result[void, BlockError] =
debug "Forward sync imported blocks", count = len(list),
local_head_slot = getLocalHeadSlot()
let sm = now(chronos.Moment)
for blk in list:
if node.storeBlock(blk).isErr:
return false
let res = node.storeBlock(blk)
# We are going to ignore `BlockError.Old` errors because we have a working
# backward sync and it can happen that we perform overlapping
# requests.
if res.isErr and res.error != BlockError.Old:
return res
discard node.updateHead()

let dur = now(chronos.Moment) - sm
@@ -417,13 +421,14 @@ proc runSyncLoop(node: BeaconNode) {.async.} =

info "Forward sync blocks got imported successfully", count = len(list),
local_head_slot = getLocalHeadSlot(), store_speed = storeSpeed
result = true
ok()

proc scoreCheck(peer: Peer): bool =
if peer.score < PeerScoreLimit:
if peer.score < PeerScoreLowLimit:
try:
debug "Peer score is too low, removing it from PeerPool", peer = peer,
peer_score = peer.score, score_limit = PeerScoreLimit
peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
score_high_limit = PeerScoreHighLimit
except:
discard
result = false
@@ -187,8 +187,10 @@ const

NewPeerScore* = 200
## Score which will be assigned to new connected Peer
PeerScoreLimit* = 0
PeerScoreLowLimit* = 0
## Score after which peer will be kicked
PeerScoreHighLimit* = 1000
## Max value of peer's score

template neterr(kindParam: Eth2NetworkingErrorKind): auto =
err(type(result), Eth2NetworkingError(kind: kindParam))
@@ -272,9 +274,11 @@ proc `<`*(a, b: Peer): bool =
proc getScore*(a: Peer): int =
result = a.score

proc updateScore*(peer: Peer, score: int) =
proc updateScore*(peer: Peer, score: int) {.inline.} =
## Update peer's ``peer`` score with value ``score``.
peer.score = peer.score + score
if peer.score > PeerScoreHighLimit:
peer.score = PeerScoreHighLimit

proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} =
@@ -1,9 +1,11 @@
import chronicles
import options, deques, heapqueue, tables, strutils, sequtils, math
import stew/bitseqs, chronos, chronicles
import stew/[bitseqs, results], chronos, chronicles
import spec/datatypes, spec/digest, peer_pool, eth2_network
import eth/async_utils
export datatypes, digest, chronos, chronicles

import block_pools/block_pools_types
export datatypes, digest, chronos, chronicles, results, block_pools_types

logScope:
topics = "syncman"
@@ -25,16 +27,26 @@ const
## Peer response contains too many empty blocks

type
SyncFailureKind* = enum
StatusInvalid,
StatusDownload,
StatusStale,
EmptyProblem,
BlockDownload

GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}

UpdateLocalBlocksCallback* =
proc(list: openarray[SignedBeaconBlock]): bool {.gcsafe.}
proc(list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.
gcsafe.}

SyncUpdateCallback*[T] =
proc(req: SyncRequest[T],
list: openarray[SignedBeaconBlock]): bool {.gcsafe.}
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.
gcsafe.}

SyncRequest*[T] = object
index*: uint64
slot*: Slot
count*: uint64
step*: uint64
@@ -53,7 +65,10 @@ type
chunkSize*: uint64
queueSize*: int

notFullEvent*: AsyncEvent
counter*: uint64
pending*: Table[uint64, Slot]

waiters: seq[Future[bool]]
syncUpdate*: SyncUpdateCallback[T]

debtsQueue: HeapQueue[SyncRequest[T]]
@@ -69,17 +84,24 @@ type
sleepTime: chronos.Duration
maxStatusAge: uint64
maxHeadAge: uint64
maxRecurringFailures: int
toleranceValue: uint64
getLocalHeadSlot: GetSlotCallback
getLocalWallSlot: GetSlotCallback
syncUpdate: SyncUpdateCallback[A]
chunkSize: uint64
queue: SyncQueue[A]
failures: seq[SyncFailure[A]]

SyncMoment* = object
stamp*: chronos.Moment
slot*: Slot

SyncFailure*[T] = object
kind*: SyncFailureKind
peer*: T
stamp*: chronos.Moment

SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]]
@@ -127,6 +149,10 @@ proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], start: Slot,
let count = finish - start + 1'u64
result = SyncRequest[T](slot: start, count: count, step: 1'u64, item: item)

proc init*[T](t1: typedesc[SyncFailure], kind: SyncFailureKind,
peer: T): SyncFailure[T] {.inline.} =
result = SyncFailure[T](kind: kind, peer: peer, stamp: now(chronos.Moment))

proc empty*[T](t: typedesc[SyncRequest],
t2: typedesc[T]): SyncRequest[T] {.inline.} =
result = SyncRequest[T](step: 0'u64, count: 0'u64)
@@ -225,7 +251,9 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
chunkSize: chunkSize,
queueSize: queueSize,
syncUpdate: updateCb,
notFullEvent: newAsyncEvent(),
waiters: newSeq[Future[bool]](),
counter: 1'u64,
pending: initTable[uint64, Slot](),
debtsQueue: initHeapQueue[SyncRequest[T]](),
inpSlot: start,
outSlot: start
@@ -245,6 +273,11 @@ proc lastSlot*[T](req: SyncRequest[T]): Slot {.inline.} =
## Returns last slot for request ``req``.
result = req.slot + req.count - 1'u64

proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) =
req.index = sq.counter
sq.counter = sq.counter + 1'u64
sq.pending[req.index] = req.slot

proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
## Update last slot stored in queue ``sq`` with value ``last``.
doAssert(sq.lastSlot <= last,
@@ -252,12 +285,70 @@ proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
$sq.lastSlot & " <= " & $last)
sq.lastSlot = last

proc wakeupWaiters[T](sq: SyncQueue[T], flag = true) {.inline.} =
## Wake up one or all blocked waiters.
for waiter in sq.waiters:
if not(waiter.finished()):
waiter.complete(flag)

proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} =
## Create a new waiter and wait for completion from `wakeupWaiters()`.
var waiter = newFuture[bool]("SyncQueue.waitForChanges")
sq.waiters.add(waiter)
try:
result = await waiter
finally:
sq.waiters.delete(sq.waiters.find(waiter))

proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} =
## This procedure will perform wakeupWaiters(false) and block until the last
## waiter has been awakened.
var waitChanges = sq.waitForChanges()
sq.wakeupWaiters(false)
discard await waitChanges

proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} =
## Perform reset of all the blocked waiters in SyncQueue.
##
## We are adding one more waiter to the waiters sequence and
## call wakeupWaiters(false). Because our waiter is last in the sequence of
## waiters it will be resumed only after all waiters have been awakened and
## finished.

# We are clearing the pending list, so that all requests that are still running
# around (still downloading, but not yet pushed to the SyncQueue) will be
# expired. It's important to perform this call first (before await), otherwise
# you can introduce a race problem.
sq.pending.clear()

# We are calculating the minimal slot number to which we will be able to reset,
# without missing any blocks. There are 3 sources:
# 1. Debts queue.
# 2. Processing queue (`inpSlot`, `outSlot`).
# 3. Requested slot `toSlot` (which can be `zero-point` slot).
#
# Queue's `outSlot` is the lowest slot we added to `block_pool`, but
# `zero-point` slot can be less than `outSlot`. `debtsQueue` holds only not
# added slot requests, so it can't be bigger than `outSlot` value.
var minSlot = sq.outSlot
if toSlot.isSome():
minSlot = min(toSlot.get(), sq.outSlot)
sq.debtsQueue.clear()
sq.debtsCount = 0
sq.readyQueue.clear()
sq.inpSlot = minSlot
sq.outSlot = minSlot

# We are going to wake up all the waiters and wait for the last one.
await sq.wakeupAndWaitWaiters()

proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} =
## Returns ``true`` if response has only empty slots.
## Returns ``true`` if response chain of blocks is empty (has only empty
## slots).
len(sr.data) == 0

proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} =
## Returns ``true`` if response has gap at the end.
## Returns ``true`` if response chain of blocks has gap at the end.
let lastslot = sr.request.slot + sr.request.count - 1'u64
if len(sr.data) == 0:
return true
@@ -274,28 +365,54 @@ proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
else:
sr.data[^1].message.slot

proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) {.inline.} =
sq.debtsQueue.push(sr)
sq.debtsCount = sq.debtsCount + sr.count

proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[SignedBeaconBlock]) {.async, gcsafe.} =
## Push successful result to queue ``sq``.
mixin updateScore

if sr.index notin sq.pending:
# If request `sr` is not in our pending list, it only means that
# SyncQueue.resetWait() happened and all pending requests are expired, so
# we swallow `old` requests, and in such way sync-workers are able to get
# proper new requests from SyncQueue.
return

sq.pending.del(sr.index)

# This is the backpressure handling algorithm; this algorithm is blocking
# all pending `push` requests if `request.slot` is not in range:
# [current_queue_slot, current_queue_slot + sq.queueSize * sq.chunkSize].
var exitNow = false
while true:
if (sq.queueSize > 0) and
(sr.slot >= sq.outSlot + uint64(sq.queueSize) * sq.chunkSize):
await sq.notFullEvent.wait()
sq.notFullEvent.clear()
continue
let res = SyncResult[T](request: sr, data: data)
sq.readyQueue.push(res)
let res = await sq.waitForChanges()
if res:
continue
else:
# SyncQueue reset happens (it can't be `zero-point` reset, or continuous
# failure reset). We are exiting to wake up sync-worker.
exitNow = true
break
let syncres = SyncResult[T](request: sr, data: data)
sq.readyQueue.push(syncres)
exitNow = false
break

if exitNow:
return

while len(sq.readyQueue) > 0:
let minSlot = sq.readyQueue[0].request.slot
if sq.outSlot != minSlot:
break
let item = sq.readyQueue.pop()
let res = sq.syncUpdate(item.request, item.data)
if res:
if res.isOk:
if sq.zeroPoint.isSome():
if item.isEmpty():
# If the `zeropoint` is set and response is empty, we will add this
@@ -346,67 +463,96 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
sq.zeroPoint = some(item.getLastNonEmptySlot())

sq.outSlot = sq.outSlot + item.request.count
sq.notFullEvent.fire()
sq.wakeupWaiters()
else:
# TODO: At this point we can handle different `block_pool` errors,
# because `Parent missing` errors are not as harsh as `Incorrect block`
# errors.
debug "Block pool rejected peer's response", peer = item.request.item,
request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
blocks_map = getShortMap(item.request, item.data),
blocks_count = len(item.data)
blocks_count = len(item.data), errCode = res.error

if sq.zeroPoint.isSome():
# If the `zeropoint` is set and we are unable to store response in
# `block_pool` we are going to revert suspicious responses list.
var resetSlot: Option[Slot]

# If `zeropoint` is set, suspicious list should not be empty.
var req: SyncRequest[T]
if isEmpty(sq.suspects[0]):
# If initial suspicious response is an empty list, then previous chunk
# of blocks did not have a gap at the end. So we are going to request
# suspicious response one more time without any changes.
req = sq.suspects[0].request
else:
# If initial suspicious response is not an empty list, we are going to
# request only gap at the end of the suspicious response.
let startSlot = sq.suspects[0].getLastNonEmptySlot() + 1'u64
let lastSlot = sq.suspects[0].request.lastSlot()
req = SyncRequest.init(T, startSlot, lastSlot)
if res.error == BlockError.MissingParent:
if sq.zeroPoint.isSome():
# If the `zeropoint` is set and we are unable to store response in
# `block_pool` we are going to revert suspicious responses list.

debug "Resolve joker's problem", request_slot = req.slot,
request_count = req.count,
request_step = req.step,
# If `zeropoint` is set, suspicious list should not be empty.
var req: SyncRequest[T]
if isEmpty(sq.suspects[0]):
# If initial suspicious response is an empty list, then previous
# chunk of blocks did not have a gap at the end. So we are going to
# request suspicious response one more time without any changes.
req = sq.suspects[0].request
else:
# If initial suspicious response is not an empty list, we are going
# to request only gap at the end of the suspicious response.
let startSlot = sq.suspects[0].getLastNonEmptySlot() + 1'u64
let lastSlot = sq.suspects[0].request.lastSlot()
req = SyncRequest.init(T, startSlot, lastSlot)

debug "Resolve joker's problem", request_slot = req.slot,
request_count = req.count,
request_step = req.step,
suspects_count = (len(sq.suspects) - 1)

sq.suspects[0].request.item.updateScore(PeerScoreJokeBlocks)
sq.suspects[0].request.item.updateScore(PeerScoreJokeBlocks)

sq.debtsQueue.push(req)
sq.debtsCount = sq.debtsCount + req.count
sq.toDebtsQueue(req)
# We move all left suspicious responses to the debts queue.
if len(sq.suspects) > 1:
for i in 1 ..< len(sq.suspects):
sq.toDebtsQueue(sq.suspects[i].request)
sq.suspects[i].request.item.updateScore(PeerScoreJokeBlocks)

# We move all left suspicious responses to the debts queue.
if len(sq.suspects) > 1:
for i in 1 ..< len(sq.suspects):
sq.debtsQueue.push(sq.suspects[i].request)
sq.debtsCount = sq.debtsCount + sq.suspects[i].request.count
sq.suspects[i].request.item.updateScore(PeerScoreJokeBlocks)

# Reset state to the `zeropoint`.
sq.suspects.setLen(0)
sq.outSlot = sq.zeroPoint.get()
sq.zeroPoint = none[Slot]()
# Reset state to the `zeropoint`.
sq.suspects.setLen(0)
resetSlot = sq.zeroPoint
sq.zeroPoint = none[Slot]()
else:
# If we got `BlockError.MissingParent` and `zero-point` is not set
# it means that peer returned a chain of blocks with holes.
let req = item.request
warn "Received sequence of blocks with holes", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data)
req.item.updateScore(PeerScoreBadBlocks)
elif res.error == BlockError.Invalid:
let req = item.request
warn "Received invalid sequence of blocks", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data)
req.item.updateScore(PeerScoreBadBlocks)
else:
let req = item.request
warn "Received unexpected response from block_pool", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), errorCode = res.error
req.item.updateScore(PeerScoreBadBlocks)

# We need to move failed response to the debts queue.
sq.debtsQueue.push(item.request)
sq.debtsCount = sq.debtsCount + item.request.count
sq.toDebtsQueue(item.request)
if resetSlot.isSome():
await sq.resetWait(resetSlot)
debug "Zero-point reset happens", queue_input_slot = sq.inpSlot,
queue_output_slot = sq.outSlot
break

proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
## Push failed request back to queue.
sq.debtsQueue.push(sr)
sq.debtsCount = sq.debtsCount + sr.count
if sr.index notin sq.pending:
# If request `sr` is not in our pending list, it only means that
# SyncQueue.resetWait() happened and all pending requests are expired, so
# we swallow `old` requests, and in such way sync-workers are able to get
# proper new requests from SyncQueue.
return
sq.pending.del(sr.index)
sq.toDebtsQueue(sr)

proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
if len(sq.debtsQueue) > 0:
@@ -417,12 +563,14 @@ proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
if sr.lastSlot() <= maxSlot:
sq.debtsCount = sq.debtsCount - sr.count
sr.setItem(item)
sq.makePending(sr)
return sr

let sr1 = SyncRequest.init(T, sr.slot, maxslot, item)
var sr1 = SyncRequest.init(T, sr.slot, maxslot, item)
let sr2 = SyncRequest.init(T, maxslot + 1'u64, sr.lastSlot())
sq.debtsQueue.push(sr2)
sq.debtsCount = sq.debtsCount - sr1.count
sq.makePending(sr1)
return sr1
else:
if maxSlot < sq.inpSlot:
@@ -433,8 +581,9 @@ proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =

let lastSlot = min(maxslot, sq.lastSlot)
let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
let sr = SyncRequest.init(T, sq.inpSlot, count, item)
var sr = SyncRequest.init(T, sq.inpSlot, count, item)
sq.inpSlot = sq.inpSlot + count
sq.makePending(sr)
return sr

proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} =
@@ -442,7 +591,7 @@ proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} =
if sq.inpSlot > sq.lastSlot:
result = sq.debtsCount
else:
result = sq.lastSlot - sq.inpSlot + 1'u64 + sq.debtsCount
result = sq.lastSlot - sq.inpSlot + 1'u64 - sq.debtsCount

proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} =
## Returns total number of slots in queue ``sq``.
@@ -477,18 +626,17 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
sleepTime = (int(SLOTS_PER_EPOCH) *
int(SECONDS_PER_SLOT)).seconds,
chunkSize = uint64(SLOTS_PER_EPOCH),
toleranceValue = uint64(1)
toleranceValue = uint64(1),
maxRecurringFailures = 3
): SyncManager[A, B] =

proc syncUpdate(req: SyncRequest[A],
list: openarray[SignedBeaconBlock]): bool {.gcsafe.} =
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.gcsafe.} =
let peer = req.item
if updateLocalBlocksCb(list):
let res = updateLocalBlocksCb(list)
if res.isOk:
peer.updateScore(PeerScoreGoodBlocks)
result = true
else:
# Scoring will be done inside of SyncQueue.
result = false
return res

let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(),
chunkSize, syncUpdate, 2)
@@ -500,6 +648,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
syncUpdate: syncUpdate,
getLocalWallSlot: getLocalWallSlotCb,
maxHeadAge: maxHeadAge,
maxRecurringFailures: maxRecurringFailures,
sleepTime: sleepTime,
chunkSize: chunkSize,
queue: queue
@@ -570,6 +719,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
local_head_slot = headSlot, peer = peer,
tolerance_value = man.toleranceValue,
peer_score = peer.getScore(), topics = "syncman"
# let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
# man.failures.add(failure)
break

if peerAge >= man.maxStatusAge:
@@ -583,16 +734,20 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
debug "Failed to get remote peer's status, exiting", peer = peer,
peer_score = peer.getScore(), peer_head_slot = peerSlot,
topics = "syncman"
# let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
# man.failures.add(failure)
break

let newPeerSlot = peer.getHeadSlot()
if peerSlot >= newPeerSlot:
peer.updateScore(PeerScoreStaleStatus)
debug "Peer's status information is stale, exiting",
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
peer.updateScore(PeerScoreStaleStatus)
# let failure = SyncFailure.init(SyncFailureKind.StatusStale, peer)
# man.failures.add(failure)
break

debug "Peer's status information updated", wall_clock_slot = wallSlot,
@@ -621,6 +776,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
# are available in PeerPool. We are going to wait for RESP_TIMEOUT time,
# so all pending requests should be finished at this moment.
await sleepAsync(RESP_TIMEOUT)
# let failure = SyncFailure.init(SyncFailureKind.EmptyProblem, peer)
# man.failures.add(failure)
break

debug "Creating new request for peer", wall_clock_slot = wallSlot,
@@ -639,6 +796,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
peer = peer, peer_score = peer.getScore(), topics = "syncman"
# Scoring will happen in `syncUpdate`.
await man.queue.push(req, data)
# Cleaning up failures.
man.failures.setLen(0)
else:
peer.updateScore(PeerScoreNoBlocks)
man.queue.push(req)
@@ -646,6 +805,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), topics = "syncman"
# let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
# man.failures.add(failure)
break

result = peer
@@ -786,6 +947,11 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =

pending = temp

if len(man.failures) > man.maxRecurringFailures and (workersCount() > 1):
debug "Number of recurring failures exceeds limit, resetting queue",
workers_count = $workers_count(), rec_failures = $len(man.failures)
await man.queue.resetWait(none[Slot]())

debug "Synchronization loop end tick", wall_head_slot = wallSlot,
local_head_slot = headSlot, workers_count = workersCount(),
waiting_for_new_peer = $not(isNil(acquireFut)),
@@ -24,7 +24,8 @@ suite "SyncManager test suite":
curslot = curslot + 1'u64

proc syncUpdate(req: SyncRequest[SomeTPeer],
data: openarray[SignedBeaconBlock]): bool {.gcsafe.} =
data: openarray[SignedBeaconBlock]): Result[void, BlockError] {.
gcsafe.} =
discard

test "[SyncQueue] Start and finish slots equal":
@@ -198,14 +199,14 @@ suite "SyncManager test suite":
var counter = 0

proc syncReceiver(req: SyncRequest[SomeTPeer],
list: openarray[SignedBeaconBlock]): bool {.gcsafe.} =
result = true
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.
gcsafe.} =
for item in list:
if item.message.slot == Slot(counter):
inc(counter)
else:
result = false
break
return err(Invalid)
return ok()

var chain = createChain(Slot(0), Slot(2))
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
@@ -241,14 +242,14 @@ suite "SyncManager test suite":
var counter = 5

proc syncReceiver(req: SyncRequest[SomeTPeer],
list: openarray[SignedBeaconBlock]): bool {.gcsafe.} =
result = true
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.
gcsafe.} =
for item in list:
if item.message.slot == Slot(counter):
inc(counter)
else:
result = false
break
return err(Invalid)
return ok()

var chain = createChain(Slot(5), Slot(11))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64,
@@ -286,6 +287,75 @@

check waitFor(test())

test "[SyncQueue] Async pending and resetWait() test":
proc test(): Future[bool] {.async.} =
var counter = 5

proc syncReceiver(req: SyncRequest[SomeTPeer],
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.
gcsafe.} =
for item in list:
if item.message.slot == Slot(counter):
inc(counter)
else:
return err(Invalid)
return ok()

var chain = createChain(Slot(5), Slot(18))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64,
syncReceiver, 2)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
let p4 = SomeTPeer()
let p5 = SomeTPeer()
let p6 = SomeTPeer()
let p7 = SomeTPeer()

var r21 = queue.pop(Slot(20), p1)
var r22 = queue.pop(Slot(20), p2)
var r23 = queue.pop(Slot(20), p3)
var r24 = queue.pop(Slot(20), p4)
var r25 = queue.pop(Slot(20), p5)
var r26 = queue.pop(Slot(20), p6)
var r27 = queue.pop(Slot(20), p7)

var f21 = queue.push(r21, @[chain[0], chain[1]])
# This should be silently ignored, because r21 is already processed.
var e21 = queue.push(r21, @[chain[0], chain[1]])
queue.push(r22)
queue.push(r23)
var f26 = queue.push(r26, @[chain[10], chain[11]])
var f27 = queue.push(r27, @[chain[12], chain[13]])

doAssert(f21.finished == true and f21.failed == false)
doAssert(e21.finished == true and e21.failed == false)
doAssert(f26.finished == false)
doAssert(f27.finished == false)
await queue.resetWait(none[Slot]())
doAssert(f26.finished == true and f26.failed == false)
doAssert(f27.finished == true and f27.failed == false)
doAssert(queue.inpSlot == Slot(7) and queue.outSlot == Slot(7))
doAssert(counter == 7)
doAssert(len(queue) == 12)
# This should be silently ignored, because r21 is already processed.
var o21 = queue.push(r21, @[chain[0], chain[1]])
var o22 = queue.push(r22, @[chain[2], chain[3]])
queue.push(r23)
queue.push(r24)
var o25 = queue.push(r25, @[chain[8], chain[9]])
var o26 = queue.push(r26, @[chain[10], chain[11]])
var o27 = queue.push(r27, @[chain[12], chain[13]])
doAssert(o21.finished == true and o21.failed == false)
doAssert(o22.finished == true and o22.failed == false)
doAssert(o25.finished == true and o25.failed == false)
doAssert(o26.finished == true and o26.failed == false)
doAssert(o27.finished == true and o27.failed == false)
doAssert(len(queue) == 12)
result = true

check waitFor(test())

test "[SyncQueue] hasEndGap() test":
let chain1 = createChain(Slot(1), Slot(1))
let chain2 = newSeq[SignedBeaconBlock]()