refactor(trackedfutures): remove return of future from tracked futures api (#1046)
- cleans up all instances of `.track` to use the `module.trackedfutures.track(future)` procedure, for better readability - removes the `track` override that is no longer used in the codebase
This commit is contained in:
parent
20bb5e5a38
commit
8645d336ff
|
@ -137,10 +137,12 @@ proc start*(b: Advertiser) {.async.} =
|
||||||
|
|
||||||
b.advertiserRunning = true
|
b.advertiserRunning = true
|
||||||
for i in 0..<b.concurrentAdvReqs:
|
for i in 0..<b.concurrentAdvReqs:
|
||||||
let fut = b.processQueueLoop().track(b)
|
let fut = b.processQueueLoop()
|
||||||
|
b.trackedFutures.track(fut)
|
||||||
asyncSpawn fut
|
asyncSpawn fut
|
||||||
|
|
||||||
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b).track(b)
|
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
|
||||||
|
b.trackedFutures.track(b.advertiseLocalStoreLoop)
|
||||||
asyncSpawn b.advertiseLocalStoreLoop
|
asyncSpawn b.advertiseLocalStoreLoop
|
||||||
|
|
||||||
proc stop*(b: Advertiser) {.async.} =
|
proc stop*(b: Advertiser) {.async.} =
|
||||||
|
|
|
@ -147,10 +147,12 @@ proc start*(b: DiscoveryEngine) {.async.} =
|
||||||
|
|
||||||
b.discEngineRunning = true
|
b.discEngineRunning = true
|
||||||
for i in 0..<b.concurrentDiscReqs:
|
for i in 0..<b.concurrentDiscReqs:
|
||||||
let fut = b.discoveryTaskLoop().track(b)
|
let fut = b.discoveryTaskLoop()
|
||||||
|
b.trackedFutures.track(fut)
|
||||||
asyncSpawn fut
|
asyncSpawn fut
|
||||||
|
|
||||||
b.discoveryLoop = b.discoveryQueueLoop().track(b)
|
b.discoveryLoop = b.discoveryQueueLoop()
|
||||||
|
b.trackedFutures.track(b.discoveryLoop)
|
||||||
asyncSpawn b.discoveryLoop
|
asyncSpawn b.discoveryLoop
|
||||||
|
|
||||||
proc stop*(b: DiscoveryEngine) {.async.} =
|
proc stop*(b: DiscoveryEngine) {.async.} =
|
||||||
|
|
|
@ -106,7 +106,8 @@ proc start*(b: BlockExcEngine) {.async.} =
|
||||||
|
|
||||||
b.blockexcRunning = true
|
b.blockexcRunning = true
|
||||||
for i in 0..<b.concurrentTasks:
|
for i in 0..<b.concurrentTasks:
|
||||||
let fut = b.blockexcTaskRunner().track(b)
|
let fut = b.blockexcTaskRunner()
|
||||||
|
b.trackedFutures.track(fut)
|
||||||
asyncSpawn fut
|
asyncSpawn fut
|
||||||
|
|
||||||
proc stop*(b: BlockExcEngine) {.async.} =
|
proc stop*(b: BlockExcEngine) {.async.} =
|
||||||
|
|
|
@ -351,7 +351,9 @@ proc onSlotFreed(sales: Sales,
|
||||||
if err =? queue.push(found).errorOption:
|
if err =? queue.push(found).errorOption:
|
||||||
error "failed to push slot items to queue", error = err.msgDetail
|
error "failed to push slot items to queue", error = err.msgDetail
|
||||||
|
|
||||||
asyncSpawn addSlotToQueue().track(sales)
|
let fut = addSlotToQueue()
|
||||||
|
sales.trackedFutures.track(fut)
|
||||||
|
asyncSpawn fut
|
||||||
|
|
||||||
proc subscribeRequested(sales: Sales) {.async.} =
|
proc subscribeRequested(sales: Sales) {.async.} =
|
||||||
let context = sales.context
|
let context = sales.context
|
||||||
|
|
|
@ -323,7 +323,7 @@ proc addWorker(self: SlotQueue): ?!void =
|
||||||
|
|
||||||
let worker = SlotQueueWorker.init()
|
let worker = SlotQueueWorker.init()
|
||||||
try:
|
try:
|
||||||
discard worker.doneProcessing.track(self)
|
self.trackedFutures.track(worker.doneProcessing)
|
||||||
self.workers.addLastNoWait(worker)
|
self.workers.addLastNoWait(worker)
|
||||||
except AsyncQueueFullError:
|
except AsyncQueueFullError:
|
||||||
return failure("failed to add worker, worker queue full")
|
return failure("failed to add worker, worker queue full")
|
||||||
|
@ -343,7 +343,7 @@ proc dispatch(self: SlotQueue,
|
||||||
|
|
||||||
if onProcessSlot =? self.onProcessSlot:
|
if onProcessSlot =? self.onProcessSlot:
|
||||||
try:
|
try:
|
||||||
discard worker.doneProcessing.track(self)
|
self.trackedFutures.track(worker.doneProcessing)
|
||||||
await onProcessSlot(item, worker.doneProcessing)
|
await onProcessSlot(item, worker.doneProcessing)
|
||||||
await worker.doneProcessing
|
await worker.doneProcessing
|
||||||
|
|
||||||
|
@ -418,7 +418,9 @@ proc run(self: SlotQueue) {.async: (raises: []).} =
|
||||||
|
|
||||||
trace "processing item"
|
trace "processing item"
|
||||||
|
|
||||||
asyncSpawn self.dispatch(worker, item).track(self)
|
let fut = self.dispatch(worker, item)
|
||||||
|
self.trackedFutures.track(fut)
|
||||||
|
asyncSpawn fut
|
||||||
|
|
||||||
await sleepAsync(1.millis) # poll
|
await sleepAsync(1.millis) # poll
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
|
@ -444,7 +446,9 @@ proc start*(self: SlotQueue) =
|
||||||
if err =? self.addWorker().errorOption:
|
if err =? self.addWorker().errorOption:
|
||||||
error "start: error adding new worker", error = err.msg
|
error "start: error adding new worker", error = err.msg
|
||||||
|
|
||||||
asyncSpawn self.run().track(self)
|
let fut = self.run()
|
||||||
|
self.trackedFutures.track(fut)
|
||||||
|
asyncSpawn fut
|
||||||
|
|
||||||
proc stop*(self: SlotQueue) {.async.} =
|
proc stop*(self: SlotQueue) {.async.} =
|
||||||
if not self.running:
|
if not self.running:
|
||||||
|
|
|
@ -78,7 +78,8 @@ proc scheduler(machine: Machine) {.async: (raises: []).} =
|
||||||
machine.state = next
|
machine.state = next
|
||||||
debug "enter state", state = fromState & " => " & $machine.state
|
debug "enter state", state = fromState & " => " & $machine.state
|
||||||
running = machine.run(machine.state)
|
running = machine.run(machine.state)
|
||||||
asyncSpawn running.track(machine)
|
machine.trackedFutures.track(running)
|
||||||
|
asyncSpawn running
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
break # do not propagate bc it is asyncSpawned
|
break # do not propagate bc it is asyncSpawned
|
||||||
|
|
||||||
|
@ -90,7 +91,9 @@ proc start*(machine: Machine, initialState: State) =
|
||||||
machine.scheduled = newAsyncQueue[Event]()
|
machine.scheduled = newAsyncQueue[Event]()
|
||||||
|
|
||||||
machine.started = true
|
machine.started = true
|
||||||
asyncSpawn machine.scheduler().track(machine)
|
let fut = machine.scheduler()
|
||||||
|
machine.trackedFutures.track(fut)
|
||||||
|
asyncSpawn fut
|
||||||
machine.schedule(Event.transition(machine.state, initialState))
|
machine.schedule(Event.transition(machine.state, initialState))
|
||||||
|
|
||||||
proc stop*(machine: Machine) {.async.} =
|
proc stop*(machine: Machine) {.async.} =
|
||||||
|
|
|
@ -19,9 +19,9 @@ proc removeFuture(self: TrackedFutures, future: FutureBase) =
|
||||||
if not self.cancelling and not future.isNil:
|
if not self.cancelling and not future.isNil:
|
||||||
self.futures.del(future.id)
|
self.futures.del(future.id)
|
||||||
|
|
||||||
proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] =
|
proc track*[T](self: TrackedFutures, fut: Future[T]) =
|
||||||
if self.cancelling:
|
if self.cancelling:
|
||||||
return fut
|
return
|
||||||
|
|
||||||
self.futures[fut.id] = FutureBase(fut)
|
self.futures[fut.id] = FutureBase(fut)
|
||||||
|
|
||||||
|
@ -30,14 +30,6 @@ proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] =
|
||||||
|
|
||||||
fut.addCallback(cb)
|
fut.addCallback(cb)
|
||||||
|
|
||||||
return fut
|
|
||||||
|
|
||||||
proc track*[T, U](future: Future[T], self: U): Future[T] =
|
|
||||||
## Convenience method that allows chaining future, eg:
|
|
||||||
## `await someFut().track(sales)`, where `sales` has declared a
|
|
||||||
## `trackedFutures` property.
|
|
||||||
self.trackedFutures.track(future)
|
|
||||||
|
|
||||||
proc cancelTracked*(self: TrackedFutures) {.async: (raises: []).} =
|
proc cancelTracked*(self: TrackedFutures) {.async: (raises: []).} =
|
||||||
self.cancelling = true
|
self.cancelling = true
|
||||||
|
|
||||||
|
|
|
@ -18,36 +18,36 @@ asyncchecksuite "tracked futures":
|
||||||
|
|
||||||
test "tracks unfinished futures":
|
test "tracks unfinished futures":
|
||||||
let fut = newFuture[void]("test")
|
let fut = newFuture[void]("test")
|
||||||
discard fut.track(module)
|
module.trackedFutures.track(fut)
|
||||||
check module.trackedFutures.len == 1
|
check module.trackedFutures.len == 1
|
||||||
|
|
||||||
test "does not track completed futures":
|
test "does not track completed futures":
|
||||||
let fut = newFuture[void]("test")
|
let fut = newFuture[void]("test")
|
||||||
fut.complete()
|
fut.complete()
|
||||||
discard fut.track(module)
|
module.trackedFutures.track(fut)
|
||||||
check eventually module.trackedFutures.len == 0
|
check eventually module.trackedFutures.len == 0
|
||||||
|
|
||||||
test "does not track failed futures":
|
test "does not track failed futures":
|
||||||
let fut = newFuture[void]("test")
|
let fut = newFuture[void]("test")
|
||||||
fut.fail((ref CatchableError)(msg: "some error"))
|
fut.fail((ref CatchableError)(msg: "some error"))
|
||||||
discard fut.track(module)
|
module.trackedFutures.track(fut)
|
||||||
check eventually module.trackedFutures.len == 0
|
check eventually module.trackedFutures.len == 0
|
||||||
|
|
||||||
test "does not track cancelled futures":
|
test "does not track cancelled futures":
|
||||||
let fut = newFuture[void]("test")
|
let fut = newFuture[void]("test")
|
||||||
await fut.cancelAndWait()
|
await fut.cancelAndWait()
|
||||||
discard fut.track(module)
|
module.trackedFutures.track(fut)
|
||||||
check eventually module.trackedFutures.len == 0
|
check eventually module.trackedFutures.len == 0
|
||||||
|
|
||||||
test "removes tracked future when finished":
|
test "removes tracked future when finished":
|
||||||
let fut = newFuture[void]("test")
|
let fut = newFuture[void]("test")
|
||||||
discard fut.track(module)
|
module.trackedFutures.track(fut)
|
||||||
fut.complete()
|
fut.complete()
|
||||||
check eventually module.trackedFutures.len == 0
|
check eventually module.trackedFutures.len == 0
|
||||||
|
|
||||||
test "removes tracked future when cancelled":
|
test "removes tracked future when cancelled":
|
||||||
let fut = newFuture[void]("test")
|
let fut = newFuture[void]("test")
|
||||||
discard fut.track(module)
|
module.trackedFutures.track(fut)
|
||||||
await fut.cancelAndWait()
|
await fut.cancelAndWait()
|
||||||
check eventually module.trackedFutures.len == 0
|
check eventually module.trackedFutures.len == 0
|
||||||
|
|
||||||
|
@ -55,9 +55,9 @@ asyncchecksuite "tracked futures":
|
||||||
let fut1 = newFuture[void]("test1")
|
let fut1 = newFuture[void]("test1")
|
||||||
let fut2 = newFuture[void]("test2")
|
let fut2 = newFuture[void]("test2")
|
||||||
let fut3 = newFuture[void]("test3")
|
let fut3 = newFuture[void]("test3")
|
||||||
discard fut1.track(module)
|
module.trackedFutures.track(fut1)
|
||||||
discard fut2.track(module)
|
module.trackedFutures.track(fut2)
|
||||||
discard fut3.track(module)
|
module.trackedFutures.track(fut3)
|
||||||
await module.trackedFutures.cancelTracked()
|
await module.trackedFutures.cancelTracked()
|
||||||
check eventually fut1.cancelled
|
check eventually fut1.cancelled
|
||||||
check eventually fut2.cancelled
|
check eventually fut2.cancelled
|
||||||
|
|
|
@ -157,10 +157,11 @@ proc waitUntilOutput*(node: NodeProcess, output: string) {.async.} =
|
||||||
trace "waiting until", output
|
trace "waiting until", output
|
||||||
|
|
||||||
let started = newFuture[void]()
|
let started = newFuture[void]()
|
||||||
let fut = node.captureOutput(output, started).track(node)
|
let fut = node.captureOutput(output, started)
|
||||||
|
node.trackedFutures.track(fut)
|
||||||
asyncSpawn fut
|
asyncSpawn fut
|
||||||
await started.wait(60.seconds) # allow enough time for proof generation
|
await started.wait(60.seconds) # allow enough time for proof generation
|
||||||
|
|
||||||
proc waitUntilStarted*(node: NodeProcess) {.async.} =
|
proc waitUntilStarted*(node: NodeProcess) {.async.} =
|
||||||
try:
|
try:
|
||||||
await node.waitUntilOutput(node.startedOutput)
|
await node.waitUntilOutput(node.startedOutput)
|
||||||
|
|
Loading…
Reference in New Issue