diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index 592734982..ad3860d80 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -10,17 +10,23 @@ import std/[strutils, sequtils] import unittest2 -import chronos +import chronos, chronos/unittest2/asynctests +import ../beacon_chain/networking/peer_scores import ../beacon_chain/gossip_processing/block_processor, ../beacon_chain/sync/sync_manager, + ../beacon_chain/sync/sync_queue, ../beacon_chain/spec/forks type SomeTPeer = ref object + id: string score: int +func init(t: typedesc[SomeTPeer], id: string, score = 1000): SomeTPeer = + SomeTPeer(id: id, score: score) + func `$`(peer: SomeTPeer): string = - "SomeTPeer" + "peer#" & peer.id template shortLog(peer: SomeTPeer): string = $peer @@ -44,1025 +50,1272 @@ type blck*: ForkedSignedBeaconBlock resfut*: Future[Result[void, VerifierError]] -func collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = - # This sets up a fake block verifiation collector that simply puts the blocks - # in the async queue, similar to how BlockProcessor does it - as far as - # testing goes, this is risky because it might introduce differences between - # the BlockProcessor and this test - proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], - maybeFinalized: bool): - Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = - let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init() - try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut)) - except CatchableError as exc: raiseAssert exc.msg - return fut +func createChain(slots: Slice[Slot]): seq[ref ForkedSignedBeaconBlock] = + var res = newSeqOfCap[ref ForkedSignedBeaconBlock](len(slots)) + for slot in slots: + let item = newClone ForkedSignedBeaconBlock(kind: ConsensusFork.Deneb) + item[].denebData.message.slot = slot + res.add(item) + res - return verify +proc createChain(srange: SyncRange): seq[ref ForkedSignedBeaconBlock] = + createChain(srange.slot .. (srange.slot + srange.count - 1)) -suite "SyncManager test suite": - proc createChain(start, finish: Slot): seq[ref ForkedSignedBeaconBlock] = - doAssert(start <= finish) - let count = int(finish - start + 1'u64) - var res = newSeq[ref ForkedSignedBeaconBlock](count) - var curslot = start - for item in res.mitems(): - item = newClone ForkedSignedBeaconBlock(kind: ConsensusFork.Deneb) - item[].denebData.message.slot = curslot - curslot = curslot + 1'u64 - res - - func createBlobs( - blocks: var seq[ref ForkedSignedBeaconBlock], slots: seq[Slot] - ): seq[ref BlobSidecar] = - var res = newSeq[ref BlobSidecar](len(slots)) - for blck in blocks: - withBlck(blck[]): - when consensusFork >= ConsensusFork.Deneb: - template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments +func createBlobs( + blocks: var seq[ref ForkedSignedBeaconBlock], + slots: openArray[Slot] +): seq[ref BlobSidecar] = + var res = newSeq[ref BlobSidecar](len(slots)) + for blck in blocks: + withBlck(blck[]): + when consensusFork >= ConsensusFork.Deneb: + template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments + for i, slot in slots: + if slot == forkyBlck.message.slot: + doAssert kzgs.add default(KzgCommitment) + if kzgs.len > 0: + forkyBlck.root = hash_tree_root(forkyBlck.message) + var + kzg_proofs: KzgProofs + blobs: Blobs + for _ in kzgs: + doAssert kzg_proofs.add default(KzgProof) + doAssert blobs.add default(Blob) + let sidecars = forkyBlck.create_blob_sidecars(kzg_proofs, blobs) + var sidecarIdx = 0 for i, slot in slots: if slot == forkyBlck.message.slot: - doAssert kzgs.add default(KzgCommitment) - if kzgs.len > 0: - forkyBlck.root = hash_tree_root(forkyBlck.message) - var - kzg_proofs: KzgProofs - blobs: Blobs - for _ in kzgs: - doAssert kzg_proofs.add default(KzgProof) - doAssert blobs.add default(Blob) - let sidecars = forkyBlck.create_blob_sidecars(kzg_proofs, blobs) - var sidecarIdx = 0 - for i, slot in slots: - if slot == forkyBlck.message.slot: - res[i] = newClone sidecars[sidecarIdx] - inc sidecarIdx - res + res[i] = newClone sidecars[sidecarIdx] + inc sidecarIdx + res - proc getSlice(chain: openArray[ref ForkedSignedBeaconBlock], startSlot: Slot, - request: SyncRequest[SomeTPeer]): seq[ref ForkedSignedBeaconBlock] = - let - startIndex = int(request.slot - startSlot) - finishIndex = int(request.slot - startSlot) + int(request.count) - 1 - var res = newSeq[ref ForkedSignedBeaconBlock](1 + finishIndex - startIndex) - for i in 0.. 0, "Empty scenarios are not allowed") - var queue = SyncQueue.init(SomeTPeer, kind, - Slot(0), Slot(0), 1'u64, - getStaticSlotCb(Slot(0)), - collector(aq)) - check: - len(queue) == 1 - pendingLen(queue) == 0 - debtLen(queue) == 0 - var r11 = queue.pop(Slot(0), p1) - check: - len(queue) == 1 - pendingLen(queue) == 1 - debtLen(queue) == 0 - queue.push(r11) - check: - pendingLen(queue) == 1 - len(queue) == 1 - debtLen(queue) == 1 - var r11e = queue.pop(Slot(0), p1) - check: - len(queue) == 1 - pendingLen(queue) == 1 - debtLen(queue) == 0 - r11e == r11 - r11.item == p1 - r11e.item == r11.item - r11.slot == Slot(0) and r11.count == 1'u64 - - template passThroughLimitsTest(kind: SyncQueueKind) = - let - p1 = SomeTPeer() - p2 = SomeTPeer() - - let Checks = - case kind - of SyncQueueKind.Forward: - @[ - # Tests with zero start. - (Slot(0), Slot(0), 1'u64, (Slot(0), 1'u64), - 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), - (Slot(0), Slot(0), 16'u64, (Slot(0), 1'u64), - 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), - (Slot(0), Slot(1), 2'u64, (Slot(0), 2'u64), - 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), - (Slot(0), Slot(1), 16'u64, (Slot(0), 2'u64), - 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), - (Slot(0), Slot(15), 16'u64, (Slot(0), 16'u64), - 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), - (Slot(0), Slot(15), 32'u64, (Slot(0), 16'u64), - 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), - # Tests with non-zero start. - (Slot(1021), Slot(1021), 1'u64, (Slot(1021), 1'u64), - 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), - (Slot(1021), Slot(1021), 16'u64, (Slot(1021), 1'u64), - 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), - (Slot(1021), Slot(1022), 2'u64, (Slot(1021), 2'u64), - 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), - (Slot(1021), Slot(1022), 16'u64, (Slot(1021), 2'u64), - 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), - (Slot(1021), Slot(1036), 16'u64, (Slot(1021), 16'u64), - 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), - (Slot(1021), Slot(1036), 32'u64, (Slot(1021), 16'u64), - 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), - ] - of SyncQueueKind.Backward: - @[ - # Tests with zero finish. - (Slot(0), Slot(0), 1'u64, (Slot(0), 1'u64), - 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), - (Slot(0), Slot(0), 16'u64, (Slot(0), 1'u64), - 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), - (Slot(1), Slot(0), 2'u64, (Slot(0), 2'u64), - 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), - (Slot(1), Slot(0), 16'u64, (Slot(0), 2'u64), - 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), - (Slot(15), Slot(0), 16'u64, (Slot(0), 16'u64), - 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), - (Slot(15), Slot(0), 32'u64, (Slot(0), 16'u64), - 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), - # Tests with non-zero finish. - (Slot(1021), Slot(1021), 1'u64, (Slot(1021), 1'u64), - 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), - (Slot(1021), Slot(1021), 16'u64, (Slot(1021), 1'u64), - 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), - (Slot(1022), Slot(1021), 2'u64, (Slot(1021), 2'u64), - 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), - (Slot(1022), Slot(1021), 16'u64, (Slot(1021), 2'u64), - 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), - (Slot(1036), Slot(1021), 16'u64, (Slot(1021), 16'u64), - 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), - (Slot(1036), Slot(1021), 32'u64, (Slot(1021), 16'u64), - 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), - ] - - for item in Checks: - let aq = newAsyncQueue[BlockEntry]() - var queue = SyncQueue.init(SomeTPeer, kind, - item[0], item[1], item[2], - getStaticSlotCb(item[0]), - collector(aq)) - check: - len(queue) == item[4] - pendingLen(queue) == item[5] - debtLen(queue) == item[6] - var req1 = queue.pop(max(item[0], item[1]), p1) - check: - len(queue) == item[7] - pendingLen(queue) == item[8] - debtLen(queue) == item[9] - var req2 = queue.pop(max(item[0], item[1]), p2) - check: - req1.isEmpty() == false - req1.slot == item[3][0] - req1.count == item[3][1] - req2.isEmpty() == true - - template twoFullRequests(kkind: SyncQueueKind) = - let aq = newAsyncQueue[BlockEntry]() - var queue = - case kkind - of SyncQueueKind.Forward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - Slot(0), Slot(1), 1'u64, - getStaticSlotCb(Slot(0)), collector(aq)) - of SyncQueueKind.Backward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, - Slot(1), Slot(0), 1'u64, - getStaticSlotCb(Slot(1)), collector(aq)) - - let p1 = SomeTPeer() - let p2 = SomeTPeer() - check: - len(queue) == 2 - pendingLen(queue) == 0 - debtLen(queue) == 0 - var r21 = queue.pop(Slot(1), p1) - check: - len(queue) == 2 - pendingLen(queue) == 1 - debtLen(queue) == 0 - var r22 = queue.pop(Slot(1), p2) - check: - len(queue) == 2 - pendingLen(queue) == 2 - debtLen(queue) == 0 - queue.push(r22) - check: - len(queue) == 2 - pendingLen(queue) == 2 - debtLen(queue) == 1 - queue.push(r21) - check: - len(queue) == 2 - pendingLen(queue) == 2 - debtLen(queue) == 2 - var r21e = queue.pop(Slot(1), p1) - check: - len(queue) == 2 - pendingLen(queue) == 2 - debtLen(queue) == 1 - var r22e = queue.pop(Slot(1), p2) - check: - len(queue) == 2 - pendingLen(queue) == 2 - debtLen(queue) == 0 - r21 == r21e - r22 == r22e - r21.item == p1 - r22.item == p2 - r21.item == r21e.item - r22.item == r22e.item - case kkind - of SyncQueueKind.Forward: - check: - r21.slot == Slot(0) and r21.count == 1'u64 - r22.slot == Slot(1) and r22.count == 1'u64 - of SyncQueueKind.Backward: - check: - r21.slot == Slot(1) and r21.count == 1'u64 - r22.slot == Slot(0) and r22.count == 1'u64 + var + scenario = @sc + aq = newAsyncQueue[BlockEntry]() template done(b: BlockEntry) = b.resfut.complete(Result[void, VerifierError].ok()) template fail(b: BlockEntry, e: untyped) = b.resfut.complete(Result[void, VerifierError].err(e)) + template verifyBlock(i, e, s, v: untyped): untyped = + let item = await queue.popFirst() + if item.blck.slot == s: + if e.code.isSome(): + item.fail(e.code.get()) + else: + item.done() + else: + raiseAssert "Verifier got block from incorrect slot, " & + "expected " & $s & ", got " & + $item.blck.slot & ", position [" & + $i & ", " & $s & "]" + inc(v) - template smokeTest(kkind: SyncQueueKind, start, finish: Slot, - chunkSize: uint64) = - let aq = newAsyncQueue[BlockEntry]() - - var counter = - case kkind - of SyncQueueKind.Forward: - int(start) - of SyncQueueKind.Backward: - int(finish) - - proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - sblock.done() - else: - sblock.fail(VerifierError.Invalid) - dec(counter) - - proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - inc(counter) - sblock.done() - else: - sblock.fail(VerifierError.Invalid) - - var - queue = - case kkind + proc verifier(queue: AsyncQueue[BlockEntry]) {.async: (raises: []).} = + var slotsVerified = 0 + try: + for index, entry in scenario.pairs(): + case skind of SyncQueueKind.Forward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - start, finish, chunkSize, - getStaticSlotCb(start), collector(aq)) + for slot in countup(entry.slots.a, entry.slots.b): + verifyBlock(index, entry, slot, slotsVerified) of SyncQueueKind.Backward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, - finish, start, chunkSize, - getStaticSlotCb(finish), collector(aq)) - chain = createChain(start, finish) - validatorFut = - case kkind - of SyncQueueKind.Forward: - forwardValidator(aq) - of SyncQueueKind.Backward: - backwardValidator(aq) + for slot in countdown(entry.slots.b, entry.slots.a): + verifyBlock(index, entry, slot, slotsVerified) + except CancelledError: + raiseAssert "Scenario is not completed, " & + "number of slots passed " & $slotsVerified - let p1 = SomeTPeer() + (collector(aq), verifier(aq)) - proc runSmokeTest() {.async.} = - while true: - var request = queue.pop(finish, p1) - if request.isEmpty(): - break - await queue.push(request, getSlice(chain, start, request), - Opt.none(seq[BlobSidecars])) - await validatorFut.cancelAndWait() - - waitFor runSmokeTest() - case kkind - of SyncQueueKind.Forward: - check (counter - 1) == int(finish) - of SyncQueueKind.Backward: - check (counter + 1) == int(start) - - template unorderedAsyncTest(kkind: SyncQueueKind, startSlot: Slot) = - let - aq = newAsyncQueue[BlockEntry]() - chunkSize = 3'u64 - numberOfChunks = 3'u64 - finishSlot = startSlot + numberOfChunks * chunkSize - 1'u64 - queueSize = 1 - - var counter = - case kkind - of SyncQueueKind.Forward: - int(startSlot) - of SyncQueueKind.Backward: - int(finishSlot) - - proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - sblock.done() - else: - sblock.fail(VerifierError.Invalid) - dec(counter) - - proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - inc(counter) - sblock.done() - else: - sblock.fail(VerifierError.Invalid) - - var - chain = createChain(startSlot, finishSlot) - queue = - case kkind - of SyncQueueKind.Forward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - startSlot, finishSlot, chunkSize, - getStaticSlotCb(startSlot), collector(aq), - queueSize) - of SyncQueueKind.Backward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, - finishSlot, startSlot, chunkSize, - getStaticSlotCb(finishSlot), collector(aq), - queueSize) - validatorFut = - case kkind - of SyncQueueKind.Forward: - forwardValidator(aq) - of SyncQueueKind.Backward: - backwardValidator(aq) - - let - p1 = SomeTPeer() - p2 = SomeTPeer() - p3 = SomeTPeer() - - proc runTest(): Future[bool] {.async.} = - var r11 = queue.pop(finishSlot, p1) - var r12 = queue.pop(finishSlot, p2) - var r13 = queue.pop(finishSlot, p3) - - var f13 = queue.push(r13, chain.getSlice(startSlot, r13), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) - check: - f13.finished == false - case kkind - of SyncQueueKind.Forward: counter == int(startSlot) - of SyncQueueKind.Backward: counter == int(finishSlot) - - var f11 = queue.push(r11, chain.getSlice(startSlot, r11), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) - check: - case kkind - of SyncQueueKind.Forward: counter == int(startSlot + chunkSize) - of SyncQueueKind.Backward: counter == int(finishSlot - chunkSize) - f11.finished == true and f11.failed == false - f13.finished == false - - var f12 = queue.push(r12, chain.getSlice(startSlot, r12), - Opt.none(seq[BlobSidecars])) - await allFutures(f11, f12, f13) - check: - f12.finished == true and f12.failed == false - f13.finished == true and f13.failed == false - check: - case kkind - of SyncQueueKind.Forward: counter == int(finishSlot) + 1 - of SyncQueueKind.Backward: counter == int(startSlot) - 1 - r11.item == p1 - r12.item == p2 - r13.item == p3 - await validatorFut.cancelAndWait() - return true - - check waitFor(runTest()) == true - - template partialGoodResponseTest(kkind: SyncQueueKind, start, finish: Slot, - chunkSize: uint64) = - let aq = newAsyncQueue[BlockEntry]() - - var counter = - case kkind - of SyncQueueKind.Forward: - int(start) - of SyncQueueKind.Backward: - int(finish) - - proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - dec(counter) - sblock.done() - elif sblock.blck.slot < Slot(counter): - # There was a gap, report missing parent - sblock.fail(VerifierError.MissingParent) - else: - sblock.fail(VerifierError.Duplicate) - - proc getBackwardSafeSlotCb(): Slot = - min((Slot(counter).epoch + 1).start_slot, finish) - - proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - inc(counter) - sblock.done() - elif sblock.blck.slot > Slot(counter): - # There was a gap, report missing parent - sblock.fail(VerifierError.MissingParent) - else: - sblock.fail(VerifierError.Duplicate) - - proc getFowardSafeSlotCb(): Slot = - max(Slot(max(counter, 1) - 1).epoch.start_slot, start) - - var - queue = - case kkind - of SyncQueueKind.Forward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - start, finish, chunkSize, - getFowardSafeSlotCb, collector(aq)) - of SyncQueueKind.Backward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, - finish, start, chunkSize, - getBackwardSafeSlotCb, collector(aq)) - chain = createChain(start, finish) - validatorFut = - case kkind - of SyncQueueKind.Forward: - forwardValidator(aq) - of SyncQueueKind.Backward: - backwardValidator(aq) - - let p1 = SomeTPeer() - - var expectedScore = 0 - proc runTest() {.async.} = - while true: - var request = queue.pop(finish, p1) - if request.isEmpty(): - break - var response = getSlice(chain, start, request) - if response.len >= (SLOTS_PER_EPOCH + 3).int: - # Create gap close to end of response, to simulate behaviour where - # the remote peer is sending valid data but does not have it fully - # available (e.g., still doing backfill after checkpoint sync) - case kkind +suite "SyncManager test suite": + for kind in [SyncQueueKind.Forward, SyncQueueKind.Backward]: + asyncTest "[SyncQueue# & " & $kind & "] Smoke [single peer] test": + # Four ranges was distributed to single peer only. + let + scenario = [ + (Slot(0) .. Slot(127), Opt.none(VerifierError)) + ] + verifier = setupVerifier(kind, scenario) + sq = + case kind of SyncQueueKind.Forward: - response.delete(response.len - 2) + SyncQueue.init(SomeTPeer, kind, Slot(0), Slot(127), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(0)), + verifier.collector) of SyncQueueKind.Backward: - response.delete(1) - expectedScore += PeerScoreMissingValues - if response.len >= 1: - # Ensure requested values are past `safeSlot` - case kkind + SyncQueue.init(SomeTPeer, kind, Slot(127), Slot(0), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(127)), + verifier.collector) + peer = SomeTPeer.init("1") + r1 = sq.pop(Slot(127), peer) + r2 = sq.pop(Slot(127), peer) + r3 = sq.pop(Slot(127), peer) + d1 = createChain(r1.data) + d2 = createChain(r2.data) + d3 = createChain(r3.data) + + let + f1 = sq.push(r1, d1, Opt.none(seq[BlobSidecars])) + f2 = sq.push(r2, d2, Opt.none(seq[BlobSidecars])) + f3 = sq.push(r3, d3, Opt.none(seq[BlobSidecars])) + + check: + f1.finished == false + f2.finished == false + f3.finished == false + + await noCancel f1 + + check: + f1.finished == true + f2.finished == false + f3.finished == false + + await noCancel f2 + + check: + f1.finished == true + f2.finished == true + f3.finished == false + + await noCancel f3 + + check: + f1.finished == true + f2.finished == true + f3.finished == true + + let + r4 = sq.pop(Slot(127), peer) + d4 = createChain(r4.data) + f4 = sq.push(r4, d4, Opt.none(seq[BlobSidecars])) + + await noCancel f4 + + check: + f1.finished == true + f2.finished == true + f3.finished == true + f4.finished == true + + await noCancel wait(verifier.verifier, 2.seconds) + + asyncTest "[SyncQueue# & " & $kind & "] Smoke [3 peers] test": + # Three ranges was distributed between 3 peers, every range is going to + # be pushed by all peers. + let + scenario = [ + (Slot(0) .. Slot(127), Opt.none(VerifierError)) + ] + verifier = setupVerifier(kind, scenario) + sq = + case kind of SyncQueueKind.Forward: - check response[0][].slot >= getFowardSafeSlotCb() - else: - check response[^1][].slot <= getBackwardSafeSlotCb() - await queue.push(request, response, Opt.none(seq[BlobSidecars])) - await validatorFut.cancelAndWait() + SyncQueue.init(SomeTPeer, kind, Slot(0), Slot(127), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(0)), + verifier.collector) + of SyncQueueKind.Backward: + SyncQueue.init(SomeTPeer, kind, Slot(127), Slot(0), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(127)), + verifier.collector) + peer1 = SomeTPeer.init("1") + peer2 = SomeTPeer.init("2") + peer3 = SomeTPeer.init("3") + r11 = sq.pop(Slot(127), peer1) + r12 = sq.pop(Slot(127), peer2) + r13 = sq.pop(Slot(127), peer3) + d11 = createChain(r11.data) + d12 = createChain(r12.data) + d13 = createChain(r13.data) + r21 = sq.pop(Slot(127), peer1) + r22 = sq.pop(Slot(127), peer2) + r23 = sq.pop(Slot(127), peer3) + d21 = createChain(r21.data) + d22 = createChain(r22.data) + d23 = createChain(r23.data) + r31 = sq.pop(Slot(127), peer1) + r32 = sq.pop(Slot(127), peer2) + r33 = sq.pop(Slot(127), peer3) + d31 = createChain(r31.data) + d32 = createChain(r32.data) + d33 = createChain(r33.data) - waitFor runTest() - case kkind - of SyncQueueKind.Forward: - check (counter - 1) == int(finish) - of SyncQueueKind.Backward: - check (counter + 1) == int(start) - check p1.score >= expectedScore + let + f11 = sq.push(r11, d11, Opt.none(seq[BlobSidecars])) + f12 = sq.push(r12, d12, Opt.none(seq[BlobSidecars])) + f13 = sq.push(r13, d13, Opt.none(seq[BlobSidecars])) - template outOfBandAdvancementTest(kkind: SyncQueueKind, start, finish: Slot, - chunkSize: uint64) = - let aq = newAsyncQueue[BlockEntry]() + f22 = sq.push(r22, d22, Opt.none(seq[BlobSidecars])) + f21 = sq.push(r21, d21, Opt.none(seq[BlobSidecars])) + f23 = sq.push(r23, d23, Opt.none(seq[BlobSidecars])) - var counter = - case kkind - of SyncQueueKind.Forward: - int(start) - of SyncQueueKind.Backward: - int(finish) + f33 = sq.push(r33, d33, Opt.none(seq[BlobSidecars])) + f32 = sq.push(r32, d32, Opt.none(seq[BlobSidecars])) + f31 = sq.push(r31, d31, Opt.none(seq[BlobSidecars])) - proc failingValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - sblock.fail(VerifierError.Invalid) - - proc getBackwardSafeSlotCb(): Slot = - let progress = (uint64(int(finish) - counter) div chunkSize) * chunkSize - finish - progress - - proc getFowardSafeSlotCb(): Slot = - let progress = (uint64(counter - int(start)) div chunkSize) * chunkSize - start + progress - - template advanceSafeSlot() = - case kkind - of SyncQueueKind.Forward: - counter += int(chunkSize) - if counter > int(finish) + 1: - counter = int(finish) + 1 - break - of SyncQueueKind.Backward: - counter -= int(chunkSize) - if counter < int(start) - 1: - counter = int(start) - 1 - break - - var - queue = - case kkind - of SyncQueueKind.Forward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - start, finish, chunkSize, - getFowardSafeSlotCb, collector(aq)) - of SyncQueueKind.Backward: - SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, - finish, start, chunkSize, - getBackwardSafeSlotCb, collector(aq)) - chain = createChain(start, finish) - validatorFut = failingValidator(aq) - - let - p1 = SomeTPeer() - p2 = SomeTPeer() - - proc runTest() {.async.} = - while true: - var - request1 = queue.pop(finish, p1) - request2 = queue.pop(finish, p2) - if request1.isEmpty(): - break - - # Simulate failing request 2. - queue.push(request2) - check debtLen(queue) == request2.count - - # Advance `safeSlot` out of band. - advanceSafeSlot() - - # Handle request 1. Should be re-enqueued as it simulates `Invalid`. - let response1 = getSlice(chain, start, request1) - await queue.push(request1, response1, Opt.none(seq[BlobSidecars])) - check debtLen(queue) == request2.count + request1.count - - # Request 1 should be discarded as it is no longer relevant. - # Request 2 should be re-issued. - var request3 = queue.pop(finish, p1) - check: - request3 == request2 - debtLen(queue) == 0 - - # Handle request 3. Should be re-enqueued as it simulates `Invalid`. - let response3 = getSlice(chain, start, request3) - await queue.push(request3, response3, Opt.none(seq[BlobSidecars])) - check debtLen(queue) == request3.count - - # Request 2 should be re-issued. - var request4 = queue.pop(finish, p1) - check: - request4 == request2 - debtLen(queue) == 0 - - # Advance `safeSlot` out of band. - advanceSafeSlot() - - # Handle request 4. Should be re-enqueued as it simulates `Invalid`. - let response4 = getSlice(chain, start, request4) - await queue.push(request4, response4, Opt.none(seq[BlobSidecars])) - check debtLen(queue) == request4.count - - # Advance `safeSlot` out of band. - advanceSafeSlot() - - # Fetch a request. It should take into account the new `safeSlot`. - let request5 = queue.pop(finish, p1) - if request5.isEmpty(): - break - case kkind - of SyncQueueKind.Forward: - check request5.slot >= getFowardSafeSlotCb() - else: - check request5.lastSlot <= getBackwardSafeSlotCb() - queue.push(request5) - - await validatorFut.cancelAndWait() - - waitFor runTest() - case kkind - of SyncQueueKind.Forward: - check (counter - 1) == int(finish) - of SyncQueueKind.Backward: - check (counter + 1) == int(start) - - for k in {SyncQueueKind.Forward, SyncQueueKind.Backward}: - let prefix = "[SyncQueue#" & $k & "] " - - test prefix & "Start and finish slots equal": - startAndFinishSlotsEqual(k) - - test prefix & "Pass through established limits test": - passThroughLimitsTest(k) - - test prefix & "Two full requests success/fail": - twoFullRequests(k) - - test prefix & "Smoke test": - const SmokeTests = [ - (Slot(0), Slot(547), 61'u64), - (Slot(193), Slot(389), 79'u64), - (Slot(1181), Slot(1399), 41'u64) - ] - for item in SmokeTests: - smokeTest(k, item[0], item[1], item[2]) - - test prefix & "Async unordered push test": - const UnorderedTests = [ - Slot(0), Slot(100) - ] - for item in UnorderedTests: - unorderedAsyncTest(k, item) - - test prefix & "Good response with missing values towards end": - const PartialGoodResponseTests = [ - (Slot(0), Slot(200), (SLOTS_PER_EPOCH + 3).uint64) - ] - for item in PartialGoodResponseTests: - partialGoodResponseTest(k, item[0], item[1], item[2]) - - test prefix & "Handle out-of-band sync progress advancement": - const OutOfBandAdvancementTests = [ - (Slot(0), Slot(500), SLOTS_PER_EPOCH.uint64) - ] - for item in OutOfBandAdvancementTests: - outOfBandAdvancementTest(k, item[0], item[1], item[2]) - - test "[SyncQueue#Forward] Async unordered push with rewind test": - let - aq = newAsyncQueue[BlockEntry]() - startSlot = Slot(0) - chunkSize = SLOTS_PER_EPOCH - numberOfChunks = 4'u64 - finishSlot = startSlot + numberOfChunks * chunkSize - 1'u64 - queueSize = 1 - - var counter = int(startSlot) - - proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - withBlck(sblock.blck): - if forkyBlck.message.proposer_index == 0xDEADBEAF'u64: - sblock.fail(VerifierError.MissingParent) - else: - inc(counter) - sblock.done() - else: - sblock.fail(VerifierError.Invalid) - - var - chain = createChain(startSlot, finishSlot) - queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - startSlot, finishSlot, chunkSize, - getStaticSlotCb(startSlot), collector(aq), - queueSize) - validatorFut = forwardValidator(aq) - - let - p1 = SomeTPeer() - p2 = SomeTPeer() - p3 = SomeTPeer() - p4 = SomeTPeer() - p5 = SomeTPeer() - p6 = SomeTPeer() - p7 = SomeTPeer() - p8 = SomeTPeer() - - proc runTest(): Future[bool] {.async.} = - var r11 = queue.pop(finishSlot, p1) - var r12 = queue.pop(finishSlot, p2) - var r13 = queue.pop(finishSlot, p3) - var r14 = queue.pop(finishSlot, p4) - - var f14 = queue.push(r14, chain.getSlice(startSlot, r14), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) + await noCancel f11 check: - f14.finished == false - counter == int(startSlot) + f11.finished == true + # We do not check f12 and f13 here because their state is undefined + # at this time. + f21.finished == false + f22.finished == false + f23.finished == false + f31.finished == false + f32.finished == false + f33.finished == false - var f12 = queue.push(r12, chain.getSlice(startSlot, r12), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) + await noCancel f22 check: - counter == int(startSlot) - f12.finished == false - f14.finished == false + f11.finished == true + f12.finished == true + f13.finished == true + f22.finished == true + # We do not check f21 and f23 here because their state is undefined + # at this time. + f31.finished == false + f32.finished == false + f33.finished == false - var f11 = queue.push(r11, chain.getSlice(startSlot, r11), - Opt.none(seq[BlobSidecars])) - await allFutures(f11, f12) + await noCancel f33 check: - counter == int(startSlot + chunkSize + chunkSize) - f11.finished == true and f11.failed == false - f12.finished == true and f12.failed == false - f14.finished == false + f11.finished == true + f12.finished == true + f13.finished == true + f21.finished == true + f22.finished == true + f23.finished == true + f33.finished == true + # We do not check f31 and f32 here because their state is undefined + # at this time. - var missingSlice = chain.getSlice(startSlot, r13) - withBlck(missingSlice[0][]): - forkyBlck.message.proposer_index = 0xDEADBEAF'u64 - var f13 = queue.push(r13, missingSlice, - Opt.none(seq[BlobSidecars])) - await allFutures(f13, f14) - check: - f11.finished == true and f11.failed == false - f12.finished == true and f12.failed == false - f13.finished == true and f13.failed == false - f14.finished == true and f14.failed == false - queue.inpSlot == Slot(SLOTS_PER_EPOCH) - queue.outSlot == Slot(SLOTS_PER_EPOCH) - queue.debtLen == 0 + let + r41 = sq.pop(Slot(127), peer1) + d41 = createChain(r41.data) - # Recovery process - counter = int(SLOTS_PER_EPOCH) - - var r15 = queue.pop(finishSlot, p5) - var r16 = queue.pop(finishSlot, p6) - var r17 = queue.pop(finishSlot, p7) - var r18 = queue.pop(finishSlot, p8) - - check r18.isEmpty() == true - - var f17 = queue.push(r17, chain.getSlice(startSlot, r17), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) - check f17.finished == false - - var f16 = queue.push(r16, chain.getSlice(startSlot, r16), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) - check f16.finished == false - - var f15 = queue.push(r15, chain.getSlice(startSlot, r15), - Opt.none(seq[BlobSidecars])) - await allFutures(f15, f16, f17) - check: - f15.finished == true and f15.failed == false - f16.finished == true and f16.failed == false - f17.finished == true and f17.failed == false - counter == int(finishSlot) + 1 - - await validatorFut.cancelAndWait() - return true - - check waitFor(runTest()) == true - - test "Process all unviable blocks": - let - aq = newAsyncQueue[BlockEntry]() - startSlot = Slot(0) - chunkSize = SLOTS_PER_EPOCH - numberOfChunks = 1'u64 - finishSlot = startSlot + numberOfChunks * chunkSize - 1'u64 - queueSize = 1 - - var counter = int(startSlot) - - proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - withBlck(sblock.blck): - sblock.fail(VerifierError.UnviableFork) - inc(counter) - - var - chain = createChain(startSlot, finishSlot) - queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - startSlot, finishSlot, chunkSize, - getStaticSlotCb(startSlot), collector(aq), - queueSize) - validatorFut = forwardValidator(aq) - - let - p1 = SomeTPeer() - - proc runTest(): Future[bool] {.async.} = - var r11 = queue.pop(finishSlot, p1) - - # Push a single request that will fail with all blocks being unviable - var f11 = queue.push(r11, chain.getSlice(startSlot, r11), - Opt.none(seq[BlobSidecars])) - discard await f11.withTimeout(1.seconds) + await noCancel sq.push(r41, d41, Opt.none(seq[BlobSidecars])) check: f11.finished == true - counter == int(startSlot + chunkSize) # should process all unviable blocks - debtLen(queue) == chunkSize # The range must be retried + f12.finished == true + f13.finished == true + f21.finished == true + f22.finished == true + f23.finished == true + f31.finished == true + f32.finished == true + f33.finished == true - await validatorFut.cancelAndWait() - return true + await noCancel wait(verifier.verifier, 2.seconds) - check waitFor(runTest()) == true + asyncTest "[SyncQueue# & " & $kind & "] Failure request push test": + let + scenario = + case kind + of SyncQueueKind.Forward: + [ + (Slot(0) .. Slot(31), Opt.none(VerifierError)), + (Slot(32) .. Slot(63), Opt.none(VerifierError)) + ] + of SyncQueueKind.Backward: + [ + (Slot(32) .. Slot(63), Opt.none(VerifierError)), + (Slot(0) .. Slot(31), Opt.none(VerifierError)) + ] + verifier = setupVerifier(kind, scenario) + sq = + case kind + of SyncQueueKind.Forward: + SyncQueue.init(SomeTPeer, kind, Slot(0), Slot(63), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(0)), + verifier.collector) + of SyncQueueKind.Backward: + SyncQueue.init(SomeTPeer, kind, Slot(63), Slot(0), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(63)), + verifier.collector) + peer1 = SomeTPeer.init("1") + peer2 = SomeTPeer.init("2") + peer3 = SomeTPeer.init("3") - test "[SyncQueue#Backward] Async unordered push with rewind test": + block: + let + r11 = sq.pop(Slot(63), peer1) + r12 = sq.pop(Slot(63), peer2) + r13 = sq.pop(Slot(63), peer3) + + sq.push(r11) + sq.push(r12) + sq.push(r13) + # Next couple of calls should be detected as non relevant + sq.push(r11) + sq.push(r12) + sq.push(r13) + + block: + let + r11 = sq.pop(Slot(63), peer1) + r12 = sq.pop(Slot(63), peer2) + r13 = sq.pop(Slot(63), peer3) + d12 = createChain(r12.data) + + sq.push(r11) + await noCancel sq.push(r12, d12, Opt.none(seq[BlobSidecars])) + sq.push(r13) + # Next couple of calls should be detected as non relevant + sq.push(r11) + sq.push(r12) + sq.push(r13) + + block: + let + r11 = sq.pop(Slot(63), peer1) + r12 = sq.pop(Slot(63), peer2) + r13 = sq.pop(Slot(63), peer3) + d13 = createChain(r13.data) + + sq.push(r11) + sq.push(r12) + await noCancel sq.push(r13, d13, Opt.none(seq[BlobSidecars])) + # Next couple of calls should be detected as non relevant + sq.push(r11) + sq.push(r12) + sq.push(r13) + + await noCancel wait(verifier.verifier, 2.seconds) + + asyncTest "[SyncQueue# & " & $kind & "] Invalid block [3 peers] test": + # This scenario performs test for 2 cases. + # 1. When first error encountered it just drops the the response and + # increases `failuresCounter`. + # 2. When another error encountered it will reset whole queue to the + # last known good/safe point (rewind process). + let + scenario = + case kind + of SyncQueueKind.Forward: + [ + (Slot(0) .. Slot(31), Opt.none(VerifierError)), + (Slot(32) .. Slot(40), Opt.none(VerifierError)), + (Slot(41) .. Slot(41), Opt.some(VerifierError.Invalid)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(41), Opt.some(VerifierError.Invalid)), + (Slot(0) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(41), Opt.none(VerifierError)), + (Slot(42) .. Slot(63), Opt.none(VerifierError)) + ] + of SyncQueueKind.Backward: + [ + (Slot(32) .. Slot(63), Opt.none(VerifierError)), + (Slot(22) .. Slot(31), Opt.none(VerifierError)), + (Slot(21) .. Slot(21), Opt.some(VerifierError.Invalid)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(21) .. Slot(21), Opt.some(VerifierError.Invalid)), + (Slot(32) .. Slot(63), Opt.some(VerifierError.Duplicate)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(21) .. Slot(21), Opt.none(VerifierError)), + (Slot(0) .. Slot(20), Opt.none(VerifierError)), + ] + verifier = setupVerifier(kind, scenario) + sq = + case kind + of SyncQueueKind.Forward: + SyncQueue.init(SomeTPeer, kind, Slot(0), Slot(63), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(0)), + verifier.collector) + of SyncQueueKind.Backward: + SyncQueue.init(SomeTPeer, kind, Slot(63), Slot(0), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(63)), + verifier.collector) + peer1 = SomeTPeer.init("1") + peer2 = SomeTPeer.init("2") + peer3 = SomeTPeer.init("3") + r11 = sq.pop(Slot(63), peer1) + r12 = sq.pop(Slot(63), peer2) + r13 = sq.pop(Slot(63), peer3) + d11 = createChain(r11.data) + d12 = createChain(r12.data) + d13 = createChain(r13.data) + r21 = sq.pop(Slot(63), peer1) + r22 = sq.pop(Slot(63), peer2) + r23 = sq.pop(Slot(63), peer3) + d21 = createChain(r21.data) + d22 = createChain(r22.data) + d23 = createChain(r23.data) + + let + f11 = sq.push(r11, d11, Opt.none(seq[BlobSidecars])) + f12 = sq.push(r12, d12, Opt.none(seq[BlobSidecars])) + f13 = sq.push(r13, d13, Opt.none(seq[BlobSidecars])) + + await noCancel f11 + check f11.finished == true + + let + f21 = sq.push(r21, d21, Opt.none(seq[BlobSidecars])) + f22 = sq.push(r22, d22, Opt.none(seq[BlobSidecars])) + f23 = sq.push(r23, d23, Opt.none(seq[BlobSidecars])) + + await noCancel f21 + check: + f21.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + await noCancel f22 + check: + f21.finished == true + f22.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + await noCancel f23 + check: + f21.finished == true + f22.finished == true + f23.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + let + r31 = sq.pop(Slot(63), peer1) + r32 = sq.pop(Slot(63), peer2) + r33 = sq.pop(Slot(63), peer3) + d31 = createChain(r31.data) + d32 = createChain(r32.data) + d33 = createChain(r33.data) + r41 = sq.pop(Slot(63), peer1) + r42 = sq.pop(Slot(63), peer2) + r43 = sq.pop(Slot(63), peer3) + d41 = createChain(r41.data) + d42 = createChain(r42.data) + d43 = createChain(r43.data) + + let + f31 = sq.push(r31, d31, Opt.none(seq[BlobSidecars])) + f32 = sq.push(r32, d32, Opt.none(seq[BlobSidecars])) + f33 = sq.push(r33, d33, Opt.none(seq[BlobSidecars])) + f42 = sq.push(r42, d42, Opt.none(seq[BlobSidecars])) + f41 = sq.push(r41, d41, Opt.none(seq[BlobSidecars])) + f43 = sq.push(r43, d43, Opt.none(seq[BlobSidecars])) + + await noCancel f31 + check: + f31.finished == true + + await noCancel f42 + check: + f31.finished == true + f32.finished == true + f33.finished == true + f42.finished == true + + await noCancel f43 + check: + f31.finished == true + f32.finished == true + f33.finished == true + f41.finished == true + f42.finished == true + f43.finished == true + + await noCancel wait(verifier.verifier, 2.seconds) + + asyncTest "[SyncQueue# & " & $kind & "] Unviable block [3 peers] test": + # This scenario performs test for 2 cases. + # 1. When first error encountered it just drops the the response and + # increases `failuresCounter`. + # 2. When another error encountered it will reset whole queue to the + # last known good/safe point (rewind process). + # Unviable fork blocks processed differently from invalid blocks, all + # this blocks should be added to quarantine, so blocks range is not get + # failed immediately. + let + scenario = + case kind + of SyncQueueKind.Forward: + [ + (Slot(0) .. Slot(31), Opt.none(VerifierError)), + (Slot(32) .. Slot(40), Opt.none(VerifierError)), + (Slot(41) .. Slot(63), Opt.some(VerifierError.UnviableFork)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(63), Opt.some(VerifierError.UnviableFork)), + (Slot(0) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(63), Opt.none(VerifierError)) + ] + of SyncQueueKind.Backward: + [ + (Slot(32) .. Slot(63), Opt.none(VerifierError)), + (Slot(22) .. Slot(31), Opt.none(VerifierError)), + (Slot(0) .. Slot(21), Opt.some(VerifierError.UnviableFork)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(0) .. Slot(21), Opt.some(VerifierError.UnviableFork)), + (Slot(32) .. Slot(63), Opt.some(VerifierError.Duplicate)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(0) .. Slot(21), Opt.none(VerifierError)) + ] + verifier = setupVerifier(kind, scenario) + sq = + case kind + of SyncQueueKind.Forward: + SyncQueue.init(SomeTPeer, kind, Slot(0), Slot(63), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(0)), + verifier.collector) + of SyncQueueKind.Backward: + SyncQueue.init(SomeTPeer, kind, Slot(63), Slot(0), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(63)), + verifier.collector) + peer1 = SomeTPeer.init("1") + peer2 = SomeTPeer.init("2") + peer3 = SomeTPeer.init("3") + r11 = sq.pop(Slot(63), peer1) + r12 = sq.pop(Slot(63), peer2) + r13 = sq.pop(Slot(63), peer3) + d11 = createChain(r11.data) + d12 = createChain(r12.data) + d13 = createChain(r13.data) + r21 = sq.pop(Slot(63), peer1) + r22 = sq.pop(Slot(63), peer2) + r23 = sq.pop(Slot(63), peer3) + d21 = createChain(r21.data) + d22 = createChain(r22.data) + d23 = createChain(r23.data) + + let + f11 = sq.push(r11, d11, Opt.none(seq[BlobSidecars])) + f12 = sq.push(r12, d12, Opt.none(seq[BlobSidecars])) + f13 = sq.push(r13, d13, Opt.none(seq[BlobSidecars])) + + await noCancel f11 + check f11.finished == true + + let + f21 = sq.push(r21, d21, Opt.none(seq[BlobSidecars])) + f22 = sq.push(r22, d22, Opt.none(seq[BlobSidecars])) + f23 = sq.push(r23, d23, Opt.none(seq[BlobSidecars])) + + await noCancel f21 + check: + f21.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + await noCancel f22 + check: + f21.finished == true + f22.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + await noCancel f23 + check: + f21.finished == true + f22.finished == true + f23.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + let + r31 = sq.pop(Slot(63), peer1) + r32 = sq.pop(Slot(63), peer2) + r33 = sq.pop(Slot(63), peer3) + + let + d31 = createChain(r31.data) + d32 = createChain(r32.data) + d33 = createChain(r33.data) + r41 = sq.pop(Slot(63), peer1) + r42 = sq.pop(Slot(63), peer2) + r43 = sq.pop(Slot(63), peer3) + d41 = createChain(r41.data) + d42 = createChain(r42.data) + d43 = createChain(r43.data) + + let + f31 = sq.push(r31, d31, Opt.none(seq[BlobSidecars])) + f32 = sq.push(r32, d32, Opt.none(seq[BlobSidecars])) + f33 = sq.push(r33, d33, Opt.none(seq[BlobSidecars])) + f42 = sq.push(r42, d42, Opt.none(seq[BlobSidecars])) + f41 = sq.push(r41, d41, Opt.none(seq[BlobSidecars])) + f43 = sq.push(r43, d43, Opt.none(seq[BlobSidecars])) + + await noCancel f31 + check: + f31.finished == true + + await noCancel f42 + check: + f31.finished == true + f32.finished == true + f33.finished == true + f42.finished == true + + await noCancel f43 + check: + f31.finished == true + f32.finished == true + f33.finished == true + f41.finished == true + f42.finished == true + f43.finished == true + + await noCancel wait(verifier.verifier, 2.seconds) + + asyncTest "[SyncQueue# & " & $kind & "] Combination of missing parent " & + "and good blocks [3 peers] test": + let + scenario = + case kind + of SyncQueueKind.Forward: + [ + (Slot(0) .. Slot(31), Opt.none(VerifierError)), + (Slot(32) .. Slot(40), Opt.none(VerifierError)), + (Slot(41) .. Slot(41), Opt.some(VerifierError.MissingParent)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(41), Opt.some(VerifierError.MissingParent)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(41), Opt.some(VerifierError.MissingParent)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(41), Opt.some(VerifierError.MissingParent)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(41), Opt.some(VerifierError.MissingParent)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(41), Opt.some(VerifierError.MissingParent)), + (Slot(32) .. Slot(40), Opt.some(VerifierError.Duplicate)), + (Slot(41) .. Slot(63), Opt.none(VerifierError)) + ] + of SyncQueueKind.Backward: + [ + (Slot(32) .. Slot(63), Opt.none(VerifierError)), + (Slot(22) .. Slot(31), Opt.none(VerifierError)), + (Slot(21) .. Slot(21), Opt.some(VerifierError.MissingParent)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(21) .. Slot(21), Opt.some(VerifierError.MissingParent)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(21) .. Slot(21), Opt.some(VerifierError.MissingParent)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(21) .. Slot(21), Opt.some(VerifierError.MissingParent)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(21) .. Slot(21), Opt.some(VerifierError.MissingParent)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(21) .. Slot(21), Opt.some(VerifierError.MissingParent)), + (Slot(22) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(0) .. Slot(21), Opt.none(VerifierError)), + ] + verifier = setupVerifier(kind, scenario) + sq = + case kind + of SyncQueueKind.Forward: + SyncQueue.init(SomeTPeer, kind, Slot(0), Slot(63), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(0)), + verifier.collector) + of SyncQueueKind.Backward: + SyncQueue.init(SomeTPeer, kind, Slot(63), Slot(0), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(63)), + verifier.collector) + peer1 = SomeTPeer.init("1") + peer2 = SomeTPeer.init("2") + peer3 = SomeTPeer.init("3") + r11 = sq.pop(Slot(63), peer1) + r12 = sq.pop(Slot(63), peer2) + r13 = sq.pop(Slot(63), peer3) + d11 = createChain(r11.data) + d12 = createChain(r12.data) + d13 = createChain(r13.data) + r21 = sq.pop(Slot(63), peer1) + r22 = sq.pop(Slot(63), peer2) + r23 = sq.pop(Slot(63), peer3) + d21 = createChain(r21.data) + d22 = createChain(r22.data) + d23 = createChain(r23.data) + + let + f11 = sq.push(r11, d11, Opt.none(seq[BlobSidecars])) + f12 = sq.push(r12, d12, Opt.none(seq[BlobSidecars])) + f13 = sq.push(r13, d13, Opt.none(seq[BlobSidecars])) + + await noCancel f11 + check f11.finished == true + + let + f21 = sq.push(r21, d21, Opt.none(seq[BlobSidecars])) + f22 = sq.push(r22, d22, Opt.none(seq[BlobSidecars])) + f23 = sq.push(r23, d23, Opt.none(seq[BlobSidecars])) + + await noCancel f21 + check: + f21.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + await noCancel f22 + check: + f21.finished == true + f22.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + await noCancel f23 + check: + f21.finished == true + f22.finished == true + f23.finished == true + f11.finished == true + f12.finished == true + f13.finished == true + + let + r31 = sq.pop(Slot(63), peer1) + r32 = sq.pop(Slot(63), peer2) + r33 = sq.pop(Slot(63), peer3) + d31 = createChain(r31.data) + d32 = createChain(r32.data) + d33 = createChain(r33.data) + f31 = sq.push(r31, d31, Opt.none(seq[BlobSidecars])) + f32 = sq.push(r32, d32, Opt.none(seq[BlobSidecars])) + f33 = sq.push(r33, d33, Opt.none(seq[BlobSidecars])) + + await noCancel f31 + await noCancel f32 + await noCancel f33 + + let + r41 = sq.pop(Slot(63), peer1) + r42 = sq.pop(Slot(63), peer2) + r43 = sq.pop(Slot(63), peer3) + d41 = createChain(r41.data) + d42 = createChain(r42.data) + d43 = createChain(r43.data) + f42 = sq.push(r32, d42, Opt.none(seq[BlobSidecars])) + f41 = sq.push(r31, d41, Opt.none(seq[BlobSidecars])) + f43 = sq.push(r33, d43, Opt.none(seq[BlobSidecars])) + + await noCancel allFutures(f42, f41, f43) + + await noCancel wait(verifier.verifier, 2.seconds) + + asyncTest "[SyncQueue#Forward] Missing parent and exponential rewind " & + "[3 peers] test": let - aq = newAsyncQueue[BlockEntry]() - startSlot = Slot(0) - chunkSize = SLOTS_PER_EPOCH - numberOfChunks = 4'u64 - finishSlot = startSlot + numberOfChunks * chunkSize - 1'u64 - queueSize = 1 + scenario = + [ + (Slot(0) .. Slot(31), Opt.none(VerifierError)), + # .. 3 ranges are empty + (Slot(128) .. Slot(128), Opt.some(VerifierError.MissingParent)), + (Slot(128) .. Slot(128), Opt.some(VerifierError.MissingParent)), + # 1st rewind should be to (failed_slot - 1 * epoch) = 96 + (Slot(128) .. Slot(128), Opt.some(VerifierError.MissingParent)), + (Slot(128) .. Slot(128), Opt.some(VerifierError.MissingParent)), + # 2nd rewind should be to (failed_slot - 2 * epoch) = 64 + (Slot(128) .. Slot(128), Opt.some(VerifierError.MissingParent)), + (Slot(128) .. Slot(128), Opt.some(VerifierError.MissingParent)), + # 3rd rewind should be to (failed_slot - 4 * epoch) = 0 + (Slot(0) .. Slot(31), Opt.some(VerifierError.Duplicate)), + (Slot(32) .. Slot(63), Opt.none(VerifierError)), + (Slot(64) .. Slot(95), Opt.none(VerifierError)), + (Slot(96) .. Slot(127), Opt.none(VerifierError)), + (Slot(128) .. Slot(159), Opt.none(VerifierError)), + ] + kind = SyncQueueKind.Forward + verifier = setupVerifier(kind, scenario) + sq = SyncQueue.init(SomeTPeer, kind, Slot(0), Slot(159), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(0)), + verifier.collector) + peer1 = SomeTPeer.init("1") + peer2 = SomeTPeer.init("2") + peer3 = SomeTPeer.init("3") + r11 = sq.pop(Slot(159), peer1) + r12 = sq.pop(Slot(159), peer2) + r13 = sq.pop(Slot(159), peer3) + d11 = createChain(r11.data) + d12 = createChain(r12.data) + d13 = createChain(r13.data) + f11 = sq.push(r11, d11, Opt.none(seq[BlobSidecars])) + f12 = sq.push(r12, d12, Opt.none(seq[BlobSidecars])) + f13 = sq.push(r13, d13, Opt.none(seq[BlobSidecars])) - var - lastSafeSlot = finishSlot - counter = int(finishSlot) + await noCancel f11 + await noCancel f12 + await noCancel f13 - proc getSafeSlot(): Slot = - lastSafeSlot + for i in 0 ..< 3: + let + re1 = sq.pop(Slot(159), peer1) + re2 = sq.pop(Slot(159), peer2) + re3 = sq.pop(Slot(159), peer3) + de1 = default(seq[ref ForkedSignedBeaconBlock]) + de2 = default(seq[ref ForkedSignedBeaconBlock]) + de3 = default(seq[ref ForkedSignedBeaconBlock]) + fe1 = sq.push(re1, de1, Opt.none(seq[BlobSidecars])) + fe2 = sq.push(re2, de2, Opt.none(seq[BlobSidecars])) + fe3 = sq.push(re3, de3, Opt.none(seq[BlobSidecars])) - proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - withBlck(sblock.blck): - if forkyBlck.message.proposer_index == 0xDEADBEAF'u64: - sblock.fail(VerifierError.MissingParent) - else: - lastSafeSlot = sblock.blck.slot - dec(counter) - sblock.done() - else: - sblock.fail(VerifierError.Invalid) - - var - chain = createChain(startSlot, finishSlot) - queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, - finishSlot, startSlot, chunkSize, - getSafeSlot, collector(aq), queueSize) - validatorFut = backwardValidator(aq) + await noCancel fe1 + await noCancel fe2 + await noCancel fe3 let - p1 = SomeTPeer() - p2 = SomeTPeer() - p3 = SomeTPeer() - p4 = SomeTPeer() - p5 = SomeTPeer() - p6 = SomeTPeer() - p7 = SomeTPeer() + r21 = sq.pop(Slot(159), peer1) + r22 = sq.pop(Slot(159), peer2) + r23 = sq.pop(Slot(159), peer3) + d21 = createChain(r21.data) + d22 = createChain(r22.data) + d23 = createChain(r23.data) + f21 = sq.push(r21, d21, Opt.none(seq[BlobSidecars])) + f22 = sq.push(r22, d22, Opt.none(seq[BlobSidecars])) + f23 = sq.push(r23, d23, Opt.none(seq[BlobSidecars])) - proc runTest(): Future[bool] {.async.} = - var r11 = queue.pop(finishSlot, p1) - var r12 = queue.pop(finishSlot, p2) - var r13 = queue.pop(finishSlot, p3) - var r14 = queue.pop(finishSlot, p4) + await noCancel f21 + await noCancel f22 + await noCancel f23 - var f14 = queue.push(r14, chain.getSlice(startSlot, r14), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) - check: - f14.finished == false - counter == int(finishSlot) + for i in 0 ..< 1: + let + re1 = sq.pop(Slot(159), peer1) + re2 = sq.pop(Slot(159), peer2) + re3 = sq.pop(Slot(159), peer3) + de1 = default(seq[ref ForkedSignedBeaconBlock]) + de2 = default(seq[ref ForkedSignedBeaconBlock]) + de3 = default(seq[ref ForkedSignedBeaconBlock]) + fe1 = sq.push(re1, de1, Opt.none(seq[BlobSidecars])) + fe2 = sq.push(re2, de2, Opt.none(seq[BlobSidecars])) + fe3 = sq.push(re3, de3, Opt.none(seq[BlobSidecars])) - var f12 = queue.push(r12, chain.getSlice(startSlot, r12), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) - check: - counter == int(finishSlot) - f12.finished == false - f14.finished == false + await noCancel fe1 + await noCancel fe2 + await noCancel fe3 - var f11 = queue.push(r11, chain.getSlice(startSlot, r11), - Opt.none(seq[BlobSidecars])) - await allFutures(f11, f12) - check: - counter == int(finishSlot - chunkSize - chunkSize) - f11.finished == true and f11.failed == false - f12.finished == true and f12.failed == false - f14.finished == false + let + r31 = sq.pop(Slot(159), peer1) + r32 = sq.pop(Slot(159), peer2) + r33 = sq.pop(Slot(159), peer3) + d31 = createChain(r31.data) + d32 = createChain(r32.data) + d33 = createChain(r33.data) + f31 = sq.push(r31, d31, Opt.none(seq[BlobSidecars])) + f32 = sq.push(r32, d32, Opt.none(seq[BlobSidecars])) + f33 = sq.push(r33, d33, Opt.none(seq[BlobSidecars])) - var missingSlice = chain.getSlice(startSlot, r13) - withBlck(missingSlice[0][]): - forkyBlck.message.proposer_index = 0xDEADBEAF'u64 - var f13 = queue.push(r13, missingSlice, Opt.none(seq[BlobSidecars])) - await allFutures(f13, f14) - check: - f11.finished == true and f11.failed == false - f12.finished == true and f12.failed == false - f13.finished == true and f13.failed == false - f14.finished == true and f14.failed == false + await noCancel f31 + await noCancel f32 + await noCancel f33 - # Recovery process - counter = int(SLOTS_PER_EPOCH) + 1 + for i in 0 ..< 2: + let + re1 = sq.pop(Slot(159), peer1) + re2 = sq.pop(Slot(159), peer2) + re3 = sq.pop(Slot(159), peer3) + de1 = default(seq[ref ForkedSignedBeaconBlock]) + de2 = default(seq[ref ForkedSignedBeaconBlock]) + de3 = default(seq[ref ForkedSignedBeaconBlock]) + fe1 = sq.push(re1, de1, Opt.none(seq[BlobSidecars])) + fe2 = sq.push(re2, de2, Opt.none(seq[BlobSidecars])) + fe3 = sq.push(re3, de3, Opt.none(seq[BlobSidecars])) - var r15 = queue.pop(finishSlot, p5) - var r16 = queue.pop(finishSlot, p6) - var r17 = queue.pop(finishSlot, p7) + await noCancel fe1 + await noCancel fe2 + await noCancel fe3 - check r17.isEmpty() == true + let + r41 = sq.pop(Slot(159), peer1) + r42 = sq.pop(Slot(159), peer2) + r43 = sq.pop(Slot(159), peer3) + d41 = createChain(r41.data) + d42 = createChain(r42.data) + d43 = createChain(r43.data) + f41 = sq.push(r41, d41, Opt.none(seq[BlobSidecars])) + f42 = sq.push(r42, d42, Opt.none(seq[BlobSidecars])) + f43 = sq.push(r43, d43, Opt.none(seq[BlobSidecars])) - var f16 = queue.push(r16, chain.getSlice(startSlot, r16), - Opt.none(seq[BlobSidecars])) - await sleepAsync(100.milliseconds) - check f16.finished == false + await noCancel f41 + await noCancel f42 + await noCancel f43 - var f15 = queue.push(r15, chain.getSlice(startSlot, r15), - Opt.none(seq[BlobSidecars])) - await allFutures(f15, f16) - check: - f15.finished == true and f15.failed == false - f16.finished == true and f16.failed == false - counter == int(startSlot) - 1 + for i in 0 ..< 5: + let + rf1 = sq.pop(Slot(159), peer1) + rf2 = sq.pop(Slot(159), peer2) + rf3 = sq.pop(Slot(159), peer3) + df1 = createChain(rf1.data) + df2 = createChain(rf2.data) + df3 = createChain(rf3.data) + ff1 = sq.push(rf1, df1, Opt.none(seq[BlobSidecars])) + ff2 = sq.push(rf2, df2, Opt.none(seq[BlobSidecars])) + ff3 = sq.push(rf3, df3, Opt.none(seq[BlobSidecars])) - await validatorFut.cancelAndWait() - return true + await noCancel ff1 + await noCancel ff2 + await noCancel ff3 - check waitFor(runTest()) == true + await noCancel wait(verifier.verifier, 2.seconds) + + asyncTest "[SyncQueue#Backward] Missing parent and exponential rewind " & + "[3 peers] test": + let + scenario = + [ + (Slot(128) .. Slot(159), Opt.none(VerifierError)), + # .. 3 ranges are empty + (Slot(31) .. Slot(31), Opt.some(VerifierError.MissingParent)), + (Slot(31) .. Slot(31), Opt.some(VerifierError.MissingParent)), + (Slot(128) .. Slot(159), Opt.some(VerifierError.Duplicate)), + (Slot(96) .. Slot(127), Opt.none(VerifierError)), + # .. 2 ranges are empty + (Slot(31) .. Slot(31), Opt.some(VerifierError.MissingParent)), + (Slot(31) .. Slot(31), Opt.some(VerifierError.MissingParent)), + (Slot(128) .. Slot(159), Opt.some(VerifierError.Duplicate)), + (Slot(96) .. Slot(127), Opt.some(VerifierError.Duplicate)), + (Slot(64) .. Slot(95), Opt.none(VerifierError)), + # .. 1 range is empty + (Slot(31) .. Slot(31), Opt.some(VerifierError.MissingParent)), + (Slot(31) .. Slot(31), Opt.some(VerifierError.MissingParent)), + (Slot(128) .. Slot(159), Opt.some(VerifierError.Duplicate)), + (Slot(96) .. Slot(127), Opt.some(VerifierError.Duplicate)), + (Slot(64) .. Slot(95), Opt.some(VerifierError.Duplicate)), + (Slot(32) .. Slot(63), Opt.none(VerifierError)), + (Slot(0) .. Slot(31), Opt.none(VerifierError)) + ] + kind = SyncQueueKind.Backward + verifier = setupVerifier(kind, scenario) + sq = SyncQueue.init(SomeTPeer, kind, Slot(159), Slot(0), + 32'u64, # 32 slots per request + 3, # 3 concurrent requests + 2, # 2 failures allowed + getStaticSlotCb(Slot(159)), + verifier.collector) + peer1 = SomeTPeer.init("1") + peer2 = SomeTPeer.init("2") + peer3 = SomeTPeer.init("3") + r11 = sq.pop(Slot(159), peer1) + r12 = sq.pop(Slot(159), peer2) + r13 = sq.pop(Slot(159), peer3) + d11 = createChain(r11.data) + d12 = createChain(r12.data) + d13 = createChain(r13.data) + f11 = sq.push(r11, d11, Opt.none(seq[BlobSidecars])) + f12 = sq.push(r12, d12, Opt.none(seq[BlobSidecars])) + f13 = sq.push(r13, d13, Opt.none(seq[BlobSidecars])) + + await noCancel f11 + await noCancel f12 + await noCancel f13 + + for i in 0 ..< 3: + let + re1 = sq.pop(Slot(159), peer1) + re2 = sq.pop(Slot(159), peer2) + re3 = sq.pop(Slot(159), peer3) + de1 = default(seq[ref ForkedSignedBeaconBlock]) + de2 = default(seq[ref ForkedSignedBeaconBlock]) + de3 = default(seq[ref ForkedSignedBeaconBlock]) + fe1 = sq.push(re1, de1, Opt.none(seq[BlobSidecars])) + fe2 = sq.push(re2, de2, Opt.none(seq[BlobSidecars])) + fe3 = sq.push(re3, de3, Opt.none(seq[BlobSidecars])) + + await noCancel fe1 + await noCancel fe2 + await noCancel fe3 + + let + r21 = sq.pop(Slot(159), peer1) + r22 = sq.pop(Slot(159), peer2) + r23 = sq.pop(Slot(159), peer3) + d21 = createChain(r21.data) + d22 = createChain(r22.data) + d23 = createChain(r23.data) + f21 = sq.push(r21, d21, Opt.none(seq[BlobSidecars])) + f22 = sq.push(r22, d22, Opt.none(seq[BlobSidecars])) + f23 = sq.push(r23, d23, Opt.none(seq[BlobSidecars])) + + await noCancel f21 + await noCancel f22 + await noCancel f23 + + for i in 0 ..< 2: + let + r31 = sq.pop(Slot(159), peer1) + r32 = sq.pop(Slot(159), peer2) + r33 = sq.pop(Slot(159), peer3) + d31 = createChain(r31.data) + d32 = createChain(r32.data) + d33 = createChain(r33.data) + f31 = sq.push(r31, d31, Opt.none(seq[BlobSidecars])) + f32 = sq.push(r32, d32, Opt.none(seq[BlobSidecars])) + f33 = sq.push(r33, d33, Opt.none(seq[BlobSidecars])) + + await noCancel f31 + await noCancel f32 + await noCancel f33 + + for i in 0 ..< 2: + let + re1 = sq.pop(Slot(159), peer1) + re2 = sq.pop(Slot(159), peer2) + re3 = sq.pop(Slot(159), peer3) + de1 = default(seq[ref ForkedSignedBeaconBlock]) + de2 = default(seq[ref ForkedSignedBeaconBlock]) + de3 = default(seq[ref ForkedSignedBeaconBlock]) + fe1 = sq.push(re1, de1, Opt.none(seq[BlobSidecars])) + fe2 = sq.push(re2, de2, Opt.none(seq[BlobSidecars])) + fe3 = sq.push(re3, de3, Opt.none(seq[BlobSidecars])) + + await noCancel fe1 + await noCancel fe2 + await noCancel fe3 + + let + r41 = sq.pop(Slot(159), peer1) + r42 = sq.pop(Slot(159), peer2) + r43 = sq.pop(Slot(159), peer3) + d41 = createChain(r41.data) + d42 = createChain(r42.data) + d43 = createChain(r43.data) + f41 = sq.push(r41, d41, Opt.none(seq[BlobSidecars])) + f42 = sq.push(r42, d42, Opt.none(seq[BlobSidecars])) + f43 = sq.push(r43, d43, Opt.none(seq[BlobSidecars])) + + await noCancel f41 + await noCancel f42 + await noCancel f43 + + for i in 0 ..< 3: + let + r51 = sq.pop(Slot(159), peer1) + r52 = sq.pop(Slot(159), peer2) + r53 = sq.pop(Slot(159), peer3) + d51 = createChain(r51.data) + d52 = createChain(r52.data) + d53 = createChain(r53.data) + f51 = sq.push(r51, d51, Opt.none(seq[BlobSidecars])) + f52 = sq.push(r52, d52, Opt.none(seq[BlobSidecars])) + f53 = sq.push(r53, d53, Opt.none(seq[BlobSidecars])) + + await noCancel f51 + await noCancel f52 + await noCancel f53 + + for i in 0 ..< 1: + let + re1 = sq.pop(Slot(159), peer1) + re2 = sq.pop(Slot(159), peer2) + re3 = sq.pop(Slot(159), peer3) + de1 = default(seq[ref ForkedSignedBeaconBlock]) + de2 = default(seq[ref ForkedSignedBeaconBlock]) + de3 = default(seq[ref ForkedSignedBeaconBlock]) + fe1 = sq.push(re1, de1, Opt.none(seq[BlobSidecars])) + fe2 = sq.push(re2, de2, Opt.none(seq[BlobSidecars])) + fe3 = sq.push(re3, de3, Opt.none(seq[BlobSidecars])) + + await noCancel fe1 + await noCancel fe2 + await noCancel fe3 + + let + r61 = sq.pop(Slot(159), peer1) + r62 = sq.pop(Slot(159), peer2) + r63 = sq.pop(Slot(159), peer3) + d61 = createChain(r61.data) + d62 = createChain(r62.data) + d63 = createChain(r63.data) + f61 = sq.push(r61, d61, Opt.none(seq[BlobSidecars])) + f62 = sq.push(r62, d62, Opt.none(seq[BlobSidecars])) + f63 = sq.push(r63, d63, Opt.none(seq[BlobSidecars])) + + await noCancel f61 + await noCancel f62 + await noCancel f63 + + for i in 0 ..< 5: + let + r71 = sq.pop(Slot(159), peer1) + r72 = sq.pop(Slot(159), peer2) + r73 = sq.pop(Slot(159), peer3) + d71 = createChain(r71.data) + d72 = createChain(r72.data) + d73 = createChain(r73.data) + f71 = sq.push(r71, d71, Opt.none(seq[BlobSidecars])) + f72 = sq.push(r72, d72, Opt.none(seq[BlobSidecars])) + f73 = sq.push(r73, d73, Opt.none(seq[BlobSidecars])) + + await noCancel f71 + await noCancel f72 + await noCancel f73 + + await noCancel wait(verifier.verifier, 2.seconds) + + test "[SyncQueue#Forward] getRewindPoint() test": + let aq = newAsyncQueue[BlockEntry]() + block: + let + queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, + Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), + 1'u64, 3, 2, getStaticSlotCb(Slot(0)), + collector(aq)) + finalizedSlot = start_slot(Epoch(0'u64)) + epochStartSlot = start_slot(Epoch(0'u64)) + 1'u64 + finishSlot = start_slot(Epoch(2'u64)) + + for i in uint64(epochStartSlot) ..< uint64(finishSlot): + check queue.getRewindPoint(Slot(i), finalizedSlot) == finalizedSlot + + block: + let + queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, + Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), + 1'u64, 3, 2, getStaticSlotCb(Slot(0)), + collector(aq)) + finalizedSlot = start_slot(Epoch(1'u64)) + epochStartSlot = start_slot(Epoch(1'u64)) + 1'u64 + finishSlot = start_slot(Epoch(3'u64)) + + for i in uint64(epochStartSlot) ..< uint64(finishSlot) : + check queue.getRewindPoint(Slot(i), finalizedSlot) == finalizedSlot + + block: + let + queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, + Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), + 1'u64, 3, 2, getStaticSlotCb(Slot(0)), + collector(aq)) + finalizedSlot = start_slot(Epoch(0'u64)) + failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64) + failEpoch = epoch(failSlot) + + var counter = 1'u64 + for i in 0 ..< 64: + if counter >= failEpoch: + break + let rewindEpoch = failEpoch - counter + let rewindSlot = start_slot(rewindEpoch) + check queue.getRewindPoint(failSlot, finalizedSlot) == rewindSlot + counter = counter shl 1 + + block: + let + queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, + Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), + 1'u64, 3, 2, getStaticSlotCb(Slot(0)), + collector(aq)) + let + finalizedSlot = start_slot(Epoch(1'u64)) + failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64) + failEpoch = epoch(failSlot) + + var counter = 1'u64 + for i in 0 ..< 64: + if counter >= failEpoch: + break + let + rewindEpoch = failEpoch - counter + rewindSlot = start_slot(rewindEpoch) + check queue.getRewindPoint(failSlot, finalizedSlot) == rewindSlot + counter = counter shl 1 + + test "[SyncQueue#Backward] getRewindPoint() test": + let aq = newAsyncQueue[BlockEntry]() + block: + let + getSafeSlot = getStaticSlotCb(Slot(1024)) + queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, + Slot(1024), Slot(0), + 1'u64, 3, 2, getSafeSlot, collector(aq)) + safeSlot = getSafeSlot() + + for i in countdown(1023, 0): + check queue.getRewindPoint(Slot(i), safeSlot) == safeSlot test "[SyncQueue] hasEndGap() test": - let chain1 = createChain(Slot(1), Slot(1)) - let chain2 = newSeq[ref ForkedSignedBeaconBlock]() + let + chain1 = createChain(Slot(1) .. Slot(1)) + chain2 = newSeq[ref ForkedSignedBeaconBlock]() for counter in countdown(32'u64, 2'u64): - let req = SyncRequest[SomeTPeer](slot: Slot(1), count: counter) - let sr = SyncResult[SomeTPeer](request: req, data: chain1) - check sr.hasEndGap() == true + let + srange = SyncRange.init(Slot(1), counter) + req = SyncRequest[SomeTPeer](data: srange) + check req.hasEndGap(chain1) == true - let req = SyncRequest[SomeTPeer](slot: Slot(1), count: 1'u64) - let sr1 = SyncResult[SomeTPeer](request: req, data: chain1) - let sr2 = SyncResult[SomeTPeer](request: req, data: chain2) + let req = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(1), 1'u64)) check: - sr1.hasEndGap() == false - sr2.hasEndGap() == true - - test "[SyncQueue] getLastNonEmptySlot() test": - let chain1 = createChain(Slot(10), Slot(10)) - let chain2 = newSeq[ref ForkedSignedBeaconBlock]() - - for counter in countdown(32'u64, 2'u64): - let req = SyncRequest[SomeTPeer](slot: Slot(10), count: counter) - let sr = SyncResult[SomeTPeer](request: req, data: chain1) - check sr.getLastNonEmptySlot() == Slot(10) - - let req = SyncRequest[SomeTPeer](slot: Slot(100), count: 1'u64) - let sr = SyncResult[SomeTPeer](request: req, data: chain2) - check sr.getLastNonEmptySlot() == Slot(100) - - test "[SyncQueue] contains() test": - proc checkRange[T](req: SyncRequest[T]): bool = - var slot = req.slot - var counter = 0'u64 - while counter < req.count: - if not(req.contains(slot)): - return false - slot = slot + 1 - counter = counter + 1'u64 - return true - - var req1 = SyncRequest[SomeTPeer](slot: Slot(5), count: 10'u64) - - check: - req1.checkRange() == true - - req1.contains(Slot(4)) == false - req1.contains(Slot(15)) == false + req.hasEndGap(chain1) == false + req.hasEndGap(chain2) == true test "[SyncQueue] checkResponse() test": let - r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64) - r2 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64) - r3 = SyncRequest[SomeTPeer](slot: Slot(11), count: 3'u64) + r1 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 1'u64)) + r2 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 2'u64)) + r3 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 3'u64)) check: checkResponse(r1, [Slot(11)]).isOk() == true @@ -1111,9 +1364,9 @@ suite "SyncManager test suite": test "[SyncQueue] checkBlobsResponse() test": let - r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64) - r2 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64) - r3 = SyncRequest[SomeTPeer](slot: Slot(11), count: 3'u64) + r1 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 1'u64)) + r2 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 2'u64)) + r3 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 3'u64)) d1 = Slot(11).repeat(MAX_BLOBS_PER_BLOCK) d2 = Slot(12).repeat(MAX_BLOBS_PER_BLOCK) @@ -1183,13 +1436,12 @@ suite "SyncManager test suite": test "[SyncManager] groupBlobs() test": var - blocks = createChain(Slot(10), Slot(15)) + blocks = createChain(Slot(10) .. Slot(15)) blobs = createBlobs(blocks, @[Slot(11), Slot(11), Slot(12), Slot(14)]) let groupedRes = groupBlobs(blocks, blobs) - check: - groupedRes.isOk() + check groupedRes.isOk() let grouped = groupedRes.get() @@ -1232,77 +1484,3 @@ suite "SyncManager test suite": check: groupedRes3.isErr() - - - - test "[SyncQueue#Forward] getRewindPoint() test": - let aq = newAsyncQueue[BlockEntry]() - block: - var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), - 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), 2) - let finalizedSlot = start_slot(Epoch(0'u64)) - let epochStartSlot = start_slot(Epoch(0'u64)) + 1'u64 - let finishSlot = start_slot(Epoch(2'u64)) - - for i in uint64(epochStartSlot) ..< uint64(finishSlot): - check queue.getRewindPoint(Slot(i), finalizedSlot) == finalizedSlot - - block: - var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), - 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), 2) - let finalizedSlot = start_slot(Epoch(1'u64)) - let epochStartSlot = start_slot(Epoch(1'u64)) + 1'u64 - let finishSlot = start_slot(Epoch(3'u64)) - - for i in uint64(epochStartSlot) ..< uint64(finishSlot) : - check queue.getRewindPoint(Slot(i), finalizedSlot) == finalizedSlot - - block: - var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), - 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), 2) - let finalizedSlot = start_slot(Epoch(0'u64)) - let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64) - let failEpoch = epoch(failSlot) - - var counter = 1'u64 - for i in 0 ..< 64: - if counter >= failEpoch: - break - let rewindEpoch = failEpoch - counter - let rewindSlot = start_slot(rewindEpoch) - check queue.getRewindPoint(failSlot, finalizedSlot) == rewindSlot - counter = counter shl 1 - - block: - var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, - Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), - 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), 2) - let finalizedSlot = start_slot(Epoch(1'u64)) - let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64) - let failEpoch = epoch(failSlot) - var counter = 1'u64 - for i in 0 ..< 64: - if counter >= failEpoch: - break - let rewindEpoch = failEpoch - counter - let rewindSlot = start_slot(rewindEpoch) - check queue.getRewindPoint(failSlot, finalizedSlot) == rewindSlot - counter = counter shl 1 - - test "[SyncQueue#Backward] getRewindPoint() test": - let aq = newAsyncQueue[BlockEntry]() - block: - let getSafeSlot = getStaticSlotCb(Slot(1024)) - var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, - Slot(1024), Slot(0), - 1'u64, getSafeSlot, collector(aq), 2) - let safeSlot = getSafeSlot() - for i in countdown(1023, 0): - check queue.getRewindPoint(Slot(i), safeSlot) == safeSlot