From 91bb6daa19c202ffd296618b2310f1e0d4783e1c Mon Sep 17 00:00:00 2001 From: benbierens Date: Mon, 3 Jul 2023 14:53:53 +0200 Subject: [PATCH] Adds heartbeat loop and loopmeasure to task handler for discovery.nim --- codex/blockexchange/engine/discovery.nim | 11 ++++++++++- codex/blockexchange/engine/engine.nim | 6 ++++++ codex/codex.nim | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 5a930967..2bffc2de 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -54,6 +54,7 @@ type advertiseQueue*: AsyncQueue[Cid] # Advertise queue advertiseTasks*: seq[Future[void]] # Advertise tasks discoveryLoop*: Future[void] # Discovery loop task handle + heartbeatLoop*: Future[void] discoveryQueue*: AsyncQueue[Cid] # Discovery queue discoveryTasks*: seq[Future[void]] # Discovery tasks minPeersPerBlock*: int # Max number of peers with block @@ -78,6 +79,10 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = trace "About to sleep discovery loop" await sleepAsync(b.discoveryLoopSleep) +proc heartbeatLoop(b: DiscoveryEngine) {.async.} = + while b.discEngineRunning: + await sleepAsync(1.seconds) + proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: if cids =? await b.localStore.listBlocks(blockType = b.advertiseType): @@ -206,6 +211,7 @@ proc start*(b: DiscoveryEngine) {.async.} = b.advertiseLoop = advertiseQueueLoop(b) b.discoveryLoop = discoveryQueueLoop(b) + b.heartbeatLoop = heartbeatLoop(b) proc stop*(b: DiscoveryEngine) {.async.} = ## Stop the discovery engine @@ -239,6 +245,9 @@ proc stop*(b: DiscoveryEngine) {.async.} = await b.discoveryLoop.cancelAndWait() trace "Discovery loop stopped" + if not b.heartbeatLoop.isNil and not b.heartbeatLoop.finished: + await b.heartbeatLoop.cancelAndWait() + trace "Discovery engine stopped" proc new*( @@ -256,7 +265,7 @@ proc new*( advertiseType = BlockType.Both ): DiscoveryEngine = ## Create a discovery engine instance for advertising services - ## + ## DiscoveryEngine( localStore: localStore, peers: peers, diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index b8a05935..d1fe1dfe 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -20,6 +20,7 @@ import pkg/stint import ../../stores/blockstore import ../../blocktype as bt import ../../utils +import ../../loopmeasure import ../protobuf/blockexc import ../protobuf/presence @@ -64,6 +65,7 @@ type wallet*: WalletRef # Nitro wallet for micropayments pricing*: ?Pricing # Optional bandwidth pricing discovery*: DiscoveryEngine + loopMeasure*: LoopMeasure Pricing* = object address*: EthAddress @@ -491,7 +493,9 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = peerCtx = await b.taskQueue.pop() trace "Got new task from queue", peerId = peerCtx.id + b.loopMeasure.loopArm() await b.taskHandler(peerCtx) + b.loopMeasure.loopDisarm("blockexcTaskRunner") trace "Exiting blockexc task runner" @@ -503,6 +507,7 @@ proc new*( discovery: DiscoveryEngine, peerStore: PeerCtxStore, pendingBlocks: PendingBlocksManager, + loopMeasure: LoopMeasure, concurrentTasks = DefaultConcurrentTasks, peersPerRequest = DefaultMaxPeersPerRequest ): BlockExcEngine = @@ -514,6 +519,7 @@ proc new*( localStore: localStore, peers: peerStore, pendingBlocks: pendingBlocks, + loopMeasure: loopMeasure, peersPerRequest: peersPerRequest, network: network, wallet: wallet, diff --git a/codex/codex.nim b/codex/codex.nim index 692ccc81..a7de75f4 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -227,7 +227,7 @@ proc new*( peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) - engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) + engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks, loopMeasure) store = NetworkStore.new(engine, repoStore) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery)