diff --git a/chronos.nimble b/chronos.nimble index 639d1d3..18f7a47 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,7 +1,7 @@ mode = ScriptMode.Verbose packageName = "chronos" -version = "3.0.11" +version = "3.1.0" author = "Status Research & Development GmbH" description = "Networking framework with async/await support" license = "MIT or Apache License 2.0" diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index ed8b43d..ca16554 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -567,24 +567,24 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or # We are ignoring SIGPIPE signal, because we are working with EPIPE. posix.signal(cint(SIGPIPE), SIG_IGN) - proc initAPI(disp: PDispatcher) {.raises: [Defect].} = + proc initAPI(disp: PDispatcher) = discard - proc newDispatcher*(): PDispatcher {.raises: [Defect].} = + proc newDispatcher*(): PDispatcher = ## Create new dispatcher. let selector = - try: - newSelector[SelectorData]() - except IOSelectorsException as exc: - raiseAsDefect exc, "Could not initialize selector" - except CatchableError as exc: - raiseAsDefect exc, "Could not initialize selector" + block: + let res = Selector.new(SelectorData) + if res.isErr(): raiseOsDefect(res.error(), + "Could not initialize selector") + res.get() + var res = PDispatcher( selector: selector, timers: initHeapQueue[TimerCallback](), - callbacks: initDeque[AsyncCallback](64), + callbacks: initDeque[AsyncCallback](asyncEventsCount), idlers: initDeque[AsyncCallback](), - keys: newSeq[ReadyKey](64), + keys: newSeq[ReadyKey](asyncEventsCount), trackers: initTable[string, TrackerBase]() ) res.callbacks.addLast(SentinelCallback) @@ -600,28 +600,18 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or ## Returns system specific OS queue. disp.selector + proc contains*(disp: PDispatcher, fd: AsyncFD): bool {.inline.} = + ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. + cint(fd) in disp.selector + proc register2*(fd: AsyncFD): Result[void, OSErrorCode] = ## Register file descriptor ``fd`` in thread's dispatcher. - let loop = getThreadDispatcher() - try: - var data: SelectorData - loop.selector.registerHandle(int(fd), {}, data) - except CatchableError: - return err(osLastError()) - ok() + var data: SelectorData + getThreadDispatcher().selector.registerHandle2(cint(fd), {}, data) proc unregister2*(fd: AsyncFD): Result[void, OSErrorCode] = ## Unregister file descriptor ``fd`` from thread's dispatcher. - let loop = getThreadDispatcher() - try: - loop.selector.unregister(int(fd)) - except CatchableError: - return err(osLastError()) - ok() - - proc contains*(disp: PDispatcher, fd: AsyncFD): bool {.inline.} = - ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. - int(fd) in disp.selector + getThreadDispatcher().selector.unregister2(cint(fd)) proc addReader2*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil): Result[void, OSErrorCode] = @@ -629,37 +619,27 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or ## call the callback ``cb`` with specified argument ``udata``. let loop = getThreadDispatcher() var newEvents = {Event.Read} - withData(loop.selector, int(fd), adata) do: + withData(loop.selector, cint(fd), adata) do: let acb = AsyncCallback(function: cb, udata: udata) adata.reader = acb if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) do: return err(OSErrorCode(osdefs.EBADF)) - - try: - loop.selector.updateHandle(int(fd), newEvents) - except CatchableError: - return err(osLastError()) - ok() + loop.selector.updateHandle2(cint(fd), newEvents) proc removeReader2*(fd: AsyncFD): Result[void, OSErrorCode] = ## Stop watching the file descriptor ``fd`` for read availability. let loop = getThreadDispatcher() var newEvents: set[Event] - withData(loop.selector, int(fd), adata) do: + withData(loop.selector, cint(fd), adata) do: # We need to clear `reader` data, because `selectors` don't do it adata.reader = default(AsyncCallback) if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) do: return err(OSErrorCode(osdefs.EBADF)) - - try: - loop.selector.updateHandle(int(fd), newEvents) - except CatchableError: - return err(osLastError()) - ok() + loop.selector.updateHandle2(cint(fd), newEvents) proc addWriter2*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil): Result[void, OSErrorCode] = @@ -667,37 +647,27 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or ## call the callback ``cb`` with specified argument ``udata``. let loop = getThreadDispatcher() var newEvents = {Event.Write} - withData(loop.selector, int(fd), adata) do: + withData(loop.selector, cint(fd), adata) do: let acb = AsyncCallback(function: cb, udata: udata) adata.writer = acb if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) do: return err(OSErrorCode(osdefs.EBADF)) - - try: - loop.selector.updateHandle(int(fd), newEvents) - except CatchableError: - return err(osLastError()) - ok() + loop.selector.updateHandle2(cint(fd), newEvents) proc removeWriter2*(fd: AsyncFD): Result[void, OSErrorCode] = ## Stop watching the file descriptor ``fd`` for write availability. let loop = getThreadDispatcher() var newEvents: set[Event] - withData(loop.selector, int(fd), adata) do: + withData(loop.selector, cint(fd), adata) do: # We need to clear `writer` data, because `selectors` don't do it adata.writer = default(AsyncCallback) if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) do: return err(OSErrorCode(osdefs.EBADF)) - - try: - loop.selector.updateHandle(int(fd), newEvents) - except CatchableError: - return err(osLastError()) - ok() + loop.selector.updateHandle2(cint(fd), newEvents) proc register*(fd: AsyncFD) {.raises: [Defect, OSError].} = ## Register file descriptor ``fd`` in thread's dispatcher. @@ -766,7 +736,7 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or if not isNil(aftercb): aftercb(cast[pointer](param)) - withData(loop.selector, int(fd), adata) do: + withData(loop.selector, cint(fd), adata) do: # We are scheduling reader and writer callbacks to be called # explicitly, so they can get an error and continue work. # Callbacks marked as deleted so we don't need to get REAL notifications @@ -795,27 +765,59 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or ## You can execute ``aftercb`` before actual socket close operation. closeSocket(fd, aftercb) - when ioselSupportedPlatform: - proc addSignal*(signal: int, cb: CallbackFunc, - udata: pointer = nil): int {. - raises: [Defect, IOSelectorsException, ValueError, OSError].} = + when asyncEventEngine in ["epoll", "kqueue"]: + proc addSignal2*(signal: int, cb: CallbackFunc, + udata: pointer = nil): Result[int, OSErrorCode] = ## Start watching signal ``signal``, and when signal appears, call the ## callback ``cb`` with specified argument ``udata``. Returns signal ## identifier code, which can be used to remove signal callback ## via ``removeSignal``. let loop = getThreadDispatcher() var data: SelectorData - result = loop.selector.registerSignal(signal, data) - withData(loop.selector, result, adata) do: + let sigfd = ? loop.selector.registerSignal(signal, data) + withData(loop.selector, sigfd, adata) do: adata.reader = AsyncCallback(function: cb, udata: udata) do: - raise newException(ValueError, "File descriptor not registered.") + return err(OSErrorCode(osdefs.EBADF)) + ok(sigfd) - proc removeSignal*(sigfd: int) {. - raises: [Defect, IOSelectorsException].} = - ## Remove watching signal ``signal``. + proc addProcess2*(pid: int, cb: CallbackFunc, + udata: pointer = nil): Result[int, OSErrorCode] = + ## Registers callback ``cb`` to be called when process with process + ## identifier ``pid`` exited. Returns process' descriptor, which can be + ## used to clear process callback via ``removeProcess``. let loop = getThreadDispatcher() - loop.selector.unregister(sigfd) + var data: SelectorData + let procfd = ? loop.selector.registerProcess(pid, data) + withData(loop.selector, procfd, adata) do: + adata.reader = AsyncCallback(function: cb, udata: udata) + do: + return err(OSErrorCode(osdefs.EBADF)) + ok(procfd) + + proc removeSignal2*(sigfd: int): Result[void, OSErrorCode] = + ## Remove watching signal ``signal``. + getThreadDispatcher().selector.unregister2(cint(sigfd)) + + proc removeProcess2*(procfd: int): Result[void, OSErrorCode] = + ## Remove process' watching using process' descriptor ``procfd``. + getThreadDispatcher().selector.unregister2(cint(procfd)) + + proc addSignal*(signal: int, cb: CallbackFunc, + udata: pointer = nil): int {.raises: [Defect, OSError].} = + ## Start watching signal ``signal``, and when signal appears, call the + ## callback ``cb`` with specified argument ``udata``. Returns signal + ## identifier code, which can be used to remove signal callback + ## via ``removeSignal``. + addSignal2(signal, cb, udata).tryGet() + + proc removeSignal*(sigfd: int) {.raises: [Defect, OSError].} = + ## Remove watching signal ``signal``. + removeSignal2(sigfd).tryGet() + + proc removeProcess*(procfd: int) {.raises: [Defect, OSError].} = + ## Remove process' watching using process' descriptor ``procfd``. + removeProcess2(procfd).tryGet() proc poll*() {.gcsafe.} = ## Perform single asynchronous step. @@ -823,10 +825,6 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or var curTime = Moment.now() var curTimeout = 0 - when ioselSupportedPlatform: - let customSet = {Event.Timer, Event.Signal, Event.Process, - Event.Vnode} - # On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`, # complete pending work of the outer `processCallbacks` call. # On non-reentrant `poll` calls, this only removes sentinel element. @@ -837,15 +835,17 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or # Processing IO descriptors and all hardware events. let count = - try: - loop.selector.selectInto(curTimeout, loop.keys) - except IOSelectorsException: - raiseOsDefect(osLastError(), "poll(): Unable to get OS events") - for i in 0..", pure, final.} = object - ssi_signo*: uint32 - ssi_errno*: int32 - ssi_code*: int32 - ssi_pid*: uint32 - ssi_uid*: uint32 - ssi_fd*: int32 - ssi_tid*: uint32 - ssi_band*: uint32 - ssi_overrun*: uint32 - ssi_trapno*: uint32 - ssi_status*: int32 - ssi_int*: int32 - ssi_ptr*: uint64 - ssi_utime*: uint64 - ssi_stime*: uint64 - ssi_addr*: uint64 - pad* {.importc: "__pad".}: array[0..47, uint8] - -proc timerfd_create(clock_id: ClockId, flags: cint): cint - {.cdecl, importc: "timerfd_create", header: "".} -proc timerfd_settime(ufd: cint, flags: cint, - utmr: var Itimerspec, otmr: var Itimerspec): cint - {.cdecl, importc: "timerfd_settime", header: "".} -proc eventfd(count: cuint, flags: cint): cint - {.cdecl, importc: "eventfd", header: "".} - -when not defined(android): - proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint - {.cdecl, importc: "signalfd", header: "".} - -when hasThreadSupport: - type - SelectorImpl[T] = object - epollFD: cint - numFD: int - fds: ptr SharedArray[SelectorKey[T]] - count: int - Selector*[T] = ptr SelectorImpl[T] -else: - type - SelectorImpl[T] = object - epollFD: cint - numFD: int - fds: seq[SelectorKey[T]] - count: int - Selector*[T] = ref SelectorImpl[T] type + SelectorImpl[T] = object + epollFd: cint + sigFd: Opt[cint] + pidFd: Opt[cint] + fds: Table[int32, SelectorKey[T]] + signals: Table[int32, SelectorKey[T]] + processes: Table[int32, SelectorKey[T]] + signalMask: Sigset + virtualHoles: Deque[int32] + virtualId: int32 + childrenExited: bool + pendingEvents: Deque[ReadyKey] + + Selector*[T] = ref SelectorImpl[T] + SelectEventImpl = object efd: cint + SelectEvent* = ptr SelectEventImpl -proc newSelector*[T](): Selector[T] {.raises: [Defect, OSError].} = - # Retrieve the maximum fd count (for current OS) via getrlimit() - var a = RLimit() - # Start with a reasonable size, checkFd() will grow this on demand - const numFD = 1024 - - var epollFD = epoll_create(MAX_EPOLL_EVENTS) - if epollFD < 0: - raiseOSError(osLastError()) - - when hasThreadSupport: - result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) - result.epollFD = epollFD - result.numFD = numFD - result.fds = allocSharedArray[SelectorKey[T]](numFD) +proc getVirtualId[T](s: Selector[T]): SelectResult[int32] = + if len(s.virtualHoles) > 0: + ok(s.virtualHoles.popLast()) else: - result = Selector[T]() - result.epollFD = epollFD - result.numFD = numFD - result.fds = newSeq[SelectorKey[T]](numFD) - - for i in 0 ..< numFD: - result.fds[i].ident = InvalidIdent - -proc close*[T](s: Selector[T]) = - let res = posix.close(s.epollFD) - when hasThreadSupport: - deallocSharedArray(s.fds) - deallocShared(cast[pointer](s)) - if res != 0: - raiseIOSelectorsError(osLastError()) - -proc newSelectEvent*(): SelectEvent {.raises: [Defect, OSError, IOSelectorsException].} = - let fdci = eventfd(0, 0) - if fdci == -1: - raiseIOSelectorsError(osLastError()) - setNonBlocking(fdci) - result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) - result.efd = fdci - -proc trigger*(ev: SelectEvent) {.raises: [Defect, IOSelectorsException].} = - var data: uint64 = 1 - if posix.write(ev.efd, addr data, sizeof(uint64)) == -1: - raiseIOSelectorsError(osLastError()) - -proc close*(ev: SelectEvent) {.raises: [Defect, IOSelectorsException].} = - let res = posix.close(ev.efd) - deallocShared(cast[pointer](ev)) - if res != 0: - raiseIOSelectorsError(osLastError()) - -template checkFd(s, f) = - if f >= s.numFD: - var numFD = s.numFD - while numFD <= f: numFD *= 2 - when hasThreadSupport: - s.fds = reallocSharedArray(s.fds, numFD) + if s.virtualId == low(int32): + err(OSErrorCode(EMFILE)) else: - s.fds.setLen(numFD) - for i in s.numFD ..< numFD: - s.fds[i].ident = InvalidIdent - s.numFD = numFD + dec(s.virtualId) + ok(s.virtualId) -proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, +proc isVirtualId(ident: int32): bool = + ident < 0'i32 + +proc toString(key: int32|cint|SocketHandle|int): string = + let fdi32 = when key is int32: key else: int32(key) + if isVirtualId(fdi32): + if fdi32 == -1: + "InvalidIdent" + else: + "V" & Base10.toString(uint32(-fdi32)) + else: + Base10.toString(uint32(fdi32)) + +template addKey[T](s: Selector[T], key: int32, skey: SelectorKey[T]) = + if s.fds.hasKeyOrPut(key, skey): + raiseAssert "Descriptor [" & key.toString() & + "] is already registered in the selector!" + +template getKey[T](s: Selector[T], key: int32): SelectorKey[T] = + let + defaultKey = SelectorKey[T](ident: InvalidIdent) + pkey = s.fds.getOrDefault(key, defaultKey) + doAssert(pkey.ident != InvalidIdent, + "Descriptor [" & key.toString() & + "] is not registered in the selector!") + pkey + +template checkKey[T](s: Selector[T], key: int32): bool = + s.fds.contains(key) + +proc addSignal[T](s: Selector[T], signal: int, skey: SelectorKey[T]) = + if s.signals.hasKeyOrPut(int32(signal), skey): + raiseAssert "Signal [" & $signal & "] is already registered in the selector" + +template addProcess[T](s: Selector[T], pid: int, skey: SelectorKey[T]) = + if s.processes.hasKeyOrPut(int32(pid), skey): + raiseAssert "Process [" & $pid & "] is already registered in the selector" + +proc freeKey[T](s: Selector[T], key: int32) = + s.fds.del(key) + if isVirtualId(key): + s.virtualHoles.addFirst(key) + +proc freeSignal[T](s: Selector[T], ident: int32) = + s.signals.del(ident) + +proc freeProcess[T](s: Selector[T], ident: int32) = + s.processes.del(ident) + +proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] = + var nmask: Sigset + if sigemptyset(nmask) < 0: + return err(osLastError()) + let epollFd = epoll_create(asyncEventsCount) + if epollFd < 0: + return err(osLastError()) + let selector = Selector[T]( + epollFd: epollFd, + fds: initTable[int32, SelectorKey[T]](asyncInitialSize), + signalMask: nmask, + virtualId: -1'i32, # Should start with -1, because `InvalidIdent` == -1 + childrenExited: false, + virtualHoles: initDeque[int32](), + pendingEvents: initDeque[ReadyKey]() + ) + ok(selector) + +proc close2*[T](s: Selector[T]): SelectResult[void] = + s.fds.clear() + s.signals.clear() + s.processes.clear() + s.virtualHoles.clear() + s.virtualId = -1'i32 + if handleEintr(osdefs.close(s.epollFd)) != 0: + err(osLastError()) + else: + ok() + +proc new*(t: typedesc[SelectEvent]): SelectResult[SelectEvent] = + let eFd = eventfd(0, EFD_CLOEXEC or EFD_NONBLOCK) + if eFd == -1: + return err(osLastError()) + var res = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + res.efd = eFd + ok(res) + +proc trigger2*(event: SelectEvent): SelectResult[void] = + var data: uint64 = 1 + let res = handleEintr(osdefs.write(event.efd, addr data, sizeof(uint64))) + if res == -1: + err(osLastError()) + elif res != sizeof(uint64): + err(OSErrorCode(osdefs.EINVAL)) + else: + ok() + +proc close2*(event: SelectEvent): SelectResult[void] = + let evFd = event.efd + deallocShared(cast[pointer](event)) + let res = handleEintr(osdefs.close(evFd)) + if res == -1: + err(osLastError()) + else: + ok() + +proc init(t: typedesc[EpollEvent], fdi: cint, events: set[Event]): EpollEvent = + var res = uint32(EPOLLRDHUP) + if Event.Read in events: res = res or uint32(EPOLLIN) + if Event.Write in events: res = res or uint32(EPOLLOUT) + if Event.Oneshot in events: res = res or uint32(EPOLLONESHOT) + # We need this double conversion of type because otherwise in x64 environment + # negative cint could be converted to big uint64. + EpollEvent(events: res, data: EpollData(u64: uint64(uint32(fdi)))) + +proc registerHandle2*[T](s: Selector[T], fd: cint, events: set[Event], + data: T): SelectResult[void] = + let skey = SelectorKey[T](ident: fd, events: events, param: 0, data: data) + + s.addKey(fd, skey) + + if events != {}: + let epollEvents = EpollEvent.init(fd, events) + if epoll_ctl(s.epollFd, EPOLL_CTL_ADD, fd, unsafeAddr(epollEvents)) != 0: + s.freeKey(fd) + return err(osLastError()) + ok() + +proc updateHandle2*[T](s: Selector[T], fd: cint, + events: set[Event]): SelectResult[void] = + const EventsMask = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, + Event.User, Event.Oneshot, Event.Error} + s.fds.withValue(int32(fd), pkey): + doAssert(pkey[].events * EventsMask == {}, + "Descriptor [" & fd.toString() & "] could not be updated!") + if pkey[].events != events: + let epollEvents = EpollEvent.init(fd, events) + if pkey[].events == {}: + if epoll_ctl(s.epollFd, EPOLL_CTL_ADD, fd, + unsafeAddr(epollEvents)) != 0: + return err(osLastError()) + else: + if events != {}: + if epoll_ctl(s.epollFd, EPOLL_CTL_MOD, fd, + unsafeAddr(epollEvents)) != 0: + return err(osLastError()) + else: + if epoll_ctl(s.epollFd, EPOLL_CTL_DEL, fd, + unsafeAddr epollEvents) != 0: + return err(osLastError()) + pkey.events = events + do: + raiseAssert "Descriptor [" & fd.toString() & + "] is not registered in the selector!" + ok() + +proc blockSignal[T](s: Selector[T], signal: int): SelectResult[bool] = + let isMember = sigismember(s.signalMask, cint(signal)) + if isMember < 0: + err(osLastError()) + elif isMember > 0: + ok(false) + else: + var omask, nmask: Sigset + if sigemptyset(nmask) < 0: + return err(osLastError()) + if sigemptyset(omask) < 0: + return err(osLastError()) + if sigaddset(nmask, cint(signal)) < 0: + return err(osLastError()) + ? blockSignals(nmask, omask) + if sigaddset(s.signalMask, cint(signal)) < 0: + # Try to restore previous state of signals mask + let errorCode = osLastError() + discard unblockSignals(nmask, omask) + return err(errorCode) + ok(true) + +proc unblockSignal[T](s: Selector[T], signal: int): SelectResult[bool] = + let isMember = sigismember(s.signalMask, cint(signal)) + if isMember < 0: + err(osLastError()) + elif isMember == 0: + ok(false) + else: + var omask, nmask: Sigset + if sigemptyset(nmask) < 0: + return err(osLastError()) + if sigemptyset(omask) < 0: + return err(osLastError()) + if sigaddset(nmask, cint(signal)) < 0: + return err(osLastError()) + ? unblockSignals(nmask, omask) + if sigdelset(s.signalMask, cint(signal)) < 0: + # Try to restore previous state of signals mask + let errorCode = osLastError() + discard blockSignals(nmask, omask) + return err(errorCode) + ok(true) + +template checkSignal(signal: int) = + doAssert((signal >= 0) and (signal <= int(high(int32))), + "Invalid signal value [" & $signal & "]") + +proc registerSignalEvent[T](s: Selector[T], signal: int, + events: set[Event], param: int, + data: T): SelectResult[cint] = + checkSignal(signal) + + let + fdi32 = ? s.getVirtualId() + selectorKey = SelectorKey[T](ident: signal, events: events, + param: param, data: data) + signalKey = SelectorKey[T](ident: fdi32, events: events, + param: param, data: data) + + s.addKey(fdi32, selectorKey) + s.addSignal(signal, signalKey) + + let mres = + block: + let res = s.blockSignal(signal) + if res.isErr(): + s.freeKey(fdi32) + s.freeSignal(int32(signal)) + return err(res.error()) + res.get() + + if not(mres): + raiseAssert "Signal [" & $signal & "] could have only one handler at " & + "the same time!" + + if s.sigFd.isSome(): + let res = signalfd(s.sigFd.get(), s.signalMask, + SFD_NONBLOCK or SFD_CLOEXEC) + if res == -1: + let errorCode = osLastError() + s.freeKey(fdi32) + s.freeSignal(int32(signal)) + discard s.unblockSignal(signal) + return err(errorCode) + else: + let sigFd = signalfd(-1, s.signalMask, SFD_NONBLOCK or SFD_CLOEXEC) + if sigFd == -1: + let errorCode = osLastError() + s.freeKey(fdi32) + s.freeSignal(int32(signal)) + discard s.unblockSignal(signal) + return err(errorCode) + + let fdKey = SelectorKey[T](ident: sigFd, events: {Event.Signal}) + s.addKey(sigFd, fdKey) + + let event = EpollEvent.init(sigFd, {Event.Read}) + if epoll_ctl(s.epollFd, EPOLL_CTL_ADD, sigFd, unsafeAddr(event)) != 0: + let errorCode = osLastError() + s.freeKey(fdi32) + s.freeSignal(int32(signal)) + s.freeKey(sigFd) + discard s.unblockSignal(signal) + discard handleEintr(osdefs.close(sigFd)) + return err(errorCode) + + s.sigFd = Opt.some(sigFd) + + ok(cint(fdi32)) + +proc registerSignal*[T](s: Selector[T], signal: int, + data: T): SelectResult[cint] = + registerSignalEvent(s, signal, {Event.Signal}, 0, data) + +proc registerTimer2*[T](s: Selector[T], timeout: int, oneshot: bool, + data: T): SelectResult[cint] = + let timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC or TFD_NONBLOCK) + if timerFd == -1: + return err(osLastError()) + + let + fdi32 = int32(timerFd) + (key, event) = + if oneshot: + ( + SelectorKey[T](ident: timerFd, events: {Event.Timer, Event.Oneshot}, + param: 0, data: data), + EpollEvent.init(timerFd, {Event.Read, Event.Oneshot}) + ) + else: + ( + SelectorKey[T](ident: timerFd, events: {Event.Timer}, + param: 0, data: data), + EpollEvent.init(timerFd, {Event.Read}) + ) + var timeStruct = + if oneshot: + Itimerspec( + it_interval: Timespec(tv_sec: osdefs.Time(0), tv_nsec: 0), + it_value: Timespec(tv_sec: osdefs.Time(timeout div 1_000), + tv_nsec: (timeout %% 1000) * 1_000_000) + ) + else: + Itimerspec( + it_interval: Timespec(tv_sec: osdefs.Time(timeout div 1_000), + tv_nsec: 0), + it_value: Timespec(tv_sec: osdefs.Time(timeout div 1_000), + tv_nsec: 0), + ) + + s.addKey(fdi32, key) + + var oldTs = Itimerspec() + if timerfd_settime(timerFd, cint(0), timeStruct, oldTs) != 0: + let errorCode = osLastError() + s.freeKey(fdi32) + discard handleEintr(osdefs.close(timerFd)) + return err(errorCode) + + if epoll_ctl(s.epollFd, EPOLL_CTL_ADD, timerFd, unsafeAddr(event)) != 0: + let errorCode = osLastError() + s.freeKey(fdi32) + discard handleEintr(osdefs.close(timerFd)) + return err(errorCode) + + ok(cint(fdi32)) + +proc registerEvent2*[T](s: Selector[T], ev: SelectEvent, + data: T): SelectResult[cint] = + doAssert(not(isNil(ev))) + let + key = SelectorKey[T](ident: ev.efd, events: {Event.User}, + param: 0, data: data) + event = EpollEvent.init(ev.efd, {Event.Read}) + + s.addKey(ev.efd, key) + + if epoll_ctl(s.epollFd, EPOLL_CTL_ADD, ev.efd, unsafeAddr(event)) != 0: + s.freeKey(ev.efd) + return err(osLastError()) + + ok(ev.efd) + +template checkPid(pid: int) = + when sizeof(int) == 8: + doAssert(pid >= 0 and pid <= int(high(uint32)), + "Invalid process idientified (pid) value") + else: + doAssert(pid >= 0 and pid <= high(int32), + "Invalid process idientified (pid) value") + +proc registerProcess*[T](s: Selector, pid: int, data: T): SelectResult[cint] = + checkPid(pid) + + let + fdi32 = ? s.getVirtualId() + events = {Event.Process, Event.Oneshot} + selectorKey = SelectorKey[T](ident: pid, events: events, param: 0, + data: data) + processKey = SelectorKey[T](ident: fdi32, events: events, param: 0, + data: data) + + s.addProcess(pid, processKey) + s.addKey(fdi32, selectorKey) + + if s.pidFd.isNone(): + let res = registerSignalEvent(s, int(SIGCHLD), {Event.Signal}, 0, data) + if res.isErr(): + s.freeKey(fdi32) + s.freeProcess(int32(pid)) + return err(res.error()) + s.pidFd = Opt.some(cast[cint](res.get())) + + ok(cint(fdi32)) + +proc unregister2*[T](s: Selector[T], fd: cint): SelectResult[void] = + let + fdi32 = int32(fd) + pkey = s.getKey(fdi32) + + if pkey.events != {}: + if {Event.Read, Event.Write, Event.User} * pkey.events != {}: + if epoll_ctl(s.epollFd, EPOLL_CTL_DEL, cint(pkey.ident), nil) != 0: + return err(osLastError()) + + elif Event.Timer in pkey.events: + if Event.Finished notin pkey.events: + if epoll_ctl(s.epollFd, EPOLL_CTL_DEL, fd, nil) != 0: + let errorCode = osLastError() + discard handleEintr(osdefs.close(fd)) + return err(errorCode) + if handleEintr(osdefs.close(fd)) == -1: + return err(osLastError()) + + elif Event.Signal in pkey.events: + if not(s.signals.hasKey(int32(pkey.ident))): + raiseAssert "Signal " & pkey.ident.toString() & + " is not registered in the selector!" + let sigFd = + block: + doAssert(s.sigFd.isSome(), "signalfd descriptor is missing") + s.sigFd.get() + + s.freeSignal(int32(pkey.ident)) + + if len(s.signals) > 0: + let res = signalfd(sigFd, s.signalMask, SFD_NONBLOCK or SFD_CLOEXEC) + if res == -1: + let errorCode = osLastError() + discard s.unblockSignal(pkey.ident) + return err(errorCode) + else: + s.freeKey(sigFd) + s.sigFd = Opt.none(cint) + + if epoll_ctl(s.epollFd, EPOLL_CTL_DEL, sigFd, nil) != 0: + let errorCode = osLastError() + discard handleEintr(osdefs.close(sigFd)) + discard s.unblockSignal(pkey.ident) + return err(errorCode) + + if handleEintr(osdefs.close(sigFd)) != 0: + let errorCode = osLastError() + discard s.unblockSignal(pkey.ident) + return err(errorCode) + + let mres = ? s.unblockSignal(pkey.ident) + doAssert(mres, "Signal is not present in stored mask!") + + elif Event.Process in pkey.events: + if not(s.processes.hasKey(int32(pkey.ident))): + raiseAssert "Process " & pkey.ident.toString() & + " is not registered in the selector!" + + let pidFd = + block: + doAssert(s.pidFd.isSome(), "process descriptor is missing") + s.pidFd.get() + + s.freeProcess(int32(pkey.ident)) + + # We need to filter pending events queue for just unregistered process. + if len(s.pendingEvents) > 0: + s.pendingEvents = + block: + var res = initDeque[ReadyKey](len(s.pendingEvents)) + for item in s.pendingEvents.items(): + if item.fd != fdi32: + res.addLast(item) + res + + if len(s.processes) == 0: + s.pidFd = Opt.none(cint) + let res = s.unregister2(pidFd) + if res.isErr(): + return err(res.error()) + + s.freeKey(fdi32) + ok() + +proc unregister2*[T](s: Selector[T], event: SelectEvent): SelectResult[void] = + s.unregister2(event.efd) + +proc prepareKey[T](s: Selector[T], event: EpollEvent): Opt[ReadyKey] = + let + defaultKey = SelectorKey[T](ident: InvalidIdent) + fdi32 = + block: + doAssert(event.data.u64 <= uint64(high(uint32)), + "Invalid user data value in epoll event object") + cast[int32](event.data.u64) + + var + pkey = s.getKey(fdi32) + rkey = ReadyKey(fd: fdi32) + + if (event.events and EPOLLERR) != 0: + rkey.events.incl(Event.Error) + rkey.errorCode = OSErrorCode(ECONNRESET) + + if (event.events and EPOLLHUP) != 0 or (event.events and EPOLLRDHUP) != 0: + rkey.events.incl(Event.Error) + rkey.errorCode = OSErrorCode(ECONNRESET) + + if (event.events and EPOLLOUT) != 0: + rkey.events.incl(Event.Write) + + if (event.events and EPOLLIN) != 0: + if Event.Read in pkey.events: + rkey.events.incl(Event.Read) + + elif Event.Timer in pkey.events: + var data: uint64 + rkey.events.incl(Event.Timer) + let res = handleEintr(osdefs.read(fdi32, addr data, sizeof(uint64))) + if res != sizeof(uint64): + rkey.events.incl(Event.Error) + rkey.errorCode = osLastError() + + elif Event.Signal in pkey.events: + var data: SignalFdInfo + let res = handleEintr(osdefs.read(fdi32, addr data, sizeof(SignalFdInfo))) + if res != sizeof(SignalFdInfo): + # We could not obtain `signal` number so we can't report an error to + # proper handler. + return Opt.none(ReadyKey) + if data.ssi_signo != uint32(SIGCHLD) or len(s.processes) == 0: + let skey = s.signals.getOrDefault(cast[int32](data.ssi_signo), + defaultKey) + if skey.ident == InvalidIdent: + # We do not have any handlers for received event so we can't report + # an error to proper handler. + return Opt.none(ReadyKey) + rkey.events.incl(Event.Signal) + rkey.fd = skey.ident + else: + # Indicate that SIGCHLD has been seen. + s.childrenExited = true + # Current signal processing. + let pidKey = s.processes.getOrDefault(cast[int32](data.ssi_pid), + defaultKey) + if pidKey.ident == InvalidIdent: + # We do not have any handlers with signal's pid. + return Opt.none(ReadyKey) + rkey.events.incl({Event.Process, Event.Oneshot, Event.Finished}) + rkey.fd = pidKey.ident + # Mark process descriptor inside fds table as finished. + var fdKey = s.fds.getOrDefault(int32(pidKey.ident), defaultKey) + if fdKey.ident != InvalidIdent: + fdKey.events.incl(Event.Finished) + s.fds[int32(pidKey.ident)] = fdKey + + elif Event.User in pkey.events: + var data: uint64 + let res = handleEintr(osdefs.read(fdi32, addr data, sizeof(uint64))) + if res != sizeof(uint64): + let errorCode = osLastError() + if errorCode == EAGAIN: + return Opt.none(ReadyKey) + else: + rkey.events.incl({Event.User, Event.Error}) + rkey.errorCode = errorCode + else: + rkey.events.incl(Event.User) + + if Event.Oneshot in rkey.events: + if Event.Timer in rkey.events: + if epoll_ctl(s.epollFd, EPOLL_CTL_DEL, fdi32, nil) != 0: + rkey.events.incl(Event.Error) + rkey.errorCode = osLastError() + # we are marking key with `Finished` event, to avoid double decrease. + rkey.events.incl(Event.Finished) + pkey.events.incl(Event.Finished) + s.fds[fdi32] = pkey + + ok(rkey) + +proc checkProcesses[T](s: Selector[T]) = + # If SIGCHLD has been seen we need to check all processes we are monitoring + # for completion, because in Linux SIGCHLD could be masked. + # You can get more information in article "Signalfd is useless" - + # https://ldpreload.com/blog/signalfd-is-useless?reposted-on-request + if not(s.childrenExited): + return + + let + defaultKey = SelectorKey[T](ident: InvalidIdent) + flags = WNOHANG or WNOWAIT or WSTOPPED or WEXITED + s.childrenExited = false + for pid, pidKey in s.processes.pairs(): + var fdKey = s.fds.getOrDefault(int32(pidKey.ident), defaultKey) + if fdKey.ident != InvalidIdent: + if Event.Finished notin fdKey.events: + var sigInfo = SigInfo() + let res = handleEintr(osdefs.waitid(P_PID, cast[Id](pid), + sigInfo, flags)) + if (res == 0) and (cint(sigInfo.si_pid) == cint(pid)): + fdKey.events.incl(Event.Finished) + let rkey = ReadyKey(fd: pidKey.ident, events: fdKey.events) + s.pendingEvents.addLast(rkey) + s.fds[int32(pidKey.ident)] = fdKey + +proc selectInto2*[T](s: Selector[T], timeout: int, + readyKeys: var openArray[ReadyKey] + ): SelectResult[int] = + var + queueEvents: array[asyncEventsCount, EpollEvent] + k: int = 0 + + verifySelectParams(timeout, -1, int(high(cint))) + + let + maxEventsCount = min(len(queueEvents), len(readyKeys)) + maxPendingEventsCount = min(maxEventsCount, len(s.pendingEvents)) + maxNewEventsCount = max(maxEventsCount - maxPendingEventsCount, 0) + + let + eventsCount = + if maxNewEventsCount > 0: + let res = handleEintr(epoll_wait(s.epollFd, addr(queueEvents[0]), + cint(maxNewEventsCount), + cint(timeout))) + if res < 0: + return err(osLastError()) + res + else: + 0 + + s.childrenExited = false + + for i in 0 ..< eventsCount: + let rkey = s.prepareKey(queueEvents[i]).valueOr: continue + readyKeys[k] = rkey + inc(k) + + s.checkProcesses() + + let pendingEventsCount = min(len(readyKeys) - eventsCount, + len(s.pendingEvents)) + + for i in 0 ..< pendingEventsCount: + readyKeys[k] = s.pendingEvents.popFirst() + inc(k) + + ok(k) + +proc select2*[T](s: Selector[T], timeout: int): SelectResult[seq[ReadyKey]] = + var res = newSeq[ReadyKey](asyncEventsCount) + let count = ? selectInto2(s, timeout, res) + res.setLen(count) + ok(res) + +proc newSelector*[T](): Selector[T] {. + raises: [Defect, OSError].} = + let res = Selector.new(T) + if res.isErr(): raiseOSError(res.error()) + res.get() + +proc close*[T](s: Selector[T]) {. + raises: [Defect, IOSelectorsException].} = + let res = s.close2() + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc newSelectEvent*(): SelectEvent {. + raises: [Defect, IOSelectorsException].} = + let res = SelectEvent.new() + if res.isErr(): raiseIOSelectorsError(res.error()) + res.get() + +proc trigger*(event: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let res = event.trigger2() + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc close*(event: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let res = event.close2() + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc registerHandle*[T](s: Selector[T], fd: cint | SocketHandle, events: set[Event], data: T) {. raises: [Defect, IOSelectorsException].} = - let fdi = int(fd) - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent, "Descriptor " & $fdi & " already registered") - s.setKey(fdi, events, 0, data) - if events != {}: - var epv = EpollEvent(events: EPOLLRDHUP) - epv.data.u64 = fdi.uint - if Event.Read in events: epv.events = epv.events or EPOLLIN - if Event.Write in events: epv.events = epv.events or EPOLLOUT - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - inc(s.count) + let res = registerHandle2(s, fd, events, data) + if res.isErr(): raiseIOSelectorsError(res.error()) -proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) {. +proc updateHandle*[T](s: Selector[T], fd: cint | SocketHandle, + events: set[Event]) {. raises: [Defect, IOSelectorsException].} = - let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, - Event.User, Event.Oneshot, Event.Error} - let fdi = int(fd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, - "Descriptor " & $fdi & " is not registered in the selector!") - doAssert(pkey.events * maskEvents == {}) - if pkey.events != events: - var epv = EpollEvent(events: EPOLLRDHUP) - epv.data.u64 = fdi.uint + let res = updateHandle2(s, fd, events) + if res.isErr(): raiseIOSelectorsError(res.error()) - if Event.Read in events: epv.events = epv.events or EPOLLIN - if Event.Write in events: epv.events = epv.events or EPOLLOUT +proc unregister*[T](s: Selector[T], fd: cint | SocketHandle) {. + raises: [Defect, IOSelectorsException].} = + let res = unregister2(s, fd) + if res.isErr(): raiseIOSelectorsError(res.error()) - if pkey.events == {}: - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - inc(s.count) - else: - if events != {}: - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - else: - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - dec(s.count) - pkey.events = events - -proc unregister*[T](s: Selector[T], fd: int|SocketHandle) {.raises: [Defect, IOSelectorsException].} = - let fdi = int(fd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, - "Descriptor " & $fdi & " is not registered in the selector!") - if pkey.events != {}: - when not defined(android): - if Event.Read in pkey.events or Event.Write in pkey.events or Event.User in pkey.events: - var epv = EpollEvent() - # TODO: Refactor all these EPOLL_CTL_DEL + dec(s.count) into a proc. - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - dec(s.count) - elif Event.Timer in pkey.events: - if Event.Finished notin pkey.events: - var epv = EpollEvent() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - dec(s.count) - if posix.close(cint(fdi)) != 0: - raiseIOSelectorsError(osLastError()) - elif Event.Signal in pkey.events: - var epv = EpollEvent() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - var nmask, omask: Sigset - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, cint(s.fds[fdi].param)) - unblockSignals(nmask, omask) - dec(s.count) - if posix.close(cint(fdi)) != 0: - raiseIOSelectorsError(osLastError()) - elif Event.Process in pkey.events: - if Event.Finished notin pkey.events: - var epv = EpollEvent() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - var nmask, omask: Sigset - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, SIGCHLD) - unblockSignals(nmask, omask) - dec(s.count) - if posix.close(cint(fdi)) != 0: - raiseIOSelectorsError(osLastError()) - else: - if Event.Read in pkey.events or Event.Write in pkey.events or Event.User in pkey.events: - var epv = EpollEvent() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - dec(s.count) - elif Event.Timer in pkey.events: - if Event.Finished notin pkey.events: - var epv = EpollEvent() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - dec(s.count) - if posix.close(cint(fdi)) != 0: - raiseIOSelectorsError(osLastError()) - clearKey(pkey) - -proc unregister*[T](s: Selector[T], ev: SelectEvent) {. +proc unregister*[T](s: Selector[T], event: SelectEvent) {. raises: [Defect, IOSelectorsException].} = - let fdi = int(ev.efd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!") - doAssert(Event.User in pkey.events) - var epv = EpollEvent() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - dec(s.count) - clearKey(pkey) + let res = unregister2(s, event) + if res.isErr(): raiseIOSelectorsError(res.error()) proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, - data: T): int {. + data: T): cint {. discardable, raises: [Defect, IOSelectorsException].} = - var - newTs: Itimerspec - oldTs: Itimerspec - let fdi = timerfd_create(CLOCK_MONOTONIC, 0).int - if fdi == -1: - raiseIOSelectorsError(osLastError()) - setNonBlocking(fdi.cint) + let res = registerTimer2(s, timeout, oneshot, data) + if res.isErr(): raiseIOSelectorsError(res.error()) + res.get() - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent) - - var events = {Event.Timer} - var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) - epv.data.u64 = fdi.uint - - if oneshot: - newTs.it_interval.tv_sec = posix.Time(0) - newTs.it_interval.tv_nsec = 0 - newTs.it_value.tv_sec = posix.Time(timeout div 1_000) - newTs.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 - incl(events, Event.Oneshot) - epv.events = epv.events or EPOLLONESHOT - else: - newTs.it_interval.tv_sec = posix.Time(timeout div 1000) - newTs.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 - newTs.it_value.tv_sec = newTs.it_interval.tv_sec - newTs.it_value.tv_nsec = newTs.it_interval.tv_nsec - - if timerfd_settime(fdi.cint, cint(0), newTs, oldTs) != 0: - raiseIOSelectorsError(osLastError()) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - s.setKey(fdi, events, 0, data) - inc(s.count) - result = fdi - -when not defined(android): - proc registerSignal*[T](s: Selector[T], signal: int, - data: T): int {. - discardable, raises: [Defect, OSError, IOSelectorsException].} = - var - nmask: Sigset - omask: Sigset - - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, cint(signal)) - blockSignals(nmask, omask) - - let fdi = signalfd(-1, nmask, 0).int - if fdi == -1: - raiseIOSelectorsError(osLastError()) - setNonBlocking(fdi.cint) - - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent) - - var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) - epv.data.u64 = fdi.uint - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - s.setKey(fdi, {Event.Signal}, signal, data) - inc(s.count) - result = fdi - - proc registerProcess*[T](s: Selector, pid: int, - data: T): int {. - discardable, raises: [Defect, IOSelectorsException].} = - var - nmask: Sigset - omask: Sigset - - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, posix.SIGCHLD) - blockSignals(nmask, omask) - - let fdi = signalfd(-1, nmask, 0).int - if fdi == -1: - raiseIOSelectorsError(osLastError()) - setNonBlocking(fdi.cint) - - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent) - - var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data) - inc(s.count) - result = fdi - -proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) {.raises: [Defect, IOSelectorsException].} = - let fdi = int(ev.efd) - doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!") - s.setKey(fdi, {Event.User}, 0, data) - var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) - epv.data.u64 = ev.efd.uint - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0: - raiseIOSelectorsError(osLastError()) - inc(s.count) +proc registerEvent*[T](s: Selector[T], event: SelectEvent, + data: T) {. + raises: [Defect, IOSelectorsException].} = + let res = registerEvent2(s, event, data) + if res.isErr(): raiseIOSelectorsError(res.error()) proc selectInto*[T](s: Selector[T], timeout: int, - results: var openArray[ReadyKey]): int {.raises: [Defect, IOSelectorsException].} = - var - resTable: array[MAX_EPOLL_EVENTS, EpollEvent] - maxres = MAX_EPOLL_EVENTS - i, k: int - - if maxres > len(results): - maxres = len(results) - - verifySelectParams(timeout) - - let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint, - timeout.cint) - if count < 0: - result = 0 - let err = osLastError() - if cint(err) != EINTR: - raiseIOSelectorsError(err) - elif count == 0: - result = 0 - else: - i = 0 - k = 0 - while i < count: - let fdi = int(resTable[i].data.u64) - let pevents = resTable[i].events - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent) - var rkey = ReadyKey(fd: fdi, events: {}) - - if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0: - if (pevents and EPOLLHUP) != 0: - rkey.errorCode = OSErrorCode ECONNRESET - else: - # Try reading SO_ERROR from fd. - var error: cint - var size = SockLen sizeof(error) - if getsockopt(SocketHandle fdi, SOL_SOCKET, SO_ERROR, addr(error), - addr(size)) == 0'i32: - rkey.errorCode = OSErrorCode error - - rkey.events.incl(Event.Error) - if (pevents and EPOLLOUT) != 0: - rkey.events.incl(Event.Write) - when not defined(android): - if (pevents and EPOLLIN) != 0: - if Event.Read in pkey.events: - rkey.events.incl(Event.Read) - elif Event.Timer in pkey.events: - var data: uint64 = 0 - if posix.read(cint(fdi), addr data, - sizeof(uint64)) != sizeof(uint64): - raiseIOSelectorsError(osLastError()) - rkey.events.incl(Event.Timer) - elif Event.Signal in pkey.events: - var data = SignalFdInfo() - if posix.read(cint(fdi), addr data, sizeof(SignalFdInfo)) != - sizeof(SignalFdInfo): - raiseIOSelectorsError(osLastError()) - rkey.events.incl(Event.Signal) - elif Event.Process in pkey.events: - var data = SignalFdInfo() - if posix.read(cint(fdi), addr data, sizeof(SignalFdInfo)) != - sizeof(SignalFdInfo): - raiseIOSelectorsError(osLastError()) - if data.ssi_pid == uint32(pkey.param): - rkey.events.incl(Event.Process) - else: - inc(i) - continue - elif Event.User in pkey.events: - var data: uint64 = 0 - if posix.read(cint(fdi), addr data, - sizeof(uint64)) != sizeof(uint64): - let err = osLastError() - if err == OSErrorCode(EAGAIN): - inc(i) - continue - else: - raiseIOSelectorsError(err) - rkey.events.incl(Event.User) - else: - if (pevents and EPOLLIN) != 0: - if Event.Read in pkey.events: - rkey.events.incl(Event.Read) - elif Event.Timer in pkey.events: - var data: uint64 = 0 - if posix.read(cint(fdi), addr data, - sizeof(uint64)) != sizeof(uint64): - raiseIOSelectorsError(osLastError()) - rkey.events.incl(Event.Timer) - elif Event.User in pkey.events: - var data: uint64 = 0 - if posix.read(cint(fdi), addr data, - sizeof(uint64)) != sizeof(uint64): - let err = osLastError() - if err == OSErrorCode(EAGAIN): - inc(i) - continue - else: - raiseIOSelectorsError(err) - rkey.events.incl(Event.User) - - if Event.Oneshot in pkey.events: - var epv = EpollEvent() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0: - raiseIOSelectorsError(osLastError()) - # we will not clear key until it will be unregistered, so - # application can obtain data, but we will decrease counter, - # because epoll is empty. - dec(s.count) - # we are marking key with `Finished` event, to avoid double decrease. - pkey.events.incl(Event.Finished) - - results[k] = rkey - inc(k) - inc(i) - result = k + readyKeys: var openArray[ReadyKey]): int {. + raises: [Defect, IOSelectorsException].} = + let res = selectInto2(s, timeout, readyKeys) + if res.isErr(): raiseIOSelectorsError(res.error()) + res.get() proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = - result = newSeq[ReadyKey](MAX_EPOLL_EVENTS) - let count = selectInto(s, timeout, result) - result.setLen(count) + let res = select2(s, timeout) + if res.isErr(): raiseIOSelectorsError(res.error()) + res.get() -template isEmpty*[T](s: Selector[T]): bool = - (s.count == 0) +proc contains*[T](s: Selector[T], fd: SocketHandle|cint): bool {.inline.} = + s.checkKey(int32(fd)) -proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = - let fdi = int(fd) - fdi < s.numFD and s.fds[fdi].ident != InvalidIdent +proc setData*[T](s: Selector[T], fd: SocketHandle|cint, data: T): bool = + s.fds.withValue(int32(fd), skey): + skey[].data = data + return true + do: + return false -proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = - let fdi = int(fd) - s.checkFd(fdi) - if fdi in s: - s.fds[fdi].data = data - result = true - -template withData*[T](s: Selector[T], fd: SocketHandle|int, value, - body: untyped) = - mixin checkFd - let fdi = int(fd) - if fdi in s: - var value = addr(s.fds[fdi].data) +template withData*[T](s: Selector[T], fd: SocketHandle|cint, value, + body: untyped) = + s.fds.withValue(int32(fd), skey): + var value = addr(skey[].data) body -template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, - body2: untyped) = - let fdi = int(fd) - if fdi in s: - var value = addr(s.fds[fdi].data) +template withData*[T](s: Selector[T], fd: SocketHandle|cint, value, body1, + body2: untyped) = + s.fds.withValue(int32(fd), skey): + var value = addr(skey[].data) body1 - else: + do: body2 -proc getFd*[T](s: Selector[T]): int = - return s.epollFd.int +proc getFd*[T](s: Selector[T]): cint = s.epollFd diff --git a/chronos/ioselects/ioselectors_kqueue.nim b/chronos/ioselects/ioselectors_kqueue.nim index e346f82..4ff746e 100644 --- a/chronos/ioselects/ioselectors_kqueue.nim +++ b/chronos/ioselects/ioselectors_kqueue.nim @@ -6,66 +6,27 @@ # See the file "copying.txt", included in this # distribution, for details about the copyright. # - # This module implements BSD kqueue(). -import posix, times, kqueue +{.push raises: [Defect].} +import std/[kqueue, deques, tables] +import stew/base10 const - # Maximum number of events that can be returned. - MAX_KQUEUE_EVENTS = 64 # SIG_IGN and SIG_DFL declared in posix.nim as variables, but we need them # to be constants and GC-safe. - SIG_DFL = cast[proc(x: cint) {.raises: [],noconv,gcsafe.}](0) - SIG_IGN = cast[proc(x: cint) {.raises: [],noconv,gcsafe.}](1) - -when defined(kqcache): - const CACHE_EVENTS = true - -when defined(macosx) or defined(freebsd) or defined(dragonfly): - when defined(macosx): - const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS) - else: - const MAX_DESCRIPTORS_ID = 27 # KERN_MAXFILESPERPROC (FreeBSD) - proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t, - newp: pointer, newplen: csize_t): cint - {.importc: "sysctl",header: """#include - #include """} -elif defined(netbsd) or defined(openbsd): - # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using - # KERN_MAXFILES, because KERN_MAXFILES is always bigger, - # than KERN_MAXFILESPERPROC. - const MAX_DESCRIPTORS_ID = 7 # KERN_MAXFILES - proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t, - newp: pointer, newplen: csize_t): cint - {.importc: "sysctl",header: """#include - #include """} - -when hasThreadSupport: - type - SelectorImpl[T] = object - kqFD: cint - maxFD: int - changes: ptr SharedArray[KEvent] - fds: ptr SharedArray[SelectorKey[T]] - count: int - changesLock: Lock - changesSize: int - changesLength: int - sock: cint - Selector*[T] = ptr SelectorImpl[T] -else: - type - SelectorImpl[T] = object - kqFD: cint - maxFD: int - changes: seq[KEvent] - fds: seq[SelectorKey[T]] - count: int - sock: cint - Selector*[T] = ref SelectorImpl[T] + SIG_DFL = cast[proc(x: cint) {.raises: [], noconv, gcsafe.}](0) + SIG_IGN = cast[proc(x: cint) {.raises: [], noconv, gcsafe.}](1) type + SelectorImpl[T] = object + kqFd: cint + fds: Table[int32, SelectorKey[T]] + virtualHoles: Deque[int32] + virtualId: int32 + + Selector*[T] = ref SelectorImpl[T] + SelectEventImpl = object rfd: cint wfd: cint @@ -74,272 +35,338 @@ type # SelectEvent is declared as `ptr` to be placed in `shared memory`, # so you can share one SelectEvent handle between threads. -proc getUnique[T](s: Selector[T]): int {.inline.} = - # we create duplicated handles to get unique indexes for our `fds` array. - result = posix.fcntl(s.sock, F_DUPFD, s.sock) - if result == -1: - raiseIOSelectorsError(osLastError()) - -proc newSelector*[T](): owned(Selector[T]) = - var maxFD = 0.cint - var size = csize_t(sizeof(cint)) - var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint] - # Obtain maximum number of opened file descriptors for process - if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size, - nil, 0) != 0: - raiseIOSelectorsError(osLastError()) - - var kqFD = kqueue() - if kqFD < 0: - raiseIOSelectorsError(osLastError()) - - # we allocating empty socket to duplicate it handle in future, to get unique - # indexes for `fds` array. This is needed to properly identify - # {Event.Timer, Event.Signal, Event.Process} events. - let usock = posix.socket(posix.AF_INET, posix.SOCK_STREAM, - posix.IPPROTO_TCP).cint - if usock == -1: - let err = osLastError() - discard posix.close(kqFD) - raiseIOSelectorsError(err) - - when hasThreadSupport: - result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) - result.fds = allocSharedArray[SelectorKey[T]](maxFD) - result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS) - result.changesSize = MAX_KQUEUE_EVENTS - initLock(result.changesLock) +proc getVirtualId[T](s: Selector[T]): SelectResult[int32] = + if len(s.virtualHoles) > 0: + ok(s.virtualHoles.popLast()) else: - result = Selector[T]() - result.fds = newSeq[SelectorKey[T]](maxFD) - result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS) + if s.virtualId == low(int32): + err(OSErrorCode(EMFILE)) + else: + dec(s.virtualId) + ok(s.virtualId) - for i in 0 ..< maxFD: - result.fds[i].ident = InvalidIdent +proc isVirtualId(ident: int32): bool = + ident < 0'i32 - result.sock = usock - result.kqFD = kqFD - result.maxFD = maxFD.int +proc toString(key: int32|cint|SocketHandle|int): string = + let fdi32 = when key is int32: key else: int32(key) + if isVirtualId(fdi32): + if fdi32 == -1: + "InvalidIdent" + else: + "V" & Base10.toString(uint32(-fdi32)) + else: + Base10.toString(uint32(fdi32)) -proc close*[T](s: Selector[T]) = - let res1 = posix.close(s.kqFD) - let res2 = posix.close(s.sock) - when hasThreadSupport: - deinitLock(s.changesLock) - deallocSharedArray(s.fds) - deallocShared(cast[pointer](s)) - if res1 != 0 or res2 != 0: - raiseIOSelectorsError(osLastError()) +template addKey[T](s: Selector[T], key: int32, skey: SelectorKey[T]) = + if s.fds.hasKeyOrPut(key, skey): + raiseAssert "Descriptor [" & key.toString() & + "] is already registered in the selector!" -proc newSelectEvent*(): SelectEvent = +template getKey[T](s: Selector[T], key: int32): SelectorKey[T] = + let + defaultKey = SelectorKey[T](ident: InvalidIdent) + pkey = s.fds.getOrDefault(key, defaultKey) + doAssert(pkey.ident != InvalidIdent, "Descriptor [" & key.toString() & + "] is not registered in the selector!") + pkey + +template checkKey[T](s: Selector[T], key: int32): bool = + s.fds.contains(key) + +proc freeKey[T](s: Selector[T], key: int32) = + s.fds.del(key) + if isVirtualId(key): + s.virtualHoles.addFirst(key) + +template getIdent(event: KEvent): int32 = + doAssert(event.ident <= uint(high(uint32)), + "Invalid event ident value [" & Base10.toString(event.ident) & + "] in the kqueue event object") + cast[int32](uint32(event.ident)) + +template getUdata(event: KEvent): int32 = + let udata = cast[uint](event.udata) + doAssert(event.ident <= uint(high(uint32)), + "Invalid event udata value [" & Base10.toString(udata) & + "] in the kqueue event object with ident [" & + Base10.toString(event.ident) & "]") + cast[int32](uint32(udata)) + +proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] = + let kqFd = + block: + let res = handleEintr(kqueue()) + if res == -1: + return err(osLastError()) + cint(res) + + let selector = Selector[T]( + kqFd: kqFd, + fds: initTable[int32, SelectorKey[T]](asyncInitialSize), + virtualId: -1'i32, # Should start with -1, because `InvalidIdent` == -1 + virtualHoles: initDeque[int32]() + ) + ok(selector) + +proc close2*[T](s: Selector[T]): SelectResult[void] = + s.fds.clear() + s.virtualHoles.clear() + s.virtualId = -1'i32 + if handleEintr(osdefs.close(s.kqFd)) != 0: + err(osLastError()) + else: + ok() + +proc new*(t: typedesc[SelectEvent]): SelectResult[SelectEvent] = var fds: array[2, cint] - if posix.pipe(fds) != 0: - raiseIOSelectorsError(osLastError()) - setNonBlocking(fds[0]) - setNonBlocking(fds[1]) - result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) - result.rfd = fds[0] - result.wfd = fds[1] + when declared(pipe2): + if osdefs.pipe2(fds, osdefs.O_NONBLOCK or osdefs.O_CLOEXEC) == -1: + return err(osLastError()) -proc trigger*(ev: SelectEvent) = + var res = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + res.rfd = fds[0] + res.wfd = fds[1] + ok(res) + else: + if osdefs.pipe(fds) == -1: + return err(osLastError()) + + let res1 = setDescriptorFlags(fds[0], true, true) + if res1.isErr(): + discard closeFd(fds[0]) + discard closeFd(fds[1]) + return err(res1.error()) + let res2 = setDescriptorFlags(fds[1], true, true) + if res2.isErr(): + discard closeFd(fds[0]) + discard closeFd(fds[1]) + return err(res2.error()) + + var res = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + res.rfd = fds[0] + res.wfd = fds[1] + ok(res) + +proc trigger2*(event: SelectEvent): SelectResult[void] = var data: uint64 = 1 - if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): - raiseIOSelectorsError(osLastError()) + let res = handleEintr(osdefs.write(event.wfd, addr data, sizeof(uint64))) + if res == -1: + err(osLastError()) + elif res != sizeof(uint64): + err(OSErrorCode(osdefs.EINVAL)) + else: + ok() + +proc close2*(ev: SelectEvent): SelectResult[void] = + let + rfd = ev.rfd + wfd = ev.wfd -proc close*(ev: SelectEvent) = - let res1 = posix.close(ev.rfd) - let res2 = posix.close(ev.wfd) deallocShared(cast[pointer](ev)) - if res1 != 0 or res2 != 0: - raiseIOSelectorsError(osLastError()) -template checkFd(s, f) = - if f >= s.maxFD: - raiseIOSelectorsError("Maximum number of descriptors is exhausted!") + if closeFd(rfd) != 0: + let errorCode = osLastError() + discard closeFd(wfd) + err(errorCode) + else: + if closeFd(wfd) != 0: + err(osLastError()) + else: + ok() -when hasThreadSupport: - template withChangeLock[T](s: Selector[T], body: untyped) = - acquire(s.changesLock) - {.locks: [s.changesLock].}: - try: - body - finally: - release(s.changesLock) -else: - template withChangeLock(s, body: untyped) = - body +template modifyKQueue(changes: var openArray[KEvent], index: int, nident: uint, + nfilter: cshort, nflags: cushort, nfflags: cuint, + ndata: int, nudata: pointer) = + changes[index] = KEvent(ident: nident, filter: nfilter, flags: nflags, + fflags: nfflags, data: ndata, udata: nudata) -when hasThreadSupport: - template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, - nflags: cushort, nfflags: cuint, ndata: int, - nudata: pointer) = - mixin withChangeLock - s.withChangeLock(): - if s.changesLength == s.changesSize: - # if cache array is full, we allocating new with size * 2 - let newSize = s.changesSize shl 1 - let rdata = allocSharedArray[KEvent](newSize) - copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent)) - s.changesSize = newSize - s.changes[s.changesLength] = KEvent(ident: nident, - filter: nfilter, flags: nflags, - fflags: nfflags, data: ndata, - udata: nudata) - inc(s.changesLength) - - when not declared(CACHE_EVENTS): - template flushKQueue[T](s: Selector[T]) = - mixin withChangeLock - s.withChangeLock(): - if s.changesLength > 0: - if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength), - nil, 0, nil) == -1: - raiseIOSelectorsError(osLastError()) - s.changesLength = 0 -else: - template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, - nflags: cushort, nfflags: cuint, ndata: int, - nudata: pointer) = - s.changes.add(KEvent(ident: nident, - filter: nfilter, flags: nflags, - fflags: nfflags, data: ndata, - udata: nudata)) - - when not declared(CACHE_EVENTS): - template flushKQueue[T](s: Selector[T]) = - let length = cint(len(s.changes)) - if length > 0: - if kevent(s.kqFD, addr(s.changes[0]), length, - nil, 0, nil) == -1: - raiseIOSelectorsError(osLastError()) - s.changes.setLen(0) - -proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, - events: set[Event], data: T) = - let fdi = int(fd) - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent) - s.setKey(fdi, events, 0, data) +proc registerHandle2*[T](s: Selector[T], fd: cint, events: set[Event], + data: T): SelectResult[void] = + let selectorKey = SelectorKey[T](ident: fd, events: events, + param: 0, data: data) + s.addKey(fd, selectorKey) if events != {}: + var + changes: array[2, KEvent] + k = 0 if Event.Read in events: - modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) + changes.modifyKQueue(k, uint(uint32(fd)), EVFILT_READ, EV_ADD, 0, 0, nil) + inc(k) if Event.Write in events: - modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil) - inc(s.count) + changes.modifyKQueue(k, uint(uint32(fd)), EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(k) + if k > 0: + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(k), nil, + 0, nil)) == -1: + s.freeKey(fd) + return err(osLastError()) + ok() - when not declared(CACHE_EVENTS): - flushKQueue(s) - -proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, - events: set[Event]) = - let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, +proc updateHandle2*[T](s: Selector[T], fd: cint, + events: set[Event]): SelectResult[void] = + let EventsMask = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, Event.User, Event.Oneshot, Event.Error} - let fdi = int(fd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, - "Descriptor $# is not registered in the queue!" % $fdi) - doAssert(pkey.events * maskEvents == {}) - - if pkey.events != events: - if (Event.Read in pkey.events) and (Event.Read notin events): - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) - dec(s.count) - if (Event.Write in pkey.events) and (Event.Write notin events): - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) - dec(s.count) - if (Event.Read notin pkey.events) and (Event.Read in events): - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) - if (Event.Write notin pkey.events) and (Event.Write in events): - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) - inc(s.count) - - when not declared(CACHE_EVENTS): - flushKQueue(s) - - pkey.events = events + s.fds.withValue(int32(fd), pkey): + doAssert(pkey[].events * EventsMask == {}, + "Descriptor [" & fd.toString() & "] could not be updated!") + if pkey.events != events: + var + changes: array[4, KEvent] + k = 0 + if (Event.Read in pkey[].events) and (Event.Read notin events): + changes.modifyKQueue(k, uint(uint32(fd)), EVFILT_READ, EV_DELETE, + 0, 0, nil) + inc(k) + if (Event.Write in pkey[].events) and (Event.Write notin events): + changes.modifyKQueue(k, uint(uint32(fd)), EVFILT_WRITE, EV_DELETE, + 0, 0, nil) + inc(k) + if (Event.Read notin pkey[].events) and (Event.Read in events): + changes.modifyKQueue(k, uint(uint32(fd)), EVFILT_READ, EV_ADD, + 0, 0, nil) + inc(k) + if (Event.Write notin pkey[].events) and (Event.Write in events): + changes.modifyKQueue(k, uint(uint32(fd)), EVFILT_WRITE, EV_ADD, + 0, 0, nil) + inc(k) + if k > 0: + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(k), nil, + 0, nil)) == -1: + return err(osLastError()) + pkey[].events = events + do: + raiseAssert "Descriptor [" & fd.toString() & + "] is not registered in the selector!" + ok() proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, - data: T): int {.discardable.} = - let fdi = getUnique(s) - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent) - - let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer} - let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD - - s.setKey(fdi, events, 0, data) + data: T): SelectResult[cint] = + let + fdi32 = ? s.getVirtualId() + events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer} + flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD + selectorKey = SelectorKey[T](ident: fdi32, events: events, param: timeout, + data: data) + var changes: array[1, KEvent] + s.addKey(fdi32, selectorKey) # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds # too - modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil) + changes.modifyKQueue(0, uint(uint32(fdi32)), EVFILT_TIMER, flags, 0, + cint(timeout), nil) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, 0, nil)) == -1: + s.freeKey(fdi32) + return err(osLastError()) - when not declared(CACHE_EVENTS): - flushKQueue(s) + ok(cint(fdi32)) - inc(s.count) - result = fdi +proc blockSignal(signal: int): SelectResult[void] = + var omask, nmask: Sigset + if sigemptyset(nmask) < 0: + return err(osLastError()) + if sigemptyset(omask) < 0: + return err(osLastError()) + if sigaddset(nmask, cint(signal)) < 0: + return err(osLastError()) + ? blockSignals(nmask, omask) + ok() + +proc unblockSignal(signal: int): SelectResult[void] = + var omask, nmask: Sigset + if sigemptyset(nmask) < 0: + return err(osLastError()) + if sigemptyset(omask) < 0: + return err(osLastError()) + if sigaddset(nmask, cint(signal)) < 0: + return err(osLastError()) + ? unblockSignals(nmask, omask) + ok() + +template checkSignal(signal: int) = + doAssert((signal >= 0) and (signal <= int(high(int32))), + "Invalid signal value [" & $signal & "]") proc registerSignal*[T](s: Selector[T], signal: int, - data: T): int {.discardable.} = - let fdi = getUnique(s) - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent) + data: T): SelectResult[cint] = + checkSignal(signal) - s.setKey(fdi, {Event.Signal}, signal, data) - var nmask, omask: Sigset - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, cint(signal)) - blockSignals(nmask, omask) - # to be compatible with linux semantic we need to "eat" signals - posix.signal(cint(signal), SIG_IGN) + let + fdi32 = ? s.getVirtualId() + events = {Event.Signal} + selectorKey = SelectorKey[T](ident: fdi32, events: events, + param: signal, data: data) - modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, - cast[pointer](fdi)) + var changes: array[1, KEvent] + s.addKey(fdi32, selectorKey) - when not declared(CACHE_EVENTS): - flushKQueue(s) + let res = blockSignal(signal) + if res.isErr(): + s.freeKey(fdi32) + return err(res.error()) - inc(s.count) - result = fdi + # To be compatible with linux semantic we need to "eat" signals + signal(cint(signal), SIG_IGN) + changes.modifyKQueue(0, uint(signal), EVFILT_SIGNAL, EV_ADD, 0, 0, + cast[pointer](uint32(fdi32))) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, 0, nil)) == -1: + let errorCode = osLastError() + s.freeKey(fdi32) + discard unblockSignal(signal) + return err(errorCode) + + ok(cint(fdi32)) + +template checkPid(pid: int) = + when sizeof(int) == 8: + doAssert(pid >= 0 and pid <= int(high(uint32)), + "Invalid process idientified (pid) value") + else: + doAssert(pid >= 0 and pid <= high(int32), + "Invalid process idientified (pid) value") proc registerProcess*[T](s: Selector[T], pid: int, - data: T): int {.discardable.} = - let fdi = getUnique(s) - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent) + data: T): SelectResult[cint] = + checkPid(pid) - var kflags: cushort = EV_ONESHOT or EV_ADD - setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data) + let + fdi32 = ? s.getVirtualId() + events = {Event.Process, Event.Oneshot} + flags: cushort = EV_ONESHOT or EV_ADD + selectorKey = SelectorKey[T](ident: fdi32, events: events, + param: pid, data: data) + var changes: array[1, KEvent] + s.addKey(fdi32, selectorKey) - modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, - cast[pointer](fdi)) + changes.modifyKQueue(0, uint(uint32(pid)), EVFILT_PROC, flags, NOTE_EXIT, + 0, cast[pointer](uint32(fdi32))) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, 0, nil)) == -1: + s.freeKey(fdi32) + return err(osLastError()) - when not declared(CACHE_EVENTS): - flushKQueue(s) + ok(cint(fdi32)) - inc(s.count) - result = fdi +proc registerEvent2*[T](s: Selector[T], ev: SelectEvent, + data: T): SelectResult[cint] = + doAssert(not(isNil(ev))) + let + selectorKey = SelectorKey[T](ident: ev.rfd, events: {Event.User}, + param: 0, data: data) -proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = - let fdi = ev.rfd.int - doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!") - setKey(s, fdi, {Event.User}, 0, data) + var changes: array[1, KEvent] + s.addKey(ev.rfd, selectorKey) - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + changes.modifyKQueue(0, uint(uint32(ev.rfd)), EVFILT_READ, EV_ADD, 0, 0, nil) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, 0, nil)) == -1: + s.freeKey(ev.rfd) + return err(osLastError()) - when not declared(CACHE_EVENTS): - flushKQueue(s) - - inc(s.count) + ok(ev.rfd) template processVnodeEvents(events: set[Event]): cuint = - var rfflags = 0.cuint + var rfflags = cuint(0) if events == {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend, Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename, Event.VnodeRevoke}: @@ -355,271 +382,329 @@ template processVnodeEvents(events: set[Event]): cuint = if Event.VnodeRevoke in events: rfflags = rfflags or NOTE_REVOKE rfflags -proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) = - let fdi = fd.int - setKey(s, fdi, {Event.Vnode} + events, 0, data) - var fflags = processVnodeEvents(events) +proc registerVnode2*[T](s: Selector[T], fd: cint, events: set[Event], + data: T): SelectResult[cint] = + let + events = {Event.Vnode} + events + fflags = processVnodeEvents(events) + selectorKey = SelectorKey[T](ident: fd, events: events, + param: 0, data: data) - modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil) + var changes: array[1, KEvent] + s.addKey(fd, selectorKey) - when not declared(CACHE_EVENTS): - flushKQueue(s) + changes.modifyKQueue(0, uint(uint32(fd)), EVFILT_VNODE, EV_ADD or EV_CLEAR, + fflags, 0, nil) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, 0, nil)) == -1: + s.freeKey(fd) + return err(osLastError()) - inc(s.count) + ok(fd) -proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = - let fdi = int(fd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, - "Descriptor [" & $fdi & "] is not registered in the queue!") +proc unregister2*[T](s: Selector[T], fd: cint): SelectResult[void] = + let + fdi32 = int32(fd) + pkey = s.getKey(fdi32) + + var changes: array[2, KEvent] + var k = 0 if pkey.events != {}: if pkey.events * {Event.Read, Event.Write} != {}: if Event.Read in pkey.events: - modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) - dec(s.count) + changes.modifyKQueue(k, uint(uint32(fdi32)), EVFILT_READ, EV_DELETE, + 0, 0, nil) + inc(k) if Event.Write in pkey.events: - modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil) - dec(s.count) - when not declared(CACHE_EVENTS): - flushKQueue(s) + changes.modifyKQueue(k, uint(uint32(fdi32)), EVFILT_WRITE, EV_DELETE, + 0, 0, nil) + inc(k) + if k > 0: + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(k), nil, + 0, nil)) == -1: + return err(osLastError()) + elif Event.Timer in pkey.events: if Event.Finished notin pkey.events: - modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil) - when not declared(CACHE_EVENTS): - flushKQueue(s) - dec(s.count) - if posix.close(cint(pkey.ident)) != 0: - raiseIOSelectorsError(osLastError()) + changes.modifyKQueue(0, uint(uint32(fdi32)), EVFILT_TIMER, EV_DELETE, + 0, 0, nil) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, + 0, nil)) == -1: + return err(osLastError()) + elif Event.Signal in pkey.events: - var nmask, omask: Sigset - let signal = cint(pkey.param) - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, signal) - unblockSignals(nmask, omask) - posix.signal(signal, SIG_DFL) - modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil) - when not declared(CACHE_EVENTS): - flushKQueue(s) - dec(s.count) - if posix.close(cint(pkey.ident)) != 0: - raiseIOSelectorsError(osLastError()) + let sig = cint(pkey.param) + osdefs.signal(sig, SIG_DFL) + changes.modifyKQueue(0, uint(uint32(pkey.param)), EVFILT_SIGNAL, + EV_DELETE, 0, 0, nil) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, + 0, nil)) == -1: + discard unblockSignal(sig) + return err(osLastError()) + + ? unblockSignal(sig) + elif Event.Process in pkey.events: if Event.Finished notin pkey.events: - modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil) - when not declared(CACHE_EVENTS): - flushKQueue(s) - dec(s.count) - if posix.close(cint(pkey.ident)) != 0: - raiseIOSelectorsError(osLastError()) + changes.modifyKQueue(0, uint(uint32(pkey.param)), EVFILT_PROC, + EV_DELETE, 0, 0, nil) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, + 0, nil)) == -1: + return err(osLastError()) + elif Event.Vnode in pkey.events: - modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil) - when not declared(CACHE_EVENTS): - flushKQueue(s) - dec(s.count) + changes.modifyKQueue(0, uint(uint32(fdi32)), EVFILT_VNODE, EV_DELETE, + 0, 0, nil) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, + 0, nil)) == -1: + return err(osLastError()) + elif Event.User in pkey.events: - modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) - when not declared(CACHE_EVENTS): - flushKQueue(s) - dec(s.count) + changes.modifyKQueue(0, uint(uint32(fdi32)), EVFILT_READ, EV_DELETE, + 0, 0, nil) + if handleEintr(kevent(s.kqFd, addr(changes[0]), cint(1), nil, + 0, nil)) == -1: + return err(osLastError()) - clearKey(pkey) + s.freeKey(fdi32) + ok() -proc unregister*[T](s: Selector[T], ev: SelectEvent) = - let fdi = int(ev.rfd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!") - doAssert(Event.User in pkey.events) - modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) - when not declared(CACHE_EVENTS): - flushKQueue(s) - clearKey(pkey) - dec(s.count) +proc unregister2*[T](s: Selector[T], event: SelectEvent): SelectResult[void] = + s.unregister2(event.rfd) -proc selectInto*[T](s: Selector[T], timeout: int, - results: var openArray[ReadyKey]): int = +proc prepareKey[T](s: Selector[T], event: KEvent): Opt[ReadyKey] = + let fdi32 = event.getIdent() + + var rkey = ReadyKey(fd: fdi32, events: {}) + var pkey = + case event.filter: + of EVFILT_READ, EVFILT_WRITE, EVFILT_TIMER, EVFILT_VNODE: + s.getKey(fdi32) + of EVFILT_SIGNAL, EVFILT_PROC: + let virtualFd = event.getUdata() + s.getKey(virtualFd) + else: + raiseAssert "Unsupported kqueue filter [" & $event.filter & "] reported!" + + case event.filter + of EVFILT_READ: + if (event.flags and EV_EOF) != 0: + rkey.events.incl(Event.Error) + rkey.errorCode = OSErrorCode(ECONNRESET) + + if Event.User in pkey.events: + var data: uint64 = 0 + if handleEintr(osdefs.read(cint(event.ident), addr data, + sizeof(uint64))) != sizeof(uint64): + let errorCode = osLastError() + if errorCode == EAGAIN: + # Someone already consumed event data + return Opt.none(ReadyKey) + else: + rkey.events.incl(Event.Error) + rkey.errorCode = errorCode + rkey.events.incl(Event.User) + else: + rkey.events.incl(Event.Read) + + of EVFILT_WRITE: + if (event.flags and EV_EOF) != 0: + rkey.events.incl(Event.Error) + rkey.errorCode = OSErrorCode(ECONNRESET) + + rkey.events.incl(Event.Write) + + of EVFILT_TIMER: + rkey.events.incl(Event.Timer) + if Event.Oneshot in pkey.events: + # we are marking key with `Finished` event, to avoid double decrease. + pkey.events.incl(Event.Finished) + rkey.events.incl({Event.Oneshot, Event.Finished}) + s.fds[fdi32] = pkey + + of EVFILT_VNODE: + rkey.events.incl(Event.Vnode) + if (event.fflags and NOTE_DELETE) != 0: rkey.events.incl(Event.VnodeDelete) + if (event.fflags and NOTE_WRITE) != 0: rkey.events.incl(Event.VnodeWrite) + if (event.fflags and NOTE_EXTEND) != 0: rkey.events.incl(Event.VnodeExtend) + if (event.fflags and NOTE_ATTRIB) != 0: rkey.events.incl(Event.VnodeAttrib) + if (event.fflags and NOTE_LINK) != 0: rkey.events.incl(Event.VnodeLink) + if (event.fflags and NOTE_RENAME) != 0: rkey.events.incl(Event.VnodeRename) + if (event.fflags and NOTE_REVOKE) != 0: rkey.events.incl(Event.VnodeRevoke) + + of EVFILT_SIGNAL: + rkey.events.incl(Event.Signal) + rkey.fd = pkey.ident + + of EVFILT_PROC: + rkey.events.incl({Event.Process, Event.Oneshot, Event.Finished}) + rkey.fd = pkey.ident + pkey.events.incl(Event.Finished) + s.fds[int32(pkey.ident)] = pkey + + else: + raiseAssert "Unsupported kqueue filter [" & $event.filter & "] reported!" + + ok(rkey) + +proc selectInto2*[T](s: Selector[T], timeout: int, + readyKeys: var openArray[ReadyKey] + ): SelectResult[int] = var tv: Timespec - resTable: array[MAX_KQUEUE_EVENTS, KEvent] - ptv = addr tv - maxres = MAX_KQUEUE_EVENTS + queueEvents: array[asyncEventsCount, KEvent] - verifySelectParams(timeout) + verifySelectParams(timeout, -1, high(int)) - if timeout != -1: - if timeout >= 1000: - tv.tv_sec = posix.Time(timeout div 1_000) - tv.tv_nsec = (timeout %% 1_000) * 1_000_000 - else: - tv.tv_sec = posix.Time(0) - tv.tv_nsec = timeout * 1_000_000 - else: - ptv = nil - - if maxres > len(results): - maxres = len(results) - - var count = 0 - when not declared(CACHE_EVENTS): - count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv) - else: - when hasThreadSupport: - s.withChangeLock(): - if s.changesLength > 0: - count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength), - addr(resTable[0]), cint(maxres), ptv) - s.changesLength = 0 + let + ptrTimeout = + if timeout != -1: + if timeout >= 1000: + tv.tv_sec = Time(timeout div 1_000) + tv.tv_nsec = (timeout %% 1_000) * 1_000_000 else: - count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), - ptv) - else: - let length = cint(len(s.changes)) - if length > 0: - count = kevent(s.kqFD, addr(s.changes[0]), length, - addr(resTable[0]), cint(maxres), ptv) - s.changes.setLen(0) + tv.tv_sec = Time(0) + tv.tv_nsec = timeout * 1_000_000 + addr tv else: - count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), - ptv) - - if count < 0: - result = 0 - let err = osLastError() - if cint(err) != EINTR: - raiseIOSelectorsError(err) - elif count == 0: - result = 0 - else: - var i = 0 - var k = 0 # do not delete this, because `continue` used in cycle. - var pkey: ptr SelectorKey[T] - while i < count: - let kevent = addr(resTable[i]) - var rkey = ReadyKey(fd: int(kevent.ident), events: {}) - - if (kevent.flags and EV_ERROR) != 0: - rkey.events = {Event.Error} - rkey.errorCode = OSErrorCode(kevent.data) - - case kevent.filter: - of EVFILT_READ: - pkey = addr(s.fds[int(kevent.ident)]) - rkey.events.incl(Event.Read) - if Event.User in pkey.events: - var data: uint64 = 0 - if posix.read(cint(kevent.ident), addr data, - sizeof(uint64)) != sizeof(uint64): - let err = osLastError() - if err == OSErrorCode(EAGAIN): - # someone already consumed event data - inc(i) + nil + maxEventsCount = cint(min(asyncEventsCount, len(readyKeys))) + eventsCount = + block: + var res = 0 + while true: + res = kevent(s.kqFd, nil, cint(0), addr(queueEvents[0]), + maxEventsCount, ptrTimeout) + if res < 0: + let errorCode = osLastError() + if errorCode == EINTR: continue - else: - raiseIOSelectorsError(err) - rkey.events = {Event.User} - of EVFILT_WRITE: - pkey = addr(s.fds[int(kevent.ident)]) - rkey.events.incl(Event.Write) - rkey.events = {Event.Write} - of EVFILT_TIMER: - pkey = addr(s.fds[int(kevent.ident)]) - if Event.Oneshot in pkey.events: - # we will not clear key until it will be unregistered, so - # application can obtain data, but we will decrease counter, - # because kqueue is empty. - dec(s.count) - # we are marking key with `Finished` event, to avoid double decrease. - pkey.events.incl(Event.Finished) - rkey.events.incl(Event.Timer) - of EVFILT_VNODE: - pkey = addr(s.fds[int(kevent.ident)]) - rkey.events.incl(Event.Vnode) - if (kevent.fflags and NOTE_DELETE) != 0: - rkey.events.incl(Event.VnodeDelete) - if (kevent.fflags and NOTE_WRITE) != 0: - rkey.events.incl(Event.VnodeWrite) - if (kevent.fflags and NOTE_EXTEND) != 0: - rkey.events.incl(Event.VnodeExtend) - if (kevent.fflags and NOTE_ATTRIB) != 0: - rkey.events.incl(Event.VnodeAttrib) - if (kevent.fflags and NOTE_LINK) != 0: - rkey.events.incl(Event.VnodeLink) - if (kevent.fflags and NOTE_RENAME) != 0: - rkey.events.incl(Event.VnodeRename) - if (kevent.fflags and NOTE_REVOKE) != 0: - rkey.events.incl(Event.VnodeRevoke) - of EVFILT_SIGNAL: - pkey = addr(s.fds[cast[int](kevent.udata)]) - rkey.fd = cast[int](kevent.udata) - rkey.events.incl(Event.Signal) - of EVFILT_PROC: - rkey.fd = cast[int](kevent.udata) - pkey = addr(s.fds[cast[int](kevent.udata)]) - # we will not clear key, until it will be unregistered, so - # application can obtain data, but we will decrease counter, - # because kqueue is empty. - dec(s.count) - # we are marking key with `Finished` event, to avoid double decrease. - pkey.events.incl(Event.Finished) - rkey.events.incl(Event.Process) - else: - doAssert(true, "Unsupported kqueue filter in the queue!") + return err(errorCode) + else: + break + res - if (kevent.flags and EV_EOF) != 0: - # TODO this error handling needs to be rethought. - # `fflags` can sometimes be `0x80000000` and thus we use 'cast' - # here: - if kevent.fflags != 0: - rkey.errorCode = cast[OSErrorCode](kevent.fflags) - else: - # This assumes we are dealing with sockets. - # TODO: For future-proofing it might be a good idea to give the - # user access to the raw `kevent`. - rkey.errorCode = OSErrorCode(ECONNRESET) - rkey.events.incl(Event.Error) + var k = 0 + for i in 0 ..< eventsCount: + let rkey = s.prepareKey(queueEvents[i]).valueOr: continue + readyKeys[k] = rkey + inc(k) - results[k] = rkey - inc(k) - inc(i) - result = k + ok(k) -proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = - result = newSeq[ReadyKey](MAX_KQUEUE_EVENTS) - let count = selectInto(s, timeout, result) - result.setLen(count) +proc select2*[T](s: Selector[T], + timeout: int): Result[seq[ReadyKey], OSErrorCode] = + var res = newSeq[ReadyKey](asyncEventsCount) + let count = ? selectInto2(s, timeout, res) + res.setLen(count) + ok(res) -template isEmpty*[T](s: Selector[T]): bool = - (s.count == 0) +proc newSelector*[T](): owned(Selector[T]) {. + raises: [Defect, IOSelectorsException].} = + let res = Selector.new(T) + if res.isErr(): + raiseIOSelectorsError(res.error()) + res.get() -proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = - let fdi = fd.int - fdi < s.maxFD and s.fds[fd.int].ident != InvalidIdent +proc newSelectEvent*(): SelectEvent {. + raises: [Defect, IOSelectorsException].} = + let res = SelectEvent.new() + if res.isErr(): + raiseIOSelectorsError(res.error()) + res.get() -proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = - let fdi = int(fd) - if fdi in s: - s.fds[fdi].data = data - result = true +proc trigger*(ev: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let res = ev.trigger2() + if res.isErr(): + raiseIOSelectorsError(res.error()) -template withData*[T](s: Selector[T], fd: SocketHandle|int, value, - body: untyped) = - let fdi = int(fd) - if fdi in s: - var value = addr(s.fds[fdi].data) +proc close*(ev: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let res = ev.close2() + if res.isErr(): + raiseIOSelectorsError(res.error()) + +proc registerHandle*[T](s: Selector[T], fd: cint | SocketHandle, + events: set[Event], data: T) {. + raises: [Defect, IOSelectorsException].} = + let res = registerHandle2(s, cint(fd), events, data) + if res.isErr(): + raiseIOSelectorsError(res.error()) + +proc updateHandle*[T](s: Selector[T], fd: cint | SocketHandle, + events: set[Event]) {. + raises: [Defect, IOSelectorsException].} = + let res = updateHandle2(s, cint(fd), events) + if res.isErr(): + raiseIOSelectorsError(res.error()) + +proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) {. + raises: [Defect, IOSelectorsException].} = + let res = registerEvent2(s, ev, data) + if res.isErr(): + raiseIOSelectorsError(res.error()) + +proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) {. + raises: [Defect, IOSelectorsException].} = + let res = registerVnode2(s, fd, events, data) + if res.isErr(): + raiseIOSelectorsError(res.error()) + +proc unregister*[T](s: Selector[T], event: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let res = unregister2(s, event) + if res.isErr(): + raiseIOSelectorsError(res.error()) + +proc unregister*[T](s: Selector[T], fd: cint|SocketHandle) {. + raises: [Defect, IOSelectorsException].} = + let res = unregister2(s, fd) + if res.isErr(): + raiseIOSelectorsError(res.error()) + +proc selectInto*[T](s: Selector[T], timeout: int, + results: var openArray[ReadyKey]): int {. + raises: [Defect, IOSelectorsException].} = + let res = selectInto2(s, timeout, results) + if res.isErr(): + raiseIOSelectorsError(res.error()) + res.get() + +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] {. + raises: [Defect, IOSelectorsException].} = + let res = select2(s, timeout) + if res.isErr(): + raiseIOSelectorsError(res.error()) + res.get() + +proc close*[T](s: Selector[T]) {.raises: [Defect, IOSelectorsException].} = + let res = s.close2() + if res.isErr(): + raiseIOSelectorsError(res.error()) + +proc contains*[T](s: Selector[T], fd: SocketHandle|cint): bool {.inline.} = + s.checkKey(int32(fd)) + +proc setData*[T](s: Selector[T], fd: SocketHandle|cint, data: T): bool = + s.fds.withValue(int32(fd), skey): + skey[].data = data + return true + do: + return false + +template withData*[T](s: Selector[T], fd: SocketHandle|cint, value, + body: untyped) = + s.fds.withValue(int32(fd), skey): + var value = addr(skey[].data) body -template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, - body2: untyped) = - let fdi = int(fd) - if fdi in s: - var value = addr(s.fds[fdi].data) +template withData*[T](s: Selector[T], fd: SocketHandle|cint, value, body1, + body2: untyped) = + s.fds.withValue(int32(fd), skey): + var value = addr(skey[].data) body1 - else: + do: body2 - -proc getFd*[T](s: Selector[T]): int = - return s.kqFD.int +proc getFd*[T](s: Selector[T]): cint = s.kqFd diff --git a/chronos/ioselects/ioselectors_poll.nim b/chronos/ioselects/ioselectors_poll.nim index 8c2e9f5..9ff8ad1 100644 --- a/chronos/ioselects/ioselectors_poll.nim +++ b/chronos/ioselects/ioselectors_poll.nim @@ -8,31 +8,19 @@ # # This module implements Posix poll(). +import std/tables +import stew/base10 -import posix, times - -# Maximum number of events that can be returned -const MAX_POLL_EVENTS = 64 - -when hasThreadSupport: - type - SelectorImpl[T] = object - maxFD : int - pollcnt: int - fds: ptr SharedArray[SelectorKey[T]] - pollfds: ptr SharedArray[TPollFd] - count: int - lock: Lock - Selector*[T] = ptr SelectorImpl[T] +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} else: - type - SelectorImpl[T] = object - maxFD : int - pollcnt: int - fds: seq[SelectorKey[T]] - pollfds: seq[TPollFd] - count: int - Selector*[T] = ref SelectorImpl[T] + {.push raises: [].} + +type + SelectorImpl[T] = object + fds: Table[int32, SelectorKey[T]] + pollfds: seq[TPollFd] + Selector*[T] = ref SelectorImpl[T] type SelectEventImpl = object @@ -40,271 +28,316 @@ type wfd: cint SelectEvent* = ptr SelectEventImpl -when hasThreadSupport: - template withPollLock[T](s: Selector[T], body: untyped) = - acquire(s.lock) - {.locks: [s.lock].}: - try: - body - finally: - release(s.lock) -else: - template withPollLock(s, body: untyped) = - body +proc toString(key: int32): string = + Base10.toString(uint32(key)) -proc newSelector*[T](): Selector[T] = - var a = RLimit() - if getrlimit(posix.RLIMIT_NOFILE, a) != 0: - raiseIOSelectorsError(osLastError()) - var maxFD = int(a.rlim_max) +template addKey[T](s: Selector[T], key: int32, skey: SelectorKey[T]) = + if s.fds.hasKeyOrPut(key, skey): + raiseAssert "Descriptor [" & key.toString() & + "] is already registered in the selector!" - when hasThreadSupport: - result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) - result.maxFD = maxFD - result.fds = allocSharedArray[SelectorKey[T]](maxFD) - result.pollfds = allocSharedArray[TPollFd](maxFD) - initLock(result.lock) +template getKey[T](s: Selector[T], key: int32): SelectorKey[T] = + let + defaultKey = SelectorKey[T](ident: InvalidIdent) + pkey = s.fds.getOrDefault(key, defaultKey) + doAssert(pkey.ident != InvalidIdent, + "Descriptor [" & key.toString() & + "] is not registered in the selector!") + pkey + +template checkKey[T](s: Selector[T], key: int32): bool = + s.fds.contains(key) + +proc freeKey[T](s: Selector[T], key: int32) = + s.fds.del(key) + +proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] = + let selector = Selector[T]( + fds: initTable[int32, SelectorKey[T]](asyncInitialSize) + ) + ok(selector) + +proc close2*[T](s: Selector[T]): SelectResult[void] = + s.fds.clear() + s.pollfds.clear() + +proc new*(t: typedesc[SelectEvent]): SelectResult[SelectEvent] = + let flags = {DescriptorFlag.NonBlock, DescriptorFlag.CloseOnExec} + let pipes = ? createOsPipe(flags, flags) + var res = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + res.rfd = pipes.read + res.wfd = pipes.write + ok(res) + +proc trigger2*(event: SelectEvent): SelectResult[void] = + var data: uint64 = 1 + let res = handleEintr(osdefs.write(event.wfd, addr data, sizeof(uint64))) + if res == -1: + err(osLastError()) + elif res != sizeof(uint64): + err(OSErrorCode(osdefs.EINVAL)) else: - result = Selector[T]() - result.maxFD = maxFD - result.fds = newSeq[SelectorKey[T]](maxFD) - result.pollfds = newSeq[TPollFd](maxFD) + ok() - for i in 0 ..< maxFD: - result.fds[i].ident = InvalidIdent +proc close2*(event: SelectEvent): SelectResult[void] = + let + rfd = event.rfd + wfd = event.wfd + deallocShared(cast[pointer](event)) + let rres = handleEintr(osdefs.close(rfd)) + if rres == -1: + discard osdefs.close(wfd) + return err(osLastError()) + let wres = handleEintr(osdefs.close(wfd)) + if wres == -1: + err(osLastError()) + else: + ok() -proc close*[T](s: Selector[T]) = - when hasThreadSupport: - deinitLock(s.lock) - deallocSharedArray(s.fds) - deallocSharedArray(s.pollfds) - deallocShared(cast[pointer](s)) +template toPollEvents(events: set[Event]): cshort = + var res = cshort(0) + if Event.Read in events: res = res or POLLIN + if Event.Write in events: res = res or POLLOUT + res template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) = - withPollLock(s): - var pollev: cshort = 0 - if Event.Read in events: pollev = pollev or POLLIN - if Event.Write in events: pollev = pollev or POLLOUT - s.pollfds[s.pollcnt].fd = cint(sock) - s.pollfds[s.pollcnt].events = pollev - inc(s.count) - inc(s.pollcnt) + s.pollfds.add(TPollFd(fd: sock, events: toPollEvents(events), revents: 0)) template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) = - withPollLock(s): - var i = 0 - var pollev: cshort = 0 - if Event.Read in events: pollev = pollev or POLLIN - if Event.Write in events: pollev = pollev or POLLOUT - - while i < s.pollcnt: - if s.pollfds[i].fd == sock: - s.pollfds[i].events = pollev - break - inc(i) - doAssert(i < s.pollcnt, - "Descriptor [" & $sock & "] is not registered in the queue!") + var updated = false + for mitem in s.pollfds.mitems(): + if mitem.fd == sock: + mitem.events = toPollEvents(events) + break + if not(updated): + raiseAssert "Descriptor [" & $sock & "] is not registered in the queue!" template pollRemove[T](s: Selector[T], sock: cint) = - withPollLock(s): - var i = 0 - while i < s.pollcnt: - if s.pollfds[i].fd == sock: - if i == s.pollcnt - 1: - s.pollfds[i].fd = 0 - s.pollfds[i].events = 0 - s.pollfds[i].revents = 0 - else: - while i < (s.pollcnt - 1): - s.pollfds[i].fd = s.pollfds[i + 1].fd - s.pollfds[i].events = s.pollfds[i + 1].events - inc(i) - break - inc(i) - dec(s.pollcnt) - dec(s.count) + let index = + block: + var res = -1 + for key, item in s.pollfds.pairs(): + if item.fd == sock: + res = key + break + res + if index < 0: + raiseAssert "Descriptor [" & $sock & "] is not registered in the queue!" + else: + s.pollfds.del(index) -template checkFd(s, f) = - if f >= s.maxFD: - raiseIOSelectorsError("Maximum number of descriptors is exhausted!") +proc registerHandle2*[T](s: Selector[T], fd: cint, events: set[Event], + data: T): SelectResult[void] = + let skey = SelectorKey[T](ident: fd, events: events, param: 0, data: data) -proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, - events: set[Event], data: T) = - var fdi = int(fd) - s.checkFd(fdi) - doAssert(s.fds[fdi].ident == InvalidIdent) - setKey(s, fdi, events, 0, data) - if events != {}: s.pollAdd(fdi.cint, events) + s.addKey(fd, skey) + if events != {}: + s.pollAdd(fd, events) + ok() -proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, - events: set[Event]) = - let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, - Event.User, Event.Oneshot, Event.Error} - let fdi = int(fd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, - "Descriptor [" & $fdi & "] is not registered in the queue!") - doAssert(pkey.events * maskEvents == {}) - - if pkey.events != events: - if pkey.events == {}: - s.pollAdd(fd.cint, events) - else: - if events != {}: - s.pollUpdate(fd.cint, events) +proc updateHandle2*[T](s: Selector[T], fd: cint, + events: set[Event]): SelectResult[void] = + const EventsMask = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, + Event.User, Event.Oneshot, Event.Error} + s.fds.withValue(int32(fd), pkey): + doAssert(pkey[].events * EventsMask == {}, + "Descriptor [" & fd.toString() & "] could not be updated!") + if pkey[].events != events: + if pkey[].events == {}: + s.pollAdd(fd, events) else: - s.pollRemove(fd.cint) - pkey.events = events + if events != {}: + s.pollUpdate(fd, events) + else: + s.pollRemove(fd) + pkey.events = events + do: + raiseAssert "Descriptor [" & fd.toString() & + "] is not registered in the selector!" + ok() -proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = - var fdi = int(ev.rfd) - doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!") - var events = {Event.User} - setKey(s, fdi, events, 0, data) - events.incl(Event.Read) - s.pollAdd(fdi.cint, events) +proc registerEvent2*[T](s: Selector[T], ev: SelectEvent, + data: T): SelectResult[cint] = + doAssert(not(isNil(ev))) + let + key = SelectorKey[T](ident: ev.rfd, events: {Event.User}, + param: 0, data: data) -proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = - let fdi = int(fd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, - "Descriptor [" & $fdi & "] is not registered in the queue!") - pkey.ident = InvalidIdent + s.addKey(ev.rfd, key) + s.pollAdd(ev.rfd, {Event.Read}.toPollEvents()) + ok(ev.rfd) + +proc unregister2*[T](s: Selector[T], fd: cint): SelectResult[void] = + let pkey = s.getKey(fd) if pkey.events != {}: - pkey.events = {} - s.pollRemove(fdi.cint) + if {Event.Read, Event.Write, Event.User} * pkey.events != {}: + s.pollRemove(fd) + s.freeKey(fd) + ok() -proc unregister*[T](s: Selector[T], ev: SelectEvent) = - let fdi = int(ev.rfd) - s.checkFd(fdi) - var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!") - doAssert(Event.User in pkey.events) - pkey.ident = InvalidIdent - pkey.events = {} - s.pollRemove(fdi.cint) +proc unregister2*[T](s: Selector[T], event: SelectEvent): SelectResult[void] = + s.unregister2(event.rfd) -proc newSelectEvent*(): SelectEvent = - var fds: array[2, cint] - if posix.pipe(fds) != 0: - raiseIOSelectorsError(osLastError()) - setNonBlocking(fds[0]) - setNonBlocking(fds[1]) - result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) - result.rfd = fds[0] - result.wfd = fds[1] +proc prepareKey[T](s: Selector[T], event: var TPollfd): Opt[ReadyKey] = + let + defaultKey = SelectorKey[T](ident: InvalidIdent) + fdi32 = int32(event.fd) + revents = event.revents -proc trigger*(ev: SelectEvent) = - var data: uint64 = 1 - if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): - raiseIOSelectorsError(osLastError()) + var + pkey = s.getKey(fdi32) + rkey = ReadyKey(fd: event.fd) -proc close*(ev: SelectEvent) = - let res1 = posix.close(ev.rfd) - let res2 = posix.close(ev.wfd) - deallocShared(cast[pointer](ev)) - if res1 != 0 or res2 != 0: - raiseIOSelectorsError(osLastError()) + # Cleanup all the received events. + event.revents = 0 + + if (revents and POLLIN) != 0: + if Event.User in pkey.events: + var data: uint64 = 0 + let res = handleEintr(osdefs.read(event.fd, addr data, sizeof(uint64))) + if res != sizeof(uint64): + let errorCode = osLastError() + if errorCode == EAGAIN: + return Opt.none(ReadyKey) + else: + rkey.events.incl({Event.User, Event.Error}) + rkey.errorCode = errorCode + else: + rkey.events.incl(Event.User) + else: + rkey.events.incl(Event.Read) + + if (revents and POLLOUT) != 0: + rkey.events.incl(Event.Write) + + if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or + (revents and POLLNVAL) != 0: + rkey.events.incl(Event.Error) + + ok(rkey) + +proc selectInto2*[T](s: Selector[T], timeout: int, + readyKeys: var openArray[ReadyKey]): SelectResult[int] = + var k = 0 + + verifySelectParams(timeout, -1, int(high(cint))) + + let + maxEventsCount = min(len(s.pollfds), len(readyKeys)) + eventsCount = + if maxEventsCount > 0: + let res = handleEintr(poll(addr(s.pollfds[0]), Tnfds(maxEventsCount), + timeout)) + if res < 0: + return err(osLastError()) + res + else: + 0 + + for i in 0 ..< len(s.pollfds): + if s.pollfds[i].revents != 0: + let rkey = s.prepareKey(s.pollfds[i]).valueOr: continue + readyKeys[k] = rkey + inc(k) + if k == eventsCount: break + + ok(k) + +proc select2*[T](s: Selector[T], timeout: int): SelectResult[seq[ReadyKey]] = + var res = newSeq[ReadyKey](asyncEventsCount) + let count = ? selectInto2(s, timeout, res) + res.setLen(count) + ok(res) + +proc newSelector*[T](): Selector[T] {. + raises: [Defect, OSError].} = + let res = Selector.new(T) + if res.isErr(): raiseOSError(res.error) + res.get() + +proc close*[T](s: Selector[T]) {. + raises: [Defect, IOSelectorsException].} = + let res = s.close2() + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc newSelectEvent*(): SelectEvent {. + raises: [Defect, IOSelectorsException].} = + let res = SelectEvent.new() + if res.isErr(): raiseIOSelectorsError(res.error()) + res.get() + +proc trigger*(event: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let res = event.trigger2() + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc close*(event: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let res = event.close2() + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc registerHandle*[T](s: Selector[T], fd: cint | SocketHandle, + events: set[Event], data: T) {. + raises: [Defect, IOSelectorsException].} = + let res = registerHandle2(s, cint(fd), events, data) + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc updateHandle*[T](s: Selector[T], fd: cint | SocketHandle, + events: set[Event]) {. + raises: [Defect, IOSelectorsException].} = + let res = updateHandle2(s, cint(fd), events) + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc unregister*[T](s: Selector[T], fd: cint | SocketHandle) {. + raises: [Defect, IOSelectorsException].} = + let res = unregister2(s, cint(fd)) + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc unregister*[T](s: Selector[T], event: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let res = unregister2(s, event) + if res.isErr(): raiseIOSelectorsError(res.error()) + +proc registerEvent*[T](s: Selector[T], event: SelectEvent, + data: T) {. + raises: [Defect, IOSelectorsException].} = + let res = registerEvent2(s, event, data) + if res.isErr(): raiseIOSelectorsError(res.error()) proc selectInto*[T](s: Selector[T], timeout: int, - results: var openArray[ReadyKey]): int = - var maxres = MAX_POLL_EVENTS - if maxres > len(results): - maxres = len(results) - - verifySelectParams(timeout) - - s.withPollLock(): - let count = posix.poll(addr(s.pollfds[0]), Tnfds(s.pollcnt), timeout) - if count < 0: - result = 0 - let err = osLastError() - if cint(err) != EINTR: - raiseIOSelectorsError(err) - elif count == 0: - result = 0 - else: - var i = 0 - var k = 0 - var rindex = 0 - while (i < s.pollcnt) and (k < count) and (rindex < maxres): - let revents = s.pollfds[i].revents - if revents != 0: - let fd = s.pollfds[i].fd - var pkey = addr(s.fds[fd]) - var rkey = ReadyKey(fd: int(fd), events: {}) - - if (revents and POLLIN) != 0: - rkey.events.incl(Event.Read) - if Event.User in pkey.events: - var data: uint64 = 0 - if posix.read(fd, addr data, sizeof(uint64)) != sizeof(uint64): - let err = osLastError() - if err != OSErrorCode(EAGAIN): - raiseIOSelectorsError(err) - else: - # someone already consumed event data - inc(i) - continue - rkey.events = {Event.User} - if (revents and POLLOUT) != 0: - rkey.events.incl(Event.Write) - if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or - (revents and POLLNVAL) != 0: - rkey.events.incl(Event.Error) - results[rindex] = rkey - s.pollfds[i].revents = 0 - inc(rindex) - inc(k) - inc(i) - result = k + readyKeys: var openArray[ReadyKey]): int {. + raises: [Defect, IOSelectorsException].} = + let res = selectInto2(s, timeout, readyKeys) + if res.isErr(): raiseIOSelectorsError(res.error()) + res.get() proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = - result = newSeq[ReadyKey](MAX_POLL_EVENTS) - let count = selectInto(s, timeout, result) - result.setLen(count) + let res = select2(s, timeout) + if res.isErr(): raiseIOSelectorsError(res.error()) + res.get() -template isEmpty*[T](s: Selector[T]): bool = - (s.count == 0) +proc contains*[T](s: Selector[T], fd: SocketHandle|cint): bool {.inline.} = + s.checkKey(int32(fd)) -proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = - return s.fds[fd.int].ident != InvalidIdent +proc setData*[T](s: Selector[T], fd: SocketHandle|cint, data: T): bool = + s.fds.withValue(int32(fd), skey): + skey[].data = data + return true + do: + return false -proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T = - let fdi = int(fd) - s.checkFd(fdi) - if fdi in s: - result = s.fds[fdi].data - -proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = - let fdi = int(fd) - s.checkFd(fdi) - if fdi in s: - s.fds[fdi].data = data - result = true - -template withData*[T](s: Selector[T], fd: SocketHandle|int, value, +template withData*[T](s: Selector[T], fd: SocketHandle|cint, value, body: untyped) = - mixin checkFd - let fdi = int(fd) - s.checkFd(fdi) - if fdi in s: - var value = addr(s.getData(fdi)) + s.fds.withValue(int32(fd), skey): + var value = addr(skey[].data) body -template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, +template withData*[T](s: Selector[T], fd: SocketHandle|cint, value, body1, body2: untyped) = - mixin checkFd - let fdi = int(fd) - s.checkFd(fdi) - if fdi in s: - var value = addr(s.getData(fdi)) + s.fds.withValue(int32(fd), skey): + var value = addr(skey[].data) body1 - else: + do: body2 - -proc getFd*[T](s: Selector[T]): int = - return -1 +proc getFd*[T](s: Selector[T]): int = -1 diff --git a/chronos/ioselects/ioselectors_select.nim b/chronos/ioselects/ioselectors_select.nim deleted file mode 100644 index 9a2914f..0000000 --- a/chronos/ioselects/ioselectors_select.nim +++ /dev/null @@ -1,465 +0,0 @@ -# -# -# Nim's Runtime Library -# (c) Copyright 2016 Eugene Kabanov -# -# See the file "copying.txt", included in this -# distribution, for details about the copyright. -# - -# This module implements Posix and Windows select(). - -import times, nativesockets - -when defined(windows): - import winlean - when defined(gcc): - {.passl: "-lws2_32".} - elif defined(vcc): - {.passl: "ws2_32.lib".} - const platformHeaders = """#include - #include """ - const EAGAIN = WSAEWOULDBLOCK -else: - const platformHeaders = """#include - #include - #include - #include """ -type - Fdset {.importc: "fd_set", header: platformHeaders, pure, final.} = object -var - FD_SETSIZE {.importc: "FD_SETSIZE", header: platformHeaders.}: cint - -proc IOFD_SET(fd: SocketHandle, fdset: ptr Fdset) - {.cdecl, importc: "FD_SET", header: platformHeaders, inline.} -proc IOFD_CLR(fd: SocketHandle, fdset: ptr Fdset) - {.cdecl, importc: "FD_CLR", header: platformHeaders, inline.} -proc IOFD_ZERO(fdset: ptr Fdset) - {.cdecl, importc: "FD_ZERO", header: platformHeaders, inline.} - -when defined(windows): - proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint - {.stdcall, importc: "FD_ISSET", header: platformHeaders, inline.} - proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset, - timeout: ptr Timeval): cint - {.stdcall, importc: "select", header: platformHeaders.} -else: - proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint - {.cdecl, importc: "FD_ISSET", header: platformHeaders, inline.} - proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset, - timeout: ptr Timeval): cint - {.cdecl, importc: "select", header: platformHeaders.} - -when hasThreadSupport: - type - SelectorImpl[T] = object - rSet: Fdset - wSet: Fdset - eSet: Fdset - maxFD: int - fds: ptr SharedArray[SelectorKey[T]] - count: int - lock: Lock - Selector*[T] = ptr SelectorImpl[T] -else: - type - SelectorImpl[T] = object - rSet: Fdset - wSet: Fdset - eSet: Fdset - maxFD: int - fds: seq[SelectorKey[T]] - count: int - Selector*[T] = ref SelectorImpl[T] - -type - SelectEventImpl = object - rsock: SocketHandle - wsock: SocketHandle - SelectEvent* = ptr SelectEventImpl - -when hasThreadSupport: - template withSelectLock[T](s: Selector[T], body: untyped) = - acquire(s.lock) - {.locks: [s.lock].}: - try: - body - finally: - release(s.lock) -else: - template withSelectLock[T](s: Selector[T], body: untyped) = - body - -proc newSelector*[T](): Selector[T] = - when hasThreadSupport: - result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) - result.fds = allocSharedArray[SelectorKey[T]](FD_SETSIZE) - initLock result.lock - else: - result = Selector[T]() - result.fds = newSeq[SelectorKey[T]](FD_SETSIZE) - - for i in 0 ..< FD_SETSIZE: - result.fds[i].ident = InvalidIdent - - IOFD_ZERO(addr result.rSet) - IOFD_ZERO(addr result.wSet) - IOFD_ZERO(addr result.eSet) - -proc close*[T](s: Selector[T]) = - when hasThreadSupport: - deallocSharedArray(s.fds) - deallocShared(cast[pointer](s)) - -when defined(windows): - proc newSelectEvent*(): SelectEvent = - var ssock = createNativeSocket() - var wsock = createNativeSocket() - var rsock: SocketHandle = INVALID_SOCKET - var saddr = Sockaddr_in() - - saddr.sin_family = winlean.AF_INET - saddr.sin_port = 0 - saddr.sin_addr.s_addr = INADDR_ANY - if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)), - sizeof(saddr).SockLen) < 0'i32: - raiseIOSelectorsError(osLastError()) - - if winlean.listen(ssock, 1) != 0: - raiseIOSelectorsError(osLastError()) - - var namelen = sizeof(saddr).SockLen - if getsockname(ssock, cast[ptr SockAddr](addr(saddr)), - addr(namelen)) != 0'i32: - raiseIOSelectorsError(osLastError()) - - saddr.sin_addr.s_addr = 0x0100007F - if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)), - sizeof(saddr).SockLen) != 0: - raiseIOSelectorsError(osLastError()) - namelen = sizeof(saddr).SockLen - rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)), - cast[ptr SockLen](addr(namelen))) - if rsock == SocketHandle(-1): - raiseIOSelectorsError(osLastError()) - - if winlean.closesocket(ssock) != 0: - raiseIOSelectorsError(osLastError()) - - var mode = clong(1) - if ioctlsocket(rsock, FIONBIO, addr(mode)) != 0: - raiseIOSelectorsError(osLastError()) - mode = clong(1) - if ioctlsocket(wsock, FIONBIO, addr(mode)) != 0: - raiseIOSelectorsError(osLastError()) - - result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) - result.rsock = rsock - result.wsock = wsock - - proc trigger*(ev: SelectEvent) = - var data: uint64 = 1 - if winlean.send(ev.wsock, cast[pointer](addr data), - cint(sizeof(uint64)), 0) != sizeof(uint64): - raiseIOSelectorsError(osLastError()) - - proc close*(ev: SelectEvent) = - let res1 = winlean.closesocket(ev.rsock) - let res2 = winlean.closesocket(ev.wsock) - deallocShared(cast[pointer](ev)) - if res1 != 0 or res2 != 0: - raiseIOSelectorsError(osLastError()) - -else: - proc newSelectEvent*(): SelectEvent = - var fds: array[2, cint] - if posix.pipe(fds) != 0: - raiseIOSelectorsError(osLastError()) - setNonBlocking(fds[0]) - setNonBlocking(fds[1]) - result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) - result.rsock = SocketHandle(fds[0]) - result.wsock = SocketHandle(fds[1]) - - proc trigger*(ev: SelectEvent) = - var data: uint64 = 1 - if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64): - raiseIOSelectorsError(osLastError()) - - proc close*(ev: SelectEvent) = - let res1 = posix.close(cint(ev.rsock)) - let res2 = posix.close(cint(ev.wsock)) - deallocShared(cast[pointer](ev)) - if res1 != 0 or res2 != 0: - raiseIOSelectorsError(osLastError()) - -proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], - data: T) = - var i = 0 - let fdi = int(fd) - while i < FD_SETSIZE: - if s.fds[i].ident == InvalidIdent: - var pkey = addr(s.fds[i]) - pkey.ident = fdi - pkey.events = events - pkey.data = data - break - inc(i) - if i >= FD_SETSIZE: - raiseIOSelectorsError("Maximum number of descriptors is exhausted!") - -proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] = - var i = 0 - let fdi = int(fd) - while i < FD_SETSIZE: - if s.fds[i].ident == fdi: - result = addr(s.fds[i]) - break - inc(i) - doAssert(i < FD_SETSIZE, - "Descriptor [" & $int(fd) & "] is not registered in the queue!") - -proc delKey[T](s: Selector[T], fd: SocketHandle) = - var empty: T - var i = 0 - while i < FD_SETSIZE: - if s.fds[i].ident == fd.int: - s.fds[i].ident = InvalidIdent - s.fds[i].events = {} - s.fds[i].data = empty - break - inc(i) - doAssert(i < FD_SETSIZE, - "Descriptor [" & $int(fd) & "] is not registered in the queue!") - -proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, - events: set[Event], data: T) = - when not defined(windows): - let fdi = int(fd) - s.withSelectLock(): - s.setSelectKey(fd, events, data) - when not defined(windows): - if fdi > s.maxFD: s.maxFD = fdi - if Event.Read in events: - IOFD_SET(fd, addr s.rSet) - inc(s.count) - if Event.Write in events: - IOFD_SET(fd, addr s.wSet) - IOFD_SET(fd, addr s.eSet) - inc(s.count) - -proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = - when not defined(windows): - let fdi = int(ev.rsock) - s.withSelectLock(): - s.setSelectKey(ev.rsock, {Event.User}, data) - when not defined(windows): - if fdi > s.maxFD: s.maxFD = fdi - IOFD_SET(ev.rsock, addr s.rSet) - inc(s.count) - -proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, - events: set[Event]) = - let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, - Event.User, Event.Oneshot, Event.Error} - s.withSelectLock(): - var pkey = s.getKey(fd) - doAssert(pkey.events * maskEvents == {}) - if pkey.events != events: - if (Event.Read in pkey.events) and (Event.Read notin events): - IOFD_CLR(fd, addr s.rSet) - dec(s.count) - if (Event.Write in pkey.events) and (Event.Write notin events): - IOFD_CLR(fd, addr s.wSet) - IOFD_CLR(fd, addr s.eSet) - dec(s.count) - if (Event.Read notin pkey.events) and (Event.Read in events): - IOFD_SET(fd, addr s.rSet) - inc(s.count) - if (Event.Write notin pkey.events) and (Event.Write in events): - IOFD_SET(fd, addr s.wSet) - IOFD_SET(fd, addr s.eSet) - inc(s.count) - pkey.events = events - -proc unregister*[T](s: Selector[T], fd: SocketHandle|int) = - s.withSelectLock(): - let fd = fd.SocketHandle - var pkey = s.getKey(fd) - if Event.Read in pkey.events or Event.User in pkey.events: - IOFD_CLR(fd, addr s.rSet) - dec(s.count) - if Event.Write in pkey.events: - IOFD_CLR(fd, addr s.wSet) - IOFD_CLR(fd, addr s.eSet) - dec(s.count) - s.delKey(fd) - -proc unregister*[T](s: Selector[T], ev: SelectEvent) = - let fd = ev.rsock - s.withSelectLock(): - var pkey = s.getKey(fd) - IOFD_CLR(fd, addr s.rSet) - dec(s.count) - s.delKey(fd) - -proc selectInto*[T](s: Selector[T], timeout: int, - results: var openArray[ReadyKey]): int = - var tv = Timeval() - var ptv = addr tv - var rset, wset, eset: Fdset - - verifySelectParams(timeout) - - if timeout != -1: - when defined(genode): - tv.tv_sec = Time(timeout div 1_000) - else: - tv.tv_sec = timeout.int32 div 1_000 - tv.tv_usec = (timeout.int32 %% 1_000) * 1_000 - else: - ptv = nil - - s.withSelectLock(): - rset = s.rSet - wset = s.wSet - eset = s.eSet - - var count = ioselect(cint(s.maxFD) + 1, addr(rset), addr(wset), - addr(eset), ptv) - if count < 0: - result = 0 - when defined(windows): - raiseIOSelectorsError(osLastError()) - else: - let err = osLastError() - if cint(err) != EINTR: - raiseIOSelectorsError(err) - elif count == 0: - result = 0 - else: - var rindex = 0 - var i = 0 - var k = 0 - - while (i < FD_SETSIZE) and (k < count): - if s.fds[i].ident != InvalidIdent: - var flag = false - var pkey = addr(s.fds[i]) - var rkey = ReadyKey(fd: int(pkey.ident), events: {}) - let fd = SocketHandle(pkey.ident) - if IOFD_ISSET(fd, addr rset) != 0: - if Event.User in pkey.events: - var data: uint64 = 0 - if recv(fd, cast[pointer](addr(data)), - sizeof(uint64).cint, 0) != sizeof(uint64): - let err = osLastError() - if cint(err) != EAGAIN: - raiseIOSelectorsError(err) - else: - inc(i) - inc(k) - continue - else: - flag = true - rkey.events = {Event.User} - else: - flag = true - rkey.events = {Event.Read} - if IOFD_ISSET(fd, addr wset) != 0: - rkey.events.incl(Event.Write) - if IOFD_ISSET(fd, addr eset) != 0: - rkey.events.incl(Event.Error) - flag = true - if flag: - results[rindex] = rkey - inc(rindex) - inc(k) - inc(i) - result = rindex - -proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = - result = newSeq[ReadyKey](FD_SETSIZE) - var count = selectInto(s, timeout, result) - result.setLen(count) - -proc flush*[T](s: Selector[T]) = discard - -template isEmpty*[T](s: Selector[T]): bool = - (s.count == 0) - -proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = - s.withSelectLock(): - result = false - - let fdi = int(fd) - for i in 0..", pure, final.} = object + ssi_signo*: uint32 + ssi_errno*: int32 + ssi_code*: int32 + ssi_pid*: uint32 + ssi_uid*: uint32 + ssi_fd*: int32 + ssi_tid*: uint32 + ssi_band*: uint32 + ssi_overrun*: uint32 + ssi_trapno*: uint32 + ssi_status*: int32 + ssi_int*: int32 + ssi_ptr*: uint64 + ssi_utime*: uint64 + ssi_stime*: uint64 + ssi_addr*: uint64 + pad* {.importc: "__pad".}: array[0..47, uint8] + proc epoll_create*(size: cint): cint {.importc: "epoll_create", header: "", sideEffect.} @@ -933,9 +953,19 @@ elif defined(linux): timeout: cint): cint {. importc: "epoll_wait", header: "", sideEffect.} + proc timerfd_create*(clock_id: ClockId, flags: cint): cint {. + cdecl, importc: "timerfd_create", header: "".} + proc timerfd_settime*(ufd: cint, flags: cint, + utmr: var Itimerspec, otmr: var Itimerspec): cint {. + cdecl, importc: "timerfd_settime", header: "".} + proc eventfd*(count: cuint, flags: cint): cint {. + cdecl, importc: "eventfd", header: "".} + proc signalfd*(fd: cint, mask: var Sigset, flags: cint): cint {. + cdecl, importc: "signalfd", header: "".} + else: - import std/[posix, os] - export posix, os + import std/posix + export posix var IP_MULTICAST_TTL* {.importc: "IP_MULTICAST_TTL", header: "".}: cint diff --git a/chronos/selectors2.nim b/chronos/selectors2.nim index d9ef778..bbb52a5 100644 --- a/chronos/selectors2.nim +++ b/chronos/selectors2.nim @@ -31,22 +31,32 @@ # support - changes could potentially be backported to nim but are not # backwards-compatible. -import os, nativesockets +import stew/results +import osdefs, osutils +export results -const hasThreadSupport = compileOption("threads") and defined(threadsafe) +const + asyncEventsCount* {.intdefine.} = 64 + ## Number of epoll events retrieved by syscall. + asyncInitialSize* {.intdefine.} = 64 + ## Initial size of Selector[T]'s array of file descriptors. + asyncEventEngine* {.strdefine.} = + when defined(linux): + "epoll" + elif defined(macosx) or defined(macos) or defined(ios) or + defined(freebsd) or defined(netbsd) or defined(openbsd) or + defined(dragonfly): + "kqueue" + elif defined(posix): + "poll" + else: + "" + ## Engine type which is going to be used by module. -const ioselSupportedPlatform* = defined(macosx) or defined(freebsd) or - defined(netbsd) or defined(openbsd) or - defined(dragonfly) or - (defined(linux) and not defined(android)) - ## This constant is used to determine whether the destination platform is - ## fully supported by ``ioselectors`` module. - -const bsdPlatform = defined(macosx) or defined(freebsd) or - defined(netbsd) or defined(openbsd) or - defined(dragonfly) + hasThreadSupport = compileOption("threads") when defined(nimdoc): + type Selector*[T] = ref object ## An object which holds descriptors to be checked for read/write status @@ -236,30 +246,16 @@ when defined(nimdoc): ## For *poll* and *select* selectors ``-1`` is returned. else: - import strutils - when hasThreadSupport: - import locks - - type - SharedArray[T] = UncheckedArray[T] - - proc allocSharedArray[T](nsize: int): ptr SharedArray[T] = - result = cast[ptr SharedArray[T]](allocShared0(sizeof(T) * nsize)) - - proc reallocSharedArray[T](sa: ptr SharedArray[T], nsize: int): ptr SharedArray[T] = - result = cast[ptr SharedArray[T]](reallocShared(sa, sizeof(T) * nsize)) - - proc deallocSharedArray[T](sa: ptr SharedArray[T]) = - deallocShared(cast[pointer](sa)) type + IOSelectorsException* = object of CatchableError + + SelectResult*[T] = Result[T, OSErrorCode] + Event* {.pure.} = enum Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot, Finished, VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink, VnodeRename, VnodeRevoke - type - IOSelectorsException* = object of CatchableError - ReadyKey* = object fd* : int events*: set[Event] @@ -285,78 +281,54 @@ else: var err = newException(IOSelectorsException, msg) raise err - proc setNonBlocking(fd: cint) {.inline.} = - setBlocking(fd.SocketHandle, false) - - when not defined(windows): - import posix - - template setKey(s, pident, pevents, pparam, pdata: untyped) = - var skey = addr(s.fds[pident]) - skey.ident = pident - skey.events = pevents - skey.param = pparam - skey.data = data - - when ioselSupportedPlatform: - template blockSignals(newmask: var Sigset, oldmask: var Sigset) = + when asyncEventEngine in ["epoll", "kqueue"]: + proc blockSignals(newmask: Sigset, + oldmask: var Sigset): Result[void, OSErrorCode] = + var nmask = newmask + # We do this trick just because Nim's posix.nim has declaration like + # this: + # proc pthread_sigmask(a1: cint; a2, a3: var Sigset): cint + # proc sigprocmask*(a1: cint, a2, a3: var Sigset): cint when hasThreadSupport: - if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1: - raiseIOSelectorsError(osLastError()) + if pthread_sigmask(SIG_BLOCK, nmask, oldmask) == -1: + err(osLastError()) + else: + ok() else: - if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1: - raiseIOSelectorsError(osLastError()) + if sigprocmask(SIG_BLOCK, nmask, oldmask) == -1: + err(osLastError()) + else: + ok() - template unblockSignals(newmask: var Sigset, oldmask: var Sigset) = + proc unblockSignals(newmask: Sigset, + oldmask: var Sigset): Result[void, OSErrorCode] = + # We do this trick just because Nim's posix.nim has declaration like + # this: + # proc pthread_sigmask(a1: cint; a2, a3: var Sigset): cint + # proc sigprocmask*(a1: cint, a2, a3: var Sigset): cint + var nmask = newmask when hasThreadSupport: - if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1: - raiseIOSelectorsError(osLastError()) + if pthread_sigmask(SIG_UNBLOCK, nmask, oldmask) == -1: + err(osLastError()) + else: + ok() else: - if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1: - raiseIOSelectorsError(osLastError()) + if sigprocmask(SIG_UNBLOCK, nmask, oldmask) == -1: + err(osLastError()) + else: + ok() - template clearKey[T](key: ptr SelectorKey[T]) = - var empty: T - key.ident = InvalidIdent - key.events = {} - key.data = empty - - proc verifySelectParams(timeout: int) = + template verifySelectParams(timeout, min, max: int) = # Timeout of -1 means: wait forever # Anything higher is the time to wait in milliseconds. - doAssert(timeout >= -1, "Cannot select with a negative value, got " & $timeout) + doAssert((timeout >= min) and (timeout <= max), + "Cannot select with incorrect timeout value, got " & $timeout) - when defined(linux): - include ./ioselects/ioselectors_epoll - elif bsdPlatform: - include ./ioselects/ioselectors_kqueue - elif defined(windows): - include ./ioselects/ioselectors_select - elif defined(solaris): - include ./ioselects/ioselectors_poll # need to replace it with event ports - elif defined(genode): - include ./ioselects/ioselectors_select # TODO: use the native VFS layer - elif defined(nintendoswitch): - include ./ioselects/ioselectors_select - else: - include ./ioselects/ioselectors_poll - -proc register*[T](s: Selector[T], fd: int | SocketHandle, - events: set[Event], data: T) {.deprecated: "use registerHandle instead".} = - ## **Deprecated since v0.18.0:** Use ``registerHandle`` instead. - s.registerHandle(fd, events, data) - -proc setEvent*(ev: SelectEvent) {.deprecated: "use trigger instead", - raises: [Defect, IOSelectorsException].} = - ## Trigger event ``ev``. - ## - ## **Deprecated since v0.18.0:** Use ``trigger`` instead. - ev.trigger() - -proc update*[T](s: Selector[T], fd: int | SocketHandle, - events: set[Event]) {.deprecated: "use updateHandle instead".} = - ## Update file/socket descriptor ``fd``, registered in selector - ## ``s`` with new events set ``event``. - ## - ## **Deprecated since v0.18.0:** Use ``updateHandle`` instead. - s.updateHandle() +when asyncEventEngine == "epoll": + include ./ioselects/ioselectors_epoll +elif asyncEventEngine == "kqueue": + include ./ioselects/ioselectors_kqueue +elif asyncEventEngine == "poll": + include ./ioselects/ioselectors_poll +else: + {.fatal: "Event engine `" & asyncEventEngine & "` is not supported!".}