IOSelectors refactoring to properly support signals and processes. (AsyncProc 2) (#366)

* ioselectors_epoll() refactoring.

* ioselectors_kqueue() refactoring.

* ioselectors_poll() initial refactor.

* Remove `s.count` because it inconsistent and not used in `chronos`.

* Remove Windows version of select() engine.

* Add ability to switch event queue engine via `asyncEventEngine` command line option.

* Make it possible to switch between engines.

* Fix epoll regression.

* Fix poll() engine issues.

* Address review comments.

* Add proper trick.

* Address review comments.

* Bump version to 3.1.0.
This commit is contained in:
Eugene Kabanov 2023-03-24 17:52:55 +02:00 committed by GitHub
parent b0af576c7c
commit 1394c9e049
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1806 additions and 1911 deletions

View File

@ -1,7 +1,7 @@
mode = ScriptMode.Verbose
packageName = "chronos"
version = "3.0.11"
version = "3.1.0"
author = "Status Research & Development GmbH"
description = "Networking framework with async/await support"
license = "MIT or Apache License 2.0"

View File

@ -567,24 +567,24 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
# We are ignoring SIGPIPE signal, because we are working with EPIPE.
posix.signal(cint(SIGPIPE), SIG_IGN)
proc initAPI(disp: PDispatcher) {.raises: [Defect].} =
proc initAPI(disp: PDispatcher) =
discard
proc newDispatcher*(): PDispatcher {.raises: [Defect].} =
proc newDispatcher*(): PDispatcher =
## Create new dispatcher.
let selector =
try:
newSelector[SelectorData]()
except IOSelectorsException as exc:
raiseAsDefect exc, "Could not initialize selector"
except CatchableError as exc:
raiseAsDefect exc, "Could not initialize selector"
block:
let res = Selector.new(SelectorData)
if res.isErr(): raiseOsDefect(res.error(),
"Could not initialize selector")
res.get()
var res = PDispatcher(
selector: selector,
timers: initHeapQueue[TimerCallback](),
callbacks: initDeque[AsyncCallback](64),
callbacks: initDeque[AsyncCallback](asyncEventsCount),
idlers: initDeque[AsyncCallback](),
keys: newSeq[ReadyKey](64),
keys: newSeq[ReadyKey](asyncEventsCount),
trackers: initTable[string, TrackerBase]()
)
res.callbacks.addLast(SentinelCallback)
@ -600,28 +600,18 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
## Returns system specific OS queue.
disp.selector
proc contains*(disp: PDispatcher, fd: AsyncFD): bool {.inline.} =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
cint(fd) in disp.selector
proc register2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getThreadDispatcher()
try:
var data: SelectorData
loop.selector.registerHandle(int(fd), {}, data)
except CatchableError:
return err(osLastError())
ok()
var data: SelectorData
getThreadDispatcher().selector.registerHandle2(cint(fd), {}, data)
proc unregister2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Unregister file descriptor ``fd`` from thread's dispatcher.
let loop = getThreadDispatcher()
try:
loop.selector.unregister(int(fd))
except CatchableError:
return err(osLastError())
ok()
proc contains*(disp: PDispatcher, fd: AsyncFD): bool {.inline.} =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
int(fd) in disp.selector
getThreadDispatcher().selector.unregister2(cint(fd))
proc addReader2*(fd: AsyncFD, cb: CallbackFunc,
udata: pointer = nil): Result[void, OSErrorCode] =
@ -629,37 +619,27 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
## call the callback ``cb`` with specified argument ``udata``.
let loop = getThreadDispatcher()
var newEvents = {Event.Read}
withData(loop.selector, int(fd), adata) do:
withData(loop.selector, cint(fd), adata) do:
let acb = AsyncCallback(function: cb, udata: udata)
adata.reader = acb
if not(isNil(adata.writer.function)):
newEvents.incl(Event.Write)
do:
return err(OSErrorCode(osdefs.EBADF))
try:
loop.selector.updateHandle(int(fd), newEvents)
except CatchableError:
return err(osLastError())
ok()
loop.selector.updateHandle2(cint(fd), newEvents)
proc removeReader2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Stop watching the file descriptor ``fd`` for read availability.
let loop = getThreadDispatcher()
var newEvents: set[Event]
withData(loop.selector, int(fd), adata) do:
withData(loop.selector, cint(fd), adata) do:
# We need to clear `reader` data, because `selectors` don't do it
adata.reader = default(AsyncCallback)
if not(isNil(adata.writer.function)):
newEvents.incl(Event.Write)
do:
return err(OSErrorCode(osdefs.EBADF))
try:
loop.selector.updateHandle(int(fd), newEvents)
except CatchableError:
return err(osLastError())
ok()
loop.selector.updateHandle2(cint(fd), newEvents)
proc addWriter2*(fd: AsyncFD, cb: CallbackFunc,
udata: pointer = nil): Result[void, OSErrorCode] =
@ -667,37 +647,27 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
## call the callback ``cb`` with specified argument ``udata``.
let loop = getThreadDispatcher()
var newEvents = {Event.Write}
withData(loop.selector, int(fd), adata) do:
withData(loop.selector, cint(fd), adata) do:
let acb = AsyncCallback(function: cb, udata: udata)
adata.writer = acb
if not(isNil(adata.reader.function)):
newEvents.incl(Event.Read)
do:
return err(OSErrorCode(osdefs.EBADF))
try:
loop.selector.updateHandle(int(fd), newEvents)
except CatchableError:
return err(osLastError())
ok()
loop.selector.updateHandle2(cint(fd), newEvents)
proc removeWriter2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Stop watching the file descriptor ``fd`` for write availability.
let loop = getThreadDispatcher()
var newEvents: set[Event]
withData(loop.selector, int(fd), adata) do:
withData(loop.selector, cint(fd), adata) do:
# We need to clear `writer` data, because `selectors` don't do it
adata.writer = default(AsyncCallback)
if not(isNil(adata.reader.function)):
newEvents.incl(Event.Read)
do:
return err(OSErrorCode(osdefs.EBADF))
try:
loop.selector.updateHandle(int(fd), newEvents)
except CatchableError:
return err(osLastError())
ok()
loop.selector.updateHandle2(cint(fd), newEvents)
proc register*(fd: AsyncFD) {.raises: [Defect, OSError].} =
## Register file descriptor ``fd`` in thread's dispatcher.
@ -766,7 +736,7 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
if not isNil(aftercb):
aftercb(cast[pointer](param))
withData(loop.selector, int(fd), adata) do:
withData(loop.selector, cint(fd), adata) do:
# We are scheduling reader and writer callbacks to be called
# explicitly, so they can get an error and continue work.
# Callbacks marked as deleted so we don't need to get REAL notifications
@ -795,27 +765,59 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
## You can execute ``aftercb`` before actual socket close operation.
closeSocket(fd, aftercb)
when ioselSupportedPlatform:
proc addSignal*(signal: int, cb: CallbackFunc,
udata: pointer = nil): int {.
raises: [Defect, IOSelectorsException, ValueError, OSError].} =
when asyncEventEngine in ["epoll", "kqueue"]:
proc addSignal2*(signal: int, cb: CallbackFunc,
udata: pointer = nil): Result[int, OSErrorCode] =
## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb`` with specified argument ``udata``. Returns signal
## identifier code, which can be used to remove signal callback
## via ``removeSignal``.
let loop = getThreadDispatcher()
var data: SelectorData
result = loop.selector.registerSignal(signal, data)
withData(loop.selector, result, adata) do:
let sigfd = ? loop.selector.registerSignal(signal, data)
withData(loop.selector, sigfd, adata) do:
adata.reader = AsyncCallback(function: cb, udata: udata)
do:
raise newException(ValueError, "File descriptor not registered.")
return err(OSErrorCode(osdefs.EBADF))
ok(sigfd)
proc removeSignal*(sigfd: int) {.
raises: [Defect, IOSelectorsException].} =
## Remove watching signal ``signal``.
proc addProcess2*(pid: int, cb: CallbackFunc,
udata: pointer = nil): Result[int, OSErrorCode] =
## Registers callback ``cb`` to be called when process with process
## identifier ``pid`` exited. Returns process' descriptor, which can be
## used to clear process callback via ``removeProcess``.
let loop = getThreadDispatcher()
loop.selector.unregister(sigfd)
var data: SelectorData
let procfd = ? loop.selector.registerProcess(pid, data)
withData(loop.selector, procfd, adata) do:
adata.reader = AsyncCallback(function: cb, udata: udata)
do:
return err(OSErrorCode(osdefs.EBADF))
ok(procfd)
proc removeSignal2*(sigfd: int): Result[void, OSErrorCode] =
## Remove watching signal ``signal``.
getThreadDispatcher().selector.unregister2(cint(sigfd))
proc removeProcess2*(procfd: int): Result[void, OSErrorCode] =
## Remove process' watching using process' descriptor ``procfd``.
getThreadDispatcher().selector.unregister2(cint(procfd))
proc addSignal*(signal: int, cb: CallbackFunc,
udata: pointer = nil): int {.raises: [Defect, OSError].} =
## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb`` with specified argument ``udata``. Returns signal
## identifier code, which can be used to remove signal callback
## via ``removeSignal``.
addSignal2(signal, cb, udata).tryGet()
proc removeSignal*(sigfd: int) {.raises: [Defect, OSError].} =
## Remove watching signal ``signal``.
removeSignal2(sigfd).tryGet()
proc removeProcess*(procfd: int) {.raises: [Defect, OSError].} =
## Remove process' watching using process' descriptor ``procfd``.
removeProcess2(procfd).tryGet()
proc poll*() {.gcsafe.} =
## Perform single asynchronous step.
@ -823,10 +825,6 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
var curTime = Moment.now()
var curTimeout = 0
when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}
# On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`,
# complete pending work of the outer `processCallbacks` call.
# On non-reentrant `poll` calls, this only removes sentinel element.
@ -837,15 +835,17 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
# Processing IO descriptors and all hardware events.
let count =
try:
loop.selector.selectInto(curTimeout, loop.keys)
except IOSelectorsException:
raiseOsDefect(osLastError(), "poll(): Unable to get OS events")
for i in 0..<count:
block:
let res = loop.selector.selectInto2(curTimeout, loop.keys)
if res.isErr():
raiseOsDefect(res.error(), "poll(): Unable to get OS events")
res.get()
for i in 0 ..< count:
let fd = loop.keys[i].fd
let events = loop.keys[i].events
withData(loop.selector, fd, adata) do:
withData(loop.selector, cint(fd), adata) do:
if (Event.Read in events) or (events == {Event.Error}):
if not isNil(adata.reader.function):
loop.callbacks.addLast(adata.reader)
@ -858,7 +858,9 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
if not isNil(adata.reader.function):
loop.callbacks.addLast(adata.reader)
when ioselSupportedPlatform:
when asyncEventEngine in ["epoll", "kqueue"]:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}
if customSet * events != {}:
if not isNil(adata.reader.function):
loop.callbacks.addLast(adata.reader)
@ -885,17 +887,14 @@ else:
proc setThreadDispatcher*(disp: PDispatcher) =
## Set current thread's dispatcher instance to ``disp``.
if not gDisp.isNil:
if not(gDisp.isNil()):
doAssert gDisp.callbacks.len == 0
gDisp = disp
proc getThreadDispatcher*(): PDispatcher =
## Returns current thread's dispatcher instance.
if gDisp.isNil:
try:
setThreadDispatcher(newDispatcher())
except CatchableError as exc:
raiseAsDefect exc, "Cannot create dispatcher"
if gDisp.isNil():
setThreadDispatcher(newDispatcher())
gDisp
proc setGlobalDispatcher*(disp: PDispatcher) {.
@ -994,45 +993,37 @@ proc callIdle*(cbproc: CallbackFunc) =
include asyncfutures2
when not(defined(windows)):
when ioselSupportedPlatform:
proc waitSignal*(signal: int): Future[void] {.
raises: [Defect].} =
when asyncEventEngine in ["epoll", "kqueue"]:
proc waitSignal*(signal: int): Future[void] {.raises: [Defect].} =
var retFuture = newFuture[void]("chronos.waitSignal()")
var sigfd: int = -1
template getSignalException(e: untyped): untyped =
template getSignalException(e: OSErrorCode): untyped =
newException(AsyncError, "Could not manipulate signal handler, " &
"reason [" & $e.name & "]: " & $e.msg)
"reason [" & $int(e) & "]: " & osErrorMsg(e))
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if sigfd != -1:
try:
removeSignal(sigfd)
let res = removeSignal2(sigfd)
if res.isErr():
retFuture.fail(getSignalException(res.error()))
else:
retFuture.complete()
except IOSelectorsException as exc:
retFuture.fail(getSignalException(exc))
proc cancellation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if sigfd != -1:
try:
removeSignal(sigfd)
except IOSelectorsException as exc:
retFuture.fail(getSignalException(exc))
let res = removeSignal2(sigfd)
if res.isErr():
retFuture.fail(getSignalException(res.error()))
sigfd =
try:
addSignal(signal, continuation)
except IOSelectorsException as exc:
retFuture.fail(getSignalException(exc))
return retFuture
except ValueError as exc:
retFuture.fail(getSignalException(exc))
return retFuture
except OSError as exc:
retFuture.fail(getSignalException(exc))
return retFuture
block:
let res = addSignal2(signal, continuation)
if res.isErr():
retFuture.fail(getSignalException(res.error()))
res.get()
retFuture.cancelCallback = cancellation
retFuture

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -8,31 +8,19 @@
#
# This module implements Posix poll().
import std/tables
import stew/base10
import posix, times
# Maximum number of events that can be returned
const MAX_POLL_EVENTS = 64
when hasThreadSupport:
type
SelectorImpl[T] = object
maxFD : int
pollcnt: int
fds: ptr SharedArray[SelectorKey[T]]
pollfds: ptr SharedArray[TPollFd]
count: int
lock: Lock
Selector*[T] = ptr SelectorImpl[T]
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
type
SelectorImpl[T] = object
maxFD : int
pollcnt: int
fds: seq[SelectorKey[T]]
pollfds: seq[TPollFd]
count: int
Selector*[T] = ref SelectorImpl[T]
{.push raises: [].}
type
SelectorImpl[T] = object
fds: Table[int32, SelectorKey[T]]
pollfds: seq[TPollFd]
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object
@ -40,271 +28,316 @@ type
wfd: cint
SelectEvent* = ptr SelectEventImpl
when hasThreadSupport:
template withPollLock[T](s: Selector[T], body: untyped) =
acquire(s.lock)
{.locks: [s.lock].}:
try:
body
finally:
release(s.lock)
else:
template withPollLock(s, body: untyped) =
body
proc toString(key: int32): string =
Base10.toString(uint32(key))
proc newSelector*[T](): Selector[T] =
var a = RLimit()
if getrlimit(posix.RLIMIT_NOFILE, a) != 0:
raiseIOSelectorsError(osLastError())
var maxFD = int(a.rlim_max)
template addKey[T](s: Selector[T], key: int32, skey: SelectorKey[T]) =
if s.fds.hasKeyOrPut(key, skey):
raiseAssert "Descriptor [" & key.toString() &
"] is already registered in the selector!"
when hasThreadSupport:
result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
result.maxFD = maxFD
result.fds = allocSharedArray[SelectorKey[T]](maxFD)
result.pollfds = allocSharedArray[TPollFd](maxFD)
initLock(result.lock)
template getKey[T](s: Selector[T], key: int32): SelectorKey[T] =
let
defaultKey = SelectorKey[T](ident: InvalidIdent)
pkey = s.fds.getOrDefault(key, defaultKey)
doAssert(pkey.ident != InvalidIdent,
"Descriptor [" & key.toString() &
"] is not registered in the selector!")
pkey
template checkKey[T](s: Selector[T], key: int32): bool =
s.fds.contains(key)
proc freeKey[T](s: Selector[T], key: int32) =
s.fds.del(key)
proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] =
let selector = Selector[T](
fds: initTable[int32, SelectorKey[T]](asyncInitialSize)
)
ok(selector)
proc close2*[T](s: Selector[T]): SelectResult[void] =
s.fds.clear()
s.pollfds.clear()
proc new*(t: typedesc[SelectEvent]): SelectResult[SelectEvent] =
let flags = {DescriptorFlag.NonBlock, DescriptorFlag.CloseOnExec}
let pipes = ? createOsPipe(flags, flags)
var res = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
res.rfd = pipes.read
res.wfd = pipes.write
ok(res)
proc trigger2*(event: SelectEvent): SelectResult[void] =
var data: uint64 = 1
let res = handleEintr(osdefs.write(event.wfd, addr data, sizeof(uint64)))
if res == -1:
err(osLastError())
elif res != sizeof(uint64):
err(OSErrorCode(osdefs.EINVAL))
else:
result = Selector[T]()
result.maxFD = maxFD
result.fds = newSeq[SelectorKey[T]](maxFD)
result.pollfds = newSeq[TPollFd](maxFD)
ok()
for i in 0 ..< maxFD:
result.fds[i].ident = InvalidIdent
proc close2*(event: SelectEvent): SelectResult[void] =
let
rfd = event.rfd
wfd = event.wfd
deallocShared(cast[pointer](event))
let rres = handleEintr(osdefs.close(rfd))
if rres == -1:
discard osdefs.close(wfd)
return err(osLastError())
let wres = handleEintr(osdefs.close(wfd))
if wres == -1:
err(osLastError())
else:
ok()
proc close*[T](s: Selector[T]) =
when hasThreadSupport:
deinitLock(s.lock)
deallocSharedArray(s.fds)
deallocSharedArray(s.pollfds)
deallocShared(cast[pointer](s))
template toPollEvents(events: set[Event]): cshort =
var res = cshort(0)
if Event.Read in events: res = res or POLLIN
if Event.Write in events: res = res or POLLOUT
res
template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) =
withPollLock(s):
var pollev: cshort = 0
if Event.Read in events: pollev = pollev or POLLIN
if Event.Write in events: pollev = pollev or POLLOUT
s.pollfds[s.pollcnt].fd = cint(sock)
s.pollfds[s.pollcnt].events = pollev
inc(s.count)
inc(s.pollcnt)
s.pollfds.add(TPollFd(fd: sock, events: toPollEvents(events), revents: 0))
template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) =
withPollLock(s):
var i = 0
var pollev: cshort = 0
if Event.Read in events: pollev = pollev or POLLIN
if Event.Write in events: pollev = pollev or POLLOUT
while i < s.pollcnt:
if s.pollfds[i].fd == sock:
s.pollfds[i].events = pollev
break
inc(i)
doAssert(i < s.pollcnt,
"Descriptor [" & $sock & "] is not registered in the queue!")
var updated = false
for mitem in s.pollfds.mitems():
if mitem.fd == sock:
mitem.events = toPollEvents(events)
break
if not(updated):
raiseAssert "Descriptor [" & $sock & "] is not registered in the queue!"
template pollRemove[T](s: Selector[T], sock: cint) =
withPollLock(s):
var i = 0
while i < s.pollcnt:
if s.pollfds[i].fd == sock:
if i == s.pollcnt - 1:
s.pollfds[i].fd = 0
s.pollfds[i].events = 0
s.pollfds[i].revents = 0
else:
while i < (s.pollcnt - 1):
s.pollfds[i].fd = s.pollfds[i + 1].fd
s.pollfds[i].events = s.pollfds[i + 1].events
inc(i)
break
inc(i)
dec(s.pollcnt)
dec(s.count)
let index =
block:
var res = -1
for key, item in s.pollfds.pairs():
if item.fd == sock:
res = key
break
res
if index < 0:
raiseAssert "Descriptor [" & $sock & "] is not registered in the queue!"
else:
s.pollfds.del(index)
template checkFd(s, f) =
if f >= s.maxFD:
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
proc registerHandle2*[T](s: Selector[T], fd: cint, events: set[Event],
data: T): SelectResult[void] =
let skey = SelectorKey[T](ident: fd, events: events, param: 0, data: data)
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
var fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == InvalidIdent)
setKey(s, fdi, events, 0, data)
if events != {}: s.pollAdd(fdi.cint, events)
s.addKey(fd, skey)
if events != {}:
s.pollAdd(fd, events)
ok()
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != InvalidIdent,
"Descriptor [" & $fdi & "] is not registered in the queue!")
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
if pkey.events == {}:
s.pollAdd(fd.cint, events)
else:
if events != {}:
s.pollUpdate(fd.cint, events)
proc updateHandle2*[T](s: Selector[T], fd: cint,
events: set[Event]): SelectResult[void] =
const EventsMask = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
s.fds.withValue(int32(fd), pkey):
doAssert(pkey[].events * EventsMask == {},
"Descriptor [" & fd.toString() & "] could not be updated!")
if pkey[].events != events:
if pkey[].events == {}:
s.pollAdd(fd, events)
else:
s.pollRemove(fd.cint)
pkey.events = events
if events != {}:
s.pollUpdate(fd, events)
else:
s.pollRemove(fd)
pkey.events = events
do:
raiseAssert "Descriptor [" & fd.toString() &
"] is not registered in the selector!"
ok()
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
var fdi = int(ev.rfd)
doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
var events = {Event.User}
setKey(s, fdi, events, 0, data)
events.incl(Event.Read)
s.pollAdd(fdi.cint, events)
proc registerEvent2*[T](s: Selector[T], ev: SelectEvent,
data: T): SelectResult[cint] =
doAssert(not(isNil(ev)))
let
key = SelectorKey[T](ident: ev.rfd, events: {Event.User},
param: 0, data: data)
proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != InvalidIdent,
"Descriptor [" & $fdi & "] is not registered in the queue!")
pkey.ident = InvalidIdent
s.addKey(ev.rfd, key)
s.pollAdd(ev.rfd, {Event.Read}.toPollEvents())
ok(ev.rfd)
proc unregister2*[T](s: Selector[T], fd: cint): SelectResult[void] =
let pkey = s.getKey(fd)
if pkey.events != {}:
pkey.events = {}
s.pollRemove(fdi.cint)
if {Event.Read, Event.Write, Event.User} * pkey.events != {}:
s.pollRemove(fd)
s.freeKey(fd)
ok()
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
doAssert(Event.User in pkey.events)
pkey.ident = InvalidIdent
pkey.events = {}
s.pollRemove(fdi.cint)
proc unregister2*[T](s: Selector[T], event: SelectEvent): SelectResult[void] =
s.unregister2(event.rfd)
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) != 0:
raiseIOSelectorsError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rfd = fds[0]
result.wfd = fds[1]
proc prepareKey[T](s: Selector[T], event: var TPollfd): Opt[ReadyKey] =
let
defaultKey = SelectorKey[T](ident: InvalidIdent)
fdi32 = int32(event.fd)
revents = event.revents
proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())
var
pkey = s.getKey(fdi32)
rkey = ReadyKey(fd: event.fd)
proc close*(ev: SelectEvent) =
let res1 = posix.close(ev.rfd)
let res2 = posix.close(ev.wfd)
deallocShared(cast[pointer](ev))
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())
# Cleanup all the received events.
event.revents = 0
if (revents and POLLIN) != 0:
if Event.User in pkey.events:
var data: uint64 = 0
let res = handleEintr(osdefs.read(event.fd, addr data, sizeof(uint64)))
if res != sizeof(uint64):
let errorCode = osLastError()
if errorCode == EAGAIN:
return Opt.none(ReadyKey)
else:
rkey.events.incl({Event.User, Event.Error})
rkey.errorCode = errorCode
else:
rkey.events.incl(Event.User)
else:
rkey.events.incl(Event.Read)
if (revents and POLLOUT) != 0:
rkey.events.incl(Event.Write)
if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or
(revents and POLLNVAL) != 0:
rkey.events.incl(Event.Error)
ok(rkey)
proc selectInto2*[T](s: Selector[T], timeout: int,
readyKeys: var openArray[ReadyKey]): SelectResult[int] =
var k = 0
verifySelectParams(timeout, -1, int(high(cint)))
let
maxEventsCount = min(len(s.pollfds), len(readyKeys))
eventsCount =
if maxEventsCount > 0:
let res = handleEintr(poll(addr(s.pollfds[0]), Tnfds(maxEventsCount),
timeout))
if res < 0:
return err(osLastError())
res
else:
0
for i in 0 ..< len(s.pollfds):
if s.pollfds[i].revents != 0:
let rkey = s.prepareKey(s.pollfds[i]).valueOr: continue
readyKeys[k] = rkey
inc(k)
if k == eventsCount: break
ok(k)
proc select2*[T](s: Selector[T], timeout: int): SelectResult[seq[ReadyKey]] =
var res = newSeq[ReadyKey](asyncEventsCount)
let count = ? selectInto2(s, timeout, res)
res.setLen(count)
ok(res)
proc newSelector*[T](): Selector[T] {.
raises: [Defect, OSError].} =
let res = Selector.new(T)
if res.isErr(): raiseOSError(res.error)
res.get()
proc close*[T](s: Selector[T]) {.
raises: [Defect, IOSelectorsException].} =
let res = s.close2()
if res.isErr(): raiseIOSelectorsError(res.error())
proc newSelectEvent*(): SelectEvent {.
raises: [Defect, IOSelectorsException].} =
let res = SelectEvent.new()
if res.isErr(): raiseIOSelectorsError(res.error())
res.get()
proc trigger*(event: SelectEvent) {.
raises: [Defect, IOSelectorsException].} =
let res = event.trigger2()
if res.isErr(): raiseIOSelectorsError(res.error())
proc close*(event: SelectEvent) {.
raises: [Defect, IOSelectorsException].} =
let res = event.close2()
if res.isErr(): raiseIOSelectorsError(res.error())
proc registerHandle*[T](s: Selector[T], fd: cint | SocketHandle,
events: set[Event], data: T) {.
raises: [Defect, IOSelectorsException].} =
let res = registerHandle2(s, cint(fd), events, data)
if res.isErr(): raiseIOSelectorsError(res.error())
proc updateHandle*[T](s: Selector[T], fd: cint | SocketHandle,
events: set[Event]) {.
raises: [Defect, IOSelectorsException].} =
let res = updateHandle2(s, cint(fd), events)
if res.isErr(): raiseIOSelectorsError(res.error())
proc unregister*[T](s: Selector[T], fd: cint | SocketHandle) {.
raises: [Defect, IOSelectorsException].} =
let res = unregister2(s, cint(fd))
if res.isErr(): raiseIOSelectorsError(res.error())
proc unregister*[T](s: Selector[T], event: SelectEvent) {.
raises: [Defect, IOSelectorsException].} =
let res = unregister2(s, event)
if res.isErr(): raiseIOSelectorsError(res.error())
proc registerEvent*[T](s: Selector[T], event: SelectEvent,
data: T) {.
raises: [Defect, IOSelectorsException].} =
let res = registerEvent2(s, event, data)
if res.isErr(): raiseIOSelectorsError(res.error())
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openArray[ReadyKey]): int =
var maxres = MAX_POLL_EVENTS
if maxres > len(results):
maxres = len(results)
verifySelectParams(timeout)
s.withPollLock():
let count = posix.poll(addr(s.pollfds[0]), Tnfds(s.pollcnt), timeout)
if count < 0:
result = 0
let err = osLastError()
if cint(err) != EINTR:
raiseIOSelectorsError(err)
elif count == 0:
result = 0
else:
var i = 0
var k = 0
var rindex = 0
while (i < s.pollcnt) and (k < count) and (rindex < maxres):
let revents = s.pollfds[i].revents
if revents != 0:
let fd = s.pollfds[i].fd
var pkey = addr(s.fds[fd])
var rkey = ReadyKey(fd: int(fd), events: {})
if (revents and POLLIN) != 0:
rkey.events.incl(Event.Read)
if Event.User in pkey.events:
var data: uint64 = 0
if posix.read(fd, addr data, sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err != OSErrorCode(EAGAIN):
raiseIOSelectorsError(err)
else:
# someone already consumed event data
inc(i)
continue
rkey.events = {Event.User}
if (revents and POLLOUT) != 0:
rkey.events.incl(Event.Write)
if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or
(revents and POLLNVAL) != 0:
rkey.events.incl(Event.Error)
results[rindex] = rkey
s.pollfds[i].revents = 0
inc(rindex)
inc(k)
inc(i)
result = k
readyKeys: var openArray[ReadyKey]): int {.
raises: [Defect, IOSelectorsException].} =
let res = selectInto2(s, timeout, readyKeys)
if res.isErr(): raiseIOSelectorsError(res.error())
res.get()
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
result = newSeq[ReadyKey](MAX_POLL_EVENTS)
let count = selectInto(s, timeout, result)
result.setLen(count)
let res = select2(s, timeout)
if res.isErr(): raiseIOSelectorsError(res.error())
res.get()
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|cint): bool {.inline.} =
s.checkKey(int32(fd))
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd.int].ident != InvalidIdent
proc setData*[T](s: Selector[T], fd: SocketHandle|cint, data: T): bool =
s.fds.withValue(int32(fd), skey):
skey[].data = data
return true
do:
return false
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)
s.checkFd(fdi)
if fdi in s:
result = s.fds[fdi].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
let fdi = int(fd)
s.checkFd(fdi)
if fdi in s:
s.fds[fdi].data = data
result = true
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
template withData*[T](s: Selector[T], fd: SocketHandle|cint, value,
body: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if fdi in s:
var value = addr(s.getData(fdi))
s.fds.withValue(int32(fd), skey):
var value = addr(skey[].data)
body
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
template withData*[T](s: Selector[T], fd: SocketHandle|cint, value, body1,
body2: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if fdi in s:
var value = addr(s.getData(fdi))
s.fds.withValue(int32(fd), skey):
var value = addr(skey[].data)
body1
else:
do:
body2
proc getFd*[T](s: Selector[T]): int =
return -1
proc getFd*[T](s: Selector[T]): int = -1

View File

@ -1,465 +0,0 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2016 Eugene Kabanov
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# This module implements Posix and Windows select().
import times, nativesockets
when defined(windows):
import winlean
when defined(gcc):
{.passl: "-lws2_32".}
elif defined(vcc):
{.passl: "ws2_32.lib".}
const platformHeaders = """#include <winsock2.h>
#include <windows.h>"""
const EAGAIN = WSAEWOULDBLOCK
else:
const platformHeaders = """#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>"""
type
Fdset {.importc: "fd_set", header: platformHeaders, pure, final.} = object
var
FD_SETSIZE {.importc: "FD_SETSIZE", header: platformHeaders.}: cint
proc IOFD_SET(fd: SocketHandle, fdset: ptr Fdset)
{.cdecl, importc: "FD_SET", header: platformHeaders, inline.}
proc IOFD_CLR(fd: SocketHandle, fdset: ptr Fdset)
{.cdecl, importc: "FD_CLR", header: platformHeaders, inline.}
proc IOFD_ZERO(fdset: ptr Fdset)
{.cdecl, importc: "FD_ZERO", header: platformHeaders, inline.}
when defined(windows):
proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint
{.stdcall, importc: "FD_ISSET", header: platformHeaders, inline.}
proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset,
timeout: ptr Timeval): cint
{.stdcall, importc: "select", header: platformHeaders.}
else:
proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint
{.cdecl, importc: "FD_ISSET", header: platformHeaders, inline.}
proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset,
timeout: ptr Timeval): cint
{.cdecl, importc: "select", header: platformHeaders.}
when hasThreadSupport:
type
SelectorImpl[T] = object
rSet: Fdset
wSet: Fdset
eSet: Fdset
maxFD: int
fds: ptr SharedArray[SelectorKey[T]]
count: int
lock: Lock
Selector*[T] = ptr SelectorImpl[T]
else:
type
SelectorImpl[T] = object
rSet: Fdset
wSet: Fdset
eSet: Fdset
maxFD: int
fds: seq[SelectorKey[T]]
count: int
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object
rsock: SocketHandle
wsock: SocketHandle
SelectEvent* = ptr SelectEventImpl
when hasThreadSupport:
template withSelectLock[T](s: Selector[T], body: untyped) =
acquire(s.lock)
{.locks: [s.lock].}:
try:
body
finally:
release(s.lock)
else:
template withSelectLock[T](s: Selector[T], body: untyped) =
body
proc newSelector*[T](): Selector[T] =
when hasThreadSupport:
result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
result.fds = allocSharedArray[SelectorKey[T]](FD_SETSIZE)
initLock result.lock
else:
result = Selector[T]()
result.fds = newSeq[SelectorKey[T]](FD_SETSIZE)
for i in 0 ..< FD_SETSIZE:
result.fds[i].ident = InvalidIdent
IOFD_ZERO(addr result.rSet)
IOFD_ZERO(addr result.wSet)
IOFD_ZERO(addr result.eSet)
proc close*[T](s: Selector[T]) =
when hasThreadSupport:
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
when defined(windows):
proc newSelectEvent*(): SelectEvent =
var ssock = createNativeSocket()
var wsock = createNativeSocket()
var rsock: SocketHandle = INVALID_SOCKET
var saddr = Sockaddr_in()
saddr.sin_family = winlean.AF_INET
saddr.sin_port = 0
saddr.sin_addr.s_addr = INADDR_ANY
if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)),
sizeof(saddr).SockLen) < 0'i32:
raiseIOSelectorsError(osLastError())
if winlean.listen(ssock, 1) != 0:
raiseIOSelectorsError(osLastError())
var namelen = sizeof(saddr).SockLen
if getsockname(ssock, cast[ptr SockAddr](addr(saddr)),
addr(namelen)) != 0'i32:
raiseIOSelectorsError(osLastError())
saddr.sin_addr.s_addr = 0x0100007F
if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)),
sizeof(saddr).SockLen) != 0:
raiseIOSelectorsError(osLastError())
namelen = sizeof(saddr).SockLen
rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)),
cast[ptr SockLen](addr(namelen)))
if rsock == SocketHandle(-1):
raiseIOSelectorsError(osLastError())
if winlean.closesocket(ssock) != 0:
raiseIOSelectorsError(osLastError())
var mode = clong(1)
if ioctlsocket(rsock, FIONBIO, addr(mode)) != 0:
raiseIOSelectorsError(osLastError())
mode = clong(1)
if ioctlsocket(wsock, FIONBIO, addr(mode)) != 0:
raiseIOSelectorsError(osLastError())
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rsock = rsock
result.wsock = wsock
proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if winlean.send(ev.wsock, cast[pointer](addr data),
cint(sizeof(uint64)), 0) != sizeof(uint64):
raiseIOSelectorsError(osLastError())
proc close*(ev: SelectEvent) =
let res1 = winlean.closesocket(ev.rsock)
let res2 = winlean.closesocket(ev.wsock)
deallocShared(cast[pointer](ev))
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())
else:
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) != 0:
raiseIOSelectorsError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rsock = SocketHandle(fds[0])
result.wsock = SocketHandle(fds[1])
proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())
proc close*(ev: SelectEvent) =
let res1 = posix.close(cint(ev.rsock))
let res2 = posix.close(cint(ev.wsock))
deallocShared(cast[pointer](ev))
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())
proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event],
data: T) =
var i = 0
let fdi = int(fd)
while i < FD_SETSIZE:
if s.fds[i].ident == InvalidIdent:
var pkey = addr(s.fds[i])
pkey.ident = fdi
pkey.events = events
pkey.data = data
break
inc(i)
if i >= FD_SETSIZE:
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
var i = 0
let fdi = int(fd)
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
result = addr(s.fds[i])
break
inc(i)
doAssert(i < FD_SETSIZE,
"Descriptor [" & $int(fd) & "] is not registered in the queue!")
proc delKey[T](s: Selector[T], fd: SocketHandle) =
var empty: T
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fd.int:
s.fds[i].ident = InvalidIdent
s.fds[i].events = {}
s.fds[i].data = empty
break
inc(i)
doAssert(i < FD_SETSIZE,
"Descriptor [" & $int(fd) & "] is not registered in the queue!")
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
when not defined(windows):
let fdi = int(fd)
s.withSelectLock():
s.setSelectKey(fd, events, data)
when not defined(windows):
if fdi > s.maxFD: s.maxFD = fdi
if Event.Read in events:
IOFD_SET(fd, addr s.rSet)
inc(s.count)
if Event.Write in events:
IOFD_SET(fd, addr s.wSet)
IOFD_SET(fd, addr s.eSet)
inc(s.count)
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
when not defined(windows):
let fdi = int(ev.rsock)
s.withSelectLock():
s.setSelectKey(ev.rsock, {Event.User}, data)
when not defined(windows):
if fdi > s.maxFD: s.maxFD = fdi
IOFD_SET(ev.rsock, addr s.rSet)
inc(s.count)
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
s.withSelectLock():
var pkey = s.getKey(fd)
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
if (Event.Read in pkey.events) and (Event.Read notin events):
IOFD_CLR(fd, addr s.rSet)
dec(s.count)
if (Event.Write in pkey.events) and (Event.Write notin events):
IOFD_CLR(fd, addr s.wSet)
IOFD_CLR(fd, addr s.eSet)
dec(s.count)
if (Event.Read notin pkey.events) and (Event.Read in events):
IOFD_SET(fd, addr s.rSet)
inc(s.count)
if (Event.Write notin pkey.events) and (Event.Write in events):
IOFD_SET(fd, addr s.wSet)
IOFD_SET(fd, addr s.eSet)
inc(s.count)
pkey.events = events
proc unregister*[T](s: Selector[T], fd: SocketHandle|int) =
s.withSelectLock():
let fd = fd.SocketHandle
var pkey = s.getKey(fd)
if Event.Read in pkey.events or Event.User in pkey.events:
IOFD_CLR(fd, addr s.rSet)
dec(s.count)
if Event.Write in pkey.events:
IOFD_CLR(fd, addr s.wSet)
IOFD_CLR(fd, addr s.eSet)
dec(s.count)
s.delKey(fd)
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fd = ev.rsock
s.withSelectLock():
var pkey = s.getKey(fd)
IOFD_CLR(fd, addr s.rSet)
dec(s.count)
s.delKey(fd)
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openArray[ReadyKey]): int =
var tv = Timeval()
var ptv = addr tv
var rset, wset, eset: Fdset
verifySelectParams(timeout)
if timeout != -1:
when defined(genode):
tv.tv_sec = Time(timeout div 1_000)
else:
tv.tv_sec = timeout.int32 div 1_000
tv.tv_usec = (timeout.int32 %% 1_000) * 1_000
else:
ptv = nil
s.withSelectLock():
rset = s.rSet
wset = s.wSet
eset = s.eSet
var count = ioselect(cint(s.maxFD) + 1, addr(rset), addr(wset),
addr(eset), ptv)
if count < 0:
result = 0
when defined(windows):
raiseIOSelectorsError(osLastError())
else:
let err = osLastError()
if cint(err) != EINTR:
raiseIOSelectorsError(err)
elif count == 0:
result = 0
else:
var rindex = 0
var i = 0
var k = 0
while (i < FD_SETSIZE) and (k < count):
if s.fds[i].ident != InvalidIdent:
var flag = false
var pkey = addr(s.fds[i])
var rkey = ReadyKey(fd: int(pkey.ident), events: {})
let fd = SocketHandle(pkey.ident)
if IOFD_ISSET(fd, addr rset) != 0:
if Event.User in pkey.events:
var data: uint64 = 0
if recv(fd, cast[pointer](addr(data)),
sizeof(uint64).cint, 0) != sizeof(uint64):
let err = osLastError()
if cint(err) != EAGAIN:
raiseIOSelectorsError(err)
else:
inc(i)
inc(k)
continue
else:
flag = true
rkey.events = {Event.User}
else:
flag = true
rkey.events = {Event.Read}
if IOFD_ISSET(fd, addr wset) != 0:
rkey.events.incl(Event.Write)
if IOFD_ISSET(fd, addr eset) != 0:
rkey.events.incl(Event.Error)
flag = true
if flag:
results[rindex] = rkey
inc(rindex)
inc(k)
inc(i)
result = rindex
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
result = newSeq[ReadyKey](FD_SETSIZE)
var count = selectInto(s, timeout, result)
result.setLen(count)
proc flush*[T](s: Selector[T]) = discard
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
s.withSelectLock():
result = false
let fdi = int(fd)
for i in 0..<FD_SETSIZE:
if s.fds[i].ident == fdi:
return true
when hasThreadSupport:
template withSelectLock[T](s: Selector[T], body: untyped) =
acquire(s.lock)
{.locks: [s.lock].}:
try:
body
finally:
release(s.lock)
else:
template withSelectLock[T](s: Selector[T], body: untyped) =
body
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
s.withSelectLock():
let fdi = int(fd)
for i in 0..<FD_SETSIZE:
if s.fds[i].ident == fdi:
return s.fds[i].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
s.withSelectLock():
let fdi = int(fd)
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
var pkey = addr(s.fds[i])
pkey.data = data
result = true
break
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body: untyped) =
mixin withSelectLock
s.withSelectLock():
var value: ptr T
let fdi = int(fd)
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
value = addr(s.fds[i].data)
break
inc(i)
if i != FD_SETSIZE:
body
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body1, body2: untyped) =
mixin withSelectLock
s.withSelectLock():
block:
var value: ptr T
let fdi = int(fd)
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
value = addr(s.fds[i].data)
break
inc(i)
if i != FD_SETSIZE:
body1
else:
body2
proc getFd*[T](s: Selector[T]): int =
return -1

View File

@ -920,6 +920,26 @@ elif defined(linux):
events*: uint32 # Epoll events
data*: EpollData # User data variable
SignalFdInfo* {.importc: "struct signalfd_siginfo",
header: "<sys/signalfd.h>", pure, final.} = object
ssi_signo*: uint32
ssi_errno*: int32
ssi_code*: int32
ssi_pid*: uint32
ssi_uid*: uint32
ssi_fd*: int32
ssi_tid*: uint32
ssi_band*: uint32
ssi_overrun*: uint32
ssi_trapno*: uint32
ssi_status*: int32
ssi_int*: int32
ssi_ptr*: uint64
ssi_utime*: uint64
ssi_stime*: uint64
ssi_addr*: uint64
pad* {.importc: "__pad".}: array[0..47, uint8]
proc epoll_create*(size: cint): cint {.importc: "epoll_create",
header: "<sys/epoll.h>", sideEffect.}
@ -933,9 +953,19 @@ elif defined(linux):
timeout: cint): cint {.
importc: "epoll_wait", header: "<sys/epoll.h>", sideEffect.}
proc timerfd_create*(clock_id: ClockId, flags: cint): cint {.
cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".}
proc timerfd_settime*(ufd: cint, flags: cint,
utmr: var Itimerspec, otmr: var Itimerspec): cint {.
cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".}
proc eventfd*(count: cuint, flags: cint): cint {.
cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}
proc signalfd*(fd: cint, mask: var Sigset, flags: cint): cint {.
cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
else:
import std/[posix, os]
export posix, os
import std/posix
export posix
var IP_MULTICAST_TTL* {.importc: "IP_MULTICAST_TTL",
header: "<netinet/in.h>".}: cint

View File

@ -31,22 +31,32 @@
# support - changes could potentially be backported to nim but are not
# backwards-compatible.
import os, nativesockets
import stew/results
import osdefs, osutils
export results
const hasThreadSupport = compileOption("threads") and defined(threadsafe)
const
asyncEventsCount* {.intdefine.} = 64
## Number of epoll events retrieved by syscall.
asyncInitialSize* {.intdefine.} = 64
## Initial size of Selector[T]'s array of file descriptors.
asyncEventEngine* {.strdefine.} =
when defined(linux):
"epoll"
elif defined(macosx) or defined(macos) or defined(ios) or
defined(freebsd) or defined(netbsd) or defined(openbsd) or
defined(dragonfly):
"kqueue"
elif defined(posix):
"poll"
else:
""
## Engine type which is going to be used by module.
const ioselSupportedPlatform* = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(dragonfly) or
(defined(linux) and not defined(android))
## This constant is used to determine whether the destination platform is
## fully supported by ``ioselectors`` module.
const bsdPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(dragonfly)
hasThreadSupport = compileOption("threads")
when defined(nimdoc):
type
Selector*[T] = ref object
## An object which holds descriptors to be checked for read/write status
@ -236,30 +246,16 @@ when defined(nimdoc):
## For *poll* and *select* selectors ``-1`` is returned.
else:
import strutils
when hasThreadSupport:
import locks
type
SharedArray[T] = UncheckedArray[T]
proc allocSharedArray[T](nsize: int): ptr SharedArray[T] =
result = cast[ptr SharedArray[T]](allocShared0(sizeof(T) * nsize))
proc reallocSharedArray[T](sa: ptr SharedArray[T], nsize: int): ptr SharedArray[T] =
result = cast[ptr SharedArray[T]](reallocShared(sa, sizeof(T) * nsize))
proc deallocSharedArray[T](sa: ptr SharedArray[T]) =
deallocShared(cast[pointer](sa))
type
IOSelectorsException* = object of CatchableError
SelectResult*[T] = Result[T, OSErrorCode]
Event* {.pure.} = enum
Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot,
Finished, VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink,
VnodeRename, VnodeRevoke
type
IOSelectorsException* = object of CatchableError
ReadyKey* = object
fd* : int
events*: set[Event]
@ -285,78 +281,54 @@ else:
var err = newException(IOSelectorsException, msg)
raise err
proc setNonBlocking(fd: cint) {.inline.} =
setBlocking(fd.SocketHandle, false)
when not defined(windows):
import posix
template setKey(s, pident, pevents, pparam, pdata: untyped) =
var skey = addr(s.fds[pident])
skey.ident = pident
skey.events = pevents
skey.param = pparam
skey.data = data
when ioselSupportedPlatform:
template blockSignals(newmask: var Sigset, oldmask: var Sigset) =
when asyncEventEngine in ["epoll", "kqueue"]:
proc blockSignals(newmask: Sigset,
oldmask: var Sigset): Result[void, OSErrorCode] =
var nmask = newmask
# We do this trick just because Nim's posix.nim has declaration like
# this:
# proc pthread_sigmask(a1: cint; a2, a3: var Sigset): cint
# proc sigprocmask*(a1: cint, a2, a3: var Sigset): cint
when hasThreadSupport:
if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
if pthread_sigmask(SIG_BLOCK, nmask, oldmask) == -1:
err(osLastError())
else:
ok()
else:
if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
if sigprocmask(SIG_BLOCK, nmask, oldmask) == -1:
err(osLastError())
else:
ok()
template unblockSignals(newmask: var Sigset, oldmask: var Sigset) =
proc unblockSignals(newmask: Sigset,
oldmask: var Sigset): Result[void, OSErrorCode] =
# We do this trick just because Nim's posix.nim has declaration like
# this:
# proc pthread_sigmask(a1: cint; a2, a3: var Sigset): cint
# proc sigprocmask*(a1: cint, a2, a3: var Sigset): cint
var nmask = newmask
when hasThreadSupport:
if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
if pthread_sigmask(SIG_UNBLOCK, nmask, oldmask) == -1:
err(osLastError())
else:
ok()
else:
if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
if sigprocmask(SIG_UNBLOCK, nmask, oldmask) == -1:
err(osLastError())
else:
ok()
template clearKey[T](key: ptr SelectorKey[T]) =
var empty: T
key.ident = InvalidIdent
key.events = {}
key.data = empty
proc verifySelectParams(timeout: int) =
template verifySelectParams(timeout, min, max: int) =
# Timeout of -1 means: wait forever
# Anything higher is the time to wait in milliseconds.
doAssert(timeout >= -1, "Cannot select with a negative value, got " & $timeout)
doAssert((timeout >= min) and (timeout <= max),
"Cannot select with incorrect timeout value, got " & $timeout)
when defined(linux):
include ./ioselects/ioselectors_epoll
elif bsdPlatform:
include ./ioselects/ioselectors_kqueue
elif defined(windows):
include ./ioselects/ioselectors_select
elif defined(solaris):
include ./ioselects/ioselectors_poll # need to replace it with event ports
elif defined(genode):
include ./ioselects/ioselectors_select # TODO: use the native VFS layer
elif defined(nintendoswitch):
include ./ioselects/ioselectors_select
else:
include ./ioselects/ioselectors_poll
proc register*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) {.deprecated: "use registerHandle instead".} =
## **Deprecated since v0.18.0:** Use ``registerHandle`` instead.
s.registerHandle(fd, events, data)
proc setEvent*(ev: SelectEvent) {.deprecated: "use trigger instead",
raises: [Defect, IOSelectorsException].} =
## Trigger event ``ev``.
##
## **Deprecated since v0.18.0:** Use ``trigger`` instead.
ev.trigger()
proc update*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event]) {.deprecated: "use updateHandle instead".} =
## Update file/socket descriptor ``fd``, registered in selector
## ``s`` with new events set ``event``.
##
## **Deprecated since v0.18.0:** Use ``updateHandle`` instead.
s.updateHandle()
when asyncEventEngine == "epoll":
include ./ioselects/ioselectors_epoll
elif asyncEventEngine == "kqueue":
include ./ioselects/ioselectors_kqueue
elif asyncEventEngine == "poll":
include ./ioselects/ioselectors_poll
else:
{.fatal: "Event engine `" & asyncEventEngine & "` is not supported!".}