From 711f1f88ee5add73c1fcb6a689cb4589977564e9 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 12 Aug 2020 12:29:11 +0300 Subject: [PATCH] Use one single async queue and loop for processing blocks. (#1487) * Initial commit * Fix compilation problem. * Address review comments. --- beacon_chain/beacon_node.nim | 88 +++++++++++++++-------------- beacon_chain/beacon_node_common.nim | 5 +- beacon_chain/request_manager.nim | 58 ++++++++++++------- beacon_chain/sync_manager.nim | 21 ++++--- tests/test_peer_pool.nim | 20 +++---- 5 files changed, 107 insertions(+), 85 deletions(-) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 853fcb136..14af7bc66 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -284,13 +284,9 @@ proc init*(T: type BeaconNode, forkDigest: enrForkId.forkDigest, topicBeaconBlocks: topicBeaconBlocks, topicAggregateAndProofs: topicAggregateAndProofs, + blocksQueue: newAsyncQueue[SyncBlock](1), ) - - res.requestManager = RequestManager.init(network, - proc(signedBlock: SignedBeaconBlock) = - onBeaconBlock(res, signedBlock) - ) - + res.requestManager = RequestManager.init(network, res.blocksQueue) res.addLocalValidators() # This merely configures the BeaconSync @@ -560,14 +556,45 @@ proc runOnSecondLoop(node: BeaconNode) {.async.} = ticks_delay.set(sleepTime.nanoseconds.float / nanosecondsIn1s) debug "onSecond task completed", sleepTime, processingTime -proc runForwardSyncLoop(node: BeaconNode) {.async.} = +proc importBlock(node: BeaconNode, + sblock: SignedBeaconBlock): Result[void, BlockError] = + let sm1 = now(chronos.Moment) + let res = node.storeBlock(sblock) + let em1 = now(chronos.Moment) + if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}): + let sm2 = now(chronos.Moment) + discard node.updateHead(node.beaconClock.now().slotOrZero) + let em2 = now(chronos.Moment) + let storeBlockDuration = if res.isOk(): em1 - sm1 else: ZeroDuration + let updateHeadDuration = if res.isOk(): em2 - sm2 else: ZeroDuration + let overallDuration = if res.isOk(): em2 - sm1 else: ZeroDuration + let storeSpeed = + block: + let secs = float(chronos.seconds(1).nanoseconds) + if not(overallDuration.isZero()): + let v = secs / float(overallDuration.nanoseconds) + round(v * 10_000) / 10_000 + else: + 0.0 + debug "Block got imported successfully", + local_head_slot = node.chainDag.head.slot, store_speed = storeSpeed, + block_root = shortLog(sblock.root), + block_slot = sblock.message.slot, + store_block_duration = $storeBlockDuration, + update_head_duration = $updateHeadDuration, + overall_duration = $overallDuration + ok() + else: + err(res.error()) + +proc startSyncManager(node: BeaconNode) = func getLocalHeadSlot(): Slot = - result = node.chainDag.head.slot + node.chainDag.head.slot proc getLocalWallSlot(): Slot {.gcsafe.} = let epoch = node.beaconClock.now().slotOrZero.compute_epoch_at_slot() + 1'u64 - result = epoch.compute_start_slot_at_epoch() + epoch.compute_start_slot_at_epoch() func getFirstSlotAtFinalizedEpoch(): Slot {.gcsafe.} = let fepoch = node.chainDag.headState.data.data.finalized_checkpoint.epoch @@ -581,49 +608,23 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} = score_high_limit = PeerScoreHighLimit except: discard - result = false + false else: - result = true + true node.network.peerPool.setScoreCheck(scoreCheck) node.syncManager = newSyncManager[Peer, PeerID]( node.network.peerPool, getLocalHeadSlot, getLocalWallSlot, - getFirstSlotAtFinalizedEpoch, chunkSize = 32 + 
getFirstSlotAtFinalizedEpoch, node.blocksQueue, chunkSize = 32 ) - node.syncManager.start() +proc runBlockProcessingLoop(node: BeaconNode) {.async.} = + ## Incoming blocks processing loop. while true: - let sblock = await node.syncManager.getBlock() - let sm1 = now(chronos.Moment) - let res = node.storeBlock(sblock.blk) - let em1 = now(chronos.Moment) - if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}): - let sm2 = now(chronos.Moment) - discard node.updateHead(node.beaconClock.now().slotOrZero) - let em2 = now(chronos.Moment) - sblock.done() - let duration1 = if res.isOk(): em1 - sm1 else: ZeroDuration - let duration2 = if res.isOk(): em2 - sm2 else: ZeroDuration - let duration = if res.isOk(): em2 - sm1 else: ZeroDuration - let storeSpeed = - block: - let secs = float(chronos.seconds(1).nanoseconds) - if not(duration.isZero()): - let v = secs / float(duration.nanoseconds) - round(v * 10_000) / 10_000 - else: - 0.0 - debug "Block got imported successfully", - local_head_slot = getLocalHeadSlot(), store_speed = storeSpeed, - block_root = shortLog(sblock.blk.root), - block_slot = sblock.blk.message.slot, - store_block_duration = $duration1, - update_head_duration = $duration2, - store_duration = $duration - else: - sblock.fail(res.error) + let sblock = await node.blocksQueue.popFirst() + sblock.complete(node.importBlock(sblock.blk)) proc currentSlot(node: BeaconNode): Slot = node.beaconClock.now.slotOrZero @@ -866,9 +867,10 @@ proc run*(node: BeaconNode) = asyncCheck node.onSlotStart(curSlot, nextSlot) node.onSecondLoop = runOnSecondLoop(node) - node.forwardSyncLoop = runForwardSyncLoop(node) + node.blockProcessingLoop = runBlockProcessingLoop(node) node.requestManager.start() + node.startSyncManager() # main event loop while status == BeaconNodeStatus.Running: diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index 040d225fe..522aad0a2 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -36,7 +36,6 @@ type graffitiBytes*: GraffitiBytes network*: Eth2Node netKeys*: KeyPair - requestManager*: RequestManager db*: BeaconChainDB config*: BeaconNodeConf attachedValidators*: ValidatorPool @@ -47,10 +46,12 @@ type beaconClock*: BeaconClock rpcServer*: RpcServer forkDigest*: ForkDigest + blocksQueue*: AsyncQueue[SyncBlock] + requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerID] topicBeaconBlocks*: string topicAggregateAndProofs*: string - forwardSyncLoop*: Future[void] + blockProcessingLoop*: Future[void] onSecondLoop*: Future[void] genesisSnapshotContent*: string diff --git a/beacon_chain/request_manager.nim b/beacon_chain/request_manager.nim index 0e6471726..f908757fc 100644 --- a/beacon_chain/request_manager.nim +++ b/beacon_chain/request_manager.nim @@ -2,25 +2,25 @@ import options, sequtils, strutils import chronos, chronicles import spec/[datatypes, digest], eth2_network, beacon_node_types, sync_protocol, sync_manager, ssz/merkleization +export sync_manager logScope: topics = "requman" const - SYNC_MAX_REQUESTED_BLOCKS* = 4 # Spec allows up to MAX_REQUEST_BLOCKS. - ## Maximum number of blocks which will be requested in each `beaconBlocksByRoot` invocation. + SYNC_MAX_REQUESTED_BLOCKS* = 32 # Spec allows up to MAX_REQUEST_BLOCKS. + ## Maximum number of blocks which will be requested in each + ## `beaconBlocksByRoot` invocation. PARALLEL_REQUESTS* = 2 ## Number of peers we using to resolve our request. 
type RequestManager* = object network*: Eth2Node - queue*: AsyncQueue[FetchRecord] - responseHandler*: FetchAncestorsResponseHandler + inpQueue*: AsyncQueue[FetchRecord] + outQueue*: AsyncQueue[SyncBlock] loopFuture: Future[void] - FetchAncestorsResponseHandler = proc (b: SignedBeaconBlock) {.gcsafe.} - func shortLog*(x: seq[Eth2Digest]): string = "[" & x.mapIt(shortLog(it)).join(", ") & "]" @@ -28,10 +28,11 @@ func shortLog*(x: seq[FetchRecord]): string = "[" & x.mapIt(shortLog(it.root)).join(", ") & "]" proc init*(T: type RequestManager, network: Eth2Node, - responseCb: FetchAncestorsResponseHandler): T = - T( - network: network, queue: newAsyncQueue[FetchRecord](), - responseHandler: responseCb + outputQueue: AsyncQueue[SyncBlock]): RequestManager = + RequestManager( + network: network, + inpQueue: newAsyncQueue[FetchRecord](), + outQueue: outputQueue ) proc checkResponse(roots: openArray[Eth2Digest], @@ -48,6 +49,15 @@ proc checkResponse(roots: openArray[Eth2Digest], checks.del(res) return true +proc validate(rman: RequestManager, + b: SignedBeaconBlock): Future[Result[void, BlockError]] {.async.} = + let sblock = SyncBlock( + blk: b, + resfut: newFuture[Result[void, BlockError]]("request.manager.validate") + ) + await rman.outQueue.addLast(sblock) + return await sblock.resfut + proc fetchAncestorBlocksFromNetwork(rman: RequestManager, items: seq[Eth2Digest]) {.async.} = var peer: Peer @@ -60,9 +70,19 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager, if blocks.isOk: let ublocks = blocks.get() if checkResponse(items, ublocks): - for b in ublocks: - rman.responseHandler(b) - peer.updateScore(PeerScoreGoodBlocks) + var res: Result[void, BlockError] + if len(ublocks) > 0: + for b in ublocks: + res = await rman.validate(b) + if not(res.isOk): + break + else: + res = Result[void, BlockError].ok() + + if res.isOk(): + peer.updateScore(PeerScoreGoodBlocks) + else: + peer.updateScore(PeerScoreBadBlocks) else: peer.updateScore(PeerScoreBadResponse) else: @@ -85,12 +105,12 @@ proc requestManagerLoop(rman: RequestManager) {.async.} = while true: try: rootList.setLen(0) - let req = await rman.queue.popFirst() + let req = await rman.inpQueue.popFirst() rootList.add(req.root) - var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.queue)) + var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.inpQueue)) while count > 0: - rootList.add(rman.queue.popFirstNoWait().root) + rootList.add(rman.inpQueue.popFirstNoWait().root) dec(count) let start = SyncMoment.now(Slot(0)) @@ -111,7 +131,7 @@ proc requestManagerLoop(rman: RequestManager) {.async.} = debug "Request manager tick", blocks_count = len(rootList), succeed = succeed, failed = (len(workers) - succeed), - queue_size = len(rman.queue), + queue_size = len(rman.inpQueue), sync_speed = speed(start, finish) except CatchableError as exc: @@ -119,7 +139,7 @@ proc requestManagerLoop(rman: RequestManager) {.async.} = proc start*(rman: var RequestManager) = ## Start Request Manager's loop. - rman.loopFuture = requestManagerLoop(rman) + rman.loopFuture = rman.requestManagerLoop() proc stop*(rman: RequestManager) = ## Stop Request Manager's loop. @@ -130,4 +150,4 @@ proc fetchAncestorBlocks*(rman: RequestManager, roots: seq[FetchRecord]) = ## Enqueue list missing blocks roots ``roots`` for download by ## Request Manager ``rman``. 
for item in roots: - rman.queue.addLastNoWait(item) + rman.inpQueue.addLastNoWait(item) diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim index a59f5078d..38f8a5a49 100644 --- a/beacon_chain/sync_manager.nim +++ b/beacon_chain/sync_manager.nim @@ -440,7 +440,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], var res: Result[void, BlockError] if len(item.data) > 0: for blk in item.data: - debug "Pushing block", block_root = blk.root, + trace "Pushing block", block_root = blk.root, block_slot = blk.message.slot res = await sq.validate(blk) if not(res.isOk): @@ -587,6 +587,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], getLocalHeadSlotCb: GetSlotCallback, getLocalWallSlotCb: GetSlotCallback, getFinalizedSlotCb: GetSlotCallback, + outputQueue: AsyncQueue[SyncBlock], maxWorkers = 10, maxStatusAge = uint64(SLOTS_PER_EPOCH * 4), maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), @@ -594,12 +595,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], int(SECONDS_PER_SLOT)).seconds, chunkSize = uint64(SLOTS_PER_EPOCH), toleranceValue = uint64(1), - maxRecurringFailures = 3, - outputQueueSize = 1, + maxRecurringFailures = 3 ): SyncManager[A, B] = - var outputQueue = newAsyncQueue[SyncBlock](outputQueueSize) - let queue = SyncQueue.init(A, getFinalizedSlotCb(), getLocalWallSlotCb(), chunkSize, getFinalizedSlotCb, outputQueue, 1) @@ -982,16 +980,17 @@ proc start*[A, B](man: SyncManager[A, B]) = ## Starts SyncManager's main loop. man.syncFut = man.syncLoop() -proc getBlock*[A, B](man: SyncManager[A, B]): Future[SyncBlock] = - ## Get the block that was received during synchronization. - man.outQueue.popFirst() - proc done*(blk: SyncBlock) = - ## Send signal to SyncManager that the block ``blk`` has passed + ## Send signal to [Sync/Request]Manager that the block ``blk`` has passed ## verification successfully. blk.resfut.complete(Result[void, BlockError].ok()) proc fail*(blk: SyncBlock, error: BlockError) = - ## Send signal to SyncManager that the block ``blk`` has NOT passed + ## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed ## verification with specific ``error``. blk.resfut.complete(Result[void, BlockError].err(error)) + +proc complete*(blk: SyncBlock, res: Result[void, BlockError]) {.inline.} = + ## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk`` + ## verification. 
+ blk.resfut.complete(res) diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index 0b849fd5a..5ac4bc34f 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -80,7 +80,7 @@ suiteReport "PeerPool testing suite": doAssert(fut1.finished == false) doAssert(fut2.finished == false) peer0.close() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut1.finished == false) doAssert(fut2.finished == true and fut2.failed == false) result = true @@ -102,11 +102,11 @@ suiteReport "PeerPool testing suite": doAssert(fut2.finished == false) doAssert(fut3.finished == false) peer0.close() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut2.finished == true and fut2.failed == false) doAssert(fut3.finished == false) peer1.close() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut3.finished == true and fut3.failed == false) result = true @@ -128,11 +128,11 @@ suiteReport "PeerPool testing suite": doAssert(fut2.finished == true and fut2.failed == false) doAssert(fut3.finished == false) peer0.close() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut1.finished == true and fut1.failed == false) doAssert(fut3.finished == false) peer2.close() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut3.finished == true and fut3.failed == false) result = true @@ -160,21 +160,21 @@ suiteReport "PeerPool testing suite": doAssert(fut4.finished == false) doAssert(fut5.finished == false) - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut3.finished == false) doAssert(fut4.finished == false) doAssert(fut5.finished == false) peer0.close() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut3.finished == true and fut3.failed == false) doAssert(fut4.finished == false) doAssert(fut5.finished == false) peer1.close() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut4.finished == true and fut4.failed == false) doAssert(fut5.finished == false) peer2.close() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) doAssert(fut5.finished == true and fut5.failed == false) result = true @@ -442,7 +442,7 @@ suiteReport "PeerPool testing suite": proc testConsumer() {.async.} = var p = await pool.acquire() - await sleepAsync(10.milliseconds) + await sleepAsync(100.milliseconds) pool.release(p) proc testClose(): Future[bool] {.async.} =
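
What follows is a minimal, self-contained sketch (separate from the patch itself, assuming chronos and stew/results are available) of the pattern this change introduces: RequestManager and SyncManager both push blocks onto one shared AsyncQueue[SyncBlock], and a single runBlockProcessingLoop pops them and answers each block through its attached result future. DemoBlock, DemoError, importDemoBlock, produceBlock and processBlocks below are hypothetical stand-ins for SyncBlock/SignedBeaconBlock, BlockError, BeaconNode.importBlock, the managers' validate procs and runBlockProcessingLoop; they are not names from the codebase.

import chronos
import stew/results

type
  DemoError = enum
    DemoInvalid, DemoDuplicate

  DemoBlock = object
    slot: uint64
    resfut: Future[Result[void, DemoError]]

proc importDemoBlock(blk: DemoBlock): Result[void, DemoError] =
  ## Stand-in for block import: accept even slots, reject odd ones.
  if blk.slot mod 2 == 0:
    ok()
  else:
    err(DemoInvalid)

proc produceBlock(queue: AsyncQueue[DemoBlock],
                  slot: uint64): Future[Result[void, DemoError]] {.async.} =
  ## Producer side (what the request/sync managers' validate procs do):
  ## enqueue the block together with a fresh future, then wait for the verdict.
  let blk = DemoBlock(
    slot: slot,
    resfut: newFuture[Result[void, DemoError]]("demo.produceBlock"))
  await queue.addLast(blk)
  return await blk.resfut

proc processBlocks(queue: AsyncQueue[DemoBlock]) {.async.} =
  ## Consumer side (what the single block processing loop does):
  ## pop each block from the shared queue and complete its future.
  while true:
    let blk = await queue.popFirst()
    blk.resfut.complete(importDemoBlock(blk))

when isMainModule:
  let queue = newAsyncQueue[DemoBlock](1)
  asyncCheck processBlocks(queue)
  doAssert (waitFor produceBlock(queue, 2)).isOk()
  doAssert (waitFor produceBlock(queue, 3)).error() == DemoInvalid

Because every producer awaits the future it attached to its block, both managers can score peers on the verdict returned by the one import loop, instead of each manager importing blocks through its own path as before.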