From 55fcece0b2321a0bbc567b8d4813f2acc52f0034 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Mon, 10 Aug 2020 10:15:50 +0300 Subject: [PATCH] SyncManager fix to process blocks one by one. (#1464) * Allow sync manager process blocks one by one. * Log storeBlock() and updateHead() duration. * Calculate duration only for blocks added without any error. * Fix float compilation error. * Fix duration. * Fix SyncQueue tests. --- beacon_chain/beacon_node.nim | 69 ++++++++++----------- beacon_chain/sync_manager.nim | 87 ++++++++++++++++++--------- tests/test_sync_manager.nim | 109 ++++++++++++++++++++-------------- 3 files changed, 155 insertions(+), 110 deletions(-) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 62026499d..0384ebd30 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -564,34 +564,6 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} = let fepoch = node.chainDag.headState.data.data.finalized_checkpoint.epoch compute_start_slot_at_epoch(fepoch) - proc updateLocalBlocks(list: openArray[SignedBeaconBlock]): Result[void, BlockError] = - debug "Forward sync imported blocks", count = len(list), - local_head_slot = getLocalHeadSlot() - let sm = now(chronos.Moment) - for blk in list: - let res = node.storeBlock(blk) - # We going to ignore `BlockError.Unviable` errors because we have working - # backward sync and it can happens that we can perform overlapping - # requests. - # For the same reason we ignore Duplicate blocks as if they are duplicate - # from before the current finalized epoch, we can drop them - # (and they may have no parents anymore in the fork choice if it was pruned) - if res.isErr and res.error notin {BlockError.Unviable, BlockError.Old, BLockError.Duplicate}: - return res - discard node.updateHead(node.beaconClock.now().slotOrZero) - - let dur = now(chronos.Moment) - sm - let secs = float(chronos.seconds(1).nanoseconds) - var storeSpeed = 0.0 - if not(dur.isZero()): - let v = float(len(list)) * (secs / float(dur.nanoseconds)) - # We doing round manually because stdlib.round is deprecated - storeSpeed = round(v * 10000) / 10000 - - info "Forward sync blocks got imported successfully", count = len(list), - local_head_slot = getLocalHeadSlot(), store_speed = storeSpeed - ok() - proc scoreCheck(peer: Peer): bool = if peer.score < PeerScoreLowLimit: try: @@ -608,16 +580,41 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} = node.syncManager = newSyncManager[Peer, PeerID]( node.network.peerPool, getLocalHeadSlot, getLocalWallSlot, - getFirstSlotAtFinalizedEpoch, updateLocalBlocks, - # 4 blocks per chunk is the optimal value right now, because our current - # syncing speed is around 4 blocks per second. So there no need to request - # more then 4 blocks right now. As soon as `store_speed` value become - # significantly more then 4 blocks per second you can increase this - # value appropriately. - chunkSize = 4 + getFirstSlotAtFinalizedEpoch, chunkSize = 32 ) - await node.syncManager.sync() + node.syncManager.start() + + while true: + let sblock = await node.syncManager.getBlock() + let sm1 = now(chronos.Moment) + let res = node.storeBlock(sblock.blk) + let em1 = now(chronos.Moment) + if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}): + let sm2 = now(chronos.Moment) + discard node.updateHead(node.beaconClock.now().slotOrZero) + let em2 = now(chronos.Moment) + sblock.done() + let duration1 = if res.isOk(): em1 - sm1 else: ZeroDuration + let duration2 = if res.isOk(): em2 - sm2 else: ZeroDuration + let duration = if res.isOk(): em2 - sm1 else: ZeroDuration + let storeSpeed = + block: + let secs = float(chronos.seconds(1).nanoseconds) + if not(duration.isZero()): + let v = secs / float(duration.nanoseconds) + round(v * 10_000) / 10_000 + else: + 0.0 + debug "Block got imported successfully", + local_head_slot = getLocalHeadSlot(), store_speed = storeSpeed, + block_root = shortLog(sblock.blk.root), + block_slot = sblock.blk.message.slot, + store_block_duration = $duration1, + update_head_duration = $duration2, + store_duration = $duration + else: + sblock.fail(res.error) proc currentSlot(node: BeaconNode): Slot = node.beaconClock.now.slotOrZero diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim index aa1d9b6b7..a59f5078d 100644 --- a/beacon_chain/sync_manager.nim +++ b/beacon_chain/sync_manager.nim @@ -39,14 +39,9 @@ type GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].} - UpdateLocalBlocksCallback* = - proc(list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. - gcsafe.} - - SyncUpdateCallback*[T] = - proc(req: SyncRequest[T], - list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. - gcsafe.} + SyncBlock* = object + blk*: SignedBeaconBlock + resfut*: Future[Result[void, BlockError]] SyncRequest*[T] = object index*: uint64 @@ -73,12 +68,12 @@ type counter*: uint64 pending*: Table[uint64, SyncRequest[T]] waiters: seq[SyncWaiter[T]] - syncUpdate*: SyncUpdateCallback[T] getFinalizedSlot*: GetSlotCallback debtsQueue: HeapQueue[SyncRequest[T]] debtsCount: uint64 readyQueue: HeapQueue[SyncResult[T]] suspects: seq[SyncResult[T]] + outQueue: AsyncQueue[SyncBlock] SyncManager*[A, B] = ref object pool: PeerPool[A, B] @@ -92,10 +87,11 @@ type getLocalHeadSlot: GetSlotCallback getLocalWallSlot: GetSlotCallback getFinalizedSlot: GetSlotCallback - syncUpdate: SyncUpdateCallback[A] chunkSize: uint64 queue: SyncQueue[A] failures: seq[SyncFailure[A]] + syncFut: Future[void] + outQueue: AsyncQueue[SyncBlock] inProgress*: bool SyncMoment* = object @@ -110,6 +106,15 @@ type SyncManagerError* = object of CatchableError BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]] +proc validate*[T](sq: SyncQueue[T], + blk: SignedBeaconBlock): Future[Result[void, BlockError]] {.async.} = + let sblock = SyncBlock( + blk: blk, + resfut: newFuture[Result[void, BlockError]]("sync.manager.validate") + ) + await sq.outQueue.addLast(sblock) + return await sblock.resfut + proc getShortMap*[T](req: SyncRequest[T], data: openarray[SignedBeaconBlock]): string = ## Returns all slot numbers in ``data`` as placement map. @@ -207,8 +212,8 @@ proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} = proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], start, last: Slot, chunkSize: uint64, - updateCb: SyncUpdateCallback[T], getFinalizedSlotCb: GetSlotCallback, + outputQueue: AsyncQueue[SyncBlock], queueSize: int = -1): SyncQueue[T] = ## Create new synchronization queue with parameters ## @@ -267,14 +272,14 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], lastSlot: last, chunkSize: chunkSize, queueSize: queueSize, - syncUpdate: updateCb, getFinalizedSlot: getFinalizedSlotCb, waiters: newSeq[SyncWaiter[T]](), counter: 1'u64, pending: initTable[uint64, SyncRequest[T]](), debtsQueue: initHeapQueue[SyncRequest[T]](), inpSlot: start, - outSlot: start + outSlot: start, + outQueue: outputQueue ) proc `<`*[T](a, b: SyncRequest[T]): bool {.inline.} = @@ -430,7 +435,19 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], if sq.outSlot != minSlot: break let item = sq.readyQueue.pop() - let res = sq.syncUpdate(item.request, item.data) + + # Validating received blocks one by one + var res: Result[void, BlockError] + if len(item.data) > 0: + for blk in item.data: + debug "Pushing block", block_root = blk.root, + block_slot = blk.message.slot + res = await sq.validate(blk) + if not(res.isOk): + break + else: + res = Result[void, BlockError].ok() + if res.isOk: sq.outSlot = sq.outSlot + item.request.count sq.wakeupWaiters() @@ -570,7 +587,6 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], getLocalHeadSlotCb: GetSlotCallback, getLocalWallSlotCb: GetSlotCallback, getFinalizedSlotCb: GetSlotCallback, - updateLocalBlocksCb: UpdateLocalBlocksCallback, maxWorkers = 10, maxStatusAge = uint64(SLOTS_PER_EPOCH * 4), maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), @@ -578,33 +594,28 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], int(SECONDS_PER_SLOT)).seconds, chunkSize = uint64(SLOTS_PER_EPOCH), toleranceValue = uint64(1), - maxRecurringFailures = 3 + maxRecurringFailures = 3, + outputQueueSize = 1, ): SyncManager[A, B] = - proc syncUpdate(req: SyncRequest[A], - list: openarray[SignedBeaconBlock]): Result[void, BlockError] {.gcsafe.} = - let peer = req.item - let res = updateLocalBlocksCb(list) - if res.isOk: - peer.updateScore(PeerScoreGoodBlocks) - return res + var outputQueue = newAsyncQueue[SyncBlock](outputQueueSize) let queue = SyncQueue.init(A, getFinalizedSlotCb(), getLocalWallSlotCb(), - chunkSize, syncUpdate, getFinalizedSlotCb, 2) + chunkSize, getFinalizedSlotCb, outputQueue, 1) result = SyncManager[A, B]( pool: pool, maxWorkersCount: maxWorkers, maxStatusAge: maxStatusAge, getLocalHeadSlot: getLocalHeadSlotCb, - syncUpdate: syncUpdate, getLocalWallSlot: getLocalWallSlotCb, getFinalizedSlot: getFinalizedSlotCb, maxHeadAge: maxHeadAge, maxRecurringFailures: maxRecurringFailures, sleepTime: sleepTime, chunkSize: chunkSize, - queue: queue + queue: queue, + outQueue: outputQueue ) proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, @@ -809,7 +820,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B], finally: man.pool.release(peer) -proc sync*[A, B](man: SyncManager[A, B]) {.async.} = +proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = # This procedure manages main loop of SyncManager and in this loop it # performs # 1. It checks for current sync status, "are we synced?". @@ -890,8 +901,8 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} = wall_head_slot = wallSlot, local_head_slot = headSlot, queue_last_slot = man.queue.lastSlot, topics = "syncman" man.queue = SyncQueue.init(A, man.getFinalizedSlot(), wallSlot, - man.chunkSize, man.syncUpdate, - man.getFinalizedSlot, 2) + man.chunkSize, man.getFinalizedSlot, + man.outQueue, 1) debug "Synchronization loop starting new worker", peer = peer, wall_head_slot = wallSlot, local_head_slot = headSlot, @@ -966,3 +977,21 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} = # Cleaning up failures. man.failures.setLen(0) await man.queue.resetWait(none[Slot]()) + +proc start*[A, B](man: SyncManager[A, B]) = + ## Starts SyncManager's main loop. + man.syncFut = man.syncLoop() + +proc getBlock*[A, B](man: SyncManager[A, B]): Future[SyncBlock] = + ## Get the block that was received during synchronization. + man.outQueue.popFirst() + +proc done*(blk: SyncBlock) = + ## Send signal to SyncManager that the block ``blk`` has passed + ## verification successfully. + blk.resfut.complete(Result[void, BlockError].ok()) + +proc fail*(blk: SyncBlock, error: BlockError) = + ## Send signal to SyncManager that the block ``blk`` has NOT passed + ## verification with specific ``error``. + blk.resfut.complete(Result[void, BlockError].err(error)) diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index 4e29aa37b..e060a5d7e 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -26,15 +26,11 @@ suite "SyncManager test suite": item.message.slot = curslot curslot = curslot + 1'u64 - proc syncUpdate(req: SyncRequest[SomeTPeer], - data: openarray[SignedBeaconBlock]): Result[void, BlockError] {. - gcsafe.} = - discard - test "[SyncQueue] Start and finish slots equal": let p1 = SomeTPeer() - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64, syncUpdate, - getFirstSlotAtFinalizedEpoch) + let aq = newAsyncQueue[SyncBlock](1) + var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64, + getFirstSlotAtFinalizedEpoch, aq) check len(queue) == 1 var r11 = queue.pop(Slot(0), p1) check len(queue) == 0 @@ -49,8 +45,9 @@ suite "SyncManager test suite": r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64 test "[SyncQueue] Two full requests success/fail": - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64, syncUpdate, - getFirstSlotAtFinalizedEpoch) + let aq = newAsyncQueue[SyncBlock](1) + var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64, + getFirstSlotAtFinalizedEpoch, aq) let p1 = SomeTPeer() let p2 = SomeTPeer() check len(queue) == 2 @@ -77,8 +74,9 @@ suite "SyncManager test suite": r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64 test "[SyncQueue] Full and incomplete success/fail start from zero": - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64, syncUpdate, - getFirstSlotAtFinalizedEpoch) + let aq = newAsyncQueue[SyncBlock](1) + var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64, + getFirstSlotAtFinalizedEpoch, aq) let p1 = SomeTPeer() let p2 = SomeTPeer() let p3 = SomeTPeer() @@ -116,8 +114,9 @@ suite "SyncManager test suite": r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64 test "[SyncQueue] Full and incomplete success/fail start from non-zero": - var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64, syncUpdate, - getFirstSlotAtFinalizedEpoch) + let aq = newAsyncQueue[SyncBlock](1) + var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64, + getFirstSlotAtFinalizedEpoch, aq) let p1 = SomeTPeer() let p2 = SomeTPeer() check len(queue) == 5 @@ -144,8 +143,9 @@ suite "SyncManager test suite": r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64 test "[SyncQueue] Smart and stupid success/fail": - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate, - getFirstSlotAtFinalizedEpoch) + let aq = newAsyncQueue[SyncBlock](1) + var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, + getFirstSlotAtFinalizedEpoch, aq) let p1 = SomeTPeer() let p2 = SomeTPeer() check len(queue) == 5 @@ -172,8 +172,9 @@ suite "SyncManager test suite": r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64 test "[SyncQueue] One smart and one stupid + debt split + empty": - var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate, - getFirstSlotAtFinalizedEpoch) + let aq = newAsyncQueue[SyncBlock](1) + var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, + getFirstSlotAtFinalizedEpoch, aq) let p1 = SomeTPeer() let p2 = SomeTPeer() let p3 = SomeTPeer() @@ -207,19 +208,21 @@ suite "SyncManager test suite": proc test(): Future[bool] {.async.} = var counter = 0 - proc syncReceiver(req: SyncRequest[SomeTPeer], - list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. - gcsafe.} = - for item in list: - if item.message.slot == Slot(counter): + proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} = + while true: + let sblock = await aq.popFirst() + if sblock.blk.message.slot == Slot(counter): inc(counter) else: - return err(Invalid) - return ok() + sblock.fail(BlockError.Invalid) + sblock.done() + var aq = newAsyncQueue[SyncBlock](1) var chain = createChain(Slot(0), Slot(2)) var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64, - syncReceiver, getFirstSlotAtFinalizedEpoch, 1) + getFirstSlotAtFinalizedEpoch, aq, 1) + + var validatorFut = simpleValidator(aq) let p1 = SomeTPeer() let p2 = SomeTPeer() let p3 = SomeTPeer() @@ -227,14 +230,16 @@ suite "SyncManager test suite": var r12 = queue.pop(Slot(2), p2) var r13 = queue.pop(Slot(2), p3) var f13 = queue.push(r13, @[chain[2]]) - var f12 = queue.push(r12, @[chain[1]]) + # await sleepAsync(100.milliseconds) - doAssert(f12.finished == false) + # doAssert(f12.finished == false) doAssert(f13.finished == false) doAssert(counter == 0) var f11 = queue.push(r11, @[chain[0]]) + await sleepAsync(100.milliseconds) doAssert(counter == 1) doAssert(f11.finished == true and f11.failed == false) + var f12 = queue.push(r12, @[chain[1]]) await sleepAsync(100.milliseconds) doAssert(f12.finished == true and f12.failed == false) doAssert(f13.finished == true and f13.failed == false) @@ -242,6 +247,8 @@ suite "SyncManager test suite": doAssert(r11.item == p1) doAssert(r12.item == p2) doAssert(r13.item == p3) + + await validatorFut.cancelAndWait() result = true check waitFor(test()) @@ -250,24 +257,27 @@ suite "SyncManager test suite": proc test(): Future[bool] {.async.} = var counter = 5 - proc syncReceiver(req: SyncRequest[SomeTPeer], - list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. - gcsafe.} = - for item in list: - if item.message.slot == Slot(counter): + proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} = + while true: + let sblock = await aq.popFirst() + if sblock.blk.message.slot == Slot(counter): inc(counter) else: - return err(Invalid) - return ok() + sblock.fail(BlockError.Invalid) + sblock.done() + var aq = newAsyncQueue[SyncBlock](1) var chain = createChain(Slot(5), Slot(11)) var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64, - syncReceiver, getFirstSlotAtFinalizedEpoch, 2) + getFirstSlotAtFinalizedEpoch, aq, 2) + let p1 = SomeTPeer() let p2 = SomeTPeer() let p3 = SomeTPeer() let p4 = SomeTPeer() + var validatorFut = simpleValidator(aq) + var r21 = queue.pop(Slot(11), p1) var r22 = queue.pop(Slot(11), p2) var r23 = queue.pop(Slot(11), p3) @@ -279,19 +289,21 @@ suite "SyncManager test suite": doAssert(f22.finished == true and f22.failed == false) doAssert(counter == 5) var f21 = queue.push(r21, @[chain[0], chain[1]]) - doAssert(f21.finished == true and f21.failed == false) await sleepAsync(100.milliseconds) + doAssert(f21.finished == true and f21.failed == false) doAssert(f24.finished == true and f24.failed == false) doAssert(counter == 9) var f23 = queue.push(r23, @[chain[4], chain[5]]) + await sleepAsync(100.milliseconds) doAssert(f23.finished == true and f23.failed == false) doAssert(counter == 12) - await sleepAsync(100.milliseconds) doAssert(counter == 12) doAssert(r21.item == p1) doAssert(r22.item == p2) doAssert(r23.item == p3) doAssert(r24.item == p4) + + await validatorFut.cancelAndWait() result = true check waitFor(test()) @@ -300,19 +312,19 @@ suite "SyncManager test suite": proc test(): Future[bool] {.async.} = var counter = 5 - proc syncReceiver(req: SyncRequest[SomeTPeer], - list: openarray[SignedBeaconBlock]): Result[void, BlockError] {. - gcsafe.} = - for item in list: - if item.message.slot == Slot(counter): + proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} = + while true: + let sblock = await aq.popFirst() + if sblock.blk.message.slot == Slot(counter): inc(counter) else: - return err(Invalid) - return ok() + sblock.fail(BlockError.Invalid) + sblock.done() + var aq = newAsyncQueue[SyncBlock](1) var chain = createChain(Slot(5), Slot(18)) var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64, - syncReceiver, getFirstSlotAtFinalizedEpoch, 2) + getFirstSlotAtFinalizedEpoch, aq, 2) let p1 = SomeTPeer() let p2 = SomeTPeer() let p3 = SomeTPeer() @@ -321,6 +333,8 @@ suite "SyncManager test suite": let p6 = SomeTPeer() let p7 = SomeTPeer() + var validatorFut = simpleValidator(aq) + var r21 = queue.pop(Slot(20), p1) var r22 = queue.pop(Slot(20), p2) var r23 = queue.pop(Slot(20), p3) @@ -337,11 +351,13 @@ suite "SyncManager test suite": var f26 = queue.push(r26, @[chain[10], chain[11]]) var f27 = queue.push(r27, @[chain[12], chain[13]]) + await sleepAsync(100.milliseconds) doAssert(f21.finished == true and f21.failed == false) doAssert(e21.finished == true and e21.failed == false) doAssert(f26.finished == false) doAssert(f27.finished == false) await queue.resetWait(none[Slot]()) + await sleepAsync(100.milliseconds) doAssert(f26.finished == true and f26.failed == false) doAssert(f27.finished == true and f27.failed == false) doAssert(queue.inpSlot == Slot(7) and queue.outSlot == Slot(7)) @@ -355,12 +371,15 @@ suite "SyncManager test suite": var o25 = queue.push(r25, @[chain[8], chain[9]]) var o26 = queue.push(r26, @[chain[10], chain[11]]) var o27 = queue.push(r27, @[chain[12], chain[13]]) + await sleepAsync(100.milliseconds) doAssert(o21.finished == true and o21.failed == false) doAssert(o22.finished == true and o22.failed == false) doAssert(o25.finished == true and o25.failed == false) doAssert(o26.finished == true and o26.failed == false) doAssert(o27.finished == true and o27.failed == false) doAssert(len(queue) == 12) + + await validatorFut.cancelAndWait() result = true check waitFor(test())