OS definitions and exceptions changes. (AsyncProc part 1). (#361)

* Initial commit.

* Finalize Windows part.

* Finalize Posix part.

* Fix style issues.

* Move osnet declarations to osdefs.

* Move osnet declarations to osdefs 2.

* Fix Nim 1.2 issues.

* Fix 1.2 compilation issues 2.

* Address review comments.
Fix SignalFdInfo issue.
This commit is contained in:
Eugene Kabanov 2023-02-21 12:48:36 +02:00 committed by GitHub
parent 77ffff322c
commit 0f70a6b8ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 3096 additions and 1654 deletions

View File

@ -8,7 +8,7 @@
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import std/[os, tables, strutils, heapqueue, deques, sequtils]
import std/sequtils
import stew/base10
import ./srcloc
export srcloc

View File

@ -13,12 +13,13 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[os, tables, strutils, heapqueue, lists, nativesockets, net,
deques]
import ./timer
from nativesockets import Port
import std/[tables, strutils, heapqueue, lists, options, deques]
import stew/results
import "."/[osdefs, osutils, timer]
export Port, SocketFlag
export timer
export Port
export timer, results
#{.injectStmt: newGcInvariant().}
@ -143,20 +144,17 @@ export timer
# TODO: Check if yielded future is nil and throw a more meaningful exception
const
unixPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(dragonfly) or defined(macos) or
defined(linux) or defined(android) or
defined(solaris)
MaxEventsCount* = 64
when defined(windows):
import winlean, sets, hashes
elif unixPlatform:
import ./selectors2
import std/[sets, hashes]
elif defined(macosx) or defined(freebsd) or defined(netbsd) or
defined(openbsd) or defined(dragonfly) or defined(macos) or
defined(linux) or defined(android) or defined(solaris):
import "."/selectors2
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
MSG_NOSIGNAL
from posix import SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
MSG_NOSIGNAL,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE
export SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
@ -190,7 +188,7 @@ type
idlers*: Deque[AsyncCallback]
trackers*: Table[string, TrackerBase]
proc sentinelCallbackImpl(arg: pointer) {.gcsafe, raises: [Defect].} =
proc sentinelCallbackImpl(arg: pointer) {.gcsafe.} =
raiseAssert "Sentinel callback MUST not be scheduled"
const
@ -270,38 +268,32 @@ template processCallbacks(loop: untyped) =
if not(isNil(callable.function)):
callable.function(callable.udata)
proc raiseAsDefect*(exc: ref Exception, msg: string) {.
raises: [Defect], noreturn, noinline.} =
proc raiseAsDefect*(exc: ref Exception, msg: string) {.noreturn, noinline.} =
# Reraise an exception as a Defect, where it's unexpected and can't be handled
# We include the stack trace in the message because otherwise, it's easily
# lost - Nim doesn't print it for `parent` exceptions for example (!)
raise (ref Defect)(
msg: msg & "\n" & exc.msg & "\n" & exc.getStackTrace(), parent: exc)
proc raiseOsDefect*(error: OSErrorCode, msg = "") {.noreturn, noinline.} =
# Reraise OS error code as a Defect, where it's unexpected and can't be
# handled. We include the stack trace in the message because otherwise,
# it's easily lost.
raise (ref Defect)(msg: msg & "\n[" & $int(error) & "] " & osErrorMsg(error) &
"\n" & getStackTrace())
func toException*(v: OSErrorCode): ref OSError = newOSError(v)
# This helper will allow to use `tryGet()` and raise OSError for
# Result[T, OSErrorCode] values.
when defined(windows):
type
WSAPROC_TRANSMITFILE = proc(hSocket: SocketHandle, hFile: Handle,
nNumberOfBytesToWrite: DWORD,
nNumberOfBytesPerSend: DWORD,
lpOverlapped: POVERLAPPED,
lpTransmitBuffers: pointer,
dwReserved: DWORD): cint {.
gcsafe, stdcall, raises: [].}
LPFN_GETQUEUEDCOMPLETIONSTATUSEX = proc(completionPort: Handle,
lpPortEntries: ptr OVERLAPPED_ENTRY,
ulCount: DWORD,
ulEntriesRemoved: var ULONG,
dwMilliseconds: DWORD,
fAlertable: WINBOOL): WINBOOL {.
gcsafe, stdcall, raises: [].}
CompletionKey = ULONG_PTR
CompletionData* = object
cb*: CallbackFunc
errCode*: OSErrorCode
bytesCount*: int32
bytesCount*: uint32
udata*: pointer
CustomOverlapped* = object of OVERLAPPED
@ -314,7 +306,7 @@ when defined(windows):
dwNumberOfBytesTransferred: DWORD
PDispatcher* = ref object of PDispatcherBase
ioPort: Handle
ioPort: HANDLE
handles: HashSet[AsyncFD]
connectEx*: WSAPROC_CONNECTEX
acceptEx*: WSAPROC_ACCEPTEX
@ -328,86 +320,82 @@ when defined(windows):
AsyncFD* = distinct int
proc getModuleHandle(lpModuleName: WideCString): Handle {.
stdcall, dynlib: "kernel32", importc: "GetModuleHandleW", sideEffect.}
proc getProcAddress(hModule: Handle, lpProcName: cstring): pointer {.
stdcall, dynlib: "kernel32", importc: "GetProcAddress", sideEffect.}
proc rtlNtStatusToDosError(code: uint64): ULONG {.
stdcall, dynlib: "ntdll", importc: "RtlNtStatusToDosError", sideEffect.}
proc hash(x: AsyncFD): Hash {.borrow.}
proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow, gcsafe.}
proc getFunc(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
proc getFunc(s: SocketHandle, fun: var pointer, guid: GUID): bool =
var bytesRet: DWORD
fun = nil
result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
result = wsaIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, unsafeAddr(guid),
sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
addr bytesRet, nil, nil) == 0
addr(bytesRet), nil, nil) == 0
proc globalInit() {.raises: [Defect, OSError].} =
var wsa: WSAData
if wsaStartup(0x0202'i16, addr wsa) != 0:
raiseOSError(osLastError())
proc globalInit() =
var wsa = WSAData()
let res = wsaStartup(0x0202'u16, addr wsa)
if res != 0:
raiseOsDefect(osLastError(),
"globalInit(): Unable to initialize Windows Sockets API")
proc initAPI(loop: PDispatcher) {.raises: [Defect, CatchableError].} =
var
WSAID_TRANSMITFILE = GUID(
D1: 0xb5367df0'i32, D2: 0xcbac'i16, D3: 0x11cf'i16,
D4: [0x95'i8, 0xca'i8, 0x00'i8, 0x80'i8,
0x5f'i8, 0x48'i8, 0xa1'i8, 0x92'i8])
proc initAPI(loop: PDispatcher) =
var funcPointer: pointer = nil
let kernel32 = getModuleHandle(newWideCString("kernel32.dll"))
loop.getQueuedCompletionStatusEx = cast[LPFN_GETQUEUEDCOMPLETIONSTATUSEX](
getProcAddress(kernel32, "GetQueuedCompletionStatusEx"))
let sock = winlean.socket(winlean.AF_INET, 1, 6)
if sock == INVALID_SOCKET:
raiseOSError(osLastError())
let sock = osdefs.socket(osdefs.AF_INET, 1, 6)
if sock == osdefs.INVALID_SOCKET:
raiseOsDefect(osLastError(), "initAPI(): Unable to create control socket")
var funcPointer: pointer = nil
if not getFunc(sock, funcPointer, WSAID_CONNECTEX):
let err = osLastError()
close(sock)
raiseOSError(err)
loop.connectEx = cast[WSAPROC_CONNECTEX](funcPointer)
if not getFunc(sock, funcPointer, WSAID_ACCEPTEX):
let err = osLastError()
close(sock)
raiseOSError(err)
loop.acceptEx = cast[WSAPROC_ACCEPTEX](funcPointer)
if not getFunc(sock, funcPointer, WSAID_GETACCEPTEXSOCKADDRS):
let err = osLastError()
close(sock)
raiseOSError(err)
loop.getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](funcPointer)
if not getFunc(sock, funcPointer, WSAID_TRANSMITFILE):
let err = osLastError()
close(sock)
raiseOSError(err)
loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer)
close(sock)
block:
let res = getFunc(sock, funcPointer, WSAID_CONNECTEX)
if not(res):
raiseOsDefect(osLastError(), "initAPI(): Unable to initialize " &
"dispatcher's ConnectEx()")
loop.connectEx = cast[WSAPROC_CONNECTEX](funcPointer)
proc newDispatcher*(): PDispatcher {.raises: [Defect, CatchableError].} =
block:
let res = getFunc(sock, funcPointer, WSAID_ACCEPTEX)
if not(res):
raiseOsDefect(osLastError(), "initAPI(): Unable to initialize " &
"dispatcher's AcceptEx()")
loop.acceptEx = cast[WSAPROC_ACCEPTEX](funcPointer)
block:
let res = getFunc(sock, funcPointer, WSAID_GETACCEPTEXSOCKADDRS)
if not(res):
raiseOsDefect(osLastError(), "initAPI(): Unable to initialize " &
"dispatcher's GetAcceptExSockAddrs()")
loop.getAcceptExSockAddrs =
cast[WSAPROC_GETACCEPTEXSOCKADDRS](funcPointer)
block:
let res = getFunc(sock, funcPointer, WSAID_TRANSMITFILE)
if not(res):
raiseOsDefect(osLastError(), "initAPI(): Unable to initialize " &
"dispatcher's TransmitFile()")
loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer)
if closeFd(sock) != 0:
raiseOsDefect(osLastError(), "initAPI(): Unable to close control socket")
proc newDispatcher*(): PDispatcher =
## Creates a new Dispatcher instance.
var res = PDispatcher()
res.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
when declared(initHashSet):
# After 0.20.0 Nim's stdlib version
res.handles = initHashSet[AsyncFD]()
else:
# Pre 0.20.0 Nim's stdlib version
res.handles = initSet[AsyncFD]()
when declared(initHeapQueue):
# After 0.20.0 Nim's stdlib version
res.timers = initHeapQueue[TimerCallback]()
else:
# Pre 0.20.0 Nim's stdlib version
res.timers = newHeapQueue[TimerCallback]()
res.callbacks = initDeque[AsyncCallback](64)
let port = createIoCompletionPort(osdefs.INVALID_HANDLE_VALUE,
HANDLE(0), 0, 1)
if port == osdefs.INVALID_HANDLE_VALUE:
raiseOsDefect(osLastError(), "newDispatcher(): Unable to create " &
"IOCP port")
var res = PDispatcher(
ioPort: port,
handles: initHashSet[AsyncFD](),
timers: initHeapQueue[TimerCallback](),
callbacks: initDeque[AsyncCallback](64),
idlers: initDeque[AsyncCallback](),
trackers: initTable[string, TrackerBase]()
)
res.callbacks.addLast(SentinelCallback)
res.idlers = initDeque[AsyncCallback]()
res.trackers = initTable[string, TrackerBase]()
initAPI(res)
res
@ -416,24 +404,29 @@ when defined(windows):
proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].}
proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].}
proc getIoHandler*(disp: PDispatcher): Handle =
proc getIoHandler*(disp: PDispatcher): HANDLE =
## Returns the underlying IO Completion Port handle (Windows) or selector
## (Unix) for the specified dispatcher.
return disp.ioPort
disp.ioPort
proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
proc register2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getThreadDispatcher()
if createIoCompletionPort(fd.Handle, loop.ioPort,
cast[CompletionKey](fd), 1) == 0:
raiseOSError(osLastError())
if createIoCompletionPort(HANDLE(fd), loop.ioPort, cast[CompletionKey](fd),
1) == osdefs.INVALID_HANDLE_VALUE:
return err(osLastError())
loop.handles.incl(fd)
ok()
proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
proc register*(fd: AsyncFD) {.raises: [Defect, OSError].} =
## Register file descriptor ``fd`` in thread's dispatcher.
register2(fd).tryGet()
proc unregister*(fd: AsyncFD) =
## Unregisters ``fd``.
getThreadDispatcher().handles.excl(fd)
proc poll*() {.raises: [Defect, CatchableError].} =
proc poll*() =
## Perform single asynchronous step, processing timers and completing
## tasks. Blocks until at least one event has completed.
##
@ -444,7 +437,7 @@ when defined(windows):
var
curTime = Moment.now()
curTimeout = DWORD(0)
events: array[MaxEventsCount, OVERLAPPED_ENTRY]
events: array[MaxEventsCount, osdefs.OVERLAPPED_ENTRY]
# On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`,
# complete pending work of the outer `processCallbacks` call.
@ -463,13 +456,13 @@ when defined(windows):
cast[ptr POVERLAPPED](addr events[0].lpOverlapped),
curTimeout
)
if res == WINBOOL(0):
if res == FALSE:
let errCode = osLastError()
if not(isNil(events[0].lpOverlapped)):
1
else:
if int32(errCode) != WAIT_TIMEOUT:
raiseOSError(errCode)
if uint32(errCode) != WAIT_TIMEOUT:
raiseOsDefect(errCode, "poll(): Unable to get OS events")
0
else:
1
@ -483,16 +476,16 @@ when defined(windows):
curTimeout,
WINBOOL(0)
)
if res == WINBOOL(0):
if res == FALSE:
let errCode = osLastError()
if int32(errCode) != WAIT_TIMEOUT:
raiseOSError(errCode)
if uint32(errCode) != WAIT_TIMEOUT:
raiseOsDefect(errCode, "poll(): Unable to get OS events")
0
else:
eventsReceived
int(eventsReceived)
for i in 0 ..< networkEventsCount:
var customOverlapped = events[i].lpOverlapped
var customOverlapped = PtrCustomOverlapped(events[i].lpOverlapped)
customOverlapped.data.errCode =
block:
let res = cast[uint64](customOverlapped.internal)
@ -525,25 +518,35 @@ when defined(windows):
## Closes a socket and ensures that it is unregistered.
let loop = getThreadDispatcher()
loop.handles.excl(fd)
close(SocketHandle(fd))
let param =
if closeFd(SocketHandle(fd)) == 0:
OSErrorCode(0)
else:
osLastError()
if not isNil(aftercb):
var acb = AsyncCallback(function: aftercb)
var acb = AsyncCallback(function: aftercb, udata: cast[pointer](param))
loop.callbacks.addLast(acb)
proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Closes a (pipe/file) handle and ensures that it is unregistered.
let loop = getThreadDispatcher()
loop.handles.excl(fd)
discard closeHandle(Handle(fd))
let param =
if closeFd(HANDLE(fd)) == 0:
OSErrorCode(0)
else:
osLastError()
if not isNil(aftercb):
var acb = AsyncCallback(function: aftercb)
var acb = AsyncCallback(function: aftercb, udata: cast[pointer](param))
loop.callbacks.addLast(acb)
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
return fd in disp.handles
fd in disp.handles
elif unixPlatform:
elif defined(macosx) or defined(freebsd) or defined(netbsd) or
defined(openbsd) or defined(dragonfly) or defined(macos) or
defined(linux) or defined(android) or defined(solaris):
const
SIG_IGN = cast[proc(x: cint) {.raises: [], noconv, gcsafe.}](1)
@ -564,24 +567,27 @@ elif unixPlatform:
# We are ignoring SIGPIPE signal, because we are working with EPIPE.
posix.signal(cint(SIGPIPE), SIG_IGN)
proc initAPI(disp: PDispatcher) {.raises: [Defect, CatchableError].} =
proc initAPI(disp: PDispatcher) {.raises: [Defect].} =
discard
proc newDispatcher*(): PDispatcher {.raises: [Defect, CatchableError].} =
proc newDispatcher*(): PDispatcher {.raises: [Defect].} =
## Create new dispatcher.
var res = PDispatcher()
res.selector = newSelector[SelectorData]()
when declared(initHeapQueue):
# After 0.20.0 Nim's stdlib version
res.timers = initHeapQueue[TimerCallback]()
else:
# Before 0.20.0 Nim's stdlib version
res.timers.newHeapQueue()
res.callbacks = initDeque[AsyncCallback](64)
let selector =
try:
newSelector[SelectorData]()
except IOSelectorsException as exc:
raiseAsDefect exc, "Could not initialize selector"
except CatchableError as exc:
raiseAsDefect exc, "Could not initialize selector"
var res = PDispatcher(
selector: selector,
timers: initHeapQueue[TimerCallback](),
callbacks: initDeque[AsyncCallback](64),
idlers: initDeque[AsyncCallback](),
keys: newSeq[ReadyKey](64),
trackers: initTable[string, TrackerBase]()
)
res.callbacks.addLast(SentinelCallback)
res.idlers = initDeque[AsyncCallback]()
res.keys = newSeq[ReadyKey](64)
res.trackers = initTable[string, TrackerBase]()
initAPI(res)
res
@ -592,24 +598,33 @@ elif unixPlatform:
proc getIoHandler*(disp: PDispatcher): Selector[SelectorData] =
## Returns system specific OS queue.
return disp.selector
disp.selector
proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
proc register2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getThreadDispatcher()
var data: SelectorData
loop.selector.registerHandle(int(fd), {}, data)
try:
var data: SelectorData
loop.selector.registerHandle(int(fd), {}, data)
except CatchableError:
return err(osLastError())
ok()
proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
proc unregister2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Unregister file descriptor ``fd`` from thread's dispatcher.
getThreadDispatcher().selector.unregister(int(fd))
let loop = getThreadDispatcher()
try:
loop.selector.unregister(int(fd))
except CatchableError:
return err(osLastError())
ok()
proc contains*(disp: PDispatcher, fd: AsyncFD): bool {.inline.} =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
result = int(fd) in disp.selector
int(fd) in disp.selector
proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {.
raises: [Defect, IOSelectorsException, ValueError].} =
proc addReader2*(fd: AsyncFD, cb: CallbackFunc,
udata: pointer = nil): Result[void, OSErrorCode] =
## Start watching the file descriptor ``fd`` for read availability and then
## call the callback ``cb`` with specified argument ``udata``.
let loop = getThreadDispatcher()
@ -617,15 +632,18 @@ elif unixPlatform:
withData(loop.selector, int(fd), adata) do:
let acb = AsyncCallback(function: cb, udata: udata)
adata.reader = acb
newEvents.incl(Event.Read)
if not(isNil(adata.writer.function)):
newEvents.incl(Event.Write)
do:
raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents)
return err(OSErrorCode(osdefs.EBADF))
proc removeReader*(fd: AsyncFD) {.
raises: [Defect, IOSelectorsException, ValueError].} =
try:
loop.selector.updateHandle(int(fd), newEvents)
except CatchableError:
return err(osLastError())
ok()
proc removeReader2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Stop watching the file descriptor ``fd`` for read availability.
let loop = getThreadDispatcher()
var newEvents: set[Event]
@ -635,11 +653,16 @@ elif unixPlatform:
if not(isNil(adata.writer.function)):
newEvents.incl(Event.Write)
do:
raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents)
return err(OSErrorCode(osdefs.EBADF))
proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {.
raises: [Defect, IOSelectorsException, ValueError].} =
try:
loop.selector.updateHandle(int(fd), newEvents)
except CatchableError:
return err(osLastError())
ok()
proc addWriter2*(fd: AsyncFD, cb: CallbackFunc,
udata: pointer = nil): Result[void, OSErrorCode] =
## Start watching the file descriptor ``fd`` for write availability and then
## call the callback ``cb`` with specified argument ``udata``.
let loop = getThreadDispatcher()
@ -647,15 +670,18 @@ elif unixPlatform:
withData(loop.selector, int(fd), adata) do:
let acb = AsyncCallback(function: cb, udata: udata)
adata.writer = acb
newEvents.incl(Event.Write)
if not(isNil(adata.reader.function)):
newEvents.incl(Event.Read)
do:
raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents)
return err(OSErrorCode(osdefs.EBADF))
proc removeWriter*(fd: AsyncFD) {.
raises: [Defect, IOSelectorsException, ValueError].} =
try:
loop.selector.updateHandle(int(fd), newEvents)
except CatchableError:
return err(osLastError())
ok()
proc removeWriter2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Stop watching the file descriptor ``fd`` for write availability.
let loop = getThreadDispatcher()
var newEvents: set[Event]
@ -665,8 +691,54 @@ elif unixPlatform:
if not(isNil(adata.reader.function)):
newEvents.incl(Event.Read)
do:
raise newException(ValueError, "File descriptor not registered.")
loop.selector.updateHandle(int(fd), newEvents)
return err(OSErrorCode(osdefs.EBADF))
try:
loop.selector.updateHandle(int(fd), newEvents)
except CatchableError:
return err(osLastError())
ok()
proc register*(fd: AsyncFD) {.raises: [Defect, OSError].} =
## Register file descriptor ``fd`` in thread's dispatcher.
register2(fd).tryGet()
proc unregister*(fd: AsyncFD) {.raises: [Defect, OSError].} =
## Unregister file descriptor ``fd`` from thread's dispatcher.
unregister2(fd).tryGet()
proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {.
raises: [Defect, OSError].} =
## Start watching the file descriptor ``fd`` for read availability and then
## call the callback ``cb`` with specified argument ``udata``.
addReader2(fd, cb, udata).tryGet()
proc removeReader*(fd: AsyncFD) {.raises: [Defect, OSError].} =
## Stop watching the file descriptor ``fd`` for read availability.
removeReader2(fd).tryGet()
proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {.
raises: [Defect, OSError].} =
## Start watching the file descriptor ``fd`` for write availability and then
## call the callback ``cb`` with specified argument ``udata``.
addWriter2(fd, cb, udata).tryGet()
proc removeWriter*(fd: AsyncFD) {.raises: [Defect, OSError].} =
## Stop watching the file descriptor ``fd`` for write availability.
removeWriter2(fd).tryGet()
proc unregisterAndCloseFd*(fd: AsyncFD): Result[void, OSErrorCode] =
## Unregister from system queue and close asynchronous socket.
##
## NOTE: Use this function to close temporary sockets/pipes only (which
## are not exposed to the public and not supposed to be used/reused).
## Please use closeSocket(AsyncFD) and closeHandle(AsyncFD) instead.
doAssert(fd != AsyncFD(osdefs.INVALID_SOCKET))
? unregister2(fd)
if closeFd(cint(fd)) != 0:
err(osLastError())
else:
ok()
proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Close asynchronous socket.
@ -678,15 +750,21 @@ elif unixPlatform:
let loop = getThreadDispatcher()
proc continuation(udata: pointer) =
if SocketHandle(fd) in loop.selector:
try:
unregister(fd)
except CatchableError as exc:
raiseAsDefect(exc, "unregister failed")
close(SocketHandle(fd))
let param =
if SocketHandle(fd) in loop.selector:
let ures = unregister2(fd)
if ures.isErr():
discard closeFd(cint(fd))
ures.error()
else:
if closeFd(cint(fd)) != 0:
osLastError()
else:
OSErrorCode(0)
else:
OSErrorCode(osdefs.EBADF)
if not isNil(aftercb):
aftercb(nil)
aftercb(cast[pointer](param))
withData(loop.selector, int(fd), adata) do:
# We are scheduling reader and writer callbacks to be called
@ -739,7 +817,7 @@ elif unixPlatform:
let loop = getThreadDispatcher()
loop.selector.unregister(sigfd)
proc poll*() {.raises: [Defect, CatchableError].} =
proc poll*() {.gcsafe.} =
## Perform single asynchronous step.
let loop = getThreadDispatcher()
var curTime = Moment.now()
@ -758,17 +836,21 @@ elif unixPlatform:
loop.processTimersGetTimeout(curTimeout)
# Processing IO descriptors and all hardware events.
let count = loop.selector.selectInto(curTimeout, loop.keys)
let count =
try:
loop.selector.selectInto(curTimeout, loop.keys)
except IOSelectorsException:
raiseOsDefect(osLastError(), "poll(): Unable to get OS events")
for i in 0..<count:
let fd = loop.keys[i].fd
let events = loop.keys[i].events
withData(loop.selector, fd, adata) do:
if Event.Read in events or events == {Event.Error}:
if (Event.Read in events) or (events == {Event.Error}):
if not isNil(adata.reader.function):
loop.callbacks.addLast(adata.reader)
if Event.Write in events or events == {Event.Error}:
if (Event.Write in events) or (events == {Event.Error}):
if not isNil(adata.writer.function):
loop.callbacks.addLast(adata.writer)

