SyncManager fix to process blocks one by one. (#1464)

* Allow sync manager process blocks one by one.

* Log storeBlock() and updateHead() duration.

* Calculate duration only for blocks added without any error.

* Fix float compilation error.

* Fix duration.

* Fix SyncQueue tests.
This commit is contained in:
Eugene Kabanov 2020-08-10 10:15:50 +03:00 committed by GitHub
parent cd0f4a2d23
commit 55fcece0b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 155 additions and 110 deletions

View File

@ -564,34 +564,6 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} =
let fepoch = node.chainDag.headState.data.data.finalized_checkpoint.epoch let fepoch = node.chainDag.headState.data.data.finalized_checkpoint.epoch
compute_start_slot_at_epoch(fepoch) compute_start_slot_at_epoch(fepoch)
proc updateLocalBlocks(list: openArray[SignedBeaconBlock]): Result[void, BlockError] =
debug "Forward sync imported blocks", count = len(list),
local_head_slot = getLocalHeadSlot()
let sm = now(chronos.Moment)
for blk in list:
let res = node.storeBlock(blk)
# We going to ignore `BlockError.Unviable` errors because we have working
# backward sync and it can happens that we can perform overlapping
# requests.
# For the same reason we ignore Duplicate blocks as if they are duplicate
# from before the current finalized epoch, we can drop them
# (and they may have no parents anymore in the fork choice if it was pruned)
if res.isErr and res.error notin {BlockError.Unviable, BlockError.Old, BLockError.Duplicate}:
return res
discard node.updateHead(node.beaconClock.now().slotOrZero)
let dur = now(chronos.Moment) - sm
let secs = float(chronos.seconds(1).nanoseconds)
var storeSpeed = 0.0
if not(dur.isZero()):
let v = float(len(list)) * (secs / float(dur.nanoseconds))
# We doing round manually because stdlib.round is deprecated
storeSpeed = round(v * 10000) / 10000
info "Forward sync blocks got imported successfully", count = len(list),
local_head_slot = getLocalHeadSlot(), store_speed = storeSpeed
ok()
proc scoreCheck(peer: Peer): bool = proc scoreCheck(peer: Peer): bool =
if peer.score < PeerScoreLowLimit: if peer.score < PeerScoreLowLimit:
try: try:
@ -608,16 +580,41 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} =
node.syncManager = newSyncManager[Peer, PeerID]( node.syncManager = newSyncManager[Peer, PeerID](
node.network.peerPool, getLocalHeadSlot, getLocalWallSlot, node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, updateLocalBlocks, getFirstSlotAtFinalizedEpoch, chunkSize = 32
# 4 blocks per chunk is the optimal value right now, because our current
# syncing speed is around 4 blocks per second. So there no need to request
# more then 4 blocks right now. As soon as `store_speed` value become
# significantly more then 4 blocks per second you can increase this
# value appropriately.
chunkSize = 4
) )
await node.syncManager.sync() node.syncManager.start()
while true:
let sblock = await node.syncManager.getBlock()
let sm1 = now(chronos.Moment)
let res = node.storeBlock(sblock.blk)
let em1 = now(chronos.Moment)
if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}):
let sm2 = now(chronos.Moment)
discard node.updateHead(node.beaconClock.now().slotOrZero)
let em2 = now(chronos.Moment)
sblock.done()
let duration1 = if res.isOk(): em1 - sm1 else: ZeroDuration
let duration2 = if res.isOk(): em2 - sm2 else: ZeroDuration
let duration = if res.isOk(): em2 - sm1 else: ZeroDuration
let storeSpeed =
block:
let secs = float(chronos.seconds(1).nanoseconds)
if not(duration.isZero()):
let v = secs / float(duration.nanoseconds)
round(v * 10_000) / 10_000
else:
0.0
debug "Block got imported successfully",
local_head_slot = getLocalHeadSlot(), store_speed = storeSpeed,
block_root = shortLog(sblock.blk.root),
block_slot = sblock.blk.message.slot,
store_block_duration = $duration1,
update_head_duration = $duration2,
store_duration = $duration
else:
sblock.fail(res.error)
proc currentSlot(node: BeaconNode): Slot = proc currentSlot(node: BeaconNode): Slot =
node.beaconClock.now.slotOrZero node.beaconClock.now.slotOrZero

