Backward sync support for SyncManager. (#3131)

* Unbundle SyncQueue from sync_manager.nim.
Unbundle Peer scores constants to peer_scores.nim.
Add Forward/Backward enum.

* Further improvements and tests.

* Adopt getRewindPoint() and fix MissingParent handler.

* Remove unused procedures.
Refactor `result` usage.
Fix resetWait().

* Add all the tests and fix the issue with rewind point.

* Fix get() issue.

* Fix flaky tests.

* test fixes

Co-authored-by: Jacek Sieka <jacek@status.im>
This commit is contained in:
Eugene Kabanov 2021-12-08 23:15:29 +02:00 committed by GitHub
parent 2ca28fb861
commit b05734f610
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1415 additions and 950 deletions

View File

@ -298,22 +298,26 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 4/4 Fail: 0/4 Skip: 0/4
## SyncManager test suite
```diff
+ [SyncQueue] Async pending and resetWait() test OK
+ [SyncQueue] Async unordered push start from zero OK
+ [SyncQueue] Async unordered push with not full start from non-zero OK
+ [SyncQueue] Full and incomplete success/fail start from non-zero OK
+ [SyncQueue] Full and incomplete success/fail start from zero OK
+ [SyncQueue] One smart and one stupid + debt split + empty OK
+ [SyncQueue] Smart and stupid success/fail OK
+ [SyncQueue] Start and finish slots equal OK
+ [SyncQueue] Two full requests success/fail OK
+ [SyncQueue#Backward] Async unordered push test OK
+ [SyncQueue#Backward] Async unordered push with rewind test OK
+ [SyncQueue#Backward] Pass through established limits test OK
+ [SyncQueue#Backward] Smoke test OK
+ [SyncQueue#Backward] Start and finish slots equal OK
+ [SyncQueue#Backward] Two full requests success/fail OK
+ [SyncQueue#Backward] getRewindPoint() test OK
+ [SyncQueue#Forward] Async unordered push test OK
+ [SyncQueue#Forward] Async unordered push with rewind test OK
+ [SyncQueue#Forward] Pass through established limits test OK
+ [SyncQueue#Forward] Smoke test OK
+ [SyncQueue#Forward] Start and finish slots equal OK
+ [SyncQueue#Forward] Two full requests success/fail OK
+ [SyncQueue#Forward] getRewindPoint() test OK
+ [SyncQueue] checkResponse() test OK
+ [SyncQueue] contains() test OK
+ [SyncQueue] getLastNonEmptySlot() test OK
+ [SyncQueue] getRewindPoint() test OK
+ [SyncQueue] hasEndGap() test OK
```
OK: 14/14 Fail: 0/14 Skip: 0/14
OK: 18/18 Fail: 0/18 Skip: 0/18
## Zero signature sanity checks
```diff
+ SSZ serialization roundtrip of SignedBeaconBlockHeader OK
@ -373,4 +377,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL---
OK: 205/207 Fail: 0/207 Skip: 2/207
OK: 209/211 Fail: 0/211 Skip: 2/211

View File

@ -0,0 +1,28 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
const
PeerScoreNoStatus* = -100
## Peer did not answer `status` request.
PeerScoreStaleStatus* = -50
## Peer's `status` answer do not progress in time.
PeerScoreUseless* = -10
## Peer's latest head is lower then ours.
PeerScoreGoodStatus* = 50
## Peer's `status` answer is fine.
PeerScoreNoBlocks* = -100
## Peer did not respond in time on `blocksByRange` request.
PeerScoreGoodBlocks* = 100
## Peer's `blocksByRange` answer is fine.
PeerScoreBadBlocks* = -1000
## Peer's response contains incorrect blocks.
PeerScoreBadResponse* = -1000
## Peer's response is not in requested range.
PeerScoreMissingBlocks* = -200
## Peer response contains too many empty blocks.

View File

@ -7,45 +7,24 @@
{.push raises: [Defect].}
import std/[
options, heapqueue, tables, strutils, sequtils, math, algorithm]
import std/[options, heapqueue, tables, strutils, sequtils, math, algorithm]
import stew/results, chronos, chronicles
import
../spec/datatypes/[phase0, altair],
../spec/datatypes/[base, phase0, altair, merge],
../spec/eth2_apis/rpc_types,
../spec/[helpers, forks],
../networking/[peer_pool, eth2_network]
../networking/[peer_pool, eth2_network],
../gossip_processing/block_processor,
../consensus_object_pools/block_pools_types,
./peer_scores, ./sync_queue
import ../gossip_processing/block_processor
import ../consensus_object_pools/block_pools_types
export phase0, altair, chronos, chronicles, results, block_pools_types,
helpers
export base, phase0, altair, merge, chronos, chronicles, results,
block_pools_types, helpers, peer_scores, sync_queue
logScope:
topics = "syncman"
const
PeerScoreNoStatus* = -100
## Peer did not answer `status` request.
PeerScoreStaleStatus* = -50
## Peer's `status` answer do not progress in time.
PeerScoreUseless* = -10
## Peer's latest head is lower then ours.
PeerScoreGoodStatus* = 50
## Peer's `status` answer is fine.
PeerScoreNoBlocks* = -100
## Peer did not respond in time on `blocksByRange` request.
PeerScoreGoodBlocks* = 100
## Peer's `blocksByRange` answer is fine.
PeerScoreBadBlocks* = -1000
## Peer's response contains incorrect blocks.
PeerScoreBadResponse* = -1000
## Peer's response is not in requested range.
PeerScoreMissingBlocks* = -200
## Peer response contains too many empty blocks.
SyncWorkersCount* = 10
## Number of sync workers to spawn
@ -64,45 +43,6 @@ type
BlockDownload,
BadResponse
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
SyncRequest*[T] = object
index*: uint64
slot*: Slot
count*: uint64
step*: uint64
item*: T
SyncResult*[T] = object
request*: SyncRequest[T]
data*: seq[ForkedSignedBeaconBlock]
SyncWaiter*[T] = object
future: Future[bool]
request: SyncRequest[T]
RewindPoint = object
failSlot: Slot
epochCount: uint64
SyncQueue*[T] = ref object
inpSlot*: Slot
outSlot*: Slot
startSlot*: Slot
lastSlot: Slot
chunkSize*: uint64
queueSize*: int
counter*: uint64
opcounter*: uint64
pending*: Table[uint64, SyncRequest[T]]
waiters: seq[SyncWaiter[T]]
getFinalizedSlot*: GetSlotCallback
debtsQueue: HeapQueue[SyncRequest[T]]
debtsCount: uint64
readyQueue: HeapQueue[SyncResult[T]]
rewind: Option[RewindPoint]
blockProcessor: ref BlockProcessor
SyncWorkerStatus* {.pure.} = enum
Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Processing
@ -148,563 +88,10 @@ type
SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
proc validate*[T](sq: SyncQueue[T],
blk: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] =
let resfut = newFuture[Result[void, BlockError]]("sync.manager.validate")
sq.blockProcessor[].addBlock(blk, resfut)
resfut
proc getShortMap*[T](req: SyncRequest[T],
data: openArray[ForkedSignedBeaconBlock]): string =
## Returns all slot numbers in ``data`` as placement map.
var res = newStringOfCap(req.count)
var slider = req.slot
var last = 0
for i in 0 ..< req.count:
if last < len(data):
for k in last ..< len(data):
if slider == data[k].slot:
res.add('x')
last = k + 1
break
elif slider < data[k].slot:
res.add('.')
break
else:
res.add('.')
slider = slider + req.step
res
proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} =
slot >= req.slot and slot < req.slot + req.count * req.step and
((slot - req.slot) mod req.step == 0)
proc cmp*[T](a, b: SyncRequest[T]): int =
cmp(uint64(a.slot), uint64(b.slot))
proc checkResponse*[T](req: SyncRequest[T],
data: openArray[ForkedSignedBeaconBlock]): bool =
if len(data) == 0:
# Impossible to verify empty response.
return true
if uint64(len(data)) > req.count:
# Number of blocks in response should be less or equal to number of
# requested blocks.
return false
var slot = req.slot
var rindex = 0'u64
var dindex = 0
while (rindex < req.count) and (dindex < len(data)):
if slot < data[dindex].slot:
discard
elif slot == data[dindex].slot:
inc(dindex)
else:
return false
slot = slot + req.step
rindex = rindex + 1'u64
if dindex == len(data):
return true
else:
return false
proc getFullMap*[T](req: SyncRequest[T],
data: openArray[ForkedSignedBeaconBlock]): string =
# Returns all slot numbers in ``data`` as comma-delimeted string.
mapIt(data, $it.message.slot).join(", ")
proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], slot: Slot,
count: uint64): SyncRequest[T] =
SyncRequest[T](slot: slot, count: count, step: 1'u64)
proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], start: Slot,
finish: Slot): SyncRequest[T] =
let count = finish - start + 1'u64
SyncRequest[T](slot: start, count: count, step: 1'u64)
proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], slot: Slot,
count: uint64, item: T): SyncRequest[T] =
SyncRequest[T](slot: slot, count: count, item: item, step: 1'u64)
proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], start: Slot,
finish: Slot, item: T): SyncRequest[T] =
let count = finish - start + 1'u64
SyncRequest[T](slot: start, count: count, step: 1'u64, item: item)
proc init*[T](t1: typedesc[SyncFailure], kind: SyncFailureKind,
peer: T): SyncFailure[T] =
SyncFailure[T](kind: kind, peer: peer, stamp: now(chronos.Moment))
proc empty*[T](t: typedesc[SyncRequest],
t2: typedesc[T]): SyncRequest[T] {.inline.} =
SyncRequest[T](step: 0'u64, count: 0'u64)
proc setItem*[T](sr: var SyncRequest[T], item: T) =
sr.item = item
proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
(sr.step == 0'u64) and (sr.count == 0'u64)
proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
start, last: Slot, chunkSize: uint64,
getFinalizedSlotCb: GetSlotCallback,
blockProcessor: ref BlockProcessor,
syncQueueSize: int = -1): SyncQueue[T] =
## Create new synchronization queue with parameters
##
## ``start`` and ``last`` are starting and finishing Slots.
##
## ``chunkSize`` maximum number of slots in one request.
##
## ``syncQueueSize`` maximum queue size for incoming data. If ``syncQueueSize > 0``
## queue will help to keep backpressure under control. If ``syncQueueSize <= 0``
## then queue size is unlimited (default).
##
## ``updateCb`` procedure which will be used to send downloaded blocks to
## consumer. Procedure should return ``false`` only when it receives
## incorrect blocks, and ``true`` if sequence of blocks is correct.
# SyncQueue is the core of sync manager, this data structure distributes
# requests to peers and manages responses from peers.
#
# Because SyncQueue is async data structure it manages backpressure and
# order of incoming responses and it also resolves "joker's" problem.
#
# Joker's problem
#
# According to current Ethereum2 network specification
# > Clients MUST respond with at least one block, if they have it and it
# > exists in the range. Clients MAY limit the number of blocks in the
# > response.
#
# Such rule can lead to very uncertain responses, for example let slots from
# 10 to 12 will be not empty. Client which follows specification can answer
# with any response from this list (X - block, `-` empty space):
#
# 1. X X X
# 2. - - X
# 3. - X -
# 4. - X X
# 5. X - -
# 6. X - X
# 7. X X -
#
# If peer answers with `1` everything will be fine and `block_pool` will be
# able to process all 3 blocks. In case of `2`, `3`, `4`, `6` - `block_pool`
# will fail immediately with chunk and report "parent is missing" error.
# But in case of `5` and `7` blocks will be processed by `block_pool` without
# any problems, however it will start producing problems right from this
# uncertain last slot. SyncQueue will start producing requests for next
# blocks, but all the responses from this point will fail with "parent is
# missing" error. Lets call such peers "jokers", because they are joking
# with responses.
#
# To fix "joker" problem we going to perform rollback to the latest finalized
# epoch's first slot.
doAssert(chunkSize > 0'u64, "Chunk size should not be zero")
result = SyncQueue[T](
startSlot: start,
lastSlot: last,
chunkSize: chunkSize,
queueSize: syncQueueSize,
getFinalizedSlot: getFinalizedSlotCb,
waiters: newSeq[SyncWaiter[T]](),
counter: 1'u64,
pending: initTable[uint64, SyncRequest[T]](),
debtsQueue: initHeapQueue[SyncRequest[T]](),
inpSlot: start,
outSlot: start,
blockProcessor: blockProcessor
)
proc `<`*[T](a, b: SyncRequest[T]): bool {.inline.} =
a.slot < b.slot
proc `<`*[T](a, b: SyncResult[T]): bool {.inline.} =
a.request.slot < b.request.slot
proc `==`*[T](a, b: SyncRequest[T]): bool {.inline.} =
result = ((a.slot == b.slot) and (a.count == b.count) and
(a.step == b.step))
proc lastSlot*[T](req: SyncRequest[T]): Slot {.inline.} =
## Returns last slot for request ``req``.
req.slot + req.count - 1'u64
proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) =
req.index = sq.counter
sq.counter = sq.counter + 1'u64
sq.pending[req.index] = req
proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
## Update last slot stored in queue ``sq`` with value ``last``.
doAssert(sq.lastSlot <= last,
"Last slot could not be lower then stored one " &
$sq.lastSlot & " <= " & $last)
sq.lastSlot = last
proc wakeupWaiters[T](sq: SyncQueue[T], flag = true) =
## Wakeup one or all blocked waiters.
for item in sq.waiters:
if not(item.future.finished()):
item.future.complete(flag)
proc waitForChanges[T](sq: SyncQueue[T],
req: SyncRequest[T]): Future[bool] {.async.} =
## Create new waiter and wait for completion from `wakeupWaiters()`.
var waitfut = newFuture[bool]("SyncQueue.waitForChanges")
let waititem = SyncWaiter[T](future: waitfut, request: req)
sq.waiters.add(waititem)
try:
result = await waitfut
finally:
sq.waiters.delete(sq.waiters.find(waititem))
proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} =
## This procedure will perform wakeupWaiters(false) and blocks until last
## waiter will be awakened.
var waitChanges = sq.waitForChanges(SyncRequest.empty(T))
sq.wakeupWaiters(false)
discard await waitChanges
proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} =
## Perform reset of all the blocked waiters in SyncQueue.
##
## We adding one more waiter to the waiters sequence and
## call wakeupWaiters(false). Because our waiter is last in sequence of
## waiters it will be resumed only after all waiters will be awakened and
## finished.
# We are clearing pending list, so that all requests that are still running
# around (still downloading, but not yet pushed to the SyncQueue) will be
# expired. Its important to perform this call first (before await), otherwise
# you can introduce race problem.
sq.pending.clear()
# We calculating minimal slot number to which we will be able to reset,
# without missing any blocks. There 3 sources:
# 1. Debts queue.
# 2. Processing queue (`inpSlot`, `outSlot`).
# 3. Requested slot `toSlot`.
#
# Queue's `outSlot` is the lowest slot we added to `block_pool`, but
# `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not
# added slot requests, so it can't be bigger then `outSlot` value.
var minSlot = sq.outSlot
if toSlot.isSome():
minSlot = min(toSlot.get(), sq.outSlot)
sq.debtsQueue.clear()
sq.debtsCount = 0
sq.readyQueue.clear()
sq.inpSlot = minSlot
sq.outSlot = minSlot
# We are going to wakeup all the waiters and wait for last one.
await sq.wakeupAndWaitWaiters()
proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} =
## Returns ``true`` if response chain of blocks is empty (has only empty
## slots).
len(sr.data) == 0
proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} =
## Returns ``true`` if response chain of blocks has gap at the end.
let lastslot = sr.request.slot + sr.request.count - 1'u64
if len(sr.data) == 0:
return true
if sr.data[^1].slot != lastslot:
return true
return false
proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
## Returns last non-empty slot from result ``sr``. If response has only
## empty slots, original request slot will be returned.
if len(sr.data) == 0:
# If response has only empty slots we going to use original request slot
sr.request.slot
else:
sr.data[^1].slot
proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
sq.debtsQueue.push(sr)
sq.debtsCount = sq.debtsCount + sr.count
proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
finalizedSlot: Slot): Slot =
# Calculate the latest finalized epoch.
let finalizedEpoch = compute_epoch_at_slot(finalizedSlot)
# Calculate failure epoch.
let failEpoch = compute_epoch_at_slot(failSlot)
# Calculate exponential rewind point in number of epochs.
let epochCount =
if sq.rewind.isSome():
let rewind = sq.rewind.get()
if failSlot == rewind.failSlot:
# `MissingParent` happened at same slot so we increase rewind point by
# factor of 2.
if failEpoch > finalizedEpoch:
let rewindPoint = rewind.epochCount shl 1
if rewindPoint < rewind.epochCount:
# If exponential rewind point produces `uint64` overflow we will
# make rewind to latest finalized epoch.
failEpoch - finalizedEpoch
else:
if (failEpoch < rewindPoint) or
(failEpoch - rewindPoint < finalizedEpoch):
# If exponential rewind point points to position which is far
# behind latest finalized epoch.
failEpoch - finalizedEpoch
else:
rewindPoint
else:
warn "Trying to rewind over the last finalized epoch",
finalized_slot = finalizedSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
rewind_epoch_count = rewind.epochCount,
finalized_epoch = finalizedEpoch
0'u64
else:
# `MissingParent` happened at different slot so we going to rewind for
# 1 epoch only.
if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
warn "Сould not rewind further than the last finalized epoch",
finalized_slot = finalizedSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
rewind_epoch_count = rewind.epochCount,
finalized_epoch = finalizedEpoch
0'u64
else:
1'u64
else:
# `MissingParent` happened first time.
if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
warn "Сould not rewind further than the last finalized epoch",
finalized_slot = finalizedSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
finalized_epoch = finalizedEpoch
0'u64
else:
1'u64
# echo "epochCount = ", epochCount
if epochCount == 0'u64:
warn "Unable to continue syncing, please restart the node",
finalized_slot = finalizedSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
finalized_epoch = finalizedEpoch
# Calculate the rewind epoch, which will be equal to last rewind point or
# finalizedEpoch
let rewindEpoch =
if sq.rewind.isNone():
finalizedEpoch
else:
compute_epoch_at_slot(sq.rewind.get().failSlot) -
sq.rewind.get().epochCount
compute_start_slot_at_epoch(rewindEpoch)
else:
# Calculate the rewind epoch, which should not be less than the latest
# finalized epoch.
let rewindEpoch = failEpoch - epochCount
# Update and save new rewind point in SyncQueue.
sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount))
compute_start_slot_at_epoch(rewindEpoch)
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ForkedSignedBeaconBlock]) {.async, gcsafe.} =
## Push successful result to queue ``sq``.
mixin updateScore
if sr.index notin sq.pending:
# If request `sr` not in our pending list, it only means that
# SyncQueue.resetWait() happens and all pending requests are expired, so
# we swallow `old` requests, and in such way sync-workers are able to get
# proper new requests from SyncQueue.
return
sq.pending.del(sr.index)
# This is backpressure handling algorithm, this algorithm is blocking
# all pending `push` requests if `request.slot` not in range:
# [current_queue_slot, current_queue_slot + sq.queueSize * sq.chunkSize].
var exitNow = false
while true:
if (sq.queueSize > 0) and
(sr.slot >= sq.outSlot + uint64(sq.queueSize) * sq.chunkSize):
let res = await sq.waitForChanges(sr)
if res:
continue
else:
# SyncQueue reset happens. We are exiting to wake up sync-worker.
exitNow = true
break
let syncres = SyncResult[T](request: sr, data: data)
sq.readyQueue.push(syncres)
exitNow = false
break
if exitNow:
return
while len(sq.readyQueue) > 0:
let minSlot = sq.readyQueue[0].request.slot
if sq.outSlot != minSlot:
break
let item = sq.readyQueue.pop()
# Validating received blocks one by one
var res: Result[void, BlockError]
var failSlot: Option[Slot]
if len(item.data) > 0:
for blk in item.data:
trace "Pushing block", block_root = blk.root,
block_slot = blk.slot
res = await sq.validate(blk)
if not(res.isOk):
failSlot = some(blk.slot)
break
else:
res = Result[void, BlockError].ok()
# Increase progress counter, so watch task will be able to know that we are
# not stuck.
inc(sq.opcounter)
if res.isOk:
sq.outSlot = sq.outSlot + item.request.count
if len(item.data) > 0:
# If there no error and response was not empty we should reward peer
# with some bonus score.
item.request.item.updateScore(PeerScoreGoodBlocks)
sq.wakeupWaiters()
else:
debug "Block pool rejected peer's response", peer = item.request.item,
request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
blocks_map = getShortMap(item.request, item.data),
blocks_count = len(item.data), errCode = res.error,
topics = "syncman"
var resetSlot: Option[Slot]
if res.error == BlockError.MissingParent:
# If we got `BlockError.MissingParent` it means that peer returns chain
# of blocks with holes or `block_pool` is in incomplete state. We going
# to rewind to the first slot at latest finalized epoch.
let req = item.request
let finalizedSlot = sq.getFinalizedSlot()
if finalizedSlot < req.slot:
let rewindSlot = sq.getRewindPoint(failSlot.get(), finalizedSlot)
warn "Unexpected missing parent, rewind happens",
peer = req.item, rewind_to_slot = rewindSlot,
rewind_epoch_count = sq.rewind.get().epochCount,
rewind_fail_slot = failSlot.get(),
finalized_slot = finalized_slot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
resetSlot = some(rewindSlot)
req.item.updateScore(PeerScoreMissingBlocks)
else:
error "Unexpected missing parent at finalized epoch slot",
peer = req.item, to_slot = finalizedSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
elif res.error == BlockError.Invalid:
let req = item.request
warn "Received invalid sequence of blocks", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
else:
let req = item.request
warn "Received unexpected response from block_pool", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), errorCode = res.error,
topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
# We need to move failed response to the debts queue.
sq.toDebtsQueue(item.request)
if resetSlot.isSome():
await sq.resetWait(resetSlot)
debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
rewind_epoch_count = sq.rewind.get().epochCount,
rewind_fail_slot = sq.rewind.get().failSlot,
reset_slot = resetSlot, topics = "syncman"
break
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
## Push failed request back to queue.
if sr.index notin sq.pending:
# If request `sr` not in our pending list, it only means that
# SyncQueue.resetWait() happens and all pending requests are expired, so
# we swallow `old` requests, and in such way sync-workers are able to get
# proper new requests from SyncQueue.
return
sq.pending.del(sr.index)
sq.toDebtsQueue(sr)
proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
if len(sq.debtsQueue) > 0:
if maxSlot < sq.debtsQueue[0].slot:
return SyncRequest.empty(T)
var sr = sq.debtsQueue.pop()
if sr.lastSlot() <= maxSlot:
sq.debtsCount = sq.debtsCount - sr.count
sr.setItem(item)
sq.makePending(sr)
return sr
var sr1 = SyncRequest.init(T, sr.slot, maxslot, item)
let sr2 = SyncRequest.init(T, maxslot + 1'u64, sr.lastSlot())
sq.debtsQueue.push(sr2)
sq.debtsCount = sq.debtsCount - sr1.count
sq.makePending(sr1)
return sr1
else:
if maxSlot < sq.inpSlot:
return SyncRequest.empty(T)
if sq.inpSlot > sq.lastSlot:
return SyncRequest.empty(T)
let lastSlot = min(maxslot, sq.lastSlot)
let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
var sr = SyncRequest.init(T, sq.inpSlot, count, item)
sq.inpSlot = sq.inpSlot + count
sq.makePending(sr)
return sr
proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} =
## Returns number of slots left in queue ``sq``.
if sq.inpSlot > sq.lastSlot:
result = sq.debtsCount
else:
result = sq.lastSlot - sq.inpSlot + 1'u64 - sq.debtsCount
proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} =
## Returns total number of slots in queue ``sq``.
sq.lastSlot - sq.startSlot + 1'u64
proc progress*[T](sq: SyncQueue[T]): uint64 =
## Returns queue's ``sq`` progress string.
let curSlot = sq.outSlot - sq.startSlot
(curSlot * 100'u64) div sq.total()
proc now*(sm: typedesc[SyncMoment], slot: Slot): SyncMoment {.inline.} =
SyncMoment(stamp: now(chronos.Moment), slot: slot)
@ -734,8 +121,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
rangeAge = uint64(SLOTS_PER_EPOCH * 4)
): SyncManager[A, B] =
let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(),
chunkSize, getFinalizedSlotCb, blockProcessor, 1)
let queue = SyncQueue.init(A, SyncQueueKind.Forward, getLocalHeadSlotCb(),
getLocalWallSlotCb(), chunkSize,
getFinalizedSlotCb, blockProcessor, 1)
result = SyncManager[A, B](
pool: pool,
@ -955,7 +343,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
local_head_slot = headSlot, remote_head_slot = peerSlot,
queue_input_slot = man.queue.inpSlot,
queue_output_slot = man.queue.outSlot,
queue_last_slot = man.queue.lastSlot,
queue_last_slot = man.queue.finalSlot,
peer_speed = peer.netKbps(), peer_score = peer.getScore(),
index = index, topics = "syncman"
await sleepAsync(RESP_TIMEOUT)
@ -1176,7 +564,8 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
if not(man.notInSyncEvent.isSet()):
# We get here only if we lost sync for more then `maxHeadAge` period.
if pending == 0:
man.queue = SyncQueue.init(A, man.getLocalHeadSlot(),
man.queue = SyncQueue.init(A, SyncQueueKind.Forward,
man.getLocalHeadSlot(),
man.getLocalWallSlot(),
man.chunkSize, man.getFinalizedSlot,
man.blockProcessor, 1)

View File

@ -0,0 +1,783 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import std/[options, heapqueue, tables, strutils, sequtils, math, algorithm]
import stew/results, chronos, chronicles
import
../spec/datatypes/[base, phase0, altair, merge],
../spec/eth2_apis/rpc_types,
../spec/[helpers, forks],
../networking/[peer_pool, eth2_network],
../gossip_processing/block_processor,
../consensus_object_pools/block_pools_types,
./peer_scores
export base, phase0, altair, merge, chronos, chronicles, results,
block_pools_types, helpers, peer_scores
logScope:
topics = "syncqueue"
type
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
SyncQueueKind* {.pure.} = enum
Forward, Backward
SyncRequest*[T] = object
kind: SyncQueueKind
index*: uint64
slot*: Slot
count*: uint64
step*: uint64
item*: T
SyncResult*[T] = object
request*: SyncRequest[T]
data*: seq[ForkedSignedBeaconBlock]
SyncWaiter*[T] = object
future: Future[bool]
request: SyncRequest[T]
RewindPoint = object
failSlot: Slot
epochCount: uint64
SyncQueue*[T] = ref object
kind*: SyncQueueKind
inpSlot*: Slot
outSlot*: Slot
startSlot*: Slot
finalSlot*: Slot
chunkSize*: uint64
queueSize*: int
counter*: uint64
opcounter*: uint64
pending*: Table[uint64, SyncRequest[T]]
waiters: seq[SyncWaiter[T]]
getSafeSlot*: GetSlotCallback
debtsQueue: HeapQueue[SyncRequest[T]]
debtsCount: uint64
readyQueue: HeapQueue[SyncResult[T]]
rewind: Option[RewindPoint]
blockProcessor: ref BlockProcessor
SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
proc validate*[T](sq: SyncQueue[T],
blk: ForkedSignedBeaconBlock
): Future[Result[void, BlockError]] =
let resfut = newFuture[Result[void, BlockError]]("sync.manager.validate")
sq.blockProcessor[].addBlock(blk, resfut)
resfut
proc getShortMap*[T](req: SyncRequest[T],
data: openArray[ForkedSignedBeaconBlock]): string =
## Returns all slot numbers in ``data`` as placement map.
var res = newStringOfCap(req.count)
var slider = req.slot
var last = 0
for i in 0 ..< req.count:
if last < len(data):
for k in last ..< len(data):
if slider == data[k].slot:
res.add('x')
last = k + 1
break
elif slider < data[k].slot:
res.add('.')
break
else:
res.add('.')
slider = slider + req.step
res
proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} =
slot >= req.slot and slot < req.slot + req.count * req.step and
((slot - req.slot) mod req.step == 0)
proc cmp*[T](a, b: SyncRequest[T]): int =
cmp(uint64(a.slot), uint64(b.slot))
proc checkResponse*[T](req: SyncRequest[T],
data: openArray[ForkedSignedBeaconBlock]): bool =
if len(data) == 0:
# Impossible to verify empty response.
return true
if uint64(len(data)) > req.count:
# Number of blocks in response should be less or equal to number of
# requested blocks.
return false
var slot = req.slot
var rindex = 0'u64
var dindex = 0
while (rindex < req.count) and (dindex < len(data)):
if slot < data[dindex].slot:
discard
elif slot == data[dindex].slot:
inc(dindex)
else:
return false
slot = slot + req.step
rindex = rindex + 1'u64
if dindex == len(data):
return true
else:
return false
proc getFullMap*[T](req: SyncRequest[T],
data: openArray[ForkedSignedBeaconBlock]): string =
# Returns all slot numbers in ``data`` as comma-delimeted string.
mapIt(data, $it.message.slot).join(", ")
proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
finish: Slot, t2: typedesc[T]): SyncRequest[T] =
let count = finish - start + 1'u64
SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64)
proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, slot: Slot,
count: uint64, item: T): SyncRequest[T] =
SyncRequest[T](kind: kind, slot: slot, count: count, item: item, step: 1'u64)
proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
finish: Slot, item: T): SyncRequest[T] =
let count = finish - start + 1'u64
SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64, item: item)
proc empty*[T](t: typedesc[SyncRequest], kind: SyncQueueKind,
t2: typedesc[T]): SyncRequest[T] {.inline.} =
SyncRequest[T](kind: kind, step: 0'u64, count: 0'u64)
proc setItem*[T](sr: var SyncRequest[T], item: T) =
sr.item = item
proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
(sr.step == 0'u64) and (sr.count == 0'u64)
proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
queueKind: SyncQueueKind,
start, final: Slot, chunkSize: uint64,
getSafeSlotCb: GetSlotCallback,
blockProcessor: ref BlockProcessor,
syncQueueSize: int = -1): SyncQueue[T] =
## Create new synchronization queue with parameters
##
## ``start`` and ``last`` are starting and finishing Slots.
##
## ``chunkSize`` maximum number of slots in one request.
##
## ``syncQueueSize`` maximum queue size for incoming data.
## If ``syncQueueSize > 0`` queue will help to keep backpressure under
## control. If ``syncQueueSize <= 0`` then queue size is unlimited (default).
# SyncQueue is the core of sync manager, this data structure distributes
# requests to peers and manages responses from peers.
#
# Because SyncQueue is async data structure it manages backpressure and
# order of incoming responses and it also resolves "joker's" problem.
#
# Joker's problem
#
# According to current Ethereum2 network specification
# > Clients MUST respond with at least one block, if they have it and it
# > exists in the range. Clients MAY limit the number of blocks in the
# > response.
#
# Such rule can lead to very uncertain responses, for example let slots from
# 10 to 12 will be not empty. Client which follows specification can answer
# with any response from this list (X - block, `-` empty space):
#
# 1. X X X
# 2. - - X
# 3. - X -
# 4. - X X
# 5. X - -
# 6. X - X
# 7. X X -
#
# If peer answers with `1` everything will be fine and `block_pool` will be
# able to process all 3 blocks. In case of `2`, `3`, `4`, `6` - `block_pool`
# will fail immediately with chunk and report "parent is missing" error.
# But in case of `5` and `7` blocks will be processed by `block_pool` without
# any problems, however it will start producing problems right from this
# uncertain last slot. SyncQueue will start producing requests for next
# blocks, but all the responses from this point will fail with "parent is
# missing" error. Lets call such peers "jokers", because they are joking
# with responses.
#
# To fix "joker" problem we going to perform rollback to the latest finalized
# epoch's first slot.
doAssert(chunkSize > 0'u64, "Chunk size should not be zero")
SyncQueue[T](
kind: queueKind,
startSlot: start,
finalSlot: final,
chunkSize: chunkSize,
queueSize: syncQueueSize,
getSafeSlot: getSafeSlotCb,
waiters: newSeq[SyncWaiter[T]](),
counter: 1'u64,
pending: initTable[uint64, SyncRequest[T]](),
debtsQueue: initHeapQueue[SyncRequest[T]](),
inpSlot: start,
outSlot: start,
blockProcessor: blockProcessor
)
proc `<`*[T](a, b: SyncRequest[T]): bool =
doAssert(a.kind == b.kind)
case a.kind
of SyncQueueKind.Forward:
a.slot < b.slot
of SyncQueueKind.Backward:
a.slot > b.slot
proc `<`*[T](a, b: SyncResult[T]): bool =
doAssert(a.request.kind == b.request.kind)
case a.request.kind
of SyncQueueKind.Forward:
a.request.slot < b.request.slot
of SyncQueueKind.Backward:
a.request.slot > b.request.slot
proc `==`*[T](a, b: SyncRequest[T]): bool =
(a.kind == b.kind) and (a.slot == b.slot) and (a.count == b.count) and
(a.step == b.step)
proc lastSlot*[T](req: SyncRequest[T]): Slot =
## Returns last slot for request ``req``.
req.slot + req.count - 1'u64
proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) =
req.index = sq.counter
sq.counter = sq.counter + 1'u64
sq.pending[req.index] = req
proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
## Update last slot stored in queue ``sq`` with value ``last``.
case sq.kind
of SyncQueueKind.Forward:
doAssert(sq.finalSlot <= last,
"Last slot could not be lower then stored one " &
$sq.finalSlot & " <= " & $last)
sq.finalSlot = last
of SyncQueueKind.Backward:
doAssert(sq.finalSlot >= last,
"Last slot could not be higher then stored one " &
$sq.finalSlot & " >= " & $last)
sq.finalSlot = last
proc wakeupWaiters[T](sq: SyncQueue[T], flag = true) =
## Wakeup one or all blocked waiters.
for item in sq.waiters:
if not(item.future.finished()):
item.future.complete(flag)
proc waitForChanges[T](sq: SyncQueue[T],
req: SyncRequest[T]): Future[bool] {.async.} =
## Create new waiter and wait for completion from `wakeupWaiters()`.
var waitfut = newFuture[bool]("SyncQueue.waitForChanges")
let waititem = SyncWaiter[T](future: waitfut, request: req)
sq.waiters.add(waititem)
try:
let res = await waitfut
return res
finally:
sq.waiters.delete(sq.waiters.find(waititem))
proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} =
## This procedure will perform wakeupWaiters(false) and blocks until last
## waiter will be awakened.
var waitChanges = sq.waitForChanges(SyncRequest.empty(sq.kind, T))
sq.wakeupWaiters(false)
discard await waitChanges
proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} =
## Perform reset of all the blocked waiters in SyncQueue.
##
## We adding one more waiter to the waiters sequence and
## call wakeupWaiters(false). Because our waiter is last in sequence of
## waiters it will be resumed only after all waiters will be awakened and
## finished.
# We are clearing pending list, so that all requests that are still running
# around (still downloading, but not yet pushed to the SyncQueue) will be
# expired. Its important to perform this call first (before await), otherwise
# you can introduce race problem.
sq.pending.clear()
# We calculating minimal slot number to which we will be able to reset,
# without missing any blocks. There 3 sources:
# 1. Debts queue.
# 2. Processing queue (`inpSlot`, `outSlot`).
# 3. Requested slot `toSlot`.
#
# Queue's `outSlot` is the lowest slot we added to `block_pool`, but
# `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not
# added slot requests, so it can't be bigger then `outSlot` value.
let minSlot =
case sq.kind
of SyncQueueKind.Forward:
if toSlot.isSome():
min(toSlot.get(), sq.outSlot)
else:
sq.outSlot
of SyncQueueKind.Backward:
if toSlot.isSome():
toSlot.get()
else:
sq.outSlot
sq.debtsQueue.clear()
sq.debtsCount = 0
sq.readyQueue.clear()
sq.inpSlot = minSlot
sq.outSlot = minSlot
# We are going to wakeup all the waiters and wait for last one.
await sq.wakeupAndWaitWaiters()
proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} =
## Returns ``true`` if response chain of blocks is empty (has only empty
## slots).
len(sr.data) == 0
proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} =
## Returns ``true`` if response chain of blocks has gap at the end.
let lastslot = sr.request.slot + sr.request.count - 1'u64
if len(sr.data) == 0:
return true
if sr.data[^1].slot != lastslot:
return true
return false
proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
## Returns last non-empty slot from result ``sr``. If response has only
## empty slots, original request slot will be returned.
if len(sr.data) == 0:
# If response has only empty slots we going to use original request slot
sr.request.slot
else:
sr.data[^1].slot
proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
sq.debtsQueue.push(sr)
sq.debtsCount = sq.debtsCount + sr.count
proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
safeSlot: Slot): Slot =
case sq.kind
of SyncQueueKind.Forward:
# Calculate the latest finalized epoch.
let finalizedEpoch = compute_epoch_at_slot(safeSlot)
# Calculate failure epoch.
let failEpoch = compute_epoch_at_slot(failSlot)
# Calculate exponential rewind point in number of epochs.
let epochCount =
if sq.rewind.isSome():
let rewind = sq.rewind.get()
if failSlot == rewind.failSlot:
# `MissingParent` happened at same slot so we increase rewind point by
# factor of 2.
if failEpoch > finalizedEpoch:
let rewindPoint = rewind.epochCount shl 1
if rewindPoint < rewind.epochCount:
# If exponential rewind point produces `uint64` overflow we will
# make rewind to latest finalized epoch.
failEpoch - finalizedEpoch
else:
if (failEpoch < rewindPoint) or
(failEpoch - rewindPoint < finalizedEpoch):
# If exponential rewind point points to position which is far
# behind latest finalized epoch.
failEpoch - finalizedEpoch
else:
rewindPoint
else:
warn "Trying to rewind over the last finalized epoch",
finalized_slot = safeSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
rewind_epoch_count = rewind.epochCount,
finalized_epoch = finalizedEpoch
0'u64
else:
# `MissingParent` happened at different slot so we going to rewind for
# 1 epoch only.
if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
warn "Сould not rewind further than the last finalized epoch",
finalized_slot = safeSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
rewind_epoch_count = rewind.epochCount,
finalized_epoch = finalizedEpoch
0'u64
else:
1'u64
else:
# `MissingParent` happened first time.
if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
warn "Сould not rewind further than the last finalized epoch",
finalized_slot = safeSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
finalized_epoch = finalizedEpoch
0'u64
else:
1'u64
if epochCount == 0'u64:
warn "Unable to continue syncing, please restart the node",
finalized_slot = safeSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
finalized_epoch = finalizedEpoch
# Calculate the rewind epoch, which will be equal to last rewind point or
# finalizedEpoch
let rewindEpoch =
if sq.rewind.isNone():
finalizedEpoch
else:
compute_epoch_at_slot(sq.rewind.get().failSlot) -
sq.rewind.get().epochCount
compute_start_slot_at_epoch(rewindEpoch)
else:
# Calculate the rewind epoch, which should not be less than the latest
# finalized epoch.
let rewindEpoch = failEpoch - epochCount
# Update and save new rewind point in SyncQueue.
sq.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount))
compute_start_slot_at_epoch(rewindEpoch)
of SyncQueueKind.Backward:
# While we perform backward sync, the only possible slot we could rewind is
# latest stored block.
if failSlot == safeSlot:
warn "Unable to continue syncing, please restart the node",
safe_slot = safeSlot, fail_slot = failSlot
safeSlot
iterator blocks*[T](sq: SyncQueue[T],
sr: SyncResult[T]): ForkedSignedBeaconBlock =
case sq.kind
of SyncQueueKind.Forward:
for i in countup(0, len(sr.data) - 1):
yield sr.data[i]
of SyncQueueKind.Backward:
for i in countdown(len(sr.data) - 1, 0):
yield sr.data[i]
proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) =
case sq.kind
of SyncQueueKind.Forward:
sq.outSlot = sq.outSlot + number
of SyncQueueKind.Backward:
sq.outSlot = sq.outSlot - number
proc advanceInput[T](sq: SyncQueue[T], number: uint64) =
case sq.kind
of SyncQueueKind.Forward:
sq.inpSlot = sq.inpSlot + number
of SyncQueueKind.Backward:
sq.inpSlot = sq.inpSlot - number
proc notInRange[T](sq: SyncQueue[T], slot: Slot): bool =
case sq.kind
of SyncQueueKind.Forward:
(sq.queueSize > 0) and
(slot >= sq.outSlot + uint64(sq.queueSize) * sq.chunkSize)
of SyncQueueKind.Backward:
(sq.queueSize > 0) and
(uint64(sq.queueSize) * sq.chunkSize <= sq.outSlot - slot)
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ForkedSignedBeaconBlock]) {.async, gcsafe.} =
## Push successful result to queue ``sq``.
mixin updateScore
if sr.index notin sq.pending:
# If request `sr` not in our pending list, it only means that
# SyncQueue.resetWait() happens and all pending requests are expired, so
# we swallow `old` requests, and in such way sync-workers are able to get
# proper new requests from SyncQueue.
return
sq.pending.del(sr.index)
# This is backpressure handling algorithm, this algorithm is blocking
# all pending `push` requests if `request.slot` not in range:
# [current_queue_slot, current_queue_slot + sq.queueSize * sq.chunkSize].
var exitNow = false
while true:
if sq.notInRange(sr.slot):
let res = await sq.waitForChanges(sr)
if res:
continue
else:
# SyncQueue reset happens. We are exiting to wake up sync-worker.
exitNow = true
break
let syncres = SyncResult[T](request: sr, data: data)
sq.readyQueue.push(syncres)
exitNow = false
break
if exitNow:
return
while len(sq.readyQueue) > 0:
let reqres =
case sq.kind
of SyncQueueKind.Forward:
let minSlot = sq.readyQueue[0].request.slot
if sq.outSlot != minSlot:
none[SyncResult[T]]()
else:
some(sq.readyQueue.pop())
of SyncQueueKind.Backward:
let maxSlot = sq.readyQueue[0].request.slot +
(sq.readyQueue[0].request.count - 1'u64)
if sq.outSlot != maxSlot:
none[SyncResult[T]]()
else:
some(sq.readyQueue.pop())
let item =
if reqres.isSome():
reqres.get()
else:
let rewindSlot = sq.getRewindPoint(sq.outSlot, sq.getSafeSlot())
warn "Got incorrect sync result in queue, rewind happens",
request_slot = sq.readyQueue[0].request.slot,
request_count = sq.readyQueue[0].request.count,
request_step = sq.readyQueue[0].request.step,
blocks_map = getShortMap(sq.readyQueue[0].request,
sq.readyQueue[0].data),
blocks_count = len(sq.readyQueue[0].data),
output_slot = sq.outSlot, input_slot = sq.inpSlot,
peer = sq.readyQueue[0].request.item, rewind_to_slot = rewindSlot,
topics = "syncman"
await sq.resetWait(some(rewindSlot))
break
# Validating received blocks one by one
var res: Result[void, BlockError]
var failSlot: Option[Slot]
if len(item.data) > 0:
for blk in sq.blocks(item):
trace "Pushing block", block_root = blk.root,
block_slot = blk.slot
res = await sq.validate(blk)
if res.isErr():
failSlot = some(blk.slot)
break
else:
res = Result[void, BlockError].ok()
# Increase progress counter, so watch task will be able to know that we are
# not stuck.
inc(sq.opcounter)
if res.isOk():
sq.advanceOutput(item.request.count)
if len(item.data) > 0:
# If there no error and response was not empty we should reward peer
# with some bonus score.
item.request.item.updateScore(PeerScoreGoodBlocks)
sq.wakeupWaiters()
else:
debug "Block pool rejected peer's response", peer = item.request.item,
request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
blocks_map = getShortMap(item.request, item.data),
blocks_count = len(item.data), errCode = res.error,
topics = "syncman"
var resetSlot: Option[Slot]
if res.error == BlockError.MissingParent:
# If we got `BlockError.MissingParent` it means that peer returns chain
# of blocks with holes or `block_pool` is in incomplete state. We going
# to rewind to the first slot at latest finalized epoch.
let
req = item.request
safeSlot = sq.getSafeSlot()
case sq.kind
of SyncQueueKind.Forward:
if safeSlot < req.slot:
let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot)
warn "Unexpected missing parent, rewind happens",
peer = req.item, rewind_to_slot = rewindSlot,
rewind_epoch_count = sq.rewind.get().epochCount,
rewind_fail_slot = failSlot.get(),
finalized_slot = safeSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
resetSlot = some(rewindSlot)
req.item.updateScore(PeerScoreMissingBlocks)
else:
error "Unexpected missing parent at finalized epoch slot",
peer = req.item, to_slot = safeSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
of SyncQueueKind.Backward:
if safeSlot > req.slot:
let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot)
warn "Unexpected missing parent, rewind happens",
peer = req.item, rewind_to_slot = rewindSlot,
rewind_fail_slot = failSlot.get(),
finalized_slot = safeSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
resetSlot = some(rewindSlot)
req.item.updateScore(PeerScoreMissingBlocks)
else:
error "Unexpected missing parent at safe slot",
peer = req.item, to_slot = safeSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
elif res.error == BlockError.Invalid:
let req = item.request
warn "Received invalid sequence of blocks", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
else:
let req = item.request
warn "Received unexpected response from block_pool", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), errorCode = res.error,
topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
# We need to move failed response to the debts queue.
sq.toDebtsQueue(item.request)
if resetSlot.isSome():
await sq.resetWait(resetSlot)
case sq.kind
of SyncQueueKind.Forward:
debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
rewind_epoch_count = sq.rewind.get().epochCount,
rewind_fail_slot = sq.rewind.get().failSlot,
reset_slot = resetSlot, topics = "syncman"
of SyncQueueKind.Backward:
debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
reset_slot = resetSlot, topics = "syncman"
break
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
## Push failed request back to queue.
if sr.index notin sq.pending:
# If request `sr` not in our pending list, it only means that
# SyncQueue.resetWait() happens and all pending requests are expired, so
# we swallow `old` requests, and in such way sync-workers are able to get
# proper new requests from SyncQueue.
return
sq.pending.del(sr.index)
sq.toDebtsQueue(sr)
proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
## Create new request according to current SyncQueue parameters.
if len(sq.debtsQueue) > 0:
if maxSlot < sq.debtsQueue[0].slot:
# Peer's latest slot is less than starting request's slot.
return SyncRequest.empty(sq.kind, T)
if maxSlot < sq.debtsQueue[0].lastSlot():
# Peer's latest slot is less than finishing request's slot.
return SyncRequest.empty(sq.kind, T)
var sr = sq.debtsQueue.pop()
sq.debtsCount = sq.debtsCount - sr.count
sr.setItem(item)
sq.makePending(sr)
sr
else:
case sq.kind
of SyncQueueKind.Forward:
if maxSlot < sq.inpSlot:
# Peer's latest slot is less than queue's input slot.
return SyncRequest.empty(sq.kind, T)
if sq.inpSlot > sq.finalSlot:
# Queue's input slot is bigger than queue's final slot.
return SyncRequest.empty(sq.kind, T)
let lastSlot = min(maxslot, sq.finalSlot)
let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item)
sq.advanceInput(count)
sq.makePending(sr)
sr
of SyncQueueKind.Backward:
if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
return SyncRequest.empty(sq.kind, T)
if sq.inpSlot < sq.finalSlot:
return SyncRequest.empty(sq.kind, T)
let (slot, count) =
block:
let baseSlot = sq.inpSlot + 1'u64
if baseSlot - sq.finalSlot < sq.chunkSize:
let count = uint64(baseSlot - sq.finalSlot)
(baseSlot - count, count)
else:
(baseSlot - sq.chunkSize, sq.chunkSize)
if (maxSlot + 1'u64) < slot + count:
# Peer's latest slot is less than queue's input slot.
return SyncRequest.empty(sq.kind, T)
var sr = SyncRequest.init(sq.kind, slot, count, item)
sq.advanceInput(count)
sq.makePending(sr)
sr
proc debtLen*[T](sq: SyncQueue[T]): uint64 =
sq.debtsCount
proc pendingLen*[T](sq: SyncQueue[T]): uint64 =
case sq.kind
of SyncQueueKind.Forward:
# When moving forward `outSlot` will be <= of `inpSlot`.
sq.inpSlot - sq.outSlot
of SyncQueueKind.Backward:
# When moving backward `outSlot` will be >= of `inpSlot`
sq.outSlot - sq.inpSlot
proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} =
## Returns number of slots left in queue ``sq``.
case sq.kind
of SyncQueueKind.Forward:
sq.finalSlot + 1'u64 - sq.outSlot
of SyncQueueKind.Backward:
sq.outSlot + 1'u64 - sq.finalSlot
proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} =
## Returns total number of slots in queue ``sq``.
case sq.kind
of SyncQueueKind.Forward:
sq.finalSlot + 1'u64 - sq.startSlot
of SyncQueueKind.Backward:
sq.startSlot + 1'u64 - sq.finalSlot
proc progress*[T](sq: SyncQueue[T]): uint64 =
## Returns queue's ``sq`` progress string.
let curSlot =
case sq.kind
of SyncQueueKind.Forward:
sq.outSlot - sq.startSlot
of SyncQueueKind.Backward:
sq.startSlot - sq.outSlot
(curSlot * 100'u64) div sq.total()

View File

@ -14,14 +14,14 @@ digraph architecture{
Eth2RPC -> SyncProtocol [dir=both]
SyncProtocol -> SyncManager [dir=both, label="beaconBlocksByRange() (mixin)"]
GossipSub -> Eth2Processor [label="node.topicBeaconBlocks: blockValidator->validateBeaconBlock (no transition or signature check yet)\nthen enqueued in blocksQueue"];
GossipSub -> Eth2Processor [label="node.topicBeaconBlocks: blockValidator->validateBeaconBlock (no transition or signature check yet)\nthen enqueued in blockQueue"];
GossipSub -> Eth2Processor [dir=back, label="node.topicBeaconBlocks: blockValidator()->ValidationResult.Accept->libp2p/gossipsub.nim\nvalidate() in rpcHandler()"];
Eth2Processor -> Clearance [label="storeBlock(): enqueue in clearance/quarantine and callback to fork choice"];
SyncProtocol -> RequestManager [dir=both, label="fetchAncestorBlocksFromNetwork()"];
SyncManager -> SharedBlockQueue [dir=both, label="Eth2Processor.blocksQueue\n== SyncManager.outQueue (shared state!)"];
Eth2Processor -> SharedBlockQueue [dir=both, label="Eth2Processor.blocksQueue\n== RequestManager.outQueue (shared state!)"];
SyncManager -> SharedBlockQueue [dir=both, label="Eth2Processor.blockQueue\n== SyncManager.outQueue (shared state!)"];
Eth2Processor -> SharedBlockQueue [dir=both, label="Eth2Processor.blockQueue\n== RequestManager.outQueue (shared state!)"];
SharedBlockQueue -> RequestManager [dir=both, label="SyncManager.outQueue\n== RequestManager.outQueue (shared state!)"];
LocalValidatorDuties -> Clearance

View File

@ -94,7 +94,7 @@ How the various modules interact with block is described in a diagram:
![./block_flow.png](./block_flow.png)
It is important to note that 3 data structures are sharing the same `AsyncQueue[BlockEntry]`:
- Eth2Processor.blocksQueue
- Eth2Processor.blockQueue
- SyncManager.outQueue
- RequestManager.outQueue

View File

@ -20,6 +20,9 @@ proc updateScore(peer: SomeTPeer, score: int) =
proc getFirstSlotAtFinalizedEpoch(): Slot =
Slot(0)
proc getSafeSlot(): Slot =
Slot(1024)
proc newBlockProcessor(): ref BlockProcessor =
# Minimal block processor for test - the real block processor has an unbounded
# queue but the tests here
@ -31,369 +34,617 @@ suite "SyncManager test suite":
proc createChain(start, finish: Slot): seq[ForkedSignedBeaconBlock] =
doAssert(start <= finish)
let count = int(finish - start + 1'u64)
result = newSeq[ForkedSignedBeaconBlock](count)
var res = newSeq[ForkedSignedBeaconBlock](count)
var curslot = start
for item in result.mitems():
for item in res.mitems():
item.phase0Data.message.slot = curslot
curslot = curslot + 1'u64
res
test "[SyncQueue] Start and finish slots equal":
proc getSlice(chain: openarray[ForkedSignedBeaconBlock], startSlot: Slot,
request: SyncRequest[SomeTPeer]): seq[ForkedSignedBeaconBlock] =
let
startIndex = int(request.slot - startSlot)
finishIndex = int(request.slot - startSlot) + int(request.count) - 1
@chain[startIndex..finishIndex]
template startAndFinishSlotsEqual(kind: SyncQueueKind) =
let p1 = SomeTPeer()
let aq = newBlockProcessor()
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64,
var queue = SyncQueue.init(SomeTPeer, kind,
Slot(0), Slot(0), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
check len(queue) == 1
check:
len(queue) == 1
pendingLen(queue) == 0
debtLen(queue) == 0
var r11 = queue.pop(Slot(0), p1)
check len(queue) == 0
check:
len(queue) == 1
pendingLen(queue) == 1
debtLen(queue) == 0
queue.push(r11)
check len(queue) == 1
check:
pendingLen(queue) == 1
len(queue) == 1
debtLen(queue) == 1
var r11e = queue.pop(Slot(0), p1)
check:
len(queue) == 0
len(queue) == 1
pendingLen(queue) == 1
debtLen(queue) == 0
r11e == r11
r11.item == p1
r11e.item == r11.item
r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64
test "[SyncQueue] Two full requests success/fail":
template passThroughLimitsTest(kind: SyncQueueKind) =
let
p1 = SomeTPeer()
p2 = SomeTPeer()
let Checks =
case kind
of SyncQueueKind.Forward:
@[
# Tests with zero start.
(Slot(0), Slot(0), 1'u64, (Slot(0), 1'u64),
1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64),
(Slot(0), Slot(0), 16'u64, (Slot(0), 1'u64),
1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64),
(Slot(0), Slot(1), 2'u64, (Slot(0), 2'u64),
2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64),
(Slot(0), Slot(1), 16'u64, (Slot(0), 2'u64),
2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64),
(Slot(0), Slot(15), 16'u64, (Slot(0), 16'u64),
16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64),
(Slot(0), Slot(15), 32'u64, (Slot(0), 16'u64),
16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64),
# Tests with non-zero start.
(Slot(1021), Slot(1021), 1'u64, (Slot(1021), 1'u64),
1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64),
(Slot(1021), Slot(1021), 16'u64, (Slot(1021), 1'u64),
1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64),
(Slot(1021), Slot(1022), 2'u64, (Slot(1021), 2'u64),
2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64),
(Slot(1021), Slot(1022), 16'u64, (Slot(1021), 2'u64),
2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64),
(Slot(1021), Slot(1036), 16'u64, (Slot(1021), 16'u64),
16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64),
(Slot(1021), Slot(1036), 32'u64, (Slot(1021), 16'u64),
16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64),
]
of SyncQueueKind.Backward:
@[
# Tests with zero finish.
(Slot(0), Slot(0), 1'u64, (Slot(0), 1'u64),
1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64),
(Slot(0), Slot(0), 16'u64, (Slot(0), 1'u64),
1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64),
(Slot(1), Slot(0), 2'u64, (Slot(0), 2'u64),
2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64),
(Slot(1), Slot(0), 16'u64, (Slot(0), 2'u64),
2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64),
(Slot(15), Slot(0), 16'u64, (Slot(0), 16'u64),
16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64),
(Slot(15), Slot(0), 32'u64, (Slot(0), 16'u64),
16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64),
# Tests with non-zero finish.
(Slot(1021), Slot(1021), 1'u64, (Slot(1021), 1'u64),
1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64),
(Slot(1021), Slot(1021), 16'u64, (Slot(1021), 1'u64),
1'u64, 0'u64, 0'u64, 1'u64, 1'u64, 0'u64),
(Slot(1022), Slot(1021), 2'u64, (Slot(1021), 2'u64),
2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64),
(Slot(1022), Slot(1021), 16'u64, (Slot(1021), 2'u64),
2'u64, 0'u64, 0'u64, 2'u64, 2'u64, 0'u64),
(Slot(1036), Slot(1021), 16'u64, (Slot(1021), 16'u64),
16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64),
(Slot(1036), Slot(1021), 32'u64, (Slot(1021), 16'u64),
16'u64, 0'u64, 0'u64, 16'u64, 16'u64, 0'u64),
]
for item in Checks:
let aq = newBlockProcessor()
var queue = SyncQueue.init(SomeTPeer, kind,
item[0], item[1], item[2],
getFirstSlotAtFinalizedEpoch, aq)
check:
len(queue) == item[4]
pendingLen(queue) == item[5]
debtLen(queue) == item[6]
var req1 = queue.pop(max(item[0], item[1]), p1)
check:
len(queue) == item[7]
pendingLen(queue) == item[8]
debtLen(queue) == item[9]
var req2 = queue.pop(max(item[0], item[1]), p2)
check:
req1.isEmpty() == false
req1.slot == item[3][0]
req1.count == item[3][1]
req1.step == 1'u64
req2.isEmpty() == true
template twoFullRequests(kkind: SyncQueueKind) =
let aq = newBlockProcessor()
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
var queue =
case kkind
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(1), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
Slot(1), Slot(0), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
check len(queue) == 2
check:
len(queue) == 2
pendingLen(queue) == 0
debtLen(queue) == 0
var r21 = queue.pop(Slot(1), p1)
check len(queue) == 1
check:
len(queue) == 2
pendingLen(queue) == 1
debtLen(queue) == 0
var r22 = queue.pop(Slot(1), p2)
check len(queue) == 0
check:
len(queue) == 2
pendingLen(queue) == 2
debtLen(queue) == 0
queue.push(r22)
check len(queue) == 1
check:
len(queue) == 2
pendingLen(queue) == 2
debtLen(queue) == 1
queue.push(r21)
check len(queue) == 2
check:
len(queue) == 2
pendingLen(queue) == 2
debtLen(queue) == 2
var r21e = queue.pop(Slot(1), p1)
check len(queue) == 1
check:
len(queue) == 2
pendingLen(queue) == 2
debtLen(queue) == 1
var r22e = queue.pop(Slot(1), p2)
check:
len(queue) == 0
len(queue) == 2
pendingLen(queue) == 2
debtLen(queue) == 0
r21 == r21e
r22 == r22e
r21.item == p1
r22.item == p2
r21.item == r21e.item
r22.item == r22e.item
r21.slot == Slot(0) and r21.count == 1'u64 and r21.step == 1'u64
r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64
case kkind
of SyncQueueKind.Forward:
check:
r21.slot == Slot(0) and r21.count == 1'u64 and r21.step == 1'u64
r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64
of SyncQueueKind.Backward:
check:
r21.slot == Slot(1) and r21.count == 1'u64 and r21.step == 1'u64
r22.slot == Slot(0) and r22.count == 1'u64 and r22.step == 1'u64
template smokeTest(kkind: SyncQueueKind, start, finish: Slot,
chunkSize: uint64) =
let
aq = newBlockProcessor()
var counter =
case kkind
of SyncQueueKind.Forward:
int(start)
of SyncQueueKind.Backward:
int(finish)
proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
sblock.done()
else:
sblock.fail(BlockError.Invalid)
dec(counter)
proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
inc(counter)
sblock.done()
else:
sblock.fail(BlockError.Invalid)
var
queue =
case kkind
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
start, finish, chunkSize,
getFirstSlotAtFinalizedEpoch, aq)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finish, start, chunkSize,
getFirstSlotAtFinalizedEpoch, aq)
chain = createChain(start, finish)
validatorFut =
case kkind
of SyncQueueKind.Forward:
forwardValidator(aq[].blockQueue)
of SyncQueueKind.Backward:
backwardValidator(aq[].blockQueue)
test "[SyncQueue] Full and incomplete success/fail start from zero":
let aq = newBlockProcessor()
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
check len(queue) == 5
var r31 = queue.pop(Slot(4), p1)
check len(queue) == 3
var r32 = queue.pop(Slot(4), p2)
check len(queue) == 1
var r33 = queue.pop(Slot(4), p3)
check len(queue) == 0
queue.push(r33)
check len(queue) == 1
queue.push(r32)
check len(queue) == 3
queue.push(r31)
check len(queue) == 5
var r31e = queue.pop(Slot(4), p1)
check len(queue) == 3
var r32e = queue.pop(Slot(4), p2)
check len(queue) == 1
var r33e = queue.pop(Slot(4), p3)
check:
len(queue) == 0
r31 == r31e
r32 == r32e
r33 == r33e
r31.item == r31e.item
r32.item == r32e.item
r33.item == r33e.item
r31.item == p1
r32.item == p2
r33.item == p3
r31.slot == Slot(0) and r31.count == 2'u64 and r31.step == 1'u64
r32.slot == Slot(2) and r32.count == 2'u64 and r32.step == 1'u64
r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64
test "[SyncQueue] Full and incomplete success/fail start from non-zero":
let aq = newBlockProcessor()
var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
check len(queue) == 5
var r41 = queue.pop(Slot(5), p1)
check len(queue) == 2
var r42 = queue.pop(Slot(5), p2)
check len(queue) == 0
queue.push(r42)
check len(queue) == 2
queue.push(r41)
check len(queue) == 5
var r41e = queue.pop(Slot(5), p1)
check len(queue) == 2
var r42e = queue.pop(Slot(5), p2)
check:
len(queue) == 0
r41 == r41e
r42 == r42e
r41.item == r41e.item
r42.item == r42e.item
r41.item == p1
r42.item == p2
r41.slot == Slot(1) and r41.count == 3'u64 and r41.step == 1'u64
r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64
proc runSmokeTest() {.async.} =
while true:
var request = queue.pop(finish, p1)
if request.isEmpty():
break
await queue.push(request, getSlice(chain, start, request))
await validatorFut.cancelAndWait()
test "[SyncQueue] Smart and stupid success/fail":
let aq = newBlockProcessor()
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
check len(queue) == 5
var r51 = queue.pop(Slot(3), p1)
check len(queue) == 1
var r52 = queue.pop(Slot(4), p2)
check len(queue) == 0
queue.push(r52)
check len(queue) == 1
queue.push(r51)
check len(queue) == 5
var r51e = queue.pop(Slot(3), p1)
check len(queue) == 1
var r52e = queue.pop(Slot(4), p2)
check:
len(queue) == 0
r51 == r51e
r52 == r52e
r51.item == r51e.item
r52.item == r52e.item
r51.item == p1
r52.item == p2
r51.slot == Slot(0) and r51.count == 4'u64 and r51.step == 1'u64
r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64
waitFor runSmokeTest()
case kkind
of SyncQueueKind.Forward:
check (counter - 1) == int(finish)
of SyncQueueKind.Backward:
check (counter + 1) == int(start)
test "[SyncQueue] One smart and one stupid + debt split + empty":
let aq = newBlockProcessor()
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
let p4 = SomeTPeer()
check len(queue) == 5
var r61 = queue.pop(Slot(4), p1)
check len(queue) == 0
queue.push(r61)
var r61e = queue.pop(Slot(2), p1)
check len(queue) == 2
var r62e = queue.pop(Slot(2), p2)
check len(queue) == 2
check r62e.isEmpty()
var r63e = queue.pop(Slot(3), p3)
check len(queue) == 1
var r64e = queue.pop(Slot(4), p4)
check:
len(queue) == 0
r61.slot == Slot(0) and r61.count == 5'u64 and r61.step == 1'u64
r61e.slot == Slot(0) and r61e.count == 3'u64 and r61e.step == 1'u64
r62e.isEmpty()
r63e.slot == Slot(3) and r63e.count == 1'u64 and r63e.step == 1'u64
r64e.slot == Slot(4) and r64e.count == 1'u64 and r64e.step == 1'u64
r61.item == p1
r61e.item == p1
isNil(r62e.item) == true
r63e.item == p3
r64e.item == p4
template unorderedAsyncTest(kkind: SyncQueueKind, startSlot: Slot) =
let
aq = newBlockProcessor()
chunkSize = 3'u64
numberOfChunks = 3'u64
finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64)
queueSize = 1
test "[SyncQueue] Async unordered push start from zero":
proc test(): Future[bool] {.async.} =
var counter = 0
var counter =
case kkind
of SyncQueueKind.Forward:
int(startSlot)
of SyncQueueKind.Backward:
int(finishSlot)
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
inc(counter)
sblock.done()
else:
sblock.fail(BlockError.Invalid)
proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
sblock.done()
else:
sblock.fail(BlockError.Invalid)
dec(counter)
let aq = newBlockProcessor()
var chain = createChain(Slot(0), Slot(2))
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
getFirstSlotAtFinalizedEpoch, aq, 1)
proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
inc(counter)
sblock.done()
else:
sblock.fail(BlockError.Invalid)
var validatorFut = simpleValidator(aq[].blockQueue)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
var r11 = queue.pop(Slot(2), p1)
var r12 = queue.pop(Slot(2), p2)
var r13 = queue.pop(Slot(2), p3)
var f13 = queue.push(r13, @[chain[2]])
#
var
chain = createChain(startSlot, finishSlot)
queue =
case kkind
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
startSlot, finishSlot, chunkSize,
getFirstSlotAtFinalizedEpoch, aq,
queueSize)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finishSlot, startSlot, chunkSize,
getFirstSlotAtFinalizedEpoch, aq,
queueSize)
validatorFut =
case kkind
of SyncQueueKind.Forward:
forwardValidator(aq[].blockQueue)
of SyncQueueKind.Backward:
backwardValidator(aq[].blockQueue)
let
p1 = SomeTPeer()
p2 = SomeTPeer()
p3 = SomeTPeer()
proc runTest(): Future[bool] {.async.} =
var r11 = queue.pop(finishSlot, p1)
var r12 = queue.pop(finishSlot, p2)
var r13 = queue.pop(finishSlot, p3)
var f13 = queue.push(r13, chain.getSlice(startSlot, r13))
await sleepAsync(100.milliseconds)
# doAssert(f12.finished == false)
doAssert(f13.finished == false)
doAssert(counter == 0)
var f11 = queue.push(r11, @[chain[0]])
check:
f13.finished == false
case kkind
of SyncQueueKind.Forward: counter == int(startSlot)
of SyncQueueKind.Backward: counter == int(finishSlot)
var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
await sleepAsync(100.milliseconds)
doAssert(counter == 1)
doAssert(f11.finished == true and f11.failed == false)
var f12 = queue.push(r12, @[chain[1]])
check:
case kkind
of SyncQueueKind.Forward: counter == int(startSlot + chunkSize)
of SyncQueueKind.Backward: counter == int(finishSlot - chunkSize)
f11.finished == true and f11.failed == false
f13.finished == false
var f12 = queue.push(r12, chain.getSlice(startSlot, r12))
await allFutures(f11, f12, f13)
check:
f12.finished == true and f12.failed == false
f13.finished == true and f13.failed == false
check:
case kkind
of SyncQueueKind.Forward: counter == int(finishSlot) + 1
of SyncQueueKind.Backward: counter == int(startSlot) - 1
r11.item == p1
r12.item == p2
r13.item == p3
await validatorFut.cancelAndWait()
return true
check waitFor(runTest()) == true
for k in {SyncQueueKind.Forward, SyncQueueKind.Backward}:
let prefix = "[SyncQueue#" & $k & "] "
test prefix & "Start and finish slots equal":
startAndFinishSlotsEqual(k)
test prefix & "Pass through established limits test":
passThroughLimitsTest(k)
test prefix & "Two full requests success/fail":
twoFullRequests(k)
test prefix & "Smoke test":
const SmokeTests = [
(Slot(0), Slot(547), 61'u64),
(Slot(193), Slot(389), 79'u64),
(Slot(1181), Slot(1399), 41'u64)
]
for item in SmokeTests:
smokeTest(k, item[0], item[1], item[2])
test prefix & "Async unordered push test":
const UnorderedTests = [
Slot(0), Slot(100)
]
for item in UnorderedTests:
unorderedAsyncTest(k, item)
test "[SyncQueue#Forward] Async unordered push with rewind test":
let
aq = newBlockProcessor()
startSlot = Slot(0)
chunkSize = SLOTS_PER_EPOCH
numberOfChunks = 4'u64
finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64)
queueSize = 1
var counter = int(startSlot)
proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
withBlck(sblock.blck):
if blck.message.proposer_index == 0xDEADBEAF'u64:
sblock.fail(BlockError.MissingParent)
else:
inc(counter)
sblock.done()
else:
sblock.fail(BlockError.Invalid)
var
chain = createChain(startSlot, finishSlot)
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
startSlot, finishSlot, chunkSize,
getFirstSlotAtFinalizedEpoch, aq,
queueSize)
validatorFut = forwardValidator(aq[].blockQueue)
let
p1 = SomeTPeer()
p2 = SomeTPeer()
p3 = SomeTPeer()
p4 = SomeTPeer()
p5 = SomeTPeer()
p6 = SomeTPeer()
p7 = SomeTPeer()
p8 = SomeTPeer()
proc runTest(): Future[bool] {.async.} =
var r11 = queue.pop(finishSlot, p1)
var r12 = queue.pop(finishSlot, p2)
var r13 = queue.pop(finishSlot, p3)
var r14 = queue.pop(finishSlot, p4)
var f14 = queue.push(r14, chain.getSlice(startSlot, r14))
await sleepAsync(100.milliseconds)
doAssert(f12.finished == true and f12.failed == false)
doAssert(f13.finished == true and f13.failed == false)
doAssert(counter == 3)
doAssert(r11.item == p1)
doAssert(r12.item == p2)
doAssert(r13.item == p3)
check:
f14.finished == false
counter == int(startSlot)
var f12 = queue.push(r12, chain.getSlice(startSlot, r12))
await sleepAsync(100.milliseconds)
check:
counter == int(startSlot)
f12.finished == false
f14.finished == false
var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
await allFutures(f11, f12)
check:
counter == int(startSlot + chunkSize + chunkSize)
f11.finished == true and f11.failed == false
f12.finished == true and f12.failed == false
f14.finished == false
var missingSlice = chain.getSlice(startSlot, r13)
withBlck(missingSlice[0]):
blck.message.proposer_index = 0xDEADBEAF'u64
var f13 = queue.push(r13, missingSlice)
await allFutures(f13, f14)
check:
f11.finished == true and f11.failed == false
f12.finished == true and f12.failed == false
f13.finished == true and f13.failed == false
f14.finished == true and f14.failed == false
queue.inpSlot == Slot(SLOTS_PER_EPOCH)
queue.outSlot == Slot(SLOTS_PER_EPOCH)
queue.debtLen == 0
# Recovery process
counter = int(SLOTS_PER_EPOCH)
var r15 = queue.pop(finishSlot, p5)
var r16 = queue.pop(finishSlot, p6)
var r17 = queue.pop(finishSlot, p7)
var r18 = queue.pop(finishSlot, p8)
check r18.isEmpty() == true
var f17 = queue.push(r17, chain.getSlice(startSlot, r17))
await sleepAsync(100.milliseconds)
check f17.finished == false
var f16 = queue.push(r16, chain.getSlice(startSlot, r16))
await sleepAsync(100.milliseconds)
check f16.finished == false
var f15 = queue.push(r15, chain.getSlice(startSlot, r15))
await allFutures(f15, f16, f17)
check:
f15.finished == true and f15.failed == false
f16.finished == true and f16.failed == false
f17.finished == true and f17.failed == false
counter == int(finishSlot) + 1
await validatorFut.cancelAndWait()
result = true
return true
check waitFor(test())
check waitFor(runTest()) == true
test "[SyncQueue] Async unordered push with not full start from non-zero":
proc test(): Future[bool] {.async.} =
var counter = 5
test "[SyncQueue#Backward] Async unordered push with rewind test":
let
aq = newBlockProcessor()
startSlot = Slot(0)
chunkSize = SLOTS_PER_EPOCH
numberOfChunks = 4'u64
finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64)
queueSize = 1
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
inc(counter)
sblock.done()
else:
sblock.fail(BlockError.Invalid)
var
lastSafeSlot: Slot
counter = int(finishSlot)
let aq = newBlockProcessor()
var chain = createChain(Slot(5), Slot(11))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64,
getFirstSlotAtFinalizedEpoch, aq, 2)
proc getSafeSlot(): Slot =
lastSafeSlot
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
let p4 = SomeTPeer()
proc backwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
withBlck(sblock.blck):
if blck.message.proposer_index == 0xDEADBEAF'u64:
sblock.fail(BlockError.MissingParent)
else:
lastSafeSlot = sblock.blck.slot
dec(counter)
sblock.done()
else:
sblock.fail(BlockError.Invalid)
var validatorFut = simpleValidator(aq[].blockQueue)
var
chain = createChain(startSlot, finishSlot)
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finishSlot, startSlot, chunkSize,
getSafeSlot, aq, queueSize)
validatorFut = backwardValidator(aq[].blockQueue)
var r21 = queue.pop(Slot(11), p1)
var r22 = queue.pop(Slot(11), p2)
var r23 = queue.pop(Slot(11), p3)
var r24 = queue.pop(Slot(11), p4)
let
p1 = SomeTPeer()
p2 = SomeTPeer()
p3 = SomeTPeer()
p4 = SomeTPeer()
p5 = SomeTPeer()
p6 = SomeTPeer()
p7 = SomeTPeer()
var f24 = queue.push(r24, @[chain[6]])
var f22 = queue.push(r22, @[chain[2], chain[3]])
doAssert(f24.finished == false)
doAssert(f22.finished == true and f22.failed == false)
doAssert(counter == 5)
var f21 = queue.push(r21, @[chain[0], chain[1]])
proc runTest(): Future[bool] {.async.} =
var r11 = queue.pop(finishSlot, p1)
var r12 = queue.pop(finishSlot, p2)
var r13 = queue.pop(finishSlot, p3)
var r14 = queue.pop(finishSlot, p4)
var f14 = queue.push(r14, chain.getSlice(startSlot, r14))
await sleepAsync(100.milliseconds)
doAssert(f21.finished == true and f21.failed == false)
doAssert(f24.finished == true and f24.failed == false)
doAssert(counter == 9)
var f23 = queue.push(r23, @[chain[4], chain[5]])
check:
f14.finished == false
counter == int(finishSlot)
var f12 = queue.push(r12, chain.getSlice(startSlot, r12))
await sleepAsync(100.milliseconds)
doAssert(f23.finished == true and f23.failed == false)
doAssert(counter == 12)
doAssert(counter == 12)
doAssert(r21.item == p1)
doAssert(r22.item == p2)
doAssert(r23.item == p3)
doAssert(r24.item == p4)
check:
counter == int(finishSlot)
f12.finished == false
f14.finished == false
var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
await allFutures(f11, f12)
check:
counter == int(finishSlot - chunkSize - chunkSize)
f11.finished == true and f11.failed == false
f12.finished == true and f12.failed == false
f14.finished == false
var missingSlice = chain.getSlice(startSlot, r13)
withBlck(missingSlice[0]):
blck.message.proposer_index = 0xDEADBEAF'u64
var f13 = queue.push(r13, missingSlice)
await allFutures(f13, f14)
check:
f11.finished == true and f11.failed == false
f12.finished == true and f12.failed == false
f13.finished == true and f13.failed == false
f14.finished == true and f14.failed == false
# Recovery process
counter = int(SLOTS_PER_EPOCH) + 1
var r15 = queue.pop(finishSlot, p5)
var r16 = queue.pop(finishSlot, p6)
var r17 = queue.pop(finishSlot, p7)
check r17.isEmpty() == true
var f16 = queue.push(r16, chain.getSlice(startSlot, r16))
await sleepAsync(100.milliseconds)
check f16.finished == false
var f15 = queue.push(r15, chain.getSlice(startSlot, r15))
await allFutures(f15, f16)
check:
f15.finished == true and f15.failed == false
f16.finished == true and f16.failed == false
counter == int(startSlot) - 1
await validatorFut.cancelAndWait()
result = true
return true
check waitFor(test())
test "[SyncQueue] Async pending and resetWait() test":
proc test(): Future[bool] {.async.} =
var counter = 5
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blck.slot == Slot(counter):
inc(counter)
sblock.done()
else:
sblock.fail(BlockError.Invalid)
let aq = newBlockProcessor()
var chain = createChain(Slot(5), Slot(18))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64,
getFirstSlotAtFinalizedEpoch, aq, 2)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
let p4 = SomeTPeer()
let p5 = SomeTPeer()
let p6 = SomeTPeer()
let p7 = SomeTPeer()
var validatorFut = simpleValidator(aq[].blockQueue)
var r21 = queue.pop(Slot(20), p1)
var r22 = queue.pop(Slot(20), p2)
var r23 = queue.pop(Slot(20), p3)
var r24 = queue.pop(Slot(20), p4)
var r25 = queue.pop(Slot(20), p5)
var r26 = queue.pop(Slot(20), p6)
var r27 = queue.pop(Slot(20), p7)
var f21 = queue.push(r21, @[chain[0], chain[1]])
# This should be silently ignored, because r21 is already processed.
var e21 = queue.push(r21, @[chain[0], chain[1]])
queue.push(r22)
queue.push(r23)
var f26 = queue.push(r26, @[chain[10], chain[11]])
var f27 = queue.push(r27, @[chain[12], chain[13]])
await sleepAsync(100.milliseconds)
doAssert(f21.finished == true and f21.failed == false)
doAssert(e21.finished == true and e21.failed == false)
doAssert(f26.finished == false)
doAssert(f27.finished == false)
await queue.resetWait(none[Slot]())
await sleepAsync(100.milliseconds)
doAssert(f26.finished == true and f26.failed == false)
doAssert(f27.finished == true and f27.failed == false)
doAssert(queue.inpSlot == Slot(7) and queue.outSlot == Slot(7))
doAssert(counter == 7)
doAssert(len(queue) == 12)
# This should be silently ignored, because r21 is already processed.
var o21 = queue.push(r21, @[chain[0], chain[1]])
var o22 = queue.push(r22, @[chain[2], chain[3]])
queue.push(r23)
queue.push(r24)
var o25 = queue.push(r25, @[chain[8], chain[9]])
var o26 = queue.push(r26, @[chain[10], chain[11]])
var o27 = queue.push(r27, @[chain[12], chain[13]])
await sleepAsync(100.milliseconds)
doAssert(o21.finished == true and o21.failed == false)
doAssert(o22.finished == true and o22.failed == false)
doAssert(o25.finished == true and o25.failed == false)
doAssert(o26.finished == true and o26.failed == false)
doAssert(o27.finished == true and o27.failed == false)
doAssert(len(queue) == 12)
await validatorFut.cancelAndWait()
result = true
check waitFor(test())
check waitFor(runTest()) == true
test "[SyncQueue] hasEndGap() test":
let chain1 = createChain(Slot(1), Slot(1))
@ -519,10 +770,10 @@ suite "SyncManager test suite":
checkResponse(r22, @[chain[4]]) == false
checkResponse(r22, @[chain[3], chain[1]]) == false
test "[SyncQueue] getRewindPoint() test":
test "[SyncQueue#Forward] getRewindPoint() test":
let aq = newBlockProcessor()
block:
var queue = SyncQueue.init(SomeTPeer,
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getFirstSlotAtFinalizedEpoch, aq, 2)
let finalizedSlot = compute_start_slot_at_epoch(Epoch(0'u64))
@ -533,7 +784,7 @@ suite "SyncManager test suite":
check queue.getRewindPoint(Slot(i), finalizedSlot) == finalizedSlot
block:
var queue = SyncQueue.init(SomeTPeer,
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getFirstSlotAtFinalizedEpoch, aq, 2)
let finalizedSlot = compute_start_slot_at_epoch(Epoch(1'u64))
@ -544,7 +795,7 @@ suite "SyncManager test suite":
check queue.getRewindPoint(Slot(i), finalizedSlot) == finalizedSlot
block:
var queue = SyncQueue.init(SomeTPeer,
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getFirstSlotAtFinalizedEpoch, aq, 2)
let finalizedSlot = compute_start_slot_at_epoch(Epoch(0'u64))
@ -561,7 +812,7 @@ suite "SyncManager test suite":
counter = counter shl 1
block:
var queue = SyncQueue.init(SomeTPeer,
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getFirstSlotAtFinalizedEpoch, aq, 2)
let finalizedSlot = compute_start_slot_at_epoch(Epoch(1'u64))
@ -575,3 +826,13 @@ suite "SyncManager test suite":
let rewindSlot = compute_start_slot_at_epoch(rewindEpoch)
check queue.getRewindPoint(failSlot, finalizedSlot) == rewindSlot
counter = counter shl 1
test "[SyncQueue#Backward] getRewindPoint() test":
let aq = newBlockProcessor()
block:
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
Slot(1024), Slot(0),
1'u64, getSafeSlot, aq, 2)
let safeSlot = getSafeSlot()
for i in countdown(1023, 0):
check queue.getRewindPoint(Slot(i), safeSlot) == safeSlot