reviewed most of the code

This commit is contained in:
Zahary Karadjov 2018-05-29 21:04:11 +03:00
parent 645108813f
commit b77e0417bc
4 changed files with 72 additions and 2 deletions

View File

@ -18,17 +18,25 @@ type
function*: CallbackFunc
udata*: pointer
# ZAH: This can probably be stored with a cheaper representation
# until the moment it needs to be printed to the screen (e.g. seq[StackTraceEntry])
StackTrace = string
FutureBase* = ref object of RootObj ## Untyped future.
callbacks: Deque[AsyncCallback]
finished: bool
error*: ref Exception ## Stored exception
errorStackTrace*: string
errorStackTrace*: StackTrace
when not defined(release):
stackTrace: string ## For debugging purposes only.
stackTrace: StackTrace ## For debugging purposes only.
id: int
fromProc: string
# ZAH: we have discussed some possible optimizations where
# the future can be stored within the caller's stack frame.
# How much refactoring is needed to make this a regular non-ref type?
# Obviously, it will still be allocated on the heap when necessary.
Future*[T] = ref object of FutureBase ## Typed future.
value: T ## Stored value
@ -42,6 +50,8 @@ type
when not defined(release):
var currentID = 0
# ZAH: This seems unnecessary. Isn't it easy to introduce a seperate
# module for the dispatcher type, so it can be directly referenced here?
var callSoonHolder {.threadvar.}: CallSoonProc
proc getCallSoonProc*(): CallSoonProc {.gcsafe.} =
@ -65,6 +75,11 @@ template setupFutureBase(fromProc: string) =
result.fromProc = fromProc
currentID.inc()
## ZAH: As far as I undestand `fromProc` is just a debugging helper.
## It would be more efficient if it's represented as a simple statically
## known `char *` in the final program (so it needs to be a `cstring` in Nim).
## The public API can be defined as a template expecting a `static[string]`
## and converting this immediately to a `cstring`.
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
## Creates a new future.
##
@ -106,6 +121,7 @@ proc checkFinished[T](future: Future[T]) =
err.cause = future
raise err
# ZAH: I've seen this code in asyncloop
proc call(callbacks: var Deque[AsyncCallback]) =
var count = len(callbacks)
if count > 0:
@ -115,6 +131,7 @@ proc call(callbacks: var Deque[AsyncCallback]) =
dec(count)
proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
# ZAH: perhaps this is the default behavior with latest Nim (no need for the `len` check)
if len(callbacks) == 0:
callbacks = initDeque[AsyncCallback]()
callbacks.addLast(item)
@ -122,6 +139,16 @@ proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
proc remove(callbacks: var Deque[AsyncCallback], item: AsyncCallback) =
if len(callbacks) > 0:
var count = len(callbacks)
# ZAH: This is not the most efficient way to implement this.
# When you discover an element suitalbe for removal, you can put the last
# element in its place and reduce the length. The problem is that the
# order of callbacks will be changed, which is unfortunate.
#
# Shifting the elements in-place will still be more efficient than the
# current copying due to the CPU cache (because otherwise we may end up
# touching memory that's residing on a different cache line).
#
# I recommend implementing this proper remove logic in the Deque type.
while count > 0:
var p = callbacks.popFirst()
if p.function != item.function or p.udata != item.udata:
@ -176,6 +203,7 @@ proc fail*[T](future: Future[T], error: ref Exception) =
proc clearCallbacks(future: FutureBase) =
if len(future.callbacks) > 0:
# ZAH: This could have been a single call to `setLen`
var count = len(future.callbacks)
while count > 0:
discard future.callbacks.popFirst()
@ -187,6 +215,7 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## If future has already completed then ``cb`` will be called immediately.
assert cb != nil
if future.finished:
# ZAH: it seems that the Future needs to know its associated Dispatcher
callSoon(cb, udata)
else:
let acb = AsyncCallback(function: cb, udata: udata)
@ -214,6 +243,7 @@ proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## If future has already completed then ``cb`` will be called immediately.
##
## It's recommended to use ``addCallback`` or ``then`` instead.
# ZAH: how about `setLen(1); callbacks[0] = cb`
future.clearCallbacks
future.addCallback(cb, udata)
@ -358,6 +388,8 @@ proc asyncCheck*[T](future: Future[T]) =
##
## This should be used instead of ``discard`` to discard void futures.
assert(not future.isNil, "Future is nil")
# ZAH: This should probably add a callback instead of replacing all call-backs.
# Perhaps a new API can be introduced to avoid the breaking change.
future.callback = asyncCheckProxy[T]
# proc (udata: pointer) =
# if future.failed:
@ -365,12 +397,22 @@ proc asyncCheck*[T](future: Future[T]) =
# raise future.error
proc spawn*[T](future: Future[T]) =
# ZAH: What is the purpose of this?
assert(not future.isNil, "Future is nil")
future.callback = spawnProxy[T]
# ZAH: The return type here could be a Future[(T, Y)]
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once both ``fut1`` and ``fut2``
## complete.
# ZAH: The Rust implementation of futures is making the case that the
# `and` combinator can be implemented in a more efficient way without
# resorting to closures and callbacks. I haven't thought this through
# completely yet, but here is their write-up:
# http://aturon.github.io/2016/09/07/futures-design/
#
# We should investigate this further, before settling on the final design.
# The same reasoning applies to `or` and `all`.
var retFuture = newFuture[void]("asyncdispatch.`and`")
proc cb(data: pointer) =
if not retFuture.finished:
@ -402,6 +444,8 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
fut2.callback = cb
return retFuture
# ZAH: The return type here could be a tuple
# This will enable waiting a heterogenous collection of futures.
proc all*[T](futs: varargs[Future[T]]): auto =
## Returns a future which will complete once
## all futures in ``futs`` complete.

View File

@ -250,6 +250,8 @@ when defined(windows) or defined(nimdoc):
## (Unix) for the specified dispatcher.
return disp.ioPort
# ZAH: Shouldn't all of these procs be defined over the Dispatcher type?
# The "global" variants can be defined as templates passing the global dispatcher
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
let p = getGlobalDispatcher()
@ -263,6 +265,7 @@ when defined(windows) or defined(nimdoc):
var curTime = fastEpochTime()
var curTimeout = DWORD(0)
# ZAH: Please extract this code in a template
# Moving expired timers to `loop.callbacks` and calculate timeout
var count = len(loop.timers)
if count > 0:
@ -308,6 +311,8 @@ when defined(windows) or defined(nimdoc):
if int32(errCode) != WAIT_TIMEOUT:
raiseOSError(errCode)
# ZAH: Please extract the code below in a template
# Moving expired timers to `loop.callbacks`.
curTime = fastEpochTime()
count = len(loop.timers)
@ -322,6 +327,8 @@ when defined(windows) or defined(nimdoc):
# poll() call.
count = len(loop.callbacks)
for i in 0..<count:
# ZAH: instead of calling `popFirst` here in a loop, why don't we
# call `setLen(0)` at the end after iterating over all callbacks?
var callable = loop.callbacks.popFirst()
callable.function(callable.udata)
@ -527,6 +534,7 @@ else:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}
# ZAH: Please extract this code in a template
# Moving expired timers to `loop.callbacks` and calculate timeout.
var count = len(loop.timers)
if count > 0:
@ -570,6 +578,8 @@ else:
withData(loop.selector, fd, adata) do:
loop.callbacks.addLast(adata.reader)
# ZAH: Please extract the code below in a template
# Moving expired timers to `loop.callbacks`.
curTime = fastEpochTime()
count = len(loop.timers)
@ -585,6 +595,8 @@ else:
# poll() call.
count = len(loop.callbacks)
for i in 0..<count:
# ZAH: instead of calling `popFirst` here in a loop, why don't we
# call `setLen(0)` at the end after iterating over all callbacks?
var callable = loop.callbacks.popFirst()
callable.function(callable.udata)
@ -597,6 +609,7 @@ proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) =
let loop = getGlobalDispatcher()
var tcb = TimerCallback(finishAt: at,
function: AsyncCallback(function: cb, udata: udata))
# ZAH: This should use a priority queue (e.g. a binary heap)
loop.timers.push(tcb)
proc removeTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) =

View File

@ -443,6 +443,7 @@ else:
result.resumeRead()
proc close*(transp: DatagramTransport) =
## ZAH: This could use a destructor as well
## Closes and frees resources of transport ``transp``.
if ReadClosed notin transp.state and WriteClosed notin transp.state:
closeAsyncSocket(transp.fd)

View File

@ -39,6 +39,12 @@ type
fd*: AsyncFD # File descriptor
state: set[TransportState] # Current Transport state
reader: Future[void] # Current reader Future
# ZAH: I'm not quite certain, but it seems to me that the intermediate
# buffer is not necessary. The receiving code needs to know how to grow
# the output buffer of the future attached to the read operation. If this
# is the case, the buffering can be replaced with direct writing to this
# output buffer. Furthermore, we'll be able to signal additional 'progress'
# events for the future to make the API more complete.
buffer: seq[byte] # Reading buffer
offset: int # Reading buffer offset
error: ref Exception # Current error
@ -110,6 +116,7 @@ template checkPending(t: untyped) =
raise newException(TransportError, "Read operation already pending!")
template shiftBuffer(t, c: untyped) =
# ZAH: Nim is not C, you don't need to put () around template parameters
if (t).offset > c:
moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c))
(t).offset = (t).offset - (c)
@ -341,6 +348,10 @@ when defined(windows):
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = newFuture[void]("stream.socket.transport")
# ZAH: If these objects are going to be manually managed, why do we bother
# with using the GC at all? It's better to rely on a destructor. If someone
# wants to share a Transport reference, they can still create a GC-managed
# wrapping object.
GC_ref(transp)
result = cast[StreamTransport](transp)
@ -1060,6 +1071,7 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
while true:
if (ReadError in transp.state):
raise transp.getError()
# ZAH: Shouldn't this be {ReadEof, ReadClosed} * transp.state != {}
if (ReadEof in transp.state) or (ReadClosed in transp.state):
break