# beacon_chain
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [Defect].}

import std/[options, heapqueue, tables, strutils, sequtils, math, algorithm]
import stew/results, chronos, chronicles
import
  ../spec/datatypes/[base, phase0, altair],
  ../spec/eth2_apis/rpc_types,
  ../spec/[helpers, forks],
  ../networking/[peer_pool, eth2_network],
  ../gossip_processing/block_processor,
  ../consensus_object_pools/block_pools_types

# NOTE(review): `merge` is exported but not imported directly in this file -
# presumably it is made visible by one of the spec modules above; confirm.
export base, phase0, altair, merge, chronos, chronicles, results,
       block_pools_types, helpers

logScope:
  topics = "syncqueue"

type
  GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
  ProcessingCallback* = proc() {.gcsafe, raises: [Defect].}
  BlockVerifier* =
    proc(signedBlock: ForkedSignedBeaconBlock):
      Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].}

  SyncQueueKind* {.pure.} = enum
    Forward, Backward

  SyncRequest*[T] = object
    ## Request for ``count`` slots starting at ``slot`` with stride ``step``;
    ## ``item`` is the peer-like value the request was handed out to.
    kind: SyncQueueKind
    index*: uint64
    slot*: Slot
    count*: uint64
    step*: uint64
    item*: T

  SyncResult*[T] = object
    ## Blocks received for ``request``.
    request*: SyncRequest[T]
    data*: seq[ref ForkedSignedBeaconBlock]

  SyncWaiter* = ref object
    future: Future[void]
    reset: bool

  RewindPoint = object
    ## Slot of the last rewind-triggering failure and how many epochs were
    ## rewound for it.
    failSlot: Slot
    epochCount: uint64

  SyncQueue*[T] = ref object
    kind*: SyncQueueKind
    inpSlot*: Slot
    outSlot*: Slot
    startSlot*: Slot
    finalSlot*: Slot
    chunkSize*: uint64
    queueSize*: int
    counter*: uint64
    pending*: Table[uint64, SyncRequest[T]]
    waiters: seq[SyncWaiter]
    getSafeSlot*: GetSlotCallback
    debtsQueue: HeapQueue[SyncRequest[T]]
    debtsCount: uint64
    readyQueue: HeapQueue[SyncResult[T]]
    rewind: Option[RewindPoint]
    blockVerifier: BlockVerifier

  SyncManagerError* = object of CatchableError
  BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]]

chronicles.formatIt SyncQueueKind: $it

proc getShortMap*[T](req: SyncRequest[T],
                     data: openArray[ref ForkedSignedBeaconBlock]): string =
  ## Returns the placement map of ``data`` within request ``req``: one
  ## character per requested slot, ``x`` when ``data`` holds a block for that
  ## slot and ``.`` otherwise.
  var res = newStringOfCap(req.count)
  var slider = req.slot
  var last = 0
  for i in 0 ..< req.count:
    # Every requested slot must contribute exactly one character, even when
    # all remaining blocks have slots lower than `slider`.
    var mark = '.'
    for k in last ..< len(data):
      if slider == data[k][].slot:
        mark = 'x'
        last = k + 1
        break
      elif slider < data[k][].slot:
        break
    res.add(mark)
    slider = slider + req.step
  res

proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} =
  ## Returns ``true`` if ``slot`` is one of the slots covered by ``req``.
  slot >= req.slot and slot < req.slot + req.count * req.step and
    ((slot - req.slot) mod req.step == 0)

proc cmp*[T](a, b: SyncRequest[T]): int =
  ## Compares two requests by their starting slot.
  cmp(uint64(a.slot), uint64(b.slot))

proc checkResponse*[T](req: SyncRequest[T],
                       data: openArray[ref ForkedSignedBeaconBlock]): bool =
  ## Returns ``true`` if ``data`` is a plausible answer to ``req``: it holds
  ## no more blocks than requested and every block matches, in order, one of
  ## the requested slots. An empty response is always accepted.
  if len(data) == 0:
    # Impossible to verify empty response.
    return true

  if uint64(len(data)) > req.count:
    # Number of blocks in response should be less or equal to number of
    # requested blocks.
    return false

  var slot = req.slot
  var rindex = 0'u64
  var dindex = 0

  while (rindex < req.count) and (dindex < len(data)):
    if slot < data[dindex][].slot:
      # Requested slot is empty in the response, try the next one.
      discard
    elif slot == data[dindex][].slot:
      inc(dindex)
    else:
      # Block slot is lower than the current requested slot: the response is
      # either unordered or outside of the requested range.
      return false
    slot = slot + req.step
    rindex = rindex + 1'u64

  # All blocks must have been matched to a requested slot.
  dindex == len(data)