View File

@ -12,50 +12,31 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[net, nativesockets]
import ./asyncloop
when defined(windows) or defined(nimdoc):
import os, winlean, stew/base10
const
asyncInvalidSocket* = AsyncFD(-1)
TCP_NODELAY* = 1
IPPROTO_TCP* = 6
PIPE_TYPE_BYTE = 0x00000000'i32
PIPE_READMODE_BYTE = 0x00000000'i32
PIPE_WAIT = 0x00000000'i32
DEFAULT_PIPE_SIZE = 65536'i32
ERROR_PIPE_CONNECTED = 535
ERROR_PIPE_BUSY = 231
pipeHeaderName = r"\\.\pipe\LOCAL\chronos\"
proc connectNamedPipe(hNamedPipe: Handle, lpOverlapped: pointer): WINBOOL
{.importc: "ConnectNamedPipe", stdcall, dynlib: "kernel32".}
else:
import os, posix
const
asyncInvalidSocket* = AsyncFD(posix.INVALID_SOCKET)
TCP_NODELAY* = 1
IPPROTO_TCP* = 6
import "."/[asyncloop, osdefs, osutils]
import stew/results
from nativesockets import Domain, Protocol, SockType, toInt
export Domain, Protocol, SockType, results
const
asyncInvalidSocket* = AsyncFD(osdefs.INVALID_SOCKET)
asyncInvalidPipe* = asyncInvalidSocket
proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool =
## Sets blocking mode on socket.
when defined(windows) or defined(nimdoc):
var mode = clong(ord(not blocking))
if ioctlsocket(s, FIONBIO, addr(mode)) == -1:
if osdefs.ioctlsocket(s, osdefs.FIONBIO, addr(mode)) == -1:
false
else:
true
else:
let x: int = fcntl(s, F_GETFL, 0)
let x: int = osdefs.fcntl(s, osdefs.F_GETFL, 0)
if x == -1:
false
else:
let mode = if blocking: x and not O_NONBLOCK else: x or O_NONBLOCK
if fcntl(s, F_SETFL, mode) == -1:
let mode =
if blocking: x and not osdefs.O_NONBLOCK else: x or osdefs.O_NONBLOCK
if osdefs.fcntl(s, osdefs.F_SETFL, mode) == -1:
false
else:
true
@ -64,23 +45,23 @@ proc setSockOpt*(socket: AsyncFD, level, optname, optval: int): bool =
## `setsockopt()` for integer options.
## Returns ``true`` on success, ``false`` on error.
var value = cint(optval)
setsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(value), SockLen(sizeof(value))) >= cint(0)
osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(value), SockLen(sizeof(value))) >= cint(0)
proc setSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: int): bool =
## `setsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
setsockopt(SocketHandle(socket), cint(level), cint(optname), value,
SockLen(valuelen)) >= cint(0)
osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname), value,
SockLen(valuelen)) >= cint(0)
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool =
## `getsockopt()` for integer options.
## Returns ``true`` on success, ``false`` on error.
var res: cint
var size = SockLen(sizeof(res))
if getsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(res), addr(size)) >= cint(0):
if osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(res), addr(size)) >= cint(0):
value = int(res)
true
else:
@ -90,36 +71,122 @@ proc getSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: var int): bool =
## `getsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
getsockopt(SocketHandle(socket), cint(level), cint(optname),
value, cast[ptr SockLen](addr valuelen)) >= cint(0)
osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname),
value, cast[ptr SockLen](addr valuelen)) >= cint(0)
proc getSocketError*(socket: AsyncFD, err: var int): bool =
## Recover error code associated with socket handle ``socket``.
getSockOpt(socket, cint(SOL_SOCKET), cint(SO_ERROR), err)
getSockOpt(socket, cint(osdefs.SOL_SOCKET), cint(osdefs.SO_ERROR), err)
proc createAsyncSocket2*(domain: Domain, sockType: SockType,
protocol: Protocol,
inherit = true): Result[AsyncFD, OSErrorCode] =
## Creates new asynchronous socket.
when defined(windows):
let flags =
if inherit:
osdefs.WSA_FLAG_OVERLAPPED
else:
osdefs.WSA_FLAG_OVERLAPPED or osdefs.WSA_FLAG_NO_HANDLE_INHERIT
let fd = wsaSocket(toInt(domain), toInt(sockType), toInt(protocol),
nil, GROUP(0), flags)
if fd == osdefs.INVALID_SOCKET:
return err(osLastError())
let bres = setDescriptorBlocking(fd, false)
if bres.isErr():
discard closeFd(fd)
return err(bres.error())
let res = register2(AsyncFD(fd))
if res.isErr():
discard closeFd(fd)
return err(res.error())
ok(AsyncFD(fd))
else:
when declared(SOCK_NONBLOCK) and declared(SOCK_CLOEXEC):
let socketType =
if inherit:
toInt(sockType) or osdefs.SOCK_NONBLOCK
else:
toInt(sockType) or osdefs.SOCK_NONBLOCK or osdefs.SOCK_CLOEXEC
let fd = osdefs.socket(toInt(domain), socketType, toInt(protocol))
if fd == -1:
return err(osLastError())
let res = register2(AsyncFD(fd))
if res.isErr():
discard closeFd(fd)
return err(res.error())
ok(AsyncFD(fd))
else:
let fd = osdefs.socket(toInt(domain), toInt(sockType), toInt(protocol))
if fd == -1:
return err(osLastError())
let bres = setDescriptorFlags(cint(fd), true, true)
if bres.isErr():
discard closeFd(fd)
return err(bres.error())
let res = register2(AsyncFD(fd))
if res.isErr():
discard closeFd(fd)
return err(bres.error())
ok(AsyncFD(fd))
proc wrapAsyncSocket2*(sock: cint|SocketHandle): Result[AsyncFD, OSErrorCode] =
## Wraps socket to asynchronous socket handle.
let fd =
when defined(windows):
sock
else:
when sock is cint: sock else: cint(sock)
? setDescriptorFlags(fd, true, true)
? register2(AsyncFD(fd))
ok(AsyncFD(fd))
proc createAsyncSocket*(domain: Domain, sockType: SockType,
protocol: Protocol): AsyncFD {.
raises: [Defect, CatchableError].} =
protocol: Protocol,
inherit = true): AsyncFD =
## Creates new asynchronous socket.
## Returns ``asyncInvalidSocket`` on error.
let handle = createNativeSocket(domain, sockType, protocol)
if handle == osInvalidSocket:
createAsyncSocket2(domain, sockType, protocol, inherit).valueOr:
return asyncInvalidSocket
if not setSocketBlocking(handle, false):
close(handle)
return asyncInvalidSocket
register(AsyncFD(handle))
AsyncFD(handle)
proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD {.
proc wrapAsyncSocket*(sock: cint|SocketHandle): AsyncFD {.
raises: [Defect, CatchableError].} =
## Wraps socket to asynchronous socket handle.
## Return ``asyncInvalidSocket`` on error.
if not setSocketBlocking(sock, false):
close(sock)
wrapAsyncSocket2(sock).valueOr:
return asyncInvalidSocket
register(AsyncFD(sock))
AsyncFD(sock)
proc getMaxOpenFiles2*(): Result[int, OSErrorCode] =
## Returns maximum file descriptor number that can be opened by this process.
##
## Note: On Windows its impossible to obtain such number, so getMaxOpenFiles()
## will return constant value of 16384. You can get more information on this
## link https://docs.microsoft.com/en-us/archive/blogs/markrussinovich/pushing-the-limits-of-windows-handles
when defined(windows) or defined(nimdoc):
ok(16384)
else:
var limits: RLimit
if osdefs.getrlimit(osdefs.RLIMIT_NOFILE, limits) != 0:
return err(osLastError())
ok(int(limits.rlim_cur))
proc setMaxOpenFiles2*(count: int): Result[void, OSErrorCode] =
## Set maximum file descriptor number that can be opened by this process.
##
## Note: On Windows its impossible to set this value, so it just a nop call.
when defined(windows) or defined(nimdoc):
ok()
else:
var limits: RLimit
if getrlimit(osdefs.RLIMIT_NOFILE, limits) != 0:
return err(osLastError())
limits.rlim_cur = count
if setrlimit(osdefs.RLIMIT_NOFILE, limits) != 0:
return err(osLastError())
ok()
proc getMaxOpenFiles*(): int {.raises: [Defect, OSError].} =
## Returns maximum file descriptor number that can be opened by this process.
@ -127,93 +194,39 @@ proc getMaxOpenFiles*(): int {.raises: [Defect, OSError].} =
## Note: On Windows its impossible to obtain such number, so getMaxOpenFiles()
## will return constant value of 16384. You can get more information on this
## link https://docs.microsoft.com/en-us/archive/blogs/markrussinovich/pushing-the-limits-of-windows-handles
when defined(windows) or defined(nimdoc):
16384
else:
var limits: RLimit
if getrlimit(posix.RLIMIT_NOFILE, limits) != 0:
raiseOSError(osLastError())
int(limits.rlim_cur)
let res = getMaxOpenFiles2()
if res.isErr():
raiseOSError(res.error())
res.get()
proc setMaxOpenFiles*(count: int) {.raises: [Defect, OSError].} =
## Set maximum file descriptor number that can be opened by this process.
##
## Note: On Windows its impossible to set this value, so it just a nop call.
when defined(windows) or defined(nimdoc):
discard
let res = setMaxOpenFiles2(count)
if res.isErr():
raiseOSError(res.error())
proc getInheritable*(fd: AsyncFD): Result[bool, OSErrorCode] =
## Returns ``true`` if ``fd`` is inheritable handle.
when defined(windows):
var flags = 0'u32
if getHandleInformation(HANDLE(fd), flags) == FALSE:
return err(osLastError())
ok((flags and HANDLE_FLAG_INHERIT) == HANDLE_FLAG_INHERIT)
else:
var limits: RLimit
if getrlimit(posix.RLIMIT_NOFILE, limits) != 0:
raiseOSError(osLastError())
limits.rlim_cur = count
if setrlimit(posix.RLIMIT_NOFILE, limits) != 0:
raiseOSError(osLastError())
let flags = osdefs.fcntl(cint(fd), osdefs.F_GETFD)
if flags == -1:
return err(osLastError())
ok((flags and osdefs.FD_CLOEXEC) == osdefs.FD_CLOEXEC)
proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] =
## Create new asynchronouse pipe.
## Returns tuple of read pipe handle and write pipe handle``asyncInvalidPipe``
## on error.
when defined(windows):
var pipeIn, pipeOut: Handle
var pipeName: string
var uniq = 0'u64
var sa = SECURITY_ATTRIBUTES(nLength: sizeof(SECURITY_ATTRIBUTES).cint,
lpSecurityDescriptor: nil, bInheritHandle: 0)
while true:
QueryPerformanceCounter(uniq)
pipeName = pipeHeaderName & Base10.toString(uniq)
var openMode = FILE_FLAG_FIRST_PIPE_INSTANCE or FILE_FLAG_OVERLAPPED or
PIPE_ACCESS_INBOUND
var pipeMode = PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT
pipeIn = createNamedPipe(newWideCString(pipeName), openMode, pipeMode,
1'i32, DEFAULT_PIPE_SIZE, DEFAULT_PIPE_SIZE,
0'i32, addr sa)
if pipeIn == INVALID_HANDLE_VALUE:
let err = osLastError()
# If error in {ERROR_ACCESS_DENIED, ERROR_PIPE_BUSY}, then named pipe
# with such name already exists.
if int32(err) != ERROR_ACCESS_DENIED and int32(err) != ERROR_PIPE_BUSY:
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
continue
else:
break
var openMode = (GENERIC_WRITE or FILE_WRITE_DATA or SYNCHRONIZE)
pipeOut = createFileW(newWideCString(pipeName), openMode, 0, addr(sa),
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0)
if pipeOut == INVALID_HANDLE_VALUE:
discard closeHandle(pipeIn)
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
var ovl = OVERLAPPED()
let res = connectNamedPipe(pipeIn, cast[pointer](addr ovl))
if res == 0:
let err = osLastError()
case int32(err)
of ERROR_PIPE_CONNECTED:
discard
of ERROR_IO_PENDING:
var bytesRead = 0.DWORD
if getOverlappedResult(pipeIn, addr ovl, bytesRead, 1) == 0:
discard closeHandle(pipeIn)
discard closeHandle(pipeOut)
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
else:
discard closeHandle(pipeIn)
discard closeHandle(pipeOut)
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
(read: AsyncFD(pipeIn), write: AsyncFD(pipeOut))
elif defined(nimdoc): discard
let res = createOsPipe(AsyncDescriptorDefault, AsyncDescriptorDefault)
if res.isErr():
(read: asyncInvalidPipe, write: asyncInvalidPipe)
else:
var fds: array[2, cint]
if posix.pipe(fds) == -1:
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
if not(setSocketBlocking(SocketHandle(fds[0]), false)) or
not(setSocketBlocking(SocketHandle(fds[1]), false)):
return (read: asyncInvalidPipe, write: asyncInvalidPipe)
(read: AsyncFD(fds[0]), write: AsyncFD(fds[1]))
let pipes = res.get()
(read: AsyncFD(pipes.read), write: AsyncFD(pipes.write))

View File

@ -424,14 +424,14 @@ proc selectInto*[T](s: Selector[T], timeout: int,
rkey.events.incl(Event.Timer)
elif Event.Signal in pkey.events:
var data = SignalFdInfo()
if posix.read(cint(fdi), addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
if posix.read(cint(fdi), addr data, sizeof(SignalFdInfo)) !=
sizeof(SignalFdInfo):
raiseIOSelectorsError(osLastError())
rkey.events.incl(Event.Signal)
elif Event.Process in pkey.events:
var data = SignalFdInfo()
if posix.read(cint(fdi), addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
if posix.read(cint(fdi), addr data, sizeof(SignalFdInfo)) !=
sizeof(SignalFdInfo):
raiseIOSelectorsError(osLastError())
if data.ssi_pid == uint32(pkey.param):
rkey.events.incl(Event.Process)

1532
chronos/osdefs.nim Normal file

File diff suppressed because it is too large Load Diff

341
chronos/osutils.nim Normal file
View File

@ -0,0 +1,341 @@
#
# Chronos' OS helpers
#
# (c) Copyright 2022-Present Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import stew/results
import osdefs
export results
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
when defined(windows) or defined(nimdoc):
import stew/base10
const PipeHeaderName* = r"\\.\pipe\LOCAL\chronos\"
type
DescriptorFlag* {.pure.} = enum
CloseOnExec, NonBlock
const
AsyncDescriptorDefault* = {
DescriptorFlag.CloseOnExec, DescriptorFlag.NonBlock}
when defined(windows):
type
WINDESCRIPTOR* = SocketHandle|HANDLE
template handleEintr*(body: untyped): untyped =
discard
proc setDescriptorInheritance*(s: WINDESCRIPTOR,
value: bool): Result[void, OSErrorCode] =
var flags = 0'u32
let fd = when s is SocketHandle: HANDLE(s) else: s
if getHandleInformation(fd, flags) == FALSE:
return err(osLastError())
if value != ((flags and HANDLE_FLAG_INHERIT) == HANDLE_FLAG_INHERIT):
let mode = if value: HANDLE_FLAG_INHERIT else: 0'u32
if setHandleInformation(fd, HANDLE_FLAG_INHERIT, mode) == FALSE:
return err(osLastError())
ok()
proc getDescriptorInheritance*(s: WINDESCRIPTOR
): Result[bool, OSErrorCode] =
var flags = 0'u32
let fd = when s is SocketHandle: HANDLE(s) else: s
if getHandleInformation(fd, flags) == FALSE:
return err(osLastError())
ok((flags and HANDLE_FLAG_INHERIT) == HANDLE_FLAG_INHERIT)
proc setDescriptorBlocking*(s: SocketHandle,
value: bool): Result[void, OSErrorCode] =
var mode = clong(ord(not value))
if ioctlsocket(s, osdefs.FIONBIO, addr(mode)) == -1:
return err(osLastError())
ok()
proc setDescriptorFlags*(s: WINDESCRIPTOR, nonblock,
cloexec: bool): Result[void, OSErrorCode] =
? setDescriptorBlocking(s, not(nonblock))
? setDescriptorInheritance(s, not(cloexec))
ok()
proc closeFd*(s: SocketHandle): int =
int(osdefs.closesocket(s))
proc closeFd*(s: HANDLE): int =
if osdefs.closeHandle(s) == TRUE: 0 else: -1
proc toWideString*(s: string): Result[LPWSTR, OSErrorCode] =
if len(s) == 0:
ok(cast[LPWSTR](alloc0(sizeof(WCHAR))))
else:
let charsNeeded = multiByteToWideChar(CP_UTF8, 0'u32,
cast[ptr char](unsafeAddr s[0]),
cint(len(s)), nil, cint(0))
if charsNeeded <= cint(0):
return err(osLastError())
var buffer = cast[LPWSTR](alloc0((charsNeeded + 1) * sizeof(WCHAR)))
let res = multiByteToWideChar(CP_UTF8, 0'u32,
cast[ptr char](unsafeAddr s[0]),
cint(len(s)), buffer, charsNeeded)
if res != charsNeeded:
err(osLastError())
else:
ok(buffer)
proc toString*(w: LPWSTR): Result[string, OSErrorCode] =
if isNil(w):
ok("")
else:
let bytesNeeded = wideCharToMultiByte(CP_UTF8, 0'u32, w, cint(-1), nil,
cint(0), nil, nil)
if bytesNeeded <= cint(0):
return err(osLastError())
var buffer = newString(bytesNeeded)
let res = wideCharToMultiByte(CP_UTF8, 0'u32, w, cint(-1),
addr buffer[0], cint(len(buffer)), nil, nil)
if res != bytesNeeded:
err(osLastError())
else:
# We need to strip trailing `\x00`.
for i in countdown(len(buffer) - 1, 0):
if buffer[i] != '\x00':
buffer.setLen(i + 1)
break
ok(buffer)
proc free*(w: LPWSTR) =
if not(isNil(w)):
dealloc(cast[pointer](w))
proc createOsPipe*(readset, writeset: set[DescriptorFlag]
): Result[tuple[read: HANDLE, write: HANDLE], OSErrorCode] =
var
pipeIn, pipeOut: HANDLE
widePipeName: LPWSTR
uniq = 0'u64
rsa = getSecurityAttributes(DescriptorFlag.CloseOnExec notin readset)
wsa = getSecurityAttributes(DescriptorFlag.CloseOnExec notin writeset)
while true:
queryPerformanceCounter(uniq)
let pipeName = PipeHeaderName & Base10.toString(uniq)
let openMode =
if DescriptorFlag.NonBlock in readset:
osdefs.FILE_FLAG_FIRST_PIPE_INSTANCE or osdefs.FILE_FLAG_OVERLAPPED or
osdefs.PIPE_ACCESS_INBOUND
else:
osdefs.FILE_FLAG_FIRST_PIPE_INSTANCE or osdefs.PIPE_ACCESS_INBOUND
let pipeMode = osdefs.PIPE_TYPE_BYTE or osdefs.PIPE_READMODE_BYTE or
osdefs.PIPE_WAIT
widePipeName =
block:
let res = pipeName.toWideString()
if res.isErr():
return err(res.error())
res.get()
pipeIn = createNamedPipe(widePipeName, openMode, pipeMode,
1'u32, osdefs.DEFAULT_PIPE_SIZE,
osdefs.DEFAULT_PIPE_SIZE, 0'u32, addr rsa)
if pipeIn == osdefs.INVALID_HANDLE_VALUE:
let errorCode = osLastError()
free(widePipeName)
# If error in {ERROR_ACCESS_DENIED, ERROR_PIPE_BUSY}, then named pipe
# with such name already exists.
if (errorCode == osdefs.ERROR_ACCESS_DENIED) or
(errorCode == osdefs.ERROR_PIPE_BUSY):
continue
return err(errorCode)
else:
break
let openMode = osdefs.GENERIC_WRITE or osdefs.FILE_WRITE_DATA or
osdefs.SYNCHRONIZE
let openFlags =
if DescriptorFlag.NonBlock in writeset:
osdefs.FILE_FLAG_OVERLAPPED
else:
DWORD(0)
pipeOut = createFile(widePipeName, openMode, 0, addr wsa,
osdefs.OPEN_EXISTING, openFlags, HANDLE(0))
if pipeOut == osdefs.INVALID_HANDLE_VALUE:
let errorCode = osLastError()
free(widePipeName)
discard closeFd(pipeIn)
return err(errorCode)
var ovl = osdefs.OVERLAPPED()
let res =
if DescriptorFlag.NonBlock in writeset:
connectNamedPipe(pipeIn, addr ovl)
else:
connectNamedPipe(pipeIn, nil)
if res == 0:
let cleanupFlag =
block:
let errorCode = osLastError()
case int(errorCode)
of osdefs.ERROR_PIPE_CONNECTED:
false
of osdefs.ERROR_IO_PENDING:
if DescriptorFlag.NonBlock in writeset:
var bytesRead = 0.DWORD
if getOverlappedResult(pipeIn, addr ovl, bytesRead, 1) == FALSE:
true
else:
false
else:
true
else:
true
if cleanupFlag:
let errorCode = osLastError()
free(widePipeName)
discard closeFd(pipeIn)
discard closeFd(pipeOut)
return err(errorCode)
ok((read: pipeIn, write: pipeOut))
else:
template handleEintr*(body: untyped): untyped =
var res = 0
while true:
res = body
if not((res == -1) and (osLastError() == EINTR)):
break
res
proc setDescriptorBlocking*(s: cint,
value: bool): Result[void, OSErrorCode] =
let flags = handleEintr(osdefs.fcntl(s, osdefs.F_GETFL))
if flags == -1:
return err(osLastError())
if value != not((flags and osdefs.O_NONBLOCK) == osdefs.O_NONBLOCK):
let mode =
if value:
flags and not(osdefs.O_NONBLOCK)
else:
flags or osdefs.O_NONBLOCK
if handleEintr(osdefs.fcntl(s, osdefs.F_SETFL, mode)) == -1:
return err(osLastError())
ok()
proc setDescriptorInheritance*(s: cint,
value: bool): Result[void, OSErrorCode] =
let flags = handleEintr(osdefs.fcntl(s, osdefs.F_GETFD))
if flags == -1:
return err(osLastError())
if value != not((flags and osdefs.FD_CLOEXEC) == osdefs.FD_CLOEXEC):
let mode =
if value:
flags and not(osdefs.FD_CLOEXEC)
else:
flags or osdefs.FD_CLOEXEC
if handleEintr(osdefs.fcntl(s, osdefs.F_SETFD, mode)) == -1:
return err(osLastError())
ok()
proc getDescriptorInheritance*(s: cint): Result[bool, OSErrorCode] =
let flags = handleEintr(osdefs.fcntl(s, osdefs.F_GETFD))
if flags == -1:
return err(osLastError())
ok((flags and osdefs.FD_CLOEXEC) == osdefs.FD_CLOEXEC)
proc setDescriptorFlags*(s: cint, nonblock,
cloexec: bool): Result[void, OSErrorCode] =
? setDescriptorBlocking(s, not(nonblock))
? setDescriptorInheritance(s, not(cloexec))
ok()
proc closeFd*(s: cint): int =
handleEintr(osdefs.close(s))
proc closeFd*(s: SocketHandle): int =
handleEintr(osdefs.close(cint(s)))
proc acceptConn*(a1: cint, a2: ptr SockAddr, a3: ptr SockLen,
a4: set[DescriptorFlag]): Result[cint, OSErrorCode] =
when declared(accept4):
let flags =
block:
var res: cint = 0
if DescriptorFlag.CloseOnExec in a4:
res = res or osdefs.SOCK_CLOEXEC
if DescriptorFlag.NonBlock in a4:
res = res or osdefs.SOCK_NONBLOCK
res
let res = cint(handleEintr(accept4(a1, a2, a3, flags)))
if res == -1:
return err(osLastError())
ok(res)
else:
let sock = cint(handleEintr(cint(accept(SocketHandle(a1), a2, a3))))
if sock == -1:
return err(osLastError())
let
cloexec = DescriptorFlag.CloseOnExec in a4
nonblock = DescriptorFlag.NonBlock in a4
let res = setDescriptorFlags(sock, nonblock, cloexec)
if res.isErr():
discard closeFd(sock)
return err(res.error())
ok(sock)
proc createOsPipe*(readset, writeset: set[DescriptorFlag]
): Result[tuple[read: cint, write: cint], OSErrorCode] =
when declared(pipe2):
var fds: array[2, cint]
let readFlags =
block:
var res = cint(0)
if DescriptorFlag.CloseOnExec in readset:
res = res or osdefs.O_CLOEXEC
if DescriptorFlag.NonBlock in readset:
res = res or osdefs.O_NONBLOCK
res
if osdefs.pipe2(fds, readFlags) == -1:
return err(osLastError())
if readset != writeset:
let res = setDescriptorFlags(fds[1],
DescriptorFlag.NonBlock in writeset,
DescriptorFlag.CloseOnExec in writeset)
if res.isErr():
discard closeFd(fds[0])
discard closeFd(fds[1])
return err(res.error())
ok((read: fds[0], write: fds[1]))
else:
var fds: array[2, cint]
if osdefs.pipe(fds) == -1:
return err(osLastError())
block:
let res = setDescriptorFlags(fds[0],
DescriptorFlag.NonBlock in readset,
DescriptorFlag.CloseOnExec in readset)
if res.isErr():
discard closeFd(fds[0])
discard closeFd(fds[1])
return err(res.error())
block:
let res = setDescriptorFlags(fds[1],
DescriptorFlag.NonBlock in writeset,
DescriptorFlag.CloseOnExec in writeset)
if res.isErr():
discard closeFd(fds[0])
discard closeFd(fds[1])
return err(res.error())
ok((read: fds[0], write: fds[1]))

View File

@ -23,6 +23,7 @@
##
## You can specify which timer you want to use ``-d:asyncTimer=<system/mono>``.
import stew/base10
import "."/osdefs
const asyncTimer* {.strdefine.} = "mono"
@ -33,47 +34,38 @@ else:
when defined(windows):
when asyncTimer == "system":
from winlean import getSystemTimeAsFileTime, FILETIME
proc fastEpochTime*(): uint64 {.
inline, deprecated: "Use Moment.now()".} =
## Timer resolution is millisecond.
var t: FILETIME
getSystemTimeAsFileTime(t)
((cast[uint64](t.dwHighDateTime) shl 32) or
cast[uint64](t.dwLowDateTime)) div 10_000
((uint64(t.dwHighDateTime) shl 32) or uint64(t.dwLowDateTime)) div 10_000
proc fastEpochTimeNano(): uint64 {.inline.} =
## Timer resolution is nanosecond.
var t: FILETIME
getSystemTimeAsFileTime(t)
((cast[uint64](t.dwHighDateTime) shl 32) or
cast[uint64](t.dwLowDateTime)) * 100
((uint64(t.dwHighDateTime) shl 32) or uint64(t.dwLowDateTime)) * 100
else:
proc QueryPerformanceCounter*(res: var uint64) {.
importc: "QueryPerformanceCounter", stdcall, dynlib: "kernel32".}
proc QueryPerformanceFrequency(res: var uint64) {.
importc: "QueryPerformanceFrequency", stdcall, dynlib: "kernel32".}
var queryFrequencyM: uint64
var queryFrequencyN: uint64
proc fastEpochTimeNano(): uint64 {.inline.} =
## Procedure's resolution is nanosecond.
var res: uint64
QueryPerformanceCounter(res)
queryPerformanceCounter(res)
res * queryFrequencyN
proc fastEpochTime*(): uint64 {.inline, deprecated: "Use Moment.now()".} =
## Procedure's resolution is millisecond.
var res: uint64
QueryPerformanceCounter(res)
queryPerformanceCounter(res)
res div queryFrequencyM
proc setupQueryFrequence() =
var freq: uint64
QueryPerformanceFrequency(freq)
queryPerformanceFrequency(freq)
if freq < 1000:
queryFrequencyM = freq
else:
@ -85,13 +77,8 @@ when defined(windows):
elif defined(macosx):
when asyncTimer == "system":
from posix import Timeval
proc posix_gettimeofday(tp: var Timeval, unused: pointer = nil) {.
importc: "gettimeofday", header: "<sys/time.h>".}
proc fastEpochTime*(): uint64 {.
inline, deprecated: "Use Moment.now()".} =
proc fastEpochTime*(): uint64 {.inline, deprecated: "Use Moment.now()".} =
## Procedure's resolution is millisecond.
var t: Timeval
posix_gettimeofday(t)
@ -103,16 +90,6 @@ elif defined(macosx):
posix_gettimeofday(t)
uint64(t.tv_sec) * 1_000_000_000 + uint64(t.tv_usec) * 1_000
else:
type
MachTimebaseInfo {.importc: "struct mach_timebase_info",
header: "<mach/mach_time.h>", pure, final.} = object
numer: uint32
denom: uint32
proc mach_timebase_info(info: var MachTimebaseInfo) {.importc,
header: "<mach/mach_time.h>".}
proc mach_absolute_time(): uint64 {.importc, header: "<mach/mach_time.h>".}
var queryFrequencyN: uint64
var queryFrequencyD: uint64
@ -122,8 +99,7 @@ elif defined(macosx):
queryFrequencyN = info.numer
queryFrequencyD = info.denom
proc fastEpochTime*(): uint64 {.
inline, deprecated: "Use Moment.now()".} =
proc fastEpochTime*(): uint64 {.inline, deprecated: "Use Moment.now()".} =
## Procedure's resolution is millisecond.
let res = (mach_absolute_time() * queryFrequencyN) div queryFrequencyD
res div 1_000_000
@ -135,11 +111,8 @@ elif defined(macosx):
setupQueryFrequence()
elif defined(posix):
from posix import clock_gettime, Timespec, CLOCK_REALTIME, CLOCK_MONOTONIC
when asyncTimer == "system":
proc fastEpochTime*(): uint64 {.
inline, deprecated: "Use Moment.now()".} =
proc fastEpochTime*(): uint64 {.inline, deprecated: "Use Moment.now()".} =
## Procedure's resolution is millisecond.
var t: Timespec
discard clock_gettime(CLOCK_REALTIME, t)
@ -152,8 +125,7 @@ elif defined(posix):
uint64(t.tv_sec) * 1_000_000_000'u64 + uint64(t.tv_nsec)
else:
proc fastEpochTime*(): uint64 {.
inline, deprecated: "Use Moment.now()".} =
proc fastEpochTime*(): uint64 {.inline, deprecated: "Use Moment.now()".} =
## Procedure's resolution is millisecond.
var t: Timespec
discard clock_gettime(CLOCK_MONOTONIC, t)
@ -166,10 +138,7 @@ elif defined(posix):
uint64(t.tv_sec) * 1_000_000_000'u64 + uint64(t.tv_nsec)
elif defined(nimdoc):
proc fastEpochTime*(): uint64 {.deprecated: "Use Moment.now()".}
## Returns system's timer in milliseconds.
discard
else:
error("Sorry, your operation system is not yet supported!")

View File

@ -12,16 +12,11 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[os, strutils, nativesockets, net]
import stew/[base10, endians2, byteutils]
import ../asyncloop
import std/[strutils, nativesockets, net]
import stew/[base10, byteutils]
import ".."/[asyncloop, osdefs]
export net
when defined(windows) or defined(nimdoc):
import winlean
else:
import posix
const
DefaultStreamBufferSize* = 4096 ## Default buffer size for stream
## transports
@ -79,7 +74,7 @@ when defined(windows) or defined(nimdoc):
errorCode*: OSErrorCode # Current error code
abuffer*: array[128, byte] # Windows AcceptEx() buffer
when defined(windows):
aovl*: CustomOverlapped # AcceptEx OVERLAPPED structure
aovl*: CustomOverlapped # AcceptEx OVERLAPPED structure
else:
type
SocketServer* = ref object of RootRef
@ -605,46 +600,6 @@ proc isLiteral*[T](s: seq[T]): bool {.inline.} =
else:
(cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0
when defined(windows):
import winlean
const
ERROR_OPERATION_ABORTED* = 995
ERROR_PIPE_CONNECTED* = 535
ERROR_PIPE_BUSY* = 231
ERROR_SUCCESS* = 0
ERROR_CONNECTION_REFUSED* = 1225
PIPE_TYPE_BYTE* = 0
PIPE_READMODE_BYTE* = 0
PIPE_TYPE_MESSAGE* = 0x4
PIPE_READMODE_MESSAGE* = 0x2
PIPE_WAIT* = 0
PIPE_UNLIMITED_INSTANCES* = 255
ERROR_BROKEN_PIPE* = 109
ERROR_PIPE_NOT_CONNECTED* = 233
ERROR_NO_DATA* = 232
ERROR_CONNECTION_ABORTED* = 1236
ERROR_TOO_MANY_OPEN_FILES* = 4
WSAEMFILE* = 10024
WSAENETDOWN* = 10050
WSAENETRESET* = 10052
WSAECONNABORTED* = 10053
WSAECONNRESET* = 10054
WSAENOBUFS* = 10055
WSAETIMEDOUT* = 10060
proc cancelIo*(hFile: Handle): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "CancelIo".}
proc connectNamedPipe*(hPipe: Handle, lpOverlapped: ptr OVERLAPPED): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "ConnectNamedPipe".}
proc disconnectNamedPipe*(hPipe: Handle): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "DisconnectNamedPipe".}
proc setNamedPipeHandleState*(hPipe: Handle, lpMode, lpMaxCollectionCount,
lpCollectDataTimeout: ptr DWORD): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "SetNamedPipeHandleState".}
proc resetEvent*(hEvent: Handle): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "ResetEvent".}
template getTransportTooManyError*(code: int = 0): ref TransportTooManyError =
let msg =
when defined(posix):
@ -664,11 +619,11 @@ template getTransportTooManyError*(code: int = 0): ref TransportTooManyError =
case code
of 0:
"Too many open transports"
of ERROR_TOO_MANY_OPEN_FILES:
of osdefs.ERROR_TOO_MANY_OPEN_FILES:
"[ERROR_TOO_MANY_OPEN_FILES] Too many open files"
of WSAENOBUFS:
of osdefs.WSAENOBUFS:
"[WSAENOBUFS] No buffer space available"
of WSAEMFILE:
of osdefs.WSAEMFILE:
"[WSAEMFILE] Too many open sockets"
else:
"[" & $code & "] Too many open transports"
@ -697,15 +652,15 @@ template getConnectionAbortedError*(code: int): ref TransportAbortedError =
"[" & $code & "] Connection has been aborted"
elif defined(windows):
case code
of 0, WSAECONNABORTED:
of 0, osdefs.WSAECONNABORTED:
"[ECONNABORTED] Connection has been aborted before being accepted"
of WSAENETDOWN:
of osdefs.WSAENETDOWN:
"[ENETDOWN] Network is down"
of WSAENETRESET:
of osdefs.WSAENETRESET:
"[ENETRESET] Network dropped connection on reset"
of WSAECONNRESET:
of osdefs.WSAECONNRESET:
"[ECONNRESET] Connection reset by peer"
of WSAETIMEDOUT:
of osdefs.WSAETIMEDOUT:
"[ETIMEDOUT] Connection timed out"
else:
"[" & $code & "] Connection has been aborted"

View File

@ -12,19 +12,10 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[net, nativesockets, os, deques]
import ".."/[selectors2, asyncloop, handles]
import ./common
when defined(windows) or defined(nimdoc):
import winlean
else:
import posix
when (defined(linux) and not defined(android)) and defined(amd64):
const IP_MULTICAST_TTL: cint = 33
else:
var IP_MULTICAST_TTL* {.importc: "IP_MULTICAST_TTL",
header: "<netinet/in.h>".}: cint
import std/deques
when not(defined(windows)): import ".."/selectors2
import ".."/[asyncloop, osdefs, handles]
import "."/common
type
VectorKind = enum
@ -61,9 +52,9 @@ type
when defined(windows):
rovl: CustomOverlapped # Reader OVERLAPPED structure
wovl: CustomOverlapped # Writer OVERLAPPED structure
rflag: int32 # Reader flags storage
rwsabuf: TWSABuf # Reader WSABUF structure
wwsabuf: TWSABuf # Writer WSABUF structure
rflag: uint32 # Reader flags storage
rwsabuf: WSABUF # Reader WSABUF structure
wwsabuf: WSABUF # Writer WSABUF structure
DgramTransportTracker* = ref object of TrackerBase
opened*: int64
@ -82,7 +73,7 @@ proc remoteAddress*(transp: DatagramTransport): TransportAddress {.
addr slen) != 0:
raiseTransportOsError(osLastError())
fromSAddr(addr saddr, slen, transp.remote)
result = transp.remote
transp.remote
proc localAddress*(transp: DatagramTransport): TransportAddress {.
raises: [Defect, TransportOsError].} =
@ -94,7 +85,7 @@ proc localAddress*(transp: DatagramTransport): TransportAddress {.
addr slen) != 0:
raiseTransportOsError(osLastError())
fromSAddr(addr saddr, slen, transp.local)
result = transp.local
transp.local
template setReadError(t, e: untyped) =
(t).state.incl(ReadError)
@ -104,18 +95,20 @@ proc setupDgramTransportTracker(): DgramTransportTracker {.
gcsafe, raises: [Defect].}
proc getDgramTransportTracker(): DgramTransportTracker {.inline.} =
result = cast[DgramTransportTracker](getTracker(DgramTransportTrackerName))
if isNil(result):
result = setupDgramTransportTracker()
var res = cast[DgramTransportTracker](getTracker(DgramTransportTrackerName))
if isNil(res):
res = setupDgramTransportTracker()
doAssert(not(isNil(res)))
res
proc dumpTransportTracking(): string {.gcsafe.} =
var tracker = getDgramTransportTracker()
result = "Opened transports: " & $tracker.opened & "\n" &
"Closed transports: " & $tracker.closed
"Opened transports: " & $tracker.opened & "\n" &
"Closed transports: " & $tracker.closed
proc leakTransport(): bool {.gcsafe.} =
var tracker = getDgramTransportTracker()
result = (tracker.opened != tracker.closed)
let tracker = getDgramTransportTracker()
tracker.opened != tracker.closed
proc trackDgram(t: DatagramTransport) {.inline.} =
var tracker = getDgramTransportTracker()
@ -126,45 +119,18 @@ proc untrackDgram(t: DatagramTransport) {.inline.} =
inc(tracker.closed)
proc setupDgramTransportTracker(): DgramTransportTracker {.gcsafe.} =
result = new DgramTransportTracker
result.opened = 0
result.closed = 0
result.dump = dumpTransportTracking
result.isLeaked = leakTransport
addTracker(DgramTransportTrackerName, result)
let res = DgramTransportTracker(
opened: 0, closed: 0, dump: dumpTransportTracking, isLeaked: leakTransport)
addTracker(DgramTransportTrackerName, res)
res
when defined(nimdoc):
proc newDatagramTransportCommon(cbproc: DatagramCallback,
remote: TransportAddress,
local: TransportAddress,
sock: AsyncFD,
flags: set[ServerFlags],
udata: pointer,
child: DatagramTransport,
bufferSize: int,
ttl: int): DatagramTransport {.
raises: [Defect, CatchableError].} =
discard
proc resumeRead(transp: DatagramTransport) {.inline.} =
discard
proc resumeWrite(transp: DatagramTransport) {.inline.} =
discard
elif defined(windows):
when defined(windows):
template setWriterWSABuffer(t, v: untyped) =
(t).wwsabuf.buf = cast[cstring](v.buf)
(t).wwsabuf.len = cast[int32](v.buflen)
const
IOC_VENDOR = DWORD(0x18000000)
SIO_UDP_CONNRESET = DWORD(winlean.IOC_IN) or IOC_VENDOR or DWORD(12)
IPPROTO_IP = DWORD(0)
IP_TTL = DWORD(4)
(t).wwsabuf.len = cast[ULONG](v.buflen)
proc writeDatagramLoop(udata: pointer) =
var bytesCount: int32
var bytesCount: uint32
var ovl = cast[PtrCustomOverlapped](udata)
var transp = cast[DatagramTransport](ovl.data.udata)
while len(transp.queue) > 0:
@ -176,7 +142,7 @@ elif defined(windows):
if err == OSErrorCode(-1):
if not(vector.writer.finished()):
vector.writer.complete()
elif int(err) == ERROR_OPERATION_ABORTED:
elif int(err) == osdefs.ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.incl(WritePaused)
if not(vector.writer.finished()):
@ -191,26 +157,26 @@ elif defined(windows):
let fd = SocketHandle(transp.fd)
var vector = transp.queue.popFirst()
transp.setWriterWSABuffer(vector)
var ret: cint
if vector.kind == WithAddress:
var fixedAddress = windowsAnyAddressFix(vector.address)
toSAddr(fixedAddress, transp.waddr, transp.walen)
ret = WSASendTo(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
DWORD(0), cast[ptr SockAddr](addr transp.waddr),
cint(transp.walen),
cast[POVERLAPPED](addr transp.wovl), nil)
else:
ret = WSASend(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
DWORD(0), cast[POVERLAPPED](addr transp.wovl), nil)
let ret =
if vector.kind == WithAddress:
var fixedAddress = windowsAnyAddressFix(vector.address)
toSAddr(fixedAddress, transp.waddr, transp.walen)
wsaSendTo(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
DWORD(0), cast[ptr SockAddr](addr transp.waddr),
cint(transp.walen),
cast[POVERLAPPED](addr transp.wovl), nil)
else:
wsaSend(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
DWORD(0), cast[POVERLAPPED](addr transp.wovl), nil)
if ret != 0:
let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED:
if int(err) == osdefs.ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(WritePending)
transp.state.incl(WritePaused)
if not(vector.writer.finished()):
vector.writer.complete()
elif int(err) == ERROR_IO_PENDING:
elif int(err) == osdefs.ERROR_IO_PENDING:
transp.queue.addFirst(vector)
else:
transp.state.excl(WritePending)
@ -226,7 +192,7 @@ elif defined(windows):
proc readDatagramLoop(udata: pointer) =
var
bytesCount: int32
bytesCount: uint32
raddr: TransportAddress
var ovl = cast[PtrCustomOverlapped](udata)
var transp = cast[DatagramTransport](ovl.data.udata)
@ -240,9 +206,9 @@ elif defined(windows):
if bytesCount == 0:
transp.state.incl({ReadEof, ReadPaused})
fromSAddr(addr transp.raddr, transp.ralen, raddr)
transp.buflen = bytesCount
asyncCheck transp.function(transp, raddr)
elif int(err) == ERROR_OPERATION_ABORTED:
transp.buflen = int(bytesCount)
asyncSpawn transp.function(transp, raddr)
elif int(err) == osdefs.ERROR_OPERATION_ABORTED:
# CancelIO() interrupt or closeSocket() call.
transp.state.incl(ReadPaused)
if ReadClosed in transp.state and not(transp.future.finished()):
@ -256,7 +222,7 @@ elif defined(windows):
transp.setReadError(err)
transp.state.incl(ReadPaused)
transp.buflen = 0
asyncCheck transp.function(transp, raddr)
asyncSpawn transp.function(transp, raddr)
else:
## Initiation
if transp.state * {ReadEof, ReadClosed, ReadError} == {}:
@ -264,29 +230,29 @@ elif defined(windows):
let fd = SocketHandle(transp.fd)
transp.rflag = 0
transp.ralen = SockLen(sizeof(Sockaddr_storage))
let ret = WSARecvFrom(fd, addr transp.rwsabuf, DWORD(1),
let ret = wsaRecvFrom(fd, addr transp.rwsabuf, DWORD(1),
addr bytesCount, addr transp.rflag,
cast[ptr SockAddr](addr transp.raddr),
cast[ptr cint](addr transp.ralen),
cast[POVERLAPPED](addr transp.rovl), nil)
if ret != 0:
let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED:
if int(err) == osdefs.ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(ReadPending)
transp.state.incl(ReadPaused)
elif int(err) == common.WSAECONNRESET:
elif int(err) == osdefs.WSAECONNRESET:
transp.state.excl(ReadPending)
transp.state.incl({ReadPaused, ReadEof})
break
elif int(err) == ERROR_IO_PENDING:
elif int(err) == osdefs.ERROR_IO_PENDING:
discard
else:
transp.state.excl(ReadPending)
transp.state.incl(ReadPaused)
transp.setReadError(err)
transp.buflen = 0
asyncCheck transp.function(transp, raddr)
asyncSpawn transp.function(transp, raddr)
else:
# Transport closure happens in callback, and we not started new
# WSARecvFrom session.
@ -297,13 +263,17 @@ elif defined(windows):
GC_unref(transp)
break
proc resumeRead(transp: DatagramTransport) {.inline.} =
transp.state.excl(ReadPaused)
readDatagramLoop(cast[pointer](addr transp.rovl))
proc resumeRead(transp: DatagramTransport): Result[void, OSErrorCode] =
if ReadPaused in transp.state:
transp.state.excl(ReadPaused)
readDatagramLoop(cast[pointer](addr transp.rovl))
ok()
proc resumeWrite(transp: DatagramTransport) {.inline.} =
transp.state.excl(WritePaused)
writeDatagramLoop(cast[pointer](addr transp.wovl))
proc resumeWrite(transp: DatagramTransport): Result[void, OSErrorCode] =
if WritePaused in transp.state:
transp.state.excl(WritePaused)
writeDatagramLoop(cast[pointer](addr transp.wovl))
ok()
proc newDatagramTransportCommon(cbproc: DatagramCallback,
remote: TransportAddress,
@ -314,16 +284,13 @@ elif defined(windows):
child: DatagramTransport,
bufferSize: int,
ttl: int): DatagramTransport {.
raises: [Defect, CatchableError].} =
raises: [Defect, TransportOsError].} =
var localSock: AsyncFD
doAssert(remote.family == local.family)
doAssert(not isNil(cbproc))
doAssert(remote.family in {AddressFamily.IPv4, AddressFamily.IPv6})
if isNil(child):
result = DatagramTransport()
else:
result = child
var res = if isNil(child): DatagramTransport() else: child
if sock == asyncInvalidSocket:
localSock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM,
@ -335,25 +302,27 @@ elif defined(windows):
if not setSocketBlocking(SocketHandle(sock), false):
raiseTransportOsError(osLastError())
localSock = sock
register(localSock)
let bres = register2(localSock)
if bres.isErr():
raiseTransportOsError(bres.error())
## Apply ServerFlags here
if ServerFlags.ReuseAddr in flags:
if not setSockOpt(localSock, SOL_SOCKET, SO_REUSEADDR, 1):
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEADDR, 1):
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
if ServerFlags.Broadcast in flags:
if not setSockOpt(localSock, SOL_SOCKET, SO_BROADCAST, 1):
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_BROADCAST, 1):
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
if ttl > 0:
if not setSockOpt(localSock, IPPROTO_IP, IP_TTL, DWORD(ttl)):
if not setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IP_TTL, ttl):
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
@ -362,7 +331,7 @@ elif defined(windows):
## Fix for Q263823.
var bytesRet: DWORD
var bval = WINBOOL(0)
if WSAIoctl(SocketHandle(localSock), SIO_UDP_CONNRESET, addr bval,
if wsaIoctl(SocketHandle(localSock), osdefs.SIO_UDP_CONNRESET, addr bval,
sizeof(WINBOOL).DWORD, nil, DWORD(0),
addr bytesRet, nil, nil) != 0:
raiseTransportOsError(osLastError())
@ -372,8 +341,8 @@ elif defined(windows):
var slen: SockLen
toSAddr(local, saddr, slen)
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
if bindSocket(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
@ -382,8 +351,8 @@ elif defined(windows):
var saddr: Sockaddr_storage
var slen: SockLen
saddr.ss_family = type(saddr.ss_family)(local.getDomain())
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
if bindSocket(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
@ -400,28 +369,28 @@ elif defined(windows):
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
result.remote = fixedAddress
res.remote = fixedAddress
result.fd = localSock
result.function = cbproc
result.buffer = newSeq[byte](bufferSize)
result.queue = initDeque[GramVector]()
result.udata = udata
result.state = {WritePaused}
result.future = newFuture[void]("datagram.transport")
result.rovl.data = CompletionData(cb: readDatagramLoop,
udata: cast[pointer](result))
result.wovl.data = CompletionData(cb: writeDatagramLoop,
udata: cast[pointer](result))
result.rwsabuf = TWSABuf(buf: cast[cstring](addr result.buffer[0]),
len: int32(len(result.buffer)))
GC_ref(result)
res.fd = localSock
res.function = cbproc
res.buffer = newSeq[byte](bufferSize)
res.queue = initDeque[GramVector]()
res.udata = udata
res.state = {ReadPaused, WritePaused}
res.future = newFuture[void]("datagram.transport")
res.rovl.data = CompletionData(cb: readDatagramLoop,
udata: cast[pointer](res))
res.wovl.data = CompletionData(cb: writeDatagramLoop,
udata: cast[pointer](res))
res.rwsabuf = WSABUF(buf: cast[cstring](addr res.buffer[0]),
len: ULONG(len(res.buffer)))
GC_ref(res)
# Start tracking transport
trackDgram(result)
trackDgram(res)
if NoAutoRead notin flags:
result.resumeRead()
else:
result.state.incl(ReadPaused)
let rres = res.resumeRead()
if rres.isErr(): raiseTransportOsError(rres.error())
res
else:
# Linux/BSD/MacOS part
@ -440,14 +409,14 @@ else:
else:
while true:
transp.ralen = SockLen(sizeof(Sockaddr_storage))
var res = posix.recvfrom(fd, addr transp.buffer[0],
cint(len(transp.buffer)), cint(0),
cast[ptr SockAddr](addr transp.raddr),
addr transp.ralen)
var res = osdefs.recvfrom(fd, addr transp.buffer[0],
cint(len(transp.buffer)), cint(0),
cast[ptr SockAddr](addr transp.raddr),
addr transp.ralen)
if res >= 0:
fromSAddr(addr transp.raddr, transp.ralen, raddr)
transp.buflen = res
asyncCheck transp.function(transp, raddr)
asyncSpawn transp.function(transp, raddr)
else:
let err = osLastError()
if int(err) == EINTR:
@ -455,7 +424,7 @@ else:
else:
transp.buflen = 0
transp.setReadError(err)
asyncCheck transp.function(transp, raddr)
asyncSpawn transp.function(transp, raddr)
break
proc writeDatagramLoop(udata: pointer) =
@ -475,11 +444,11 @@ else:
while true:
if vector.kind == WithAddress:
toSAddr(vector.address, transp.waddr, transp.walen)
res = posix.sendto(fd, vector.buf, vector.buflen, MSG_NOSIGNAL,
cast[ptr SockAddr](addr transp.waddr),
transp.walen)
res = osdefs.sendto(fd, vector.buf, vector.buflen, MSG_NOSIGNAL,
cast[ptr SockAddr](addr transp.waddr),
transp.walen)
elif vector.kind == WithoutAddress:
res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL)
res = osdefs.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL)
if res >= 0:
if not(vector.writer.finished()):
vector.writer.complete()
@ -492,31 +461,20 @@ else:
vector.writer.fail(getTransportOsError(err))
break
else:
transp.state.incl(WritePaused)
try:
transp.fd.removeWriter()
except IOSelectorsException as exc:
raiseAsDefect exc, "removeWriter"
except ValueError as exc:
raiseAsDefect exc, "removeWriter"
transp.state.incl({WritePaused})
discard removeWriter2(transp.fd)
proc resumeWrite(transp: DatagramTransport) {.inline.} =
transp.state.excl(WritePaused)
try:
addWriter(transp.fd, writeDatagramLoop, cast[pointer](transp))
except IOSelectorsException as exc:
raiseAsDefect exc, "addWriter"
except ValueError as exc:
raiseAsDefect exc, "addWriter"
proc resumeWrite(transp: DatagramTransport): Result[void, OSErrorCode] =
if WritePaused in transp.state:
? addWriter2(transp.fd, writeDatagramLoop, cast[pointer](transp))
transp.state.excl(WritePaused)
ok()
proc resumeRead(transp: DatagramTransport) {.inline.} =
transp.state.excl(ReadPaused)
try:
addReader(transp.fd, readDatagramLoop, cast[pointer](transp))
except IOSelectorsException as exc:
raiseAsDefect exc, "addReader"
except ValueError as exc:
raiseAsDefect exc, "addReader"
proc resumeRead(transp: DatagramTransport): Result[void, OSErrorCode] =
if ReadPaused in transp.state:
? addReader2(transp.fd, readDatagramLoop, cast[pointer](transp))
transp.state.excl(ReadPaused)
ok()
proc newDatagramTransportCommon(cbproc: DatagramCallback,
remote: TransportAddress,
@ -527,15 +485,12 @@ else:
child: DatagramTransport,
bufferSize: int,
ttl: int): DatagramTransport {.
raises: [Defect, CatchableError].} =
raises: [Defect, TransportOsError].} =
var localSock: AsyncFD
doAssert(remote.family == local.family)
doAssert(not isNil(cbproc))
if isNil(child):
result = DatagramTransport()
else:
result = child
var res = if isNil(child): DatagramTransport() else: child
if sock == asyncInvalidSocket:
var proto = Protocol.IPPROTO_UDP
@ -551,32 +506,37 @@ else:
if not setSocketBlocking(SocketHandle(sock), false):
raiseTransportOsError(osLastError())
localSock = sock
register(localSock)
let bres = register2(localSock)
if bres.isErr():
raiseTransportOsError(bres.error())
## Apply ServerFlags here
if ServerFlags.ReuseAddr in flags:
if not setSockOpt(localSock, SOL_SOCKET, SO_REUSEADDR, 1):
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEADDR, 1):
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
if ServerFlags.Broadcast in flags:
if not setSockOpt(localSock, SOL_SOCKET, SO_BROADCAST, 1):
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_BROADCAST, 1):
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
if ttl > 0:
var res: bool
if local.family == AddressFamily.IPv4:
res = setSockOpt(localSock, posix.IPPROTO_IP, IP_MULTICAST_TTL,
cint(ttl))
elif local.family == AddressFamily.IPv6:
res = setSockOpt(localSock, posix.IPPROTO_IP, IPV6_MULTICAST_HOPS,
cint(ttl))
if not res:
let tres =
if local.family == AddressFamily.IPv4:
setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IP_MULTICAST_TTL,
cint(ttl))
elif local.family == AddressFamily.IPv6:
setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IPV6_MULTICAST_HOPS,
cint(ttl))
else:
raiseAssert "Unsupported address bound to local socket"
if not tres:
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
@ -586,8 +546,8 @@ else:
var saddr: Sockaddr_storage
var slen: SockLen
toSAddr(local, saddr, slen)
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
if bindSocket(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
let err = osLastError()
if sock == asyncInvalidSocket:
closeSocket(localSock)
@ -603,23 +563,23 @@ else:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
result.remote = remote
res.remote = remote
result.fd = localSock
result.function = cbproc
result.flags = flags
result.buffer = newSeq[byte](bufferSize)
result.queue = initDeque[GramVector]()
result.udata = udata
result.state = {WritePaused}
result.future = newFuture[void]("datagram.transport")
GC_ref(result)
res.fd = localSock
res.function = cbproc
res.flags = flags
res.buffer = newSeq[byte](bufferSize)
res.queue = initDeque[GramVector]()
res.udata = udata
res.state = {ReadPaused, WritePaused}
res.future = newFuture[void]("datagram.transport")
GC_ref(res)
# Start tracking transport
trackDgram(result)
trackDgram(res)
if NoAutoRead notin flags:
result.resumeRead()
else:
result.state.incl(ReadPaused)
let rres = res.resumeRead()
if rres.isErr(): raiseTransportOsError(rres.error())
res
proc close*(transp: DatagramTransport) =
## Closes and frees resources of transport ``transp``.
@ -656,7 +616,7 @@ proc newDatagramTransport*(cbproc: DatagramCallback,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0
): DatagramTransport {.
raises: [Defect, CatchableError].} =
raises: [Defect, TransportOsError].} =
## Create new UDP datagram transport (IPv4).
##
## ``cbproc`` - callback which will be called, when new datagram received.
@ -669,8 +629,8 @@ proc newDatagramTransport*(cbproc: DatagramCallback,
## ``bufSize`` - size of internal buffer.
## ``ttl`` - TTL for UDP datagram packet (only usable when flags has
## ``Broadcast`` option).
result = newDatagramTransportCommon(cbproc, remote, local, sock,
flags, udata, child, bufSize, ttl)
newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child,
bufSize, ttl)
proc newDatagramTransport*[T](cbproc: DatagramCallback,
udata: ref T,
@ -682,12 +642,11 @@ proc newDatagramTransport*[T](cbproc: DatagramCallback,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0
): DatagramTransport {.
raises: [Defect, CatchableError].} =
raises: [Defect, TransportOsError].} =
var fflags = flags + {GCUserData}
GC_ref(udata)
result = newDatagramTransportCommon(cbproc, remote, local, sock,
fflags, cast[pointer](udata),
child, bufSize, ttl)
newDatagramTransportCommon(cbproc, remote, local, sock, fflags,
cast[pointer](udata), child, bufSize, ttl)
proc newDatagramTransport6*(cbproc: DatagramCallback,
remote: TransportAddress = AnyAddress6,
@ -699,7 +658,7 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0
): DatagramTransport {.
raises: [Defect, CatchableError].} =
raises: [Defect, TransportOsError].} =
## Create new UDP datagram transport (IPv6).
##
## ``cbproc`` - callback which will be called, when new datagram received.
@ -712,8 +671,8 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
## ``bufSize`` - size of internal buffer.
## ``ttl`` - TTL for UDP datagram packet (only usable when flags has
## ``Broadcast`` option).
result = newDatagramTransportCommon(cbproc, remote, local, sock,
flags, udata, child, bufSize, ttl)
newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child,
bufSize, ttl)
proc newDatagramTransport6*[T](cbproc: DatagramCallback,
udata: ref T,
@ -725,12 +684,11 @@ proc newDatagramTransport6*[T](cbproc: DatagramCallback,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0
): DatagramTransport {.
raises: [Defect, CatchableError].} =
raises: [Defect, TransportOsError].} =
var fflags = flags + {GCUserData}
GC_ref(udata)
result = newDatagramTransportCommon(cbproc, remote, local, sock,
fflags, cast[pointer](udata),
child, bufSize, ttl)
newDatagramTransportCommon(cbproc, remote, local, sock, fflags,
cast[pointer](udata), child, bufSize, ttl)
proc join*(transp: DatagramTransport): Future[void] =
## Wait until the transport ``transp`` will be closed.
@ -753,7 +711,7 @@ proc join*(transp: DatagramTransport): Future[void] =
proc closeWait*(transp: DatagramTransport): Future[void] =
## Close transport ``transp`` and release all resources.
transp.close()
result = transp.join()
transp.join()
proc send*(transp: DatagramTransport, pbytes: pointer,
nbytes: int): Future[void] =
@ -768,7 +726,9 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
writer: retFuture)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
let wres = transp.resumeWrite()
if wres.isErr():
retFuture.fail(getTransportOsError(wres.error()))
return retFuture
proc send*(transp: DatagramTransport, msg: sink string,
@ -790,7 +750,9 @@ proc send*(transp: DatagramTransport, msg: sink string,
writer: cast[Future[void]](retFuture))
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
let wres = transp.resumeWrite()
if wres.isErr():
retFuture.fail(getTransportOsError(wres.error()))
return retFuture
proc send*[T](transp: DatagramTransport, msg: sink seq[T],
@ -812,7 +774,9 @@ proc send*[T](transp: DatagramTransport, msg: sink seq[T],
writer: cast[Future[void]](retFuture))
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
let wres = transp.resumeWrite()
if wres.isErr():
retFuture.fail(getTransportOsError(wres.error()))
return retFuture
proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
@ -825,7 +789,9 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
writer: retFuture, address: remote)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
let wres = transp.resumeWrite()
if wres.isErr():
retFuture.fail(getTransportOsError(wres.error()))
return retFuture
proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
@ -848,7 +814,9 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
address: remote)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
let wres = transp.resumeWrite()
if wres.isErr():
retFuture.fail(getTransportOsError(wres.error()))
return retFuture
proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
@ -871,7 +839,9 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
address: remote)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
let wres = transp.resumeWrite()
if wres.isErr():
retFuture.fail(getTransportOsError(wres.error()))
return retFuture
proc peekMessage*(transp: DatagramTransport, msg: var seq[byte],
@ -889,17 +859,21 @@ proc peekMessage*(transp: DatagramTransport, msg: var seq[byte],
proc getMessage*(transp: DatagramTransport): seq[byte] {.
raises: [Defect, CatchableError].} =
## Copy data from internal message buffer and return result.
var default: seq[byte]
if ReadError in transp.state:
transp.state.excl(ReadError)
raise transp.getError()
if transp.buflen > 0:
result = newSeq[byte](transp.buflen)
copyMem(addr result[0], addr transp.buffer[0], transp.buflen)
var res = newSeq[byte](transp.buflen)
copyMem(addr res[0], addr transp.buffer[0], transp.buflen)
res
else:
default
proc getUserData*[T](transp: DatagramTransport): T {.inline.} =
## Obtain user data stored in ``transp`` object.
result = cast[T](transp.udata)
cast[T](transp.udata)
proc closed*(transp: DatagramTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.
result = ({ReadClosed, WriteClosed} * transp.state != {})
{ReadClosed, WriteClosed} * transp.state != {}

View File

@ -17,11 +17,16 @@ else:
import std/algorithm
from std/strutils import toHex
import ./ipnet
import ".."/osdefs
import "."/ipnet
export ipnet
const
MaxAdapterAddressLength* = 8
MaxAdapterAddressLength* =
when defined(windows):
MAX_ADAPTER_ADDRESS_LENGTH
else:
8
type
InterfaceType* = enum
@ -344,169 +349,7 @@ proc cmp*(a, b: NetworkInterface): int =
cmp(a.ifIndex, b.ifIndex)
when defined(linux):
import posix
const
AF_NETLINK = cint(16)
AF_PACKET = cint(17)
NETLINK_ROUTE = cint(0)
NLMSG_ALIGNTO = 4'u
RTA_ALIGNTO = 4'u
# RTA_UNSPEC = 0'u16
RTA_DST = 1'u16
# RTA_SRC = 2'u16
# RTA_IIF = 3'u16
RTA_OIF = 4'u16
RTA_GATEWAY = 5'u16
# RTA_PRIORITY = 6'u16
RTA_PREFSRC = 7'u16
# RTA_METRICS = 8'u16
RTM_F_LOOKUP_TABLE = 0x1000
RTM_GETLINK = 18
RTM_GETADDR = 22
RTM_GETROUTE = 26
NLM_F_REQUEST = 1
NLM_F_ROOT = 0x100
NLM_F_MATCH = 0x200
NLM_F_DUMP = NLM_F_ROOT or NLM_F_MATCH
IFLIST_REPLY_BUFFER = 8192
InvalidSocketHandle = SocketHandle(-1)
NLMSG_DONE = 0x03
# NLMSG_MIN_TYPE = 0x10
NLMSG_ERROR = 0x02
# MSG_TRUNC = 0x20
IFLA_ADDRESS = 1
IFLA_IFNAME = 3
IFLA_MTU = 4
IFLA_OPERSTATE = 16
IFA_ADDRESS = 1
IFA_LOCAL = 2
# IFA_BROADCAST = 4
# ARPHRD_NETROM = 0
ARPHRD_ETHER = 1
ARPHRD_EETHER = 2
# ARPHRD_AX25 = 3
# ARPHRD_PRONET = 4
# ARPHRD_CHAOS = 5
# ARPHRD_IEEE802 = 6
ARPHRD_ARCNET = 7
# ARPHRD_APPLETLK = 8
# ARPHRD_DLCI = 15
ARPHRD_ATM = 19
# ARPHRD_METRICOM = 23
ARPHRD_IEEE1394 = 24
# ARPHRD_EUI64 = 27
# ARPHRD_INFINIBAND = 32
ARPHRD_SLIP = 256
ARPHRD_CSLIP = 257
ARPHRD_SLIP6 = 258
ARPHRD_CSLIP6 = 259
# ARPHRD_RSRVD = 260
# ARPHRD_ADAPT = 264
# ARPHRD_ROSE = 270
# ARPHRD_X25 = 271
# ARPHRD_HWX25 = 272
# ARPHRD_CAN = 280
ARPHRD_PPP = 512
ARPHRD_CISCO = 513
ARPHRD_HDLC = ARPHRD_CISCO
ARPHRD_LAPB = 516
# ARPHRD_DDCMP = 517
# ARPHRD_RAWHDLC = 518
# ARPHRD_TUNNEL = 768
# ARPHRD_TUNNEL6 = 769
ARPHRD_FRAD = 770
# ARPHRD_SKIP = 771
ARPHRD_LOOPBACK = 772
# ARPHRD_LOCALTLK = 773
# ARPHRD_FDDI = 774
# ARPHRD_BIF = 775
# ARPHRD_SIT = 776
# ARPHRD_IPDDP = 777
# ARPHRD_IPGRE = 778
# ARPHRD_PIMREG = 779
ARPHRD_HIPPI = 780
# ARPHRD_ASH = 781
# ARPHRD_ECONET = 782
# ARPHRD_IRDA = 783
# ARPHRD_FCPP = 784
# ARPHRD_FCAL = 785
# ARPHRD_FCPL = 786
# ARPHRD_FCFABRIC = 787
# ARPHRD_IEEE802_TR = 800
ARPHRD_IEEE80211 = 801
ARPHRD_IEEE80211_PRISM = 802
ARPHRD_IEEE80211_RADIOTAP = 803
# ARPHRD_IEEE802154 = 804
# ARPHRD_IEEE802154_MONITOR = 805
# ARPHRD_PHONET = 820
# ARPHRD_PHONET_PIPE = 821
# ARPHRD_CAIF = 822
# ARPHRD_IP6GRE = 823
# ARPHRD_NETLINK = 824
# ARPHRD_6LOWPAN = 825
# ARPHRD_VOID = 0xFFFF
# ARPHRD_NONE = 0xFFFE
type
Sockaddr_nl = object
family: cushort
pad: cushort
pid: uint32
groups: uint32
NlMsgHeader = object
nlmsg_len: uint32
nlmsg_type: uint16
nlmsg_flags: uint16
nlmsg_seq: uint32
nlmsg_pid: uint32
IfInfoMessage = object
ifi_family: byte
ifi_pad: byte
ifi_type: cushort
ifi_index: cint
ifi_flags: cuint
ifi_change: cuint
IfAddrMessage = object
ifa_family: byte
ifa_prefixlen: byte
ifa_flags: byte
ifa_scope: byte
ifa_index: uint32
RtMessage = object
rtm_family: byte
rtm_dst_len: byte
rtm_src_len: byte
rtm_tos: byte
rtm_table: byte
rtm_protocol: byte
rtm_scope: byte
rtm_type: byte
rtm_flags: cuint
RtAttr = object
rta_len: cushort
rta_type: cushort
RtGenMsg = object
rtgen_family: byte
NLReq = object
hdr: NlMsgHeader
msg: RtGenMsg
NLRouteReq = object
hdr: NlMsgHeader
msg: RtMessage
import ".."/osutils
template NLMSG_ALIGN(length: uint): uint =
(length + NLMSG_ALIGNTO - 1) and not(NLMSG_ALIGNTO - 1)
@ -621,11 +464,11 @@ when defined(linux):
address.family = cushort(AF_NETLINK)
address.groups = 0
address.pid = cast[uint32](pid)
var res = posix.socket(AF_NETLINK, posix.SOCK_DGRAM, NETLINK_ROUTE)
var res = osdefs.socket(AF_NETLINK, osdefs.SOCK_DGRAM, NETLINK_ROUTE)
if res != SocketHandle(-1):
if posix.bindSocket(res, cast[ptr SockAddr](addr address),
if osdefs.bindSocket(res, cast[ptr SockAddr](addr address),
SockLen(sizeof(Sockaddr_nl))) != 0:
discard posix.close(res)
discard osdefs.close(res)
res = SocketHandle(-1)
res
@ -652,7 +495,7 @@ when defined(linux):
rmsg.msg_iovlen = 1
rmsg.msg_name = cast[pointer](addr address)
rmsg.msg_namelen = SockLen(sizeof(Sockaddr_nl))
let res = posix.sendmsg(fd, addr rmsg, 0).TIovLen
let res = osdefs.sendmsg(fd, addr rmsg, 0).TIovLen
(res == iov.iov_len)
proc sendRouteMessage(fd: SocketHandle, pid: Pid, seqno: uint32,
@ -680,14 +523,14 @@ when defined(linux):
req.msg.rtm_flags = RTM_F_LOOKUP_TABLE
attr.rta_type = RTA_DST
if dest.family == AddressFamily.IPv4:
req.msg.rtm_family = byte(posix.AF_INET)
req.msg.rtm_family = byte(osdefs.AF_INET)
attr.rta_len = cast[cushort](RTA_LENGTH(4))
copyMem(RTA_DATA(attr), cast[ptr byte](unsafeAddr dest.address_v4[0]), 4)
req.hdr.nlmsg_len = uint32(NLMSG_ALIGN(uint(req.hdr.nlmsg_len)) +
RTA_ALIGN(uint(attr.rta_len)))
req.msg.rtm_dst_len = 4 * 8
elif dest.family == AddressFamily.IPv6:
req.msg.rtm_family = byte(posix.AF_INET6)
req.msg.rtm_family = byte(osdefs.AF_INET6)
attr.rta_len = cast[cushort](RTA_LENGTH(16))
copyMem(RTA_DATA(attr), cast[ptr byte](unsafeAddr dest.address_v6[0]), 16)
req.hdr.nlmsg_len = uint32(NLMSG_ALIGN(uint(req.hdr.nlmsg_len)) +
@ -700,7 +543,7 @@ when defined(linux):
rmsg.msg_iovlen = 1
rmsg.msg_name = cast[pointer](addr address)
rmsg.msg_namelen = SockLen(sizeof(Sockaddr_nl))
let res = posix.sendmsg(fd, addr rmsg, 0).TIovLen
let res = osdefs.sendmsg(fd, addr rmsg, 0).TIovLen
(res == iov.iov_len)
proc readNetlinkMessage(fd: SocketHandle, data: var seq[byte]): bool =
@ -715,7 +558,7 @@ when defined(linux):
rmsg.msg_iovlen = 1
rmsg.msg_name = cast[pointer](addr address)
rmsg.msg_namelen = SockLen(sizeof(Sockaddr_nl))
var length = posix.recvmsg(fd, addr rmsg, 0)
var length = osdefs.recvmsg(fd, addr rmsg, 0)
if length >= 0:
data.setLen(length)
true
@ -757,11 +600,11 @@ when defined(linux):
res
proc getAddress(f: int, p: pointer): TransportAddress =
if f == posix.AF_INET:
if f == osdefs.AF_INET:
var res = TransportAddress(family: AddressFamily.IPv4)
copyMem(addr res.address_v4[0], p, len(res.address_v4))
res
elif f == posix.AF_INET6:
elif f == osdefs.AF_INET6:
var res = TransportAddress(family: AddressFamily.IPv6)
copyMem(addr res.address_v6[0], p, len(res.address_v6))
res
@ -910,7 +753,7 @@ when defined(linux):
proc getInterfaces*(): seq[NetworkInterface] {.raises: [Defect].} =
## Return list of available interfaces.
var res: seq[NetworkInterface]
var pid = posix.getpid()
var pid = osdefs.getpid()
var sock = createNetlinkSocket(pid)
if sock == InvalidSocketHandle:
return res
@ -918,134 +761,23 @@ when defined(linux):
res = getLinks(sock, pid)
getAddresses(sock, pid, res)
sort(res, cmp)
discard posix.close(sock)
discard osdefs.close(sock)
res
proc getBestRoute*(address: TransportAddress): Route {.raises: [Defect].} =
## Return best applicable OS route, which will be used for connecting to
## address ``address``.
var pid = posix.getpid()
var pid = osdefs.getpid()
var res = Route()
var sock = createNetlinkSocket(pid)
if sock == InvalidSocketHandle:
res
else:
res = getRoute(sock, pid, address)
discard posix.close(sock)
discard osdefs.close(sock)
res
elif defined(macosx) or defined(bsd):
import posix
const
AF_LINK = 18
IFF_UP = 0x01
IFF_RUNNING = 0x40
PF_ROUTE = cint(17)
RTM_GET = 0x04'u8
RTF_UP = 0x01
RTF_GATEWAY = 0x02
RTM_VERSION = 5'u8
RTA_DST = 0x01
RTA_GATEWAY = 0x02
type
IfAddrs {.importc: "struct ifaddrs", header: "<ifaddrs.h>",
pure, final.} = object
ifa_next {.importc: "ifa_next".}: ptr IfAddrs
ifa_name {.importc: "ifa_name".}: ptr cchar
ifa_flags {.importc: "ifa_flags".}: cuint
ifa_addr {.importc: "ifa_addr".}: ptr SockAddr
ifa_netmask {.importc: "ifa_netmask".}: ptr SockAddr
ifa_dstaddr {.importc: "ifa_dstaddr".}: ptr SockAddr
ifa_data {.importc: "ifa_data".}: pointer
PIfAddrs = ptr IfAddrs
IfData {.importc: "struct if_data", header: "<net/if.h>",
pure, final.} = object
ifi_type {.importc: "ifi_type".}: byte
ifi_typelen {.importc: "ifi_typelen".}: byte
ifi_physical {.importc: "ifi_physical".}: byte
ifi_addrlen {.importc: "ifi_addrlen".}: byte
ifi_hdrlen {.importc: "ifi_hdrlen".}: byte
ifi_recvquota {.importc: "ifi_recvquota".}: byte
ifi_xmitquota {.importc: "ifi_xmitquota".}: byte
ifi_unused1 {.importc: "ifi_unused1".}: byte
ifi_mtu {.importc: "ifi_mtu".}: uint32
ifi_metric {.importc: "ifi_metric".}: uint32
ifi_baudrate {.importc: "ifi_baudrate".}: uint32
ifi_ipackets {.importc: "ifi_ipackets".}: uint32
ifi_ierrors {.importc: "ifi_ierrors".}: uint32
ifi_opackets {.importc: "ifi_opackets".}: uint32
ifi_oerrors {.importc: "ifi_oerrors".}: uint32
ifi_collisions {.importc: "ifi_collisions".}: uint32
ifi_ibytes {.importc: "ifi_ibytes".}: uint32
ifi_obytes {.importc: "ifi_obytes".}: uint32
ifi_imcasts {.importc: "ifi_imcasts".}: uint32
ifi_omcasts {.importc: "ifi_omcasts".}: uint32
ifi_iqdrops {.importc: "ifi_iqdrops".}: uint32
ifi_noproto {.importc: "ifi_noproto".}: uint32
ifi_recvtiming {.importc: "ifi_recvtiming".}: uint32
ifi_xmittiming {.importc: "ifi_xmittiming".}: uint32
ifi_lastchange {.importc: "ifi_lastchange".}: Timeval
ifi_unused2 {.importc: "ifi_unused2".}: uint32
ifi_hwassist {.importc: "ifi_hwassist".}: uint32
ifi_reserved1 {.importc: "ifi_reserved1".}: uint32
ifi_reserved2 {.importc: "ifi_reserved2".}: uint32
Sockaddr_dl = object
sdl_len: byte
sdl_family: byte
sdl_index: uint16
sdl_type: byte
sdl_nlen: byte
sdl_alen: byte
sdl_slen: byte
sdl_data: array[12, byte]
RtMetrics = object
rmx_locks: uint32
rmx_mtu: uint32
rmx_hopcount: uint32
rmx_expire: int32
rmx_recvpipe: uint32
rmx_sendpipe: uint32
rmx_ssthresh: uint32
rmx_rtt: uint32
rmx_rttvar: uint32
rmx_pksent: uint32
rmx_state: uint32
rmx_filler: array[3, uint32]
RtMsgHeader = object
rtm_msglen: uint16
rtm_version: byte
rtm_type: byte
rtm_index: uint16
rtm_flags: cint
rtm_addrs: cint
rtm_pid: Pid
rtm_seq: cint
rtm_errno: cint
rtm_use: cint
rtm_inits: uint32
rtm_rmx: RtMetrics
RtMessage = object
rtm: RtMsgHeader
space: array[512, byte]
proc getIfAddrs(ifap: ptr PIfAddrs): cint {.importc: "getifaddrs",
header: """#include <sys/types.h>
#include <sys/socket.h>
#include <ifaddrs.h>""".}
proc freeIfAddrs(ifap: ptr IfAddrs) {.importc: "freeifaddrs",
header: """#include <sys/types.h>
#include <sys/socket.h>
#include <ifaddrs.h>""".}
elif defined(macosx) or defined(macos) or defined(bsd):
proc toInterfaceType(f: byte): InterfaceType =
var ft = int(f)
@ -1070,7 +802,7 @@ elif defined(macosx) or defined(bsd):
var iface: NetworkInterface
var ifaddress: InterfaceAddress
iface.name = string($cstring(ifap.ifa_name))
iface.name = $cast[cstring](ifap.ifa_name)
iface.flags = uint64(ifap.ifa_flags)
var i = 0
while i < len(res):
@ -1093,20 +825,20 @@ elif defined(macosx) or defined(bsd):
res[i].maclen = int(link.sdl_alen)
res[i].ifType = toInterfaceType(data.ifi_type)
res[i].state = toInterfaceState(ifap.ifa_flags)
res[i].mtu = int(data.ifi_mtu)
elif family == posix.AF_INET:
res[i].mtu = cast[int](data.ifi_mtu)
elif family == osdefs.AF_INET:
fromSAddr(cast[ptr Sockaddr_storage](ifap.ifa_addr),
SockLen(sizeof(Sockaddr_in)), ifaddress.host)
elif family == posix.AF_INET6:
elif family == osdefs.AF_INET6:
fromSAddr(cast[ptr Sockaddr_storage](ifap.ifa_addr),
SockLen(sizeof(Sockaddr_in6)), ifaddress.host)
if not isNil(ifap.ifa_netmask):
var na: TransportAddress
var family = cint(ifap.ifa_netmask.sa_family)
if family == posix.AF_INET:
var family = cast[cint](ifap.ifa_netmask.sa_family)
if family == osdefs.AF_INET:
fromSAddr(cast[ptr Sockaddr_storage](ifap.ifa_netmask),
SockLen(sizeof(Sockaddr_in)), na)
elif family == posix.AF_INET6:
elif family == osdefs.AF_INET6:
fromSAddr(cast[ptr Sockaddr_storage](ifap.ifa_netmask),
SockLen(sizeof(Sockaddr_in6)), na)
ifaddress.net = IpNet.init(ifaddress.host, na)
@ -1137,15 +869,15 @@ elif defined(macosx) or defined(bsd):
var sock: cint
var msg: RtMessage
var res = Route()
var pid = posix.getpid()
var pid = osdefs.getpid()
if address.family notin {AddressFamily.IPv4, AddressFamily.IPv6}:
return
if address.family == AddressFamily.IPv4:
sock = cint(posix.socket(PF_ROUTE, posix.SOCK_RAW, posix.AF_INET))
sock = cint(osdefs.socket(PF_ROUTE, osdefs.SOCK_RAW, osdefs.AF_INET))
elif address.family == AddressFamily.IPv6:
sock = cint(posix.socket(PF_ROUTE, posix.SOCK_RAW, posix.AF_INET6))
sock = cint(osdefs.socket(PF_ROUTE, osdefs.SOCK_RAW, osdefs.AF_INET6))
if sock != -1:
var sastore: Sockaddr_storage
@ -1162,13 +894,13 @@ elif defined(macosx) or defined(bsd):
msg.rtm.rtm_addrs = RTA_DST
msg.space[0] = cast[byte](salen)
msg.rtm.rtm_msglen = uint16(sizeof(RtMessage))
let wres = posix.write(sock, addr msg, sizeof(RtMessage))
let wres = osdefs.write(sock, addr msg, sizeof(RtMessage))
if wres >= 0:
let rres =
block:
var pres = 0
while true:
pres = posix.read(sock, addr msg, sizeof(RtMessage))
pres = osdefs.read(sock, addr msg, sizeof(RtMessage))
if ((pres >= 0) and (msg.rtm.rtm_pid == pid) and
(msg.rtm.rtm_seq == 0xCAFE)) or (pres < 0):
break
@ -1198,142 +930,17 @@ elif defined(macosx) or defined(bsd):
if a.host.family == address.family:
res.source = a.host
break
discard posix.close(sock)
discard osdefs.close(sock)
res
elif defined(windows):
import winlean, dynlib
import dynlib
import ".."/osutils
const
WorkBufferSize = 16384'u32
MaxTries = 3
AF_UNSPEC = 0x00'u32
GAA_FLAG_INCLUDE_PREFIX = 0x0010'u32
CP_UTF8 = 65001'u32
ERROR_BUFFER_OVERFLOW* = 111'u32
ERROR_SUCCESS* = 0'u32
type
WCHAR = distinct uint16
SocketAddress = object
lpSockaddr: ptr SockAddr
iSockaddrLength: cint
IpAdapterUnicastAddressXpLh = object
length: uint32
flags: uint32
next: ptr IpAdapterUnicastAddressXpLh
address: SocketAddress
prefixOrigin: cint
suffixOrigin: cint
dadState: cint
validLifetime: uint32
preferredLifetime: uint32
leaseLifetime: uint32
onLinkPrefixLength: byte # This field is available only from Vista
IpAdapterAnycastAddressXp = object
length: uint32
flags: uint32
next: ptr IpAdapterAnycastAddressXp
address: SocketAddress
IpAdapterMulticastAddressXp = object
length: uint32
flags: uint32
next: ptr IpAdapterMulticastAddressXp
address: SocketAddress
IpAdapterDnsServerAddressXp = object
length: uint32
flags: uint32
next: ptr IpAdapterDnsServerAddressXp
address: SocketAddress
IpAdapterPrefixXp = object
length: uint32
flags: uint32
next: ptr IpAdapterPrefixXp
address: SocketAddress
prefixLength: uint32
IpAdapterAddressesXp = object
length: uint32
ifIndex: uint32
next: ptr IpAdapterAddressesXp
adapterName: cstring
unicastAddress: ptr IpAdapterUnicastAddressXpLh
anycastAddress: ptr IpAdapterAnycastAddressXp
multicastAddress: ptr IpAdapterMulticastAddressXp
dnsServerAddress: ptr IpAdapterDnsServerAddressXp
dnsSuffix: ptr WCHAR
description: ptr WCHAR
friendlyName: ptr WCHAR
physicalAddress: array[MaxAdapterAddressLength, byte]
physicalAddressLength: uint32
flags: uint32
mtu: uint32
ifType: uint32
operStatus: cint
ipv6IfIndex: uint32
zoneIndices: array[16, uint32]
firstPrefix: ptr IpAdapterPrefixXp
MibIpForwardRow = object
dwForwardDest: uint32
dwForwardMask: uint32
dwForwardPolicy: uint32
dwForwardNextHop: uint32
dwForwardIfIndex: uint32
dwForwardType: uint32
dwForwardProto: uint32
dwForwardAge: uint32
dwForwardNextHopAS: uint32
dwForwardMetric1: uint32
dwForwardMetric2: uint32
dwForwardMetric3: uint32
dwForwardMetric4: uint32
dwForwardMetric5: uint32
SOCKADDR_INET {.union.} = object
ipv4: Sockaddr_in
ipv6: Sockaddr_in6
si_family: uint16
IPADDRESS_PREFIX = object
prefix: SOCKADDR_INET
prefixLength: uint8
MibIpForwardRow2 = object
interfaceLuid: uint64
interfaceIndex: uint32
destinationPrefix: IPADDRESS_PREFIX
nextHop: SOCKADDR_INET
sitePrefixLength: byte
validLifetime: uint32
preferredLifetime: uint32
metric: uint32
protocol: uint32
loopback: bool
autoconfigureAddress: bool
publish: bool
immortal: bool
age: uint32
origin: uint32
GETBESTROUTE2 = proc(InterfaceLuid: ptr uint64, InterfaceIndex: uint32,
SourceAddress: ptr SOCKADDR_INET,
DestinationAddress: ptr SOCKADDR_INET,
AddressSortOptions: uint32,
BestRoute: ptr MibIpForwardRow2,
BestSourceAddress: ptr SOCKADDR_INET): DWORD {.
gcsafe, stdcall, raises: [].}
proc toInterfaceType(ft: uint32): InterfaceType {.inline.} =
if (ft >= 1'u32 and ft <= 196'u32) or
(ft == 237) or (ft == 243) or (ft == 244) or (ft == 259) or (ft == 281):
@ -1347,45 +954,14 @@ elif defined(windows):
else:
StatusUnknown
proc GetAdaptersAddresses(family: uint32, flags: uint32, reserved: pointer,
addresses: ptr IpAdapterAddressesXp,
sizeptr: ptr uint32): uint32 {.
stdcall, dynlib: "iphlpapi", importc: "GetAdaptersAddresses",
raises: [].}
proc WideCharToMultiByte(CodePage: uint32, dwFlags: uint32,
lpWideCharStr: ptr WCHAR, cchWideChar: cint,
lpMultiByteStr: ptr char, cbMultiByte: cint,
lpDefaultChar: ptr char,
lpUsedDefaultChar: ptr uint32): cint
{.stdcall, dynlib: "kernel32.dll", importc: "WideCharToMultiByte",
raises: [].}
proc getBestRouteXp(dwDestAddr: uint32, dwSourceAddr: uint32,
pBestRoute: ptr MibIpForwardRow): uint32 {.
stdcall, dynlib: "iphlpapi", importc: "GetBestRoute",
raises: [].}
proc `$`(bstr: ptr WCHAR): string =
var buffer: char
var count = WideCharToMultiByte(CP_UTF8, 0, bstr, -1, addr(buffer), 0,
nil, nil)
if count > 0:
var res = newString(count + 8)
let wres = WideCharToMultiByte(CP_UTF8, 0, bstr, -1, addr(res[0]),
count, nil, nil)
if wres > 0:
res.setLen(wres - 1)
else:
res.setLen(0)
res
else:
""
let res = toString(bstr)
if res.isErr(): "" else: res.get()
proc isVista(): bool =
var ver: OSVERSIONINFO
ver.dwOSVersionInfoSize = DWORD(sizeof(ver))
let res = getVersionExW(addr(ver))
let res = getVersionEx(addr(ver))
if res == 0:
false
else:
@ -1466,8 +1042,8 @@ elif defined(windows):
while true:
buffer = newSeq[byte](size)
var addresses = cast[ptr IpAdapterAddressesXp](addr buffer[0])
gres = GetAdaptersAddresses(AF_UNSPEC, GAA_FLAG_INCLUDE_PREFIX, nil,
addresses, addr size)
gres = getAdaptersAddresses(osdefs.AF_UNSPEC, GAA_FLAG_INCLUDE_PREFIX,
nil, addresses, addr size)
if gres == ERROR_SUCCESS:
buffer.setLen(size)
break
@ -1525,14 +1101,14 @@ elif defined(windows):
addr bestRoute,
cast[ptr SOCKADDR_INET](addr src))
if gres == 0:
if src.ss_family == winlean.AF_INET:
if src.ss_family == osdefs.AF_INET:
fromSAddr(addr src, SockLen(sizeof(Sockaddr_in)), res.source)
elif src.ss_family == winlean.AF_INET6:
elif src.ss_family == osdefs.AF_INET6:
fromSAddr(addr src, SockLen(sizeof(Sockaddr_in6)), res.source)
if bestRoute.nextHop.si_family == winlean.AF_INET:
if bestRoute.nextHop.si_family == osdefs.AF_INET:
fromSAddr(cast[ptr Sockaddr_storage](addr bestRoute.nextHop),
SockLen(sizeof(Sockaddr_in)), res.gateway)
elif bestRoute.nextHop.si_family == winlean.AF_INET6:
elif bestRoute.nextHop.si_family == osdefs.AF_INET6:
fromSAddr(cast[ptr Sockaddr_storage](addr bestRoute.nextHop),
SockLen(sizeof(Sockaddr_in6)), res.gateway)
if res.gateway.isZero():

File diff suppressed because it is too large Load Diff

View File

@ -7,14 +7,13 @@
# MIT license (LICENSE-MIT)
import std/[strutils, os]
import unittest2
import ../chronos
import ".."/chronos, ".."/chronos/osdefs
when defined(nimHasUsed): {.used.}
when defined(windows):
import winlean
else:
import posix
proc get_osfhandle*(fd: FileHandle): HANDLE {.
importc: "_get_osfhandle", header:"<io.h>".}
suite "Stream Transport test suite":
const
@ -1251,7 +1250,7 @@ suite "Stream Transport test suite":
-1
return res
var fut = wtransp.writer()
var fut {.used.} = wtransp.writer()
try:
await rtransp.readExactly(addr buffer[0], 16384 * 1024)
except CatchableError: