Adds heartbeat loop and loopmeasure to task handler for discovery.nim

This commit is contained in:
benbierens 2023-07-03 14:53:53 +02:00
parent 30ba4bd3d4
commit 91bb6daa19
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
3 changed files with 17 additions and 2 deletions

View File

@ -54,6 +54,7 @@ type
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
discoveryLoop*: Future[void] # Discovery loop task handle
heartbeatLoop*: Future[void]
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks
minPeersPerBlock*: int # Max number of peers with block
@ -78,6 +79,10 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
trace "About to sleep discovery loop"
await sleepAsync(b.discoveryLoopSleep)
proc heartbeatLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
await sleepAsync(1.seconds)
proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
@ -206,6 +211,7 @@ proc start*(b: DiscoveryEngine) {.async.} =
b.advertiseLoop = advertiseQueueLoop(b)
b.discoveryLoop = discoveryQueueLoop(b)
b.heartbeatLoop = heartbeatLoop(b)
proc stop*(b: DiscoveryEngine) {.async.} =
## Stop the discovery engine
@ -239,6 +245,9 @@ proc stop*(b: DiscoveryEngine) {.async.} =
await b.discoveryLoop.cancelAndWait()
trace "Discovery loop stopped"
if not b.heartbeatLoop.isNil and not b.heartbeatLoop.finished:
await b.heartbeatLoop.cancelAndWait()
trace "Discovery engine stopped"
proc new*(
@ -256,7 +265,7 @@ proc new*(
advertiseType = BlockType.Both
): DiscoveryEngine =
## Create a discovery engine instance for advertising services
##
##
DiscoveryEngine(
localStore: localStore,
peers: peers,

View File

@ -20,6 +20,7 @@ import pkg/stint
import ../../stores/blockstore
import ../../blocktype as bt
import ../../utils
import ../../loopmeasure
import ../protobuf/blockexc
import ../protobuf/presence
@ -64,6 +65,7 @@ type
wallet*: WalletRef # Nitro wallet for micropayments
pricing*: ?Pricing # Optional bandwidth pricing
discovery*: DiscoveryEngine
loopMeasure*: LoopMeasure
Pricing* = object
address*: EthAddress
@ -491,7 +493,9 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
peerCtx = await b.taskQueue.pop()
trace "Got new task from queue", peerId = peerCtx.id
b.loopMeasure.loopArm()
await b.taskHandler(peerCtx)
b.loopMeasure.loopDisarm("blockexcTaskRunner")
trace "Exiting blockexc task runner"
@ -503,6 +507,7 @@ proc new*(
discovery: DiscoveryEngine,
peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager,
loopMeasure: LoopMeasure,
concurrentTasks = DefaultConcurrentTasks,
peersPerRequest = DefaultMaxPeersPerRequest
): BlockExcEngine =
@ -514,6 +519,7 @@ proc new*(
localStore: localStore,
peers: peerStore,
pendingBlocks: pendingBlocks,
loopMeasure: loopMeasure,
peersPerRequest: peersPerRequest,
network: network,
wallet: wallet,

View File

@ -227,7 +227,7 @@ proc new*(
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks, loopMeasure)
store = NetworkStore.new(engine, repoStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery)