diff --git a/asyncdispatch2/asyncfutures2.nim b/asyncdispatch2/asyncfutures2.nim index c570a81..0536ca3 100644 --- a/asyncdispatch2/asyncfutures2.nim +++ b/asyncdispatch2/asyncfutures2.nim @@ -18,17 +18,25 @@ type function*: CallbackFunc udata*: pointer + # ZAH: This can probably be stored with a cheaper representation + # until the moment it needs to be printed to the screen (e.g. seq[StackTraceEntry]) + StackTrace = string + FutureBase* = ref object of RootObj ## Untyped future. callbacks: Deque[AsyncCallback] finished: bool error*: ref Exception ## Stored exception - errorStackTrace*: string + errorStackTrace*: StackTrace when not defined(release): - stackTrace: string ## For debugging purposes only. + stackTrace: StackTrace ## For debugging purposes only. id: int fromProc: string + # ZAH: we have discussed some possible optimizations where + # the future can be stored within the caller's stack frame. + # How much refactoring is needed to make this a regular non-ref type? + # Obviously, it will still be allocated on the heap when necessary. Future*[T] = ref object of FutureBase ## Typed future. value: T ## Stored value @@ -42,6 +50,8 @@ type when not defined(release): var currentID = 0 +# ZAH: This seems unnecessary. Isn't it easy to introduce a seperate +# module for the dispatcher type, so it can be directly referenced here? var callSoonHolder {.threadvar.}: CallSoonProc proc getCallSoonProc*(): CallSoonProc {.gcsafe.} = @@ -65,6 +75,11 @@ template setupFutureBase(fromProc: string) = result.fromProc = fromProc currentID.inc() +## ZAH: As far as I undestand `fromProc` is just a debugging helper. +## It would be more efficient if it's represented as a simple statically +## known `char *` in the final program (so it needs to be a `cstring` in Nim). +## The public API can be defined as a template expecting a `static[string]` +## and converting this immediately to a `cstring`. proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = ## Creates a new future. ## @@ -106,6 +121,7 @@ proc checkFinished[T](future: Future[T]) = err.cause = future raise err +# ZAH: I've seen this code in asyncloop proc call(callbacks: var Deque[AsyncCallback]) = var count = len(callbacks) if count > 0: @@ -115,6 +131,7 @@ proc call(callbacks: var Deque[AsyncCallback]) = dec(count) proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) = + # ZAH: perhaps this is the default behavior with latest Nim (no need for the `len` check) if len(callbacks) == 0: callbacks = initDeque[AsyncCallback]() callbacks.addLast(item) @@ -122,6 +139,16 @@ proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) = proc remove(callbacks: var Deque[AsyncCallback], item: AsyncCallback) = if len(callbacks) > 0: var count = len(callbacks) + # ZAH: This is not the most efficient way to implement this. + # When you discover an element suitalbe for removal, you can put the last + # element in its place and reduce the length. The problem is that the + # order of callbacks will be changed, which is unfortunate. + # + # Shifting the elements in-place will still be more efficient than the + # current copying due to the CPU cache (because otherwise we may end up + # touching memory that's residing on a different cache line). + # + # I recommend implementing this proper remove logic in the Deque type. while count > 0: var p = callbacks.popFirst() if p.function != item.function or p.udata != item.udata: @@ -176,6 +203,7 @@ proc fail*[T](future: Future[T], error: ref Exception) = proc clearCallbacks(future: FutureBase) = if len(future.callbacks) > 0: + # ZAH: This could have been a single call to `setLen` var count = len(future.callbacks) while count > 0: discard future.callbacks.popFirst() @@ -187,6 +215,7 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = ## If future has already completed then ``cb`` will be called immediately. assert cb != nil if future.finished: + # ZAH: it seems that the Future needs to know its associated Dispatcher callSoon(cb, udata) else: let acb = AsyncCallback(function: cb, udata: udata) @@ -214,6 +243,7 @@ proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = ## If future has already completed then ``cb`` will be called immediately. ## ## It's recommended to use ``addCallback`` or ``then`` instead. + # ZAH: how about `setLen(1); callbacks[0] = cb` future.clearCallbacks future.addCallback(cb, udata) @@ -358,6 +388,8 @@ proc asyncCheck*[T](future: Future[T]) = ## ## This should be used instead of ``discard`` to discard void futures. assert(not future.isNil, "Future is nil") + # ZAH: This should probably add a callback instead of replacing all call-backs. + # Perhaps a new API can be introduced to avoid the breaking change. future.callback = asyncCheckProxy[T] # proc (udata: pointer) = # if future.failed: @@ -365,12 +397,22 @@ proc asyncCheck*[T](future: Future[T]) = # raise future.error proc spawn*[T](future: Future[T]) = + # ZAH: What is the purpose of this? assert(not future.isNil, "Future is nil") future.callback = spawnProxy[T] +# ZAH: The return type here could be a Future[(T, Y)] proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = ## Returns a future which will complete once both ``fut1`` and ``fut2`` ## complete. + # ZAH: The Rust implementation of futures is making the case that the + # `and` combinator can be implemented in a more efficient way without + # resorting to closures and callbacks. I haven't thought this through + # completely yet, but here is their write-up: + # http://aturon.github.io/2016/09/07/futures-design/ + # + # We should investigate this further, before settling on the final design. + # The same reasoning applies to `or` and `all`. var retFuture = newFuture[void]("asyncdispatch.`and`") proc cb(data: pointer) = if not retFuture.finished: @@ -402,6 +444,8 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = fut2.callback = cb return retFuture +# ZAH: The return type here could be a tuple +# This will enable waiting a heterogenous collection of futures. proc all*[T](futs: varargs[Future[T]]): auto = ## Returns a future which will complete once ## all futures in ``futs`` complete. diff --git a/asyncdispatch2/asyncloop.nim b/asyncdispatch2/asyncloop.nim index b3b9620..a08c51d 100644 --- a/asyncdispatch2/asyncloop.nim +++ b/asyncdispatch2/asyncloop.nim @@ -250,6 +250,8 @@ when defined(windows) or defined(nimdoc): ## (Unix) for the specified dispatcher. return disp.ioPort + # ZAH: Shouldn't all of these procs be defined over the Dispatcher type? + # The "global" variants can be defined as templates passing the global dispatcher proc register*(fd: AsyncFD) = ## Registers ``fd`` with the dispatcher. let p = getGlobalDispatcher() @@ -263,6 +265,7 @@ when defined(windows) or defined(nimdoc): var curTime = fastEpochTime() var curTimeout = DWORD(0) + # ZAH: Please extract this code in a template # Moving expired timers to `loop.callbacks` and calculate timeout var count = len(loop.timers) if count > 0: @@ -308,6 +311,8 @@ when defined(windows) or defined(nimdoc): if int32(errCode) != WAIT_TIMEOUT: raiseOSError(errCode) + # ZAH: Please extract the code below in a template + # Moving expired timers to `loop.callbacks`. curTime = fastEpochTime() count = len(loop.timers) @@ -322,6 +327,8 @@ when defined(windows) or defined(nimdoc): # poll() call. count = len(loop.callbacks) for i in 0.. 0: @@ -570,6 +578,8 @@ else: withData(loop.selector, fd, adata) do: loop.callbacks.addLast(adata.reader) + # ZAH: Please extract the code below in a template + # Moving expired timers to `loop.callbacks`. curTime = fastEpochTime() count = len(loop.timers) @@ -585,6 +595,8 @@ else: # poll() call. count = len(loop.callbacks) for i in 0.. c: moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c)) (t).offset = (t).offset - (c) @@ -341,6 +348,10 @@ when defined(windows): transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() transp.future = newFuture[void]("stream.socket.transport") + # ZAH: If these objects are going to be manually managed, why do we bother + # with using the GC at all? It's better to rely on a destructor. If someone + # wants to share a Transport reference, they can still create a GC-managed + # wrapping object. GC_ref(transp) result = cast[StreamTransport](transp) @@ -1060,6 +1071,7 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = while true: if (ReadError in transp.state): raise transp.getError() + # ZAH: Shouldn't this be {ReadEof, ReadClosed} * transp.state != {} if (ReadEof in transp.state) or (ReadClosed in transp.state): break