avoid re-requesting finalized blocks during sync (#3461)
When a `beaconBlocksByRange` response advances the `safeSlot`, but later has errors, the sync queue keeps repeating that same request until it is fulfilled without errors. Data up through `safeSlot` is considered to be immutable, i.e., finalized, so re-requesting that data is not useful. By advancing the sync progress in that scenario, those redundant query portions can be avoided. Note, the finalized block _itself_ is always requested, even in the initial request. This behaviour is kept same.
This commit is contained in:
parent
725692544e
commit
6d1d31dd01
|
@ -402,6 +402,8 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
|
|||
+ Process all unviable blocks OK
|
||||
+ [SyncQueue#Backward] Async unordered push test OK
|
||||
+ [SyncQueue#Backward] Async unordered push with rewind test OK
|
||||
+ [SyncQueue#Backward] Good response with missing values towards end OK
|
||||
+ [SyncQueue#Backward] Handle out-of-band sync progress advancement OK
|
||||
+ [SyncQueue#Backward] Pass through established limits test OK
|
||||
+ [SyncQueue#Backward] Smoke test OK
|
||||
+ [SyncQueue#Backward] Start and finish slots equal OK
|
||||
|
@ -409,6 +411,8 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
|
|||
+ [SyncQueue#Backward] getRewindPoint() test OK
|
||||
+ [SyncQueue#Forward] Async unordered push test OK
|
||||
+ [SyncQueue#Forward] Async unordered push with rewind test OK
|
||||
+ [SyncQueue#Forward] Good response with missing values towards end OK
|
||||
+ [SyncQueue#Forward] Handle out-of-band sync progress advancement OK
|
||||
+ [SyncQueue#Forward] Pass through established limits test OK
|
||||
+ [SyncQueue#Forward] Smoke test OK
|
||||
+ [SyncQueue#Forward] Start and finish slots equal OK
|
||||
|
@ -419,7 +423,7 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
|
|||
+ [SyncQueue] getLastNonEmptySlot() test OK
|
||||
+ [SyncQueue] hasEndGap() test OK
|
||||
```
|
||||
OK: 19/19 Fail: 0/19 Skip: 0/19
|
||||
OK: 23/23 Fail: 0/23 Skip: 0/23
|
||||
## Zero signature sanity checks
|
||||
```diff
|
||||
+ SSZ serialization roundtrip of SignedBeaconBlockHeader OK
|
||||
|
@ -521,4 +525,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
|
|||
OK: 1/1 Fail: 0/1 Skip: 0/1
|
||||
|
||||
---TOTAL---
|
||||
OK: 285/291 Fail: 0/291 Skip: 6/291
|
||||
OK: 289/295 Fail: 0/295 Skip: 6/295
|
||||
|
|
|
@ -511,9 +511,38 @@ proc advanceInput[T](sq: SyncQueue[T], number: uint64) =
|
|||
proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool =
|
||||
case sq.kind
|
||||
of SyncQueueKind.Forward:
|
||||
(sq.queueSize > 0) and (sr.slot != sq.outSlot)
|
||||
(sq.queueSize > 0) and (sr.slot > sq.outSlot)
|
||||
of SyncQueueKind.Backward:
|
||||
(sq.queueSize > 0) and (sr.slot + sr.count - 1'u64 != sq.outSlot)
|
||||
(sq.queueSize > 0) and (sr.lastSlot < sq.outSlot)
|
||||
|
||||
func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
|
||||
## Compute the number of slots covered by a given `SyncRequest` that are
|
||||
## already known and, hence, no longer relevant for sync progression.
|
||||
let
|
||||
outSlot = sq.outSlot
|
||||
lowSlot = sr.slot
|
||||
highSlot = sr.lastSlot
|
||||
case sq.kind
|
||||
of SyncQueueKind.Forward:
|
||||
if outSlot > highSlot:
|
||||
# Entire request is no longer relevant.
|
||||
sr.count
|
||||
elif outSlot > lowSlot:
|
||||
# Request is only partially relevant.
|
||||
outSlot - lowSlot
|
||||
else:
|
||||
# Entire request is still relevant.
|
||||
0
|
||||
of SyncQueueKind.Backward:
|
||||
if lowSlot > outSlot:
|
||||
# Entire request is no longer relevant.
|
||||
sr.count
|
||||
elif highSlot > outSlot:
|
||||
# Request is only partially relevant.
|
||||
highSlot - outSlot
|
||||
else:
|
||||
# Entire request is still relevant.
|
||||
0
|
||||
|
||||
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||
data: seq[ref ForkedSignedBeaconBlock],
|
||||
|
@ -552,14 +581,14 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
|||
case sq.kind
|
||||
of SyncQueueKind.Forward:
|
||||
let minSlot = sq.readyQueue[0].request.slot
|
||||
if sq.outSlot != minSlot:
|
||||
if sq.outSlot < minSlot:
|
||||
none[SyncResult[T]]()
|
||||
else:
|
||||
some(sq.readyQueue.pop())
|
||||
of SyncQueueKind.Backward:
|
||||
let maxSlot = sq.readyQueue[0].request.slot +
|
||||
(sq.readyQueue[0].request.count - 1'u64)
|
||||
if sq.outSlot != maxSlot:
|
||||
if sq.outSlot > maxSlot:
|
||||
none[SyncResult[T]]()
|
||||
else:
|
||||
some(sq.readyQueue.pop())
|
||||
|
@ -625,14 +654,16 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
|||
let retryRequest =
|
||||
hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome()
|
||||
if not retryRequest:
|
||||
sq.advanceOutput(item.request.count)
|
||||
let numSlotsAdvanced = item.request.count - sq.numAlreadyKnownSlots(sr)
|
||||
sq.advanceOutput(numSlotsAdvanced)
|
||||
|
||||
if hasOkBlock:
|
||||
# If there no error and response was not empty we should reward peer
|
||||
# with some bonus score - not for duplicate blocks though.
|
||||
item.request.item.updateScore(PeerScoreGoodBlocks)
|
||||
|
||||
sq.wakeupWaiters()
|
||||
if numSlotsAdvanced > 0:
|
||||
sq.wakeupWaiters()
|
||||
else:
|
||||
debug "Block pool rejected peer's response", request = item.request,
|
||||
blocks_map = getShortMap(item.request, item.data),
|
||||
|
@ -721,9 +752,74 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
|
|||
sq.pending.del(sr.index)
|
||||
sq.toDebtsQueue(sr)
|
||||
|
||||
proc handlePotentialSafeSlotAdvancement[T](sq: SyncQueue[T]) =
|
||||
# It may happen that sync progress advanced to a newer `safeSlot`, either
|
||||
# by a response that started with good values and only had errors late, or
|
||||
# through an out-of-band mechanism, e.g., VC / REST.
|
||||
# If that happens, advance to the new `safeSlot` to avoid repeating requests
|
||||
# for data that is considered immutable and no longer relevant.
|
||||
let
|
||||
safeSlot = sq.getSafeSlot()
|
||||
numSlotsAdvanced: uint64 =
|
||||
case sq.kind
|
||||
of SyncQueueKind.Forward:
|
||||
if safeSlot > sq.outSlot:
|
||||
safeSlot - sq.outSlot
|
||||
else:
|
||||
0
|
||||
of SyncQueueKind.Backward:
|
||||
if sq.outSlot > safeSlot:
|
||||
sq.outSlot - safeSlot
|
||||
else:
|
||||
0
|
||||
if numSlotsAdvanced != 0:
|
||||
debug "Sync progress advanced out-of-band",
|
||||
slot_before = sq.outSlot, slot_after = safeSlot
|
||||
sq.advanceOutput(numSlotsAdvanced)
|
||||
sq.wakeupWaiters()
|
||||
|
||||
func updateRequestForNewSafeSlot[T](sq: SyncQueue[T], sr: var SyncRequest[T]) =
|
||||
# Requests may have originated before the latest `safeSlot` advancement.
|
||||
# Update it to not request any data prior to `safeSlot`.
|
||||
let
|
||||
outSlot = sq.outSlot
|
||||
lowSlot = sr.slot
|
||||
highSlot = sr.lastSlot
|
||||
case sq.kind
|
||||
of SyncQueueKind.Forward:
|
||||
if outSlot <= lowSlot:
|
||||
# Entire request is still relevant.
|
||||
discard
|
||||
elif outSlot <= highSlot:
|
||||
# Request is only partially relevant.
|
||||
let
|
||||
numSlotsDone = outSlot - lowSlot
|
||||
numStepsDone = (numSlotsDone + sr.step - 1) div sr.step
|
||||
sr.slot += numStepsDone * sr.step
|
||||
sr.count -= numStepsDone
|
||||
else:
|
||||
# Entire request is no longer relevant.
|
||||
sr.step = 0
|
||||
sr.count = 0
|
||||
of SyncQueueKind.Backward:
|
||||
if outSlot >= highSlot:
|
||||
# Entire request is still relevant.
|
||||
discard
|
||||
elif outSlot >= lowSlot:
|
||||
# Request is only partially relevant.
|
||||
let
|
||||
numSlotsDone = highSlot - outSlot
|
||||
numStepsDone = (numSlotsDone + sr.step - 1) div sr.step
|
||||
sr.count -= numStepsDone
|
||||
else:
|
||||
# Entire request is no longer relevant.
|
||||
sr.step = 0
|
||||
sr.count = 0
|
||||
|
||||
proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
|
||||
## Create new request according to current SyncQueue parameters.
|
||||
if len(sq.debtsQueue) > 0:
|
||||
sq.handlePotentialSafeSlotAdvancement()
|
||||
while len(sq.debtsQueue) > 0:
|
||||
if maxSlot < sq.debtsQueue[0].slot:
|
||||
# Peer's latest slot is less than starting request's slot.
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
|
@ -732,44 +828,47 @@ proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
|
|||
return SyncRequest.empty(sq.kind, T)
|
||||
var sr = sq.debtsQueue.pop()
|
||||
sq.debtsCount = sq.debtsCount - sr.count
|
||||
sq.updateRequestForNewSafeSlot(sr)
|
||||
if sr.isEmpty:
|
||||
continue
|
||||
sr.setItem(item)
|
||||
sq.makePending(sr)
|
||||
return sr
|
||||
|
||||
case sq.kind
|
||||
of SyncQueueKind.Forward:
|
||||
if maxSlot < sq.inpSlot:
|
||||
# Peer's latest slot is less than queue's input slot.
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
if sq.inpSlot > sq.finalSlot:
|
||||
# Queue's input slot is bigger than queue's final slot.
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
let lastSlot = min(maxslot, sq.finalSlot)
|
||||
let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
|
||||
var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item)
|
||||
sq.advanceInput(count)
|
||||
sq.makePending(sr)
|
||||
sr
|
||||
of SyncQueueKind.Backward:
|
||||
if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
if sq.inpSlot < sq.finalSlot:
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
let (slot, count) =
|
||||
block:
|
||||
let baseSlot = sq.inpSlot + 1'u64
|
||||
if baseSlot - sq.finalSlot < sq.chunkSize:
|
||||
let count = uint64(baseSlot - sq.finalSlot)
|
||||
(baseSlot - count, count)
|
||||
else:
|
||||
(baseSlot - sq.chunkSize, sq.chunkSize)
|
||||
if (maxSlot + 1'u64) < slot + count:
|
||||
# Peer's latest slot is less than queue's input slot.
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
var sr = SyncRequest.init(sq.kind, slot, count, item)
|
||||
sq.advanceInput(count)
|
||||
sq.makePending(sr)
|
||||
sr
|
||||
else:
|
||||
case sq.kind
|
||||
of SyncQueueKind.Forward:
|
||||
if maxSlot < sq.inpSlot:
|
||||
# Peer's latest slot is less than queue's input slot.
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
if sq.inpSlot > sq.finalSlot:
|
||||
# Queue's input slot is bigger than queue's final slot.
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
let lastSlot = min(maxslot, sq.finalSlot)
|
||||
let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
|
||||
var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item)
|
||||
sq.advanceInput(count)
|
||||
sq.makePending(sr)
|
||||
sr
|
||||
of SyncQueueKind.Backward:
|
||||
if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
if sq.inpSlot < sq.finalSlot:
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
let (slot, count) =
|
||||
block:
|
||||
let baseSlot = sq.inpSlot + 1'u64
|
||||
if baseSlot - sq.finalSlot < sq.chunkSize:
|
||||
let count = uint64(baseSlot - sq.finalSlot)
|
||||
(baseSlot - count, count)
|
||||
else:
|
||||
(baseSlot - sq.chunkSize, sq.chunkSize)
|
||||
if (maxSlot + 1'u64) < slot + count:
|
||||
# Peer's latest slot is less than queue's input slot.
|
||||
return SyncRequest.empty(sq.kind, T)
|
||||
var sr = SyncRequest.init(sq.kind, slot, count, item)
|
||||
sq.advanceInput(count)
|
||||
sq.makePending(sr)
|
||||
sr
|
||||
|
||||
proc debtLen*[T](sq: SyncQueue[T]): uint64 =
|
||||
sq.debtsCount
|
||||
|
|
|
@ -1,3 +1,10 @@
|
|||
# beacon_chain
|
||||
# Copyright (c) 2020-2022 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
{.used.}
|
||||
|
||||
import std/strutils
|
||||
|
@ -20,11 +27,10 @@ template shortLog(peer: SomeTPeer): string =
|
|||
proc updateScore(peer: SomeTPeer, score: int) =
|
||||
discard
|
||||
|
||||
proc getFirstSlotAtFinalizedEpoch(): Slot =
|
||||
Slot(0)
|
||||
|
||||
proc getSafeSlot(): Slot =
|
||||
Slot(1024)
|
||||
func getStaticSlotCb(slot: Slot): GetSlotCallback =
|
||||
proc getSlot(): Slot =
|
||||
slot
|
||||
getSlot
|
||||
|
||||
type
|
||||
BlockEntry = object
|
||||
|
@ -71,7 +77,7 @@ suite "SyncManager test suite":
|
|||
|
||||
var queue = SyncQueue.init(SomeTPeer, kind,
|
||||
Slot(0), Slot(0), 1'u64,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq))
|
||||
getStaticSlotCb(Slot(0)), collector(aq))
|
||||
check:
|
||||
len(queue) == 1
|
||||
pendingLen(queue) == 0
|
||||
|
@ -166,7 +172,7 @@ suite "SyncManager test suite":
|
|||
let aq = newAsyncQueue[BlockEntry]()
|
||||
var queue = SyncQueue.init(SomeTPeer, kind,
|
||||
item[0], item[1], item[2],
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq))
|
||||
getStaticSlotCb(item[0]), collector(aq))
|
||||
check:
|
||||
len(queue) == item[4]
|
||||
pendingLen(queue) == item[5]
|
||||
|
@ -191,11 +197,11 @@ suite "SyncManager test suite":
|
|||
of SyncQueueKind.Forward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
Slot(0), Slot(1), 1'u64,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq))
|
||||
getStaticSlotCb(Slot(0)), collector(aq))
|
||||
of SyncQueueKind.Backward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
|
||||
Slot(1), Slot(0), 1'u64,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq))
|
||||
getStaticSlotCb(Slot(1)), collector(aq))
|
||||
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
|
@ -289,11 +295,11 @@ suite "SyncManager test suite":
|
|||
of SyncQueueKind.Forward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
start, finish, chunkSize,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq))
|
||||
getStaticSlotCb(start), collector(aq))
|
||||
of SyncQueueKind.Backward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
|
||||
finish, start, chunkSize,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq))
|
||||
getStaticSlotCb(finish), collector(aq))
|
||||
chain = createChain(start, finish)
|
||||
validatorFut =
|
||||
case kkind
|
||||
|
@ -359,12 +365,12 @@ suite "SyncManager test suite":
|
|||
of SyncQueueKind.Forward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
startSlot, finishSlot, chunkSize,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq),
|
||||
getStaticSlotCb(startSlot), collector(aq),
|
||||
queueSize)
|
||||
of SyncQueueKind.Backward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
|
||||
finishSlot, startSlot, chunkSize,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq),
|
||||
getStaticSlotCb(finishSlot), collector(aq),
|
||||
queueSize)
|
||||
validatorFut =
|
||||
case kkind
|
||||
|
@ -417,6 +423,210 @@ suite "SyncManager test suite":
|
|||
|
||||
check waitFor(runTest()) == true
|
||||
|
||||
template partialGoodResponseTest(kkind: SyncQueueKind, start, finish: Slot,
|
||||
chunkSize: uint64) =
|
||||
let aq = newAsyncQueue[BlockEntry]()
|
||||
|
||||
var counter =
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
int(start)
|
||||
of SyncQueueKind.Backward:
|
||||
int(finish)
|
||||
|
||||
proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
|
||||
while true:
|
||||
let sblock = await aq.popFirst()
|
||||
if sblock.blck.slot == Slot(counter):
|
||||
dec(counter)
|
||||
sblock.done()
|
||||
elif sblock.blck.slot < Slot(counter):
|
||||
# There was a gap, report missing parent
|
||||
sblock.fail(BlockError.MissingParent)
|
||||
else:
|
||||
sblock.fail(BlockError.Duplicate)
|
||||
|
||||
proc getBackwardSafeSlotCb(): Slot =
|
||||
min((Slot(counter).epoch + 1).start_slot, finish)
|
||||
|
||||
proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
|
||||
while true:
|
||||
let sblock = await aq.popFirst()
|
||||
if sblock.blck.slot == Slot(counter):
|
||||
inc(counter)
|
||||
sblock.done()
|
||||
elif sblock.blck.slot > Slot(counter):
|
||||
# There was a gap, report missing parent
|
||||
sblock.fail(BlockError.MissingParent)
|
||||
else:
|
||||
sblock.fail(BlockError.Duplicate)
|
||||
|
||||
proc getFowardSafeSlotCb(): Slot =
|
||||
max(Slot(max(counter, 1) - 1).epoch.start_slot, start)
|
||||
|
||||
var
|
||||
queue =
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
start, finish, chunkSize,
|
||||
getFowardSafeSlotCb, collector(aq))
|
||||
of SyncQueueKind.Backward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
|
||||
finish, start, chunkSize,
|
||||
getBackwardSafeSlotCb, collector(aq))
|
||||
chain = createChain(start, finish)
|
||||
validatorFut =
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
forwardValidator(aq)
|
||||
of SyncQueueKind.Backward:
|
||||
backwardValidator(aq)
|
||||
|
||||
let p1 = SomeTPeer()
|
||||
|
||||
proc runTest() {.async.} =
|
||||
while true:
|
||||
var request = queue.pop(finish, p1)
|
||||
if request.isEmpty():
|
||||
break
|
||||
var response = getSlice(chain, start, request)
|
||||
if response.len >= (SLOTS_PER_EPOCH + 3).int:
|
||||
# Create gap close to end of response, to simulate behaviour where
|
||||
# the remote peer is sending valid data but does not have it fully
|
||||
# available (e.g., still doing backfill after checkpoint sync)
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
response.delete(response.len - 2)
|
||||
of SyncQueueKind.Backward:
|
||||
response.delete(1)
|
||||
if response.len >= 1:
|
||||
# Ensure requested values are past `safeSlot`
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
check response[0][].slot >= getFowardSafeSlotCb()
|
||||
else:
|
||||
check response[^1][].slot <= getBackwardSafeSlotCb()
|
||||
await queue.push(request, response)
|
||||
await validatorFut.cancelAndWait()
|
||||
|
||||
waitFor runTest()
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
check (counter - 1) == int(finish)
|
||||
of SyncQueueKind.Backward:
|
||||
check (counter + 1) == int(start)
|
||||
|
||||
template outOfBandAdvancementTest(kkind: SyncQueueKind, start, finish: Slot,
|
||||
chunkSize: uint64) =
|
||||
let aq = newAsyncQueue[BlockEntry]()
|
||||
|
||||
var counter =
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
int(start)
|
||||
of SyncQueueKind.Backward:
|
||||
int(finish)
|
||||
|
||||
proc failingValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
|
||||
while true:
|
||||
let sblock = await aq.popFirst()
|
||||
sblock.fail(BlockError.Invalid)
|
||||
|
||||
proc getBackwardSafeSlotCb(): Slot =
|
||||
let progress = (uint64(int(finish) - counter) div chunkSize) * chunkSize
|
||||
finish - progress
|
||||
|
||||
proc getFowardSafeSlotCb(): Slot =
|
||||
let progress = (uint64(counter - int(start)) div chunkSize) * chunkSize
|
||||
start + progress
|
||||
|
||||
template advanceSafeSlot() =
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
counter += int(chunkSize)
|
||||
if counter > int(finish) + 1:
|
||||
counter = int(finish) + 1
|
||||
break
|
||||
of SyncQueueKind.Backward:
|
||||
counter -= int(chunkSize)
|
||||
if counter < int(start) - 1:
|
||||
counter = int(start) - 1
|
||||
break
|
||||
|
||||
var
|
||||
queue =
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
start, finish, chunkSize,
|
||||
getFowardSafeSlotCb, collector(aq))
|
||||
of SyncQueueKind.Backward:
|
||||
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
|
||||
finish, start, chunkSize,
|
||||
getBackwardSafeSlotCb, collector(aq))
|
||||
chain = createChain(start, finish)
|
||||
validatorFut = failingValidator(aq)
|
||||
|
||||
let
|
||||
p1 = SomeTPeer()
|
||||
p2 = SomeTPeer()
|
||||
|
||||
proc runTest() {.async.} =
|
||||
while true:
|
||||
var
|
||||
request1 = queue.pop(finish, p1)
|
||||
request2 = queue.pop(finish, p2)
|
||||
if request1.isEmpty():
|
||||
break
|
||||
|
||||
# Simulate failing request 2.
|
||||
queue.push(request2)
|
||||
check debtLen(queue) == request2.count
|
||||
|
||||
# Advance `safeSlot` out of band.
|
||||
advanceSafeSlot()
|
||||
|
||||
# Handle request 1. Should be re-enqueued as it simulates `Invalid`.
|
||||
let response1 = getSlice(chain, start, request1)
|
||||
await queue.push(request1, response1)
|
||||
check debtLen(queue) == request2.count + request1.count
|
||||
|
||||
# Request 1 should be discarded as it is no longer relevant.
|
||||
# Request 2 should be re-issued.
|
||||
var request3 = queue.pop(finish, p1)
|
||||
check:
|
||||
request3 == request2
|
||||
debtLen(queue) == 0
|
||||
|
||||
# Handle request 3. Should be re-enqueued as it simulates `Invalid`.
|
||||
let response3 = getSlice(chain, start, request3)
|
||||
await queue.push(request3, response3)
|
||||
check debtLen(queue) == request3.count
|
||||
|
||||
# Request 2 should be re-issued.
|
||||
var request4 = queue.pop(finish, p1)
|
||||
check:
|
||||
request4 == request2
|
||||
debtLen(queue) == 0
|
||||
|
||||
# Advance `safeSlot` out of band.
|
||||
advanceSafeSlot()
|
||||
|
||||
# Handle request 4. Should be re-enqueued as it simulates `Invalid`.
|
||||
let response4 = getSlice(chain, start, request4)
|
||||
await queue.push(request4, response4)
|
||||
check debtLen(queue) == request4.count
|
||||
|
||||
await validatorFut.cancelAndWait()
|
||||
|
||||
waitFor runTest()
|
||||
case kkind
|
||||
of SyncQueueKind.Forward:
|
||||
check (counter - 1) == int(finish)
|
||||
of SyncQueueKind.Backward:
|
||||
check (counter + 1) == int(start)
|
||||
|
||||
for k in {SyncQueueKind.Forward, SyncQueueKind.Backward}:
|
||||
let prefix = "[SyncQueue#" & $k & "] "
|
||||
|
||||
|
@ -445,6 +655,20 @@ suite "SyncManager test suite":
|
|||
for item in UnorderedTests:
|
||||
unorderedAsyncTest(k, item)
|
||||
|
||||
test prefix & "Good response with missing values towards end":
|
||||
const PartialGoodResponseTests = [
|
||||
(Slot(0), Slot(200), (SLOTS_PER_EPOCH + 3).uint64)
|
||||
]
|
||||
for item in PartialGoodResponseTests:
|
||||
partialGoodResponseTest(k, item[0], item[1], item[2])
|
||||
|
||||
test prefix & "Handle out-of-band sync progress advancement":
|
||||
const OutOfBandAdvancementTests = [
|
||||
(Slot(0), Slot(200), SLOTS_PER_EPOCH.uint64)
|
||||
]
|
||||
for item in OutOfBandAdvancementTests:
|
||||
outOfBandAdvancementTest(k, item[0], item[1], item[2])
|
||||
|
||||
test "[SyncQueue#Forward] Async unordered push with rewind test":
|
||||
let
|
||||
aq = newAsyncQueue[BlockEntry]()
|
||||
|
@ -473,7 +697,7 @@ suite "SyncManager test suite":
|
|||
chain = createChain(startSlot, finishSlot)
|
||||
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
startSlot, finishSlot, chunkSize,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq),
|
||||
getStaticSlotCb(startSlot), collector(aq),
|
||||
queueSize)
|
||||
validatorFut = forwardValidator(aq)
|
||||
|
||||
|
@ -581,7 +805,7 @@ suite "SyncManager test suite":
|
|||
chain = createChain(startSlot, finishSlot)
|
||||
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
startSlot, finishSlot, chunkSize,
|
||||
getFirstSlotAtFinalizedEpoch, collector(aq),
|
||||
getStaticSlotCb(startSlot), collector(aq),
|
||||
queueSize)
|
||||
validatorFut = forwardValidator(aq)
|
||||
|
||||
|
@ -615,7 +839,7 @@ suite "SyncManager test suite":
|
|||
queueSize = 1
|
||||
|
||||
var
|
||||
lastSafeSlot: Slot
|
||||
lastSafeSlot = finishSlot
|
||||
counter = int(finishSlot)
|
||||
|
||||
proc getSafeSlot(): Slot =
|
||||
|
@ -843,7 +1067,7 @@ suite "SyncManager test suite":
|
|||
block:
|
||||
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
|
||||
1'u64, getFirstSlotAtFinalizedEpoch,
|
||||
1'u64, getStaticSlotCb(Slot(0)),
|
||||
collector(aq), 2)
|
||||
let finalizedSlot = start_slot(Epoch(0'u64))
|
||||
let startSlot = start_slot(Epoch(0'u64)) + 1'u64
|
||||
|
@ -855,7 +1079,7 @@ suite "SyncManager test suite":
|
|||
block:
|
||||
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
|
||||
1'u64, getFirstSlotAtFinalizedEpoch,
|
||||
1'u64, getStaticSlotCb(Slot(0)),
|
||||
collector(aq), 2)
|
||||
let finalizedSlot = start_slot(Epoch(1'u64))
|
||||
let startSlot = start_slot(Epoch(1'u64)) + 1'u64
|
||||
|
@ -867,7 +1091,7 @@ suite "SyncManager test suite":
|
|||
block:
|
||||
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
|
||||
1'u64, getFirstSlotAtFinalizedEpoch,
|
||||
1'u64, getStaticSlotCb(Slot(0)),
|
||||
collector(aq), 2)
|
||||
let finalizedSlot = start_slot(Epoch(0'u64))
|
||||
let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64)
|
||||
|
@ -885,7 +1109,7 @@ suite "SyncManager test suite":
|
|||
block:
|
||||
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
|
||||
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
|
||||
1'u64, getFirstSlotAtFinalizedEpoch,
|
||||
1'u64, getStaticSlotCb(Slot(0)),
|
||||
collector(aq), 2)
|
||||
let finalizedSlot = start_slot(Epoch(1'u64))
|
||||
let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64)
|
||||
|
@ -902,6 +1126,7 @@ suite "SyncManager test suite":
|
|||
test "[SyncQueue#Backward] getRewindPoint() test":
|
||||
let aq = newAsyncQueue[BlockEntry]()
|
||||
block:
|
||||
let getSafeSlot = getStaticSlotCb(Slot(1024))
|
||||
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
|
||||
Slot(1024), Slot(0),
|
||||
1'u64, getSafeSlot, collector(aq), 2)
|
||||
|
|
Loading…
Reference in New Issue