proc getFullMap*[T](req: SyncRequest[T],
                    data: openArray[ForkedSignedBeaconBlock]): string =
  ## Returns all slot numbers in ``data`` as comma-delimited string.
  # NOTE(review): unlike the other helpers this takes non-`ref` blocks and
  # reads `message.slot` - confirm this still matches the block type.
  mapIt(data, $it.message.slot).join(", ")

proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
             finish: Slot, t2: typedesc[T]): SyncRequest[T] =
  ## Creates request for the inclusive slot range ``start`` .. ``finish``
  ## without an associated item.
  let count = finish - start + 1'u64
  SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64)

proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, slot: Slot,
             count: uint64, item: T): SyncRequest[T] =
  ## Creates request for ``count`` slots starting at ``slot``.
  SyncRequest[T](kind: kind, slot: slot, count: count, item: item, step: 1'u64)

proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
             finish: Slot, item: T): SyncRequest[T] =
  ## Creates request for the inclusive slot range ``start`` .. ``finish``.
  let count = finish - start + 1'u64
  SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64, item: item)

proc empty*[T](t: typedesc[SyncRequest], kind: SyncQueueKind,
               t2: typedesc[T]): SyncRequest[T] {.inline.} =
  ## Creates an empty request (zero ``step`` and zero ``count``).
  SyncRequest[T](kind: kind, step: 0'u64, count: 0'u64)

proc setItem*[T](sr: var SyncRequest[T], item: T) =
  ## Assigns ``item`` to request ``sr``.
  sr.item = item

proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
  ## Returns ``true`` if ``sr`` was created by ``SyncRequest.empty()``.
  (sr.step == 0'u64) and (sr.count == 0'u64)

proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
              queueKind: SyncQueueKind,
              start, final: Slot, chunkSize: uint64,
              getSafeSlotCb: GetSlotCallback,
              blockVerifier: BlockVerifier,
              syncQueueSize: int = -1): SyncQueue[T] =
  ## Create new synchronization queue with parameters
  ##
  ## ``start`` and ``final`` are starting and finishing Slots.
  ##
  ## ``chunkSize`` maximum number of slots in one request.
  ##
  ## ``syncQueueSize`` maximum queue size for incoming data.
  ## If ``syncQueueSize > 0`` queue will help to keep backpressure under
  ## control. If ``syncQueueSize <= 0`` then queue size is unlimited (default).

  # SyncQueue is the core of sync manager, this data structure distributes
  # requests to peers and manages responses from peers.
  #
  # Because SyncQueue is async data structure it manages backpressure and
  # order of incoming responses and it also resolves "joker's" problem.
  #
  # Joker's problem
  #
  # According to current Ethereum2 network specification
  # > Clients MUST respond with at least one block, if they have it and it
  # > exists in the range. Clients MAY limit the number of blocks in the
  # > response.
  #
  # Such a rule can lead to very uncertain responses. For example, assume
  # that slots 10 to 12 are not empty. A client which follows the
  # specification can answer with any response from this list
  # (X - block, `-` empty space):
  #
  # 1.   X X X
  # 2.   - - X
  # 3.   - X -
  # 4.   - X X
  # 5.   X - -
  # 6.   X - X
  # 7.   X X -
  #
  # If peer answers with `1` everything will be fine and `block_pool` will be
  # able to process all 3 blocks. In case of `2`, `3`, `4`, `6` - `block_pool`
  # will fail immediately with chunk and report "parent is missing" error.
  # But in case of `5` and `7` blocks will be processed by `block_pool` without
  # any problems, however it will start producing problems right from this
  # uncertain last slot. SyncQueue will start producing requests for next
  # blocks, but all the responses from this point will fail with "parent is
  # missing" error. Let's call such peers "jokers", because they are joking
  # with responses.
  #
  # To fix the "joker" problem we are going to perform a rollback to the
  # latest finalized epoch's first slot.
  doAssert(chunkSize > 0'u64, "Chunk size should not be zero")
  SyncQueue[T](
    kind: queueKind,
    startSlot: start,
    finalSlot: final,
    chunkSize: chunkSize,
    queueSize: syncQueueSize,
    getSafeSlot: getSafeSlotCb,
    waiters: newSeq[SyncWaiter](),
    counter: 1'u64,
    pending: initTable[uint64, SyncRequest[T]](),
    debtsQueue: initHeapQueue[SyncRequest[T]](),
    inpSlot: start,
    outSlot: start,
    blockVerifier: blockVerifier
  )

proc `<`*[T](a, b: SyncRequest[T]): bool =
  ## Heap ordering of requests: lowest slot first for forward queues, highest
  ## slot first for backward queues.
  doAssert(a.kind == b.kind)
  case a.kind
  of SyncQueueKind.Forward:
    a.slot < b.slot
  of SyncQueueKind.Backward:
    a.slot > b.slot

proc `<`*[T](a, b: SyncResult[T]): bool =
  ## Heap ordering of results, following the ordering of their requests.
  doAssert(a.request.kind == b.request.kind)
  case a.request.kind
  of SyncQueueKind.Forward:
    a.request.slot < b.request.slot
  of SyncQueueKind.Backward:
    a.request.slot > b.request.slot

proc `==`*[T](a, b: SyncRequest[T]): bool =
  ## Requests are equal when kind, slot, count and step match; ``index`` and
  ## ``item`` are not compared.
  (a.kind == b.kind) and (a.slot == b.slot) and (a.count == b.count) and
    (a.step == b.step)

proc lastSlot*[T](req: SyncRequest[T]): Slot =
  ## Returns last slot for request ``req``.
  req.slot + req.count - 1'u64

proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) =
  ## Assigns the next free index to ``req`` and registers it in the pending
  ## table of ``sq``.
  req.index = sq.counter
  sq.counter = sq.counter + 1'u64
  sq.pending[req.index] = req

proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
  ## Update last slot stored in queue ``sq`` with value ``last``.
  case sq.kind
  of SyncQueueKind.Forward:
    doAssert(sq.finalSlot <= last,
             "Last slot could not be lower then stored one " &
             $sq.finalSlot & " <= " & $last)
    sq.finalSlot = last
  of SyncQueueKind.Backward:
    doAssert(sq.finalSlot >= last,
             "Last slot could not be higher then stored one " &
             $sq.finalSlot & " >= " & $last)
    sq.finalSlot = last

proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) =
  ## Wakes up all blocked waiters. When ``reset`` is ``true`` every waiter is
  ## additionally marked as being woken up because of a queue reset.
  for item in sq.waiters:
    if reset:
      item.reset = true

    if not(item.future.finished()):
      item.future.complete()

proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} =
  ## Create new waiter and wait for completion from `wakeupWaiters()`.
  ## Returns the waiter's ``reset`` flag.
  var waitfut = newFuture[void]("SyncQueue.waitForChanges")
  let waititem = SyncWaiter(future: waitfut)
  sq.waiters.add(waititem)
  try:
    await waitfut
    return waititem.reset
  finally:
    # Always unregister the waiter, even if the wait was cancelled.
    sq.waiters.delete(sq.waiters.find(waititem))

proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} =
  ## This procedure will perform wakeupWaiters(true) and block until the
  ## last waiter has been awakened.
  var waitChanges = sq.waitForChanges()
  sq.wakeupWaiters(true)
  discard await waitChanges

proc clearAndWakeup*[T](sq: SyncQueue[T]) =
  ## Expires all pending requests and wakes up all waiters with the reset
  ## flag set.
  sq.pending.clear()
  sq.wakeupWaiters(true)

proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} =
  ## Perform reset of all the blocked waiters in SyncQueue.
  ##
  ## We are adding one more waiter to the waiters sequence and
  ## call wakeupWaiters(true). Because our waiter is last in sequence of
  ## waiters it will be resumed only after all waiters will be awakened and
  ## finished.

  # We are clearing pending list, so that all requests that are still running
  # around (still downloading, but not yet pushed to the SyncQueue) will be
  # expired. It's important to perform this call first (before await),
  # otherwise you can introduce a race problem.
  sq.pending.clear()

  # We are calculating the minimal slot number to which we will be able to
  # reset, without missing any blocks. There are 3 sources:
  # 1. Debts queue.
  # 2. Processing queue (`inpSlot`, `outSlot`).
  # 3. Requested slot `toSlot`.
  #
  # Queue's `outSlot` is the lowest slot we added to `block_pool`, but
  # `toSlot` slot can be less than `outSlot`. `debtsQueue` holds only not
  # added slot requests, so it can't be bigger than `outSlot` value.
  let minSlot =
    case sq.kind
    of SyncQueueKind.Forward:
      if toSlot.isSome():
        min(toSlot.get(), sq.outSlot)
      else:
        sq.outSlot
    of SyncQueueKind.Backward:
      if toSlot.isSome():
        toSlot.get()
      else:
        sq.outSlot
  sq.debtsQueue.clear()
  sq.debtsCount = 0
  sq.readyQueue.clear()
  sq.inpSlot = minSlot
  sq.outSlot = minSlot
  # We are going to wakeup all the waiters and wait for last one.
  await sq.wakeupAndWaitWaiters()

proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} =
  ## Returns ``true`` if response chain of blocks is empty (has only empty
  ## slots).
  len(sr.data) == 0

proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} =
  ## Returns ``true`` if response chain of blocks has gap at the end.
  len(sr.data) == 0 or sr.data[^1][].slot != sr.request.lastSlot()

proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
  ## Returns last non-empty slot from result ``sr``. If response has only
  ## empty slots, original request slot will be returned.
  if len(sr.data) == 0:
    # If response has only empty slots we are going to use original request
    # slot.
    sr.request.slot
  else:
    sr.data[^1][].slot

proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
  ## Schedules request ``sr`` for retry and accounts for its slots.
  sq.debtsQueue.push(sr)
  sq.debtsCount = sq.debtsCount + sr.count

proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
                        safeSlot: Slot): Slot =
  ## Returns the slot the queue should be rewound to after a failure at
  ## ``failSlot``; ``safeSlot`` is the slot that must not be rewound past.
  ##
  ## For forward queues the rewind distance starts at one epoch and doubles
  ## every time the failure repeats at the same slot, never going behind the
  ## epoch of ``safeSlot``. A successfully computed rewind is remembered in
  ## the queue; when no rewind distance can be computed the stored rewind
  ## point is left untouched (and may still be unset).
  ##
  ## For backward queues the result is always ``safeSlot``.
  case sq.kind
  of SyncQueueKind.Forward:
    # Calculate the latest finalized epoch.
    let finalizedEpoch = epoch(safeSlot)

    # Calculate failure epoch.
    let failEpoch = epoch(failSlot)

    # Calculate exponential rewind point in number of epochs.
    let epochCount =
      if sq.rewind.isSome():
        let rewind = sq.rewind.get()
        if failSlot == rewind.failSlot:
          # `MissingParent` happened at same slot so we increase rewind point by
          # factor of 2.
          if failEpoch > finalizedEpoch:
            let rewindPoint = rewind.epochCount shl 1
            if rewindPoint < rewind.epochCount:
              # If exponential rewind point produces `uint64` overflow we will
              # make rewind to latest finalized epoch.
              failEpoch - finalizedEpoch
            else:
              if (failEpoch < rewindPoint) or
                 (failEpoch - rewindPoint < finalizedEpoch):
                # If exponential rewind point points to position which is far
                # behind latest finalized epoch.
                failEpoch - finalizedEpoch
              else:
                rewindPoint
          else:
            warn "Trying to rewind over the last finalized epoch",
                 finalized_slot = safeSlot, fail_slot = failSlot,
                 finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
                 rewind_epoch_count = rewind.epochCount,
                 direction = sq.kind, topics = "syncman"
            0'u64
        else:
          # `MissingParent` happened at different slot so we are going to
          # rewind for 1 epoch only.
          if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
            warn "Could not rewind further than the last finalized epoch",
                 finalized_slot = safeSlot, fail_slot = failSlot,
                 finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
                 rewind_epoch_count = rewind.epochCount,
                 direction = sq.kind, topics = "syncman"
            0'u64
          else:
            1'u64
      else:
        # `MissingParent` happened first time.
        if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
          warn "Could not rewind further than the last finalized epoch",
               finalized_slot = safeSlot, fail_slot = failSlot,
               finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
               direction = sq.kind, topics = "syncman"
          0'u64
        else:
          1'u64

    if epochCount == 0'u64:
      warn "Unable to continue syncing, please restart the node",
           finalized_slot = safeSlot, fail_slot = failSlot,
           finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
           direction = sq.kind, topics = "syncman"
      # Calculate the rewind epoch, which will be equal to last rewind point or
      # finalizedEpoch
      let rewindEpoch =
        if sq.rewind.isNone():
          finalizedEpoch
        else:
          epoch(sq.rewind.get().failSlot) - sq.rewind.get().epochCount
      rewindEpoch.start_slot()
    else:
      # Calculate the rewind epoch, which should not be less than the latest
      # finalized epoch.
      let rewindEpoch = failEpoch - epochCount
      # Update and save new rewind point in SyncQueue.
      sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount))
      rewindEpoch.start_slot()
  of SyncQueueKind.Backward:
    # While we perform backward sync, the only possible slot we could rewind is
    # latest stored block.
    if failSlot == safeSlot:
      warn "Unable to continue syncing, please restart the node",
           safe_slot = safeSlot, fail_slot = failSlot, direction = sq.kind,
           topics = "syncman"
    safeSlot

