# beacon_chain
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}

import std/[options, heapqueue, tables, strutils, sequtils, math]
import stew/[results, base10], chronos, chronicles
import
  ../spec/datatypes/[base, phase0, altair],
  ../spec/[helpers, forks],
  ../networking/[peer_pool, eth2_network],
  ../gossip_processing/block_processor,
  ../consensus_object_pools/block_pools_types

export base, phase0, altair, merge, chronos, chronicles, results,
       block_pools_types, helpers

logScope:
  topics = "syncqueue"

type
  GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
  ProcessingCallback* = proc() {.gcsafe, raises: [Defect].}
  BlockVerifier* =
    proc(signedBlock: ForkedSignedBeaconBlock):
      Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].}

  SyncQueueKind* {.pure.} = enum
    Forward, Backward

  SyncRequest*[T] = object
    kind*: SyncQueueKind
    index*: uint64
    slot*: Slot
    count*: uint64
    item*: T

  SyncResult*[T] = object
    request*: SyncRequest[T]
    data*: seq[ref ForkedSignedBeaconBlock]

  GapItem*[T] = object
    start*: Slot
    finish*: Slot
    item*: T

  SyncWaiter* = ref object
    future: Future[void]
    reset: bool

  RewindPoint = object
    failSlot: Slot
    epochCount: uint64

  SyncQueue*[T] = ref object
    kind*: SyncQueueKind
    inpSlot*: Slot
    outSlot*: Slot
    startSlot*: Slot
    finalSlot*: Slot
    chunkSize*: uint64
    queueSize*: int
    counter*: uint64
    pending*: Table[uint64, SyncRequest[T]]
    gapList*: seq[GapItem[T]]
    waiters: seq[SyncWaiter]
    getSafeSlot*: GetSlotCallback
    debtsQueue: HeapQueue[SyncRequest[T]]
    debtsCount: uint64
    readyQueue: HeapQueue[SyncResult[T]]
    rewind: Option[RewindPoint]
    blockVerifier: BlockVerifier
    ident*: string

  SyncManagerError* = object of CatchableError
  BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]]

chronicles.formatIt SyncQueueKind: toLowerAscii($it)

template shortLog*[T](req: SyncRequest[T]): string =
  Base10.toString(uint64(req.slot)) & ":" &
  Base10.toString(req.count) & "@" &
  Base10.toString(req.index)

chronicles.expandIt SyncRequest:
  `it` = shortLog(it)
  peer = shortLog(it.item)
  direction = toLowerAscii($it.kind)

proc getShortMap*[T](req: SyncRequest[T],
                     data: openArray[ref ForkedSignedBeaconBlock]): string =
  ## Returns all slot numbers in ``data`` as placement map.
  var res = newStringOfCap(req.count)
  var slider = req.slot
  var last = 0
  for i in 0 ..< req.count:
    if last < len(data):
      for k in last ..< len(data):
        if slider == data[k][].slot:
          res.add('x')
          last = k + 1
          break
        elif slider < data[k][].slot:
          res.add('.')
          break
    else:
      res.add('.')
    slider = slider + 1
  res

proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} =
  slot >= req.slot and slot < req.slot + req.count

proc cmp*[T](a, b: SyncRequest[T]): int =
  cmp(uint64(a.slot), uint64(b.slot))

proc checkResponse*[T](req: SyncRequest[T],
                       data: openArray[ref ForkedSignedBeaconBlock]): bool =
  if len(data) == 0:
    # Impossible to verify empty response.
    return true

  if uint64(len(data)) > req.count:
    # Number of blocks in response should be less or equal to number of
    # requested blocks.
    return false

  var slot = req.slot
  var rindex = 0'u64
  var dindex = 0

  while (rindex < req.count) and (dindex < len(data)):
    if slot < data[dindex][].slot:
      discard
    elif slot == data[dindex][].slot:
      inc(dindex)
    else:
      return false
    slot += 1'u64
    rindex += 1'u64

  if dindex == len(data):
    return true
  else:
    return false

proc getFullMap*[T](req: SyncRequest[T],
                    data: openArray[ForkedSignedBeaconBlock]): string =
  # Returns all slot numbers in ``data`` as comma-delimeted string.
  mapIt(data, $it.message.slot).join(", ")

proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
             finish: Slot, t2: typedesc[T]): SyncRequest[T] =
  let count = finish - start + 1'u64
  SyncRequest[T](kind: kind, slot: start, count: count)

proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, slot: Slot,
             count: uint64, item: T): SyncRequest[T] =
  SyncRequest[T](kind: kind, slot: slot, count: count, item: item)

proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
             finish: Slot, item: T): SyncRequest[T] =
  let count = finish - start + 1'u64
  SyncRequest[T](kind: kind, slot: start, count: count, item: item)

proc empty*[T](t: typedesc[SyncRequest], kind: SyncQueueKind,
               t2: typedesc[T]): SyncRequest[T] {.inline.} =
  SyncRequest[T](kind: kind, count: 0'u64)

proc setItem*[T](sr: var SyncRequest[T], item: T) =
  sr.item = item

proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
  (sr.count == 0'u64)

proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
              queueKind: SyncQueueKind,
              start, final: Slot, chunkSize: uint64,
              getSafeSlotCb: GetSlotCallback,
              blockVerifier: BlockVerifier,
              syncQueueSize: int = -1,
              ident: string = "main"): SyncQueue[T] =
  ## Create new synchronization queue with parameters
  ##
  ## ``start`` and ``last`` are starting and finishing Slots.
  ##
  ## ``chunkSize`` maximum number of slots in one request.
  ##
  ## ``syncQueueSize`` maximum queue size for incoming data.
  ## If ``syncQueueSize > 0`` queue will help to keep backpressure under
  ## control. If ``syncQueueSize <= 0`` then queue size is unlimited (default).

  # SyncQueue is the core of sync manager, this data structure distributes
  # requests to peers and manages responses from peers.
  #
  # Because SyncQueue is async data structure it manages backpressure and
  # order of incoming responses and it also resolves "joker's" problem.
  #
  # Joker's problem
  #
  # According to current Ethereum2 network specification
  # > Clients MUST respond with at least one block, if they have it and it
  # > exists in the range. Clients MAY limit the number of blocks in the
  # > response.
  #
  # Such rule can lead to very uncertain responses, for example let slots from
  # 10 to 12 will be not empty. Client which follows specification can answer
  # with any response from this list (X - block, `-` empty space):
  #
  # 1.   X X X
  # 2.   - - X
  # 3.   - X -
  # 4.   - X X
  # 5.   X - -
  # 6.   X - X
  # 7.   X X -
  #
  # If peer answers with `1` everything will be fine and `block_pool` will be
  # able to process all 3 blocks. In case of `2`, `3`, `4`, `6` - `block_pool`
  # will fail immediately with chunk and report "parent is missing" error.
  # But in case of `5` and `7` blocks will be processed by `block_pool` without
  # any problems, however it will start producing problems right from this
  # uncertain last slot. SyncQueue will start producing requests for next
  # blocks, but all the responses from this point will fail with "parent is
  # missing" error. Lets call such peers "jokers", because they are joking
  # with responses.
  #
  # To fix "joker" problem we going to perform rollback to the latest finalized
  # epoch's first slot.
  doAssert(chunkSize > 0'u64, "Chunk size should not be zero")
  SyncQueue[T](
    kind: queueKind,
    startSlot: start,
    finalSlot: final,
    chunkSize: chunkSize,
    queueSize: syncQueueSize,
    getSafeSlot: getSafeSlotCb,
    waiters: newSeq[SyncWaiter](),
    counter: 1'u64,
    pending: initTable[uint64, SyncRequest[T]](),
    debtsQueue: initHeapQueue[SyncRequest[T]](),
    inpSlot: start,
    outSlot: start,
    blockVerifier: blockVerifier,
    ident: ident
  )

proc `<`*[T](a, b: SyncRequest[T]): bool =
  doAssert(a.kind == b.kind)
  case a.kind
  of SyncQueueKind.Forward:
    a.slot < b.slot
  of SyncQueueKind.Backward:
    a.slot > b.slot

proc `<`*[T](a, b: SyncResult[T]): bool =
  doAssert(a.request.kind == b.request.kind)
  case a.request.kind
  of SyncQueueKind.Forward:
    a.request.slot < b.request.slot
  of SyncQueueKind.Backward:
    a.request.slot > b.request.slot

proc `==`*[T](a, b: SyncRequest[T]): bool =
  (a.kind == b.kind) and (a.slot == b.slot) and (a.count == b.count)

proc lastSlot*[T](req: SyncRequest[T]): Slot =
  ## Returns last slot for request ``req``.
  req.slot + req.count - 1'u64

proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) =
  req.index = sq.counter
  sq.counter = sq.counter + 1'u64
  sq.pending[req.index] = req

proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
  ## Update last slot stored in queue ``sq`` with value ``last``.
  case sq.kind
  of SyncQueueKind.Forward:
    doAssert(sq.finalSlot <= last,
             "Last slot could not be lower then stored one " &
             $sq.finalSlot & " <= " & $last)
    sq.finalSlot = last
  of SyncQueueKind.Backward:
    doAssert(sq.finalSlot >= last,
             "Last slot could not be higher then stored one " &
             $sq.finalSlot & " >= " & $last)
    sq.finalSlot = last

proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) =
  ## Wakeup one or all blocked waiters.
  for item in sq.waiters:
    if reset:
      item.reset = true

    if not(item.future.finished()):
      item.future.complete()

proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} =
  ## Create new waiter and wait for completion from `wakeupWaiters()`.
  var waitfut = newFuture[void]("SyncQueue.waitForChanges")
  let waititem = SyncWaiter(future: waitfut)
  sq.waiters.add(waititem)
  try:
    await waitfut
    return waititem.reset
  finally:
    sq.waiters.delete(sq.waiters.find(waititem))

proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} =
  ## This procedure will perform wakeupWaiters(false) and blocks until last
  ## waiter will be awakened.
  var waitChanges = sq.waitForChanges()
  sq.wakeupWaiters(true)
  discard await waitChanges

proc clearAndWakeup*[T](sq: SyncQueue[T]) =
  sq.pending.clear()
  sq.wakeupWaiters(true)

proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} =
  ## Perform reset of all the blocked waiters in SyncQueue.
  ##
  ## We adding one more waiter to the waiters sequence and
  ## call wakeupWaiters(true). Because our waiter is last in sequence of
  ## waiters it will be resumed only after all waiters will be awakened and
  ## finished.

  # We are clearing pending list, so that all requests that are still running
  # around (still downloading, but not yet pushed to the SyncQueue) will be
  # expired. Its important to perform this call first (before await), otherwise
  # you can introduce race problem.
  sq.pending.clear()

  # We calculating minimal slot number to which we will be able to reset,
  # without missing any blocks. There 3 sources:
  # 1. Debts queue.
  # 2. Processing queue (`inpSlot`, `outSlot`).
  # 3. Requested slot `toSlot`.
  #
  # Queue's `outSlot` is the lowest slot we added to `block_pool`, but
  # `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not
  # added slot requests, so it can't be bigger then `outSlot` value.
  let minSlot =
    case sq.kind
    of SyncQueueKind.Forward:
      if toSlot.isSome():
        min(toSlot.get(), sq.outSlot)
      else:
        sq.outSlot
    of SyncQueueKind.Backward:
      if toSlot.isSome():
        toSlot.get()
      else:
        sq.outSlot
  sq.debtsQueue.clear()
  sq.debtsCount = 0
  sq.readyQueue.clear()
  sq.inpSlot = minSlot
  sq.outSlot = minSlot
  # We are going to wakeup all the waiters and wait for last one.
  await sq.wakeupAndWaitWaiters()

proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} =
  ## Returns ``true`` if response chain of blocks is empty (has only empty
  ## slots).
  len(sr.data) == 0

proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} =
  ## Returns ``true`` if response chain of blocks has gap at the end.
  let lastslot = sr.request.slot + sr.request.count - 1'u64
  if len(sr.data) == 0:
    return true
  if sr.data[^1][].slot != lastslot:
    return true
  return false

proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
  ## Returns last non-empty slot from result ``sr``. If response has only
  ## empty slots, original request slot will be returned.
  if len(sr.data) == 0:
    # If response has only empty slots we going to use original request slot
    sr.request.slot
  else:
    sr.data[^1][].slot

proc processGap[T](sq: SyncQueue[T], sr: SyncResult[T]) =
  if sr.isEmpty():
    let gitem = GapItem[T](start: sr.request.slot,
                           finish: sr.request.slot + sr.request.count - 1'u64,
                           item: sr.request.item)
    sq.gapList.add(gitem)
  else:
    if sr.hasEndGap():
      let gitem = GapItem[T](start: sr.getLastNonEmptySlot() + 1'u64,
                             finish: sr.request.slot + sr.request.count - 1'u64,
                             item: sr.request.item)
      sq.gapList.add(gitem)
    else:
      sq.gapList.reset()

proc rewardForGaps[T](sq: SyncQueue[T], score: int) =
  mixin updateScore, getStats
  logScope:
    sync_ident = sq.ident
    direction = sq.kind
    topics = "syncman"

  for gap in sq.gapList:
    if score < 0:
      # Every empty response increases penalty by 25%, but not more than 200%.
      let
        emptyCount = gap.item.getStats(SyncResponseKind.Empty)
        goodCount = gap.item.getStats(SyncResponseKind.Good)

      if emptyCount <= goodCount:
        gap.item.updateScore(score)
      else:
        let
          weight = int(min(emptyCount - goodCount, 8'u64))
          newScore = score + score * weight div 4
        gap.item.updateScore(newScore)
        debug "Peer received gap penalty", peer = gap.item,
              penalty = newScore
    else:
      gap.item.updateScore(score)

proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
  sq.debtsQueue.push(sr)
  sq.debtsCount = sq.debtsCount + sr.count

proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
                        safeSlot: Slot): Slot =
  logScope:
    sync_ident = sq.ident
    direction = sq.kind
    topics = "syncman"

  case sq.kind
  of SyncQueueKind.Forward:
    # Calculate the latest finalized epoch.
    let finalizedEpoch = epoch(safeSlot)

    # Calculate failure epoch.
    let failEpoch = epoch(failSlot)

    # Calculate exponential rewind point in number of epochs.
    let epochCount =
      if sq.rewind.isSome():
        let rewind = sq.rewind.get()
        if failSlot == rewind.failSlot:
          # `MissingParent` happened at same slot so we increase rewind point by
          # factor of 2.
          if failEpoch > finalizedEpoch:
            let rewindPoint = rewind.epochCount shl 1
            if rewindPoint < rewind.epochCount:
              # If exponential rewind point produces `uint64` overflow we will
              # make rewind to latest finalized epoch.
              failEpoch - finalizedEpoch
            else:
              if (failEpoch < rewindPoint) or
                 (failEpoch - rewindPoint < finalizedEpoch):
                # If exponential rewind point points to position which is far
                # behind latest finalized epoch.
                failEpoch - finalizedEpoch
              else:
                rewindPoint
          else:
            warn "Trying to rewind over the last finalized epoch",
                 finalized_slot = safeSlot, fail_slot = failSlot,
                 finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
                 rewind_epoch_count = rewind.epochCount,
                 finalized_epoch = finalizedEpoch
            0'u64
        else:
          # `MissingParent` happened at different slot so we going to rewind for
          # 1 epoch only.
          if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
            warn "Сould not rewind further than the last finalized epoch",
                 finalized_slot = safeSlot, fail_slot = failSlot,
                 finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
                 rewind_epoch_count = rewind.epochCount,
                 finalized_epoch = finalizedEpoch
            0'u64
          else:
            1'u64
      else:
        # `MissingParent` happened first time.
        if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
          warn "Сould not rewind further than the last finalized epoch",
               finalized_slot = safeSlot, fail_slot = failSlot,
               finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
               finalized_epoch = finalizedEpoch
          0'u64
        else:
          1'u64

    if epochCount == 0'u64:
      warn "Unable to continue syncing, please restart the node",
           finalized_slot = safeSlot, fail_slot = failSlot,
           finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
           finalized_epoch = finalizedEpoch
      # Calculate the rewind epoch, which will be equal to last rewind point or
      # finalizedEpoch
      let rewindEpoch =
        if sq.rewind.isNone():
          finalizedEpoch
        else:
          epoch(sq.rewind.get().failSlot) - sq.rewind.get().epochCount
      rewindEpoch.start_slot()
    else:
      # Calculate the rewind epoch, which should not be less than the latest
      # finalized epoch.
      let rewindEpoch = failEpoch - epochCount
      # Update and save new rewind point in SyncQueue.
      sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount))
      rewindEpoch.start_slot()
  of SyncQueueKind.Backward:
    # While we perform backward sync, the only possible slot we could rewind is
    # latest stored block.
    if failSlot == safeSlot:
      warn "Unable to continue syncing, please restart the node",
           safe_slot = safeSlot, fail_slot = failSlot
    safeSlot

iterator blocks*[T](sq: SyncQueue[T],
                    sr: SyncResult[T]): ref ForkedSignedBeaconBlock =
  case sq.kind
  of SyncQueueKind.Forward:
    for i in countup(0, len(sr.data) - 1):
      yield sr.data[i]
  of SyncQueueKind.Backward:
    for i in countdown(len(sr.data) - 1, 0):
      yield sr.data[i]

proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) =
  case sq.kind
  of SyncQueueKind.Forward:
    sq.outSlot = sq.outSlot + number
  of SyncQueueKind.Backward:
    sq.outSlot = sq.outSlot - number

proc advanceInput[T](sq: SyncQueue[T], number: uint64) =
  case sq.kind
  of SyncQueueKind.Forward:
    sq.inpSlot = sq.inpSlot + number
  of SyncQueueKind.Backward:
    sq.inpSlot = sq.inpSlot - number

proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool =
  case sq.kind
  of SyncQueueKind.Forward:
    (sq.queueSize > 0) and (sr.slot > sq.outSlot)
  of SyncQueueKind.Backward:
    (sq.queueSize > 0) and (sr.lastSlot < sq.outSlot)

func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
  ## Compute the number of slots covered by a given `SyncRequest` that are
  ## already known and, hence, no longer relevant for sync progression.
  let
    outSlot = sq.outSlot
    lowSlot = sr.slot
    highSlot = sr.lastSlot
  case sq.kind
  of SyncQueueKind.Forward:
    if outSlot > highSlot:
      # Entire request is no longer relevant.
      sr.count
    elif outSlot > lowSlot:
      # Request is only partially relevant.
      outSlot - lowSlot
    else:
      # Entire request is still relevant.
      0
  of SyncQueueKind.Backward:
    if lowSlot > outSlot:
      # Entire request is no longer relevant.
      sr.count
    elif highSlot > outSlot:
      # Request is only partially relevant.
      highSlot - outSlot
    else:
      # Entire request is still relevant.
      0

proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
              data: seq[ref ForkedSignedBeaconBlock],
              processingCb: ProcessingCallback = nil) {.async.} =
  logScope:
    sync_ident = sq.ident
    topics = "syncman"

  ## Push successful result to queue ``sq``.
  mixin updateScore, updateStats, getStats

  if sr.index notin sq.pending:
    # If request `sr` not in our pending list, it only means that
    # SyncQueue.resetWait() happens and all pending requests are expired, so
    # we swallow `old` requests, and in such way sync-workers are able to get
    # proper new requests from SyncQueue.
    return

  sq.pending.del(sr.index)

  # This is backpressure handling algorithm, this algorithm is blocking
  # all pending `push` requests if `request.slot` not in range.
  while true:
    if sq.notInRange(sr):
      let reset = await sq.waitForChanges()
      if reset:
        # SyncQueue reset happens. We are exiting to wake up sync-worker.
        return
    else:
      let syncres = SyncResult[T](request: sr, data: data)
      sq.readyQueue.push(syncres)
      break

  while len(sq.readyQueue) > 0:
    let reqres =
      case sq.kind
      of SyncQueueKind.Forward:
        let minSlot = sq.readyQueue[0].request.slot
        if sq.outSlot < minSlot:
          none[SyncResult[T]]()
        else:
          some(sq.readyQueue.pop())
      of SyncQueueKind.Backward:
        let maxslot = sq.readyQueue[0].request.slot +
                      (sq.readyQueue[0].request.count - 1'u64)
        if sq.outSlot > maxslot:
          none[SyncResult[T]]()
        else:
          some(sq.readyQueue.pop())

    let item =
      if reqres.isSome():
        reqres.get()
      else:
        let rewindSlot = sq.getRewindPoint(sq.outSlot, sq.getSafeSlot())
        warn "Got incorrect sync result in queue, rewind happens",
             blocks_map = getShortMap(sq.readyQueue[0].request,
                                      sq.readyQueue[0].data),
             blocks_count = len(sq.readyQueue[0].data),
             output_slot = sq.outSlot, input_slot = sq.inpSlot,
             rewind_to_slot = rewindSlot, request = sq.readyQueue[0].request
        await sq.resetWait(some(rewindSlot))
        break

    if processingCb != nil:
      processingCb()

    # Validating received blocks one by one
    var
      hasInvalidBlock = false
      unviableBlock: Option[(Eth2Digest, Slot)]
      missingParentSlot: Option[Slot]
      goodBlock: Option[Slot]

      # compiler segfault if this is moved into the for loop, at time of writing
      # TODO this does segfault in 1.2 but not 1.6, so remove workaround when 1.2
      # is dropped.
      res: Result[void, BlockError]

    for blk in sq.blocks(item):
      res = await sq.blockVerifier(blk[])
      if res.isOk():
        goodBlock = some(blk[].slot)
      else:
        case res.error()
        of BlockError.MissingParent:
          missingParentSlot = some(blk[].slot)
          break
        of BlockError.Duplicate:
          # Keep going, happens naturally
          discard
        of BlockError.UnviableFork:
          # Keep going so as to register other unviable blocks with the
          # quarantine
          if unviableBlock.isNone:
            # Remember the first unviable block, so we can log it
            unviableBlock = some((blk[].root, blk[].slot))

        of BlockError.Invalid:
          hasInvalidBlock = true

          let req = item.request
          notice "Received invalid sequence of blocks", request = req,
                  blocks_count = len(item.data),
                  blocks_map = getShortMap(req, item.data)
          req.item.updateScore(PeerScoreBadBlocks)
          break

    # When errors happen while processing blocks, we retry the same request
    # with, hopefully, a different peer
    let retryRequest =
      hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome()
    if not(retryRequest):
      let numSlotsAdvanced = item.request.count - sq.numAlreadyKnownSlots(sr)
      sq.advanceOutput(numSlotsAdvanced)

      if goodBlock.isSome():
        # If there no error and response was not empty we should reward peer
        # with some bonus score - not for duplicate blocks though.
        item.request.item.updateScore(PeerScoreGoodBlocks)
        item.request.item.updateStats(SyncResponseKind.Good, 1'u64)

        # BlockProcessor reports good block, so we can reward all the peers
        # who sent us empty responses.
        sq.rewardForGaps(PeerScoreGoodBlocks)
        sq.gapList.reset()
      else:
        # Response was empty
        item.request.item.updateStats(SyncResponseKind.Empty, 1'u64)

      sq.processGap(item)

      if numSlotsAdvanced > 0:
        sq.wakeupWaiters()
    else:
      debug "Block pool rejected peer's response", request = item.request,
            blocks_map = getShortMap(item.request, item.data),
            blocks_count = len(item.data),
            ok = goodBlock.isSome(),
            unviable = unviableBlock.isSome(),
            missing_parent = missingParentSlot.isSome()
      # We need to move failed response to the debts queue.
      sq.toDebtsQueue(item.request)

      if unviableBlock.isSome():
        let req = item.request
        notice "Received blocks from an unviable fork", request = req,
              blockRoot = unviableBlock.get()[0],
              blockSlot = unviableBlock.get()[1],
              blocks_count = len(item.data),
              blocks_map = getShortMap(req, item.data)
        req.item.updateScore(PeerScoreUnviableFork)

      if missingParentSlot.isSome():
        var
          resetSlot: Option[Slot]
          failSlot = missingParentSlot.get()

        # If we got `BlockError.MissingParent` it means that peer returns chain
        # of blocks with holes or `block_pool` is in incomplete state. We going
        # to rewind the SyncQueue some distance back (2ⁿ, where n∈[0,∞], but
        # no more than `finalized_epoch`).
        let
          req = item.request
          safeSlot = sq.getSafeSlot()
          gapsCount = len(sq.gapList)

        # We should penalize all the peers which responded with gaps.
        sq.rewardForGaps(PeerScoreMissingBlocks)
        sq.gapList.reset()

        case sq.kind
        of SyncQueueKind.Forward:
          if goodBlock.isSome():
            # `BlockError.MissingParent` and `Success` present in response,
            # it means that we just need to request this range one more time.
            debug "Unexpected missing parent, but no rewind needed",
                  request = req, finalized_slot = safeSlot,
                  last_good_slot = goodBlock.get(),
                  missing_parent_slot = missingParentSlot.get(),
                  blocks_count = len(item.data),
                  blocks_map = getShortMap(req, item.data),
                  gaps_count = gapsCount
            req.item.updateScore(PeerScoreMissingBlocks)
          else:
            if safeSlot < req.slot:
              let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
              debug "Unexpected missing parent, rewind happens",
                   request = req, rewind_to_slot = rewindSlot,
                   rewind_point = sq.rewind, finalized_slot = safeSlot,
                   blocks_count = len(item.data),
                   blocks_map = getShortMap(req, item.data),
                   gaps_count = gapsCount
              resetSlot = some(rewindSlot)
            else:
              error "Unexpected missing parent at finalized epoch slot",
                  request = req, rewind_to_slot = safeSlot,
                  blocks_count = len(item.data),
                  blocks_map = getShortMap(req, item.data),
                  gaps_count = gapsCount
              req.item.updateScore(PeerScoreBadBlocks)
        of SyncQueueKind.Backward:
          if safeSlot > failSlot:
            let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
            # It's quite common peers give us fewer blocks than we ask for
            info "Gap in block range response, rewinding", request = req,
                 rewind_to_slot = rewindSlot, rewind_fail_slot = failSlot,
                 finalized_slot = safeSlot, blocks_count = len(item.data),
                 blocks_map = getShortMap(req, item.data)
            resetSlot = some(rewindSlot)
            req.item.updateScore(PeerScoreMissingBlocks)
          else:
            error "Unexpected missing parent at safe slot", request = req,
                  to_slot = safeSlot, blocks_count = len(item.data),
                  blocks_map = getShortMap(req, item.data)
            req.item.updateScore(PeerScoreBadBlocks)

        if resetSlot.isSome():
          await sq.resetWait(resetSlot)
          case sq.kind
          of SyncQueueKind.Forward:
            debug "Rewind to slot has happened", reset_slot = resetSlot.get(),
                  queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
                  rewind_point = sq.rewind, direction = sq.kind
          of SyncQueueKind.Backward:
            debug "Rewind to slot has happened", reset_slot = resetSlot.get(),
                  queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
                  direction = sq.kind

      break

proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
  ## Push failed request back to queue.
  if sr.index notin sq.pending:
    # If request `sr` not in our pending list, it only means that
    # SyncQueue.resetWait() happens and all pending requests are expired, so
    # we swallow `old` requests, and in such way sync-workers are able to get
    # proper new requests from SyncQueue.
    return
  sq.pending.del(sr.index)
  sq.toDebtsQueue(sr)

proc handlePotentialSafeSlotAdvancement[T](sq: SyncQueue[T]) =
  # It may happen that sync progress advanced to a newer `safeSlot`, either
  # by a response that started with good values and only had errors late, or
  # through an out-of-band mechanism, e.g., VC / REST.
  # If that happens, advance to the new `safeSlot` to avoid repeating requests
  # for data that is considered immutable and no longer relevant.
  let safeSlot = sq.getSafeSlot()
  func numSlotsBehindSafeSlot(slot: Slot): uint64 =
    case sq.kind
    of SyncQueueKind.Forward:
      if safeSlot > slot:
        safeSlot - slot
      else:
        0
    of SyncQueueKind.Backward:
      if slot > safeSlot:
        slot - safeSlot
      else:
        0

  let
    numOutSlotsAdvanced = sq.outSlot.numSlotsBehindSafeSlot
    numInpSlotsAdvanced =
      case sq.kind
      of SyncQueueKind.Forward:
        sq.inpSlot.numSlotsBehindSafeSlot
      of SyncQueueKind.Backward:
        if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
          0'u64
        else:
          sq.inpSlot.numSlotsBehindSafeSlot
  if numOutSlotsAdvanced != 0 or numInpSlotsAdvanced != 0:
    debug "Sync progress advanced out-of-band",
      safeSlot, outSlot = sq.outSlot, inpSlot = sq.inpSlot
    if numOutSlotsAdvanced != 0:
      sq.advanceOutput(numOutSlotsAdvanced)
    if numInpSlotsAdvanced != 0:
      sq.advanceInput(numInpSlotsAdvanced)
    sq.wakeupWaiters()

func updateRequestForNewSafeSlot[T](sq: SyncQueue[T], sr: var SyncRequest[T]) =
  # Requests may have originated before the latest `safeSlot` advancement.
  # Update it to not request any data prior to `safeSlot`.
  let
    outSlot = sq.outSlot
    lowSlot = sr.slot
    highSlot = sr.lastSlot
  case sq.kind
  of SyncQueueKind.Forward:
    if outSlot <= lowSlot:
      # Entire request is still relevant.
      discard
    elif outSlot <= highSlot:
      # Request is only partially relevant.
      let
        numSlotsDone = outSlot - lowSlot
      sr.slot += numSlotsDone
      sr.count -= numSlotsDone
    else:
      # Entire request is no longer relevant.
      sr.count = 0
  of SyncQueueKind.Backward:
    if outSlot >= highSlot:
      # Entire request is still relevant.
      discard
    elif outSlot >= lowSlot:
      # Request is only partially relevant.
      let
        numSlotsDone = highSlot - outSlot
      sr.count -= numSlotsDone
    else:
      # Entire request is no longer relevant.
      sr.count = 0

proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
  ## Create new request according to current SyncQueue parameters.
  sq.handlePotentialSafeSlotAdvancement()
  while len(sq.debtsQueue) > 0:
    if maxslot < sq.debtsQueue[0].slot:
      # Peer's latest slot is less than starting request's slot.
      return SyncRequest.empty(sq.kind, T)
    if maxslot < sq.debtsQueue[0].lastSlot():
      # Peer's latest slot is less than finishing request's slot.
      return SyncRequest.empty(sq.kind, T)
    var sr = sq.debtsQueue.pop()
    sq.debtsCount = sq.debtsCount - sr.count
    sq.updateRequestForNewSafeSlot(sr)
    if sr.isEmpty:
      continue
    sr.setItem(item)
    sq.makePending(sr)
    return sr

  case sq.kind
  of SyncQueueKind.Forward:
    if maxslot < sq.inpSlot:
      # Peer's latest slot is less than queue's input slot.
      return SyncRequest.empty(sq.kind, T)
    if sq.inpSlot > sq.finalSlot:
      # Queue's input slot is bigger than queue's final slot.
      return SyncRequest.empty(sq.kind, T)
    let lastSlot = min(maxslot, sq.finalSlot)
    let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
    var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item)
    sq.advanceInput(count)
    sq.makePending(sr)
    sr
  of SyncQueueKind.Backward:
    if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
      return SyncRequest.empty(sq.kind, T)
    if sq.inpSlot < sq.finalSlot:
      return SyncRequest.empty(sq.kind, T)
    let (slot, count) =
      block:
        let baseSlot = sq.inpSlot + 1'u64
        if baseSlot - sq.finalSlot < sq.chunkSize:
          let count = uint64(baseSlot - sq.finalSlot)
          (baseSlot - count, count)
        else:
          (baseSlot - sq.chunkSize, sq.chunkSize)
    if (maxslot + 1'u64) < slot + count:
      # Peer's latest slot is less than queue's input slot.
      return SyncRequest.empty(sq.kind, T)
    var sr = SyncRequest.init(sq.kind, slot, count, item)
    sq.advanceInput(count)
    sq.makePending(sr)
    sr

proc debtLen*[T](sq: SyncQueue[T]): uint64 =
  sq.debtsCount

proc pendingLen*[T](sq: SyncQueue[T]): uint64 =
  case sq.kind
  of SyncQueueKind.Forward:
    # When moving forward `outSlot` will be <= of `inpSlot`.
    sq.inpSlot - sq.outSlot
  of SyncQueueKind.Backward:
    # When moving backward `outSlot` will be >= of `inpSlot`
    sq.outSlot - sq.inpSlot

proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} =
  ## Returns number of slots left in queue ``sq``.
  case sq.kind
  of SyncQueueKind.Forward:
    sq.finalSlot + 1'u64 - sq.outSlot
  of SyncQueueKind.Backward:
    sq.outSlot + 1'u64 - sq.finalSlot

proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} =
  ## Returns total number of slots in queue ``sq``.
  case sq.kind
  of SyncQueueKind.Forward:
    sq.finalSlot + 1'u64 - sq.startSlot
  of SyncQueueKind.Backward:
    sq.startSlot + 1'u64 - sq.finalSlot

proc progress*[T](sq: SyncQueue[T]): uint64 =
  ## How many slots we've synced so far
  case sq.kind
  of SyncQueueKind.Forward:
    sq.outSlot - sq.startSlot
  of SyncQueueKind.Backward:
    sq.startSlot - sq.outSlot