SyncManager cleanups for backfill support (#3189)

* SyncManager cleanups for backfill support

Cleanups, fixes and simplifications, in anticipation of backfill support
for the `SyncManager`:

* reformat sync progress indicator to show time left and % done more
prominently:
  * old: `sync="sPssPsssss:2:2.4229:00h57m (2706898)"`
  * new: `sync="14d12h31m (0.52%) 1.1378slots/s (wQQQQQDDQQ:1287520)"`
* reset average speed when going out of sync
* pass all block errors to sync manager, including duplicate/unviable
* penalize peers for reporting a head block that is outside of our
expected wall clock time (they're likely on a different network or
trying to disrupt sync)
* remove `SyncFailureKind` (unused)
* remove `inRange` (unused)
* add `Q` for sync queue requests that are in the `SyncQueue` but not
yet in the `BlockProcessor` queue
* update last slot in `SyncQueue` after getting peer status
* fix race condition between `wakeupWaiters` and `resetWait`, where
workers would not be correctly reset if block verification returned an
already-completed future without going through the event loop
* log `SyncManager` direction

* Fix ordering issue.
Requests whose size is not equal to `chunkSize` could be processed in the wrong order, which could lead to sync-process freezes.
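For illustration, a minimal sketch (simplified names, not the actual `SyncQueue` code) of the stricter ordering check this fix introduces for forward sync: a result is accepted only when it starts exactly at the queue's next output slot, so short chunks can no longer overtake their predecessors:

type Slot = uint64

proc notInRangeForward(outSlot, requestSlot: Slot): bool =
  # A request that does not start at the exact slot the queue expects
  # next must wait, regardless of its size.
  requestSlot != outSlot

doAssert notInRangeForward(100, 101)      # slot 100 not processed yet: wait
doAssert not notInRangeForward(100, 100)  # next expected chunk: process now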

Co-authored-by: cheatfate <eugene.kabanov@status.im>
Jacek Sieka 2021-12-16 15:57:16 +01:00
parent 5eabeef75d
commit 118840d241
8 changed files with 328 additions and 293 deletions


@ -8,6 +8,7 @@
{.push raises: [Defect].}
import
std/math,
chronos, chronicles,
./spec/helpers
@ -140,6 +141,15 @@ func shortLog*(d: Duration): string =
func toFloatSeconds*(d: Duration): float =
float(milliseconds(d)) / 1000.0
func fromFloatSeconds*(T: type Duration, f: float): Duration =
case classify(f)
of fcNormal:
if f >= float(int64.high() div 1_000_000_000): InfiniteDuration
elif f <= 0: ZeroDuration
else: nanoseconds(int64(f * 1_000_000_000))
of fcSubnormal, fcZero, fcNegZero, fcNan, fcNegInf: ZeroDuration
of fcInf: InfiniteDuration
func `$`*(v: BeaconTime): string = $(Duration v)
func shortLog*(v: BeaconTime): string = $(Duration v)
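A quick sanity check of the clamping behaviour of `fromFloatSeconds` (a hypothetical usage sketch, not part of the diff; assumes chronos and std/math are imported, as in the file above):

doAssert Duration.fromFloatSeconds(0.5) == milliseconds(500)
doAssert Duration.fromFloatSeconds(-1.0) == ZeroDuration    # negatives clamp
doAssert Duration.fromFloatSeconds(Inf) == InfiniteDuration # fcInf branch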


@ -94,18 +94,6 @@ proc new*(T: type BlockProcessor,
# Sync callbacks
# ------------------------------------------------------------------------------
proc done*(entry: BlockEntry) =
## Send signal to [Sync/Request]Manager that the block ``entry`` has passed
## verification successfully.
if entry.resfut != nil:
entry.resfut.complete(Result[void, BlockError].ok())
proc fail*(entry: BlockEntry, error: BlockError) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
## verification with specific ``error``.
if entry.resfut != nil:
entry.resfut.complete(Result[void, BlockError].err(error))
proc hasBlocks*(self: BlockProcessor): bool =
self.blockQueue.len() > 0
@ -125,8 +113,15 @@ proc addBlock*(
# - SyncManager (during sync)
# - RequestManager (missing ancestor blocks)
# addLast doesn't fail with unbounded queues, but we'll add asyncSpawn as a
# sanity check
withBlck(blck):
if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
# let backfill blocks skip the queue - these are always "fast" to process
# because there are no state rewinds to deal with
let res = self.consensusManager.dag.addBackfillBlock(blck)
if resFut != nil:
resFut.complete(res)
return
try:
self.blockQueue.addLastNoWait(BlockEntry(
blck: blck,
@ -240,12 +235,10 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
res = withBlck(entry.blck):
self.storeBlock(blck, wallSlot, entry.queueTick, entry.validationDur)
if res.isOk() or res.error() == BlockError.Duplicate:
# Duplicate blocks are ok from a sync point of view, so we mark
# them as successful
entry.done()
else:
entry.fail(res.error())
if entry.resfut != nil:
entry.resfut.complete(
if res.isOk(): Result[void, BlockError].ok()
else: Result[void, BlockError].err(res.error()))
proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
while true:


@ -33,7 +33,7 @@ import
./validators/[
validator_duties, validator_pool,
slashing_protection, keystore_management],
./sync/[sync_manager, sync_protocol, request_manager],
./sync/[sync_protocol],
./rpc/[rest_api, rpc_api],
./spec/datatypes/[altair, merge, phase0],
./spec/eth2_apis/rpc_beacon_client,
@ -393,6 +393,9 @@ proc init*(T: type BeaconNode,
func getFirstSlotAtFinalizedEpoch(): Slot =
dag.finalizedHead.slot
func getBackfillSlot(): Slot =
dag.backfill.slot
let
slashingProtectionDB =
SlashingProtectionDB.init(
@ -406,13 +409,22 @@ proc init*(T: type BeaconNode,
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock):
Future[Result[void, BlockError]] =
# The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise
# that should probably be reimagined more holistically in the future.
let resfut = newFuture[Result[void, BlockError]]("blockVerifier")
blockProcessor[].addBlock(signedBlock, resfut)
resfut
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, dag, attestationPool, exitPool, validatorPool,
syncCommitteeMsgPool, quarantine, rng, getBeaconTime, taskpool)
syncManager = newSyncManager[Peer, PeerID](
network.peerPool, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, blockProcessor, chunkSize = 32)
network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, getBackfillSlot, blockVerifier)
var node = BeaconNode(
nickname: nickname,
@ -432,7 +444,7 @@ proc init*(T: type BeaconNode,
rpcServer: rpcServer,
restServer: restServer,
eventBus: eventBus,
requestManager: RequestManager.init(network, blockProcessor),
requestManager: RequestManager.init(network, blockVerifier),
syncManager: syncManager,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
processor: processor,
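The `blockVerifier` callback introduced above means callers simply invoke the verifier and await the returned future, which the `BlockProcessor` completes once the block has been (in)validated. A minimal sketch of the call-site pattern (hypothetical helper, assuming the surrounding imports):

proc verifyOne(verifier: BlockVerifier,
               blck: ForkedSignedBeaconBlock) {.async.} =
  # The future is completed later by the BlockProcessor queue loop.
  let res = await verifier(blck)
  if res.isErr():
    debug "Block rejected", error = res.error()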


@ -8,6 +8,8 @@
{.push raises: [Defect].}
const
PeerScoreHeadTooNew* = -100
## The peer reports a head newer than our wall clock slot
PeerScoreNoStatus* = -100
## Peer did not answer `status` request.
PeerScoreStaleStatus* = -50


@ -13,7 +13,6 @@ import
../spec/datatypes/[phase0, altair],
../spec/forks,
../networking/eth2_network,
../gossip_processing/block_processor,
"."/sync_protocol, "."/sync_manager
export sync_manager
@ -28,10 +27,14 @@ const
## Number of peers we use to resolve our request.
type
BlockVerifier* =
proc(signedBlock: ForkedSignedBeaconBlock):
Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].}
RequestManager* = object
network*: Eth2Node
inpQueue*: AsyncQueue[FetchRecord]
blockProcessor: ref BlockProcessor
blockVerifier: BlockVerifier
loopFuture: Future[void]
func shortLog*(x: seq[Eth2Digest]): string =
@ -41,11 +44,11 @@ func shortLog*(x: seq[FetchRecord]): string =
"[" & x.mapIt(shortLog(it.root)).join(", ") & "]"
proc init*(T: type RequestManager, network: Eth2Node,
blockProcessor: ref BlockProcessor): RequestManager =
blockVerifier: BlockVerifier): RequestManager =
RequestManager(
network: network,
inpQueue: newAsyncQueue[FetchRecord](),
blockProcessor: blockProcessor
blockVerifier: blockVerifier
)
proc checkResponse(roots: openArray[Eth2Digest],
@ -62,12 +65,6 @@ proc checkResponse(roots: openArray[Eth2Digest],
checks.del(res)
return true
proc validate(rman: RequestManager,
b: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] =
let resfut = newFuture[Result[void, BlockError]]("request.manager.validate")
rman.blockProcessor[].addBlock(b, resfut)
resfut
proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
items: seq[Eth2Digest]) {.async.} =
var peer: Peer
@ -88,7 +85,7 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
var res: Result[void, BlockError]
if len(ublocks) > 0:
for b in ublocks:
res = await rman.validate(b)
res = await rman.blockVerifier(b)
if res.isErr():
case res.error()
of BlockError.MissingParent:
@ -146,7 +143,7 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =
rootList.add(rman.inpQueue.popFirstNoWait().root)
dec(count)
let start = SyncMoment.now(Slot(0))
let start = SyncMoment.now(0)
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchAncestorBlocksFromNetwork(rootList)
@ -154,7 +151,7 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =
# We do not care about
await allFutures(workers)
let finish = SyncMoment.now(Slot(0) + uint64(len(rootList)))
let finish = SyncMoment.now(uint64(len(rootList)))
var succeed = 0
for worker in workers:


@ -7,19 +7,18 @@
{.push raises: [Defect].}
import std/[options, heapqueue, tables, strutils, sequtils, math, algorithm]
import std/[options, heapqueue, tables, strutils, sequtils, algorithm]
import stew/results, chronos, chronicles
import
../spec/datatypes/[base, phase0, altair, merge],
../spec/datatypes/[phase0, altair, merge],
../spec/eth2_apis/rpc_types,
../spec/[helpers, forks],
../networking/[peer_pool, eth2_network],
../gossip_processing/block_processor,
../consensus_object_pools/block_pools_types,
../beacon_clock,
./peer_scores, ./sync_queue
export base, phase0, altair, merge, chronos, chronicles, results,
block_pools_types, helpers, peer_scores, sync_queue
export phase0, altair, merge, chronos, chronicles, results,
helpers, peer_scores, sync_queue, forks
logScope:
topics = "syncman"
@ -35,16 +34,9 @@ const
## The time it takes for the peer's status information to expire.
type
SyncFailureKind* = enum
StatusInvalid,
StatusDownload,
StatusStale,
EmptyProblem,
BlockDownload,
BadResponse
SyncWorkerStatus* {.pure.} = enum
Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Processing
Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Queueing,
Processing
SyncWorker*[A, B] = object
future: Future[void]
@ -59,88 +51,83 @@ type
toleranceValue: uint64
getLocalHeadSlot: GetSlotCallback
getLocalWallSlot: GetSlotCallback
getFinalizedSlot: GetSlotCallback
getSafeSlot: GetSlotCallback
getFirstSlot: GetSlotCallback
getLastSlot: GetSlotCallback
workers: array[SyncWorkersCount, SyncWorker[A, B]]
notInSyncEvent: AsyncEvent
rangeAge: uint64
inRangeEvent*: AsyncEvent
notInRangeEvent*: AsyncEvent
chunkSize: uint64
queue: SyncQueue[A]
syncFut: Future[void]
blockProcessor: ref BlockProcessor
blockVerifier: BlockVerifier
inProgress*: bool
insSyncSpeed*: float
avgSyncSpeed*: float
timeLeft*: Duration
syncCount*: uint64
syncStatus*: string
direction: SyncQueueKind
SyncMoment* = object
stamp*: chronos.Moment
slot*: Slot
SyncFailure*[T] = object
kind*: SyncFailureKind
peer*: T
stamp*: chronos.Moment
slots*: uint64
SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
proc init*[T](t1: typedesc[SyncFailure], kind: SyncFailureKind,
peer: T): SyncFailure[T] =
SyncFailure[T](kind: kind, peer: peer, stamp: now(chronos.Moment))
proc now*(sm: typedesc[SyncMoment], slot: Slot): SyncMoment {.inline.} =
SyncMoment(stamp: now(chronos.Moment), slot: slot)
proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
SyncMoment(stamp: now(chronos.Moment), slots: slots)
proc speed*(start, finish: SyncMoment): float {.inline.} =
  ## Returns number of slots per second.
  let slots = finish.slot - start.slot
  let dur = finish.stamp - start.stamp
  let secs = float(chronos.seconds(1).nanoseconds)
  if isZero(dur):
    result = 0.0
  else:
    let v = float(slots) * (secs / float(dur.nanoseconds))
    # We round manually because stdlib.round is deprecated
    result = round(v * 10000) / 10000

proc speed*(start, finish: SyncMoment): float {.inline.} =
  ## Returns number of slots per second.
  if finish.slots <= start.slots or finish.stamp <= start.stamp:
    0.0 # replays for example
  else:
    let
      slots = float(finish.slots - start.slots)
      dur = toFloatSeconds(finish.stamp - start.stamp)
    slots / dur
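As a worked example of the new calculation: 14 slots of progress over 12 seconds yields 14 / 12 ≈ 1.1667 slots/s (a sketch, assuming chronos is imported):

let
  a = SyncMoment(stamp: Moment.now(), slots: 100)
  b = SyncMoment(stamp: a.stamp + chronos.seconds(12), slots: 114)
doAssert abs(speed(a, b) - 14.0 / 12.0) < 1e-9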
proc initQueue[A, B](man: SyncManager[A, B]) =
man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(),
man.getLastSlot(), man.chunkSize,
man.getSafeSlot, man.blockVerifier, 1)
proc newSyncManager*[A, B](pool: PeerPool[A, B],
direction: SyncQueueKind,
getLocalHeadSlotCb: GetSlotCallback,
getLocalWallSlotCb: GetSlotCallback,
getFinalizedSlotCb: GetSlotCallback,
blockProcessor: ref BlockProcessor,
getBackfillSlotCb: GetSlotCallback,
blockVerifier: BlockVerifier,
maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
sleepTime = (int(SLOTS_PER_EPOCH) *
int(SECONDS_PER_SLOT)).seconds,
chunkSize = uint64(SLOTS_PER_EPOCH),
toleranceValue = uint64(1),
rangeAge = uint64(SLOTS_PER_EPOCH * 4)
toleranceValue = uint64(1)
): SyncManager[A, B] =
let queue = SyncQueue.init(A, SyncQueueKind.Forward, getLocalHeadSlotCb(),
getLocalWallSlotCb(), chunkSize,
getFinalizedSlotCb, blockProcessor, 1)
let (getFirstSlot, getLastSlot, getSafeSlot) = case direction
of SyncQueueKind.Forward:
(getLocalHeadSlotCb, getLocalWallSlotCb, getFinalizedSlotCb)
of SyncQueueKind.Backward:
(getBackfillSlotCb, GetSlotCallback(proc(): Slot = Slot(0)), getBackfillSlotCb)
result = SyncManager[A, B](
pool: pool,
maxStatusAge: maxStatusAge,
getLocalHeadSlot: getLocalHeadSlotCb,
getLocalWallSlot: getLocalWallSlotCb,
getFinalizedSlot: getFinalizedSlotCb,
getSafeSlot: getSafeSlot,
getFirstSlot: getFirstSlot,
getLastSlot: getLastSlot,
maxHeadAge: maxHeadAge,
sleepTime: sleepTime,
chunkSize: chunkSize,
queue: queue,
blockProcessor: blockProcessor,
blockVerifier: blockVerifier,
notInSyncEvent: newAsyncEvent(),
inRangeEvent: newAsyncEvent(),
notInRangeEvent: newAsyncEvent(),
rangeAge: rangeAge
direction: direction
)
result.initQueue()
proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BeaconBlocksRes] {.async.} =
@ -149,21 +136,21 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
debug "Requesting blocks from peer", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
direction = man.direction, topics = "syncman"
if peer.useSyncV2():
var workFut = awaitne beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
if workFut.failed():
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errMsg = workFut.readError().msg, peer_speed = peer.netKbps(),
topics = "syncman"
direction = man.direction, topics = "syncman"
else:
let res = workFut.read()
if res.isErr:
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
step = req.step, peer_speed = peer.netKbps(),
topics = "syncman", error = $res.error()
direction = man.direction, topics = "syncman", error = $res.error()
result = res
else:
var workFut = awaitne beaconBlocksByRange(peer, req.slot, req.count, req.step)
@ -171,7 +158,7 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errMsg = workFut.readError().msg, peer_speed = peer.netKbps(),
topics = "syncman"
direction = man.direction, topics = "syncman"
else:
let res = workFut.read()
if res.isErr:
@ -195,22 +182,22 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
let headSlot = man.getLocalHeadSlot()
var peerSlot = peer.getHeadSlot()
# We update the SyncQueue's last slot all the time
man.queue.updateLastSlot(wallSlot)
debug "Peer's syncing status", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), peer = peer, index = index,
peer_speed = peer.netKbps(), topics = "syncman"
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
# Check if peer's head slot is bigger than our wall clock slot.
if peerSlot > wallSlot + man.toleranceValue:
warn "Local timer is broken or peer's status information is invalid",
peer.updateScore(PeerScoreHeadTooNew)
warn "Peer reports a head newer than our wall clock - clock out of sync?",
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
local_head_slot = headSlot, peer = peer, index = index,
tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), topics = "syncman"
discard SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
peer_score = peer.getScore(), direction = man.direction,
topics = "syncman"
return
# Check if we need to update peer's status information
@ -220,7 +207,8 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
trace "Updating peer's status information", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), topics = "syncman"
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
try:
let res = await peer.updateStatus()
@ -228,14 +216,15 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting", peer = peer,
peer_score = peer.getScore(), peer_head_slot = peerSlot,
peer_speed = peer.netKbps(), index = index, topics = "syncman"
discard SyncFailure.init(SyncFailureKind.StatusDownload, peer)
peer_speed = peer.netKbps(), index = index,
direction = man.direction, topics = "syncman"
return
except CatchableError as exc:
debug "Unexpected exception while updating peer's status",
peer = peer, peer_score = peer.getScore(),
peer_head_slot = peerSlot, peer_speed = peer.netKbps(),
index = index, errMsg = exc.msg, topics = "syncman"
index = index, errMsg = exc.msg, direction = man.direction,
topics = "syncman"
return
let newPeerSlot = peer.getHeadSlot()
@ -245,13 +234,14 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), topics = "syncman"
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
else:
debug "Peer's status information updated", wall_clock_slot = wallSlot,
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
index = index, topics = "syncman"
index = index, direction = man.direction, topics = "syncman"
peer.updateScore(PeerScoreGoodStatus)
peerSlot = newPeerSlot
@ -269,7 +259,8 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
debug "We are in sync with peer; refreshing peer's status information",
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
local_head_slot = headSlot, peer = peer, peer_score = peer.getScore(),
index = index, peer_speed = peer.netKbps(), topics = "syncman"
index = index, peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
man.workers[index].status = SyncWorkerStatus.UpdatingStatus
@ -282,14 +273,15 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting", peer = peer,
peer_score = peer.getScore(), peer_head_slot = peerSlot,
peer_speed = peer.netKbps(), index = index, topics = "syncman"
discard SyncFailure.init(SyncFailureKind.StatusDownload, peer)
peer_speed = peer.netKbps(), index = index,
direction = man.direction, topics = "syncman"
return
except CatchableError as exc:
debug "Unexpected exception while updating peer's status",
peer = peer, peer_score = peer.getScore(),
peer_head_slot = peerSlot, peer_speed = peer.netKbps(),
index = index, errMsg = exc.msg, topics = "syncman"
index = index, errMsg = exc.msg, direction = man.direction,
topics = "syncman"
return
let newPeerSlot = peer.getHeadSlot()
@ -299,7 +291,8 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), topics = "syncman"
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
else:
# This is not a very good solution because we should not discriminate and/or
# penalize peers which are in the sync process too, but their latest head is
@ -315,19 +308,23 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), index = index, topics = "syncman"
peer_speed = peer.netKbps(), index = index,
direction = man.direction, topics = "syncman"
peer.updateScore(PeerScoreUseless)
else:
debug "Peer's status information updated", wall_clock_slot = wallSlot,
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
index = index, topics = "syncman"
index = index, direction = man.direction, topics = "syncman"
peer.updateScore(PeerScoreGoodStatus)
peerSlot = newPeerSlot
return
# We update the SyncQueue's last slot all the time
man.queue.updateLastSlot(man.getLastSlot())
man.workers[index].status = SyncWorkerStatus.Requesting
let req = man.queue.pop(peerSlot, peer)
if req.isEmpty():
@ -345,7 +342,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
queue_output_slot = man.queue.outSlot,
queue_last_slot = man.queue.finalSlot,
peer_speed = peer.netKbps(), peer_score = peer.getScore(),
index = index, topics = "syncman"
index = index, direction = man.direction, topics = "syncman"
await sleepAsync(RESP_TIMEOUT)
return
@ -353,7 +350,8 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
remote_head_slot = peerSlot, local_head_slot = headSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), index = index, topics = "syncman"
peer_score = peer.getScore(), index = index,
direction = man.direction, topics = "syncman"
man.workers[index].status = SyncWorkerStatus.Downloading
@ -366,7 +364,8 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
blocks_map = smap, request_slot = req.slot,
request_count = req.count, request_step = req.step,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), index = index, topics = "syncman"
peer_speed = peer.netKbps(), index = index,
direction = man.direction, topics = "syncman"
if not(checkResponse(req, data)):
peer.updateScore(PeerScoreBadResponse)
@ -375,13 +374,13 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
index = index, topics = "syncman"
discard SyncFailure.init(SyncFailureKind.BadResponse, peer)
index = index, direction = man.direction, topics = "syncman"
return
# Scoring will happen in `syncUpdate`.
man.workers[index].status = SyncWorkerStatus.Processing
await man.queue.push(req, data)
man.workers[index].status = SyncWorkerStatus.Queueing
await man.queue.push(req, data, proc() =
man.workers[index].status = SyncWorkerStatus.Processing)
else:
peer.updateScore(PeerScoreNoBlocks)
man.queue.push(req)
@ -389,8 +388,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, index = index,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
discard SyncFailure.init(SyncFailureKind.BlockDownload, peer)
direction = man.direction, topics = "syncman"
return
except CatchableError as exc:
@ -398,13 +396,14 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, index = index,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
errMsg = exc.msg, topics = "syncman"
errMsg = exc.msg, direction = man.direction, topics = "syncman"
return
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
mixin getKey, getScore, getHeadSlot
debug "Starting syncing worker", index = index, topics = "syncman"
debug "Starting syncing worker",
index = index, direction = man.direction, topics = "syncman"
while true:
man.workers[index].status = SyncWorkerStatus.Sleeping
@ -439,6 +438,9 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
of SyncWorkerStatus.Downloading:
ch = 'D'
inc(pending)
of SyncWorkerStatus.Queueing:
ch = 'Q'
inc(pending)
of SyncWorkerStatus.Processing:
ch = 'P'
inc(pending)
@ -460,37 +462,40 @@ proc guardTask[A, B](man: SyncManager[A, B]) {.async.} =
let index = pending.find(failFuture)
if failFuture.failed():
warn "Synchronization worker stopped working unexpectedly with an error",
index = index, errMsg = failFuture.error.msg
index = index, errMsg = failFuture.error.msg, direction = man.direction
else:
warn "Synchronization worker stopped working unexpectedly without error",
index = index
index = index, direction = man.direction
let future = syncWorker[A, B](man, index)
man.workers[index].future = future
pending[index] = future
proc toTimeLeftString(d: Duration): string =
  var v = d
  var res = ""
  let ndays = chronos.days(v)
  if ndays > 0:
    res = res & (if ndays < 10: "0" & $ndays else: $ndays) & "d"
    v = v - chronos.days(ndays)
  let nhours = chronos.hours(v)
  if nhours > 0:
    res = res & (if nhours < 10: "0" & $nhours else: $nhours) & "h"
    v = v - chronos.hours(nhours)
  else:
    res = res & "00h"
  let nmins = chronos.minutes(v)
  if nmins > 0:
    res = res & (if nmins < 10: "0" & $nmins else: $nmins) & "m"
    v = v - chronos.minutes(nmins)
  else:
    res = res & "00m"
  res

proc toTimeLeftString(d: Duration): string =
  if d == InfiniteDuration:
    "--h--m"
  else:
    var v = d
    var res = ""
    let ndays = chronos.days(v)
    if ndays > 0:
      res = res & (if ndays < 10: "0" & $ndays else: $ndays) & "d"
      v = v - chronos.days(ndays)
    let nhours = chronos.hours(v)
    if nhours > 0:
      res = res & (if nhours < 10: "0" & $nhours else: $nhours) & "h"
      v = v - chronos.hours(nhours)
    else:
      res = res & "00h"
    let nmins = chronos.minutes(v)
    if nmins > 0:
      res = res & (if nmins < 10: "0" & $nmins else: $nmins) & "m"
      v = v - chronos.minutes(nmins)
    else:
      res = res & "00m"
    res
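For example, the remaining-time estimate from the commit message renders as follows (a usage sketch):

doAssert toTimeLeftString(InfiniteDuration) == "--h--m"
doAssert toTimeLeftString(
  chronos.days(14) + chronos.hours(12) + chronos.minutes(31)) == "14d12h31m"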
proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
mixin getKey, getScore
@ -502,23 +507,33 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
proc averageSpeedTask() {.async.} =
while true:
let wallSlot = man.getLocalWallSlot()
let headSlot = man.getLocalHeadSlot()
let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
await sleepAsync(chronos.seconds(int(SECONDS_PER_SLOT)))
let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
let bps =
if lsm2.slot - lsm1.slot == 0'u64:
0.0
else:
speed(lsm1, lsm2)
inc(man.syncCount)
man.insSyncSpeed = bps
man.avgSyncSpeed = man.avgSyncSpeed +
(bps - man.avgSyncSpeed) / float(man.syncCount)
let nsec = (float(wallSlot - headSlot) / man.avgSyncSpeed) *
1_000_000_000.0
man.timeLeft = chronos.nanoseconds(int64(nsec))
# Reset sync speeds between each loss-of-sync event
man.avgSyncSpeed = 0
man.insSyncSpeed = 0
await man.notInSyncEvent.wait()
# Give the node time to connect to peers and get the sync process started
await sleepAsync(seconds(SECONDS_PER_SLOT.int64))
var
stamp = SyncMoment.now(man.queue.progress())
syncCount = 0
while man.inProgress:
await sleepAsync(seconds(SECONDS_PER_SLOT.int64))
let
newStamp = SyncMoment.now(man.queue.progress())
slotsPerSec = speed(stamp, newStamp)
syncCount += 1
man.insSyncSpeed = slotsPerSec
man.avgSyncSpeed =
man.avgSyncSpeed + (slotsPerSec - man.avgSyncSpeed) / float(syncCount)
stamp = newStamp
asyncSpawn averageSpeedTask()
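The `avgSyncSpeed` update above is the standard incremental mean: after n samples it equals their arithmetic average without storing them. A small sketch with samples 2.0, 4.0, 6.0 converging to 4.0:

var avg = 0.0
for (n, sample) in [(1, 2.0), (2, 4.0), (3, 6.0)]:
  avg = avg + (sample - avg) / float(n)
doAssert abs(avg - 4.0) < 1e-9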
@ -535,13 +550,23 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
wall_head_slot = wallSlot, local_head_slot = headSlot,
pause_time = $chronos.seconds(pauseTime),
avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed,
topics = "syncman"
direction = man.direction, topics = "syncman"
let
progress = float(man.queue.progress())
total = float(man.queue.total())
remaining = total - progress
done = if total > 0.0: progress / total else: 1.0
timeleft =
if man.avgSyncSpeed >= 0.001:
Duration.fromFloatSeconds(remaining / man.avgSyncSpeed)
else: InfiniteDuration
# Update status string
man.syncStatus = map & ":" & $pending & ":" &
man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) & ":" &
man.timeLeft.toTimeLeftString() &
" (" & $man.queue.outSlot & ")"
man.syncStatus = timeLeft.toTimeLeftString() & " (" &
(done * 100).formatBiggestFloat(ffDecimal, 2) & "%) " &
man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) &
"slots/s (" & map & ":" & $man.queue.outSlot & ")"
if headAge <= man.maxHeadAge:
man.notInSyncEvent.clear()
@ -553,37 +578,25 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge,
sleeping_workers_count = sleeping,
waiting_workers_count = waiting, pending_workers_count = pending,
topics = "syncman"
direction = man.direction, topics = "syncman"
man.inProgress = true
else:
debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
local_head_slot = headSlot, difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge, topics = "syncman"
max_head_age = man.maxHeadAge, direction = man.direction,
topics = "syncman"
man.inProgress = false
else:
if not(man.notInSyncEvent.isSet()):
# We get here only if we lost sync for more than a `maxHeadAge` period.
if pending == 0:
man.queue = SyncQueue.init(A, SyncQueueKind.Forward,
man.getLocalHeadSlot(),
man.getLocalWallSlot(),
man.chunkSize, man.getFinalizedSlot,
man.blockProcessor, 1)
man.initQueue()
man.notInSyncEvent.fire()
man.inProgress = true
else:
man.notInSyncEvent.fire()
man.inProgress = true
if queueAge <= man.rangeAge:
# We are in requested range ``man.rangeAge``.
man.inRangeEvent.fire()
man.notInRangeEvent.clear()
else:
# We are not in requested range anymore ``man.rangeAge``.
man.inRangeEvent.clear()
man.notInRangeEvent.fire()
await sleepAsync(chronos.seconds(2))
proc start*[A, B](man: SyncManager[A, B]) =


@ -26,6 +26,10 @@ logScope:
type
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
ProcessingCallback* = proc() {.gcsafe, raises: [Defect].}
BlockVerifier* =
proc(signedBlock: ForkedSignedBeaconBlock):
Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].}
SyncQueueKind* {.pure.} = enum
Forward, Backward
@ -42,9 +46,9 @@ type
request*: SyncRequest[T]
data*: seq[ForkedSignedBeaconBlock]
SyncWaiter*[T] = object
future: Future[bool]
request: SyncRequest[T]
SyncWaiter* = ref object
future: Future[void]
reset: bool
RewindPoint = object
failSlot: Slot
@ -61,24 +65,17 @@ type
counter*: uint64
opcounter*: uint64
pending*: Table[uint64, SyncRequest[T]]
waiters: seq[SyncWaiter[T]]
waiters: seq[SyncWaiter]
getSafeSlot*: GetSlotCallback
debtsQueue: HeapQueue[SyncRequest[T]]
debtsCount: uint64
readyQueue: HeapQueue[SyncResult[T]]
rewind: Option[RewindPoint]
blockProcessor: ref BlockProcessor
blockVerifier: BlockVerifier
SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
proc validate*[T](sq: SyncQueue[T],
blk: ForkedSignedBeaconBlock
): Future[Result[void, BlockError]] =
let resfut = newFuture[Result[void, BlockError]]("sync.manager.validate")
sq.blockProcessor[].addBlock(blk, resfut)
resfut
proc getShortMap*[T](req: SyncRequest[T],
data: openArray[ForkedSignedBeaconBlock]): string =
## Returns all slot numbers in ``data`` as placement map.
@ -170,7 +167,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
queueKind: SyncQueueKind,
start, final: Slot, chunkSize: uint64,
getSafeSlotCb: GetSlotCallback,
blockProcessor: ref BlockProcessor,
blockVerifier: BlockVerifier,
syncQueueSize: int = -1): SyncQueue[T] =
## Create new synchronization queue with parameters
##
@ -227,13 +224,13 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
chunkSize: chunkSize,
queueSize: syncQueueSize,
getSafeSlot: getSafeSlotCb,
waiters: newSeq[SyncWaiter[T]](),
waiters: newSeq[SyncWaiter](),
counter: 1'u64,
pending: initTable[uint64, SyncRequest[T]](),
debtsQueue: initHeapQueue[SyncRequest[T]](),
inpSlot: start,
outSlot: start,
blockProcessor: blockProcessor
blockVerifier: blockVerifier
)
proc `<`*[T](a, b: SyncRequest[T]): bool =
@ -279,36 +276,38 @@ proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
$sq.finalSlot & " >= " & $last)
sq.finalSlot = last
proc wakeupWaiters[T](sq: SyncQueue[T], flag = true) =
  ## Wakeup one or all blocked waiters.
  for item in sq.waiters:
    if not(item.future.finished()):
      item.future.complete(flag)

proc waitForChanges[T](sq: SyncQueue[T],
                       req: SyncRequest[T]): Future[bool] {.async.} =
  ## Create new waiter and wait for completion from `wakeupWaiters()`.
  var waitfut = newFuture[bool]("SyncQueue.waitForChanges")
  let waititem = SyncWaiter[T](future: waitfut, request: req)
  sq.waiters.add(waititem)
  try:
    let res = await waitfut
    return res
  finally:
    sq.waiters.delete(sq.waiters.find(waititem))

proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) =
  ## Wakeup one or all blocked waiters.
  for item in sq.waiters:
    if reset:
      item.reset = true
    if not(item.future.finished()):
      item.future.complete()

proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} =
  ## Create new waiter and wait for completion from `wakeupWaiters()`.
  var waitfut = newFuture[void]("SyncQueue.waitForChanges")
  let waititem = SyncWaiter(future: waitfut)
  sq.waiters.add(waititem)
  try:
    await waitfut
    return waititem.reset
  finally:
    sq.waiters.delete(sq.waiters.find(waititem))
proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} =
## This procedure will perform wakeupWaiters(false) and block until the last
## waiter is awakened.
var waitChanges = sq.waitForChanges(SyncRequest.empty(sq.kind, T))
sq.wakeupWaiters(false)
var waitChanges = sq.waitForChanges()
sq.wakeupWaiters(true)
discard await waitChanges
proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} =
## Perform reset of all the blocked waiters in SyncQueue.
##
## We add one more waiter to the waiters sequence and
## call wakeupWaiters(false). Because our waiter is last in the sequence of
## call wakeupWaiters(true). Because our waiter is last in the sequence of
## waiters, it will be resumed only after all the other waiters have been
## awakened and finished.
@ -488,17 +487,16 @@ proc advanceInput[T](sq: SyncQueue[T], number: uint64) =
of SyncQueueKind.Backward:
sq.inpSlot = sq.inpSlot - number
proc notInRange[T](sq: SyncQueue[T], slot: Slot): bool =
proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool =
case sq.kind
of SyncQueueKind.Forward:
(sq.queueSize > 0) and
(slot >= sq.outSlot + uint64(sq.queueSize) * sq.chunkSize)
(sq.queueSize > 0) and (sr.slot != sq.outSlot)
of SyncQueueKind.Backward:
(sq.queueSize > 0) and
(uint64(sq.queueSize) * sq.chunkSize <= sq.outSlot - slot)
(sq.queueSize > 0) and (sr.slot + sr.count - 1'u64 != sq.outSlot)
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ForkedSignedBeaconBlock]) {.async, gcsafe.} =
data: seq[ForkedSignedBeaconBlock],
processingCb: ProcessingCallback = nil) {.async.} =
## Push successful result to queue ``sq``.
mixin updateScore
@ -512,25 +510,17 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
sq.pending.del(sr.index)
# This is backpressure handling algorithm, this algorithm is blocking
# all pending `push` requests if `request.slot` not in range:
# [current_queue_slot, current_queue_slot + sq.queueSize * sq.chunkSize].
var exitNow = false
while true:
  if sq.notInRange(sr.slot):
    let res = await sq.waitForChanges(sr)
    if res:
      continue
    else:
      # SyncQueue reset happens. We are exiting to wake up sync-worker.
      exitNow = true
      break
  let syncres = SyncResult[T](request: sr, data: data)
  sq.readyQueue.push(syncres)
  exitNow = false
  break
if exitNow:
  return

# This is backpressure handling algorithm, this algorithm is blocking
# all pending `push` requests if `request.slot` not in range.
while true:
  if sq.notInRange(sr):
    let reset = await sq.waitForChanges()
    if reset:
      # SyncQueue reset happens. We are exiting to wake up sync-worker.
      return
  else:
    let syncres = SyncResult[T](request: sr, data: data)
    sq.readyQueue.push(syncres)
    break
while len(sq.readyQueue) > 0:
let reqres =
@ -567,6 +557,12 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
await sq.resetWait(some(rewindSlot))
break
if processingCb != nil:
processingCb()
template isOkResponse(res: auto): bool =
res.isOk() or res.error in {BlockError.Duplicate, BlockError.UnviableFork}
# Validating received blocks one by one
var res: Result[void, BlockError]
var failSlot: Option[Slot]
@ -574,8 +570,8 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
for blk in sq.blocks(item):
trace "Pushing block", block_root = blk.root,
block_slot = blk.slot
res = await sq.validate(blk)
if res.isErr():
res = await sq.blockVerifier(blk)
if not res.isOkResponse():
failSlot = some(blk.slot)
break
else:
@ -585,7 +581,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
# not stuck.
inc(sq.opcounter)
if res.isOk():
if res.isOkResponse():
sq.advanceOutput(item.request.count)
if len(item.data) > 0:
# If there was no error and the response was not empty, we should reward the peer
@ -603,7 +599,8 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
var resetSlot: Option[Slot]
if res.error == BlockError.MissingParent:
case res.error
of BlockError.MissingParent:
# If we got `BlockError.MissingParent`, it means the peer returned a chain
# of blocks with holes, or `block_pool` is in an incomplete state. We are
# going to rewind to the first slot at the latest finalized epoch.
@ -650,21 +647,15 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
elif res.error == BlockError.Invalid:
of BlockError.Invalid:
let req = item.request
warn "Received invalid sequence of blocks", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
else:
let req = item.request
warn "Received unexpected response from block_pool", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data), errorCode = res.error,
topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
of BlockError.Duplicate, BlockError.UnviableFork:
raiseAssert "Handled above"
# We need to move failed response to the debts queue.
sq.toDebtsQueue(item.request)
@ -773,11 +764,9 @@ proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} =
sq.startSlot + 1'u64 - sq.finalSlot
proc progress*[T](sq: SyncQueue[T]): uint64 =
## Returns queue's ``sq`` progress string.
let curSlot =
case sq.kind
of SyncQueueKind.Forward:
sq.outSlot - sq.startSlot
of SyncQueueKind.Backward:
sq.startSlot - sq.outSlot
(curSlot * 100'u64) div sq.total()
## How many slots we've synced so far
case sq.kind
of SyncQueueKind.Forward:
sq.outSlot - sq.startSlot
of SyncQueueKind.Backward:
sq.startSlot - sq.outSlot


@ -23,13 +23,23 @@ proc getFirstSlotAtFinalizedEpoch(): Slot =
proc getSafeSlot(): Slot =
Slot(1024)
proc newBlockProcessor(): ref BlockProcessor =
# Minimal block processor for test - the real block processor has an unbounded
# queue but the tests here
(ref BlockProcessor)(
blockQueue: newAsyncQueue[BlockEntry]()
)
type
BlockEntry = object
blck*: ForkedSignedBeaconBlock
resfut*: Future[Result[void, BlockError]]
proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier =
# This sets up a fake block verification collector that simply puts the blocks
# in the async queue, similar to how BlockProcessor does it - as far as
# testing goes, this is risky because it might introduce differences between
# the BlockProcessor and this test
proc verify(signedBlock: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] =
let fut = newFuture[Result[void, BlockError]]()
try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut))
except CatchableError as exc: raiseAssert exc.msg
return fut
return verify
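The tests' validator tasks (defined elsewhere in this file) then drain this queue and complete each future, standing in for the `BlockProcessor`. A minimal sketch of such a drain loop (hypothetical `acceptAll` helper that accepts every block):

proc acceptAll(queue: AsyncQueue[BlockEntry]) {.async.} =
  while true:
    let entry = await queue.popFirst()
    entry.resfut.complete(Result[void, BlockError].ok())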
suite "SyncManager test suite":
proc createChain(start, finish: Slot): seq[ForkedSignedBeaconBlock] =
doAssert(start <= finish)
@ -50,10 +60,11 @@ suite "SyncManager test suite":
template startAndFinishSlotsEqual(kind: SyncQueueKind) =
let p1 = SomeTPeer()
let aq = newBlockProcessor()
let aq = newAsyncQueue[BlockEntry]()
var queue = SyncQueue.init(SomeTPeer, kind,
Slot(0), Slot(0), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
getFirstSlotAtFinalizedEpoch, collector(aq))
check:
len(queue) == 1
pendingLen(queue) == 0
@ -145,10 +156,10 @@ suite "SyncManager test suite":
]
for item in Checks:
let aq = newBlockProcessor()
let aq = newAsyncQueue[BlockEntry]()
var queue = SyncQueue.init(SomeTPeer, kind,
item[0], item[1], item[2],
getFirstSlotAtFinalizedEpoch, aq)
getFirstSlotAtFinalizedEpoch, collector(aq))
check:
len(queue) == item[4]
pendingLen(queue) == item[5]
@ -167,17 +178,17 @@ suite "SyncManager test suite":
req2.isEmpty() == true
template twoFullRequests(kkind: SyncQueueKind) =
let aq = newBlockProcessor()
let aq = newAsyncQueue[BlockEntry]()
var queue =
case kkind
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(1), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
getFirstSlotAtFinalizedEpoch, collector(aq))
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
Slot(1), Slot(0), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
getFirstSlotAtFinalizedEpoch, collector(aq))
let p1 = SomeTPeer()
let p2 = SomeTPeer()
@ -231,10 +242,14 @@ suite "SyncManager test suite":
r21.slot == Slot(1) and r21.count == 1'u64 and r21.step == 1'u64
r22.slot == Slot(0) and r22.count == 1'u64 and r22.step == 1'u64
template done(b: BlockEntry) =
b.resfut.complete(Result[void, BlockError].ok())
template fail(b: BlockEntry, e: untyped) =
b.resfut.complete(Result[void, BlockError].err(e))
template smokeTest(kkind: SyncQueueKind, start, finish: Slot,
chunkSize: uint64) =
let
aq = newBlockProcessor()
let aq = newAsyncQueue[BlockEntry]()
var counter =
case kkind
@ -267,18 +282,18 @@ suite "SyncManager test suite":
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
start, finish, chunkSize,
getFirstSlotAtFinalizedEpoch, aq)
getFirstSlotAtFinalizedEpoch, collector(aq))
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finish, start, chunkSize,
getFirstSlotAtFinalizedEpoch, aq)
getFirstSlotAtFinalizedEpoch, collector(aq))
chain = createChain(start, finish)
validatorFut =
case kkind
of SyncQueueKind.Forward:
forwardValidator(aq[].blockQueue)
forwardValidator(aq)
of SyncQueueKind.Backward:
backwardValidator(aq[].blockQueue)
backwardValidator(aq)
let p1 = SomeTPeer()
@ -299,7 +314,7 @@ suite "SyncManager test suite":
template unorderedAsyncTest(kkind: SyncQueueKind, startSlot: Slot) =
let
aq = newBlockProcessor()
aq = newAsyncQueue[BlockEntry]()
chunkSize = 3'u64
numberOfChunks = 3'u64
finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64)
@ -337,19 +352,19 @@ suite "SyncManager test suite":
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
startSlot, finishSlot, chunkSize,
getFirstSlotAtFinalizedEpoch, aq,
getFirstSlotAtFinalizedEpoch, collector(aq),
queueSize)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finishSlot, startSlot, chunkSize,
getFirstSlotAtFinalizedEpoch, aq,
getFirstSlotAtFinalizedEpoch, collector(aq),
queueSize)
validatorFut =
case kkind
of SyncQueueKind.Forward:
forwardValidator(aq[].blockQueue)
forwardValidator(aq)
of SyncQueueKind.Backward:
backwardValidator(aq[].blockQueue)
backwardValidator(aq)
let
p1 = SomeTPeer()
@ -425,7 +440,7 @@ suite "SyncManager test suite":
test "[SyncQueue#Forward] Async unordered push with rewind test":
let
aq = newBlockProcessor()
aq = newAsyncQueue[BlockEntry]()
startSlot = Slot(0)
chunkSize = SLOTS_PER_EPOCH
numberOfChunks = 4'u64
@ -451,9 +466,9 @@ suite "SyncManager test suite":
chain = createChain(startSlot, finishSlot)
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
startSlot, finishSlot, chunkSize,
getFirstSlotAtFinalizedEpoch, aq,
getFirstSlotAtFinalizedEpoch, collector(aq),
queueSize)
validatorFut = forwardValidator(aq[].blockQueue)
validatorFut = forwardValidator(aq)
let
p1 = SomeTPeer()
@ -539,7 +554,7 @@ suite "SyncManager test suite":
test "[SyncQueue#Backward] Async unordered push with rewind test":
let
aq = newBlockProcessor()
aq = newAsyncQueue[BlockEntry]()
startSlot = Slot(0)
chunkSize = SLOTS_PER_EPOCH
numberOfChunks = 4'u64
@ -571,8 +586,8 @@ suite "SyncManager test suite":
chain = createChain(startSlot, finishSlot)
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finishSlot, startSlot, chunkSize,
getSafeSlot, aq, queueSize)
validatorFut = backwardValidator(aq[].blockQueue)
getSafeSlot, collector(aq), queueSize)
validatorFut = backwardValidator(aq)
let
p1 = SomeTPeer()
@ -771,11 +786,12 @@ suite "SyncManager test suite":
checkResponse(r22, @[chain[3], chain[1]]) == false
test "[SyncQueue#Forward] getRewindPoint() test":
let aq = newBlockProcessor()
let aq = newAsyncQueue[BlockEntry]()
block:
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getFirstSlotAtFinalizedEpoch, aq, 2)
1'u64, getFirstSlotAtFinalizedEpoch,
collector(aq), 2)
let finalizedSlot = compute_start_slot_at_epoch(Epoch(0'u64))
let startSlot = compute_start_slot_at_epoch(Epoch(0'u64)) + 1'u64
let finishSlot = compute_start_slot_at_epoch(Epoch(2'u64))
@ -786,7 +802,8 @@ suite "SyncManager test suite":
block:
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getFirstSlotAtFinalizedEpoch, aq, 2)
1'u64, getFirstSlotAtFinalizedEpoch,
collector(aq), 2)
let finalizedSlot = compute_start_slot_at_epoch(Epoch(1'u64))
let startSlot = compute_start_slot_at_epoch(Epoch(1'u64)) + 1'u64
let finishSlot = compute_start_slot_at_epoch(Epoch(3'u64))
@ -797,7 +814,8 @@ suite "SyncManager test suite":
block:
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getFirstSlotAtFinalizedEpoch, aq, 2)
1'u64, getFirstSlotAtFinalizedEpoch,
collector(aq), 2)
let finalizedSlot = compute_start_slot_at_epoch(Epoch(0'u64))
let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64)
let failEpoch = compute_epoch_at_slot(failSlot)
@ -814,7 +832,8 @@ suite "SyncManager test suite":
block:
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getFirstSlotAtFinalizedEpoch, aq, 2)
1'u64, getFirstSlotAtFinalizedEpoch,
collector(aq), 2)
let finalizedSlot = compute_start_slot_at_epoch(Epoch(1'u64))
let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64)
let failEpoch = compute_epoch_at_slot(failSlot)
@ -828,11 +847,11 @@ suite "SyncManager test suite":
counter = counter shl 1
test "[SyncQueue#Backward] getRewindPoint() test":
let aq = newBlockProcessor()
let aq = newAsyncQueue[BlockEntry]()
block:
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
Slot(1024), Slot(0),
1'u64, getSafeSlot, aq, 2)
1'u64, getSafeSlot, collector(aq), 2)
let safeSlot = getSafeSlot()
for i in countdown(1023, 0):
check queue.getRewindPoint(Slot(i), safeSlot) == safeSlot