From cbd8e03823c00dd230e48a4613c0f594b77616eb Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 1 Apr 2020 12:10:56 +0300 Subject: [PATCH] Add allFinished() primitive. (#87) * Add allCompleted() primitive. * Rename it to allFinished(). * Fix allFinished Future's static name. Use `mitems()` instead of `for`. --- chronos/asyncfutures2.nim | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 1d80679..40754b4 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -701,6 +701,43 @@ proc allFutures*[T](futs: varargs[Future[T]]): Future[void] = return retFuture +proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] = + ## Returns a future which will complete only when all futures in ``futs`` + ## will be completed, failed or canceled. + ## + ## Returned sequence will hold all the Future[T] objects passed to + ## ``allCompleted`` with the order preserved. + ## + ## If the argument is empty, the returned future COMPLETES immediately. + ## + ## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled. + var retFuture = newFuture[seq[Future[T]]]("chronos.allFinished()") + let totalFutures = len(futs) + var completedFutures = 0 + + var nfuts = @futs + + proc cb(udata: pointer) {.gcsafe.} = + if not(retFuture.finished()): + inc(completedFutures) + if completedFutures == totalFutures: + retFuture.complete(nfuts) + + proc cancellation(udata: pointer) {.gcsafe.} = + # On cancel we remove all our callbacks only. + for fut in nfuts.mitems(): + if not(fut.finished()): + fut.removeCallback(cb) + + for fut in nfuts: + fut.addCallback(cb) + + retFuture.cancelCallback = cancellation + if len(nfuts) == 0: + retFuture.complete(nfuts) + + return retFuture + proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] = ## Returns a future which will complete and return completed Future[T] inside, ## when one of the futures in ``futs`` will be completed, failed or canceled.