Fix callSoon to raise Defect only. (#145)

* Initial commit.

* Move pragmas to forward declaration.
Add raises to callSoon declaration.

* Fix for IOSelectorsException.
This commit is contained in:
Eugene Kabanov 2021-01-11 19:15:23 +02:00 committed by GitHub
parent 46c0bf3c5a
commit 0933feaa35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 53 deletions

View File

@ -206,7 +206,8 @@ type
proc `<`(a, b: TimerCallback): bool =
result = a.finishAt < b.finishAt
proc callSoon*(cbproc: CallbackFunc, data: pointer = nil) {.gcsafe.}
proc callSoon*(cbproc: CallbackFunc, data: pointer = nil) {.
gcsafe, raises: [Defect].}
func getAsyncTimestamp*(a: Duration): auto {.inline.} =
## Return rounded up value of duration with milliseconds resolution.
@ -382,17 +383,12 @@ when defined(windows) or defined(nimdoc):
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc setGlobalDispatcher*(disp: PDispatcher) =
## Set current thread's dispatcher instance to ``disp``.
if not gDisp.isNil:
doAssert gDisp.callbacks.len == 0
gDisp = disp
proc getGlobalDispatcher*(): PDispatcher =
## Returns current thread's dispatcher instance.
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].}
proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].}
proc setGlobalDispatcher*(disp: PDispatcher) {.
gcsafe, deprecated: "Use setThreadDispatcher() instead".}
proc getGlobalDispatcher*(): PDispatcher {.
gcsafe, deprecated: "Use getThreadDispatcher() instead".}
proc getIoHandler*(disp: PDispatcher): Handle =
## Returns the underlying IO Completion Port handle (Windows) or selector
@ -401,7 +397,7 @@ when defined(windows) or defined(nimdoc):
proc register*(fd: AsyncFD) =
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
if createIoCompletionPort(fd.Handle, loop.ioPort,
cast[CompletionKey](fd), 1) == 0:
raiseOSError(osLastError())
@ -409,7 +405,7 @@ when defined(windows) or defined(nimdoc):
proc poll*() =
## Perform single asynchronous step.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var curTime = Moment.now()
var curTimeout = DWORD(0)
@ -452,7 +448,7 @@ when defined(windows) or defined(nimdoc):
proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Closes a socket and ensures that it is unregistered.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
loop.handles.excl(fd)
close(SocketHandle(fd))
if not isNil(aftercb):
@ -461,7 +457,7 @@ when defined(windows) or defined(nimdoc):
proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Closes a (pipe/file) handle and ensures that it is unregistered.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
loop.handles.excl(fd)
discard closeHandle(Handle(fd))
if not isNil(aftercb):
@ -470,7 +466,7 @@ when defined(windows) or defined(nimdoc):
proc unregister*(fd: AsyncFD) =
## Unregisters ``fd``.
getGlobalDispatcher().handles.excl(fd)
getThreadDispatcher().handles.excl(fd)
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
@ -525,17 +521,12 @@ elif unixPlatform:
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc setGlobalDispatcher*(disp: PDispatcher) =
## Set current thread's dispatcher instance to ``disp``.
if not gDisp.isNil:
doAssert gDisp.callbacks.len == 0
gDisp = disp
proc getGlobalDispatcher*(): PDispatcher =
## Returns current thread's dispatcher instance.
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].}
proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].}
proc setGlobalDispatcher*(disp: PDispatcher) {.
gcsafe, deprecated: "Use setThreadDispatcher() instead".}
proc getGlobalDispatcher*(): PDispatcher {.
gcsafe, deprecated: "Use getThreadDispatcher() instead".}
proc getIoHandler*(disp: PDispatcher): Selector[SelectorData] =
## Returns system specific OS queue.
@ -543,7 +534,7 @@ elif unixPlatform:
proc register*(fd: AsyncFD) =
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var data: SelectorData
data.rdata.fd = fd
data.wdata.fd = fd
@ -551,7 +542,7 @@ elif unixPlatform:
proc unregister*(fd: AsyncFD) =
## Unregister file descriptor ``fd`` from thread's dispatcher.
getGlobalDispatcher().selector.unregister(int(fd))
getThreadDispatcher().selector.unregister(int(fd))
proc contains*(disp: PDispatcher, fd: AsyncFd): bool {.inline.} =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
@ -560,7 +551,7 @@ elif unixPlatform:
proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) =
## Start watching the file descriptor ``fd`` for read availability and then
## call the callback ``cb`` with specified argument ``udata``.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var newEvents = {Event.Read}
withData(loop.selector, int(fd), adata) do:
let acb = AsyncCallback(function: cb, udata: addr adata.rdata)
@ -575,7 +566,7 @@ elif unixPlatform:
proc removeReader*(fd: AsyncFD) =
## Stop watching the file descriptor ``fd`` for read availability.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var newEvents: set[Event]
withData(loop.selector, int(fd), adata) do:
# We need to clear `reader` data, because `selectors` don't do it
@ -590,7 +581,7 @@ elif unixPlatform:
proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) =
## Start watching the file descriptor ``fd`` for write availability and then
## call the callback ``cb`` with specified argument ``udata``.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var newEvents = {Event.Write}
withData(loop.selector, int(fd), adata) do:
let acb = AsyncCallback(function: cb, udata: addr adata.wdata)
@ -605,7 +596,7 @@ elif unixPlatform:
proc removeWriter*(fd: AsyncFD) =
## Stop watching the file descriptor ``fd`` for write availability.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var newEvents: set[Event]
withData(loop.selector, int(fd), adata) do:
# We need to clear `writer` data, because `selectors` don't do it
@ -624,7 +615,7 @@ elif unixPlatform:
## closing socket, while operation pending, socket will be closed as
## soon as all pending operations will be notified.
## You can execute ``aftercb`` before actual socket close operation.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
proc continuation(udata: pointer) =
if SocketHandle(fd) in loop.selector:
@ -669,7 +660,7 @@ elif unixPlatform:
## callback ``cb`` with specified argument ``udata``. Returns signal
## identifier code, which can be used to remove signal callback
## via ``removeSignal``.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var data: SelectorData
result = loop.selector.registerSignal(signal, data)
withData(loop.selector, result, adata) do:
@ -681,12 +672,12 @@ elif unixPlatform:
proc removeSignal*(sigfd: int) =
## Remove watching signal ``signal``.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
loop.selector.unregister(sigfd)
proc poll*() =
## Perform single asynchronous step.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var curTime = Moment.now()
var curTimeout = 0
@ -732,11 +723,48 @@ else:
proc initAPI() = discard
proc globalInit() = discard
proc setThreadDispatcher*(disp: PDispatcher) =
## Set current thread's dispatcher instance to ``disp``.
if not gDisp.isNil:
doAssert gDisp.callbacks.len == 0
gDisp = disp
proc getThreadDispatcher*(): PDispatcher =
## Returns current thread's dispatcher instance.
template getErrorMessage(exc): string =
"Cannot create thread dispatcher: " & exc.msg
if gDisp.isNil:
when defined(windows):
let disp =
try:
newDispatcher()
except CatchableError as exc:
raise newException(Defect, getErrorMessage(exc))
else:
let disp =
try:
newDispatcher()
except IOSelectorsException as exc:
raise newException(Defect, getErrorMessage(exc))
except CatchableError as exc:
raise newException(Defect, getErrorMessage(exc))
setThreadDispatcher(disp)
return gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
## Set current thread's dispatcher instance to ``disp``.
setThreadDispatcher(disp)
proc getGlobalDispatcher*(): PDispatcher =
## Returns current thread's dispatcher instance.
getThreadDispatcher()
proc setTimer*(at: Moment, cb: CallbackFunc,
udata: pointer = nil): TimerCallback =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
result = TimerCallback(finishAt: at,
function: AsyncCallback(function: cb, udata: udata))
loop.timers.push(result)
@ -761,7 +789,7 @@ proc addTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) {.
proc removeTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) =
## Remove timer callback ``cb`` with absolute timestamp ``at`` from waiting
## queue.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var list = cast[seq[TimerCallback]](loop.timers)
var index = -1
for i in 0..<len(list):
@ -973,7 +1001,7 @@ proc callSoon*(cbproc: CallbackFunc, data: pointer = nil) =
## The callback is called when control returns to the event loop.
doAssert(not isNil(cbproc))
let acb = AsyncCallback(function: cbproc, udata: data)
getGlobalDispatcher().callbacks.addLast(acb)
getThreadDispatcher().callbacks.addLast(acb)
proc runForever*() =
## Begins a never ending global dispatcher poll loop.
@ -990,12 +1018,12 @@ proc waitFor*[T](fut: Future[T]): T =
proc addTracker*[T](id: string, tracker: T) =
## Add new ``tracker`` object to current thread dispatcher with identifier
## ``id``.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
loop.trackers[id] = tracker
proc getTracker*(id: string): TrackerBase =
## Get ``tracker`` from current thread dispatcher using identifier ``id``.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
result = loop.trackers.getOrDefault(id, nil)
when defined(chronosFutureTracking):

