From 868aa56df65eceedeafd083d02d0991cbbb3d763 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Fri, 22 Nov 2024 21:12:45 +0200 Subject: [PATCH] Initial commit. --- beacon_chain/sync/sync_manager.nim | 430 ++++---- beacon_chain/sync/sync_queue.nim | 1487 +++++++++++++--------------- 2 files changed, 923 insertions(+), 994 deletions(-) diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index ac8843c16..20dd64f09 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -34,6 +34,12 @@ const StatusExpirationTime* = chronos.minutes(2) ## Time time it takes for the peer's status information to expire. + ConcurrentRequestsCount* = 3 + ## Number of requests performed by one peer in single syncing step + + RepeatingFailuresCount* = 2 + ## Number of repeating errors before starting rewind process. + WeakSubjectivityLogMessage* = "Database state missing or too old, cannot sync - resync the client " & "using a trusted node or allow lenient long-range syncing with the " & @@ -81,6 +87,8 @@ type direction: SyncQueueKind ident*: string flags: set[SyncManagerFlag] + concurrentRequestsCount: int + repeatingFailuresCount: int SyncMoment* = object stamp*: chronos.Moment @@ -115,8 +123,10 @@ proc initQueue[A, B](man: SyncManager[A, B]) = of SyncQueueKind.Forward: man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(), man.getLastSlot(), man.chunkSize, + man.concurrentRequestsCount, + man.repeatingFailuresCount, man.getSafeSlot, man.blockVerifier, - 1, man.ident) + man.ident) of SyncQueueKind.Backward: let firstSlot = man.getFirstSlot() @@ -128,27 +138,34 @@ proc initQueue[A, B](man: SyncManager[A, B]) = else: firstSlot - 1'u64 man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot, - man.chunkSize, man.getSafeSlot, - man.blockVerifier, 1, man.ident) + man.chunkSize, + man.concurrentRequestsCount, + man.repeatingFailuresCount, + man.getSafeSlot, + man.blockVerifier, man.ident) + +proc newSyncManager*[A, B]( + pool: PeerPool[A, B], + denebEpoch: Epoch, + minEpochsForBlobSidecarsRequests: uint64, + direction: SyncQueueKind, + getLocalHeadSlotCb: GetSlotCallback, + getLocalWallSlotCb: GetSlotCallback, + getFinalizedSlotCb: GetSlotCallback, + getBackfillSlotCb: GetSlotCallback, + getFrontfillSlotCb: GetSlotCallback, + weakSubjectivityPeriodCb: GetBoolCallback, + progressPivot: Slot, + blockVerifier: BlockVerifier, + shutdownEvent: AsyncEvent, + maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), + chunkSize = uint64(SLOTS_PER_EPOCH), + flags: set[SyncManagerFlag] = {}, + concurrentRequestsCount = ConcurrentRequestsCount, + repeatingFailuresCount = RepeatingFailuresCount, + ident = "main" +): SyncManager[A, B] = -proc newSyncManager*[A, B](pool: PeerPool[A, B], - denebEpoch: Epoch, - minEpochsForBlobSidecarsRequests: uint64, - direction: SyncQueueKind, - getLocalHeadSlotCb: GetSlotCallback, - getLocalWallSlotCb: GetSlotCallback, - getFinalizedSlotCb: GetSlotCallback, - getBackfillSlotCb: GetSlotCallback, - getFrontfillSlotCb: GetSlotCallback, - weakSubjectivityPeriodCb: GetBoolCallback, - progressPivot: Slot, - blockVerifier: BlockVerifier, - shutdownEvent: AsyncEvent, - maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), - chunkSize = uint64(SLOTS_PER_EPOCH), - flags: set[SyncManagerFlag] = {}, - ident = "main" - ): SyncManager[A, B] = let (getFirstSlot, getLastSlot, getSafeSlot) = case direction of SyncQueueKind.Forward: (getLocalHeadSlotCb, getLocalWallSlotCb, getFinalizedSlotCb) @@ -173,7 +190,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], 
direction: direction, shutdownEvent: shutdownEvent, ident: ident, - flags: flags + flags: flags, + concurrentRequestsCount: concurrentRequestsCount, + repeatingFailuresCount: repeatingFailuresCount ) res.initQueue() res @@ -184,6 +203,8 @@ proc getBlocks[A, B](man: SyncManager[A, B], peer: A, mixin getScore, `==` logScope: + request = req + peer = peer peer_score = peer.getScore() peer_speed = peer.netKbps() sync_ident = man.ident @@ -191,9 +212,9 @@ proc getBlocks[A, B](man: SyncManager[A, B], peer: A, topics = "syncman" doAssert(not(req.isEmpty()), "Request must not be empty!") - debug "Requesting blocks from peer", request = req + debug "Requesting blocks from peer" - beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) + beaconBlocksByRange_v2(peer, req.data.slot, req.data.count, 1'u64) proc shouldGetBlobs[A, B](man: SyncManager[A, B], s: Slot): bool = let @@ -204,7 +225,8 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], s: Slot): bool = epoch >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) proc shouldGetBlobs[A, B](man: SyncManager[A, B], r: SyncRequest[A]): bool = - man.shouldGetBlobs(r.slot) or man.shouldGetBlobs(r.slot + (r.count - 1)) + man.shouldGetBlobs(r.data.slot) or + man.shouldGetBlobs(r.data.slot + (r.data.count - 1)) proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, req: SyncRequest[A]): Future[BlobSidecarsRes] @@ -212,6 +234,8 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, mixin getScore, `==` logScope: + request = req + peer = peer peer_score = peer.getScore() peer_speed = peer.netKbps() sync_ident = man.ident @@ -219,8 +243,8 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, topics = "syncman" doAssert(not(req.isEmpty()), "Request must not be empty!") - debug "Requesting blobs sidecars from peer", request = req - blobSidecarsByRange(peer, req.slot, req.count) + debug "Requesting blobs sidecars from peer" + blobSidecarsByRange(peer, req.data.slot, req.data.count) proc remainingSlots(man: SyncManager): uint64 = let @@ -238,8 +262,8 @@ proc remainingSlots(man: SyncManager): uint64 = 0'u64 func groupBlobs*( - blocks: seq[ref ForkedSignedBeaconBlock], - blobs: seq[ref BlobSidecar] + blocks: openArray[ref ForkedSignedBeaconBlock], + blobs: openArray[ref BlobSidecar] ): Result[seq[BlobSidecars], string] = var grouped = newSeq[BlobSidecars](len(blocks)) @@ -365,77 +389,156 @@ proc getSyncBlockData*[T]( ok(SyncBlockData(blocks: blocksRange, blobs: blobsRange)) -proc syncStep[A, B]( - man: SyncManager[A, B], index: int, peer: A -) {.async: (raises: [CancelledError]).} = +proc getSyncBlockData[A, B]( + man: SyncManager[A, B], + index: int, + sr: SyncRequest[A] +): Future[SyncBlockDataRes] {.async: (raises: [CancelledError]).} = logScope: + peer = sr.item + peer_score = sr.item.getScore() + peer_speed = sr.item.netKbps() + index = index + request = sr + sync_ident = man.ident + direction = man.direction + topics = "syncman" + + let + peer = sr.item + blocks = (await man.getBlocks(peer, sr)).valueOr: + peer.updateScore(PeerScoreNoValues) + return err("Failed to receive blocks on request, reason: " & $error) + blockSlots = mapIt(blocks, it[].slot) + + checkResponse(sr, blockSlots).isOkOr: + peer.updateScore(PeerScoreBadResponse) + return err("Incorrect blocks sequence received, reason: " & $error) + + let + shouldGetBlobs = + if not(man.shouldGetBlobs(sr)): + false + else: + var hasBlobs = false + for blck in blocks: + withBlck(blck[]): + when consensusFork >= ConsensusFork.Deneb: + if 
len(forkyBlck.message.body.blob_kzg_commitments) > 0: + hasBlobs = true + break + hasBlobs + blobs = + if shouldGetBlobs: + let + res = (await man.getBlobSidecars(peer, sr)).valueOr: + peer.updateScore(PeerScoreNoValues) + return err("Failed to receive blobs on request, reason: " & $error) + blobData = res.asSeq() + + debug "Received blobs on request", blobs_count = len(blobData) + + if len(blobData) > 0: + let blobSlots = mapIt(blobData, it[].signed_block_header.message.slot) + checkBlobsResponse(sr, blobSlots).isOkOr: + peer.updateScore(PeerScoreBadResponse) + return err("Incorrect blobs sequence received, reason: " & $error) + + let groupedBlobs = groupBlobs(blocks.asSeq(), blobData).valueOr: + peer.updateScore(PeerScoreNoValues) + return err( + "Received blobs sequence is inconsistent, reason: " & error) + + groupedBlobs.checkBlobs().isOkOr: + peer.updateScore(PeerScoreBadResponse) + return err("Received blobs verification failed, reason: " & error) + Opt.some(groupedBlobs) + else: + Opt.none(seq[BlobSidecars]) + + ok(SyncBlockData(blocks: blocks.asSeq(), blobs: blobs)) + +proc getOrUpdatePeerStatus[A, B]( + man: SyncManager[A, B], index: int, peer: A +): Future[Result[Slot, string]] {.async: (raises: [CancelledError]).} = + logScope: + peer = peer peer_score = peer.getScore() peer_speed = peer.netKbps() index = index sync_ident = man.ident + direction = man.direction topics = "syncman" - var + let headSlot = man.getLocalHeadSlot() wallSlot = man.getLocalWallSlot() peerSlot = peer.getHeadSlot() - block: # Check that peer status is recent and relevant - logScope: - peer = peer - direction = man.direction + debug "Peer's syncing status", + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_head_slot = headSlot - debug "Peer's syncing status", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, local_head_slot = headSlot + let + peerStatusAge = Moment.now() - peer.getStatusLastTime() + needsUpdate = + # Latest status we got is old + peerStatusAge >= StatusExpirationTime or + # The point we need to sync is close to where the peer is + man.getFirstSlot() >= peerSlot - let - peerStatusAge = Moment.now() - peer.getStatusLastTime() - needsUpdate = - # Latest status we got is old - peerStatusAge >= StatusExpirationTime or - # The point we need to sync is close to where the peer is - man.getFirstSlot() >= peerSlot + if not(needsUpdate): + return ok(peerSlot) - if needsUpdate: - man.workers[index].status = SyncWorkerStatus.UpdatingStatus + man.workers[index].status = SyncWorkerStatus.UpdatingStatus - # Avoid a stampede of requests, but make them more frequent in case the - # peer is "close" to the slot range of interest - if peerStatusAge < StatusExpirationTime div 2: - await sleepAsync(StatusExpirationTime div 2 - peerStatusAge) + # Avoid a stampede of requests, but make them more frequent in case the + # peer is "close" to the slot range of interest + if peerStatusAge < (StatusExpirationTime div 2): + await sleepAsync((StatusExpirationTime div 2) - peerStatusAge) - trace "Updating peer's status information", wall_clock_slot = wallSlot, - remote_head_slot = peerSlot, local_head_slot = headSlot + trace "Updating peer's status information", + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_head_slot = headSlot - if not(await peer.updateStatus()): - peer.updateScore(PeerScoreNoStatus) - debug "Failed to get remote peer's status, exiting", - peer_head_slot = peerSlot + if not(await peer.updateStatus()): + peer.updateScore(PeerScoreNoStatus) + return err("Failed 
to get remote peer status") - return + let newPeerSlot = peer.getHeadSlot() + if peerSlot >= newPeerSlot: + peer.updateScore(PeerScoreStaleStatus) + debug "Peer's status information is stale", + wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, + local_head_slot = headSlot, remote_new_head_slot = newPeerSlot + else: + debug "Peer's status information updated", wall_clock_slot = wallSlot, + remote_old_head_slot = peerSlot, local_head_slot = headSlot, + remote_new_head_slot = newPeerSlot + peer.updateScore(PeerScoreGoodStatus) + ok(newPeerSlot) - let newPeerSlot = peer.getHeadSlot() - if peerSlot >= newPeerSlot: - peer.updateScore(PeerScoreStaleStatus) - debug "Peer's status information is stale", - wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, - local_head_slot = headSlot, remote_new_head_slot = newPeerSlot - else: - debug "Peer's status information updated", wall_clock_slot = wallSlot, - remote_old_head_slot = peerSlot, local_head_slot = headSlot, - remote_new_head_slot = newPeerSlot - peer.updateScore(PeerScoreGoodStatus) - peerSlot = newPeerSlot +proc syncStep[A, B]( + man: SyncManager[A, B], index: int, peer: A +) {.async: (raises: [CancelledError]).} = + logScope: + peer = peer + peer_score = peer.getScore() + peer_speed = peer.netKbps() + index = index + direction = man.direction + sync_ident = man.ident + topics = "syncman" - # Time passed - enough to move slots, if sleep happened + let + peerSlot = (await man.getOrUpdatePeerStatus(index, peer)).valueOr: + debug "Failed to get remote peer's status, exiting", + peer_head_slot = peer.getHeadSlot() + return headSlot = man.getLocalHeadSlot() wallSlot = man.getLocalWallSlot() if man.remainingSlots() <= man.maxHeadAge: - logScope: - peer = peer - direction = man.direction - case man.direction of SyncQueueKind.Forward: info "We are in sync with network", wall_clock_slot = wallSlot, @@ -462,148 +565,69 @@ proc syncStep[A, B]( # Right now we decreasing peer's score a bit, so it will not be # disconnected due to low peer's score, but new fresh peers could replace # peers with low latest head. - debug "Peer's head slot is lower then local head slot", peer = peer, + debug "Peer's head slot is lower then local head slot", wall_clock_slot = wallSlot, remote_head_slot = peerSlot, local_last_slot = man.getLastSlot(), - local_first_slot = man.getFirstSlot(), - direction = man.direction + local_first_slot = man.getFirstSlot() peer.updateScore(PeerScoreUseless) return # Wall clock keeps ticking, so we need to update the queue man.queue.updateLastSlot(man.getLastSlot()) - man.workers[index].status = SyncWorkerStatus.Requesting - let req = man.queue.pop(peerSlot, peer) - if req.isEmpty(): - # SyncQueue could return empty request in 2 cases: - # 1. There no more slots in SyncQueue to download (we are synced, but - # our ``notInSyncEvent`` is not yet cleared). - # 2. Current peer's known head slot is too low to satisfy request. - # - # To avoid endless loop we going to wait for RESP_TIMEOUT time here. - # This time is enough for all pending requests to finish and it is also - # enough for main sync loop to clear ``notInSyncEvent``. 
-    debug "Empty request received from queue, exiting", peer = peer,
-          local_head_slot = headSlot, remote_head_slot = peerSlot,
-          queue_input_slot = man.queue.inpSlot,
-          queue_output_slot = man.queue.outSlot,
-          queue_last_slot = man.queue.finalSlot, direction = man.direction
-    await sleepAsync(RESP_TIMEOUT_DUR)
-    return
+  proc processCallback() =
+    man.workers[index].status = SyncWorkerStatus.Processing

-  debug "Creating new request for peer", wall_clock_slot = wallSlot,
-        remote_head_slot = peerSlot, local_head_slot = headSlot,
-        request = req
+  var jobs: seq[Future[void].Raising([CancelledError])]

-  man.workers[index].status = SyncWorkerStatus.Downloading
+  try:
+    for rindex in 0 ..< man.concurrentRequestsCount:
+      man.workers[index].status = SyncWorkerStatus.Requesting
+      let request = man.queue.pop(peerSlot, peer)
+      if request.isEmpty():
+        # SyncQueue could return empty request in 2 cases:
+        # 1. There are no more slots in SyncQueue to download (we are synced, but
+        #    our ``notInSyncEvent`` is not yet cleared).
+        # 2. Current peer's known head slot is too low to satisfy request.
+        #
+        # To avoid an endless loop we are going to wait for RESP_TIMEOUT time here.
+        # This time is enough for all pending requests to finish and it is also
+        # enough for the main sync loop to clear ``notInSyncEvent``.
+        debug "Empty request received from queue, exiting",
+              local_head_slot = headSlot,
+              remote_head_slot = peerSlot,
+              queue_input_slot = man.queue.inpSlot,
+              queue_output_slot = man.queue.outSlot,
+              queue_last_slot = man.queue.finalSlot
+        await sleepAsync(RESP_TIMEOUT_DUR)
+        break

-  let blocks = await man.getBlocks(peer, req)
-  if blocks.isErr():
-    peer.updateScore(PeerScoreNoValues)
-    man.queue.push(req)
-    debug "Failed to receive blocks on request",
-          request = req, err = blocks.error
-    return
-  let blockData = blocks.get().asSeq()
-  debug "Received blocks on request", blocks_count = len(blockData),
-        blocks_map = getShortMap(req, blockData), request = req
+      man.workers[index].status = SyncWorkerStatus.Downloading
+      let data = (await man.getSyncBlockData(index, request)).valueOr:
+        debug "Failed to get block data", reason = error
+        man.queue.push(request)
+        break

-  let slots = mapIt(blockData, it[].slot)
-  checkResponse(req, slots).isOkOr:
-    peer.updateScore(PeerScoreBadResponse)
-    man.queue.push(req)
-    warn "Incorrect blocks sequence received",
-         blocks_count = len(blockData),
-         blocks_map = getShortMap(req, blockData),
-         request = req,
-         reason = error
-    return
+      # Scoring will happen in `syncUpdate`.
+ man.workers[index].status = SyncWorkerStatus.Queueing + let + peerFinalized = peer.getFinalizedEpoch().start_slot() + lastSlot = request.data.slot + request.data.count - 1 + # The peer claims the block is finalized - our own block processing will + # verify this point down the line + # TODO descore peers that lie + maybeFinalized = lastSlot < peerFinalized - let shouldGetBlobs = - if not man.shouldGetBlobs(req): - false - else: - var hasBlobs = false - for blck in blockData: - withBlck(blck[]): - when consensusFork >= ConsensusFork.Deneb: - if forkyBlck.message.body.blob_kzg_commitments.len > 0: - hasBlobs = true - break - hasBlobs + jobs.add(man.queue.push(request, data.blocks, data.blobs, maybeFinalized, + processCallback)) - let blobData = - if shouldGetBlobs: - let blobs = await man.getBlobSidecars(peer, req) - if blobs.isErr(): - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive blobs on request", - request = req, err = blobs.error - return - let blobData = blobs.get().asSeq() - debug "Received blobs on request", - blobs_count = len(blobData), - blobs_map = getShortMap(req, blobData), request = req + if len(jobs) > 0: + await allFutures(jobs) - if len(blobData) > 0: - let slots = mapIt(blobData, it[].signed_block_header.message.slot) - checkBlobsResponse(req, slots).isOkOr: - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Incorrect blobs sequence received", - blobs_count = len(blobData), - blobs_map = getShortMap(req, blobData), - request = req, - reason = error - return - let groupedBlobs = groupBlobs(blockData, blobData).valueOr: - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - info "Received blobs sequence is inconsistent", - blobs_map = getShortMap(req, blobData), - request = req, msg = error - return - groupedBlobs.checkBlobs().isOkOr: - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received blobs verification failed", - blobs_count = len(blobData), - blobs_map = getShortMap(req, blobData), - request = req, - reason = error - return - Opt.some(groupedBlobs) - else: - Opt.none(seq[BlobSidecars]) - - if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and - req.contains(man.getSafeSlot()): - # The sync protocol does not distinguish between: - # - All requested slots are empty - # - Peer does not have data available about requested range - # - # However, we include the `backfill` slot in backward sync requests. - # If we receive an empty response to a request covering that slot, - # we know that the response is incomplete and can descore. - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Response does not include known-to-exist block", request = req - return - - # Scoring will happen in `syncUpdate`. 
- man.workers[index].status = SyncWorkerStatus.Queueing - let - peerFinalized = peer.getFinalizedEpoch().start_slot() - lastSlot = req.slot + req.count - # The peer claims the block is finalized - our own block processing will - # verify this point down the line - # TODO descore peers that lie - maybeFinalized = lastSlot < peerFinalized - - await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = - man.workers[index].status = SyncWorkerStatus.Processing) + except CancelledError as exc: + let pending = jobs.filterIt(not(it.finished)).mapIt(cancelAndWait(it)) + await noCancel allFutures(pending) + raise exc proc syncWorker[A, B]( man: SyncManager[A, B], index: int diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 700faad61..82337f2ab 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -1,13 +1,4 @@ -# beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. - -{.push raises: [].} - -import std/[heapqueue, tables, strutils, sequtils, math] +import std/[deques, heapqueue, tables, strutils, sequtils, math] import stew/base10, chronos, chronicles, results import ../spec/datatypes/[base, phase0, altair], @@ -30,29 +21,56 @@ type blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} + SyncRange* = object + slot*: Slot + count*: uint64 + + SyncPosition* = object + qindex*: int + sindex*: int + SyncQueueKind* {.pure.} = enum Forward, Backward SyncRequest*[T] = object kind*: SyncQueueKind - index*: uint64 - slot*: Slot - count*: uint64 + data*: SyncRange item*: T - SyncResult*[T] = object - request*: SyncRequest[T] - data*: seq[ref ForkedSignedBeaconBlock] - blobs*: Opt[seq[BlobSidecars]] + SyncRequestQueueItem*[T] = object + requests: seq[SyncRequest[T]] + data: SyncRange - GapItem*[T] = object - start*: Slot - finish*: Slot - item*: T + SyncQueueItem[T] = object + requests: seq[SyncRequest[T]] + data: SyncRange + failuresCount: Natural - SyncWaiter* = ref object + SyncWaiterItem[T] = ref object future: Future[void].Raising([CancelledError]) - reset: bool + request: SyncRequest[T] + resetFlag: bool + + SyncProcessError {.pure.} = enum + Invalid, + MissingParent, + GoodAndMissingParent, + UnviableFork, + Duplicate, + Empty, + NoError + + SyncBlock = object + slot: Slot + root: Eth2Digest + + SyncProcessingResult = object + code: SyncProcessError + blck: Opt[SyncBlock] + + GapItem[T] = object + data: SyncRange + item: T RewindPoint = object failSlot: Slot @@ -64,39 +82,49 @@ type outSlot*: Slot startSlot*: Slot finalSlot*: Slot - chunkSize*: uint64 - queueSize*: int - counter*: uint64 - pending*: Table[uint64, SyncRequest[T]] - gapList*: seq[GapItem[T]] - waiters: seq[SyncWaiter] - getSafeSlot*: GetSlotCallback - debtsQueue: HeapQueue[SyncRequest[T]] - debtsCount: uint64 - readyQueue: HeapQueue[SyncResult[T]] - rewind: Option[RewindPoint] + rewind: Opt[RewindPoint] + chunkSize: uint64 + requestsCount: Natural + failureResetThreshold: Natural + requests: Deque[SyncQueueItem[T]] + getSafeSlot: GetSlotCallback blockVerifier: BlockVerifier - ident*: string + waiters: 
seq[SyncWaiterItem[T]] + gapList: seq[GapItem[T]] + lock: AsyncLock + ident: string chronicles.formatIt SyncQueueKind: toLowerAscii($it) -template shortLog*[T](req: SyncRequest[T]): string = - Base10.toString(uint64(req.slot)) & ":" & - Base10.toString(req.count) & "@" & - Base10.toString(req.index) +proc `$`*(srange: SyncRange): string = + "[" & Base10.toString(uint64(srange.slot)) & ":" & + Base10.toString(uint64(srange.slot + srange.count - 1)) & "]" + +template shortLog[T](req: SyncRequest[T]): string = + $req.data & "@" & Base10.toString(req.data.count) chronicles.expandIt SyncRequest: `it` = shortLog(it) peer = shortLog(it.item) direction = toLowerAscii($it.kind) -proc getShortMap*[T](req: SyncRequest[T], - data: openArray[ref ForkedSignedBeaconBlock]): string = +chronicles.formatIt Opt[SyncBlock]: + if it.isSome(): + Base10.toString(uint64(it.get().slot)) & "@" & shortLog(it.get().root) + else: + "" + +func getShortMap*[T]( + req: SyncRequest[T], + data: openArray[ref ForkedSignedBeaconBlock] +): string = ## Returns all slot numbers in ``data`` as placement map. - var res = newStringOfCap(req.count) - var slider = req.slot - var last = 0 - for i in 0 ..< req.count: + var + res = newStringOfCap(req.data.count) + slider = req.data.slot + last = 0 + + for i in 0 ..< req.data.count: if last < len(data): for k in last ..< len(data): if slider == data[k][].slot: @@ -111,370 +139,167 @@ proc getShortMap*[T](req: SyncRequest[T], slider = slider + 1 res -proc getShortMap*[T](req: SyncRequest[T], - data: openArray[ref BlobSidecar]): string = - ## Returns all slot numbers in ``data`` as placement map. - var res = newStringOfCap(req.count * MAX_BLOBS_PER_BLOCK) - var cur : uint64 = 0 - for slot in req.slot..= lenu64(data): - res.add('|') - continue - if slot == data[cur].signed_block_header.message.slot: - for k in cur..= lenu64(data) or slot != data[k].signed_block_header.message.slot: - res.add('|') - break +proc getShortMap*[T]( + req: SyncRequest[T], + blobs: openArray[BlobSidecars] +): string = + static: + doAssert(MAX_BLOBS_PER_BLOCK < 10, "getShortMap(Blobs) should be revisited") + + var + res = newStringOfCap(req.data.count) + slider = req.data.slot + last = 0 + + for i in 0 ..< req.data.count: + if last < len(blobs): + if len(blobs[last]) > 0: + let + sidecar = blobs[last][0] + length = len(blobs[last]) + if sidecar.signed_block_header.message.slot == slider: + res.add(Base10.toString(uint64(length))) + inc(last) else: - inc(cur) - res.add('x') + res.add('.') + else: + res.add('.') else: - res.add('|') + res.add('.') + inc(slider) res -proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} = - slot >= req.slot and slot < req.slot + req.count +proc getShortMap*[T]( + req: SyncRequest[T], + data: Opt[seq[BlobSidecars]] +): string = + if data.isNone(): + return '.'.repeat(req.data.count) + getShortMap(req, data.get()) -proc cmp*[T](a, b: SyncRequest[T]): int = - cmp(uint64(a.slot), uint64(b.slot)) +func init*(t: typedesc[SyncRange], slot: Slot, count: uint64): SyncRange = + SyncRange(slot: slot, count: count) -proc checkResponse*[T](req: SyncRequest[T], - data: openArray[Slot]): Result[void, cstring] = - if len(data) == 0: - # Impossible to verify empty response. 
- return ok() +func init(t: typedesc[SyncProcessError], + kind: VerifierError): SyncProcessError = + case kind + of VerifierError.Invalid: + SyncProcessError.Invalid + of VerifierError.MissingParent: + SyncProcessError.MissingParent + of VerifierError.UnviableFork: + SyncProcessError.UnviableFork + of VerifierError.Duplicate: + SyncProcessError.Duplicate - if lenu64(data) > req.count: - # Number of blocks in response should be less or equal to number of - # requested blocks. - return err("Too many blocks received") +func init(t: typedesc[SyncBlock], slot: Slot, root: Eth2Digest): SyncBlock = + SyncBlock(slot: slot, root: root) - var - slot = req.slot - rindex = 0'u64 - dindex = 0 +func init(t: typedesc[SyncProcessError]): SyncProcessError = + SyncProcessError.NoError - while (rindex < req.count) and (dindex < len(data)): - if slot < data[dindex]: - discard - elif slot == data[dindex]: - inc(dindex) - else: - return err("Incorrect order or duplicate blocks found") - slot += 1'u64 - rindex += 1'u64 +func init(t: typedesc[SyncProcessingResult], se: SyncProcessError, + slot: Slot, root: Eth2Digest): SyncProcessingResult = + SyncProcessingResult(blck: Opt.some(SyncBlock.init(slot, root)), code: se) - if dindex != len(data): - return err("Some of the blocks are outside the requested range") +func init(t: typedesc[SyncProcessingResult], + se: SyncProcessError): SyncProcessingResult = + SyncProcessingResult(code: se) - ok() +func init(t: typedesc[SyncProcessingResult], se: SyncProcessError, + sblck: SyncBlock): SyncProcessingResult = + SyncProcessingResult(blck: Opt.some(sblck), code: se) -proc checkBlobsResponse*[T](req: SyncRequest[T], - data: openArray[Slot]): Result[void, cstring] = - if len(data) == 0: - # Impossible to verify empty response. - return ok() +func init(t: typedesc[SyncProcessingResult], ve: VerifierError, + slot: Slot, root: Eth2Digest): SyncProcessingResult = + SyncProcessingResult(blck: Opt.some(SyncBlock.init(slot, root)), + code: SyncProcessError.init(ve)) - if lenu64(data) > (req.count * MAX_BLOBS_PER_BLOCK): - # Number of blobs in response should be less or equal to number of - # requested (blocks * MAX_BLOBS_PER_BLOCK). 
- return err("Too many blobs received") +func init(t: typedesc[SyncProcessingResult], ve: VerifierError, + sblck: SyncBlock): SyncProcessingResult = + SyncProcessingResult(blck: Opt.some(sblck), code: SyncProcessError.init(ve)) - var - pslot = data[0] - counter = 0'u64 - for slot in data: - if (slot < req.slot) or (slot >= req.slot + req.count): - return err("Some of the blobs are not in requested range") - if slot < pslot: - return err("Incorrect order") - if slot == pslot: - inc(counter) - if counter > MAX_BLOBS_PER_BLOCK: - return err("Number of blobs in the block exceeds the limit") - else: - counter = 1'u64 - pslot = slot - - ok() - -proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot, - finish: Slot, t2: typedesc[T]): SyncRequest[T] = - let count = finish - start + 1'u64 - SyncRequest[T](kind: kind, slot: start, count: count) - -proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, slot: Slot, - count: uint64, item: T): SyncRequest[T] = - SyncRequest[T](kind: kind, slot: slot, count: count, item: item) - -proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot, - finish: Slot, item: T): SyncRequest[T] = - let count = finish - start + 1'u64 - SyncRequest[T](kind: kind, slot: start, count: count, item: item) - -proc empty*[T](t: typedesc[SyncRequest], kind: SyncQueueKind, - t2: typedesc[T]): SyncRequest[T] {.inline.} = - SyncRequest[T](kind: kind, count: 0'u64) - -proc setItem*[T](sr: var SyncRequest[T], item: T) = - sr.item = item - -proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} = - (sr.count == 0'u64) - -proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], - queueKind: SyncQueueKind, - start, final: Slot, chunkSize: uint64, - getSafeSlotCb: GetSlotCallback, - blockVerifier: BlockVerifier, - syncQueueSize: int = -1, - ident: string = "main"): SyncQueue[T] = - ## Create new synchronization queue with parameters - ## - ## ``start`` and ``final`` are starting and final Slots. - ## - ## ``chunkSize`` maximum number of slots in one request. - ## - ## ``syncQueueSize`` maximum queue size for incoming data. - ## If ``syncQueueSize > 0`` queue will help to keep backpressure under - ## control. If ``syncQueueSize <= 0`` then queue size is unlimited (default). - - # SyncQueue is the core of sync manager, this data structure distributes - # requests to peers and manages responses from peers. - # - # Because SyncQueue is async data structure it manages backpressure and - # order of incoming responses and it also resolves "joker's" problem. - # - # Joker's problem - # - # According to pre-v0.12.0 Ethereum consensus network specification - # > Clients MUST respond with at least one block, if they have it and it - # > exists in the range. Clients MAY limit the number of blocks in the - # > response. - # https://github.com/ethereum/consensus-specs/blob/v0.11.3/specs/phase0/p2p-interface.md#L590 - # - # Such rule can lead to very uncertain responses, for example let slots from - # 10 to 12 will be not empty. Client which follows specification can answer - # with any response from this list (X - block, `-` empty space): - # - # 1. X X X - # 2. - - X - # 3. - X - - # 4. - X X - # 5. X - - - # 6. X - X - # 7. X X - - # - # If peer answers with `1` everything will be fine and `block_processor` - # will be able to process all 3 blocks. - # In case of `2`, `3`, `4`, `6` - `block_processor` will fail immediately - # with chunk and report "parent is missing" error. 
- # But in case of `5` and `7` blocks will be processed by `block_processor` - # without any problems, however it will start producing problems right from - # this uncertain last slot. SyncQueue will start producing requests for next - # blocks, but all the responses from this point will fail with "parent is - # missing" error. Lets call such peers "jokers", because they are joking - # with responses. - # - # To fix "joker" problem we going to perform rollback to the latest finalized - # epoch's first slot. - # - # Note that as of spec v0.12.0, well-behaving clients are forbidden from - # answering this way. However, it still makes sense to attempt to handle - # this case to increase compatibility (e.g., with weak subjectivity nodes - # that are still backfilling blocks) - doAssert(chunkSize > 0'u64, "Chunk size should not be zero") - SyncQueue[T]( - kind: queueKind, - startSlot: start, - finalSlot: final, - chunkSize: chunkSize, - queueSize: syncQueueSize, - getSafeSlot: getSafeSlotCb, - waiters: newSeq[SyncWaiter](), - counter: 1'u64, - pending: initTable[uint64, SyncRequest[T]](), - debtsQueue: initHeapQueue[SyncRequest[T]](), - inpSlot: start, - outSlot: start, - blockVerifier: blockVerifier, - ident: ident +func init*[T](t: typedesc[SyncRequest], kind: SyncQueueKind, + item: T): SyncRequest[T] = + SyncRequest[T]( + kind: kind, + data: SyncRange(slot: FAR_FUTURE_SLOT, count: 0'u64), + item: item ) -proc `<`*[T](a, b: SyncRequest[T]): bool = - doAssert(a.kind == b.kind) - case a.kind - of SyncQueueKind.Forward: - a.slot < b.slot - of SyncQueueKind.Backward: - a.slot > b.slot +func init*[T](t: typedesc[SyncRequest], kind: SyncQueueKind, + data: SyncRange, item: T): SyncRequest[T] = + SyncRequest[T](kind: kind, data: data, item: item) -proc `<`*[T](a, b: SyncResult[T]): bool = - doAssert(a.request.kind == b.request.kind) - case a.request.kind - of SyncQueueKind.Forward: - a.request.slot < b.request.slot - of SyncQueueKind.Backward: - a.request.slot > b.request.slot +func init[T](t: typedesc[SyncQueueItem], + req: SyncRequest[T]): SyncQueueItem[T] = + SyncQueueItem[T](data: req.data, requests: @[req]) -proc `==`*[T](a, b: SyncRequest[T]): bool = - (a.kind == b.kind) and (a.slot == b.slot) and (a.count == b.count) +func init[T](t: typedesc[GapItem], req: SyncRequest[T]): GapItem[T] = + GapItem[T](data: req.data, item: req.item) -proc lastSlot*[T](req: SyncRequest[T]): Slot = - ## Returns last slot for request ``req``. - req.slot + req.count - 1'u64 +func next(srange: SyncRange): SyncRange {.inline.} = + let slot = srange.slot + srange.count + if slot == FAR_FUTURE_SLOT: + # Finish range + srange + elif slot < srange.slot: + # Range that causes uint64 overflow, fixing. + SyncRange.init(slot, uint64(FAR_FUTURE_SLOT - srange.count)) + else: + if slot + srange.count < slot: + SyncRange.init(slot, uint64(FAR_FUTURE_SLOT - srange.count)) + else: + SyncRange.init(slot, srange.count) -proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) = - req.index = sq.counter - sq.counter = sq.counter + 1'u64 - sq.pending[req.index] = req +func prev(srange: SyncRange): SyncRange {.inline.} = + if srange.slot == GENESIS_SLOT: + # Start range + srange + else: + let slot = srange.slot - srange.count + if slot > srange.slot: + # Range that causes uint64 underflow, fixing. + SyncRange.init(GENESIS_SLOT, uint64(srange.slot)) + else: + SyncRange.init(slot, srange.count) + +func contains(srange: SyncRange, slot: Slot): bool {.inline.} = + ## Returns `true` if `slot` is in range of `srange`. 
+ if (srange.slot + srange.count) < srange.slot: + (slot >= srange.slot) and (slot <= FAR_FUTURE_SLOT) + else: + (slot >= srange.slot) and (slot < (srange.slot + srange.count)) + +func `>`(a, b: SyncRange): bool {.inline.} = + ## Returns `true` if range `a` is above of range `b`. + (a.slot > b.slot) and (a.slot + a.count - 1 > b.slot) + +func `<`(a, b: SyncRange): bool {.inline.} = + ## Returns `true` if range `a` is below of range `b`. + (a.slot < b.slot) and (a.slot + a.count - 1 < b.slot) + +func `==`(a, b: SyncRange): bool {.inline.} = + (a.slot == b.slot) and (a.count == b.count) + +func `==`[T](a, b: SyncRequest[T]): bool {.inline.} = + (a.kind == b.kind) and (a.item == b.item) and (a.data == b.data) + +proc hasEndGap*[T]( + sr: SyncRequest[T], + data: openArray[ref ForkedSignedBeaconBlock] +): bool {.inline.} = + ## Returns ``true`` if response chain of blocks has gap at the end. + if len(data) == 0: + return true + if data[^1][].slot != (sr.data.slot + sr.data.count - 1'u64): + return true + false proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} = ## Update last slot stored in queue ``sq`` with value ``last``. sq.finalSlot = last -proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) = - ## Wakeup one or all blocked waiters. - for item in sq.waiters: - if reset: - item.reset = true - - if not(item.future.finished()): - item.future.complete() - -proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async: (raises: [CancelledError]).} = - ## Create new waiter and wait for completion from `wakeupWaiters()`. - let waitfut = Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges") - let waititem = SyncWaiter(future: waitfut) - sq.waiters.add(waititem) - try: - await waitfut - return waititem.reset - finally: - sq.waiters.delete(sq.waiters.find(waititem)) - -proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async: (raises: [CancelledError]).} = - ## This procedure will perform wakeupWaiters(true) and blocks until last - ## waiter will be awakened. - var waitChanges = sq.waitForChanges() - sq.wakeupWaiters(true) - discard await waitChanges - -proc clearAndWakeup*[T](sq: SyncQueue[T]) = - sq.pending.clear() - sq.wakeupWaiters(true) - -proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async: (raises: [CancelledError]).} = - ## Perform reset of all the blocked waiters in SyncQueue. - ## - ## We adding one more waiter to the waiters sequence and - ## call wakeupWaiters(true). Because our waiter is last in sequence of - ## waiters it will be resumed only after all waiters will be awakened and - ## finished. - - # We are clearing pending list, so that all requests that are still running - # around (still downloading, but not yet pushed to the SyncQueue) will be - # expired. Its important to perform this call first (before await), otherwise - # you can introduce race problem. - sq.pending.clear() - - # We calculating minimal slot number to which we will be able to reset, - # without missing any blocks. There 3 sources: - # 1. Debts queue. - # 2. Processing queue (`inpSlot`, `outSlot`). - # 3. Requested slot `toSlot`. - # - # Queue's `outSlot` is the lowest slot we added to `block_pool`, but - # `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not - # added slot requests, so it can't be bigger then `outSlot` value. 
- let minSlot = - case sq.kind - of SyncQueueKind.Forward: - if toSlot.isSome(): - min(toSlot.get(), sq.outSlot) - else: - sq.outSlot - of SyncQueueKind.Backward: - if toSlot.isSome(): - toSlot.get() - else: - sq.outSlot - sq.debtsQueue.clear() - sq.debtsCount = 0 - sq.readyQueue.clear() - sq.inpSlot = minSlot - sq.outSlot = minSlot - # We are going to wakeup all the waiters and wait for last one. - await sq.wakeupAndWaitWaiters() - -proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} = - ## Returns ``true`` if response chain of blocks is empty (has only empty - ## slots). - len(sr.data) == 0 - -proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} = - ## Returns ``true`` if response chain of blocks has gap at the end. - let lastslot = sr.request.slot + sr.request.count - 1'u64 - if len(sr.data) == 0: - return true - if sr.data[^1][].slot != lastslot: - return true - return false - -proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} = - ## Returns last non-empty slot from result ``sr``. If response has only - ## empty slots, original request slot will be returned. - if len(sr.data) == 0: - # If response has only empty slots we going to use original request slot - sr.request.slot - else: - sr.data[^1][].slot - -proc processGap[T](sq: SyncQueue[T], sr: SyncResult[T]) = - if sr.isEmpty(): - let gitem = GapItem[T](start: sr.request.slot, - finish: sr.request.slot + sr.request.count - 1'u64, - item: sr.request.item) - sq.gapList.add(gitem) - else: - if sr.hasEndGap(): - let gitem = GapItem[T](start: sr.getLastNonEmptySlot() + 1'u64, - finish: sr.request.slot + sr.request.count - 1'u64, - item: sr.request.item) - sq.gapList.add(gitem) - else: - sq.gapList.reset() - -proc rewardForGaps[T](sq: SyncQueue[T], score: int) = - mixin updateScore, getStats - logScope: - sync_ident = sq.ident - direction = sq.kind - topics = "syncman" - - for gap in sq.gapList: - if score < 0: - # Every empty response increases penalty by 25%, but not more than 200%. - let - emptyCount = gap.item.getStats(SyncResponseKind.Empty) - goodCount = gap.item.getStats(SyncResponseKind.Good) - - if emptyCount <= goodCount: - gap.item.updateScore(score) - else: - let - weight = int(min(emptyCount - goodCount, 8'u64)) - newScore = score + score * weight div 4 - gap.item.updateScore(newScore) - debug "Peer received gap penalty", peer = gap.item, - penalty = newScore - else: - gap.item.updateScore(score) - -proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) = - sq.debtsQueue.push(sr) - sq.debtsCount = sq.debtsCount + sr.count - proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, safeSlot: Slot): Slot = logScope: @@ -559,7 +384,8 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, # finalized epoch. let rewindEpoch = failEpoch - epochCount # Update and save new rewind point in SyncQueue. 
- sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount)) + sq.rewind = Opt.some( + RewindPoint(failSlot: failSlot, epochCount: epochCount)) rewindEpoch.start_slot() of SyncQueueKind.Backward: # While we perform backward sync, the only possible slot we could rewind is @@ -569,458 +395,537 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, safe_slot = safeSlot, fail_slot = failSlot safeSlot -# This belongs inside the blocks iterator below, but can't be there due to -# https://github.com/nim-lang/Nim/issues/21242 +func init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], + queueKind: SyncQueueKind, + start, final: Slot, + chunkSize: uint64, + requestsCount: Natural, + failureResetThreshold: Natural, + getSafeSlotCb: GetSlotCallback, + blockVerifier: BlockVerifier, + ident: string = "main"): SyncQueue[T] = + doAssert(chunkSize > 0'u64, "Chunk size should not be zero") + doAssert(requestsCount > 0, "Number of requests should not be zero") + + SyncQueue[T]( + kind: queueKind, + startSlot: start, + finalSlot: final, + chunkSize: chunkSize, + requestsCount: requestsCount, + failureResetThreshold: failureResetThreshold, + getSafeSlot: getSafeSlotCb, + inpSlot: start, + outSlot: start, + blockVerifier: blockVerifier, + requests: initDeque[SyncQueueItem[T]](), + lock: newAsyncLock(), + ident: ident + ) + +func contains[T](requests: openArray[SyncRequest[T]], source: T): bool = + for req in requests: + if req.item == source: + return true + false + +func find[T](sq: SyncQueue[T], req: SyncRequest[T]): Opt[SyncPosition] = + if len(sq.requests) == 0: + return Opt.none(SyncPosition) + + case sq.kind + of SyncQueueKind.Forward: + if (req.data < sq.requests[0].data) or (req.data > sq.requests[^1].data): + return Opt.none(SyncPosition) + of SyncQueueKind.Backward: + if (req.data > sq.requests[0].data) or (req.data < sq.requests[^1].data) : + return Opt.none(SyncPosition) + + for qindex, qitem in sq.requests.pairs(): + for sindex, request in qitem.requests.pairs(): + if request == req: + return Opt.some(SyncPosition(qindex: qindex, sindex: sindex)) + + Opt.none(SyncPosition) + +proc del[T](sq: SyncQueue[T], position: SyncPosition) = + doAssert(len(sq.requests) > position.qindex) + doAssert(len(sq.requests[position.qindex].requests) > position.sindex) + del(sq.requests[position.qindex].requests, position.sindex) + +proc del[T](sq: SyncQueue[T], request: SyncRequest[T]) = + let pos = sq.find(request).valueOr: + return + sq.del(pos) + +proc rewardForGaps[T](sq: SyncQueue[T], score: int) = + mixin updateScore, getStats + + logScope: + sync_ident = sq.ident + direction = sq.kind + topics = "syncman" + + for gap in sq.gapList: + if score < 0: + # Every empty response increases penalty by 25%, but not more than 200%. + let + emptyCount = gap.item.getStats(SyncResponseKind.Empty) + goodCount = gap.item.getStats(SyncResponseKind.Good) + + if emptyCount <= goodCount: + gap.item.updateScore(score) + else: + let + weight = int(min(emptyCount - goodCount, 8'u64)) + newScore = score + score * weight div 4 + gap.item.updateScore(newScore) + debug "Peer received gap penalty", peer = gap.item, + penalty = newScore + else: + gap.item.updateScore(score) + +proc pop*[T](sq: SyncQueue[T], peerMaxSlot: Slot, item: T): SyncRequest[T] = + # Searching requests queue for an empty space. 
+  var count = 0
+  for qitem in sq.requests.mitems():
+    if len(qitem.requests) < sq.requestsCount:
+      if item notin qitem.requests:
+        return
+          if qitem.data.slot > peerMaxSlot:
+            # Peer could not satisfy our request, returning an empty one.
+            SyncRequest.init(sq.kind, item)
+          else:
+            doAssert(count < sq.requestsCount,
+                     "You should not pop so many requests for a single peer")
+            let request = SyncRequest.init(sq.kind, qitem.data, item)
+            qitem.requests.add(request)
+            request
+    else:
+      inc(count)
+
+  doAssert(count < sq.requestsCount,
+           "You should not pop so many requests for a single peer")
+
+  # No empty space has been found in the queue, so we are adding a new request.
+  let newrange =
+    if len(sq.requests) > 0:
+      # All requests are filled, adding one more request.
+      let lastrange = sq.requests[^1].data
+      if sq.finalSlot in lastrange:
+        # The requests queue is already at its final position, so we are not
+        # going to add one more request range.
+        return SyncRequest.init(sq.kind, item)
+
+      case sq.kind
+      of SyncQueueKind.Forward:
+        lastrange.next()
+      of SyncQueueKind.Backward:
+        lastrange.prev()
+    else:
+      case sq.kind
+      of SyncQueueKind.Forward:
+        SyncRange.init(sq.inpSlot, sq.chunkSize)
+      of SyncQueueKind.Backward:
+        SyncRange.init(sq.inpSlot - (sq.chunkSize - 1), sq.chunkSize)
+
+  if newrange.slot > peerMaxSlot:
+    # Peer could not satisfy our request, returning an empty one.
+    SyncRequest.init(sq.kind, item)
+  else:
+    let request = SyncRequest.init(sq.kind, newrange, item)
+    sq.requests.addLast(SyncQueueItem.init(request))
+    request
+
+proc wakeupWaiters[T](sq: SyncQueue[T], resetFlag = false) =
+  ## Wake up one or all blocked waiters.
+  for item in sq.waiters:
+    item.resetFlag = resetFlag
+    if not(item.future.finished()):
+      item.future.complete()
+
+proc waitForChanges[T](
+    sq: SyncQueue[T]
+): Future[bool] {.async: (raises: [CancelledError]).} =
+  ## Create a new waiter and wait for completion from `wakeupWaiters()`.
+  let
+    future =
+      Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges")
+    item = SyncWaiterItem[T](future: future, resetFlag: false)
+
+  sq.waiters.add(item)
+
+  try:
+    await future
+    item.resetFlag
+  finally:
+    sq.waiters.delete(sq.waiters.find(item))
+
+proc wakeupAndWaitWaiters[T](
+    sq: SyncQueue[T]
+) {.async: (raises: [CancelledError]).} =
+  ## This procedure will perform wakeupWaiters(true) and block until the last
+  ## waiter has been awakened.
+  let waitChanges = sq.waitForChanges()
+  sq.wakeupWaiters(true)
+  discard await waitChanges
+
+template advanceImpl(kind, slot: untyped, number: uint64) =
+  case kind
+  of SyncQueueKind.Forward:
+    if slot + number < slot:
+      slot = FAR_FUTURE_SLOT
+    else:
+      slot = slot + number
+  of SyncQueueKind.Backward:
+    if slot - number > slot:
+      slot = GENESIS_SLOT
+    else:
+      slot = slot - number
+
+proc advanceOutput[T](sq: SyncQueue[T], number: uint64) =
+  advanceImpl(sq.kind, sq.outSlot, number)
+
+proc advanceInput[T](sq: SyncQueue[T], number: uint64) =
+  advanceImpl(sq.kind, sq.inpSlot, number)
+
+proc advanceQueue[T](sq: SyncQueue[T]) =
+  if len(sq.requests) > 0:
+    let item = sq.requests.popFirst()
+    sq.advanceInput(item.data.count)
+    sq.advanceOutput(item.data.count)
+  else:
+    sq.advanceInput(sq.chunkSize)
+    sq.advanceOutput(sq.chunkSize)
+  sq.wakeupWaiters()
+
+proc resetQueue[T](sq: SyncQueue[T]) =
+  sq.requests.reset()
+
+proc clearAndWakeup*[T](sq: SyncQueue[T]) =
+  # Reset the queue and wake up all the waiters.
+  sq.resetQueue()
+  sq.wakeupWaiters(true)
+
+proc isEmpty*[T](sr: SyncRequest[T]): bool =
+  # Returns `true` if request `sr` is empty.
+ sr.data.count == 0'u64 + +proc resetWait[T]( + sq: SyncQueue[T], + toSlot: Slot +) {.async: (raises: [CancelledError], raw: true).} = + sq.inpSlot = toSlot + sq.outSlot = toSlot + # We are going to wakeup all the waiters and wait for last one. + sq.resetQueue() + sq.wakeupAndWaitWaiters() + func getOpt(blobs: Opt[seq[BlobSidecars]], i: int): Opt[BlobSidecars] = if blobs.isSome: Opt.some(blobs.get()[i]) else: Opt.none(BlobSidecars) -iterator blocks[T](sq: SyncQueue[T], - sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars]) = - case sq.kind +iterator blocks( + kind: SyncQueueKind, + blcks: seq[ref ForkedSignedBeaconBlock], + blobs: Opt[seq[BlobSidecars]] +): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars]) = + case kind of SyncQueueKind.Forward: - for i in countup(0, len(sr.data) - 1): - yield (sr.data[i], sr.blobs.getOpt(i)) + for i in countup(0, len(blcks) - 1): + yield (blcks[i], blobs.getOpt(i)) of SyncQueueKind.Backward: - for i in countdown(len(sr.data) - 1, 0): - yield (sr.data[i], sr.blobs.getOpt(i)) + for i in countdown(len(blcks) - 1, 0): + yield (blcks[i], blobs.getOpt(i)) -proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) = - case sq.kind - of SyncQueueKind.Forward: - sq.outSlot = sq.outSlot + number - of SyncQueueKind.Backward: - sq.outSlot = sq.outSlot - number +proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) = + ## Push failed request back to queue. + let pos = sq.find(sr).valueOr: + debug "Request is no more relevant", request = sr + return + sq.del(pos) -proc advanceInput[T](sq: SyncQueue[T], number: uint64) = - case sq.kind - of SyncQueueKind.Forward: - sq.inpSlot = sq.inpSlot + number - of SyncQueueKind.Backward: - sq.inpSlot = sq.inpSlot - number +proc process[T]( + sq: SyncQueue[T], + sr: SyncRequest[T], + blcks: seq[ref ForkedSignedBeaconBlock], + blobs: Opt[seq[BlobSidecars]], + maybeFinalized: bool +): Future[SyncProcessingResult] {. + async: (raises: [CancelledError]).} = + var + slot: Opt[SyncBlock] + unviableBlock: Opt[SyncBlock] + dupBlock: Opt[SyncBlock] -proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool = - case sq.kind - of SyncQueueKind.Forward: - (sq.queueSize > 0) and (sr.slot > sq.outSlot) - of SyncQueueKind.Backward: - (sq.queueSize > 0) and (sr.lastSlot < sq.outSlot) + if len(blcks) == 0: + return SyncProcessingResult.init(SyncProcessError.Empty) -func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 = - ## Compute the number of slots covered by a given `SyncRequest` that are - ## already known and, hence, no longer relevant for sync progression. - let - outSlot = sq.outSlot - lowSlot = sr.slot - highSlot = sr.lastSlot - case sq.kind - of SyncQueueKind.Forward: - if outSlot > highSlot: - # Entire request is no longer relevant. - sr.count - elif outSlot > lowSlot: - # Request is only partially relevant. - outSlot - lowSlot + for blk, blb in blocks(sq.kind, blcks, blobs): + let res = await sq.blockVerifier(blk[], blb, maybeFinalized) + if res.isOk(): + slot = Opt.some(SyncBlock.init(blk[].slot, blk[].root)) else: - # Entire request is still relevant. - 0 - of SyncQueueKind.Backward: - if lowSlot > outSlot: - # Entire request is no longer relevant. - sr.count - elif highSlot > outSlot: - # Request is only partially relevant. - highSlot - outSlot - else: - # Entire request is still relevant. 
- 0 + case res.error() + of VerifierError.MissingParent: + if slot.isSome() or dupBlock.isSome(): + return SyncProcessingResult.init( + SyncProcessError.GoodAndMissingParent, blk[].slot, blk[].root) + else: + return SyncProcessingResult.init(res.error(), blk[].slot, blk[].root) + of VerifierError.Duplicate: + # Keep going, happens naturally + if dupBlock.isNone(): + dupBlock = Opt.some(SyncBlock.init(blk[].slot, blk[].root)) + of VerifierError.UnviableFork: + # Keep going so as to register other unviable blocks with the + # quarantine + if unviableBlock.isNone(): + # Remember the first unviable block, so we can log it + unviableBlock = Opt.some(SyncBlock.init(blk[].slot, blk[].root)) + of VerifierError.Invalid: + return SyncProcessingResult.init(res.error(), blk[].slot, blk[].root) + + if unviableBlock.isSome(): + return SyncProcessingResult.init(VerifierError.UnviableFork, + unviableBlock.get()) + if dupBlock.isSome(): + return SyncProcessingResult.init(VerifierError.Duplicate, + dupBlock.get()) + + SyncProcessingResult.init(SyncProcessError.NoError, slot.get()) + +func isError(e: SyncProcessError): bool = + case e + of SyncProcessError.Empty, SyncProcessError.NoError, + SyncProcessError.Duplicate, SyncProcessError.GoodAndMissingParent: + false + of SyncProcessError.Invalid, SyncProcessError.UnviableFork, + SyncProcessError.MissingParent: + true + +proc push*[T]( + sq: SyncQueue[T], + sr: SyncRequest[T], + data: seq[ref ForkedSignedBeaconBlock], + blobs: Opt[seq[BlobSidecars]], + maybeFinalized: bool = false, + processingCb: ProcessingCallback = nil +) {.async: (raises: [CancelledError]).} = + ## Push successful result to queue ``sq``. + mixin updateScore, updateStats, getStats -proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], - data: seq[ref ForkedSignedBeaconBlock], - blobs: Opt[seq[BlobSidecars]], - maybeFinalized: bool = false, - processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} = logScope: sync_ident = sq.ident topics = "syncman" - ## Push successful result to queue ``sq``. - mixin updateScore, updateStats, getStats - - if sr.index notin sq.pending: - # If request `sr` not in our pending list, it only means that - # SyncQueue.resetWait() happens and all pending requests are expired, so - # we swallow `old` requests, and in such way sync-workers are able to get - # proper new requests from SyncQueue. - return - - sq.pending.del(sr.index) - # This is backpressure handling algorithm, this algorithm is blocking - # all pending `push` requests if `request.slot` not in range. - while true: - if sq.notInRange(sr): - let reset = await sq.waitForChanges() - if reset: - # SyncQueue reset happens. We are exiting to wake up sync-worker. + # all pending `push` requests if `request` is not in range. + var + position = + block: + var pos: SyncPosition + while true: + pos = sq.find(sr).valueOr: + debug "Request is no more relevant", request = sr + # Request is not in queue anymore, probably reset happened. + return + + if pos.qindex == 0: + # Exiting loop when request is first in queue. + break + + try: + let res = await sq.waitForChanges() + if res: + # SyncQueue reset happen + debug "Request is no more relevant, reset happen", request = sr + return + except CancelledError as exc: + # Removing request from queue. + sq.del(sr) + raise exc + pos + + await sq.lock.acquire() + try: + block: + position = sq.find(sr).valueOr: + # Queue has advanced, the request is no longer relevant. 
+ debug "Request is no more relevant", request = sr return - else: - let syncres = SyncResult[T](request: sr, data: data, blobs: blobs) - sq.readyQueue.push(syncres) - break - while len(sq.readyQueue) > 0: - let reqres = - case sq.kind - of SyncQueueKind.Forward: - let minSlot = sq.readyQueue[0].request.slot - if sq.outSlot < minSlot: - none[SyncResult[T]]() - else: - some(sq.readyQueue.pop()) - of SyncQueueKind.Backward: - let maxslot = sq.readyQueue[0].request.slot + - (sq.readyQueue[0].request.count - 1'u64) - if sq.outSlot > maxslot: - none[SyncResult[T]]() - else: - some(sq.readyQueue.pop()) - - let item = - if reqres.isSome(): - reqres.get() - else: - let rewindSlot = sq.getRewindPoint(sq.outSlot, sq.getSafeSlot()) - warn "Got incorrect sync result in queue, rewind happens", - blocks_map = getShortMap(sq.readyQueue[0].request, - sq.readyQueue[0].data), - blocks_count = len(sq.readyQueue[0].data), - output_slot = sq.outSlot, input_slot = sq.inpSlot, - rewind_to_slot = rewindSlot, request = sq.readyQueue[0].request - await sq.resetWait(some(rewindSlot)) - break - - if processingCb != nil: + if not(isNil(processingCb)): processingCb() - # Validating received blocks one by one - var - hasInvalidBlock = false - unviableBlock: Option[(Eth2Digest, Slot)] - missingParentSlot: Option[Slot] - goodBlock: Option[Slot] + let pres = await sq.process(sr, data, blobs, maybeFinalized) - # TODO when https://github.com/nim-lang/Nim/issues/21306 is fixed in used - # Nim versions, remove workaround and move `res` into for loop - res: Result[void, VerifierError] + case pres.code + of SyncProcessError.Empty: + # Empty responses does not affect failures count + debug "Received empty response", + request = sr, + blocks_count = len(data), + blocks_map = getShortMap(sr, data), + blobs_map = getShortMap(sr, blobs) - var i=0 - for blk, blb in sq.blocks(item): - res = await sq.blockVerifier(blk[], blb, maybeFinalized) - inc(i) + sr.item.updateStats(SyncResponseKind.Empty, 1'u64) + sq.gapList.add(GapItem.init(sr)) + sq.advanceQueue() - if res.isOk(): - goodBlock = some(blk[].slot) - else: - case res.error() - of VerifierError.MissingParent: - missingParentSlot = some(blk[].slot) - break - of VerifierError.Duplicate: - # Keep going, happens naturally - discard - of VerifierError.UnviableFork: - # Keep going so as to register other unviable blocks with the - # quarantine - if unviableBlock.isNone: - # Remember the first unviable block, so we can log it - unviableBlock = some((blk[].root, blk[].slot)) + of SyncProcessError.Duplicate: + # Duplicate responses does not affect failures count + debug "Received duplicate response", + request = sr, + blocks_count = len(data), + blocks_map = getShortMap(sr, data), + blobs_map = getShortMap(sr, blobs) + sq.gapList.reset() + sq.advanceQueue() - of VerifierError.Invalid: - hasInvalidBlock = true + of SyncProcessError.Invalid: + debug "Block pool rejected peer's response", + request = sr, + invalid_block = pres.blck, + failures_count = sq.requests[position.qindex].failuresCount, + blocks_count = len(data), + blocks_map = getShortMap(sr, data), + blobs_map = getShortMap(sr, blobs) - let req = item.request - notice "Received invalid sequence of blocks", request = req, - blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data) - req.item.updateScore(PeerScoreBadValues) - break + inc(sq.requests[position.qindex].failuresCount) + sq.del(position) - # When errors happen while processing blocks, we retry the same request - # with, hopefully, a different peer - let 
retryRequest = - hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome() - if not(retryRequest): - let numSlotsAdvanced = item.request.count - sq.numAlreadyKnownSlots(sr) - sq.advanceOutput(numSlotsAdvanced) + of SyncProcessError.UnviableFork: + notice "Received blocks from an unviable fork", + request = sr, + unviable_block = pres.blck, + failures_count = sq.requests[position.qindex].failuresCount, + blocks_count = len(data), + blocks_map = getShortMap(sr, data), + blobs_map = getShortMap(sr, blobs) - if goodBlock.isSome(): - # If there no error and response was not empty we should reward peer - # with some bonus score - not for duplicate blocks though. - item.request.item.updateScore(PeerScoreGoodValues) - item.request.item.updateStats(SyncResponseKind.Good, 1'u64) + sr.item.updateScore(PeerScoreUnviableFork) + inc(sq.requests[position.qindex].failuresCount) + sq.del(position) - # BlockProcessor reports good block, so we can reward all the peers - # who sent us empty responses. - sq.rewardForGaps(PeerScoreGoodValues) - sq.gapList.reset() - else: - # Response was empty - item.request.item.updateStats(SyncResponseKind.Empty, 1'u64) + of SyncProcessError.MissingParent: + debug "Unexpected missing parent", + request = sr, + missing_parent_block = pres.blck, + failures_count = sq.requests[position.qindex].failuresCount, + blocks_count = len(data), + blocks_map = getShortMap(sr, data), + blobs_map = getShortMap(sr, blobs) - sq.processGap(item) + sr.item.updateScore(PeerScoreMissingValues) + sq.rewardForGaps(PeerScoreMissingValues) + sq.gapList.reset() + inc(sq.requests[position.qindex].failuresCount) + sq.del(position) - if numSlotsAdvanced > 0: - sq.wakeupWaiters() - else: - debug "Block pool rejected peer's response", request = item.request, - blocks_map = getShortMap(item.request, item.data), - blocks_count = len(item.data), - ok = goodBlock.isSome(), - unviable = unviableBlock.isSome(), - missing_parent = missingParentSlot.isSome() - # We need to move failed response to the debts queue. - sq.toDebtsQueue(item.request) + of SyncProcessError.GoodAndMissingParent: + # Responses which has at least one good block and a gap does not affect + # failures count + debug "Unexpected missing parent, but no rewind needed", + request = sr, finalized_slot = sq.getSafeSlot(), + missing_parent_block = pres.blck, + failures_count = sq.requests[position.qindex].failuresCount, + blocks_count = len(data), + blocks_map = getShortMap(sr, data), + blobs_map = getShortMap(sr, blobs) - if unviableBlock.isSome(): - let req = item.request - notice "Received blocks from an unviable fork", request = req, - blockRoot = unviableBlock.get()[0], - blockSlot = unviableBlock.get()[1], - blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data) - req.item.updateScore(PeerScoreUnviableFork) + sr.item.updateScore(PeerScoreMissingValues) + sq.del(position) - if missingParentSlot.isSome(): - var - resetSlot: Option[Slot] - failSlot = missingParentSlot.get() + of SyncProcessError.NoError: + sr.item.updateScore(PeerScoreGoodValues) + sr.item.updateStats(SyncResponseKind.Good, 1'u64) + sq.rewardForGaps(PeerScoreGoodValues) + sq.gapList.reset() - # If we got `VerifierError.MissingParent` it means that peer returns - # chain of blocks with holes or `block_pool` is in incomplete state. We - # going to rewind the SyncQueue some distance back (2ⁿ, where n∈[0,∞], - # but no more than `finalized_epoch`). 
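The gap handling in these branches can be read as: remember every peer that answered a range with a gap, and once a later response settles whether the gap was legitimate, reward or penalise all of them in one pass before clearing the list. A toy standalone model of that idea (types and score values are made up for illustration):

type
  PeerId = int

  GapTracker = object
    peers: seq[PeerId]        # peers that answered with an empty range
    scores: array[8, int]     # toy score table indexed by PeerId

proc noteGap(t: var GapTracker, peer: PeerId) =
  ## Remember a peer whose response left a gap in the range.
  t.peers.add(peer)

proc settleGaps(t: var GapTracker, delta: int) =
  ## Apply `delta` to every remembered peer, then forget the gaps.
  for p in t.peers:
    t.scores[p] += delta
  t.peers.setLen(0)

when isMainModule:
  var t: GapTracker
  t.noteGap(1)
  t.noteGap(2)
  t.settleGaps(+10)           # a later good block confirmed the gap was fine
  doAssert t.scores[1] == 10 and t.scores[2] == 10
  doAssert t.peers.len == 0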
- let - req = item.request - safeSlot = sq.getSafeSlot() - gapsCount = len(sq.gapList) + if sr.hasEndGap(data): + sq.gapList.add(GapItem.init(sr)) - # We should penalize all the peers which responded with gaps. - sq.rewardForGaps(PeerScoreMissingValues) - sq.gapList.reset() + sq.advanceQueue() - case sq.kind - of SyncQueueKind.Forward: - if goodBlock.isSome(): - # `VerifierError.MissingParent` and `Success` present in response, - # it means that we just need to request this range one more time. - debug "Unexpected missing parent, but no rewind needed", - request = req, finalized_slot = safeSlot, - last_good_slot = goodBlock.get(), - missing_parent_slot = missingParentSlot.get(), - blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - gaps_count = gapsCount - req.item.updateScore(PeerScoreMissingValues) - else: - if safeSlot < req.slot: - let rewindSlot = sq.getRewindPoint(failSlot, safeSlot) - debug "Unexpected missing parent, rewind happens", - request = req, rewind_to_slot = rewindSlot, - rewind_point = sq.rewind, finalized_slot = safeSlot, - blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - gaps_count = gapsCount - resetSlot = some(rewindSlot) - else: - error "Unexpected missing parent at finalized epoch slot", - request = req, rewind_to_slot = safeSlot, - blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data), - gaps_count = gapsCount - req.item.updateScore(PeerScoreBadValues) - of SyncQueueKind.Backward: - if safeSlot > failSlot: - let rewindSlot = sq.getRewindPoint(failSlot, safeSlot) - # It's quite common peers give us fewer blocks than we ask for - debug "Gap in block range response, rewinding", request = req, - rewind_to_slot = rewindSlot, rewind_fail_slot = failSlot, - finalized_slot = safeSlot, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data) - resetSlot = some(rewindSlot) - req.item.updateScore(PeerScoreMissingValues) - else: - error "Unexpected missing parent at safe slot", request = req, - to_slot = safeSlot, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data) - req.item.updateScore(PeerScoreBadValues) + if pres.code.isError(): + if sq.requests[position.qindex].failuresCount >= sq.failureResetThreshold: + let point = sq.getRewindPoint(pres.blck.get().slot, sq.getSafeSlot()) + debug "Multiple repeating errors occured, rewinding", + failures_count = sq.requests[position.qindex].failuresCount, + rewind_slot = point + await sq.resetWait(point) - if resetSlot.isSome(): - await sq.resetWait(resetSlot) - case sq.kind - of SyncQueueKind.Forward: - debug "Rewind to slot has happened", reset_slot = resetSlot.get(), - queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, - rewind_point = sq.rewind, direction = sq.kind - of SyncQueueKind.Backward: - debug "Rewind to slot has happened", reset_slot = resetSlot.get(), - queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, - direction = sq.kind + except CancelledError as exc: + sq.del(sr) + raise exc + finally: + try: + sq.lock.release() + except AsyncLockError: + raiseAssert "Lock is not acquired" - break +proc checkResponse*[T](req: SyncRequest[T], + data: openArray[Slot]): Result[void, cstring] = + if len(data) == 0: + # Impossible to verify empty response. + return ok() -proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) = - ## Push failed request back to queue. 
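When the failure counter for a range reaches the configured threshold, the queue rewinds towards the safe (finalized) slot, stepping back an exponentially growing distance as the old comments describe (2^n slots, bounded by the finalized point). The helper below is an illustrative stand-in for a calculation of that shape; the queue's own getRewindPoint keeps its rewind state internally rather than taking a failure count directly:

func rewindPoint(failSlot, safeSlot: uint64, failures: int): uint64 =
  ## Step back 2^failures slots from the failing slot, but never behind
  ## the safe slot.
  doAssert failSlot >= safeSlot
  let back = 1'u64 shl min(failures, 62)
  if failSlot - safeSlot <= back:
    safeSlot
  else:
    failSlot - back

when isMainModule:
  doAssert rewindPoint(1000, 900, 0) == 999   # first failure: one slot back
  doAssert rewindPoint(1000, 900, 4) == 984   # later failures: larger steps
  doAssert rewindPoint(1000, 900, 10) == 900  # clamped at the safe slot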
- if sr.index notin sq.pending: - # If request `sr` not in our pending list, it only means that - # SyncQueue.resetWait() happens and all pending requests are expired, so - # we swallow `old` requests, and in such way sync-workers are able to get - # proper new requests from SyncQueue. - return - sq.pending.del(sr.index) - sq.toDebtsQueue(sr) + if lenu64(data) > req.data.count: + # Number of blocks in response should be less or equal to number of + # requested blocks. + return err("Too many blocks received") -proc handlePotentialSafeSlotAdvancement[T](sq: SyncQueue[T]) = - # It may happen that sync progress advanced to a newer `safeSlot`, either - # by a response that started with good values and only had errors late, or - # through an out-of-band mechanism, e.g., VC / REST. - # If that happens, advance to the new `safeSlot` to avoid repeating requests - # for data that is considered immutable and no longer relevant. - let safeSlot = sq.getSafeSlot() - func numSlotsBehindSafeSlot(slot: Slot): uint64 = - case sq.kind - of SyncQueueKind.Forward: - if safeSlot > slot: - safeSlot - slot - else: - 0 - of SyncQueueKind.Backward: - if slot > safeSlot: - slot - safeSlot - else: - 0 + var + slot = req.data.slot + rindex = 0'u64 + dindex = 0 - let - numOutSlotsAdvanced = sq.outSlot.numSlotsBehindSafeSlot - numInpSlotsAdvanced = - case sq.kind - of SyncQueueKind.Forward: - sq.inpSlot.numSlotsBehindSafeSlot - of SyncQueueKind.Backward: - if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64: - 0'u64 - else: - sq.inpSlot.numSlotsBehindSafeSlot - if numOutSlotsAdvanced != 0 or numInpSlotsAdvanced != 0: - debug "Sync progress advanced out-of-band", - safeSlot, outSlot = sq.outSlot, inpSlot = sq.inpSlot - if numOutSlotsAdvanced != 0: - sq.advanceOutput(numOutSlotsAdvanced) - if numInpSlotsAdvanced != 0: - sq.advanceInput(numInpSlotsAdvanced) - sq.wakeupWaiters() - -func updateRequestForNewSafeSlot[T](sq: SyncQueue[T], sr: var SyncRequest[T]) = - # Requests may have originated before the latest `safeSlot` advancement. - # Update it to not request any data prior to `safeSlot`. - let - outSlot = sq.outSlot - lowSlot = sr.slot - highSlot = sr.lastSlot - case sq.kind - of SyncQueueKind.Forward: - if outSlot <= lowSlot: - # Entire request is still relevant. + while (rindex < req.data.count) and (dindex < len(data)): + if slot < data[dindex]: discard - elif outSlot <= highSlot: - # Request is only partially relevant. - let - numSlotsDone = outSlot - lowSlot - sr.slot += numSlotsDone - sr.count -= numSlotsDone + elif slot == data[dindex]: + inc(dindex) else: - # Entire request is no longer relevant. - sr.count = 0 - of SyncQueueKind.Backward: - if outSlot >= highSlot: - # Entire request is still relevant. - discard - elif outSlot >= lowSlot: - # Request is only partially relevant. - let - numSlotsDone = highSlot - outSlot - sr.count -= numSlotsDone + return err("Incorrect order or duplicate blocks found") + slot += 1'u64 + rindex += 1'u64 + + if dindex != len(data): + return err("Some of the blocks are outside the requested range") + + ok() + +proc checkBlobsResponse*[T](req: SyncRequest[T], + data: openArray[Slot]): Result[void, cstring] = + if len(data) == 0: + # Impossible to verify empty response. + return ok() + + if lenu64(data) > (req.data.count * MAX_BLOBS_PER_BLOCK): + # Number of blobs in response should be less or equal to number of + # requested (blocks * MAX_BLOBS_PER_BLOCK). 
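checkResponse enforces that a block response is a subset of the requested window: no more blocks than requested, slots strictly increasing, no duplicates, and nothing outside the range. The same rule expressed on bare uint64 slots, as a self-contained sketch:

func checkSlots(startSlot, count: uint64, slots: openArray[uint64]): bool =
  ## True if `slots` is a strictly increasing subset of the window
  ## [startSlot, startSlot + count).
  if slots.len == 0:
    return true                      # nothing to verify in an empty response
  if uint64(slots.len) > count:
    return false                     # more blocks than requested
  var
    havePrev = false
    prev = 0'u64
  for s in slots:
    if s < startSlot or s >= startSlot + count:
      return false                   # outside the requested window
    if havePrev and s <= prev:
      return false                   # out of order or duplicate slot
    prev = s
    havePrev = true
  true

when isMainModule:
  doAssert checkSlots(10, 4, [10'u64, 11, 13])      # ordered subset is fine
  doAssert not checkSlots(10, 4, [10'u64, 10, 11])  # duplicate slot
  doAssert not checkSlots(10, 4, [9'u64, 10])       # starts before the range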
+ return err("Too many blobs received") + + var + pslot = data[0] + counter = 0'u64 + for slot in data: + if slot notin req.data: + return err("Some of the blobs are not in requested range") + if slot < pslot: + return err("Incorrect order") + if slot == pslot: + inc(counter) + if counter > MAX_BLOBS_PER_BLOCK: + return err("Number of blobs in the block exceeds the limit") else: - # Entire request is no longer relevant. - sr.count = 0 + counter = 1'u64 + pslot = slot -proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] = - ## Create new request according to current SyncQueue parameters. - sq.handlePotentialSafeSlotAdvancement() - while len(sq.debtsQueue) > 0: - if maxslot < sq.debtsQueue[0].slot: - # Peer's latest slot is less than starting request's slot. - return SyncRequest.empty(sq.kind, T) - if maxslot < sq.debtsQueue[0].lastSlot(): - # Peer's latest slot is less than finishing request's slot. - return SyncRequest.empty(sq.kind, T) - var sr = sq.debtsQueue.pop() - sq.debtsCount = sq.debtsCount - sr.count - sq.updateRequestForNewSafeSlot(sr) - if sr.isEmpty: - continue - sr.setItem(item) - sq.makePending(sr) - return sr - - case sq.kind - of SyncQueueKind.Forward: - if maxslot < sq.inpSlot: - # Peer's latest slot is less than queue's input slot. - return SyncRequest.empty(sq.kind, T) - if sq.inpSlot > sq.finalSlot: - # Queue's input slot is bigger than queue's final slot. - return SyncRequest.empty(sq.kind, T) - let lastSlot = min(maxslot, sq.finalSlot) - let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot) - var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item) - sq.advanceInput(count) - sq.makePending(sr) - sr - of SyncQueueKind.Backward: - if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64: - return SyncRequest.empty(sq.kind, T) - if sq.inpSlot < sq.finalSlot: - return SyncRequest.empty(sq.kind, T) - let (slot, count) = - block: - let baseSlot = sq.inpSlot + 1'u64 - if baseSlot - sq.finalSlot < sq.chunkSize: - let count = uint64(baseSlot - sq.finalSlot) - (baseSlot - count, count) - else: - (baseSlot - sq.chunkSize, sq.chunkSize) - if (maxslot + 1'u64) < slot + count: - # Peer's latest slot is less than queue's input slot. - return SyncRequest.empty(sq.kind, T) - var sr = SyncRequest.init(sq.kind, slot, count, item) - sq.advanceInput(count) - sq.makePending(sr) - sr - -proc debtLen*[T](sq: SyncQueue[T]): uint64 = - sq.debtsCount - -proc pendingLen*[T](sq: SyncQueue[T]): uint64 = - case sq.kind - of SyncQueueKind.Forward: - # When moving forward `outSlot` will be <= of `inpSlot`. - sq.inpSlot - sq.outSlot - of SyncQueueKind.Backward: - # When moving backward `outSlot` will be >= of `inpSlot` - sq.outSlot - sq.inpSlot + ok() proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} = ## Returns number of slots left in queue ``sq``. @@ -1053,4 +958,4 @@ proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} = proc progress*[T](sq: SyncQueue[T]): uint64 = ## How many useful slots we've synced so far, adjusting for how much has ## become obsolete by time movements - sq.total - sq.len + sq.total() - len(sq)
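checkBlobsResponse applies a similar rule to blob sidecars, which arrive as one flat slot-ordered list: slots may repeat, but only up to the per-block blob limit, and they must not go backwards. A standalone sketch of just the counting part (the limit constant here is a stand-in, and the range check against the request is omitted):

const MaxBlobsPerBlockSketch = 6     # stand-in for the real per-block limit

func checkBlobSlots(slots: openArray[uint64]): bool =
  if slots.len == 0:
    return true                      # nothing to check in an empty response
  var
    prev = slots[0]
    run = 0
  for s in slots:
    if s < prev:
      return false                   # slots must be non-decreasing
    if s == prev:
      inc(run)
      if run > MaxBlobsPerBlockSketch:
        return false                 # too many sidecars for a single block
    else:
      prev = s
      run = 1
  true

when isMainModule:
  doAssert checkBlobSlots([5'u64, 5, 5, 6, 7, 7])
  doAssert not checkBlobSlots([5'u64, 5, 5, 5, 5, 5, 5])   # 7 blobs in slot 5
  doAssert not checkBlobSlots([6'u64, 5])                   # goes backwards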