From b05734f61063e1ce1f9a329fcc1c258112096560 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 8 Dec 2021 23:15:29 +0200 Subject: [PATCH] Backward sync support for SyncManager. (#3131) * Unbundle SyncQueue from sync_manager.nim. Unbundle Peer scores constants to peer_scores.nim. Add Forward/Backward enum. * Further improvements and tests. * Adopt getRewindPoint() and fix MissingParent handler. * Remove unused procedures. Refactor `result` usage. Fix resetWait(). * Add all the tests and fix the issue with rewind point. * Fix get() issue. * Fix flaky tests. * test fixes Co-authored-by: Jacek Sieka --- AllTests-mainnet.md | 28 +- beacon_chain/sync/peer_scores.nim | 28 + beacon_chain/sync/sync_manager.nim | 639 +-------------------- beacon_chain/sync/sync_queue.nim | 783 +++++++++++++++++++++++++ docs/block_flow.dot | 6 +- docs/block_flow.md | 2 +- tests/test_sync_manager.nim | 879 +++++++++++++++++++---------- 7 files changed, 1415 insertions(+), 950 deletions(-) create mode 100644 beacon_chain/sync/peer_scores.nim create mode 100644 beacon_chain/sync/sync_queue.nim diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index 3b083f9af..1cdaf6ac7 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -298,22 +298,26 @@ OK: 2/2 Fail: 0/2 Skip: 0/2 OK: 4/4 Fail: 0/4 Skip: 0/4 ## SyncManager test suite ```diff -+ [SyncQueue] Async pending and resetWait() test OK -+ [SyncQueue] Async unordered push start from zero OK -+ [SyncQueue] Async unordered push with not full start from non-zero OK -+ [SyncQueue] Full and incomplete success/fail start from non-zero OK -+ [SyncQueue] Full and incomplete success/fail start from zero OK -+ [SyncQueue] One smart and one stupid + debt split + empty OK -+ [SyncQueue] Smart and stupid success/fail OK -+ [SyncQueue] Start and finish slots equal OK -+ [SyncQueue] Two full requests success/fail OK ++ [SyncQueue#Backward] Async unordered push test OK ++ [SyncQueue#Backward] Async unordered push with rewind test OK ++ 
[SyncQueue#Backward] Pass through established limits test OK ++ [SyncQueue#Backward] Smoke test OK ++ [SyncQueue#Backward] Start and finish slots equal OK ++ [SyncQueue#Backward] Two full requests success/fail OK ++ [SyncQueue#Backward] getRewindPoint() test OK ++ [SyncQueue#Forward] Async unordered push test OK ++ [SyncQueue#Forward] Async unordered push with rewind test OK ++ [SyncQueue#Forward] Pass through established limits test OK ++ [SyncQueue#Forward] Smoke test OK ++ [SyncQueue#Forward] Start and finish slots equal OK ++ [SyncQueue#Forward] Two full requests success/fail OK ++ [SyncQueue#Forward] getRewindPoint() test OK + [SyncQueue] checkResponse() test OK + [SyncQueue] contains() test OK + [SyncQueue] getLastNonEmptySlot() test OK -+ [SyncQueue] getRewindPoint() test OK + [SyncQueue] hasEndGap() test OK ``` -OK: 14/14 Fail: 0/14 Skip: 0/14 +OK: 18/18 Fail: 0/18 Skip: 0/18 ## Zero signature sanity checks ```diff + SSZ serialization roundtrip of SignedBeaconBlockHeader OK @@ -373,4 +377,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1 ---TOTAL--- -OK: 205/207 Fail: 0/207 Skip: 2/207 +OK: 209/211 Fail: 0/211 Skip: 2/211 diff --git a/beacon_chain/sync/peer_scores.nim b/beacon_chain/sync/peer_scores.nim new file mode 100644 index 000000000..2d12ee60c --- /dev/null +++ b/beacon_chain/sync/peer_scores.nim @@ -0,0 +1,28 @@ +# beacon_chain +# Copyright (c) 2018-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +const + PeerScoreNoStatus* = -100 + ## Peer did not answer `status` request. + PeerScoreStaleStatus* = -50 + ## Peer's `status` answer do not progress in time. 
+ PeerScoreUseless* = -10 + ## Peer's latest head is lower then ours. + PeerScoreGoodStatus* = 50 + ## Peer's `status` answer is fine. + PeerScoreNoBlocks* = -100 + ## Peer did not respond in time on `blocksByRange` request. + PeerScoreGoodBlocks* = 100 + ## Peer's `blocksByRange` answer is fine. + PeerScoreBadBlocks* = -1000 + ## Peer's response contains incorrect blocks. + PeerScoreBadResponse* = -1000 + ## Peer's response is not in requested range. + PeerScoreMissingBlocks* = -200 + ## Peer response contains too many empty blocks. diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index d7beb05b9..b09d29d06 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -7,45 +7,24 @@ {.push raises: [Defect].} -import std/[ - options, heapqueue, tables, strutils, sequtils, math, algorithm] +import std/[options, heapqueue, tables, strutils, sequtils, math, algorithm] import stew/results, chronos, chronicles - import - ../spec/datatypes/[phase0, altair], + ../spec/datatypes/[base, phase0, altair, merge], ../spec/eth2_apis/rpc_types, ../spec/[helpers, forks], - ../networking/[peer_pool, eth2_network] + ../networking/[peer_pool, eth2_network], + ../gossip_processing/block_processor, + ../consensus_object_pools/block_pools_types, + ./peer_scores, ./sync_queue -import ../gossip_processing/block_processor -import ../consensus_object_pools/block_pools_types - -export phase0, altair, chronos, chronicles, results, block_pools_types, - helpers +export base, phase0, altair, merge, chronos, chronicles, results, + block_pools_types, helpers, peer_scores, sync_queue logScope: topics = "syncman" const - PeerScoreNoStatus* = -100 - ## Peer did not answer `status` request. - PeerScoreStaleStatus* = -50 - ## Peer's `status` answer do not progress in time. - PeerScoreUseless* = -10 - ## Peer's latest head is lower then ours. - PeerScoreGoodStatus* = 50 - ## Peer's `status` answer is fine. 
- PeerScoreNoBlocks* = -100 - ## Peer did not respond in time on `blocksByRange` request. - PeerScoreGoodBlocks* = 100 - ## Peer's `blocksByRange` answer is fine. - PeerScoreBadBlocks* = -1000 - ## Peer's response contains incorrect blocks. - PeerScoreBadResponse* = -1000 - ## Peer's response is not in requested range. - PeerScoreMissingBlocks* = -200 - ## Peer response contains too many empty blocks. - SyncWorkersCount* = 10 ## Number of sync workers to spawn @@ -64,45 +43,6 @@ type BlockDownload, BadResponse - GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].} - - SyncRequest*[T] = object - index*: uint64 - slot*: Slot - count*: uint64 - step*: uint64 - item*: T - - SyncResult*[T] = object - request*: SyncRequest[T] - data*: seq[ForkedSignedBeaconBlock] - - SyncWaiter*[T] = object - future: Future[bool] - request: SyncRequest[T] - - RewindPoint = object - failSlot: Slot - epochCount: uint64 - - SyncQueue*[T] = ref object - inpSlot*: Slot - outSlot*: Slot - startSlot*: Slot - lastSlot: Slot - chunkSize*: uint64 - queueSize*: int - counter*: uint64 - opcounter*: uint64 - pending*: Table[uint64, SyncRequest[T]] - waiters: seq[SyncWaiter[T]] - getFinalizedSlot*: GetSlotCallback - debtsQueue: HeapQueue[SyncRequest[T]] - debtsCount: uint64 - readyQueue: HeapQueue[SyncResult[T]] - rewind: Option[RewindPoint] - blockProcessor: ref BlockProcessor - SyncWorkerStatus* {.pure.} = enum Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Processing @@ -148,563 +88,10 @@ type SyncManagerError* = object of CatchableError BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]] -proc validate*[T](sq: SyncQueue[T], - blk: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] = - let resfut = newFuture[Result[void, BlockError]]("sync.manager.validate") - sq.blockProcessor[].addBlock(blk, resfut) - resfut - -proc getShortMap*[T](req: SyncRequest[T], - data: openArray[ForkedSignedBeaconBlock]): string = - ## Returns all slot numbers in ``data`` as placement 
map. - var res = newStringOfCap(req.count) - var slider = req.slot - var last = 0 - for i in 0 ..< req.count: - if last < len(data): - for k in last ..< len(data): - if slider == data[k].slot: - res.add('x') - last = k + 1 - break - elif slider < data[k].slot: - res.add('.') - break - else: - res.add('.') - slider = slider + req.step - res - -proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} = - slot >= req.slot and slot < req.slot + req.count * req.step and - ((slot - req.slot) mod req.step == 0) - -proc cmp*[T](a, b: SyncRequest[T]): int = - cmp(uint64(a.slot), uint64(b.slot)) - -proc checkResponse*[T](req: SyncRequest[T], - data: openArray[ForkedSignedBeaconBlock]): bool = - if len(data) == 0: - # Impossible to verify empty response. - return true - - if uint64(len(data)) > req.count: - # Number of blocks in response should be less or equal to number of - # requested blocks. - return false - - var slot = req.slot - var rindex = 0'u64 - var dindex = 0 - - while (rindex < req.count) and (dindex < len(data)): - if slot < data[dindex].slot: - discard - elif slot == data[dindex].slot: - inc(dindex) - else: - return false - slot = slot + req.step - rindex = rindex + 1'u64 - - if dindex == len(data): - return true - else: - return false - -proc getFullMap*[T](req: SyncRequest[T], - data: openArray[ForkedSignedBeaconBlock]): string = - # Returns all slot numbers in ``data`` as comma-delimeted string. 
- mapIt(data, $it.message.slot).join(", ") - -proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], slot: Slot, - count: uint64): SyncRequest[T] = - SyncRequest[T](slot: slot, count: count, step: 1'u64) - -proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], start: Slot, - finish: Slot): SyncRequest[T] = - let count = finish - start + 1'u64 - SyncRequest[T](slot: start, count: count, step: 1'u64) - -proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], slot: Slot, - count: uint64, item: T): SyncRequest[T] = - SyncRequest[T](slot: slot, count: count, item: item, step: 1'u64) - -proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], start: Slot, - finish: Slot, item: T): SyncRequest[T] = - let count = finish - start + 1'u64 - SyncRequest[T](slot: start, count: count, step: 1'u64, item: item) - proc init*[T](t1: typedesc[SyncFailure], kind: SyncFailureKind, peer: T): SyncFailure[T] = SyncFailure[T](kind: kind, peer: peer, stamp: now(chronos.Moment)) -proc empty*[T](t: typedesc[SyncRequest], - t2: typedesc[T]): SyncRequest[T] {.inline.} = - SyncRequest[T](step: 0'u64, count: 0'u64) - -proc setItem*[T](sr: var SyncRequest[T], item: T) = - sr.item = item - -proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} = - (sr.step == 0'u64) and (sr.count == 0'u64) - -proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], - start, last: Slot, chunkSize: uint64, - getFinalizedSlotCb: GetSlotCallback, - blockProcessor: ref BlockProcessor, - syncQueueSize: int = -1): SyncQueue[T] = - ## Create new synchronization queue with parameters - ## - ## ``start`` and ``last`` are starting and finishing Slots. - ## - ## ``chunkSize`` maximum number of slots in one request. - ## - ## ``syncQueueSize`` maximum queue size for incoming data. If ``syncQueueSize > 0`` - ## queue will help to keep backpressure under control. If ``syncQueueSize <= 0`` - ## then queue size is unlimited (default). 
- ## - ## ``updateCb`` procedure which will be used to send downloaded blocks to - ## consumer. Procedure should return ``false`` only when it receives - ## incorrect blocks, and ``true`` if sequence of blocks is correct. - - # SyncQueue is the core of sync manager, this data structure distributes - # requests to peers and manages responses from peers. - # - # Because SyncQueue is async data structure it manages backpressure and - # order of incoming responses and it also resolves "joker's" problem. - # - # Joker's problem - # - # According to current Ethereum2 network specification - # > Clients MUST respond with at least one block, if they have it and it - # > exists in the range. Clients MAY limit the number of blocks in the - # > response. - # - # Such rule can lead to very uncertain responses, for example let slots from - # 10 to 12 will be not empty. Client which follows specification can answer - # with any response from this list (X - block, `-` empty space): - # - # 1. X X X - # 2. - - X - # 3. - X - - # 4. - X X - # 5. X - - - # 6. X - X - # 7. X X - - # - # If peer answers with `1` everything will be fine and `block_pool` will be - # able to process all 3 blocks. In case of `2`, `3`, `4`, `6` - `block_pool` - # will fail immediately with chunk and report "parent is missing" error. - # But in case of `5` and `7` blocks will be processed by `block_pool` without - # any problems, however it will start producing problems right from this - # uncertain last slot. SyncQueue will start producing requests for next - # blocks, but all the responses from this point will fail with "parent is - # missing" error. Lets call such peers "jokers", because they are joking - # with responses. - # - # To fix "joker" problem we going to perform rollback to the latest finalized - # epoch's first slot. 
- doAssert(chunkSize > 0'u64, "Chunk size should not be zero") - result = SyncQueue[T]( - startSlot: start, - lastSlot: last, - chunkSize: chunkSize, - queueSize: syncQueueSize, - getFinalizedSlot: getFinalizedSlotCb, - waiters: newSeq[SyncWaiter[T]](), - counter: 1'u64, - pending: initTable[uint64, SyncRequest[T]](), - debtsQueue: initHeapQueue[SyncRequest[T]](), - inpSlot: start, - outSlot: start, - blockProcessor: blockProcessor - ) - -proc `<`*[T](a, b: SyncRequest[T]): bool {.inline.} = - a.slot < b.slot - -proc `<`*[T](a, b: SyncResult[T]): bool {.inline.} = - a.request.slot < b.request.slot - -proc `==`*[T](a, b: SyncRequest[T]): bool {.inline.} = - result = ((a.slot == b.slot) and (a.count == b.count) and - (a.step == b.step)) - -proc lastSlot*[T](req: SyncRequest[T]): Slot {.inline.} = - ## Returns last slot for request ``req``. - req.slot + req.count - 1'u64 - -proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) = - req.index = sq.counter - sq.counter = sq.counter + 1'u64 - sq.pending[req.index] = req - -proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} = - ## Update last slot stored in queue ``sq`` with value ``last``. - doAssert(sq.lastSlot <= last, - "Last slot could not be lower then stored one " & - $sq.lastSlot & " <= " & $last) - sq.lastSlot = last - -proc wakeupWaiters[T](sq: SyncQueue[T], flag = true) = - ## Wakeup one or all blocked waiters. - for item in sq.waiters: - if not(item.future.finished()): - item.future.complete(flag) - -proc waitForChanges[T](sq: SyncQueue[T], - req: SyncRequest[T]): Future[bool] {.async.} = - ## Create new waiter and wait for completion from `wakeupWaiters()`. 
- var waitfut = newFuture[bool]("SyncQueue.waitForChanges") - let waititem = SyncWaiter[T](future: waitfut, request: req) - sq.waiters.add(waititem) - try: - result = await waitfut - finally: - sq.waiters.delete(sq.waiters.find(waititem)) - -proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} = - ## This procedure will perform wakeupWaiters(false) and blocks until last - ## waiter will be awakened. - var waitChanges = sq.waitForChanges(SyncRequest.empty(T)) - sq.wakeupWaiters(false) - discard await waitChanges - -proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = - ## Perform reset of all the blocked waiters in SyncQueue. - ## - ## We adding one more waiter to the waiters sequence and - ## call wakeupWaiters(false). Because our waiter is last in sequence of - ## waiters it will be resumed only after all waiters will be awakened and - ## finished. - - # We are clearing pending list, so that all requests that are still running - # around (still downloading, but not yet pushed to the SyncQueue) will be - # expired. Its important to perform this call first (before await), otherwise - # you can introduce race problem. - sq.pending.clear() - - # We calculating minimal slot number to which we will be able to reset, - # without missing any blocks. There 3 sources: - # 1. Debts queue. - # 2. Processing queue (`inpSlot`, `outSlot`). - # 3. Requested slot `toSlot`. - # - # Queue's `outSlot` is the lowest slot we added to `block_pool`, but - # `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not - # added slot requests, so it can't be bigger then `outSlot` value. - var minSlot = sq.outSlot - if toSlot.isSome(): - minSlot = min(toSlot.get(), sq.outSlot) - sq.debtsQueue.clear() - sq.debtsCount = 0 - sq.readyQueue.clear() - sq.inpSlot = minSlot - sq.outSlot = minSlot - - # We are going to wakeup all the waiters and wait for last one. 
- await sq.wakeupAndWaitWaiters() - -proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} = - ## Returns ``true`` if response chain of blocks is empty (has only empty - ## slots). - len(sr.data) == 0 - -proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} = - ## Returns ``true`` if response chain of blocks has gap at the end. - let lastslot = sr.request.slot + sr.request.count - 1'u64 - if len(sr.data) == 0: - return true - if sr.data[^1].slot != lastslot: - return true - return false - -proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} = - ## Returns last non-empty slot from result ``sr``. If response has only - ## empty slots, original request slot will be returned. - if len(sr.data) == 0: - # If response has only empty slots we going to use original request slot - sr.request.slot - else: - sr.data[^1].slot - -proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) = - sq.debtsQueue.push(sr) - sq.debtsCount = sq.debtsCount + sr.count - -proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, - finalizedSlot: Slot): Slot = - # Calculate the latest finalized epoch. - let finalizedEpoch = compute_epoch_at_slot(finalizedSlot) - - # Calculate failure epoch. - let failEpoch = compute_epoch_at_slot(failSlot) - - # Calculate exponential rewind point in number of epochs. - let epochCount = - if sq.rewind.isSome(): - let rewind = sq.rewind.get() - if failSlot == rewind.failSlot: - # `MissingParent` happened at same slot so we increase rewind point by - # factor of 2. - if failEpoch > finalizedEpoch: - let rewindPoint = rewind.epochCount shl 1 - if rewindPoint < rewind.epochCount: - # If exponential rewind point produces `uint64` overflow we will - # make rewind to latest finalized epoch. - failEpoch - finalizedEpoch - else: - if (failEpoch < rewindPoint) or - (failEpoch - rewindPoint < finalizedEpoch): - # If exponential rewind point points to position which is far - # behind latest finalized epoch. 
- failEpoch - finalizedEpoch - else: - rewindPoint - else: - warn "Trying to rewind over the last finalized epoch", - finalized_slot = finalizedSlot, fail_slot = failSlot, - finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, - rewind_epoch_count = rewind.epochCount, - finalized_epoch = finalizedEpoch - 0'u64 - else: - # `MissingParent` happened at different slot so we going to rewind for - # 1 epoch only. - if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch): - warn "Сould not rewind further than the last finalized epoch", - finalized_slot = finalizedSlot, fail_slot = failSlot, - finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, - rewind_epoch_count = rewind.epochCount, - finalized_epoch = finalizedEpoch - 0'u64 - else: - 1'u64 - else: - # `MissingParent` happened first time. - if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch): - warn "Сould not rewind further than the last finalized epoch", - finalized_slot = finalizedSlot, fail_slot = failSlot, - finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, - finalized_epoch = finalizedEpoch - 0'u64 - else: - 1'u64 - - # echo "epochCount = ", epochCount - - if epochCount == 0'u64: - warn "Unable to continue syncing, please restart the node", - finalized_slot = finalizedSlot, fail_slot = failSlot, - finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, - finalized_epoch = finalizedEpoch - # Calculate the rewind epoch, which will be equal to last rewind point or - # finalizedEpoch - let rewindEpoch = - if sq.rewind.isNone(): - finalizedEpoch - else: - compute_epoch_at_slot(sq.rewind.get().failSlot) - - sq.rewind.get().epochCount - compute_start_slot_at_epoch(rewindEpoch) - else: - # Calculate the rewind epoch, which should not be less than the latest - # finalized epoch. - let rewindEpoch = failEpoch - epochCount - # Update and save new rewind point in SyncQueue. 
- sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount)) - compute_start_slot_at_epoch(rewindEpoch) - -proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], - data: seq[ForkedSignedBeaconBlock]) {.async, gcsafe.} = - ## Push successful result to queue ``sq``. - mixin updateScore - - if sr.index notin sq.pending: - # If request `sr` not in our pending list, it only means that - # SyncQueue.resetWait() happens and all pending requests are expired, so - # we swallow `old` requests, and in such way sync-workers are able to get - # proper new requests from SyncQueue. - return - - sq.pending.del(sr.index) - - # This is backpressure handling algorithm, this algorithm is blocking - # all pending `push` requests if `request.slot` not in range: - # [current_queue_slot, current_queue_slot + sq.queueSize * sq.chunkSize]. - var exitNow = false - while true: - if (sq.queueSize > 0) and - (sr.slot >= sq.outSlot + uint64(sq.queueSize) * sq.chunkSize): - let res = await sq.waitForChanges(sr) - if res: - continue - else: - # SyncQueue reset happens. We are exiting to wake up sync-worker. - exitNow = true - break - let syncres = SyncResult[T](request: sr, data: data) - sq.readyQueue.push(syncres) - exitNow = false - break - - if exitNow: - return - - while len(sq.readyQueue) > 0: - let minSlot = sq.readyQueue[0].request.slot - if sq.outSlot != minSlot: - break - let item = sq.readyQueue.pop() - - # Validating received blocks one by one - var res: Result[void, BlockError] - var failSlot: Option[Slot] - if len(item.data) > 0: - for blk in item.data: - trace "Pushing block", block_root = blk.root, - block_slot = blk.slot - res = await sq.validate(blk) - if not(res.isOk): - failSlot = some(blk.slot) - break - else: - res = Result[void, BlockError].ok() - - # Increase progress counter, so watch task will be able to know that we are - # not stuck. 
- inc(sq.opcounter) - - if res.isOk: - sq.outSlot = sq.outSlot + item.request.count - if len(item.data) > 0: - # If there no error and response was not empty we should reward peer - # with some bonus score. - item.request.item.updateScore(PeerScoreGoodBlocks) - sq.wakeupWaiters() - else: - debug "Block pool rejected peer's response", peer = item.request.item, - request_slot = item.request.slot, - request_count = item.request.count, - request_step = item.request.step, - blocks_map = getShortMap(item.request, item.data), - blocks_count = len(item.data), errCode = res.error, - topics = "syncman" - - var resetSlot: Option[Slot] - - if res.error == BlockError.MissingParent: - # If we got `BlockError.MissingParent` it means that peer returns chain - # of blocks with holes or `block_pool` is in incomplete state. We going - # to rewind to the first slot at latest finalized epoch. - let req = item.request - let finalizedSlot = sq.getFinalizedSlot() - if finalizedSlot < req.slot: - let rewindSlot = sq.getRewindPoint(failSlot.get(), finalizedSlot) - warn "Unexpected missing parent, rewind happens", - peer = req.item, rewind_to_slot = rewindSlot, - rewind_epoch_count = sq.rewind.get().epochCount, - rewind_fail_slot = failSlot.get(), - finalized_slot = finalized_slot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), topics = "syncman" - resetSlot = some(rewindSlot) - req.item.updateScore(PeerScoreMissingBlocks) - else: - error "Unexpected missing parent at finalized epoch slot", - peer = req.item, to_slot = finalizedSlot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), topics = "syncman" - req.item.updateScore(PeerScoreBadBlocks) - elif res.error == BlockError.Invalid: - let req = item.request - warn "Received invalid sequence of blocks", peer = req.item, - 
request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), topics = "syncman" - req.item.updateScore(PeerScoreBadBlocks) - else: - let req = item.request - warn "Received unexpected response from block_pool", peer = req.item, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), errorCode = res.error, - topics = "syncman" - req.item.updateScore(PeerScoreBadBlocks) - - # We need to move failed response to the debts queue. - sq.toDebtsQueue(item.request) - if resetSlot.isSome(): - await sq.resetWait(resetSlot) - debug "Rewind to slot was happened", reset_slot = reset_slot.get(), - queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, - rewind_epoch_count = sq.rewind.get().epochCount, - rewind_fail_slot = sq.rewind.get().failSlot, - reset_slot = resetSlot, topics = "syncman" - break - -proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) = - ## Push failed request back to queue. - if sr.index notin sq.pending: - # If request `sr` not in our pending list, it only means that - # SyncQueue.resetWait() happens and all pending requests are expired, so - # we swallow `old` requests, and in such way sync-workers are able to get - # proper new requests from SyncQueue. 
- return - sq.pending.del(sr.index) - sq.toDebtsQueue(sr) - -proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] = - if len(sq.debtsQueue) > 0: - if maxSlot < sq.debtsQueue[0].slot: - return SyncRequest.empty(T) - - var sr = sq.debtsQueue.pop() - if sr.lastSlot() <= maxSlot: - sq.debtsCount = sq.debtsCount - sr.count - sr.setItem(item) - sq.makePending(sr) - return sr - - var sr1 = SyncRequest.init(T, sr.slot, maxslot, item) - let sr2 = SyncRequest.init(T, maxslot + 1'u64, sr.lastSlot()) - sq.debtsQueue.push(sr2) - sq.debtsCount = sq.debtsCount - sr1.count - sq.makePending(sr1) - return sr1 - else: - if maxSlot < sq.inpSlot: - return SyncRequest.empty(T) - - if sq.inpSlot > sq.lastSlot: - return SyncRequest.empty(T) - - let lastSlot = min(maxslot, sq.lastSlot) - let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot) - var sr = SyncRequest.init(T, sq.inpSlot, count, item) - sq.inpSlot = sq.inpSlot + count - sq.makePending(sr) - return sr - -proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} = - ## Returns number of slots left in queue ``sq``. - if sq.inpSlot > sq.lastSlot: - result = sq.debtsCount - else: - result = sq.lastSlot - sq.inpSlot + 1'u64 - sq.debtsCount - -proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} = - ## Returns total number of slots in queue ``sq``. - sq.lastSlot - sq.startSlot + 1'u64 - -proc progress*[T](sq: SyncQueue[T]): uint64 = - ## Returns queue's ``sq`` progress string. 
- let curSlot = sq.outSlot - sq.startSlot - (curSlot * 100'u64) div sq.total() - proc now*(sm: typedesc[SyncMoment], slot: Slot): SyncMoment {.inline.} = SyncMoment(stamp: now(chronos.Moment), slot: slot) @@ -734,8 +121,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], rangeAge = uint64(SLOTS_PER_EPOCH * 4) ): SyncManager[A, B] = - let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(), - chunkSize, getFinalizedSlotCb, blockProcessor, 1) + let queue = SyncQueue.init(A, SyncQueueKind.Forward, getLocalHeadSlotCb(), + getLocalWallSlotCb(), chunkSize, + getFinalizedSlotCb, blockProcessor, 1) result = SyncManager[A, B]( pool: pool, @@ -955,7 +343,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = local_head_slot = headSlot, remote_head_slot = peerSlot, queue_input_slot = man.queue.inpSlot, queue_output_slot = man.queue.outSlot, - queue_last_slot = man.queue.lastSlot, + queue_last_slot = man.queue.finalSlot, peer_speed = peer.netKbps(), peer_score = peer.getScore(), index = index, topics = "syncman" await sleepAsync(RESP_TIMEOUT) @@ -1176,7 +564,8 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = if not(man.notInSyncEvent.isSet()): # We get here only if we lost sync for more then `maxHeadAge` period. if pending == 0: - man.queue = SyncQueue.init(A, man.getLocalHeadSlot(), + man.queue = SyncQueue.init(A, SyncQueueKind.Forward, + man.getLocalHeadSlot(), man.getLocalWallSlot(), man.chunkSize, man.getFinalizedSlot, man.blockProcessor, 1) diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim new file mode 100644 index 000000000..183e71b00 --- /dev/null +++ b/beacon_chain/sync/sync_queue.nim @@ -0,0 +1,783 @@ +# beacon_chain +# Copyright (c) 2018-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). 
+# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import std/[options, heapqueue, tables, strutils, sequtils, math, algorithm] +import stew/results, chronos, chronicles +import + ../spec/datatypes/[base, phase0, altair, merge], + ../spec/eth2_apis/rpc_types, + ../spec/[helpers, forks], + ../networking/[peer_pool, eth2_network], + ../gossip_processing/block_processor, + ../consensus_object_pools/block_pools_types, + ./peer_scores + +export base, phase0, altair, merge, chronos, chronicles, results, + block_pools_types, helpers, peer_scores + +logScope: + topics = "syncqueue" + +type + GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].} + + SyncQueueKind* {.pure.} = enum + Forward, Backward + + SyncRequest*[T] = object + kind: SyncQueueKind + index*: uint64 + slot*: Slot + count*: uint64 + step*: uint64 + item*: T + + SyncResult*[T] = object + request*: SyncRequest[T] + data*: seq[ForkedSignedBeaconBlock] + + SyncWaiter*[T] = object + future: Future[bool] + request: SyncRequest[T] + + RewindPoint = object + failSlot: Slot + epochCount: uint64 + + SyncQueue*[T] = ref object + kind*: SyncQueueKind + inpSlot*: Slot + outSlot*: Slot + startSlot*: Slot + finalSlot*: Slot + chunkSize*: uint64 + queueSize*: int + counter*: uint64 + opcounter*: uint64 + pending*: Table[uint64, SyncRequest[T]] + waiters: seq[SyncWaiter[T]] + getSafeSlot*: GetSlotCallback + debtsQueue: HeapQueue[SyncRequest[T]] + debtsCount: uint64 + readyQueue: HeapQueue[SyncResult[T]] + rewind: Option[RewindPoint] + blockProcessor: ref BlockProcessor + + SyncManagerError* = object of CatchableError + BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]] + +proc validate*[T](sq: SyncQueue[T], + blk: ForkedSignedBeaconBlock + ): Future[Result[void, BlockError]] = + let resfut = newFuture[Result[void, 
BlockError]]("sync.manager.validate") + sq.blockProcessor[].addBlock(blk, resfut) + resfut + +proc getShortMap*[T](req: SyncRequest[T], + data: openArray[ForkedSignedBeaconBlock]): string = + ## Returns all slot numbers in ``data`` as placement map. + var res = newStringOfCap(req.count) + var slider = req.slot + var last = 0 + for i in 0 ..< req.count: + if last < len(data): + for k in last ..< len(data): + if slider == data[k].slot: + res.add('x') + last = k + 1 + break + elif slider < data[k].slot: + res.add('.') + break + else: + res.add('.') + slider = slider + req.step + res + +proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} = + slot >= req.slot and slot < req.slot + req.count * req.step and + ((slot - req.slot) mod req.step == 0) + +proc cmp*[T](a, b: SyncRequest[T]): int = + cmp(uint64(a.slot), uint64(b.slot)) + +proc checkResponse*[T](req: SyncRequest[T], + data: openArray[ForkedSignedBeaconBlock]): bool = + if len(data) == 0: + # Impossible to verify empty response. + return true + + if uint64(len(data)) > req.count: + # Number of blocks in response should be less or equal to number of + # requested blocks. + return false + + var slot = req.slot + var rindex = 0'u64 + var dindex = 0 + + while (rindex < req.count) and (dindex < len(data)): + if slot < data[dindex].slot: + discard + elif slot == data[dindex].slot: + inc(dindex) + else: + return false + slot = slot + req.step + rindex = rindex + 1'u64 + + if dindex == len(data): + return true + else: + return false + +proc getFullMap*[T](req: SyncRequest[T], + data: openArray[ForkedSignedBeaconBlock]): string = + # Returns all slot numbers in ``data`` as comma-delimeted string. 
+ mapIt(data, $it.message.slot).join(", ") + +proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot, + finish: Slot, t2: typedesc[T]): SyncRequest[T] = + let count = finish - start + 1'u64 + SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64) + +proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, slot: Slot, + count: uint64, item: T): SyncRequest[T] = + SyncRequest[T](kind: kind, slot: slot, count: count, item: item, step: 1'u64) + +proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot, + finish: Slot, item: T): SyncRequest[T] = + let count = finish - start + 1'u64 + SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64, item: item) + +proc empty*[T](t: typedesc[SyncRequest], kind: SyncQueueKind, + t2: typedesc[T]): SyncRequest[T] {.inline.} = + SyncRequest[T](kind: kind, step: 0'u64, count: 0'u64) + +proc setItem*[T](sr: var SyncRequest[T], item: T) = + sr.item = item + +proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} = + (sr.step == 0'u64) and (sr.count == 0'u64) + +proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], + queueKind: SyncQueueKind, + start, final: Slot, chunkSize: uint64, + getSafeSlotCb: GetSlotCallback, + blockProcessor: ref BlockProcessor, + syncQueueSize: int = -1): SyncQueue[T] = + ## Create new synchronization queue with parameters + ## + ## ``start`` and ``last`` are starting and finishing Slots. + ## + ## ``chunkSize`` maximum number of slots in one request. + ## + ## ``syncQueueSize`` maximum queue size for incoming data. + ## If ``syncQueueSize > 0`` queue will help to keep backpressure under + ## control. If ``syncQueueSize <= 0`` then queue size is unlimited (default). + + # SyncQueue is the core of sync manager, this data structure distributes + # requests to peers and manages responses from peers. + # + # Because SyncQueue is async data structure it manages backpressure and + # order of incoming responses and it also resolves "joker's" problem. 
+ # + # Joker's problem + # + # According to the current Ethereum2 network specification: + # > Clients MUST respond with at least one block, if they have it and it + # > exists in the range. Clients MAY limit the number of blocks in the + # > response. + # + # Such a rule can lead to very uncertain responses. For example, suppose + # slots 10 to 12 are not empty. A client that follows the specification + # can answer with any response from this list (X - block, `-` - empty slot): + # + # 1. X X X + # 2. - - X + # 3. - X - + # 4. - X X + # 5. X - - + # 6. X - X + # 7. X X - + # + # If a peer answers with `1`, everything is fine and `block_pool` is able + # to process all 3 blocks. In cases `2`, `3`, `4` and `6`, `block_pool` fails + # immediately on the chunk and reports a "parent is missing" error. + # In cases `5` and `7`, however, the blocks are processed by `block_pool` + # without any problems, but trouble starts right after the uncertain last + # slot: SyncQueue keeps producing requests for the next blocks, but all the + # responses from this point on fail with the "parent is missing" error. + # Let's call such peers "jokers", because they are joking with their + # responses. + # + # To fix the "joker" problem we are going to perform a rollback to the + # latest finalized epoch's first slot.
+ doAssert(chunkSize > 0'u64, "Chunk size should not be zero") + SyncQueue[T]( + kind: queueKind, + startSlot: start, + finalSlot: final, + chunkSize: chunkSize, + queueSize: syncQueueSize, + getSafeSlot: getSafeSlotCb, + waiters: newSeq[SyncWaiter[T]](), + counter: 1'u64, + pending: initTable[uint64, SyncRequest[T]](), + debtsQueue: initHeapQueue[SyncRequest[T]](), + inpSlot: start, + outSlot: start, + blockProcessor: blockProcessor + ) + +proc `<`*[T](a, b: SyncRequest[T]): bool = + doAssert(a.kind == b.kind) + case a.kind + of SyncQueueKind.Forward: + a.slot < b.slot + of SyncQueueKind.Backward: + a.slot > b.slot + +proc `<`*[T](a, b: SyncResult[T]): bool = + doAssert(a.request.kind == b.request.kind) + case a.request.kind + of SyncQueueKind.Forward: + a.request.slot < b.request.slot + of SyncQueueKind.Backward: + a.request.slot > b.request.slot + +proc `==`*[T](a, b: SyncRequest[T]): bool = + (a.kind == b.kind) and (a.slot == b.slot) and (a.count == b.count) and + (a.step == b.step) + +proc lastSlot*[T](req: SyncRequest[T]): Slot = + ## Returns last slot for request ``req``. + req.slot + req.count - 1'u64 + +proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) = + req.index = sq.counter + sq.counter = sq.counter + 1'u64 + sq.pending[req.index] = req + +proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} = + ## Update last slot stored in queue ``sq`` with value ``last``. + case sq.kind + of SyncQueueKind.Forward: + doAssert(sq.finalSlot <= last, + "Last slot could not be lower then stored one " & + $sq.finalSlot & " <= " & $last) + sq.finalSlot = last + of SyncQueueKind.Backward: + doAssert(sq.finalSlot >= last, + "Last slot could not be higher then stored one " & + $sq.finalSlot & " >= " & $last) + sq.finalSlot = last + +proc wakeupWaiters[T](sq: SyncQueue[T], flag = true) = + ## Wakeup one or all blocked waiters. 
+ for item in sq.waiters: + if not(item.future.finished()): + item.future.complete(flag) + +proc waitForChanges[T](sq: SyncQueue[T], + req: SyncRequest[T]): Future[bool] {.async.} = + ## Create new waiter and wait for completion from `wakeupWaiters()`. + var waitfut = newFuture[bool]("SyncQueue.waitForChanges") + let waititem = SyncWaiter[T](future: waitfut, request: req) + sq.waiters.add(waititem) + try: + let res = await waitfut + return res + finally: + sq.waiters.delete(sq.waiters.find(waititem)) + +proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} = + ## This procedure will perform wakeupWaiters(false) and blocks until last + ## waiter will be awakened. + var waitChanges = sq.waitForChanges(SyncRequest.empty(sq.kind, T)) + sq.wakeupWaiters(false) + discard await waitChanges + +proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = + ## Perform reset of all the blocked waiters in SyncQueue. + ## + ## We adding one more waiter to the waiters sequence and + ## call wakeupWaiters(false). Because our waiter is last in sequence of + ## waiters it will be resumed only after all waiters will be awakened and + ## finished. + + # We are clearing pending list, so that all requests that are still running + # around (still downloading, but not yet pushed to the SyncQueue) will be + # expired. Its important to perform this call first (before await), otherwise + # you can introduce race problem. + sq.pending.clear() + + # We calculating minimal slot number to which we will be able to reset, + # without missing any blocks. There 3 sources: + # 1. Debts queue. + # 2. Processing queue (`inpSlot`, `outSlot`). + # 3. Requested slot `toSlot`. + # + # Queue's `outSlot` is the lowest slot we added to `block_pool`, but + # `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not + # added slot requests, so it can't be bigger then `outSlot` value. 
+ let minSlot = + case sq.kind + of SyncQueueKind.Forward: + if toSlot.isSome(): + min(toSlot.get(), sq.outSlot) + else: + sq.outSlot + of SyncQueueKind.Backward: + if toSlot.isSome(): + toSlot.get() + else: + sq.outSlot + sq.debtsQueue.clear() + sq.debtsCount = 0 + sq.readyQueue.clear() + sq.inpSlot = minSlot + sq.outSlot = minSlot + # We are going to wakeup all the waiters and wait for last one. + await sq.wakeupAndWaitWaiters() + +proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} = + ## Returns ``true`` if response chain of blocks is empty (has only empty + ## slots). + len(sr.data) == 0 + +proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} = + ## Returns ``true`` if response chain of blocks has gap at the end. + let lastslot = sr.request.slot + sr.request.count - 1'u64 + if len(sr.data) == 0: + return true + if sr.data[^1].slot != lastslot: + return true + return false + +proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} = + ## Returns last non-empty slot from result ``sr``. If response has only + ## empty slots, original request slot will be returned. + if len(sr.data) == 0: + # If response has only empty slots we going to use original request slot + sr.request.slot + else: + sr.data[^1].slot + +proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) = + sq.debtsQueue.push(sr) + sq.debtsCount = sq.debtsCount + sr.count + +proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, + safeSlot: Slot): Slot = + case sq.kind + of SyncQueueKind.Forward: + # Calculate the latest finalized epoch. + let finalizedEpoch = compute_epoch_at_slot(safeSlot) + + # Calculate failure epoch. + let failEpoch = compute_epoch_at_slot(failSlot) + + # Calculate exponential rewind point in number of epochs. + let epochCount = + if sq.rewind.isSome(): + let rewind = sq.rewind.get() + if failSlot == rewind.failSlot: + # `MissingParent` happened at same slot so we increase rewind point by + # factor of 2. 
+ if failEpoch > finalizedEpoch: + let rewindPoint = rewind.epochCount shl 1 + if rewindPoint < rewind.epochCount: + # If exponential rewind point produces `uint64` overflow we will + # make rewind to latest finalized epoch. + failEpoch - finalizedEpoch + else: + if (failEpoch < rewindPoint) or + (failEpoch - rewindPoint < finalizedEpoch): + # If exponential rewind point points to position which is far + # behind latest finalized epoch. + failEpoch - finalizedEpoch + else: + rewindPoint + else: + warn "Trying to rewind over the last finalized epoch", + finalized_slot = safeSlot, fail_slot = failSlot, + finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, + rewind_epoch_count = rewind.epochCount, + finalized_epoch = finalizedEpoch + 0'u64 + else: + # `MissingParent` happened at different slot so we going to rewind for + # 1 epoch only. + if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch): + warn "Сould not rewind further than the last finalized epoch", + finalized_slot = safeSlot, fail_slot = failSlot, + finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, + rewind_epoch_count = rewind.epochCount, + finalized_epoch = finalizedEpoch + 0'u64 + else: + 1'u64 + else: + # `MissingParent` happened first time. 
+ if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch): + warn "Сould not rewind further than the last finalized epoch", + finalized_slot = safeSlot, fail_slot = failSlot, + finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, + finalized_epoch = finalizedEpoch + 0'u64 + else: + 1'u64 + + if epochCount == 0'u64: + warn "Unable to continue syncing, please restart the node", + finalized_slot = safeSlot, fail_slot = failSlot, + finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, + finalized_epoch = finalizedEpoch + # Calculate the rewind epoch, which will be equal to last rewind point or + # finalizedEpoch + let rewindEpoch = + if sq.rewind.isNone(): + finalizedEpoch + else: + compute_epoch_at_slot(sq.rewind.get().failSlot) - + sq.rewind.get().epochCount + compute_start_slot_at_epoch(rewindEpoch) + else: + # Calculate the rewind epoch, which should not be less than the latest + # finalized epoch. + let rewindEpoch = failEpoch - epochCount + # Update and save new rewind point in SyncQueue. + sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount)) + compute_start_slot_at_epoch(rewindEpoch) + of SyncQueueKind.Backward: + # While we perform backward sync, the only possible slot we could rewind is + # latest stored block. 
+ if failSlot == safeSlot: + warn "Unable to continue syncing, please restart the node", + safe_slot = safeSlot, fail_slot = failSlot + safeSlot + +iterator blocks*[T](sq: SyncQueue[T], + sr: SyncResult[T]): ForkedSignedBeaconBlock = + case sq.kind + of SyncQueueKind.Forward: + for i in countup(0, len(sr.data) - 1): + yield sr.data[i] + of SyncQueueKind.Backward: + for i in countdown(len(sr.data) - 1, 0): + yield sr.data[i] + +proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) = + case sq.kind + of SyncQueueKind.Forward: + sq.outSlot = sq.outSlot + number + of SyncQueueKind.Backward: + sq.outSlot = sq.outSlot - number + +proc advanceInput[T](sq: SyncQueue[T], number: uint64) = + case sq.kind + of SyncQueueKind.Forward: + sq.inpSlot = sq.inpSlot + number + of SyncQueueKind.Backward: + sq.inpSlot = sq.inpSlot - number + +proc notInRange[T](sq: SyncQueue[T], slot: Slot): bool = + case sq.kind + of SyncQueueKind.Forward: + (sq.queueSize > 0) and + (slot >= sq.outSlot + uint64(sq.queueSize) * sq.chunkSize) + of SyncQueueKind.Backward: + (sq.queueSize > 0) and + (uint64(sq.queueSize) * sq.chunkSize <= sq.outSlot - slot) + +proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], + data: seq[ForkedSignedBeaconBlock]) {.async, gcsafe.} = + ## Push successful result to queue ``sq``. + mixin updateScore + + if sr.index notin sq.pending: + # If request `sr` not in our pending list, it only means that + # SyncQueue.resetWait() happens and all pending requests are expired, so + # we swallow `old` requests, and in such way sync-workers are able to get + # proper new requests from SyncQueue. + return + + sq.pending.del(sr.index) + + # This is backpressure handling algorithm, this algorithm is blocking + # all pending `push` requests if `request.slot` not in range: + # [current_queue_slot, current_queue_slot + sq.queueSize * sq.chunkSize]. 
+ var exitNow = false + while true: + if sq.notInRange(sr.slot): + let res = await sq.waitForChanges(sr) + if res: + continue + else: + # SyncQueue reset happens. We are exiting to wake up sync-worker. + exitNow = true + break + let syncres = SyncResult[T](request: sr, data: data) + sq.readyQueue.push(syncres) + exitNow = false + break + + if exitNow: + return + + while len(sq.readyQueue) > 0: + let reqres = + case sq.kind + of SyncQueueKind.Forward: + let minSlot = sq.readyQueue[0].request.slot + if sq.outSlot != minSlot: + none[SyncResult[T]]() + else: + some(sq.readyQueue.pop()) + of SyncQueueKind.Backward: + let maxSlot = sq.readyQueue[0].request.slot + + (sq.readyQueue[0].request.count - 1'u64) + if sq.outSlot != maxSlot: + none[SyncResult[T]]() + else: + some(sq.readyQueue.pop()) + + let item = + if reqres.isSome(): + reqres.get() + else: + let rewindSlot = sq.getRewindPoint(sq.outSlot, sq.getSafeSlot()) + warn "Got incorrect sync result in queue, rewind happens", + request_slot = sq.readyQueue[0].request.slot, + request_count = sq.readyQueue[0].request.count, + request_step = sq.readyQueue[0].request.step, + blocks_map = getShortMap(sq.readyQueue[0].request, + sq.readyQueue[0].data), + blocks_count = len(sq.readyQueue[0].data), + output_slot = sq.outSlot, input_slot = sq.inpSlot, + peer = sq.readyQueue[0].request.item, rewind_to_slot = rewindSlot, + topics = "syncman" + await sq.resetWait(some(rewindSlot)) + break + + # Validating received blocks one by one + var res: Result[void, BlockError] + var failSlot: Option[Slot] + if len(item.data) > 0: + for blk in sq.blocks(item): + trace "Pushing block", block_root = blk.root, + block_slot = blk.slot + res = await sq.validate(blk) + if res.isErr(): + failSlot = some(blk.slot) + break + else: + res = Result[void, BlockError].ok() + + # Increase progress counter, so watch task will be able to know that we are + # not stuck. 
+ inc(sq.opcounter) + + if res.isOk(): + sq.advanceOutput(item.request.count) + if len(item.data) > 0: + # If there no error and response was not empty we should reward peer + # with some bonus score. + item.request.item.updateScore(PeerScoreGoodBlocks) + sq.wakeupWaiters() + else: + debug "Block pool rejected peer's response", peer = item.request.item, + request_slot = item.request.slot, + request_count = item.request.count, + request_step = item.request.step, + blocks_map = getShortMap(item.request, item.data), + blocks_count = len(item.data), errCode = res.error, + topics = "syncman" + + var resetSlot: Option[Slot] + + if res.error == BlockError.MissingParent: + # If we got `BlockError.MissingParent` it means that peer returns chain + # of blocks with holes or `block_pool` is in incomplete state. We going + # to rewind to the first slot at latest finalized epoch. + let + req = item.request + safeSlot = sq.getSafeSlot() + case sq.kind + of SyncQueueKind.Forward: + if safeSlot < req.slot: + let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot) + warn "Unexpected missing parent, rewind happens", + peer = req.item, rewind_to_slot = rewindSlot, + rewind_epoch_count = sq.rewind.get().epochCount, + rewind_fail_slot = failSlot.get(), + finalized_slot = safeSlot, + request_slot = req.slot, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data), topics = "syncman" + resetSlot = some(rewindSlot) + req.item.updateScore(PeerScoreMissingBlocks) + else: + error "Unexpected missing parent at finalized epoch slot", + peer = req.item, to_slot = safeSlot, + request_slot = req.slot, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data), topics = "syncman" + req.item.updateScore(PeerScoreBadBlocks) + of SyncQueueKind.Backward: + if safeSlot > req.slot: + let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot) + warn 
"Unexpected missing parent, rewind happens", + peer = req.item, rewind_to_slot = rewindSlot, + rewind_fail_slot = failSlot.get(), + finalized_slot = safeSlot, + request_slot = req.slot, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data), topics = "syncman" + resetSlot = some(rewindSlot) + req.item.updateScore(PeerScoreMissingBlocks) + else: + error "Unexpected missing parent at safe slot", + peer = req.item, to_slot = safeSlot, + request_slot = req.slot, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data), topics = "syncman" + req.item.updateScore(PeerScoreBadBlocks) + elif res.error == BlockError.Invalid: + let req = item.request + warn "Received invalid sequence of blocks", peer = req.item, + request_slot = req.slot, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data), topics = "syncman" + req.item.updateScore(PeerScoreBadBlocks) + else: + let req = item.request + warn "Received unexpected response from block_pool", peer = req.item, + request_slot = req.slot, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data), errorCode = res.error, + topics = "syncman" + req.item.updateScore(PeerScoreBadBlocks) + + # We need to move failed response to the debts queue. 
+ sq.toDebtsQueue(item.request) + if resetSlot.isSome(): + await sq.resetWait(resetSlot) + case sq.kind + of SyncQueueKind.Forward: + debug "Rewind to slot was happened", reset_slot = reset_slot.get(), + queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, + rewind_epoch_count = sq.rewind.get().epochCount, + rewind_fail_slot = sq.rewind.get().failSlot, + reset_slot = resetSlot, topics = "syncman" + of SyncQueueKind.Backward: + debug "Rewind to slot was happened", reset_slot = reset_slot.get(), + queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, + reset_slot = resetSlot, topics = "syncman" + break + +proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) = + ## Push failed request back to queue. + if sr.index notin sq.pending: + # If request `sr` not in our pending list, it only means that + # SyncQueue.resetWait() happens and all pending requests are expired, so + # we swallow `old` requests, and in such way sync-workers are able to get + # proper new requests from SyncQueue. + return + sq.pending.del(sr.index) + sq.toDebtsQueue(sr) + +proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] = + ## Create new request according to current SyncQueue parameters. + if len(sq.debtsQueue) > 0: + if maxSlot < sq.debtsQueue[0].slot: + # Peer's latest slot is less than starting request's slot. + return SyncRequest.empty(sq.kind, T) + if maxSlot < sq.debtsQueue[0].lastSlot(): + # Peer's latest slot is less than finishing request's slot. + return SyncRequest.empty(sq.kind, T) + var sr = sq.debtsQueue.pop() + sq.debtsCount = sq.debtsCount - sr.count + sr.setItem(item) + sq.makePending(sr) + sr + else: + case sq.kind + of SyncQueueKind.Forward: + if maxSlot < sq.inpSlot: + # Peer's latest slot is less than queue's input slot. + return SyncRequest.empty(sq.kind, T) + if sq.inpSlot > sq.finalSlot: + # Queue's input slot is bigger than queue's final slot. 
+ return SyncRequest.empty(sq.kind, T) + let lastSlot = min(maxslot, sq.finalSlot) + let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot) + var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item) + sq.advanceInput(count) + sq.makePending(sr) + sr + of SyncQueueKind.Backward: + if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64: + return SyncRequest.empty(sq.kind, T) + if sq.inpSlot < sq.finalSlot: + return SyncRequest.empty(sq.kind, T) + let (slot, count) = + block: + let baseSlot = sq.inpSlot + 1'u64 + if baseSlot - sq.finalSlot < sq.chunkSize: + let count = uint64(baseSlot - sq.finalSlot) + (baseSlot - count, count) + else: + (baseSlot - sq.chunkSize, sq.chunkSize) + if (maxSlot + 1'u64) < slot + count: + # Peer's latest slot is less than queue's input slot. + return SyncRequest.empty(sq.kind, T) + var sr = SyncRequest.init(sq.kind, slot, count, item) + sq.advanceInput(count) + sq.makePending(sr) + sr + +proc debtLen*[T](sq: SyncQueue[T]): uint64 = + sq.debtsCount + +proc pendingLen*[T](sq: SyncQueue[T]): uint64 = + case sq.kind + of SyncQueueKind.Forward: + # When moving forward `outSlot` will be <= of `inpSlot`. + sq.inpSlot - sq.outSlot + of SyncQueueKind.Backward: + # When moving backward `outSlot` will be >= of `inpSlot` + sq.outSlot - sq.inpSlot + +proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} = + ## Returns number of slots left in queue ``sq``. + case sq.kind + of SyncQueueKind.Forward: + sq.finalSlot + 1'u64 - sq.outSlot + of SyncQueueKind.Backward: + sq.outSlot + 1'u64 - sq.finalSlot + +proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} = + ## Returns total number of slots in queue ``sq``. + case sq.kind + of SyncQueueKind.Forward: + sq.finalSlot + 1'u64 - sq.startSlot + of SyncQueueKind.Backward: + sq.startSlot + 1'u64 - sq.finalSlot + +proc progress*[T](sq: SyncQueue[T]): uint64 = + ## Returns queue's ``sq`` progress string. 
+ let curSlot = + case sq.kind + of SyncQueueKind.Forward: + sq.outSlot - sq.startSlot + of SyncQueueKind.Backward: + sq.startSlot - sq.outSlot + (curSlot * 100'u64) div sq.total() diff --git a/docs/block_flow.dot b/docs/block_flow.dot index bb49c154e..e3db8f138 100644 --- a/docs/block_flow.dot +++ b/docs/block_flow.dot @@ -14,14 +14,14 @@ digraph architecture{ Eth2RPC -> SyncProtocol [dir=both] SyncProtocol -> SyncManager [dir=both, label="beaconBlocksByRange() (mixin)"] - GossipSub -> Eth2Processor [label="node.topicBeaconBlocks: blockValidator->validateBeaconBlock (no transition or signature check yet)\nthen enqueued in blocksQueue"]; + GossipSub -> Eth2Processor [label="node.topicBeaconBlocks: blockValidator->validateBeaconBlock (no transition or signature check yet)\nthen enqueued in blockQueue"]; GossipSub -> Eth2Processor [dir=back, label="node.topicBeaconBlocks: blockValidator()->ValidationResult.Accept->libp2p/gossipsub.nim\nvalidate() in rpcHandler()"]; Eth2Processor -> Clearance [label="storeBlock(): enqueue in clearance/quarantine and callback to fork choice"]; SyncProtocol -> RequestManager [dir=both, label="fetchAncestorBlocksFromNetwork()"]; - SyncManager -> SharedBlockQueue [dir=both, label="Eth2Processor.blocksQueue\n== SyncManager.outQueue (shared state!)"]; - Eth2Processor -> SharedBlockQueue [dir=both, label="Eth2Processor.blocksQueue\n== RequestManager.outQueue (shared state!)"]; + SyncManager -> SharedBlockQueue [dir=both, label="Eth2Processor.blockQueue\n== SyncManager.outQueue (shared state!)"]; + Eth2Processor -> SharedBlockQueue [dir=both, label="Eth2Processor.blockQueue\n== RequestManager.outQueue (shared state!)"]; SharedBlockQueue -> RequestManager [dir=both, label="SyncManager.outQueue\n== RequestManager.outQueue (shared state!)"]; LocalValidatorDuties -> Clearance diff --git a/docs/block_flow.md b/docs/block_flow.md index 0e18a7958..1cffcdcb9 100644 --- a/docs/block_flow.md +++ b/docs/block_flow.md @@ -94,7 +94,7 @@ How the various 
modules interact with block is described in a diagram: ![./block_flow.png](./block_flow.png) It is important to note that 3 data structures are sharing the same `AsyncQueue[BlockEntry]`: -- Eth2Processor.blocksQueue +- Eth2Processor.blockQueue - SyncManager.outQueue - RequestManager.outQueue diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index b217cf867..a7d49ac42 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -20,6 +20,9 @@ proc updateScore(peer: SomeTPeer, score: int) = proc getFirstSlotAtFinalizedEpoch(): Slot = Slot(0) +proc getSafeSlot(): Slot = + Slot(1024) + proc newBlockProcessor(): ref BlockProcessor = # Minimal block processor for test - the real block processor has an unbounded # queue but the tests here @@ -31,369 +34,617 @@ suite "SyncManager test suite": proc createChain(start, finish: Slot): seq[ForkedSignedBeaconBlock] = doAssert(start <= finish) let count = int(finish - start + 1'u64) - result = newSeq[ForkedSignedBeaconBlock](count) + var res = newSeq[ForkedSignedBeaconBlock](count) var curslot = start - for item in result.mitems(): + for item in res.mitems(): item.phase0Data.message.slot = curslot curslot = curslot + 1'u64 + res - test "[SyncQueue] Start and finish slots equal": + proc getSlice(chain: openarray[ForkedSignedBeaconBlock], startSlot: Slot, + request: SyncRequest[SomeTPeer]): seq[ForkedSignedBeaconBlock] = + let + startIndex = int(request.slot - startSlot) + finishIndex = int(request.slot - startSlot) + int(request.count) - 1 + @chain[startIndex..finishIndex] + + template startAndFinishSlotsEqual(kind: SyncQueueKind) = let p1 = SomeTPeer() let aq = newBlockProcessor() - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64, + var queue = SyncQueue.init(SomeTPeer, kind, + Slot(0), Slot(0), 1'u64, getFirstSlotAtFinalizedEpoch, aq) - check len(queue) == 1 + check: + len(queue) == 1 + pendingLen(queue) == 0 + debtLen(queue) == 0 var r11 = queue.pop(Slot(0), p1) - check 
len(queue) == 0 + check: + len(queue) == 1 + pendingLen(queue) == 1 + debtLen(queue) == 0 queue.push(r11) - check len(queue) == 1 + check: + pendingLen(queue) == 1 + len(queue) == 1 + debtLen(queue) == 1 var r11e = queue.pop(Slot(0), p1) check: - len(queue) == 0 + len(queue) == 1 + pendingLen(queue) == 1 + debtLen(queue) == 0 r11e == r11 r11.item == p1 r11e.item == r11.item r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64 - test "[SyncQueue] Two full requests success/fail": + template passThroughLimitsTest(kind: SyncQueueKind) = + let + p1 = SomeTPeer() + p2 = SomeTPeer() + + let Checks = + case kind + of SyncQueueKind.Forward: + @[ + # Tests with zero start. + (Slot(0), Slot(0), 1'u64, (Slot(0), 1'u64), + 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), + (Slot(0), Slot(0), 16'u64, (Slot(0), 1'u64), + 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), + (Slot(0), Slot(1), 2'u64, (Slot(0), 2'u64), + 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), + (Slot(0), Slot(1), 16'u64, (Slot(0), 2'u64), + 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), + (Slot(0), Slot(15), 16'u64, (Slot(0), 16'u64), + 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), + (Slot(0), Slot(15), 32'u64, (Slot(0), 16'u64), + 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), + # Tests with non-zero start. + (Slot(1021), Slot(1021), 1'u64, (Slot(1021), 1'u64), + 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), + (Slot(1021), Slot(1021), 16'u64, (Slot(1021), 1'u64), + 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), + (Slot(1021), Slot(1022), 2'u64, (Slot(1021), 2'u64), + 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), + (Slot(1021), Slot(1022), 16'u64, (Slot(1021), 2'u64), + 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), + (Slot(1021), Slot(1036), 16'u64, (Slot(1021), 16'u64), + 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), + (Slot(1021), Slot(1036), 32'u64, (Slot(1021), 16'u64), + 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), + ] + of SyncQueueKind.Backward: + @[ + # Tests with zero finish. 
+ (Slot(0), Slot(0), 1'u64, (Slot(0), 1'u64), + 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), + (Slot(0), Slot(0), 16'u64, (Slot(0), 1'u64), + 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), + (Slot(1), Slot(0), 2'u64, (Slot(0), 2'u64), + 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), + (Slot(1), Slot(0), 16'u64, (Slot(0), 2'u64), + 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), + (Slot(15), Slot(0), 16'u64, (Slot(0), 16'u64), + 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), + (Slot(15), Slot(0), 32'u64, (Slot(0), 16'u64), + 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), + # Tests with non-zero finish. + (Slot(1021), Slot(1021), 1'u64, (Slot(1021), 1'u64), + 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), + (Slot(1021), Slot(1021), 16'u64, (Slot(1021), 1'u64), + 1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64), + (Slot(1022), Slot(1021), 2'u64, (Slot(1021), 2'u64), + 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), + (Slot(1022), Slot(1021), 16'u64, (Slot(1021), 2'u64), + 2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64), + (Slot(1036), Slot(1021), 16'u64, (Slot(1021), 16'u64), + 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), + (Slot(1036), Slot(1021), 32'u64, (Slot(1021), 16'u64), + 16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64), + ] + + for item in Checks: + let aq = newBlockProcessor() + var queue = SyncQueue.init(SomeTPeer, kind, + item[0], item[1], item[2], + getFirstSlotAtFinalizedEpoch, aq) + check: + len(queue) == item[4] + pendingLen(queue) == item[5] + debtLen(queue) == item[6] + var req1 = queue.pop(max(item[0], item[1]), p1) + check: + len(queue) == item[7] + pendingLen(queue) == item[8] + debtLen(queue) == item[9] + var req2 = queue.pop(max(item[0], item[1]), p2) + check: + req1.isEmpty() == false + req1.slot == item[3][0] + req1.count == item[3][1] + req1.step == 1'u64 + req2.isEmpty() == true + + template twoFullRequests(kkind: SyncQueueKind) = let aq = newBlockProcessor() - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64, - getFirstSlotAtFinalizedEpoch, aq) + var queue = + case 
kkind + of SyncQueueKind.Forward: + SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, + Slot(0), Slot(1), 1'u64, + getFirstSlotAtFinalizedEpoch, aq) + of SyncQueueKind.Backward: + SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, + Slot(1), Slot(0), 1'u64, + getFirstSlotAtFinalizedEpoch, aq) + let p1 = SomeTPeer() let p2 = SomeTPeer() - check len(queue) == 2 + check: + len(queue) == 2 + pendingLen(queue) == 0 + debtLen(queue) == 0 var r21 = queue.pop(Slot(1), p1) - check len(queue) == 1 + check: + len(queue) == 2 + pendingLen(queue) == 1 + debtLen(queue) == 0 var r22 = queue.pop(Slot(1), p2) - check len(queue) == 0 + check: + len(queue) == 2 + pendingLen(queue) == 2 + debtLen(queue) == 0 queue.push(r22) - check len(queue) == 1 + check: + len(queue) == 2 + pendingLen(queue) == 2 + debtLen(queue) == 1 queue.push(r21) - check len(queue) == 2 + check: + len(queue) == 2 + pendingLen(queue) == 2 + debtLen(queue) == 2 var r21e = queue.pop(Slot(1), p1) - check len(queue) == 1 + check: + len(queue) == 2 + pendingLen(queue) == 2 + debtLen(queue) == 1 var r22e = queue.pop(Slot(1), p2) check: - len(queue) == 0 + len(queue) == 2 + pendingLen(queue) == 2 + debtLen(queue) == 0 r21 == r21e r22 == r22e r21.item == p1 r22.item == p2 r21.item == r21e.item r22.item == r22e.item - r21.slot == Slot(0) and r21.count == 1'u64 and r21.step == 1'u64 - r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64 + case kkind + of SyncQueueKind.Forward: + check: + r21.slot == Slot(0) and r21.count == 1'u64 and r21.step == 1'u64 + r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64 + of SyncQueueKind.Backward: + check: + r21.slot == Slot(1) and r21.count == 1'u64 and r21.step == 1'u64 + r22.slot == Slot(0) and r22.count == 1'u64 and r22.step == 1'u64 + + template smokeTest(kkind: SyncQueueKind, start, finish: Slot, + chunkSize: uint64) = + let + aq = newBlockProcessor() + + var counter = + case kkind + of SyncQueueKind.Forward: + int(start) + of SyncQueueKind.Backward: + 
int(finish) + + proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = + while true: + let sblock = await aq.popFirst() + if sblock.blck.slot == Slot(counter): + sblock.done() + else: + sblock.fail(BlockError.Invalid) + dec(counter) + + proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = + while true: + let sblock = await aq.popFirst() + if sblock.blck.slot == Slot(counter): + inc(counter) + sblock.done() + else: + sblock.fail(BlockError.Invalid) + + var + queue = + case kkind + of SyncQueueKind.Forward: + SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, + start, finish, chunkSize, + getFirstSlotAtFinalizedEpoch, aq) + of SyncQueueKind.Backward: + SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, + finish, start, chunkSize, + getFirstSlotAtFinalizedEpoch, aq) + chain = createChain(start, finish) + validatorFut = + case kkind + of SyncQueueKind.Forward: + forwardValidator(aq[].blockQueue) + of SyncQueueKind.Backward: + backwardValidator(aq[].blockQueue) - test "[SyncQueue] Full and incomplete success/fail start from zero": - let aq = newBlockProcessor() - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64, - getFirstSlotAtFinalizedEpoch, aq) let p1 = SomeTPeer() - let p2 = SomeTPeer() - let p3 = SomeTPeer() - check len(queue) == 5 - var r31 = queue.pop(Slot(4), p1) - check len(queue) == 3 - var r32 = queue.pop(Slot(4), p2) - check len(queue) == 1 - var r33 = queue.pop(Slot(4), p3) - check len(queue) == 0 - queue.push(r33) - check len(queue) == 1 - queue.push(r32) - check len(queue) == 3 - queue.push(r31) - check len(queue) == 5 - var r31e = queue.pop(Slot(4), p1) - check len(queue) == 3 - var r32e = queue.pop(Slot(4), p2) - check len(queue) == 1 - var r33e = queue.pop(Slot(4), p3) - check: - len(queue) == 0 - r31 == r31e - r32 == r32e - r33 == r33e - r31.item == r31e.item - r32.item == r32e.item - r33.item == r33e.item - r31.item == p1 - r32.item == p2 - r33.item == p3 - r31.slot == Slot(0) and r31.count == 2'u64 and r31.step == 1'u64 - 
r32.slot == Slot(2) and r32.count == 2'u64 and r32.step == 1'u64 - r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64 - test "[SyncQueue] Full and incomplete success/fail start from non-zero": - let aq = newBlockProcessor() - var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64, - getFirstSlotAtFinalizedEpoch, aq) - let p1 = SomeTPeer() - let p2 = SomeTPeer() - check len(queue) == 5 - var r41 = queue.pop(Slot(5), p1) - check len(queue) == 2 - var r42 = queue.pop(Slot(5), p2) - check len(queue) == 0 - queue.push(r42) - check len(queue) == 2 - queue.push(r41) - check len(queue) == 5 - var r41e = queue.pop(Slot(5), p1) - check len(queue) == 2 - var r42e = queue.pop(Slot(5), p2) - check: - len(queue) == 0 - r41 == r41e - r42 == r42e - r41.item == r41e.item - r42.item == r42e.item - r41.item == p1 - r42.item == p2 - r41.slot == Slot(1) and r41.count == 3'u64 and r41.step == 1'u64 - r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64 + proc runSmokeTest() {.async.} = + while true: + var request = queue.pop(finish, p1) + if request.isEmpty(): + break + await queue.push(request, getSlice(chain, start, request)) + await validatorFut.cancelAndWait() - test "[SyncQueue] Smart and stupid success/fail": - let aq = newBlockProcessor() - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, - getFirstSlotAtFinalizedEpoch, aq) - let p1 = SomeTPeer() - let p2 = SomeTPeer() - check len(queue) == 5 - var r51 = queue.pop(Slot(3), p1) - check len(queue) == 1 - var r52 = queue.pop(Slot(4), p2) - check len(queue) == 0 - queue.push(r52) - check len(queue) == 1 - queue.push(r51) - check len(queue) == 5 - var r51e = queue.pop(Slot(3), p1) - check len(queue) == 1 - var r52e = queue.pop(Slot(4), p2) - check: - len(queue) == 0 - r51 == r51e - r52 == r52e - r51.item == r51e.item - r52.item == r52e.item - r51.item == p1 - r52.item == p2 - r51.slot == Slot(0) and r51.count == 4'u64 and r51.step == 1'u64 - r52.slot == Slot(4) and r52.count == 1'u64 and 
r52.step == 1'u64 + waitFor runSmokeTest() + case kkind + of SyncQueueKind.Forward: + check (counter - 1) == int(finish) + of SyncQueueKind.Backward: + check (counter + 1) == int(start) - test "[SyncQueue] One smart and one stupid + debt split + empty": - let aq = newBlockProcessor() - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, - getFirstSlotAtFinalizedEpoch, aq) - let p1 = SomeTPeer() - let p2 = SomeTPeer() - let p3 = SomeTPeer() - let p4 = SomeTPeer() - check len(queue) == 5 - var r61 = queue.pop(Slot(4), p1) - check len(queue) == 0 - queue.push(r61) - var r61e = queue.pop(Slot(2), p1) - check len(queue) == 2 - var r62e = queue.pop(Slot(2), p2) - check len(queue) == 2 - check r62e.isEmpty() - var r63e = queue.pop(Slot(3), p3) - check len(queue) == 1 - var r64e = queue.pop(Slot(4), p4) - check: - len(queue) == 0 - r61.slot == Slot(0) and r61.count == 5'u64 and r61.step == 1'u64 - r61e.slot == Slot(0) and r61e.count == 3'u64 and r61e.step == 1'u64 - r62e.isEmpty() - r63e.slot == Slot(3) and r63e.count == 1'u64 and r63e.step == 1'u64 - r64e.slot == Slot(4) and r64e.count == 1'u64 and r64e.step == 1'u64 - r61.item == p1 - r61e.item == p1 - isNil(r62e.item) == true - r63e.item == p3 - r64e.item == p4 + template unorderedAsyncTest(kkind: SyncQueueKind, startSlot: Slot) = + let + aq = newBlockProcessor() + chunkSize = 3'u64 + numberOfChunks = 3'u64 + finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64) + queueSize = 1 - test "[SyncQueue] Async unordered push start from zero": - proc test(): Future[bool] {.async.} = - var counter = 0 + var counter = + case kkind + of SyncQueueKind.Forward: + int(startSlot) + of SyncQueueKind.Backward: + int(finishSlot) - proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - inc(counter) - sblock.done() - else: - sblock.fail(BlockError.Invalid) + proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = + 
while true: + let sblock = await aq.popFirst() + if sblock.blck.slot == Slot(counter): + sblock.done() + else: + sblock.fail(BlockError.Invalid) + dec(counter) - let aq = newBlockProcessor() - var chain = createChain(Slot(0), Slot(2)) - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64, - getFirstSlotAtFinalizedEpoch, aq, 1) + proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = + while true: + let sblock = await aq.popFirst() + if sblock.blck.slot == Slot(counter): + inc(counter) + sblock.done() + else: + sblock.fail(BlockError.Invalid) - var validatorFut = simpleValidator(aq[].blockQueue) - let p1 = SomeTPeer() - let p2 = SomeTPeer() - let p3 = SomeTPeer() - var r11 = queue.pop(Slot(2), p1) - var r12 = queue.pop(Slot(2), p2) - var r13 = queue.pop(Slot(2), p3) - var f13 = queue.push(r13, @[chain[2]]) - # + var + chain = createChain(startSlot, finishSlot) + queue = + case kkind + of SyncQueueKind.Forward: + SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, + startSlot, finishSlot, chunkSize, + getFirstSlotAtFinalizedEpoch, aq, + queueSize) + of SyncQueueKind.Backward: + SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, + finishSlot, startSlot, chunkSize, + getFirstSlotAtFinalizedEpoch, aq, + queueSize) + validatorFut = + case kkind + of SyncQueueKind.Forward: + forwardValidator(aq[].blockQueue) + of SyncQueueKind.Backward: + backwardValidator(aq[].blockQueue) + + let + p1 = SomeTPeer() + p2 = SomeTPeer() + p3 = SomeTPeer() + + proc runTest(): Future[bool] {.async.} = + var r11 = queue.pop(finishSlot, p1) + var r12 = queue.pop(finishSlot, p2) + var r13 = queue.pop(finishSlot, p3) + + var f13 = queue.push(r13, chain.getSlice(startSlot, r13)) await sleepAsync(100.milliseconds) - # doAssert(f12.finished == false) - doAssert(f13.finished == false) - doAssert(counter == 0) - var f11 = queue.push(r11, @[chain[0]]) + check: + f13.finished == false + case kkind + of SyncQueueKind.Forward: counter == int(startSlot) + of SyncQueueKind.Backward: counter == 
int(finishSlot) + + var f11 = queue.push(r11, chain.getSlice(startSlot, r11)) await sleepAsync(100.milliseconds) - doAssert(counter == 1) - doAssert(f11.finished == true and f11.failed == false) - var f12 = queue.push(r12, @[chain[1]]) + check: + case kkind + of SyncQueueKind.Forward: counter == int(startSlot + chunkSize) + of SyncQueueKind.Backward: counter == int(finishSlot - chunkSize) + f11.finished == true and f11.failed == false + f13.finished == false + + var f12 = queue.push(r12, chain.getSlice(startSlot, r12)) + await allFutures(f11, f12, f13) + check: + f12.finished == true and f12.failed == false + f13.finished == true and f13.failed == false + check: + case kkind + of SyncQueueKind.Forward: counter == int(finishSlot) + 1 + of SyncQueueKind.Backward: counter == int(startSlot) - 1 + r11.item == p1 + r12.item == p2 + r13.item == p3 + await validatorFut.cancelAndWait() + return true + + check waitFor(runTest()) == true + + for k in {SyncQueueKind.Forward, SyncQueueKind.Backward}: + let prefix = "[SyncQueue#" & $k & "] " + + test prefix & "Start and finish slots equal": + startAndFinishSlotsEqual(k) + + test prefix & "Pass through established limits test": + passThroughLimitsTest(k) + + test prefix & "Two full requests success/fail": + twoFullRequests(k) + + test prefix & "Smoke test": + const SmokeTests = [ + (Slot(0), Slot(547), 61'u64), + (Slot(193), Slot(389), 79'u64), + (Slot(1181), Slot(1399), 41'u64) + ] + for item in SmokeTests: + smokeTest(k, item[0], item[1], item[2]) + + test prefix & "Async unordered push test": + const UnorderedTests = [ + Slot(0), Slot(100) + ] + for item in UnorderedTests: + unorderedAsyncTest(k, item) + + test "[SyncQueue#Forward] Async unordered push with rewind test": + let + aq = newBlockProcessor() + startSlot = Slot(0) + chunkSize = SLOTS_PER_EPOCH + numberOfChunks = 4'u64 + finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64) + queueSize = 1 + + var counter = int(startSlot) + + proc forwardValidator(aq: 
AsyncQueue[BlockEntry]) {.async.} = + while true: + let sblock = await aq.popFirst() + if sblock.blck.slot == Slot(counter): + withBlck(sblock.blck): + if blck.message.proposer_index == 0xDEADBEAF'u64: + sblock.fail(BlockError.MissingParent) + else: + inc(counter) + sblock.done() + else: + sblock.fail(BlockError.Invalid) + + var + chain = createChain(startSlot, finishSlot) + queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, + startSlot, finishSlot, chunkSize, + getFirstSlotAtFinalizedEpoch, aq, + queueSize) + validatorFut = forwardValidator(aq[].blockQueue) + + let + p1 = SomeTPeer() + p2 = SomeTPeer() + p3 = SomeTPeer() + p4 = SomeTPeer() + p5 = SomeTPeer() + p6 = SomeTPeer() + p7 = SomeTPeer() + p8 = SomeTPeer() + + proc runTest(): Future[bool] {.async.} = + var r11 = queue.pop(finishSlot, p1) + var r12 = queue.pop(finishSlot, p2) + var r13 = queue.pop(finishSlot, p3) + var r14 = queue.pop(finishSlot, p4) + + var f14 = queue.push(r14, chain.getSlice(startSlot, r14)) await sleepAsync(100.milliseconds) - doAssert(f12.finished == true and f12.failed == false) - doAssert(f13.finished == true and f13.failed == false) - doAssert(counter == 3) - doAssert(r11.item == p1) - doAssert(r12.item == p2) - doAssert(r13.item == p3) + check: + f14.finished == false + counter == int(startSlot) + + var f12 = queue.push(r12, chain.getSlice(startSlot, r12)) + await sleepAsync(100.milliseconds) + check: + counter == int(startSlot) + f12.finished == false + f14.finished == false + + var f11 = queue.push(r11, chain.getSlice(startSlot, r11)) + await allFutures(f11, f12) + check: + counter == int(startSlot + chunkSize + chunkSize) + f11.finished == true and f11.failed == false + f12.finished == true and f12.failed == false + f14.finished == false + + var missingSlice = chain.getSlice(startSlot, r13) + withBlck(missingSlice[0]): + blck.message.proposer_index = 0xDEADBEAF'u64 + var f13 = queue.push(r13, missingSlice) + await allFutures(f13, f14) + check: + f11.finished == true and 
f11.failed == false + f12.finished == true and f12.failed == false + f13.finished == true and f13.failed == false + f14.finished == true and f14.failed == false + queue.inpSlot == Slot(SLOTS_PER_EPOCH) + queue.outSlot == Slot(SLOTS_PER_EPOCH) + queue.debtLen == 0 + + # Recovery process + counter = int(SLOTS_PER_EPOCH) + + var r15 = queue.pop(finishSlot, p5) + var r16 = queue.pop(finishSlot, p6) + var r17 = queue.pop(finishSlot, p7) + var r18 = queue.pop(finishSlot, p8) + + check r18.isEmpty() == true + + var f17 = queue.push(r17, chain.getSlice(startSlot, r17)) + await sleepAsync(100.milliseconds) + check f17.finished == false + + var f16 = queue.push(r16, chain.getSlice(startSlot, r16)) + await sleepAsync(100.milliseconds) + check f16.finished == false + + var f15 = queue.push(r15, chain.getSlice(startSlot, r15)) + await allFutures(f15, f16, f17) + check: + f15.finished == true and f15.failed == false + f16.finished == true and f16.failed == false + f17.finished == true and f17.failed == false + counter == int(finishSlot) + 1 await validatorFut.cancelAndWait() - result = true + return true - check waitFor(test()) + check waitFor(runTest()) == true - test "[SyncQueue] Async unordered push with not full start from non-zero": - proc test(): Future[bool] {.async.} = - var counter = 5 + test "[SyncQueue#Backward] Async unordered push with rewind test": + let + aq = newBlockProcessor() + startSlot = Slot(0) + chunkSize = SLOTS_PER_EPOCH + numberOfChunks = 4'u64 + finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64) + queueSize = 1 - proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - inc(counter) - sblock.done() - else: - sblock.fail(BlockError.Invalid) + var + lastSafeSlot: Slot + counter = int(finishSlot) - let aq = newBlockProcessor() - var chain = createChain(Slot(5), Slot(11)) - var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64, - 
getFirstSlotAtFinalizedEpoch, aq, 2) + proc getSafeSlot(): Slot = + lastSafeSlot - let p1 = SomeTPeer() - let p2 = SomeTPeer() - let p3 = SomeTPeer() - let p4 = SomeTPeer() + proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} = + while true: + let sblock = await aq.popFirst() + if sblock.blck.slot == Slot(counter): + withBlck(sblock.blck): + if blck.message.proposer_index == 0xDEADBEAF'u64: + sblock.fail(BlockError.MissingParent) + else: + lastSafeSlot = sblock.blck.slot + dec(counter) + sblock.done() + else: + sblock.fail(BlockError.Invalid) - var validatorFut = simpleValidator(aq[].blockQueue) + var + chain = createChain(startSlot, finishSlot) + queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, + finishSlot, startSlot, chunkSize, + getSafeSlot, aq, queueSize) + validatorFut = backwardValidator(aq[].blockQueue) - var r21 = queue.pop(Slot(11), p1) - var r22 = queue.pop(Slot(11), p2) - var r23 = queue.pop(Slot(11), p3) - var r24 = queue.pop(Slot(11), p4) + let + p1 = SomeTPeer() + p2 = SomeTPeer() + p3 = SomeTPeer() + p4 = SomeTPeer() + p5 = SomeTPeer() + p6 = SomeTPeer() + p7 = SomeTPeer() - var f24 = queue.push(r24, @[chain[6]]) - var f22 = queue.push(r22, @[chain[2], chain[3]]) - doAssert(f24.finished == false) - doAssert(f22.finished == true and f22.failed == false) - doAssert(counter == 5) - var f21 = queue.push(r21, @[chain[0], chain[1]]) + proc runTest(): Future[bool] {.async.} = + var r11 = queue.pop(finishSlot, p1) + var r12 = queue.pop(finishSlot, p2) + var r13 = queue.pop(finishSlot, p3) + var r14 = queue.pop(finishSlot, p4) + + var f14 = queue.push(r14, chain.getSlice(startSlot, r14)) await sleepAsync(100.milliseconds) - doAssert(f21.finished == true and f21.failed == false) - doAssert(f24.finished == true and f24.failed == false) - doAssert(counter == 9) - var f23 = queue.push(r23, @[chain[4], chain[5]]) + check: + f14.finished == false + counter == int(finishSlot) + + var f12 = queue.push(r12, chain.getSlice(startSlot, r12)) await 
sleepAsync(100.milliseconds) - doAssert(f23.finished == true and f23.failed == false) - doAssert(counter == 12) - doAssert(counter == 12) - doAssert(r21.item == p1) - doAssert(r22.item == p2) - doAssert(r23.item == p3) - doAssert(r24.item == p4) + check: + counter == int(finishSlot) + f12.finished == false + f14.finished == false + + var f11 = queue.push(r11, chain.getSlice(startSlot, r11)) + await allFutures(f11, f12) + check: + counter == int(finishSlot - chunkSize - chunkSize) + f11.finished == true and f11.failed == false + f12.finished == true and f12.failed == false + f14.finished == false + + var missingSlice = chain.getSlice(startSlot, r13) + withBlck(missingSlice[0]): + blck.message.proposer_index = 0xDEADBEAF'u64 + var f13 = queue.push(r13, missingSlice) + await allFutures(f13, f14) + check: + f11.finished == true and f11.failed == false + f12.finished == true and f12.failed == false + f13.finished == true and f13.failed == false + f14.finished == true and f14.failed == false + + # Recovery process + counter = int(SLOTS_PER_EPOCH) + 1 + + var r15 = queue.pop(finishSlot, p5) + var r16 = queue.pop(finishSlot, p6) + var r17 = queue.pop(finishSlot, p7) + + check r17.isEmpty() == true + + var f16 = queue.push(r16, chain.getSlice(startSlot, r16)) + await sleepAsync(100.milliseconds) + check f16.finished == false + + var f15 = queue.push(r15, chain.getSlice(startSlot, r15)) + await allFutures(f15, f16) + check: + f15.finished == true and f15.failed == false + f16.finished == true and f16.failed == false + counter == int(startSlot) - 1 await validatorFut.cancelAndWait() - result = true + return true - check waitFor(test()) - - test "[SyncQueue] Async pending and resetWait() test": - proc test(): Future[bool] {.async.} = - var counter = 5 - - proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} = - while true: - let sblock = await aq.popFirst() - if sblock.blck.slot == Slot(counter): - inc(counter) - sblock.done() - else: - sblock.fail(BlockError.Invalid) - 
- let aq = newBlockProcessor() - var chain = createChain(Slot(5), Slot(18)) - var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64, - getFirstSlotAtFinalizedEpoch, aq, 2) - let p1 = SomeTPeer() - let p2 = SomeTPeer() - let p3 = SomeTPeer() - let p4 = SomeTPeer() - let p5 = SomeTPeer() - let p6 = SomeTPeer() - let p7 = SomeTPeer() - - var validatorFut = simpleValidator(aq[].blockQueue) - - var r21 = queue.pop(Slot(20), p1) - var r22 = queue.pop(Slot(20), p2) - var r23 = queue.pop(Slot(20), p3) - var r24 = queue.pop(Slot(20), p4) - var r25 = queue.pop(Slot(20), p5) - var r26 = queue.pop(Slot(20), p6) - var r27 = queue.pop(Slot(20), p7) - - var f21 = queue.push(r21, @[chain[0], chain[1]]) - # This should be silently ignored, because r21 is already processed. - var e21 = queue.push(r21, @[chain[0], chain[1]]) - queue.push(r22) - queue.push(r23) - var f26 = queue.push(r26, @[chain[10], chain[11]]) - var f27 = queue.push(r27, @[chain[12], chain[13]]) - - await sleepAsync(100.milliseconds) - doAssert(f21.finished == true and f21.failed == false) - doAssert(e21.finished == true and e21.failed == false) - doAssert(f26.finished == false) - doAssert(f27.finished == false) - await queue.resetWait(none[Slot]()) - await sleepAsync(100.milliseconds) - doAssert(f26.finished == true and f26.failed == false) - doAssert(f27.finished == true and f27.failed == false) - doAssert(queue.inpSlot == Slot(7) and queue.outSlot == Slot(7)) - doAssert(counter == 7) - doAssert(len(queue) == 12) - # This should be silently ignored, because r21 is already processed. 
- var o21 = queue.push(r21, @[chain[0], chain[1]]) - var o22 = queue.push(r22, @[chain[2], chain[3]]) - queue.push(r23) - queue.push(r24) - var o25 = queue.push(r25, @[chain[8], chain[9]]) - var o26 = queue.push(r26, @[chain[10], chain[11]]) - var o27 = queue.push(r27, @[chain[12], chain[13]]) - await sleepAsync(100.milliseconds) - doAssert(o21.finished == true and o21.failed == false) - doAssert(o22.finished == true and o22.failed == false) - doAssert(o25.finished == true and o25.failed == false) - doAssert(o26.finished == true and o26.failed == false) - doAssert(o27.finished == true and o27.failed == false) - doAssert(len(queue) == 12) - - await validatorFut.cancelAndWait() - result = true - - check waitFor(test()) + check waitFor(runTest()) == true test "[SyncQueue] hasEndGap() test": let chain1 = createChain(Slot(1), Slot(1)) @@ -519,10 +770,10 @@ suite "SyncManager test suite": checkResponse(r22, @[chain[4]]) == false checkResponse(r22, @[chain[3], chain[1]]) == false - test "[SyncQueue] getRewindPoint() test": + test "[SyncQueue#Forward] getRewindPoint() test": let aq = newBlockProcessor() block: - var queue = SyncQueue.init(SomeTPeer, + var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), 1'u64, getFirstSlotAtFinalizedEpoch, aq, 2) let finalizedSlot = compute_start_slot_at_epoch(Epoch(0'u64)) @@ -533,7 +784,7 @@ suite "SyncManager test suite": check queue.getRewindPoint(Slot(i), finalizedSlot) == finalizedSlot block: - var queue = SyncQueue.init(SomeTPeer, + var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), 1'u64, getFirstSlotAtFinalizedEpoch, aq, 2) let finalizedSlot = compute_start_slot_at_epoch(Epoch(1'u64)) @@ -544,7 +795,7 @@ suite "SyncManager test suite": check queue.getRewindPoint(Slot(i), finalizedSlot) == finalizedSlot block: - var queue = SyncQueue.init(SomeTPeer, + var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), 
Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), 1'u64, getFirstSlotAtFinalizedEpoch, aq, 2) let finalizedSlot = compute_start_slot_at_epoch(Epoch(0'u64)) @@ -561,7 +812,7 @@ suite "SyncManager test suite": counter = counter shl 1 block: - var queue = SyncQueue.init(SomeTPeer, + var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), 1'u64, getFirstSlotAtFinalizedEpoch, aq, 2) let finalizedSlot = compute_start_slot_at_epoch(Epoch(1'u64)) @@ -575,3 +826,13 @@ suite "SyncManager test suite": let rewindSlot = compute_start_slot_at_epoch(rewindEpoch) check queue.getRewindPoint(failSlot, finalizedSlot) == rewindSlot counter = counter shl 1 + + test "[SyncQueue#Backward] getRewindPoint() test": + let aq = newBlockProcessor() + block: + var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, + Slot(1024), Slot(0), + 1'u64, getSafeSlot, aq, 2) + let safeSlot = getSafeSlot() + for i in countdown(1023, 0): + check queue.getRewindPoint(Slot(i), safeSlot) == safeSlot