diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index 01e2bab..34089c7 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -126,6 +126,7 @@ type connectionsCount*: int socketFlags*: set[SocketFlags] flags*: HttpClientFlags + dualstack*: DualStackType HttpAddress* = object id*: string @@ -263,7 +264,8 @@ proc new*(t: typedesc[HttpSessionRef], maxConnections = -1, idleTimeout = HttpConnectionIdleTimeout, idlePeriod = HttpConnectionCheckPeriod, - socketFlags: set[SocketFlags] = {}): HttpSessionRef {. + socketFlags: set[SocketFlags] = {}, + dualstack = DualStackType.Auto): HttpSessionRef {. raises: [] .} = ## Create new HTTP session object. ## @@ -283,7 +285,8 @@ proc new*(t: typedesc[HttpSessionRef], idleTimeout: idleTimeout, idlePeriod: idlePeriod, connections: initTable[string, seq[HttpClientConnectionRef]](), - socketFlags: socketFlags + socketFlags: socketFlags, + dualstack: dualstack ) res.watcherFut = if HttpClientFlag.Http11Pipeline in flags: @@ -620,7 +623,8 @@ proc connect(session: HttpSessionRef, let transp = try: await connect(address, bufferSize = session.connectionBufferSize, - flags = session.socketFlags) + flags = session.socketFlags, + dualstack = session.dualstack) except CancelledError as exc: raise exc except CatchableError: diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index f0788e2..2ab5317 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -191,7 +191,8 @@ proc new*(htype: typedesc[HttpServerRef], backlogSize: int = DefaultBacklogSize, httpHeadersTimeout = 10.seconds, maxHeadersSize: int = 8192, - maxRequestBodySize: int = 1_048_576): HttpResult[HttpServerRef] {. + maxRequestBodySize: int = 1_048_576, + dualstack = DualStackType.Auto): HttpResult[HttpServerRef] {. raises: [].} = let serverUri = @@ -206,7 +207,7 @@ proc new*(htype: typedesc[HttpServerRef], let serverInstance = try: createStreamServer(address, flags = socketFlags, bufferSize = bufferSize, - backlog = backlogSize) + backlog = backlogSize, dualstack = dualstack) except TransportOsError as exc: return err(exc.msg) except CatchableError as exc: diff --git a/chronos/apps/http/shttpserver.nim b/chronos/apps/http/shttpserver.nim index 6d321a0..0300597 100644 --- a/chronos/apps/http/shttpserver.nim +++ b/chronos/apps/http/shttpserver.nim @@ -92,7 +92,8 @@ proc new*(htype: typedesc[SecureHttpServerRef], backlogSize: int = DefaultBacklogSize, httpHeadersTimeout = 10.seconds, maxHeadersSize: int = 8192, - maxRequestBodySize: int = 1_048_576 + maxRequestBodySize: int = 1_048_576, + dualstack = DualStackType.Auto ): HttpResult[SecureHttpServerRef] {.raises: [].} = doAssert(not(isNil(tlsPrivateKey)), "TLS private key must not be nil!") @@ -110,7 +111,7 @@ proc new*(htype: typedesc[SecureHttpServerRef], let serverInstance = try: createStreamServer(address, flags = socketFlags, bufferSize = bufferSize, - backlog = backlogSize) + backlog = backlogSize, dualstack = dualstack) except TransportOsError as exc: return err(exc.msg) except CatchableError as exc: diff --git a/chronos/handles.nim b/chronos/handles.nim index 2348b33..afa57fb 100644 --- a/chronos/handles.nim +++ b/chronos/handles.nim @@ -21,66 +21,113 @@ const asyncInvalidSocket* = AsyncFD(osdefs.INVALID_SOCKET) asyncInvalidPipe* = asyncInvalidSocket -proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool = +proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool {. + deprecated: "Please use setDescriptorBlocking() instead".} = ## Sets blocking mode on socket. - when defined(windows) or defined(nimdoc): - var mode = clong(ord(not blocking)) - if osdefs.ioctlsocket(s, osdefs.FIONBIO, addr(mode)) == -1: - false - else: - true - else: - let x: int = osdefs.fcntl(s, osdefs.F_GETFL, 0) - if x == -1: - false - else: - let mode = - if blocking: x and not osdefs.O_NONBLOCK else: x or osdefs.O_NONBLOCK - if osdefs.fcntl(s, osdefs.F_SETFL, mode) == -1: - false - else: - true + setDescriptorBlocking(s, blocking).isOkOr: + return false + true -proc setSockOpt*(socket: AsyncFD, level, optname, optval: int): bool = - ## `setsockopt()` for integer options. - ## Returns ``true`` on success, ``false`` on error. +proc setSockOpt2*(socket: AsyncFD, + level, optname, optval: int): Result[void, OSErrorCode] = var value = cint(optval) - osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname), - addr(value), SockLen(sizeof(value))) >= cint(0) + let res = osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname), + addr(value), SockLen(sizeof(value))) + if res == -1: + return err(osLastError()) + ok() -proc setSockOpt*(socket: AsyncFD, level, optname: int, value: pointer, - valuelen: int): bool = +proc setSockOpt2*(socket: AsyncFD, level, optname: int, value: pointer, + valuelen: int): Result[void, OSErrorCode] = ## `setsockopt()` for custom options (pointer and length). ## Returns ``true`` on success, ``false`` on error. - osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname), value, - SockLen(valuelen)) >= cint(0) + let res = osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname), + value, SockLen(valuelen)) + if res == -1: + return err(osLastError()) + ok() -proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool = +proc setSockOpt*(socket: AsyncFD, level, optname, optval: int): bool {. + deprecated: "Please use setSockOpt2() instead".} = + ## `setsockopt()` for integer options. + ## Returns ``true`` on success, ``false`` on error. + setSockOpt2(socket, level, optname, optval).isOk + +proc setSockOpt*(socket: AsyncFD, level, optname: int, value: pointer, + valuelen: int): bool {. + deprecated: "Please use setSockOpt2() instead".} = + ## `setsockopt()` for custom options (pointer and length). + ## Returns ``true`` on success, ``false`` on error. + setSockOpt2(socket, level, optname, value, valuelen).isOk + +proc getSockOpt2*(socket: AsyncFD, + level, optname: int): Result[cint, OSErrorCode] = + var + value: cint + size = SockLen(sizeof(value)) + let res = osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname), + addr(value), addr(size)) + if res == -1: + return err(osLastError()) + ok(value) + +proc getSockOpt2*(socket: AsyncFD, level, optname: int, + T: type): Result[T, OSErrorCode] = + var + value = default(T) + size = SockLen(sizeof(value)) + let res = osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname), + cast[ptr byte](addr(value)), addr(size)) + if res == -1: + return err(osLastError()) + ok(value) + +proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool {. + deprecated: "Please use getSockOpt2() instead".} = ## `getsockopt()` for integer options. ## Returns ``true`` on success, ``false`` on error. - var res: cint - var size = SockLen(sizeof(res)) - if osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname), - addr(res), addr(size)) >= cint(0): - value = int(res) - true - else: - false + value = getSockOpt2(socket, level, optname).valueOr: + return false + true -proc getSockOpt*(socket: AsyncFD, level, optname: int, value: pointer, - valuelen: var int): bool = +proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var pointer, + valuelen: var int): bool {. + deprecated: "Please use getSockOpt2() instead".} = ## `getsockopt()` for custom options (pointer and length). ## Returns ``true`` on success, ``false`` on error. osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname), value, cast[ptr SockLen](addr valuelen)) >= cint(0) -proc getSocketError*(socket: AsyncFD, err: var int): bool = +proc getSocketError*(socket: AsyncFD, err: var int): bool {. + deprecated: "Please use getSocketError() instead".} = ## Recover error code associated with socket handle ``socket``. - getSockOpt(socket, cint(osdefs.SOL_SOCKET), cint(osdefs.SO_ERROR), err) + err = getSockOpt2(socket, cint(osdefs.SOL_SOCKET), + cint(osdefs.SO_ERROR)).valueOr: + return false + true + +proc getSocketError2*(socket: AsyncFD): Result[cint, OSErrorCode] = + getSockOpt2(socket, cint(osdefs.SOL_SOCKET), cint(osdefs.SO_ERROR)) + +proc isAvailable*(domain: Domain): bool = + when defined(windows): + let fd = wsaSocket(toInt(domain), toInt(SockType.SOCK_STREAM), + toInt(Protocol.IPPROTO_TCP), nil, GROUP(0), 0'u32) + if fd == osdefs.INVALID_SOCKET: + return if osLastError() == osdefs.WSAEAFNOSUPPORT: false else: true + discard closeFd(fd) + true + else: + let fd = osdefs.socket(toInt(domain), toInt(SockType.SOCK_STREAM), + toInt(Protocol.IPPROTO_TCP)) + if fd == -1: + return if osLastError() == osdefs.EAFNOSUPPORT: false else: true + discard closeFd(fd) + true proc createAsyncSocket2*(domain: Domain, sockType: SockType, - protocol: Protocol, - inherit = true): Result[AsyncFD, OSErrorCode] = + protocol: Protocol, + inherit = true): Result[AsyncFD, OSErrorCode] = ## Creates new asynchronous socket. when defined(windows): let flags = @@ -93,15 +140,12 @@ proc createAsyncSocket2*(domain: Domain, sockType: SockType, if fd == osdefs.INVALID_SOCKET: return err(osLastError()) - let bres = setDescriptorBlocking(fd, false) - if bres.isErr(): + setDescriptorBlocking(fd, false).isOkOr: discard closeFd(fd) - return err(bres.error()) - - let res = register2(AsyncFD(fd)) - if res.isErr(): + return err(error) + register2(AsyncFD(fd)).isOkOr: discard closeFd(fd) - return err(res.error()) + return err(error) ok(AsyncFD(fd)) else: @@ -114,23 +158,20 @@ proc createAsyncSocket2*(domain: Domain, sockType: SockType, let fd = osdefs.socket(toInt(domain), socketType, toInt(protocol)) if fd == -1: return err(osLastError()) - let res = register2(AsyncFD(fd)) - if res.isErr(): + register2(AsyncFD(fd)).isOkOr: discard closeFd(fd) - return err(res.error()) + return err(error) ok(AsyncFD(fd)) else: let fd = osdefs.socket(toInt(domain), toInt(sockType), toInt(protocol)) if fd == -1: return err(osLastError()) - let bres = setDescriptorFlags(cint(fd), true, true) - if bres.isErr(): + setDescriptorFlags(cint(fd), true, true).isOkOr: discard closeFd(fd) - return err(bres.error()) - let res = register2(AsyncFD(fd)) - if res.isErr(): + return err(error) + register2(AsyncFD(fd)).isOkOr: discard closeFd(fd) - return err(bres.error()) + return err(error) ok(AsyncFD(fd)) proc wrapAsyncSocket2*(sock: cint|SocketHandle): Result[AsyncFD, OSErrorCode] = @@ -230,3 +271,26 @@ proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] = else: let pipes = res.get() (read: AsyncFD(pipes.read), write: AsyncFD(pipes.write)) + +proc getDualstack*(fd: AsyncFD): Result[bool, OSErrorCode] = + ## Returns `true` if `IPV6_V6ONLY` socket option set to `false`. + var + flag = cint(0) + size = SockLen(sizeof(flag)) + let res = osdefs.getsockopt(SocketHandle(fd), cint(osdefs.IPPROTO_IPV6), + cint(osdefs.IPV6_V6ONLY), addr(flag), addr(size)) + if res == -1: + return err(osLastError()) + ok(flag == cint(0)) + +proc setDualstack*(fd: AsyncFD, value: bool): Result[void, OSErrorCode] = + ## Sets `IPV6_V6ONLY` socket option value to `false` if `value == true` and + ## to `true` if `value == false`. + var + flag = cint(if value: 0 else: 1) + size = SockLen(sizeof(flag)) + let res = osdefs.setsockopt(SocketHandle(fd), cint(osdefs.IPPROTO_IPV6), + cint(osdefs.IPV6_V6ONLY), addr(flag), size) + if res == -1: + return err(osLastError()) + ok() diff --git a/chronos/internal/asyncengine.nim b/chronos/internal/asyncengine.nim index 5a46f04..ebcc278 100644 --- a/chronos/internal/asyncengine.nim +++ b/chronos/internal/asyncengine.nim @@ -670,6 +670,19 @@ when defined(windows): if not(isNil(aftercb)): loop.callbacks.addLast(AsyncCallback(function: aftercb, udata: param)) + proc unregisterAndCloseFd*(fd: AsyncFD): Result[void, OSErrorCode] = + ## Unregister from system queue and close asynchronous socket. + ## + ## NOTE: Use this function to close temporary sockets/pipes only (which + ## are not exposed to the public and not supposed to be used/reused). + ## Please use closeSocket(AsyncFD) and closeHandle(AsyncFD) instead. + doAssert(fd != AsyncFD(osdefs.INVALID_SOCKET)) + unregister(fd) + if closeFd(SocketHandle(fd)) != 0: + err(osLastError()) + else: + ok() + proc contains*(disp: PDispatcher, fd: AsyncFD): bool = ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. fd in disp.handles diff --git a/chronos/osdefs.nim b/chronos/osdefs.nim index 78de4b7..ab07721 100644 --- a/chronos/osdefs.nim +++ b/chronos/osdefs.nim @@ -122,6 +122,7 @@ when defined(windows): SO_UPDATE_ACCEPT_CONTEXT* = 0x700B SO_CONNECT_TIME* = 0x700C SO_UPDATE_CONNECT_CONTEXT* = 0x7010 + SO_PROTOCOL_INFOW* = 0x2005 FILE_FLAG_FIRST_PIPE_INSTANCE* = 0x00080000'u32 FILE_FLAG_OPEN_NO_RECALL* = 0x00100000'u32 @@ -258,6 +259,9 @@ when defined(windows): FIONBIO* = WSAIOW(102, 126) HANDLE_FLAG_INHERIT* = 1'u32 + IPV6_V6ONLY* = 27 + MAX_PROTOCOL_CHAIN* = 7 + WSAPROTOCOL_LEN* = 255 type LONG* = int32 @@ -441,6 +445,32 @@ when defined(windows): prefix*: SOCKADDR_INET prefixLength*: uint8 + WSAPROTOCOLCHAIN* {.final, pure.} = object + chainLen*: int32 + chainEntries*: array[MAX_PROTOCOL_CHAIN, DWORD] + + WSAPROTOCOL_INFO* {.final, pure.} = object + dwServiceFlags1*: uint32 + dwServiceFlags2*: uint32 + dwServiceFlags3*: uint32 + dwServiceFlags4*: uint32 + dwProviderFlags*: uint32 + providerId*: GUID + dwCatalogEntryId*: DWORD + protocolChain*: WSAPROTOCOLCHAIN + iVersion*: int32 + iAddressFamily*: int32 + iMaxSockAddr*: int32 + iMinSockAddr*: int32 + iSocketType*: int32 + iProtocol*: int32 + iProtocolMaxOffset*: int32 + iNetworkByteOrder*: int32 + iSecurityScheme*: int32 + dwMessageSize*: uint32 + dwProviderReserved*: uint32 + szProtocol*: array[WSAPROTOCOL_LEN + 1, WCHAR] + MibIpForwardRow2* {.final, pure.} = object interfaceLuid*: uint64 interfaceIndex*: uint32 @@ -890,7 +920,7 @@ elif defined(macos) or defined(macosx): O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM, SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK, AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR, - SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, + SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, @@ -915,7 +945,7 @@ elif defined(macos) or defined(macosx): O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM, SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK, AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR, - SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, + SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, @@ -977,7 +1007,8 @@ elif defined(linux): SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL, MSG_PEEK, AF_INET, AF_INET6, AF_UNIX, SO_REUSEADDR, SO_REUSEPORT, - SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, + SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6, + IPV6_MULTICAST_HOPS, SOCK_DGRAM, SOCK_STREAM, SHUT_RD, SHUT_WR, SHUT_RDWR, POLLIN, POLLOUT, POLLERR, POLLHUP, POLLNVAL, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, @@ -1005,7 +1036,7 @@ elif defined(linux): SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL, MSG_PEEK, AF_INET, AF_INET6, AF_UNIX, SO_REUSEADDR, SO_REUSEPORT, - SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, + SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, SOCK_DGRAM, SOCK_STREAM, SHUT_RD, SHUT_WR, SHUT_RDWR, POLLIN, POLLOUT, POLLERR, POLLHUP, POLLNVAL, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, @@ -1127,7 +1158,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM, SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK, AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR, - SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, + SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, SHUT_RD, SHUT_WR, SHUT_RDWR, @@ -1154,7 +1185,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM, SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK, AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR, - SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, + SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, SHUT_RD, SHUT_WR, SHUT_RDWR, @@ -1182,47 +1213,52 @@ when defined(linux): SOCK_CLOEXEC* = 0x80000 TCP_NODELAY* = cint(1) IPPROTO_TCP* = 6 -elif defined(freebsd) or defined(netbsd) or defined(dragonfly): + O_CLOEXEC* = 0x80000 + POSIX_SPAWN_USEVFORK* = 0x40 + IPV6_V6ONLY* = 26 +elif defined(freebsd): const SOCK_NONBLOCK* = 0x20000000 SOCK_CLOEXEC* = 0x10000000 TCP_NODELAY* = cint(1) IPPROTO_TCP* = 6 + O_CLOEXEC* = 0x00100000 + POSIX_SPAWN_USEVFORK* = 0x00 + IPV6_V6ONLY* = 27 +elif defined(netbsd): + const + SOCK_NONBLOCK* = 0x20000000 + SOCK_CLOEXEC* = 0x10000000 + TCP_NODELAY* = cint(1) + IPPROTO_TCP* = 6 + O_CLOEXEC* = 0x00400000 + POSIX_SPAWN_USEVFORK* = 0x00 + IPV6_V6ONLY* = 27 +elif defined(dragonfly): + const + SOCK_NONBLOCK* = 0x20000000 + SOCK_CLOEXEC* = 0x10000000 + TCP_NODELAY* = cint(1) + IPPROTO_TCP* = 6 + O_CLOEXEC* = 0x00020000 + POSIX_SPAWN_USEVFORK* = 0x00 + IPV6_V6ONLY* = 27 elif defined(openbsd): const SOCK_CLOEXEC* = 0x8000 SOCK_NONBLOCK* = 0x4000 TCP_NODELAY* = cint(1) IPPROTO_TCP* = 6 + O_CLOEXEC* = 0x10000 + POSIX_SPAWN_USEVFORK* = 0x00 + IPV6_V6ONLY* = 27 elif defined(macos) or defined(macosx): const TCP_NODELAY* = cint(1) IP_MULTICAST_TTL* = cint(10) IPPROTO_TCP* = 6 - -when defined(linux): - const - O_CLOEXEC* = 0x80000 - POSIX_SPAWN_USEVFORK* = 0x40 -elif defined(freebsd): - const - O_CLOEXEC* = 0x00100000 - POSIX_SPAWN_USEVFORK* = 0x00 -elif defined(openbsd): - const - O_CLOEXEC* = 0x10000 - POSIX_SPAWN_USEVFORK* = 0x00 -elif defined(netbsd): - const - O_CLOEXEC* = 0x00400000 - POSIX_SPAWN_USEVFORK* = 0x00 -elif defined(dragonfly): - const - O_CLOEXEC* = 0x00020000 - POSIX_SPAWN_USEVFORK* = 0x00 -elif defined(macos) or defined(macosx): - const POSIX_SPAWN_USEVFORK* = 0x00 + IPV6_V6ONLY* = 27 when defined(linux) or defined(macos) or defined(macosx) or defined(freebsd) or defined(openbsd) or defined(netbsd) or defined(dragonfly): diff --git a/chronos/oserrno.nim b/chronos/oserrno.nim index 4f1c765..2a9f82c 100644 --- a/chronos/oserrno.nim +++ b/chronos/oserrno.nim @@ -1328,6 +1328,7 @@ elif defined(windows): ERROR_CONNECTION_REFUSED* = OSErrorCode(1225) ERROR_CONNECTION_ABORTED* = OSErrorCode(1236) WSAEMFILE* = OSErrorCode(10024) + WSAEAFNOSUPPORT* = OSErrorCode(10047) WSAENETDOWN* = OSErrorCode(10050) WSAENETRESET* = OSErrorCode(10052) WSAECONNABORTED* = OSErrorCode(10053) diff --git a/chronos/osutils.nim b/chronos/osutils.nim index 86505c2..f9c09f2 100644 --- a/chronos/osutils.nim +++ b/chronos/osutils.nim @@ -346,6 +346,10 @@ else: return err(osLastError()) ok() + proc setDescriptorBlocking*(s: SocketHandle, + value: bool): Result[void, OSErrorCode] = + setDescriptorBlocking(cint(s), value) + proc setDescriptorInheritance*(s: cint, value: bool): Result[void, OSErrorCode] = let flags = handleEintr(osdefs.fcntl(s, osdefs.F_GETFD)) diff --git a/chronos/transports/common.nim b/chronos/transports/common.nim index 4b4be7d..b7776e5 100644 --- a/chronos/transports/common.nim +++ b/chronos/transports/common.nim @@ -11,7 +11,7 @@ import std/[strutils] import stew/[base10, byteutils] -import ".."/[asyncloop, osdefs, oserrno] +import ".."/[asyncloop, osdefs, oserrno, handles] from std/net import Domain, `==`, IpAddress, IpAddressFamily, parseIpAddress, SockType, Protocol, Port, `$` @@ -31,6 +31,9 @@ type ReuseAddr, ReusePort, TcpNoDelay, NoAutoRead, GCUserData, FirstPipe, NoPipeFlash, Broadcast + DualStackType* {.pure.} = enum + Auto, Enabled, Disabled, Default + AddressFamily* {.pure.} = enum None, IPv4, IPv6, Unix @@ -76,6 +79,7 @@ when defined(windows) or defined(nimdoc): asock*: AsyncFD # Current AcceptEx() socket errorCode*: OSErrorCode # Current error code abuffer*: array[128, byte] # Windows AcceptEx() buffer + dualstack*: DualStackType # IPv4/IPv6 dualstack parameters when defined(windows): aovl*: CustomOverlapped # AcceptEx OVERLAPPED structure else: @@ -90,6 +94,7 @@ else: bufferSize*: int # Size of internal transports' buffer loopFuture*: Future[void] # Server's main Future errorCode*: OSErrorCode # Current error code + dualstack*: DualStackType # IPv4/IPv6 dualstack parameters type TransportError* = object of AsyncError @@ -720,3 +725,75 @@ proc raiseTransportError*(ecode: OSErrorCode) {. raise getTransportTooManyError(ecode) else: raise getTransportOsError(ecode) + +proc isAvailable*(family: AddressFamily): bool = + case family + of AddressFamily.None: + raiseAssert "Invalid address family" + of AddressFamily.IPv4: + isAvailable(Domain.AF_INET) + of AddressFamily.IPv6: + isAvailable(Domain.AF_INET6) + of AddressFamily.Unix: + isAvailable(Domain.AF_UNIX) + +proc getDomain*(socket: AsyncFD): Result[AddressFamily, OSErrorCode] = + ## Returns address family which is used to create socket ``socket``. + ## + ## Note: `chronos` supports only `AF_INET`, `AF_INET6` and `AF_UNIX` sockets. + ## For all other types of sockets this procedure returns + ## `EAFNOSUPPORT/WSAEAFNOSUPPORT` error. + when defined(windows): + let protocolInfo = ? getSockOpt2(socket, cint(osdefs.SOL_SOCKET), + cint(osdefs.SO_PROTOCOL_INFOW), + WSAPROTOCOL_INFO) + if protocolInfo.iAddressFamily == toInt(Domain.AF_INET): + ok(AddressFamily.IPv4) + elif protocolInfo.iAddressFamily == toInt(Domain.AF_INET6): + ok(AddressFamily.IPv6) + else: + err(WSAEAFNOSUPPORT) + else: + var + saddr = Sockaddr_storage() + slen = SockLen(sizeof(saddr)) + if getsockname(SocketHandle(socket), cast[ptr SockAddr](addr saddr), + addr slen) != 0: + return err(osLastError()) + if int(saddr.ss_family) == toInt(Domain.AF_INET): + ok(AddressFamily.IPv4) + elif int(saddr.ss_family) == toInt(Domain.AF_INET6): + ok(AddressFamily.IPv6) + elif int(saddr.ss_family) == toInt(Domain.AF_UNIX): + ok(AddressFamily.Unix) + else: + err(EAFNOSUPPORT) + +proc setDualstack*(socket: AsyncFD, family: AddressFamily, + flag: DualStackType): Result[void, OSErrorCode] = + if family == AddressFamily.IPv6: + case flag + of DualStackType.Auto: + # In case of `Auto` we going to ignore all the errors. + discard setDualstack(socket, true) + ok() + of DualStackType.Enabled: + ? setDualstack(socket, true) + ok() + of DualStackType.Disabled: + ? setDualstack(socket, false) + ok() + of DualStackType.Default: + ok() + else: + ok() + +proc setDualstack*(socket: AsyncFD, + flag: DualStackType): Result[void, OSErrorCode] = + let family = + case flag + of DualStackType.Auto: + getDomain(socket).get(AddressFamily.IPv6) + else: + ? getDomain(socket) + setDualstack(socket, family, flag) diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index af29c2a..aec18ae 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -11,7 +11,7 @@ import std/deques when not(defined(windows)): import ".."/selectors2 -import ".."/[asyncloop, osdefs, oserrno, handles] +import ".."/[asyncloop, osdefs, oserrno, osutils, handles] import "."/common type @@ -247,57 +247,65 @@ when defined(windows): udata: pointer, child: DatagramTransport, bufferSize: int, - ttl: int): DatagramTransport {. + ttl: int, + dualstack = DualStackType.Auto + ): DatagramTransport {. raises: [TransportOsError].} = - var localSock: AsyncFD - doAssert(remote.family == local.family) doAssert(not isNil(cbproc)) - doAssert(remote.family in {AddressFamily.IPv4, AddressFamily.IPv6}) - var res = if isNil(child): DatagramTransport() else: child - if sock == asyncInvalidSocket: - localSock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM, - Protocol.IPPROTO_UDP) - - if localSock == asyncInvalidSocket: - raiseTransportOsError(osLastError()) - else: - if not setSocketBlocking(SocketHandle(sock), false): - raiseTransportOsError(osLastError()) - localSock = sock - let bres = register2(localSock) - if bres.isErr(): - raiseTransportOsError(bres.error()) + let localSock = + if sock == asyncInvalidSocket: + let proto = + if local.family == AddressFamily.Unix: + Protocol.IPPROTO_IP + else: + Protocol.IPPROTO_UDP + let res = createAsyncSocket2(local.getDomain(), SockType.SOCK_DGRAM, + proto) + if res.isErr(): + raiseTransportOsError(res.error) + res.get() + else: + setDescriptorBlocking(SocketHandle(sock), false).isOkOr: + raiseTransportOsError(error) + register2(sock).isOkOr: + raiseTransportOsError(error) + sock ## Apply ServerFlags here if ServerFlags.ReuseAddr in flags: - if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEADDR, 1): - let err = osLastError() + setSockOpt2(localSock, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr: if sock == asyncInvalidSocket: closeSocket(localSock) - raiseTransportOsError(err) + raiseTransportOsError(error) if ServerFlags.ReusePort in flags: - if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEPORT, 1): - let err = osLastError() + setSockOpt2(localSock, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr: if sock == asyncInvalidSocket: closeSocket(localSock) - raiseTransportOsError(err) + raiseTransportOsError(error) if ServerFlags.Broadcast in flags: - if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_BROADCAST, 1): - let err = osLastError() + setSockOpt2(localSock, SOL_SOCKET, SO_BROADCAST, 1).isOkOr: if sock == asyncInvalidSocket: closeSocket(localSock) - raiseTransportOsError(err) + raiseTransportOsError(error) if ttl > 0: - if not setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IP_TTL, ttl): - let err = osLastError() + setSockOpt2(localSock, osdefs.IPPROTO_IP, osdefs.IP_TTL, ttl).isOkOr: if sock == asyncInvalidSocket: closeSocket(localSock) - raiseTransportOsError(err) + raiseTransportOsError(error) + + ## IPV6_V6ONLY + if sock == asyncInvalidSocket: + setDualstack(localSock, local.family, dualstack).isOkOr: + closeSocket(localSock) + raiseTransportOsError(error) + else: + setDualstack(localSock, dualstack).isOkOr: + raiseTransportOsError(error) ## Fix for Q263823. var bytesRet: DWORD @@ -457,70 +465,75 @@ else: udata: pointer, child: DatagramTransport, bufferSize: int, - ttl: int): DatagramTransport {. + ttl: int, + dualstack = DualStackType.Auto + ): DatagramTransport {. raises: [TransportOsError].} = - var localSock: AsyncFD - doAssert(remote.family == local.family) doAssert(not isNil(cbproc)) - var res = if isNil(child): DatagramTransport() else: child - if sock == asyncInvalidSocket: - let proto = - if local.family == AddressFamily.Unix: - Protocol.IPPROTO_IP - else: - Protocol.IPPROTO_UDP - localSock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM, - proto) - if localSock == asyncInvalidSocket: - raiseTransportOsError(osLastError()) - else: - if not setSocketBlocking(SocketHandle(sock), false): - raiseTransportOsError(osLastError()) - localSock = sock - let bres = register2(localSock) - if bres.isErr(): - raiseTransportOsError(bres.error()) + let localSock = + if sock == asyncInvalidSocket: + let proto = + if local.family == AddressFamily.Unix: + Protocol.IPPROTO_IP + else: + Protocol.IPPROTO_UDP + let res = createAsyncSocket2(local.getDomain(), SockType.SOCK_DGRAM, + proto) + if res.isErr(): + raiseTransportOsError(res.error) + res.get() + else: + setDescriptorBlocking(SocketHandle(sock), false).isOkOr: + raiseTransportOsError(error) + register2(sock).isOkOr: + raiseTransportOsError(error) + sock ## Apply ServerFlags here if ServerFlags.ReuseAddr in flags: - if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEADDR, 1): - let err = osLastError() + setSockOpt2(localSock, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr: if sock == asyncInvalidSocket: closeSocket(localSock) - raiseTransportOsError(err) + raiseTransportOsError(error) if ServerFlags.ReusePort in flags: - if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEPORT, 1): - let err = osLastError() + setSockOpt2(localSock, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr: if sock == asyncInvalidSocket: closeSocket(localSock) - raiseTransportOsError(err) + raiseTransportOsError(error) if ServerFlags.Broadcast in flags: - if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_BROADCAST, 1): - let err = osLastError() + setSockOpt2(localSock, SOL_SOCKET, SO_BROADCAST, 1).isOkOr: if sock == asyncInvalidSocket: closeSocket(localSock) - raiseTransportOsError(err) + raiseTransportOsError(error) if ttl > 0: - let tres = - if local.family == AddressFamily.IPv4: - setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IP_MULTICAST_TTL, - cint(ttl)) - elif local.family == AddressFamily.IPv6: - setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IPV6_MULTICAST_HOPS, - cint(ttl)) - else: - raiseAssert "Unsupported address bound to local socket" + if local.family == AddressFamily.IPv4: + setSockOpt2(localSock, osdefs.IPPROTO_IP, osdefs.IP_MULTICAST_TTL, + cint(ttl)).isOkOr: + if sock == asyncInvalidSocket: + closeSocket(localSock) + raiseTransportOsError(error) + elif local.family == AddressFamily.IPv6: + setSockOpt2(localSock, osdefs.IPPROTO_IP, osdefs.IPV6_MULTICAST_HOPS, + cint(ttl)).isOkOr: + if sock == asyncInvalidSocket: + closeSocket(localSock) + raiseTransportOsError(error) + else: + raiseAssert "Unsupported address bound to local socket" - if not tres: - let err = osLastError() - if sock == asyncInvalidSocket: - closeSocket(localSock) - raiseTransportOsError(err) + ## IPV6_V6ONLY + if sock == asyncInvalidSocket: + setDualstack(localSock, local.family, dualstack).isOkOr: + closeSocket(localSock) + raiseTransportOsError(error) + else: + setDualstack(localSock, dualstack).isOkOr: + raiseTransportOsError(error) if local.family != AddressFamily.None: var saddr: Sockaddr_storage @@ -594,8 +607,9 @@ proc newDatagramTransport*(cbproc: DatagramCallback, udata: pointer = nil, child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize, - ttl: int = 0 - ): DatagramTransport {. + ttl: int = 0, + dualstack = DualStackType.Auto + ): DatagramTransport {. raises: [TransportOsError].} = ## Create new UDP datagram transport (IPv4). ## @@ -610,7 +624,7 @@ proc newDatagramTransport*(cbproc: DatagramCallback, ## ``ttl`` - TTL for UDP datagram packet (only usable when flags has ## ``Broadcast`` option). newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child, - bufSize, ttl) + bufSize, ttl, dualstack) proc newDatagramTransport*[T](cbproc: DatagramCallback, udata: ref T, @@ -620,13 +634,15 @@ proc newDatagramTransport*[T](cbproc: DatagramCallback, flags: set[ServerFlags] = {}, child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize, - ttl: int = 0 - ): DatagramTransport {. + ttl: int = 0, + dualstack = DualStackType.Auto + ): DatagramTransport {. raises: [TransportOsError].} = var fflags = flags + {GCUserData} GC_ref(udata) newDatagramTransportCommon(cbproc, remote, local, sock, fflags, - cast[pointer](udata), child, bufSize, ttl) + cast[pointer](udata), child, bufSize, ttl, + dualstack) proc newDatagramTransport6*(cbproc: DatagramCallback, remote: TransportAddress = AnyAddress6, @@ -636,8 +652,9 @@ proc newDatagramTransport6*(cbproc: DatagramCallback, udata: pointer = nil, child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize, - ttl: int = 0 - ): DatagramTransport {. + ttl: int = 0, + dualstack = DualStackType.Auto + ): DatagramTransport {. raises: [TransportOsError].} = ## Create new UDP datagram transport (IPv6). ## @@ -652,7 +669,7 @@ proc newDatagramTransport6*(cbproc: DatagramCallback, ## ``ttl`` - TTL for UDP datagram packet (only usable when flags has ## ``Broadcast`` option). newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child, - bufSize, ttl) + bufSize, ttl, dualstack) proc newDatagramTransport6*[T](cbproc: DatagramCallback, udata: ref T, @@ -662,13 +679,15 @@ proc newDatagramTransport6*[T](cbproc: DatagramCallback, flags: set[ServerFlags] = {}, child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize, - ttl: int = 0 - ): DatagramTransport {. + ttl: int = 0, + dualstack = DualStackType.Auto + ): DatagramTransport {. raises: [TransportOsError].} = var fflags = flags + {GCUserData} GC_ref(udata) newDatagramTransportCommon(cbproc, remote, local, sock, fflags, - cast[pointer](udata), child, bufSize, ttl) + cast[pointer](udata), child, bufSize, ttl, + dualstack) proc join*(transp: DatagramTransport): Future[void] = ## Wait until the transport ``transp`` will be closed. diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index f96650c..8982b99 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -639,7 +639,8 @@ when defined(windows): child: StreamTransport = nil, localAddress = TransportAddress(), flags: set[SocketFlags] = {}, - ): Future[StreamTransport] = + dualstack = DualStackType.Auto + ): Future[StreamTransport] = ## Open new connection to remote peer with address ``address`` and create ## new transport object ``StreamTransport`` for established connection. ## ``bufferSize`` is size of internal buffer for transport. @@ -658,24 +659,33 @@ when defined(windows): toSAddr(raddress, saddr, slen) proto = Protocol.IPPROTO_TCP - sock = createAsyncSocket(raddress.getDomain(), SockType.SOCK_STREAM, - proto) - if sock == asyncInvalidSocket: - retFuture.fail(getTransportOsError(osLastError())) + sock = createAsyncSocket2(raddress.getDomain(), SockType.SOCK_STREAM, + proto).valueOr: + retFuture.fail(getTransportOsError(error)) return retFuture + if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: + if SocketFlags.TcpNoDelay in flags: + setSockOpt2(sock, osdefs.IPPROTO_TCP, osdefs.TCP_NODELAY, 1).isOkOr: + sock.closeSocket() + retFuture.fail(getTransportOsError(error)) + return retFuture + if SocketFlags.ReuseAddr in flags: - if not(setSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1)): - let err = osLastError() + setSockOpt2(sock, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr: sock.closeSocket() - retFuture.fail(getTransportOsError(err)) + retFuture.fail(getTransportOsError(error)) return retFuture if SocketFlags.ReusePort in flags: - if not(setSockOpt(sock, SOL_SOCKET, SO_REUSEPORT, 1)): - let err = osLastError() + setSockOpt2(sock, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr: sock.closeSocket() - retFuture.fail(getTransportOsError(err)) + retFuture.fail(getTransportOsError(error)) return retFuture + # IPV6_V6ONLY. + setDualstack(sock, address.family, dualstack).isOkOr: + sock.closeSocket() + retFuture.fail(getTransportOsError(error)) + return retFuture if localAddress != TransportAddress(): if localAddress.family != address.family: @@ -966,14 +976,9 @@ when defined(windows): if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}: server.apending = true # TODO No way to report back errors! - server.asock = - block: - let sock = createAsyncSocket(server.domain, SockType.SOCK_STREAM, - Protocol.IPPROTO_TCP) - if sock == asyncInvalidSocket: - raiseOsDefect(osLastError(), - "acceptLoop(): Unablet to create new socket") - sock + server.asock = createAsyncSocket2(server.domain, SockType.SOCK_STREAM, + Protocol.IPPROTO_TCP).valueOr: + raiseOsDefect(error, "acceptLoop(): Unablet to create new socket") var dwBytesReceived = DWORD(0) let dwReceiveDataLength = DWORD(0) @@ -1167,15 +1172,13 @@ when defined(windows): if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}: # TCP Sockets part var loop = getThreadDispatcher() - server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM, - Protocol.IPPROTO_TCP) - if server.asock == asyncInvalidSocket: - let err = osLastError() - case err + server.asock = createAsyncSocket2(server.domain, SockType.SOCK_STREAM, + Protocol.IPPROTO_TCP).valueOr: + case error of ERROR_TOO_MANY_OPEN_FILES, WSAENOBUFS, WSAEMFILE: - retFuture.fail(getTransportTooManyError(err)) + retFuture.fail(getTransportTooManyError(error)) else: - retFuture.fail(getTransportOsError(err)) + retFuture.fail(getTransportOsError(error)) return retFuture var dwBytesReceived = DWORD(0) @@ -1468,7 +1471,8 @@ else: child: StreamTransport = nil, localAddress = TransportAddress(), flags: set[SocketFlags] = {}, - ): Future[StreamTransport] = + dualstack = DualStackType.Auto, + ): Future[StreamTransport] = ## Open new connection to remote peer with address ``address`` and create ## new transport object ``StreamTransport`` for established connection. ## ``bufferSize`` - size of internal buffer for transport. @@ -1483,36 +1487,37 @@ else: else: Protocol.IPPROTO_TCP - let sock = createAsyncSocket(address.getDomain(), SockType.SOCK_STREAM, - proto) - if sock == asyncInvalidSocket: - let err = osLastError() - case err + let sock = createAsyncSocket2(address.getDomain(), SockType.SOCK_STREAM, + proto).valueOr: + case error of oserrno.EMFILE: retFuture.fail(getTransportTooManyError()) else: - retFuture.fail(getTransportOsError(err)) + retFuture.fail(getTransportOsError(error)) return retFuture if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: if SocketFlags.TcpNoDelay in flags: - if not(setSockOpt(sock, osdefs.IPPROTO_TCP, osdefs.TCP_NODELAY, 1)): - let err = osLastError() + setSockOpt2(sock, osdefs.IPPROTO_TCP, osdefs.TCP_NODELAY, 1).isOkOr: sock.closeSocket() - retFuture.fail(getTransportOsError(err)) + retFuture.fail(getTransportOsError(error)) return retFuture + if SocketFlags.ReuseAddr in flags: - if not(setSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1)): - let err = osLastError() + setSockOpt2(sock, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr: sock.closeSocket() - retFuture.fail(getTransportOsError(err)) + retFuture.fail(getTransportOsError(error)) return retFuture if SocketFlags.ReusePort in flags: - if not(setSockOpt(sock, SOL_SOCKET, SO_REUSEPORT, 1)): - let err = osLastError() + setSockOpt2(sock, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr: sock.closeSocket() - retFuture.fail(getTransportOsError(err)) + retFuture.fail(getTransportOsError(error)) return retFuture + # IPV6_V6ONLY. + setDualstack(sock, address.family, dualstack).isOkOr: + sock.closeSocket() + retFuture.fail(getTransportOsError(error)) + return retFuture if localAddress != TransportAddress(): if localAddress.family != address.family: @@ -1532,17 +1537,14 @@ else: proc continuation(udata: pointer) = if not(retFuture.finished()): - var err = 0 - - let res = removeWriter2(sock) - if res.isErr(): + removeWriter2(sock).isOkOr: discard unregisterAndCloseFd(sock) - retFuture.fail(getTransportOsError(res.error())) + retFuture.fail(getTransportOsError(error)) return - if not(sock.getSocketError(err)): + let err = sock.getSocketError2().valueOr: discard unregisterAndCloseFd(sock) - retFuture.fail(getTransportOsError(res.error())) + retFuture.fail(getTransportOsError(error)) return if err != 0: @@ -1578,10 +1580,9 @@ else: # http://www.madore.org/~david/computers/connect-intr.html case errorCode of oserrno.EINPROGRESS, oserrno.EINTR: - let res = addWriter2(sock, continuation) - if res.isErr(): + addWriter2(sock, continuation).isOkOr: discard unregisterAndCloseFd(sock) - retFuture.fail(getTransportOsError(res.error())) + retFuture.fail(getTransportOsError(error)) return retFuture retFuture.cancelCallback = cancel break @@ -1782,11 +1783,13 @@ proc connect*(address: TransportAddress, bufferSize = DefaultStreamBufferSize, child: StreamTransport = nil, flags: set[TransportFlags], - localAddress = TransportAddress()): Future[StreamTransport] = + localAddress = TransportAddress(), + dualstack = DualStackType.Auto + ): Future[StreamTransport] = # Retro compatibility with TransportFlags var mappedFlags: set[SocketFlags] if TcpNoDelay in flags: mappedFlags.incl(SocketFlags.TcpNoDelay) - address.connect(bufferSize, child, localAddress, mappedFlags) + connect(address, bufferSize, child, localAddress, mappedFlags, dualstack) proc close*(server: StreamServer) = ## Release ``server`` resources. @@ -1848,7 +1851,8 @@ proc createStreamServer*(host: TransportAddress, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, init: TransportInitCallback = nil, - udata: pointer = nil): StreamServer {. + udata: pointer = nil, + dualstack = DualStackType.Auto): StreamServer {. raises: [TransportOsError].} = ## Create new TCP stream server. ## @@ -1874,42 +1878,48 @@ proc createStreamServer*(host: TransportAddress, elif defined(windows): # Windows if host.family in {AddressFamily.IPv4, AddressFamily.IPv6}: - if sock == asyncInvalidSocket: - serverSocket = createAsyncSocket(host.getDomain(), - SockType.SOCK_STREAM, - Protocol.IPPROTO_TCP) - - if serverSocket == asyncInvalidSocket: - raiseTransportOsError(osLastError()) - else: - let bres = setDescriptorBlocking(SocketHandle(sock), false) - if bres.isErr(): - raiseTransportOsError(bres.error()) - let wres = register2(sock) - if wres.isErr(): - raiseTransportOsError(wres.error()) - serverSocket = sock - # SO_REUSEADDR is not useful for Unix domain sockets. + serverSocket = + if sock == asyncInvalidSocket: + # TODO (cheatfate): `valueOr` generates weird compile error. + let res = createAsyncSocket2(host.getDomain(), SockType.SOCK_STREAM, + Protocol.IPPROTO_TCP) + if res.isErr(): + raiseTransportOsError(res.error()) + res.get() + else: + setDescriptorBlocking(SocketHandle(sock), false).isOkOr: + raiseTransportOsError(error) + register2(sock).isOkOr: + raiseTransportOsError(error) + sock + # SO_REUSEADDR if ServerFlags.ReuseAddr in flags: - if not(setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1)): - let err = osLastError() + setSockOpt2(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr: if sock == asyncInvalidSocket: discard closeFd(SocketHandle(serverSocket)) - raiseTransportOsError(err) + raiseTransportOsError(error) + # SO_REUSEPORT if ServerFlags.ReusePort in flags: - if not(setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEPORT, 1)): - let err = osLastError() + setSockOpt2(serverSocket, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr: if sock == asyncInvalidSocket: discard closeFd(SocketHandle(serverSocket)) - raiseTransportOsError(err) - # TCP flags are not useful for Unix domain sockets. + raiseTransportOsError(error) + # TCP_NODELAY if ServerFlags.TcpNoDelay in flags: - if not(setSockOpt(serverSocket, osdefs.IPPROTO_TCP, - osdefs.TCP_NODELAY, 1)): - let err = osLastError() + setSockOpt2(serverSocket, osdefs.IPPROTO_TCP, + osdefs.TCP_NODELAY, 1).isOkOr: if sock == asyncInvalidSocket: discard closeFd(SocketHandle(serverSocket)) - raiseTransportOsError(err) + raiseTransportOsError(error) + # IPV6_V6ONLY. + if sock == asyncInvalidSocket: + setDualstack(serverSocket, host.family, dualstack).isOkOr: + discard closeFd(SocketHandle(serverSocket)) + raiseTransportOsError(error) + else: + setDualstack(serverSocket, dualstack).isOkOr: + raiseTransportOsError(error) + host.toSAddr(saddr, slen) if bindSocket(SocketHandle(serverSocket), cast[ptr SockAddr](addr saddr), slen) != 0: @@ -1936,47 +1946,54 @@ proc createStreamServer*(host: TransportAddress, serverSocket = AsyncFD(0) else: # Posix - if sock == asyncInvalidSocket: - let proto = if host.family == AddressFamily.Unix: - Protocol.IPPROTO_IP + serverSocket = + if sock == asyncInvalidSocket: + let proto = if host.family == AddressFamily.Unix: + Protocol.IPPROTO_IP + else: + Protocol.IPPROTO_TCP + # TODO (cheatfate): `valueOr` generates weird compile error. + let res = createAsyncSocket2(host.getDomain(), SockType.SOCK_STREAM, + proto) + if res.isErr(): + raiseTransportOsError(res.error()) + res.get() else: - Protocol.IPPROTO_TCP - serverSocket = createAsyncSocket(host.getDomain(), - SockType.SOCK_STREAM, - proto) - if serverSocket == asyncInvalidSocket: - raiseTransportOsError(osLastError()) - else: - let bres = setDescriptorFlags(cint(sock), true, true) - if bres.isErr(): - raiseTransportOsError(osLastError()) - let rres = register2(sock) - if rres.isErr(): - raiseTransportOsError(osLastError()) - serverSocket = sock + setDescriptorFlags(cint(sock), true, true).isOkOr: + raiseTransportOsError(error) + register2(sock).isOkOr: + raiseTransportOsError(error) + sock if host.family in {AddressFamily.IPv4, AddressFamily.IPv6}: - # SO_REUSEADDR and SO_REUSEPORT are not useful for Unix domain sockets. + # SO_REUSEADDR if ServerFlags.ReuseAddr in flags: - if not(setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1)): - let err = osLastError() + setSockOpt2(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr: if sock == asyncInvalidSocket: discard unregisterAndCloseFd(serverSocket) - raiseTransportOsError(err) + raiseTransportOsError(error) + # SO_REUSEPORT if ServerFlags.ReusePort in flags: - if not(setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEPORT, 1)): - let err = osLastError() + setSockOpt2(serverSocket, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr: if sock == asyncInvalidSocket: discard unregisterAndCloseFd(serverSocket) - raiseTransportOsError(err) - # TCP flags are not useful for Unix domain sockets. + raiseTransportOsError(error) + # TCP_NODELAY if ServerFlags.TcpNoDelay in flags: - if not(setSockOpt(serverSocket, osdefs.IPPROTO_TCP, - osdefs.TCP_NODELAY, 1)): - let err = osLastError() + setSockOpt2(serverSocket, osdefs.IPPROTO_TCP, + osdefs.TCP_NODELAY, 1).isOkOr: if sock == asyncInvalidSocket: discard unregisterAndCloseFd(serverSocket) - raiseTransportOsError(err) + raiseTransportOsError(error) + # IPV6_V6ONLY + if sock == asyncInvalidSocket: + setDualstack(serverSocket, host.family, dualstack).isOkOr: + discard closeFd(SocketHandle(serverSocket)) + raiseTransportOsError(error) + else: + setDualstack(serverSocket, dualstack).isOkOr: + raiseTransportOsError(error) + elif host.family in {AddressFamily.Unix}: # We do not care about result here, because if file cannot be removed, # `bindSocket` will return EADDRINUSE. @@ -2016,6 +2033,7 @@ proc createStreamServer*(host: TransportAddress, sres.status = Starting sres.loopFuture = newFuture[void]("stream.transport.server") sres.udata = udata + sres.dualstack = dualstack if localAddress.family == AddressFamily.None: sres.local = host else: @@ -2029,8 +2047,7 @@ proc createStreamServer*(host: TransportAddress, cb = acceptPipeLoop if not(isNil(cbproc)): - sres.aovl.data = CompletionData(cb: cb, - udata: cast[pointer](sres)) + sres.aovl.data = CompletionData(cb: cb, udata: cast[pointer](sres)) else: if host.family == AddressFamily.Unix: sres.sock = @@ -2055,10 +2072,11 @@ proc createStreamServer*(host: TransportAddress, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, init: TransportInitCallback = nil, - udata: pointer = nil): StreamServer {. + udata: pointer = nil, + dualstack = DualStackType.Auto): StreamServer {. raises: [CatchableError].} = createStreamServer(host, nil, flags, sock, backlog, bufferSize, - child, init, cast[pointer](udata)) + child, init, cast[pointer](udata), dualstack) proc createStreamServer*[T](host: TransportAddress, cbproc: StreamCallback, @@ -2068,12 +2086,13 @@ proc createStreamServer*[T](host: TransportAddress, backlog: int = DefaultBacklogSize, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, - init: TransportInitCallback = nil): StreamServer {. + init: TransportInitCallback = nil, + dualstack = DualStackType.Auto): StreamServer {. raises: [CatchableError].} = var fflags = flags + {GCUserData} GC_ref(udata) createStreamServer(host, cbproc, fflags, sock, backlog, bufferSize, - child, init, cast[pointer](udata)) + child, init, cast[pointer](udata), dualstack) proc createStreamServer*[T](host: TransportAddress, flags: set[ServerFlags] = {}, @@ -2082,12 +2101,13 @@ proc createStreamServer*[T](host: TransportAddress, backlog: int = DefaultBacklogSize, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, - init: TransportInitCallback = nil): StreamServer {. + init: TransportInitCallback = nil, + dualstack = DualStackType.Auto): StreamServer {. raises: [CatchableError].} = var fflags = flags + {GCUserData} GC_ref(udata) createStreamServer(host, nil, fflags, sock, backlog, bufferSize, - child, init, cast[pointer](udata)) + child, init, cast[pointer](udata), dualstack) proc getUserData*[T](server: StreamServer): T {.inline.} = ## Obtain user data stored in ``server`` object. diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index 7db04f9..ae7ab23 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -533,6 +533,54 @@ suite "Datagram Transport test suite": result = res + proc performDualstackTest( + sstack: DualStackType, saddr: TransportAddress, + cstack: DualStackType, caddr: TransportAddress + ): Future[bool] {.async.} = + var + expectStr = "ANYADDRESS MESSAGE" + event = newAsyncEvent() + res = 0 + + proc process1(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var bmsg = transp.getMessage() + var smsg = cast[string](bmsg) + if smsg == expectStr: + inc(res) + event.fire() + + proc process2(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + discard + + let + sdgram = newDatagramTransport(process1, local = saddr, + dualstack = sstack) + localcaddr = + if caddr.family == AddressFamily.IPv4: + AnyAddress + else: + AnyAddress6 + + cdgram = newDatagramTransport(process2, local = localcaddr, + dualstack = cstack) + + var address = caddr + address.port = sdgram.localAddress().port + + try: + await cdgram.sendTo(address, addr expectStr[0], len(expectStr)) + except CatchableError: + discard + try: + await event.wait().wait(500.milliseconds) + except CatchableError: + discard + + await allFutures(sdgram.closeWait(), cdgram.closeWait()) + res == 1 + test "close(transport) test": check waitFor(testTransportClose()) == true test m1: @@ -557,5 +605,83 @@ suite "Datagram Transport test suite": check waitFor(testBroadcast()) == 1 test "0.0.0.0/::0 (INADDR_ANY) test": check waitFor(testAnyAddress()) == 6 + asyncTest "[IP] getDomain(socket) [SOCK_DGRAM] test": + if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6): + block: + let res = createAsyncSocket2(Domain.AF_INET, SockType.SOCK_DGRAM, + Protocol.IPPROTO_UDP) + check res.isOk() + let fres = getDomain(res.get()) + check fres.isOk() + discard unregisterAndCloseFd(res.get()) + check fres.get() == AddressFamily.IPv4 + + block: + let res = createAsyncSocket2(Domain.AF_INET6, SockType.SOCK_DGRAM, + Protocol.IPPROTO_UDP) + check res.isOk() + let fres = getDomain(res.get()) + check fres.isOk() + discard unregisterAndCloseFd(res.get()) + check fres.get() == AddressFamily.IPv6 + + when not(defined(windows)): + block: + let res = createAsyncSocket2(Domain.AF_UNIX, SockType.SOCK_DGRAM, + Protocol.IPPROTO_IP) + check res.isOk() + let fres = getDomain(res.get()) + check fres.isOk() + discard unregisterAndCloseFd(res.get()) + check fres.get() == AddressFamily.Unix + else: + skip() + asyncTest "[IP] DualStack [UDP] server [DualStackType.Auto] test": + if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6): + let serverAddress = initTAddress("[::]:0") + check: + (await performDualstackTest( + DualStackType.Auto, serverAddress, + DualStackType.Auto, initTAddress("127.0.0.1:0"))) == true + check: + (await performDualstackTest( + DualStackType.Auto, serverAddress, + DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == true + check: + (await performDualstackTest( + DualStackType.Auto, serverAddress, + DualStackType.Auto, initTAddress("[::1]:0"))) == true + else: + skip() + asyncTest "[IP] DualStack [UDP] server [DualStackType.Enabled] test": + if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6): + let serverAddress = initTAddress("[::]:0") + check: + (await performDualstackTest( + DualStackType.Enabled, serverAddress, + DualStackType.Auto, initTAddress("127.0.0.1:0"))) == true + (await performDualstackTest( + DualStackType.Enabled, serverAddress, + DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == true + (await performDualstackTest( + DualStackType.Enabled, serverAddress, + DualStackType.Auto, initTAddress("[::1]:0"))) == true + else: + skip() + asyncTest "[IP] DualStack [UDP] server [DualStackType.Disabled] test": + if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6): + let serverAddress = initTAddress("[::]:0") + check: + (await performDualstackTest( + DualStackType.Disabled, serverAddress, + DualStackType.Auto, initTAddress("127.0.0.1:0"))) == false + (await performDualstackTest( + DualStackType.Disabled, serverAddress, + DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == false + (await performDualstackTest( + DualStackType.Disabled, serverAddress, + DualStackType.Auto, initTAddress("[::1]:0"))) == true + else: + skip() test "Transports leak test": checkLeaks() diff --git a/tests/teststream.nim b/tests/teststream.nim index 762e996..b042792 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -1372,6 +1372,42 @@ suite "Stream Transport test suite": if not(sleepFut.finished()): await cancelAndWait(sleepFut) + proc performDualstackTest( + sstack: DualStackType, saddr: TransportAddress, + cstack: DualStackType, caddr: TransportAddress + ): Future[bool] {.async.} = + let server = createStreamServer(saddr, dualstack = sstack) + var address = caddr + address.port = server.localAddress().port + var acceptFut = server.accept() + let + clientTransp = + try: + let res = await connect(address, + dualstack = cstack).wait(500.milliseconds) + Opt.some(res) + except CatchableError: + Opt.none(StreamTransport) + serverTransp = + if clientTransp.isSome(): + let res = await acceptFut + Opt.some(res) + else: + Opt.none(StreamTransport) + + let testResult = clientTransp.isSome() and serverTransp.isSome() + var pending: seq[FutureBase] + if clientTransp.isSome(): + pending.add(closeWait(clientTransp.get())) + if serverTransp.isSome(): + pending.add(closeWait(serverTransp.get())) + else: + pending.add(cancelAndWait(acceptFut)) + await allFutures(pending) + server.stop() + await server.closeWait() + testResult + markFD = getCurrentFD() for i in 0..