diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index e4a97db1..5ff82e48 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -18,6 +18,8 @@ import ../protobuf/presence import ../peers import ../../utils +import ../../utils/exceptions +import ../../utils/trackedfutures import ../../discovery import ../../stores/blockstore import ../../logutils @@ -42,7 +44,7 @@ type advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle advertiseQueue*: AsyncQueue[Cid] # Advertise queue - advertiseTasks*: seq[Future[void]] # Advertise tasks + trackedFutures*: TrackedFutures # Advertise tasks futures advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests @@ -70,20 +72,26 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} = await b.addCidToQueue(cid) await b.addCidToQueue(manifest.treeCid) -proc advertiseLocalStoreLoop(b: Advertiser) {.async.} = +proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} = while b.advertiserRunning: - if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): - trace "Advertiser begins iterating blocks..." - for c in cids: - if cid =? await c: - await b.advertiseBlock(cid) - trace "Advertiser iterating blocks finished." + try: + if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): + trace "Advertiser begins iterating blocks..." + for c in cids: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating blocks finished." - await sleepAsync(b.advertiseLocalStoreLoopSleep) + await sleepAsync(b.advertiseLocalStoreLoopSleep) + + except CancelledError: + break # do not propagate as advertiseLocalStoreLoop was asyncSpawned + except CatchableError as e: + error "failed to advertise blocks in local store", error = e.msgDetail info "Exiting advertise task loop" -proc processQueueLoop(b: Advertiser) {.async.} = +proc processQueueLoop(b: Advertiser) {.async: (raises: []).} = while b.advertiserRunning: try: let @@ -129,9 +137,11 @@ proc start*(b: Advertiser) {.async.} = b.advertiserRunning = true for i in 0..