View File

@ -70,8 +70,8 @@ proc newAsyncLock*(): AsyncLock =
## immediately.
# Workaround for callSoon() not worked correctly before
# getGlobalDispatcher() call.
discard getGlobalDispatcher()
# getThreadDispatcher() call.
discard getThreadDispatcher()
AsyncLock(waiters: newSeq[Future[void]](), locked: false, acquired: false)
proc wakeUpFirst(lock: AsyncLock): bool {.inline.} =
@ -143,8 +143,8 @@ proc newAsyncEvent*(): AsyncEvent =
## initially `false`.
# Workaround for callSoon() not worked correctly before
# getGlobalDispatcher() call.
discard getGlobalDispatcher()
# getThreadDispatcher() call.
discard getThreadDispatcher()
AsyncEvent(waiters: newSeq[Future[void]](), flag: false)
proc wait*(event: AsyncEvent): Future[void] =
@ -184,8 +184,8 @@ proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] =
## Creates a new asynchronous queue ``AsyncQueue``.
# Workaround for callSoon() not worked correctly before
# getGlobalDispatcher() call.
discard getGlobalDispatcher()
# getThreadDispatcher() call.
discard getThreadDispatcher()
AsyncQueue[T](
getters: newSeq[Future[void]](),
putters: newSeq[Future[void]](),

View File

@ -432,7 +432,7 @@ when defined(windows):
else:
transp.queue.addFirst(vector)
else:
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var size: int32
var flags: int32
@ -688,7 +688,7 @@ when defined(windows):
## Open new connection to remote peer with address ``address`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
let loop = getGlobalDispatcher()
let loop = getThreadDispatcher()
var retFuture = newFuture[StreamTransport]("stream.transport.connect")
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
@ -892,7 +892,7 @@ when defined(windows):
proc acceptLoop(udata: pointer) {.gcsafe, nimcall.} =
var ovl = cast[PtrCustomOverlapped](udata)
var server = cast[StreamServer](ovl.data.udata)
var loop = getGlobalDispatcher()
var loop = getThreadDispatcher()
while true:
if server.apending:
@ -1091,7 +1091,7 @@ when defined(windows):
if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
# TCP Sockets part
var loop = getGlobalDispatcher()
var loop = getThreadDispatcher()
server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
if server.asock == asyncInvalidSocket: