fix(blockexchange): ensures futures are asyncSpawned (#1037)

* fix(blockexchange): asyncSpawn advertising of local store blocks

* fix(blockexchange): asyncSpawn discoveryQueueLoop

- prevents silently swallowing async errors

* fix(blockexchange): asyncSpawns block exchange tasks

- prevents silently swallow future exceptions
This commit is contained in:
Eric 2024-12-16 13:01:49 +07:00 committed by GitHub
parent 5f2ba14281
commit b0cc27f563
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 67 additions and 57 deletions

View File

@ -18,6 +18,8 @@ import ../protobuf/presence
import ../peers
import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
@ -42,7 +44,7 @@ type
advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
trackedFutures*: TrackedFutures # Advertise tasks futures
advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
@ -70,20 +72,26 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} =
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)
proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
await sleepAsync(b.advertiseLocalStoreLoopSleep)
await sleepAsync(b.advertiseLocalStoreLoopSleep)
except CancelledError:
break # do not propagate as advertiseLocalStoreLoop was asyncSpawned
except CatchableError as e:
error "failed to advertise blocks in local store", error = e.msgDetail
info "Exiting advertise task loop"
proc processQueueLoop(b: Advertiser) {.async.} =
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
let
@ -129,9 +137,11 @@ proc start*(b: Advertiser) {.async.} =
b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(processQueueLoop(b))
let fut = b.processQueueLoop().track(b)
asyncSpawn fut
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b).track(b)
asyncSpawn b.advertiseLocalStoreLoop
proc stop*(b: Advertiser) {.async.} =
## Stop the advertiser
@ -145,19 +155,9 @@ proc stop*(b: Advertiser) {.async.} =
b.advertiserRunning = false
# Stop incoming tasks from callback and localStore loop
b.localStore.onBlockStored = CidCallback.none
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLocalStoreLoop.cancelAndWait()
trace "Advertise loop stopped"
# Clear up remaining tasks
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"
trace "Advertiser stopped"
trace "Stopping advertise loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Advertiser loop and tasks stopped"
proc new*(
T: type Advertiser,
@ -173,5 +173,6 @@ proc new*(
discovery: discovery,
concurrentAdvReqs: concurrentAdvReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
trackedFutures: TrackedFutures.new(),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)

View File

@ -23,6 +23,7 @@ import ../network
import ../peers
import ../../utils
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
@ -50,12 +51,12 @@ type
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
try:
@ -66,13 +67,15 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
except CatchableError as exc:
warn "Exception in discovery loop", exc = exc.msg
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
try:
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
await sleepAsync(b.discoveryLoopSleep)
except CancelledError:
discard # do not propagate as discoveryQueueLoop was asyncSpawned
await sleepAsync(b.discoveryLoopSleep)
proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
## Run discovery tasks
##
@ -116,6 +119,11 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
return
except CatchableError as exc:
warn "Exception in discovery task runner", exc = exc.msg
except Exception as e:
# Raised by b.discovery.removeProvider somehow...
# This should not be catchable, and we should never get here. Therefore,
# raise a Defect.
raiseAssert "Exception when removing provider"
info "Exiting discovery task runner"
@ -139,9 +147,11 @@ proc start*(b: DiscoveryEngine) {.async.} =
b.discEngineRunning = true
for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b))
let fut = b.discoveryTaskLoop().track(b)
asyncSpawn fut
b.discoveryLoop = discoveryQueueLoop(b)
b.discoveryLoop = b.discoveryQueueLoop().track(b)
asyncSpawn b.discoveryLoop
proc stop*(b: DiscoveryEngine) {.async.} =
## Stop the discovery engine
@ -153,16 +163,9 @@ proc stop*(b: DiscoveryEngine) {.async.} =
return
b.discEngineRunning = false
for task in b.discoveryTasks:
if not task.finished:
trace "Awaiting discovery task to stop"
await task.cancelAndWait()
trace "Discovery task stopped"
if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait()
trace "Discovery loop stopped"
trace "Stopping discovery loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Discovery loop and tasks stopped"
trace "Discovery engine stopped"
@ -187,6 +190,7 @@ proc new*(
pendingBlocks: pendingBlocks,
concurrentDiscReqs: concurrentDiscReqs,
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
trackedFutures: TrackedFutures.new(),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock)

View File

@ -22,6 +22,8 @@ import pkg/questionable
import ../../stores/blockstore
import ../../blocktype
import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../merkletree
import ../../logutils
import ../../manifest
@ -70,7 +72,7 @@ type
peers*: PeerCtxStore # Peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for
concurrentTasks: int # Number of concurrent peers we're serving at any given time
blockexcTasks: seq[Future[void]] # Future to control blockexc task
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
peersPerRequest: int # Max number of peers to request from
@ -88,7 +90,7 @@ type
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).}
proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
@ -104,7 +106,8 @@ proc start*(b: BlockExcEngine) {.async.} =
b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
let fut = b.blockexcTaskRunner().track(b)
asyncSpawn fut
proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
@ -119,11 +122,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
return
b.blockexcRunning = false
for task in b.blockexcTasks:
if not task.finished:
trace "Awaiting task to stop"
await task.cancelAndWait()
trace "Task stopped"
await b.trackedFutures.cancelTracked()
trace "NetworkStore stopped"
@ -565,16 +564,21 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
task.peerWants.keepItIf(it.address notin successAddresses)
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} =
## process tasks
##
trace "Starting blockexc task runner"
while b.blockexcRunning:
let
peerCtx = await b.taskQueue.pop()
try:
let
peerCtx = await b.taskQueue.pop()
await b.taskHandler(peerCtx)
await b.taskHandler(peerCtx)
except CancelledError:
break # do not propagate as blockexcTaskRunner was asyncSpawned
except CatchableError as e:
error "error running block exchange task", error = e.msgDetail
info "Exiting blockexc task runner"
@ -603,6 +607,7 @@ proc new*(
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures.new(),
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery,
advertiser: advertiser,