View File

@ -39,14 +39,9 @@ type
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].} GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
UpdateLocalBlocksCallback* = SyncBlock* = object
proc(list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. blk*: SignedBeaconBlock
gcsafe.} resfut*: Future[Result[void, BlockError]]
SyncUpdateCallback*[T] =
proc(req: SyncRequest[T],
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.
gcsafe.}
SyncRequest*[T] = object SyncRequest*[T] = object
index*: uint64 index*: uint64
@ -73,12 +68,12 @@ type
counter*: uint64 counter*: uint64
pending*: Table[uint64, SyncRequest[T]] pending*: Table[uint64, SyncRequest[T]]
waiters: seq[SyncWaiter[T]] waiters: seq[SyncWaiter[T]]
syncUpdate*: SyncUpdateCallback[T]
getFinalizedSlot*: GetSlotCallback getFinalizedSlot*: GetSlotCallback
debtsQueue: HeapQueue[SyncRequest[T]] debtsQueue: HeapQueue[SyncRequest[T]]
debtsCount: uint64 debtsCount: uint64
readyQueue: HeapQueue[SyncResult[T]] readyQueue: HeapQueue[SyncResult[T]]
suspects: seq[SyncResult[T]] suspects: seq[SyncResult[T]]
outQueue: AsyncQueue[SyncBlock]
SyncManager*[A, B] = ref object SyncManager*[A, B] = ref object
pool: PeerPool[A, B] pool: PeerPool[A, B]
@ -92,10 +87,11 @@ type
getLocalHeadSlot: GetSlotCallback getLocalHeadSlot: GetSlotCallback
getLocalWallSlot: GetSlotCallback getLocalWallSlot: GetSlotCallback
getFinalizedSlot: GetSlotCallback getFinalizedSlot: GetSlotCallback
syncUpdate: SyncUpdateCallback[A]
chunkSize: uint64 chunkSize: uint64
queue: SyncQueue[A] queue: SyncQueue[A]
failures: seq[SyncFailure[A]] failures: seq[SyncFailure[A]]
syncFut: Future[void]
outQueue: AsyncQueue[SyncBlock]
inProgress*: bool inProgress*: bool
SyncMoment* = object SyncMoment* = object
@ -110,6 +106,15 @@ type
SyncManagerError* = object of CatchableError SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]] BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]]
proc validate*[T](sq: SyncQueue[T],
blk: SignedBeaconBlock): Future[Result[void, BlockError]] {.async.} =
let sblock = SyncBlock(
blk: blk,
resfut: newFuture[Result[void, BlockError]]("sync.manager.validate")
)
await sq.outQueue.addLast(sblock)
return await sblock.resfut
proc getShortMap*[T](req: SyncRequest[T], proc getShortMap*[T](req: SyncRequest[T],
data: openarray[SignedBeaconBlock]): string = data: openarray[SignedBeaconBlock]): string =
## Returns all slot numbers in ``data`` as placement map. ## Returns all slot numbers in ``data`` as placement map.
@ -207,8 +212,8 @@ proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
start, last: Slot, chunkSize: uint64, start, last: Slot, chunkSize: uint64,
updateCb: SyncUpdateCallback[T],
getFinalizedSlotCb: GetSlotCallback, getFinalizedSlotCb: GetSlotCallback,
outputQueue: AsyncQueue[SyncBlock],
queueSize: int = -1): SyncQueue[T] = queueSize: int = -1): SyncQueue[T] =
## Create new synchronization queue with parameters ## Create new synchronization queue with parameters
## ##
@ -267,14 +272,14 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
lastSlot: last, lastSlot: last,
chunkSize: chunkSize, chunkSize: chunkSize,
queueSize: queueSize, queueSize: queueSize,
syncUpdate: updateCb,
getFinalizedSlot: getFinalizedSlotCb, getFinalizedSlot: getFinalizedSlotCb,
waiters: newSeq[SyncWaiter[T]](), waiters: newSeq[SyncWaiter[T]](),
counter: 1'u64, counter: 1'u64,
pending: initTable[uint64, SyncRequest[T]](), pending: initTable[uint64, SyncRequest[T]](),
debtsQueue: initHeapQueue[SyncRequest[T]](), debtsQueue: initHeapQueue[SyncRequest[T]](),
inpSlot: start, inpSlot: start,
outSlot: start outSlot: start,
outQueue: outputQueue
) )
proc `<`*[T](a, b: SyncRequest[T]): bool {.inline.} = proc `<`*[T](a, b: SyncRequest[T]): bool {.inline.} =
@ -430,7 +435,19 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
if sq.outSlot != minSlot: if sq.outSlot != minSlot:
break break
let item = sq.readyQueue.pop() let item = sq.readyQueue.pop()
let res = sq.syncUpdate(item.request, item.data)
# Validating received blocks one by one
var res: Result[void, BlockError]
if len(item.data) > 0:
for blk in item.data:
debug "Pushing block", block_root = blk.root,
block_slot = blk.message.slot
res = await sq.validate(blk)
if not(res.isOk):
break
else:
res = Result[void, BlockError].ok()
if res.isOk: if res.isOk:
sq.outSlot = sq.outSlot + item.request.count sq.outSlot = sq.outSlot + item.request.count
sq.wakeupWaiters() sq.wakeupWaiters()
@ -570,7 +587,6 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
getLocalHeadSlotCb: GetSlotCallback, getLocalHeadSlotCb: GetSlotCallback,
getLocalWallSlotCb: GetSlotCallback, getLocalWallSlotCb: GetSlotCallback,
getFinalizedSlotCb: GetSlotCallback, getFinalizedSlotCb: GetSlotCallback,
updateLocalBlocksCb: UpdateLocalBlocksCallback,
maxWorkers = 10, maxWorkers = 10,
maxStatusAge = uint64(SLOTS_PER_EPOCH * 4), maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
@ -578,33 +594,28 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
int(SECONDS_PER_SLOT)).seconds, int(SECONDS_PER_SLOT)).seconds,
chunkSize = uint64(SLOTS_PER_EPOCH), chunkSize = uint64(SLOTS_PER_EPOCH),
toleranceValue = uint64(1), toleranceValue = uint64(1),
maxRecurringFailures = 3 maxRecurringFailures = 3,
outputQueueSize = 1,
): SyncManager[A, B] = ): SyncManager[A, B] =
proc syncUpdate(req: SyncRequest[A], var outputQueue = newAsyncQueue[SyncBlock](outputQueueSize)
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.gcsafe.} =
let peer = req.item
let res = updateLocalBlocksCb(list)
if res.isOk:
peer.updateScore(PeerScoreGoodBlocks)
return res
let queue = SyncQueue.init(A, getFinalizedSlotCb(), getLocalWallSlotCb(), let queue = SyncQueue.init(A, getFinalizedSlotCb(), getLocalWallSlotCb(),
chunkSize, syncUpdate, getFinalizedSlotCb, 2) chunkSize, getFinalizedSlotCb, outputQueue, 1)
result = SyncManager[A, B]( result = SyncManager[A, B](
pool: pool, pool: pool,
maxWorkersCount: maxWorkers, maxWorkersCount: maxWorkers,
maxStatusAge: maxStatusAge, maxStatusAge: maxStatusAge,
getLocalHeadSlot: getLocalHeadSlotCb, getLocalHeadSlot: getLocalHeadSlotCb,
syncUpdate: syncUpdate,
getLocalWallSlot: getLocalWallSlotCb, getLocalWallSlot: getLocalWallSlotCb,
getFinalizedSlot: getFinalizedSlotCb, getFinalizedSlot: getFinalizedSlotCb,
maxHeadAge: maxHeadAge, maxHeadAge: maxHeadAge,
maxRecurringFailures: maxRecurringFailures, maxRecurringFailures: maxRecurringFailures,
sleepTime: sleepTime, sleepTime: sleepTime,
chunkSize: chunkSize, chunkSize: chunkSize,
queue: queue queue: queue,
outQueue: outputQueue
) )
proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
@ -809,7 +820,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
finally: finally:
man.pool.release(peer) man.pool.release(peer)
proc sync*[A, B](man: SyncManager[A, B]) {.async.} = proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
# This procedure manages main loop of SyncManager and in this loop it # This procedure manages main loop of SyncManager and in this loop it
# performs # performs
# 1. It checks for current sync status, "are we synced?". # 1. It checks for current sync status, "are we synced?".
@ -890,8 +901,8 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
wall_head_slot = wallSlot, local_head_slot = headSlot, wall_head_slot = wallSlot, local_head_slot = headSlot,
queue_last_slot = man.queue.lastSlot, topics = "syncman" queue_last_slot = man.queue.lastSlot, topics = "syncman"
man.queue = SyncQueue.init(A, man.getFinalizedSlot(), wallSlot, man.queue = SyncQueue.init(A, man.getFinalizedSlot(), wallSlot,
man.chunkSize, man.syncUpdate, man.chunkSize, man.getFinalizedSlot,
man.getFinalizedSlot, 2) man.outQueue, 1)
debug "Synchronization loop starting new worker", peer = peer, debug "Synchronization loop starting new worker", peer = peer,
wall_head_slot = wallSlot, local_head_slot = headSlot, wall_head_slot = wallSlot, local_head_slot = headSlot,
@ -966,3 +977,21 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
# Cleaning up failures. # Cleaning up failures.
man.failures.setLen(0) man.failures.setLen(0)
await man.queue.resetWait(none[Slot]()) await man.queue.resetWait(none[Slot]())
proc start*[A, B](man: SyncManager[A, B]) =
## Starts SyncManager's main loop.
man.syncFut = man.syncLoop()
proc getBlock*[A, B](man: SyncManager[A, B]): Future[SyncBlock] =
## Get the block that was received during synchronization.
man.outQueue.popFirst()
proc done*(blk: SyncBlock) =
## Send signal to SyncManager that the block ``blk`` has passed
## verification successfully.
blk.resfut.complete(Result[void, BlockError].ok())
proc fail*(blk: SyncBlock, error: BlockError) =
## Send signal to SyncManager that the block ``blk`` has NOT passed
## verification with specific ``error``.
blk.resfut.complete(Result[void, BlockError].err(error))

View File

@ -26,15 +26,11 @@ suite "SyncManager test suite":
item.message.slot = curslot item.message.slot = curslot
curslot = curslot + 1'u64 curslot = curslot + 1'u64
proc syncUpdate(req: SyncRequest[SomeTPeer],
data: openarray[SignedBeaconBlock]): Result[void, BlockError] {.
gcsafe.} =
discard
test "[SyncQueue] Start and finish slots equal": test "[SyncQueue] Start and finish slots equal":
let p1 = SomeTPeer() let p1 = SomeTPeer()
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64, syncUpdate, let aq = newAsyncQueue[SyncBlock](1)
getFirstSlotAtFinalizedEpoch) var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
check len(queue) == 1 check len(queue) == 1
var r11 = queue.pop(Slot(0), p1) var r11 = queue.pop(Slot(0), p1)
check len(queue) == 0 check len(queue) == 0
@ -49,8 +45,9 @@ suite "SyncManager test suite":
r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64 r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64
test "[SyncQueue] Two full requests success/fail": test "[SyncQueue] Two full requests success/fail":
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64, syncUpdate, let aq = newAsyncQueue[SyncBlock](1)
getFirstSlotAtFinalizedEpoch) var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer() let p1 = SomeTPeer()
let p2 = SomeTPeer() let p2 = SomeTPeer()
check len(queue) == 2 check len(queue) == 2
@ -77,8 +74,9 @@ suite "SyncManager test suite":
r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64 r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64
test "[SyncQueue] Full and incomplete success/fail start from zero": test "[SyncQueue] Full and incomplete success/fail start from zero":
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64, syncUpdate, let aq = newAsyncQueue[SyncBlock](1)
getFirstSlotAtFinalizedEpoch) var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer() let p1 = SomeTPeer()
let p2 = SomeTPeer() let p2 = SomeTPeer()
let p3 = SomeTPeer() let p3 = SomeTPeer()
@ -116,8 +114,9 @@ suite "SyncManager test suite":
r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64 r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64
test "[SyncQueue] Full and incomplete success/fail start from non-zero": test "[SyncQueue] Full and incomplete success/fail start from non-zero":
var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64, syncUpdate, let aq = newAsyncQueue[SyncBlock](1)
getFirstSlotAtFinalizedEpoch) var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer() let p1 = SomeTPeer()
let p2 = SomeTPeer() let p2 = SomeTPeer()
check len(queue) == 5 check len(queue) == 5
@ -144,8 +143,9 @@ suite "SyncManager test suite":
r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64 r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64
test "[SyncQueue] Smart and stupid success/fail": test "[SyncQueue] Smart and stupid success/fail":
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate, let aq = newAsyncQueue[SyncBlock](1)
getFirstSlotAtFinalizedEpoch) var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer() let p1 = SomeTPeer()
let p2 = SomeTPeer() let p2 = SomeTPeer()
check len(queue) == 5 check len(queue) == 5
@ -172,8 +172,9 @@ suite "SyncManager test suite":
r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64 r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64
test "[SyncQueue] One smart and one stupid + debt split + empty": test "[SyncQueue] One smart and one stupid + debt split + empty":
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate, let aq = newAsyncQueue[SyncBlock](1)
getFirstSlotAtFinalizedEpoch) var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer() let p1 = SomeTPeer()
let p2 = SomeTPeer() let p2 = SomeTPeer()
let p3 = SomeTPeer() let p3 = SomeTPeer()
@ -207,19 +208,21 @@ suite "SyncManager test suite":
proc test(): Future[bool] {.async.} = proc test(): Future[bool] {.async.} =
var counter = 0 var counter = 0
proc syncReceiver(req: SyncRequest[SomeTPeer], proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} =
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. while true:
gcsafe.} = let sblock = await aq.popFirst()
for item in list: if sblock.blk.message.slot == Slot(counter):
if item.message.slot == Slot(counter):
inc(counter) inc(counter)
else: else:
return err(Invalid) sblock.fail(BlockError.Invalid)
return ok() sblock.done()
var aq = newAsyncQueue[SyncBlock](1)
var chain = createChain(Slot(0), Slot(2)) var chain = createChain(Slot(0), Slot(2))
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64, var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
syncReceiver, getFirstSlotAtFinalizedEpoch, 1) getFirstSlotAtFinalizedEpoch, aq, 1)
var validatorFut = simpleValidator(aq)
let p1 = SomeTPeer() let p1 = SomeTPeer()
let p2 = SomeTPeer() let p2 = SomeTPeer()
let p3 = SomeTPeer() let p3 = SomeTPeer()
@ -227,14 +230,16 @@ suite "SyncManager test suite":
var r12 = queue.pop(Slot(2), p2) var r12 = queue.pop(Slot(2), p2)
var r13 = queue.pop(Slot(2), p3) var r13 = queue.pop(Slot(2), p3)
var f13 = queue.push(r13, @[chain[2]]) var f13 = queue.push(r13, @[chain[2]])
var f12 = queue.push(r12, @[chain[1]]) #
await sleepAsync(100.milliseconds) await sleepAsync(100.milliseconds)
doAssert(f12.finished == false) # doAssert(f12.finished == false)
doAssert(f13.finished == false) doAssert(f13.finished == false)
doAssert(counter == 0) doAssert(counter == 0)
var f11 = queue.push(r11, @[chain[0]]) var f11 = queue.push(r11, @[chain[0]])
await sleepAsync(100.milliseconds)
doAssert(counter == 1) doAssert(counter == 1)
doAssert(f11.finished == true and f11.failed == false) doAssert(f11.finished == true and f11.failed == false)
var f12 = queue.push(r12, @[chain[1]])
await sleepAsync(100.milliseconds) await sleepAsync(100.milliseconds)
doAssert(f12.finished == true and f12.failed == false) doAssert(f12.finished == true and f12.failed == false)
doAssert(f13.finished == true and f13.failed == false) doAssert(f13.finished == true and f13.failed == false)
@ -242,6 +247,8 @@ suite "SyncManager test suite":
doAssert(r11.item == p1) doAssert(r11.item == p1)
doAssert(r12.item == p2) doAssert(r12.item == p2)
doAssert(r13.item == p3) doAssert(r13.item == p3)
await validatorFut.cancelAndWait()
result = true result = true
check waitFor(test()) check waitFor(test())
@ -250,24 +257,27 @@ suite "SyncManager test suite":
proc test(): Future[bool] {.async.} = proc test(): Future[bool] {.async.} =
var counter = 5 var counter = 5
proc syncReceiver(req: SyncRequest[SomeTPeer], proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} =
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. while true:
gcsafe.} = let sblock = await aq.popFirst()
for item in list: if sblock.blk.message.slot == Slot(counter):
if item.message.slot == Slot(counter):
inc(counter) inc(counter)
else: else:
return err(Invalid) sblock.fail(BlockError.Invalid)
return ok() sblock.done()
var aq = newAsyncQueue[SyncBlock](1)
var chain = createChain(Slot(5), Slot(11)) var chain = createChain(Slot(5), Slot(11))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64, var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64,
syncReceiver, getFirstSlotAtFinalizedEpoch, 2) getFirstSlotAtFinalizedEpoch, aq, 2)
let p1 = SomeTPeer() let p1 = SomeTPeer()
let p2 = SomeTPeer() let p2 = SomeTPeer()
let p3 = SomeTPeer() let p3 = SomeTPeer()
let p4 = SomeTPeer() let p4 = SomeTPeer()
var validatorFut = simpleValidator(aq)
var r21 = queue.pop(Slot(11), p1) var r21 = queue.pop(Slot(11), p1)
var r22 = queue.pop(Slot(11), p2) var r22 = queue.pop(Slot(11), p2)
var r23 = queue.pop(Slot(11), p3) var r23 = queue.pop(Slot(11), p3)
@ -279,19 +289,21 @@ suite "SyncManager test suite":
doAssert(f22.finished == true and f22.failed == false) doAssert(f22.finished == true and f22.failed == false)
doAssert(counter == 5) doAssert(counter == 5)
var f21 = queue.push(r21, @[chain[0], chain[1]]) var f21 = queue.push(r21, @[chain[0], chain[1]])
doAssert(f21.finished == true and f21.failed == false)
await sleepAsync(100.milliseconds) await sleepAsync(100.milliseconds)
doAssert(f21.finished == true and f21.failed == false)
doAssert(f24.finished == true and f24.failed == false) doAssert(f24.finished == true and f24.failed == false)
doAssert(counter == 9) doAssert(counter == 9)
var f23 = queue.push(r23, @[chain[4], chain[5]]) var f23 = queue.push(r23, @[chain[4], chain[5]])
await sleepAsync(100.milliseconds)
doAssert(f23.finished == true and f23.failed == false) doAssert(f23.finished == true and f23.failed == false)
doAssert(counter == 12) doAssert(counter == 12)
await sleepAsync(100.milliseconds)
doAssert(counter == 12) doAssert(counter == 12)
doAssert(r21.item == p1) doAssert(r21.item == p1)
doAssert(r22.item == p2) doAssert(r22.item == p2)
doAssert(r23.item == p3) doAssert(r23.item == p3)
doAssert(r24.item == p4) doAssert(r24.item == p4)
await validatorFut.cancelAndWait()
result = true result = true
check waitFor(test()) check waitFor(test())
@ -300,19 +312,19 @@ suite "SyncManager test suite":
proc test(): Future[bool] {.async.} = proc test(): Future[bool] {.async.} =
var counter = 5 var counter = 5
proc syncReceiver(req: SyncRequest[SomeTPeer], proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} =
list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. while true:
gcsafe.} = let sblock = await aq.popFirst()
for item in list: if sblock.blk.message.slot == Slot(counter):
if item.message.slot == Slot(counter):
inc(counter) inc(counter)
else: else:
return err(Invalid) sblock.fail(BlockError.Invalid)
return ok() sblock.done()
var aq = newAsyncQueue[SyncBlock](1)
var chain = createChain(Slot(5), Slot(18)) var chain = createChain(Slot(5), Slot(18))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64, var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64,
syncReceiver, getFirstSlotAtFinalizedEpoch, 2) getFirstSlotAtFinalizedEpoch, aq, 2)
let p1 = SomeTPeer() let p1 = SomeTPeer()
let p2 = SomeTPeer() let p2 = SomeTPeer()
let p3 = SomeTPeer() let p3 = SomeTPeer()
@ -321,6 +333,8 @@ suite "SyncManager test suite":
let p6 = SomeTPeer() let p6 = SomeTPeer()
let p7 = SomeTPeer() let p7 = SomeTPeer()
var validatorFut = simpleValidator(aq)
var r21 = queue.pop(Slot(20), p1) var r21 = queue.pop(Slot(20), p1)
var r22 = queue.pop(Slot(20), p2) var r22 = queue.pop(Slot(20), p2)
var r23 = queue.pop(Slot(20), p3) var r23 = queue.pop(Slot(20), p3)
@ -337,11 +351,13 @@ suite "SyncManager test suite":
var f26 = queue.push(r26, @[chain[10], chain[11]]) var f26 = queue.push(r26, @[chain[10], chain[11]])
var f27 = queue.push(r27, @[chain[12], chain[13]]) var f27 = queue.push(r27, @[chain[12], chain[13]])
await sleepAsync(100.milliseconds)
doAssert(f21.finished == true and f21.failed == false) doAssert(f21.finished == true and f21.failed == false)
doAssert(e21.finished == true and e21.failed == false) doAssert(e21.finished == true and e21.failed == false)
doAssert(f26.finished == false) doAssert(f26.finished == false)
doAssert(f27.finished == false) doAssert(f27.finished == false)
await queue.resetWait(none[Slot]()) await queue.resetWait(none[Slot]())
await sleepAsync(100.milliseconds)
doAssert(f26.finished == true and f26.failed == false) doAssert(f26.finished == true and f26.failed == false)
doAssert(f27.finished == true and f27.failed == false) doAssert(f27.finished == true and f27.failed == false)
doAssert(queue.inpSlot == Slot(7) and queue.outSlot == Slot(7)) doAssert(queue.inpSlot == Slot(7) and queue.outSlot == Slot(7))
@ -355,12 +371,15 @@ suite "SyncManager test suite":
var o25 = queue.push(r25, @[chain[8], chain[9]]) var o25 = queue.push(r25, @[chain[8], chain[9]])
var o26 = queue.push(r26, @[chain[10], chain[11]]) var o26 = queue.push(r26, @[chain[10], chain[11]])
var o27 = queue.push(r27, @[chain[12], chain[13]]) var o27 = queue.push(r27, @[chain[12], chain[13]])
await sleepAsync(100.milliseconds)
doAssert(o21.finished == true and o21.failed == false) doAssert(o21.finished == true and o21.failed == false)
doAssert(o22.finished == true and o22.failed == false) doAssert(o22.finished == true and o22.failed == false)
doAssert(o25.finished == true and o25.failed == false) doAssert(o25.finished == true and o25.failed == false)
doAssert(o26.finished == true and o26.failed == false) doAssert(o26.finished == true and o26.failed == false)
doAssert(o27.finished == true and o27.failed == false) doAssert(o27.finished == true and o27.failed == false)
doAssert(len(queue) == 12) doAssert(len(queue) == 12)
await validatorFut.cancelAndWait()
result = true result = true
check waitFor(test()) check waitFor(test())