# # # Nim's Runtime Library # (c) Copyright 2016 Eugene Kabanov # # See the file "copying.txt", included in this # distribution, for details about the copyright. # # This module implements BSD kqueue(). import posix, times, kqueue const # Maximum number of events that can be returned. MAX_KQUEUE_EVENTS = 64 # SIG_IGN and SIG_DFL declared in posix.nim as variables, but we need them # to be constants and GC-safe. SIG_DFL = cast[proc(x: cint) {.raises: [],noconv,gcsafe.}](0) SIG_IGN = cast[proc(x: cint) {.raises: [],noconv,gcsafe.}](1) when defined(kqcache): const CACHE_EVENTS = true when defined(macosx) or defined(freebsd) or defined(dragonfly): when defined(macosx): const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS) else: const MAX_DESCRIPTORS_ID = 27 # KERN_MAXFILESPERPROC (FreeBSD) proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t, newp: pointer, newplen: csize_t): cint {.importc: "sysctl",header: """#include #include """} elif defined(netbsd) or defined(openbsd): # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using # KERN_MAXFILES, because KERN_MAXFILES is always bigger, # than KERN_MAXFILESPERPROC. const MAX_DESCRIPTORS_ID = 7 # KERN_MAXFILES proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t, newp: pointer, newplen: csize_t): cint {.importc: "sysctl",header: """#include #include """} when hasThreadSupport: type SelectorImpl[T] = object kqFD: cint maxFD: int changes: ptr SharedArray[KEvent] fds: ptr SharedArray[SelectorKey[T]] count: int changesLock: Lock changesSize: int changesLength: int sock: cint Selector*[T] = ptr SelectorImpl[T] else: type SelectorImpl[T] = object kqFD: cint maxFD: int changes: seq[KEvent] fds: seq[SelectorKey[T]] count: int sock: cint Selector*[T] = ref SelectorImpl[T] type SelectEventImpl = object rfd: cint wfd: cint SelectEvent* = ptr SelectEventImpl # SelectEvent is declared as `ptr` to be placed in `shared memory`, # so you can share one SelectEvent handle between threads. proc getUnique[T](s: Selector[T]): int {.inline.} = # we create duplicated handles to get unique indexes for our `fds` array. result = posix.fcntl(s.sock, F_DUPFD, s.sock) if result == -1: raiseIOSelectorsError(osLastError()) proc newSelector*[T](): owned(Selector[T]) = var maxFD = 0.cint var size = csize_t(sizeof(cint)) var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint] # Obtain maximum number of opened file descriptors for process if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size, nil, 0) != 0: raiseIOSelectorsError(osLastError()) var kqFD = kqueue() if kqFD < 0: raiseIOSelectorsError(osLastError()) # we allocating empty socket to duplicate it handle in future, to get unique # indexes for `fds` array. This is needed to properly identify # {Event.Timer, Event.Signal, Event.Process} events. let usock = posix.socket(posix.AF_INET, posix.SOCK_STREAM, posix.IPPROTO_TCP).cint if usock == -1: let err = osLastError() discard posix.close(kqFD) raiseIOSelectorsError(err) when hasThreadSupport: result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) result.fds = allocSharedArray[SelectorKey[T]](maxFD) result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS) result.changesSize = MAX_KQUEUE_EVENTS initLock(result.changesLock) else: result = Selector[T]() result.fds = newSeq[SelectorKey[T]](maxFD) result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS) for i in 0 ..< maxFD: result.fds[i].ident = InvalidIdent result.sock = usock result.kqFD = kqFD result.maxFD = maxFD.int proc close*[T](s: Selector[T]) = let res1 = posix.close(s.kqFD) let res2 = posix.close(s.sock) when hasThreadSupport: deinitLock(s.changesLock) deallocSharedArray(s.fds) deallocShared(cast[pointer](s)) if res1 != 0 or res2 != 0: raiseIOSelectorsError(osLastError()) proc newSelectEvent*(): SelectEvent = var fds: array[2, cint] if posix.pipe(fds) != 0: raiseIOSelectorsError(osLastError()) setNonBlocking(fds[0]) setNonBlocking(fds[1]) result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) result.rfd = fds[0] result.wfd = fds[1] proc trigger*(ev: SelectEvent) = var data: uint64 = 1 if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): raiseIOSelectorsError(osLastError()) proc close*(ev: SelectEvent) = let res1 = posix.close(ev.rfd) let res2 = posix.close(ev.wfd) deallocShared(cast[pointer](ev)) if res1 != 0 or res2 != 0: raiseIOSelectorsError(osLastError()) template checkFd(s, f) = if f >= s.maxFD: raiseIOSelectorsError("Maximum number of descriptors is exhausted!") when hasThreadSupport: template withChangeLock[T](s: Selector[T], body: untyped) = acquire(s.changesLock) {.locks: [s.changesLock].}: try: body finally: release(s.changesLock) else: template withChangeLock(s, body: untyped) = body when hasThreadSupport: template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, nflags: cushort, nfflags: cuint, ndata: int, nudata: pointer) = mixin withChangeLock s.withChangeLock(): if s.changesLength == s.changesSize: # if cache array is full, we allocating new with size * 2 let newSize = s.changesSize shl 1 let rdata = allocSharedArray[KEvent](newSize) copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent)) s.changesSize = newSize s.changes[s.changesLength] = KEvent(ident: nident, filter: nfilter, flags: nflags, fflags: nfflags, data: ndata, udata: nudata) inc(s.changesLength) when not declared(CACHE_EVENTS): template flushKQueue[T](s: Selector[T]) = mixin withChangeLock s.withChangeLock(): if s.changesLength > 0: if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength), nil, 0, nil) == -1: raiseIOSelectorsError(osLastError()) s.changesLength = 0 else: template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, nflags: cushort, nfflags: cuint, ndata: int, nudata: pointer) = s.changes.add(KEvent(ident: nident, filter: nfilter, flags: nflags, fflags: nfflags, data: ndata, udata: nudata)) when not declared(CACHE_EVENTS): template flushKQueue[T](s: Selector[T]) = let length = cint(len(s.changes)) if length > 0: if kevent(s.kqFD, addr(s.changes[0]), length, nil, 0, nil) == -1: raiseIOSelectorsError(osLastError()) s.changes.setLen(0) proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event], data: T) = let fdi = int(fd) s.checkFd(fdi) doAssert(s.fds[fdi].ident == InvalidIdent) s.setKey(fdi, events, 0, data) if events != {}: if Event.Read in events: modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil) inc(s.count) if Event.Write in events: modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil) inc(s.count) when not declared(CACHE_EVENTS): flushKQueue(s) proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) = let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, Event.User, Event.Oneshot, Event.Error} let fdi = int(fd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != InvalidIdent, "Descriptor $# is not registered in the queue!" % $fdi) doAssert(pkey.events * maskEvents == {}) if pkey.events != events: if (Event.Read in pkey.events) and (Event.Read notin events): modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) dec(s.count) if (Event.Write in pkey.events) and (Event.Write notin events): modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) dec(s.count) if (Event.Read notin pkey.events) and (Event.Read in events): modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) inc(s.count) if (Event.Write notin pkey.events) and (Event.Write in events): modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) inc(s.count) when not declared(CACHE_EVENTS): flushKQueue(s) pkey.events = events proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, data: T): int {.discardable.} = let fdi = getUnique(s) s.checkFd(fdi) doAssert(s.fds[fdi].ident == InvalidIdent) let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer} let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD s.setKey(fdi, events, 0, data) # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds # too modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil) when not declared(CACHE_EVENTS): flushKQueue(s) inc(s.count) result = fdi proc registerSignal*[T](s: Selector[T], signal: int, data: T): int {.discardable.} = let fdi = getUnique(s) s.checkFd(fdi) doAssert(s.fds[fdi].ident == InvalidIdent) s.setKey(fdi, {Event.Signal}, signal, data) var nmask, omask: Sigset discard sigemptyset(nmask) discard sigemptyset(omask) discard sigaddset(nmask, cint(signal)) blockSignals(nmask, omask) # to be compatible with linux semantic we need to "eat" signals posix.signal(cint(signal), SIG_IGN) modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, cast[pointer](fdi)) when not declared(CACHE_EVENTS): flushKQueue(s) inc(s.count) result = fdi proc registerProcess*[T](s: Selector[T], pid: int, data: T): int {.discardable.} = let fdi = getUnique(s) s.checkFd(fdi) doAssert(s.fds[fdi].ident == InvalidIdent) var kflags: cushort = EV_ONESHOT or EV_ADD setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data) modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, cast[pointer](fdi)) when not declared(CACHE_EVENTS): flushKQueue(s) inc(s.count) result = fdi proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = let fdi = ev.rfd.int doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!") setKey(s, fdi, {Event.User}, 0, data) modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) inc(s.count) template processVnodeEvents(events: set[Event]): cuint = var rfflags = 0.cuint if events == {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend, Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename, Event.VnodeRevoke}: rfflags = NOTE_DELETE or NOTE_WRITE or NOTE_EXTEND or NOTE_ATTRIB or NOTE_LINK or NOTE_RENAME or NOTE_REVOKE else: if Event.VnodeDelete in events: rfflags = rfflags or NOTE_DELETE if Event.VnodeWrite in events: rfflags = rfflags or NOTE_WRITE if Event.VnodeExtend in events: rfflags = rfflags or NOTE_EXTEND if Event.VnodeAttrib in events: rfflags = rfflags or NOTE_ATTRIB if Event.VnodeLink in events: rfflags = rfflags or NOTE_LINK if Event.VnodeRename in events: rfflags = rfflags or NOTE_RENAME if Event.VnodeRevoke in events: rfflags = rfflags or NOTE_REVOKE rfflags proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) = let fdi = fd.int setKey(s, fdi, {Event.Vnode} + events, 0, data) var fflags = processVnodeEvents(events) modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) inc(s.count) proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = let fdi = int(fd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != InvalidIdent, "Descriptor [" & $fdi & "] is not registered in the queue!") if pkey.events != {}: if pkey.events * {Event.Read, Event.Write} != {}: if Event.Read in pkey.events: modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) dec(s.count) if Event.Write in pkey.events: modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil) dec(s.count) when not declared(CACHE_EVENTS): flushKQueue(s) elif Event.Timer in pkey.events: if Event.Finished notin pkey.events: modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) dec(s.count) if posix.close(cint(pkey.ident)) != 0: raiseIOSelectorsError(osLastError()) elif Event.Signal in pkey.events: var nmask, omask: Sigset let signal = cint(pkey.param) discard sigemptyset(nmask) discard sigemptyset(omask) discard sigaddset(nmask, signal) unblockSignals(nmask, omask) posix.signal(signal, SIG_DFL) modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) dec(s.count) if posix.close(cint(pkey.ident)) != 0: raiseIOSelectorsError(osLastError()) elif Event.Process in pkey.events: if Event.Finished notin pkey.events: modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) dec(s.count) if posix.close(cint(pkey.ident)) != 0: raiseIOSelectorsError(osLastError()) elif Event.Vnode in pkey.events: modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) dec(s.count) elif Event.User in pkey.events: modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) dec(s.count) clearKey(pkey) proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = int(ev.rfd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!") doAssert(Event.User in pkey.events) modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) clearKey(pkey) dec(s.count) proc selectInto*[T](s: Selector[T], timeout: int, results: var openArray[ReadyKey]): int = var tv: Timespec resTable: array[MAX_KQUEUE_EVENTS, KEvent] ptv = addr tv maxres = MAX_KQUEUE_EVENTS verifySelectParams(timeout) if timeout != -1: if timeout >= 1000: tv.tv_sec = posix.Time(timeout div 1_000) tv.tv_nsec = (timeout %% 1_000) * 1_000_000 else: tv.tv_sec = posix.Time(0) tv.tv_nsec = timeout * 1_000_000 else: ptv = nil if maxres > len(results): maxres = len(results) var count = 0 when not declared(CACHE_EVENTS): count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv) else: when hasThreadSupport: s.withChangeLock(): if s.changesLength > 0: count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength), addr(resTable[0]), cint(maxres), ptv) s.changesLength = 0 else: count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv) else: let length = cint(len(s.changes)) if length > 0: count = kevent(s.kqFD, addr(s.changes[0]), length, addr(resTable[0]), cint(maxres), ptv) s.changes.setLen(0) else: count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv) if count < 0: result = 0 let err = osLastError() if cint(err) != EINTR: raiseIOSelectorsError(err) elif count == 0: result = 0 else: var i = 0 var k = 0 # do not delete this, because `continue` used in cycle. var pkey: ptr SelectorKey[T] while i < count: let kevent = addr(resTable[i]) var rkey = ReadyKey(fd: int(kevent.ident), events: {}) if (kevent.flags and EV_ERROR) != 0: rkey.events = {Event.Error} rkey.errorCode = OSErrorCode(kevent.data) case kevent.filter: of EVFILT_READ: pkey = addr(s.fds[int(kevent.ident)]) rkey.events.incl(Event.Read) if Event.User in pkey.events: var data: uint64 = 0 if posix.read(cint(kevent.ident), addr data, sizeof(uint64)) != sizeof(uint64): let err = osLastError() if err == OSErrorCode(EAGAIN): # someone already consumed event data inc(i) continue else: raiseIOSelectorsError(err) rkey.events = {Event.User} of EVFILT_WRITE: pkey = addr(s.fds[int(kevent.ident)]) rkey.events.incl(Event.Write) rkey.events = {Event.Write} of EVFILT_TIMER: pkey = addr(s.fds[int(kevent.ident)]) if Event.Oneshot in pkey.events: # we will not clear key until it will be unregistered, so # application can obtain data, but we will decrease counter, # because kqueue is empty. dec(s.count) # we are marking key with `Finished` event, to avoid double decrease. pkey.events.incl(Event.Finished) rkey.events.incl(Event.Timer) of EVFILT_VNODE: pkey = addr(s.fds[int(kevent.ident)]) rkey.events.incl(Event.Vnode) if (kevent.fflags and NOTE_DELETE) != 0: rkey.events.incl(Event.VnodeDelete) if (kevent.fflags and NOTE_WRITE) != 0: rkey.events.incl(Event.VnodeWrite) if (kevent.fflags and NOTE_EXTEND) != 0: rkey.events.incl(Event.VnodeExtend) if (kevent.fflags and NOTE_ATTRIB) != 0: rkey.events.incl(Event.VnodeAttrib) if (kevent.fflags and NOTE_LINK) != 0: rkey.events.incl(Event.VnodeLink) if (kevent.fflags and NOTE_RENAME) != 0: rkey.events.incl(Event.VnodeRename) if (kevent.fflags and NOTE_REVOKE) != 0: rkey.events.incl(Event.VnodeRevoke) of EVFILT_SIGNAL: pkey = addr(s.fds[cast[int](kevent.udata)]) rkey.fd = cast[int](kevent.udata) rkey.events.incl(Event.Signal) of EVFILT_PROC: rkey.fd = cast[int](kevent.udata) pkey = addr(s.fds[cast[int](kevent.udata)]) # we will not clear key, until it will be unregistered, so # application can obtain data, but we will decrease counter, # because kqueue is empty. dec(s.count) # we are marking key with `Finished` event, to avoid double decrease. pkey.events.incl(Event.Finished) rkey.events.incl(Event.Process) else: doAssert(true, "Unsupported kqueue filter in the queue!") if (kevent.flags and EV_EOF) != 0: # TODO this error handling needs to be rethought. # `fflags` can sometimes be `0x80000000` and thus we use 'cast' # here: if kevent.fflags != 0: rkey.errorCode = cast[OSErrorCode](kevent.fflags) else: # This assumes we are dealing with sockets. # TODO: For future-proofing it might be a good idea to give the # user access to the raw `kevent`. rkey.errorCode = OSErrorCode(ECONNRESET) rkey.events.incl(Event.Error) results[k] = rkey inc(k) inc(i) result = k proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = result = newSeq[ReadyKey](MAX_KQUEUE_EVENTS) let count = selectInto(s, timeout, result) result.setLen(count) template isEmpty*[T](s: Selector[T]): bool = (s.count == 0) proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = let fdi = fd.int fdi < s.maxFD and s.fds[fd.int].ident != InvalidIdent proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = let fdi = int(fd) if fdi in s: s.fds[fdi].data = data result = true template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body: untyped) = let fdi = int(fd) if fdi in s: var value = addr(s.fds[fdi].data) body template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, body2: untyped) = let fdi = int(fd) if fdi in s: var value = addr(s.fds[fdi].data) body1 else: body2 proc getFd*[T](s: Selector[T]): int = return s.kqFD.int