iterator blocks*[T](sq: SyncQueue[T],
                    sr: SyncResult[T]): ref ForkedSignedBeaconBlock =
  ## Yields blocks of ``sr`` in processing order: first to last for forward
  ## queues, last to first for backward queues.
  case sq.kind
  of SyncQueueKind.Forward:
    for i in countup(0, len(sr.data) - 1):
      yield sr.data[i]
  of SyncQueueKind.Backward:
    for i in countdown(len(sr.data) - 1, 0):
      yield sr.data[i]

proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) =
  ## Moves the output slot by ``number`` slots in the queue's direction.
  case sq.kind
  of SyncQueueKind.Forward:
    sq.outSlot = sq.outSlot + number
  of SyncQueueKind.Backward:
    sq.outSlot = sq.outSlot - number

proc advanceInput[T](sq: SyncQueue[T], number: uint64) =
  ## Moves the input slot by ``number`` slots in the queue's direction.
  case sq.kind
  of SyncQueueKind.Forward:
    sq.inpSlot = sq.inpSlot + number
  of SyncQueueKind.Backward:
    sq.inpSlot = sq.inpSlot - number

proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool =
  ## Returns ``true`` if the queue is size-limited and ``sr`` is not the
  ## request expected next at the output slot.
  case sq.kind
  of SyncQueueKind.Forward:
    (sq.queueSize > 0) and (sr.slot != sq.outSlot)
  of SyncQueueKind.Backward:
    (sq.queueSize > 0) and (sr.slot + sr.count - 1'u64 != sq.outSlot)

proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
              data: seq[ref ForkedSignedBeaconBlock],
              processingCb: ProcessingCallback = nil) {.async.} =
  ## Push successful result to queue ``sq``.
  ##
  ## The result is queued until it is the next one expected at the output
  ## slot; its blocks are then passed one by one to the block verifier.
  ## ``processingCb``, when set, is invoked right before verification starts.
  ## Failed results are returned to the debts queue and may trigger a rewind.
  mixin updateScore

  if sr.index notin sq.pending:
    # If request `sr` not in our pending list, it only means that
    # SyncQueue.resetWait() happens and all pending requests are expired, so
    # we swallow `old` requests, and in such way sync-workers are able to get
    # proper new requests from SyncQueue.
    return

  sq.pending.del(sr.index)

  # This is backpressure handling algorithm, this algorithm is blocking
  # all pending `push` requests if `request.slot` not in range.
  while true:
    if sq.notInRange(sr):
      let reset = await sq.waitForChanges()
      if reset:
        # SyncQueue reset happens. We are exiting to wake up sync-worker.
        return
    else:
      let syncres = SyncResult[T](request: sr, data: data)
      sq.readyQueue.push(syncres)
      break

  while len(sq.readyQueue) > 0:
    let reqres =
      case sq.kind
      of SyncQueueKind.Forward:
        let minSlot = sq.readyQueue[0].request.slot
        if sq.outSlot != minSlot:
          none[SyncResult[T]]()
        else:
          some(sq.readyQueue.pop())
      of SyncQueueKind.Backward:
        let maxSlot = sq.readyQueue[0].request.slot +
                      (sq.readyQueue[0].request.count - 1'u64)
        if sq.outSlot != maxSlot:
          none[SyncResult[T]]()
        else:
          some(sq.readyQueue.pop())

    let item =
      if reqres.isSome():
        reqres.get()
      else:
        let rewindSlot = sq.getRewindPoint(sq.outSlot, sq.getSafeSlot())
        warn "Got incorrect sync result in queue, rewind happens",
             request_slot = sq.readyQueue[0].request.slot,
             request_count = sq.readyQueue[0].request.count,
             request_step = sq.readyQueue[0].request.step,
             blocks_map = getShortMap(sq.readyQueue[0].request,
                                      sq.readyQueue[0].data),
             blocks_count = len(sq.readyQueue[0].data),
             output_slot = sq.outSlot, input_slot = sq.inpSlot,
             peer = sq.readyQueue[0].request.item,
             rewind_to_slot = rewindSlot,
             direction = sq.readyQueue[0].request.kind, topics = "syncman"
        await sq.resetWait(some(rewindSlot))
        break

    if processingCb != nil:
      processingCb()

    # Validating received blocks one by one
    var
      hasOkBlock = false
      hasInvalidBlock = false
      unviableBlock: Option[(Eth2Digest, Slot)]
      missingParentSlot: Option[Slot]

      # compiler segfault if this is moved into the for loop, at time of writing
      res: Result[void, BlockError]

    for blk in sq.blocks(item):
      res = await sq.blockVerifier(blk[])
      if res.isOk():
        hasOkBlock = true
      else:
        case res.error()
        of BlockError.MissingParent:
          missingParentSlot = some(blk[].slot)
          break
        of BlockError.Duplicate:
          # Keep going, happens naturally
          discard
        of BlockError.UnviableFork:
          # Keep going so as to register other unviable blocks with the
          # quarantine
          if unviableBlock.isNone:
            # Remember the first unviable block, so we can log it
            unviableBlock = some((blk[].root, blk[].slot))

        of BlockError.Invalid:
          hasInvalidBlock = true

          let req = item.request
          warn "Received invalid sequence of blocks", peer = req.item,
                request_slot = req.slot, request_count = req.count,
                request_step = req.step, blocks_count = len(item.data),
                blocks_map = getShortMap(req, item.data),
                direction = req.kind, topics = "syncman"
          req.item.updateScore(PeerScoreBadBlocks)

          break

    # When errors happen while processing blocks, we retry the same request
    # with, hopefully, a different peer
    let retryRequest =
      hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome()
    if not retryRequest:
      sq.advanceOutput(item.request.count)

      if hasOkBlock:
        # If there no error and response was not empty we should reward peer
        # with some bonus score - not for duplicate blocks though.
        item.request.item.updateScore(PeerScoreGoodBlocks)

      sq.wakeupWaiters()
    else:
      debug "Block pool rejected peer's response", peer = item.request.item,
            request_slot = item.request.slot,
            request_count = item.request.count,
            request_step = item.request.step,
            blocks_map = getShortMap(item.request, item.data),
            blocks_count = len(item.data),
            ok = hasOkBlock,
            unviable = unviableBlock.isSome(),
            missing_parent = missingParentSlot.isSome(),
            direction = item.request.kind, topics = "syncman"

      # We need to move failed response to the debts queue.
      sq.toDebtsQueue(item.request)

      if unviableBlock.isSome:
        let req = item.request
        notice "Received blocks from an unviable fork",
              blockRoot = unviableBlock.get()[0],
              blockSlot = unviableBlock.get()[1], peer = req.item,
              request_slot = req.slot, request_count = req.count,
              request_step = req.step, blocks_count = len(item.data),
              blocks_map = getShortMap(req, item.data),
              direction = req.kind, topics = "syncman"
        req.item.updateScore(PeerScoreUnviableFork)

      if missingParentSlot.isSome:
        var resetSlot: Option[Slot]
        let failSlot = missingParentSlot.get()

        # If we got `BlockError.MissingParent` it means that peer returns chain
        # of blocks with holes or `block_pool` is in incomplete state. We are
        # going to rewind to the first slot at latest finalized epoch.
        let
          req = item.request
          safeSlot = sq.getSafeSlot()
        case sq.kind
        of SyncQueueKind.Forward:
          if safeSlot < req.slot:
            let
              rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
              # `getRewindPoint()` leaves `sq.rewind` unset when it could not
              # compute a rewind distance on a first failure, so it must not
              # be unwrapped unconditionally.
              rewindEpochCount =
                if sq.rewind.isSome():
                  sq.rewind.get().epochCount
                else:
                  0'u64
            warn "Unexpected missing parent, rewind happens",
                 peer = req.item, rewind_to_slot = rewindSlot,
                 rewind_epoch_count = rewindEpochCount,
                 rewind_fail_slot = failSlot,
                 finalized_slot = safeSlot,
                 request_slot = req.slot, request_count = req.count,
                 request_step = req.step, blocks_count = len(item.data),
                 blocks_map = getShortMap(req, item.data),
                 direction = req.kind, topics = "syncman"
            resetSlot = some(rewindSlot)
            req.item.updateScore(PeerScoreMissingBlocks)
          else:
            error "Unexpected missing parent at finalized epoch slot",
                  peer = req.item, to_slot = safeSlot,
                  request_slot = req.slot, request_count = req.count,
                  request_step = req.step, blocks_count = len(item.data),
                  blocks_map = getShortMap(req, item.data),
                  direction = req.kind, topics = "syncman"
            req.item.updateScore(PeerScoreBadBlocks)
        of SyncQueueKind.Backward:
          if safeSlot > req.slot:
            let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
            # It's quite common peers give us fewer blocks than we ask for
            info "Gap in block range response, rewinding",
                 peer = req.item, rewind_to_slot = rewindSlot,
                 rewind_fail_slot = failSlot,
                 finalized_slot = safeSlot,
                 request_slot = req.slot, request_count = req.count,
                 request_step = req.step, blocks_count = len(item.data),
                 blocks_map = getShortMap(req, item.data),
                 direction = req.kind, topics = "syncman"
            resetSlot = some(rewindSlot)
            req.item.updateScore(PeerScoreMissingBlocks)
          else:
            error "Unexpected missing parent at safe slot",
                  peer = req.item, to_slot = safeSlot,
                  request_slot = req.slot, request_count = req.count,
                  request_step = req.step, blocks_count = len(item.data),
                  blocks_map = getShortMap(req, item.data),
                  direction = req.kind, topics = "syncman"
            req.item.updateScore(PeerScoreBadBlocks)

        if resetSlot.isSome():
          await sq.resetWait(resetSlot)
          case sq.kind
          of SyncQueueKind.Forward:
            # Same guard as above: the stored rewind point may be unset.
            let
              storedEpochCount =
                if sq.rewind.isSome():
                  sq.rewind.get().epochCount
                else:
                  0'u64
              storedFailSlot =
                if sq.rewind.isSome():
                  sq.rewind.get().failSlot
                else:
                  failSlot
            debug "Rewind to slot was happened",
                  reset_slot = resetSlot.get(),
                  queue_input_slot = sq.inpSlot,
                  queue_output_slot = sq.outSlot,
                  rewind_epoch_count = storedEpochCount,
                  rewind_fail_slot = storedFailSlot,
                  direction = sq.kind, topics = "syncman"
          of SyncQueueKind.Backward:
            debug "Rewind to slot was happened",
                  reset_slot = resetSlot.get(),
                  queue_input_slot = sq.inpSlot,
                  queue_output_slot = sq.outSlot,
                  direction = sq.kind, topics = "syncman"

      # A failed result stops processing of the ready queue.
      break

proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
  ## Push failed request back to queue.
  if sr.index notin sq.pending:
    # If request `sr` not in our pending list, it only means that
    # SyncQueue.resetWait() happens and all pending requests are expired, so
    # we swallow `old` requests, and in such way sync-workers are able to get
    # proper new requests from SyncQueue.
    return
  sq.pending.del(sr.index)
  sq.toDebtsQueue(sr)

proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
  ## Create new request according to current SyncQueue parameters.
  ##
  ## Requests waiting in the debts queue are served first. ``maxslot`` is the
  ## highest slot the peer can serve; an empty request is returned when the
  ## peer cannot serve the next range or when nothing is left to request.
  if len(sq.debtsQueue) > 0:
    if maxslot < sq.debtsQueue[0].slot:
      # Peer's latest slot is less than starting request's slot.
      return SyncRequest.empty(sq.kind, T)
    if maxslot < sq.debtsQueue[0].lastSlot():
      # Peer's latest slot is less than finishing request's slot.
      return SyncRequest.empty(sq.kind, T)
    var sr = sq.debtsQueue.pop()
    sq.debtsCount = sq.debtsCount - sr.count
    sr.setItem(item)
    sq.makePending(sr)
    sr
  else:
    case sq.kind
    of SyncQueueKind.Forward:
      if maxslot < sq.inpSlot:
        # Peer's latest slot is less than queue's input slot.
        return SyncRequest.empty(sq.kind, T)
      if sq.inpSlot > sq.finalSlot:
        # Queue's input slot is bigger than queue's final slot.
        return SyncRequest.empty(sq.kind, T)
      let lastSlot = min(maxslot, sq.finalSlot)
      let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
      var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item)
      sq.advanceInput(count)
      sq.makePending(sr)
      sr
    of SyncQueueKind.Backward:
      if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
        # Input slot wrapped around below slot 0 - nothing left to request.
        return SyncRequest.empty(sq.kind, T)
      if sq.inpSlot < sq.finalSlot:
        # Queue's input slot already passed queue's final slot.
        return SyncRequest.empty(sq.kind, T)
      let (slot, count) =
        block:
          let baseSlot = sq.inpSlot + 1'u64
          if baseSlot - sq.finalSlot < sq.chunkSize:
            let count = uint64(baseSlot - sq.finalSlot)
            (baseSlot - count, count)
          else:
            (baseSlot - sq.chunkSize, sq.chunkSize)
      if (maxslot + 1'u64) < slot + count:
        # Peer's latest slot is less than queue's input slot.
        return SyncRequest.empty(sq.kind, T)
      var sr = SyncRequest.init(sq.kind, slot, count, item)
      sq.advanceInput(count)
      sq.makePending(sr)
      sr

proc debtLen*[T](sq: SyncQueue[T]): uint64 =
  ## Returns number of slots currently waiting in the debts queue.
  sq.debtsCount

proc pendingLen*[T](sq: SyncQueue[T]): uint64 =
  ## Returns number of slots between the queue's input and output slots.
  case sq.kind
  of SyncQueueKind.Forward:
    # When moving forward `outSlot` will be <= of `inpSlot`.
    sq.inpSlot - sq.outSlot
  of SyncQueueKind.Backward:
    # When moving backward `outSlot` will be >= of `inpSlot`
    sq.outSlot - sq.inpSlot

proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} =
  ## Returns number of slots left in queue ``sq``.
  case sq.kind
  of SyncQueueKind.Forward:
    sq.finalSlot + 1'u64 - sq.outSlot
  of SyncQueueKind.Backward:
    sq.outSlot + 1'u64 - sq.finalSlot

proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} =
  ## Returns total number of slots in queue ``sq``.
  case sq.kind
  of SyncQueueKind.Forward:
    sq.finalSlot + 1'u64 - sq.startSlot
  of SyncQueueKind.Backward:
    sq.startSlot + 1'u64 - sq.finalSlot

proc progress*[T](sq: SyncQueue[T]): uint64 =
  ## How many slots we've synced so far
  case sq.kind
  of SyncQueueKind.Forward:
    sq.outSlot - sq.startSlot
  of SyncQueueKind.Backward:
    sq.startSlot - sq.outSlot