IPv4/IPv6 dualstack (#456)

* Initial commit.

* Fix tests.

* Fix linux compilation issue.

* Add getDomain() implementation.
Add getDomain() tests.
Add datagram tests.

* Fix style errors.

* Deprecate NetFlag.
Deprecate new flags in ServerFlags.
Add isAvailable().
Fix setDualstack() to ignore errors on `Auto`.
Updatetests.

* Deprecate some old procedures.
Improve datagram transport a bit.

* Address review comments, and fix tests.

* Fix setDescriptorBlocking() issue.
Recover connect() dualstack behavior.
Add test for connect() IPv6-[IPv4 mapped] addresses.

* Fix alignment code issue.
Fix TcpNoDelay was not available on Windows.

* Add dualstack support to HTTP/HTTPS client/server.
This commit is contained in:
Eugene Kabanov 2023-10-30 15:27:50 +02:00 committed by GitHub
parent 8375770fe5
commit a70b145964
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 795 additions and 302 deletions

View File

@ -126,6 +126,7 @@ type
connectionsCount*: int
socketFlags*: set[SocketFlags]
flags*: HttpClientFlags
dualstack*: DualStackType
HttpAddress* = object
id*: string
@ -263,7 +264,8 @@ proc new*(t: typedesc[HttpSessionRef],
maxConnections = -1,
idleTimeout = HttpConnectionIdleTimeout,
idlePeriod = HttpConnectionCheckPeriod,
socketFlags: set[SocketFlags] = {}): HttpSessionRef {.
socketFlags: set[SocketFlags] = {},
dualstack = DualStackType.Auto): HttpSessionRef {.
raises: [] .} =
## Create new HTTP session object.
##
@ -283,7 +285,8 @@ proc new*(t: typedesc[HttpSessionRef],
idleTimeout: idleTimeout,
idlePeriod: idlePeriod,
connections: initTable[string, seq[HttpClientConnectionRef]](),
socketFlags: socketFlags
socketFlags: socketFlags,
dualstack: dualstack
)
res.watcherFut =
if HttpClientFlag.Http11Pipeline in flags:
@ -620,7 +623,8 @@ proc connect(session: HttpSessionRef,
let transp =
try:
await connect(address, bufferSize = session.connectionBufferSize,
flags = session.socketFlags)
flags = session.socketFlags,
dualstack = session.dualstack)
except CancelledError as exc:
raise exc
except CatchableError:

View File

@ -191,7 +191,8 @@ proc new*(htype: typedesc[HttpServerRef],
backlogSize: int = DefaultBacklogSize,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
maxRequestBodySize: int = 1_048_576): HttpResult[HttpServerRef] {.
maxRequestBodySize: int = 1_048_576,
dualstack = DualStackType.Auto): HttpResult[HttpServerRef] {.
raises: [].} =
let serverUri =
@ -206,7 +207,7 @@ proc new*(htype: typedesc[HttpServerRef],
let serverInstance =
try:
createStreamServer(address, flags = socketFlags, bufferSize = bufferSize,
backlog = backlogSize)
backlog = backlogSize, dualstack = dualstack)
except TransportOsError as exc:
return err(exc.msg)
except CatchableError as exc:

View File

@ -92,7 +92,8 @@ proc new*(htype: typedesc[SecureHttpServerRef],
backlogSize: int = DefaultBacklogSize,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
maxRequestBodySize: int = 1_048_576
maxRequestBodySize: int = 1_048_576,
dualstack = DualStackType.Auto
): HttpResult[SecureHttpServerRef] {.raises: [].} =
doAssert(not(isNil(tlsPrivateKey)), "TLS private key must not be nil!")
@ -110,7 +111,7 @@ proc new*(htype: typedesc[SecureHttpServerRef],
let serverInstance =
try:
createStreamServer(address, flags = socketFlags, bufferSize = bufferSize,
backlog = backlogSize)
backlog = backlogSize, dualstack = dualstack)
except TransportOsError as exc:
return err(exc.msg)
except CatchableError as exc:

View File

@ -21,62 +21,109 @@ const
asyncInvalidSocket* = AsyncFD(osdefs.INVALID_SOCKET)
asyncInvalidPipe* = asyncInvalidSocket
proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool =
proc setSocketBlocking*(s: SocketHandle, blocking: bool): bool {.
deprecated: "Please use setDescriptorBlocking() instead".} =
## Sets blocking mode on socket.
when defined(windows) or defined(nimdoc):
var mode = clong(ord(not blocking))
if osdefs.ioctlsocket(s, osdefs.FIONBIO, addr(mode)) == -1:
false
else:
true
else:
let x: int = osdefs.fcntl(s, osdefs.F_GETFL, 0)
if x == -1:
false
else:
let mode =
if blocking: x and not osdefs.O_NONBLOCK else: x or osdefs.O_NONBLOCK
if osdefs.fcntl(s, osdefs.F_SETFL, mode) == -1:
false
else:
setDescriptorBlocking(s, blocking).isOkOr:
return false
true
proc setSockOpt*(socket: AsyncFD, level, optname, optval: int): bool =
## `setsockopt()` for integer options.
## Returns ``true`` on success, ``false`` on error.
proc setSockOpt2*(socket: AsyncFD,
level, optname, optval: int): Result[void, OSErrorCode] =
var value = cint(optval)
osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(value), SockLen(sizeof(value))) >= cint(0)
let res = osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(value), SockLen(sizeof(value)))
if res == -1:
return err(osLastError())
ok()
proc setSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: int): bool =
proc setSockOpt2*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: int): Result[void, OSErrorCode] =
## `setsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname), value,
SockLen(valuelen)) >= cint(0)
let res = osdefs.setsockopt(SocketHandle(socket), cint(level), cint(optname),
value, SockLen(valuelen))
if res == -1:
return err(osLastError())
ok()
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool =
proc setSockOpt*(socket: AsyncFD, level, optname, optval: int): bool {.
deprecated: "Please use setSockOpt2() instead".} =
## `setsockopt()` for integer options.
## Returns ``true`` on success, ``false`` on error.
setSockOpt2(socket, level, optname, optval).isOk
proc setSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: int): bool {.
deprecated: "Please use setSockOpt2() instead".} =
## `setsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
setSockOpt2(socket, level, optname, value, valuelen).isOk
proc getSockOpt2*(socket: AsyncFD,
level, optname: int): Result[cint, OSErrorCode] =
var
value: cint
size = SockLen(sizeof(value))
let res = osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(value), addr(size))
if res == -1:
return err(osLastError())
ok(value)
proc getSockOpt2*(socket: AsyncFD, level, optname: int,
T: type): Result[T, OSErrorCode] =
var
value = default(T)
size = SockLen(sizeof(value))
let res = osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname),
cast[ptr byte](addr(value)), addr(size))
if res == -1:
return err(osLastError())
ok(value)
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool {.
deprecated: "Please use getSockOpt2() instead".} =
## `getsockopt()` for integer options.
## Returns ``true`` on success, ``false`` on error.
var res: cint
var size = SockLen(sizeof(res))
if osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname),
addr(res), addr(size)) >= cint(0):
value = int(res)
value = getSockOpt2(socket, level, optname).valueOr:
return false
true
else:
false
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: var int): bool =
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var pointer,
valuelen: var int): bool {.
deprecated: "Please use getSockOpt2() instead".} =
## `getsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
osdefs.getsockopt(SocketHandle(socket), cint(level), cint(optname),
value, cast[ptr SockLen](addr valuelen)) >= cint(0)
proc getSocketError*(socket: AsyncFD, err: var int): bool =
proc getSocketError*(socket: AsyncFD, err: var int): bool {.
deprecated: "Please use getSocketError() instead".} =
## Recover error code associated with socket handle ``socket``.
getSockOpt(socket, cint(osdefs.SOL_SOCKET), cint(osdefs.SO_ERROR), err)
err = getSockOpt2(socket, cint(osdefs.SOL_SOCKET),
cint(osdefs.SO_ERROR)).valueOr:
return false
true
proc getSocketError2*(socket: AsyncFD): Result[cint, OSErrorCode] =
getSockOpt2(socket, cint(osdefs.SOL_SOCKET), cint(osdefs.SO_ERROR))
proc isAvailable*(domain: Domain): bool =
when defined(windows):
let fd = wsaSocket(toInt(domain), toInt(SockType.SOCK_STREAM),
toInt(Protocol.IPPROTO_TCP), nil, GROUP(0), 0'u32)
if fd == osdefs.INVALID_SOCKET:
return if osLastError() == osdefs.WSAEAFNOSUPPORT: false else: true
discard closeFd(fd)
true
else:
let fd = osdefs.socket(toInt(domain), toInt(SockType.SOCK_STREAM),
toInt(Protocol.IPPROTO_TCP))
if fd == -1:
return if osLastError() == osdefs.EAFNOSUPPORT: false else: true
discard closeFd(fd)
true
proc createAsyncSocket2*(domain: Domain, sockType: SockType,
protocol: Protocol,
@ -93,15 +140,12 @@ proc createAsyncSocket2*(domain: Domain, sockType: SockType,
if fd == osdefs.INVALID_SOCKET:
return err(osLastError())
let bres = setDescriptorBlocking(fd, false)
if bres.isErr():
setDescriptorBlocking(fd, false).isOkOr:
discard closeFd(fd)
return err(bres.error())
let res = register2(AsyncFD(fd))
if res.isErr():
return err(error)
register2(AsyncFD(fd)).isOkOr:
discard closeFd(fd)
return err(res.error())
return err(error)
ok(AsyncFD(fd))
else:
@ -114,23 +158,20 @@ proc createAsyncSocket2*(domain: Domain, sockType: SockType,
let fd = osdefs.socket(toInt(domain), socketType, toInt(protocol))
if fd == -1:
return err(osLastError())
let res = register2(AsyncFD(fd))
if res.isErr():
register2(AsyncFD(fd)).isOkOr:
discard closeFd(fd)
return err(res.error())
return err(error)
ok(AsyncFD(fd))
else:
let fd = osdefs.socket(toInt(domain), toInt(sockType), toInt(protocol))
if fd == -1:
return err(osLastError())
let bres = setDescriptorFlags(cint(fd), true, true)
if bres.isErr():
setDescriptorFlags(cint(fd), true, true).isOkOr:
discard closeFd(fd)
return err(bres.error())
let res = register2(AsyncFD(fd))
if res.isErr():
return err(error)
register2(AsyncFD(fd)).isOkOr:
discard closeFd(fd)
return err(bres.error())
return err(error)
ok(AsyncFD(fd))
proc wrapAsyncSocket2*(sock: cint|SocketHandle): Result[AsyncFD, OSErrorCode] =
@ -230,3 +271,26 @@ proc createAsyncPipe*(): tuple[read: AsyncFD, write: AsyncFD] =
else:
let pipes = res.get()
(read: AsyncFD(pipes.read), write: AsyncFD(pipes.write))
proc getDualstack*(fd: AsyncFD): Result[bool, OSErrorCode] =
## Returns `true` if `IPV6_V6ONLY` socket option set to `false`.
var
flag = cint(0)
size = SockLen(sizeof(flag))
let res = osdefs.getsockopt(SocketHandle(fd), cint(osdefs.IPPROTO_IPV6),
cint(osdefs.IPV6_V6ONLY), addr(flag), addr(size))
if res == -1:
return err(osLastError())
ok(flag == cint(0))
proc setDualstack*(fd: AsyncFD, value: bool): Result[void, OSErrorCode] =
## Sets `IPV6_V6ONLY` socket option value to `false` if `value == true` and
## to `true` if `value == false`.
var
flag = cint(if value: 0 else: 1)
size = SockLen(sizeof(flag))
let res = osdefs.setsockopt(SocketHandle(fd), cint(osdefs.IPPROTO_IPV6),
cint(osdefs.IPV6_V6ONLY), addr(flag), size)
if res == -1:
return err(osLastError())
ok()

View File

@ -670,6 +670,19 @@ when defined(windows):
if not(isNil(aftercb)):
loop.callbacks.addLast(AsyncCallback(function: aftercb, udata: param))
proc unregisterAndCloseFd*(fd: AsyncFD): Result[void, OSErrorCode] =
## Unregister from system queue and close asynchronous socket.
##
## NOTE: Use this function to close temporary sockets/pipes only (which
## are not exposed to the public and not supposed to be used/reused).
## Please use closeSocket(AsyncFD) and closeHandle(AsyncFD) instead.
doAssert(fd != AsyncFD(osdefs.INVALID_SOCKET))
unregister(fd)
if closeFd(SocketHandle(fd)) != 0:
err(osLastError())
else:
ok()
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
fd in disp.handles

View File

@ -122,6 +122,7 @@ when defined(windows):
SO_UPDATE_ACCEPT_CONTEXT* = 0x700B
SO_CONNECT_TIME* = 0x700C
SO_UPDATE_CONNECT_CONTEXT* = 0x7010
SO_PROTOCOL_INFOW* = 0x2005
FILE_FLAG_FIRST_PIPE_INSTANCE* = 0x00080000'u32
FILE_FLAG_OPEN_NO_RECALL* = 0x00100000'u32
@ -258,6 +259,9 @@ when defined(windows):
FIONBIO* = WSAIOW(102, 126)
HANDLE_FLAG_INHERIT* = 1'u32
IPV6_V6ONLY* = 27
MAX_PROTOCOL_CHAIN* = 7
WSAPROTOCOL_LEN* = 255
type
LONG* = int32
@ -441,6 +445,32 @@ when defined(windows):
prefix*: SOCKADDR_INET
prefixLength*: uint8
WSAPROTOCOLCHAIN* {.final, pure.} = object
chainLen*: int32
chainEntries*: array[MAX_PROTOCOL_CHAIN, DWORD]
WSAPROTOCOL_INFO* {.final, pure.} = object
dwServiceFlags1*: uint32
dwServiceFlags2*: uint32
dwServiceFlags3*: uint32
dwServiceFlags4*: uint32
dwProviderFlags*: uint32
providerId*: GUID
dwCatalogEntryId*: DWORD
protocolChain*: WSAPROTOCOLCHAIN
iVersion*: int32
iAddressFamily*: int32
iMaxSockAddr*: int32
iMinSockAddr*: int32
iSocketType*: int32
iProtocol*: int32
iProtocolMaxOffset*: int32
iNetworkByteOrder*: int32
iSecurityScheme*: int32
dwMessageSize*: uint32
dwProviderReserved*: uint32
szProtocol*: array[WSAPROTOCOL_LEN + 1, WCHAR]
MibIpForwardRow2* {.final, pure.} = object
interfaceLuid*: uint64
interfaceIndex*: uint32
@ -890,7 +920,7 @@ elif defined(macos) or defined(macosx):
O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM,
SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
@ -915,7 +945,7 @@ elif defined(macos) or defined(macosx):
O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM,
SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
@ -977,7 +1007,8 @@ elif defined(linux):
SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL,
MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_REUSEADDR, SO_REUSEPORT,
SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS,
SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6,
IPV6_MULTICAST_HOPS,
SOCK_DGRAM, SOCK_STREAM, SHUT_RD, SHUT_WR, SHUT_RDWR,
POLLIN, POLLOUT, POLLERR, POLLHUP, POLLNVAL,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
@ -1005,7 +1036,7 @@ elif defined(linux):
SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL,
MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_REUSEADDR, SO_REUSEPORT,
SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS,
SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
SOCK_DGRAM, SOCK_STREAM, SHUT_RD, SHUT_WR, SHUT_RDWR,
POLLIN, POLLOUT, POLLERR, POLLHUP, POLLNVAL,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
@ -1127,7 +1158,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM,
SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC,
SHUT_RD, SHUT_WR, SHUT_RDWR,
@ -1154,7 +1185,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM,
SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPPROTO_IPV6,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC,
SHUT_RD, SHUT_WR, SHUT_RDWR,
@ -1182,47 +1213,52 @@ when defined(linux):
SOCK_CLOEXEC* = 0x80000
TCP_NODELAY* = cint(1)
IPPROTO_TCP* = 6
elif defined(freebsd) or defined(netbsd) or defined(dragonfly):
O_CLOEXEC* = 0x80000
POSIX_SPAWN_USEVFORK* = 0x40
IPV6_V6ONLY* = 26
elif defined(freebsd):
const
SOCK_NONBLOCK* = 0x20000000
SOCK_CLOEXEC* = 0x10000000
TCP_NODELAY* = cint(1)
IPPROTO_TCP* = 6
O_CLOEXEC* = 0x00100000
POSIX_SPAWN_USEVFORK* = 0x00
IPV6_V6ONLY* = 27
elif defined(netbsd):
const
SOCK_NONBLOCK* = 0x20000000
SOCK_CLOEXEC* = 0x10000000
TCP_NODELAY* = cint(1)
IPPROTO_TCP* = 6
O_CLOEXEC* = 0x00400000
POSIX_SPAWN_USEVFORK* = 0x00
IPV6_V6ONLY* = 27
elif defined(dragonfly):
const
SOCK_NONBLOCK* = 0x20000000
SOCK_CLOEXEC* = 0x10000000
TCP_NODELAY* = cint(1)
IPPROTO_TCP* = 6
O_CLOEXEC* = 0x00020000
POSIX_SPAWN_USEVFORK* = 0x00
IPV6_V6ONLY* = 27
elif defined(openbsd):
const
SOCK_CLOEXEC* = 0x8000
SOCK_NONBLOCK* = 0x4000
TCP_NODELAY* = cint(1)
IPPROTO_TCP* = 6
O_CLOEXEC* = 0x10000
POSIX_SPAWN_USEVFORK* = 0x00
IPV6_V6ONLY* = 27
elif defined(macos) or defined(macosx):
const
TCP_NODELAY* = cint(1)
IP_MULTICAST_TTL* = cint(10)
IPPROTO_TCP* = 6
when defined(linux):
const
O_CLOEXEC* = 0x80000
POSIX_SPAWN_USEVFORK* = 0x40
elif defined(freebsd):
const
O_CLOEXEC* = 0x00100000
POSIX_SPAWN_USEVFORK* = 0x00
elif defined(openbsd):
const
O_CLOEXEC* = 0x10000
POSIX_SPAWN_USEVFORK* = 0x00
elif defined(netbsd):
const
O_CLOEXEC* = 0x00400000
POSIX_SPAWN_USEVFORK* = 0x00
elif defined(dragonfly):
const
O_CLOEXEC* = 0x00020000
POSIX_SPAWN_USEVFORK* = 0x00
elif defined(macos) or defined(macosx):
const
POSIX_SPAWN_USEVFORK* = 0x00
IPV6_V6ONLY* = 27
when defined(linux) or defined(macos) or defined(macosx) or defined(freebsd) or
defined(openbsd) or defined(netbsd) or defined(dragonfly):

View File

@ -1328,6 +1328,7 @@ elif defined(windows):
ERROR_CONNECTION_REFUSED* = OSErrorCode(1225)
ERROR_CONNECTION_ABORTED* = OSErrorCode(1236)
WSAEMFILE* = OSErrorCode(10024)
WSAEAFNOSUPPORT* = OSErrorCode(10047)
WSAENETDOWN* = OSErrorCode(10050)
WSAENETRESET* = OSErrorCode(10052)
WSAECONNABORTED* = OSErrorCode(10053)

View File

@ -346,6 +346,10 @@ else:
return err(osLastError())
ok()
proc setDescriptorBlocking*(s: SocketHandle,
value: bool): Result[void, OSErrorCode] =
setDescriptorBlocking(cint(s), value)
proc setDescriptorInheritance*(s: cint,
value: bool): Result[void, OSErrorCode] =
let flags = handleEintr(osdefs.fcntl(s, osdefs.F_GETFD))

View File

@ -11,7 +11,7 @@
import std/[strutils]
import stew/[base10, byteutils]
import ".."/[asyncloop, osdefs, oserrno]
import ".."/[asyncloop, osdefs, oserrno, handles]
from std/net import Domain, `==`, IpAddress, IpAddressFamily, parseIpAddress,
SockType, Protocol, Port, `$`
@ -31,6 +31,9 @@ type
ReuseAddr, ReusePort, TcpNoDelay, NoAutoRead, GCUserData, FirstPipe,
NoPipeFlash, Broadcast
DualStackType* {.pure.} = enum
Auto, Enabled, Disabled, Default
AddressFamily* {.pure.} = enum
None, IPv4, IPv6, Unix
@ -76,6 +79,7 @@ when defined(windows) or defined(nimdoc):
asock*: AsyncFD # Current AcceptEx() socket
errorCode*: OSErrorCode # Current error code
abuffer*: array[128, byte] # Windows AcceptEx() buffer
dualstack*: DualStackType # IPv4/IPv6 dualstack parameters
when defined(windows):
aovl*: CustomOverlapped # AcceptEx OVERLAPPED structure
else:
@ -90,6 +94,7 @@ else:
bufferSize*: int # Size of internal transports' buffer
loopFuture*: Future[void] # Server's main Future
errorCode*: OSErrorCode # Current error code
dualstack*: DualStackType # IPv4/IPv6 dualstack parameters
type
TransportError* = object of AsyncError
@ -720,3 +725,75 @@ proc raiseTransportError*(ecode: OSErrorCode) {.
raise getTransportTooManyError(ecode)
else:
raise getTransportOsError(ecode)
proc isAvailable*(family: AddressFamily): bool =
case family
of AddressFamily.None:
raiseAssert "Invalid address family"
of AddressFamily.IPv4:
isAvailable(Domain.AF_INET)
of AddressFamily.IPv6:
isAvailable(Domain.AF_INET6)
of AddressFamily.Unix:
isAvailable(Domain.AF_UNIX)
proc getDomain*(socket: AsyncFD): Result[AddressFamily, OSErrorCode] =
## Returns address family which is used to create socket ``socket``.
##
## Note: `chronos` supports only `AF_INET`, `AF_INET6` and `AF_UNIX` sockets.
## For all other types of sockets this procedure returns
## `EAFNOSUPPORT/WSAEAFNOSUPPORT` error.
when defined(windows):
let protocolInfo = ? getSockOpt2(socket, cint(osdefs.SOL_SOCKET),
cint(osdefs.SO_PROTOCOL_INFOW),
WSAPROTOCOL_INFO)
if protocolInfo.iAddressFamily == toInt(Domain.AF_INET):
ok(AddressFamily.IPv4)
elif protocolInfo.iAddressFamily == toInt(Domain.AF_INET6):
ok(AddressFamily.IPv6)
else:
err(WSAEAFNOSUPPORT)
else:
var
saddr = Sockaddr_storage()
slen = SockLen(sizeof(saddr))
if getsockname(SocketHandle(socket), cast[ptr SockAddr](addr saddr),
addr slen) != 0:
return err(osLastError())
if int(saddr.ss_family) == toInt(Domain.AF_INET):
ok(AddressFamily.IPv4)
elif int(saddr.ss_family) == toInt(Domain.AF_INET6):
ok(AddressFamily.IPv6)
elif int(saddr.ss_family) == toInt(Domain.AF_UNIX):
ok(AddressFamily.Unix)
else:
err(EAFNOSUPPORT)
proc setDualstack*(socket: AsyncFD, family: AddressFamily,
flag: DualStackType): Result[void, OSErrorCode] =
if family == AddressFamily.IPv6:
case flag
of DualStackType.Auto:
# In case of `Auto` we going to ignore all the errors.
discard setDualstack(socket, true)
ok()
of DualStackType.Enabled:
? setDualstack(socket, true)
ok()
of DualStackType.Disabled:
? setDualstack(socket, false)
ok()
of DualStackType.Default:
ok()
else:
ok()
proc setDualstack*(socket: AsyncFD,
flag: DualStackType): Result[void, OSErrorCode] =
let family =
case flag
of DualStackType.Auto:
getDomain(socket).get(AddressFamily.IPv6)
else:
? getDomain(socket)
setDualstack(socket, family, flag)

View File

@ -11,7 +11,7 @@
import std/deques
when not(defined(windows)): import ".."/selectors2
import ".."/[asyncloop, osdefs, oserrno, handles]
import ".."/[asyncloop, osdefs, oserrno, osutils, handles]
import "."/common
type
@ -247,57 +247,65 @@ when defined(windows):
udata: pointer,
child: DatagramTransport,
bufferSize: int,
ttl: int): DatagramTransport {.
ttl: int,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
var localSock: AsyncFD
doAssert(remote.family == local.family)
doAssert(not isNil(cbproc))
doAssert(remote.family in {AddressFamily.IPv4, AddressFamily.IPv6})
var res = if isNil(child): DatagramTransport() else: child
let localSock =
if sock == asyncInvalidSocket:
localSock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM,
Protocol.IPPROTO_UDP)
if localSock == asyncInvalidSocket:
raiseTransportOsError(osLastError())
let proto =
if local.family == AddressFamily.Unix:
Protocol.IPPROTO_IP
else:
if not setSocketBlocking(SocketHandle(sock), false):
raiseTransportOsError(osLastError())
localSock = sock
let bres = register2(localSock)
if bres.isErr():
raiseTransportOsError(bres.error())
Protocol.IPPROTO_UDP
let res = createAsyncSocket2(local.getDomain(), SockType.SOCK_DGRAM,
proto)
if res.isErr():
raiseTransportOsError(res.error)
res.get()
else:
setDescriptorBlocking(SocketHandle(sock), false).isOkOr:
raiseTransportOsError(error)
register2(sock).isOkOr:
raiseTransportOsError(error)
sock
## Apply ServerFlags here
if ServerFlags.ReuseAddr in flags:
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEADDR, 1):
let err = osLastError()
setSockOpt2(localSock, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
raiseTransportOsError(error)
if ServerFlags.ReusePort in flags:
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEPORT, 1):
let err = osLastError()
setSockOpt2(localSock, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
raiseTransportOsError(error)
if ServerFlags.Broadcast in flags:
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_BROADCAST, 1):
let err = osLastError()
setSockOpt2(localSock, SOL_SOCKET, SO_BROADCAST, 1).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
raiseTransportOsError(error)
if ttl > 0:
if not setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IP_TTL, ttl):
let err = osLastError()
setSockOpt2(localSock, osdefs.IPPROTO_IP, osdefs.IP_TTL, ttl).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
raiseTransportOsError(error)
## IPV6_V6ONLY
if sock == asyncInvalidSocket:
setDualstack(localSock, local.family, dualstack).isOkOr:
closeSocket(localSock)
raiseTransportOsError(error)
else:
setDualstack(localSock, dualstack).isOkOr:
raiseTransportOsError(error)
## Fix for Q263823.
var bytesRet: DWORD
@ -457,70 +465,75 @@ else:
udata: pointer,
child: DatagramTransport,
bufferSize: int,
ttl: int): DatagramTransport {.
ttl: int,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
var localSock: AsyncFD
doAssert(remote.family == local.family)
doAssert(not isNil(cbproc))
var res = if isNil(child): DatagramTransport() else: child
let localSock =
if sock == asyncInvalidSocket:
let proto =
if local.family == AddressFamily.Unix:
Protocol.IPPROTO_IP
else:
Protocol.IPPROTO_UDP
localSock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM,
let res = createAsyncSocket2(local.getDomain(), SockType.SOCK_DGRAM,
proto)
if localSock == asyncInvalidSocket:
raiseTransportOsError(osLastError())
if res.isErr():
raiseTransportOsError(res.error)
res.get()
else:
if not setSocketBlocking(SocketHandle(sock), false):
raiseTransportOsError(osLastError())
localSock = sock
let bres = register2(localSock)
if bres.isErr():
raiseTransportOsError(bres.error())
setDescriptorBlocking(SocketHandle(sock), false).isOkOr:
raiseTransportOsError(error)
register2(sock).isOkOr:
raiseTransportOsError(error)
sock
## Apply ServerFlags here
if ServerFlags.ReuseAddr in flags:
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEADDR, 1):
let err = osLastError()
setSockOpt2(localSock, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
raiseTransportOsError(error)
if ServerFlags.ReusePort in flags:
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_REUSEPORT, 1):
let err = osLastError()
setSockOpt2(localSock, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
raiseTransportOsError(error)
if ServerFlags.Broadcast in flags:
if not setSockOpt(localSock, osdefs.SOL_SOCKET, osdefs.SO_BROADCAST, 1):
let err = osLastError()
setSockOpt2(localSock, SOL_SOCKET, SO_BROADCAST, 1).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(err)
raiseTransportOsError(error)
if ttl > 0:
let tres =
if local.family == AddressFamily.IPv4:
setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IP_MULTICAST_TTL,
cint(ttl))
setSockOpt2(localSock, osdefs.IPPROTO_IP, osdefs.IP_MULTICAST_TTL,
cint(ttl)).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(error)
elif local.family == AddressFamily.IPv6:
setSockOpt(localSock, osdefs.IPPROTO_IP, osdefs.IPV6_MULTICAST_HOPS,
cint(ttl))
setSockOpt2(localSock, osdefs.IPPROTO_IP, osdefs.IPV6_MULTICAST_HOPS,
cint(ttl)).isOkOr:
if sock == asyncInvalidSocket:
closeSocket(localSock)
raiseTransportOsError(error)
else:
raiseAssert "Unsupported address bound to local socket"
if not tres:
let err = osLastError()
## IPV6_V6ONLY
if sock == asyncInvalidSocket:
setDualstack(localSock, local.family, dualstack).isOkOr:
closeSocket(localSock)
raiseTransportOsError(err)
raiseTransportOsError(error)
else:
setDualstack(localSock, dualstack).isOkOr:
raiseTransportOsError(error)
if local.family != AddressFamily.None:
var saddr: Sockaddr_storage
@ -594,7 +607,8 @@ proc newDatagramTransport*(cbproc: DatagramCallback,
udata: pointer = nil,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
## Create new UDP datagram transport (IPv4).
@ -610,7 +624,7 @@ proc newDatagramTransport*(cbproc: DatagramCallback,
## ``ttl`` - TTL for UDP datagram packet (only usable when flags has
## ``Broadcast`` option).
newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child,
bufSize, ttl)
bufSize, ttl, dualstack)
proc newDatagramTransport*[T](cbproc: DatagramCallback,
udata: ref T,
@ -620,13 +634,15 @@ proc newDatagramTransport*[T](cbproc: DatagramCallback,
flags: set[ServerFlags] = {},
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
var fflags = flags + {GCUserData}
GC_ref(udata)
newDatagramTransportCommon(cbproc, remote, local, sock, fflags,
cast[pointer](udata), child, bufSize, ttl)
cast[pointer](udata), child, bufSize, ttl,
dualstack)
proc newDatagramTransport6*(cbproc: DatagramCallback,
remote: TransportAddress = AnyAddress6,
@ -636,7 +652,8 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
udata: pointer = nil,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
## Create new UDP datagram transport (IPv6).
@ -652,7 +669,7 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
## ``ttl`` - TTL for UDP datagram packet (only usable when flags has
## ``Broadcast`` option).
newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child,
bufSize, ttl)
bufSize, ttl, dualstack)
proc newDatagramTransport6*[T](cbproc: DatagramCallback,
udata: ref T,
@ -662,13 +679,15 @@ proc newDatagramTransport6*[T](cbproc: DatagramCallback,
flags: set[ServerFlags] = {},
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
var fflags = flags + {GCUserData}
GC_ref(udata)
newDatagramTransportCommon(cbproc, remote, local, sock, fflags,
cast[pointer](udata), child, bufSize, ttl)
cast[pointer](udata), child, bufSize, ttl,
dualstack)
proc join*(transp: DatagramTransport): Future[void] =
## Wait until the transport ``transp`` will be closed.

View File

@ -639,6 +639,7 @@ when defined(windows):
child: StreamTransport = nil,
localAddress = TransportAddress(),
flags: set[SocketFlags] = {},
dualstack = DualStackType.Auto
): Future[StreamTransport] =
## Open new connection to remote peer with address ``address`` and create
## new transport object ``StreamTransport`` for established connection.
@ -658,23 +659,32 @@ when defined(windows):
toSAddr(raddress, saddr, slen)
proto = Protocol.IPPROTO_TCP
sock = createAsyncSocket(raddress.getDomain(), SockType.SOCK_STREAM,
proto)
if sock == asyncInvalidSocket:
retFuture.fail(getTransportOsError(osLastError()))
sock = createAsyncSocket2(raddress.getDomain(), SockType.SOCK_STREAM,
proto).valueOr:
retFuture.fail(getTransportOsError(error))
return retFuture
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
if SocketFlags.TcpNoDelay in flags:
setSockOpt2(sock, osdefs.IPPROTO_TCP, osdefs.TCP_NODELAY, 1).isOkOr:
sock.closeSocket()
retFuture.fail(getTransportOsError(error))
return retFuture
if SocketFlags.ReuseAddr in flags:
if not(setSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1)):
let err = osLastError()
setSockOpt2(sock, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr:
sock.closeSocket()
retFuture.fail(getTransportOsError(err))
retFuture.fail(getTransportOsError(error))
return retFuture
if SocketFlags.ReusePort in flags:
if not(setSockOpt(sock, SOL_SOCKET, SO_REUSEPORT, 1)):
let err = osLastError()
setSockOpt2(sock, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr:
sock.closeSocket()
retFuture.fail(getTransportOsError(err))
retFuture.fail(getTransportOsError(error))
return retFuture
# IPV6_V6ONLY.
setDualstack(sock, address.family, dualstack).isOkOr:
sock.closeSocket()
retFuture.fail(getTransportOsError(error))
return retFuture
if localAddress != TransportAddress():
@ -966,14 +976,9 @@ when defined(windows):
if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}:
server.apending = true
# TODO No way to report back errors!
server.asock =
block:
let sock = createAsyncSocket(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
if sock == asyncInvalidSocket:
raiseOsDefect(osLastError(),
"acceptLoop(): Unablet to create new socket")
sock
server.asock = createAsyncSocket2(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP).valueOr:
raiseOsDefect(error, "acceptLoop(): Unablet to create new socket")
var dwBytesReceived = DWORD(0)
let dwReceiveDataLength = DWORD(0)
@ -1167,15 +1172,13 @@ when defined(windows):
if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
# TCP Sockets part
var loop = getThreadDispatcher()
server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
if server.asock == asyncInvalidSocket:
let err = osLastError()
case err
server.asock = createAsyncSocket2(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP).valueOr:
case error
of ERROR_TOO_MANY_OPEN_FILES, WSAENOBUFS, WSAEMFILE:
retFuture.fail(getTransportTooManyError(err))
retFuture.fail(getTransportTooManyError(error))
else:
retFuture.fail(getTransportOsError(err))
retFuture.fail(getTransportOsError(error))
return retFuture
var dwBytesReceived = DWORD(0)
@ -1468,6 +1471,7 @@ else:
child: StreamTransport = nil,
localAddress = TransportAddress(),
flags: set[SocketFlags] = {},
dualstack = DualStackType.Auto,
): Future[StreamTransport] =
## Open new connection to remote peer with address ``address`` and create
## new transport object ``StreamTransport`` for established connection.
@ -1483,35 +1487,36 @@ else:
else:
Protocol.IPPROTO_TCP
let sock = createAsyncSocket(address.getDomain(), SockType.SOCK_STREAM,
proto)
if sock == asyncInvalidSocket:
let err = osLastError()
case err
let sock = createAsyncSocket2(address.getDomain(), SockType.SOCK_STREAM,
proto).valueOr:
case error
of oserrno.EMFILE:
retFuture.fail(getTransportTooManyError())
else:
retFuture.fail(getTransportOsError(err))
retFuture.fail(getTransportOsError(error))
return retFuture
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
if SocketFlags.TcpNoDelay in flags:
if not(setSockOpt(sock, osdefs.IPPROTO_TCP, osdefs.TCP_NODELAY, 1)):
let err = osLastError()
setSockOpt2(sock, osdefs.IPPROTO_TCP, osdefs.TCP_NODELAY, 1).isOkOr:
sock.closeSocket()
retFuture.fail(getTransportOsError(err))
retFuture.fail(getTransportOsError(error))
return retFuture
if SocketFlags.ReuseAddr in flags:
if not(setSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1)):
let err = osLastError()
setSockOpt2(sock, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr:
sock.closeSocket()
retFuture.fail(getTransportOsError(err))
retFuture.fail(getTransportOsError(error))
return retFuture
if SocketFlags.ReusePort in flags:
if not(setSockOpt(sock, SOL_SOCKET, SO_REUSEPORT, 1)):
let err = osLastError()
setSockOpt2(sock, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr:
sock.closeSocket()
retFuture.fail(getTransportOsError(err))
retFuture.fail(getTransportOsError(error))
return retFuture
# IPV6_V6ONLY.
setDualstack(sock, address.family, dualstack).isOkOr:
sock.closeSocket()
retFuture.fail(getTransportOsError(error))
return retFuture
if localAddress != TransportAddress():
@ -1532,17 +1537,14 @@ else:
proc continuation(udata: pointer) =
if not(retFuture.finished()):
var err = 0
let res = removeWriter2(sock)
if res.isErr():
removeWriter2(sock).isOkOr:
discard unregisterAndCloseFd(sock)
retFuture.fail(getTransportOsError(res.error()))
retFuture.fail(getTransportOsError(error))
return
if not(sock.getSocketError(err)):
let err = sock.getSocketError2().valueOr:
discard unregisterAndCloseFd(sock)
retFuture.fail(getTransportOsError(res.error()))
retFuture.fail(getTransportOsError(error))
return
if err != 0:
@ -1578,10 +1580,9 @@ else:
# http://www.madore.org/~david/computers/connect-intr.html
case errorCode
of oserrno.EINPROGRESS, oserrno.EINTR:
let res = addWriter2(sock, continuation)
if res.isErr():
addWriter2(sock, continuation).isOkOr:
discard unregisterAndCloseFd(sock)
retFuture.fail(getTransportOsError(res.error()))
retFuture.fail(getTransportOsError(error))
return retFuture
retFuture.cancelCallback = cancel
break
@ -1782,11 +1783,13 @@ proc connect*(address: TransportAddress,
bufferSize = DefaultStreamBufferSize,
child: StreamTransport = nil,
flags: set[TransportFlags],
localAddress = TransportAddress()): Future[StreamTransport] =
localAddress = TransportAddress(),
dualstack = DualStackType.Auto
): Future[StreamTransport] =
# Retro compatibility with TransportFlags
var mappedFlags: set[SocketFlags]
if TcpNoDelay in flags: mappedFlags.incl(SocketFlags.TcpNoDelay)
address.connect(bufferSize, child, localAddress, mappedFlags)
connect(address, bufferSize, child, localAddress, mappedFlags, dualstack)
proc close*(server: StreamServer) =
## Release ``server`` resources.
@ -1848,7 +1851,8 @@ proc createStreamServer*(host: TransportAddress,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil,
udata: pointer = nil): StreamServer {.
udata: pointer = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [TransportOsError].} =
## Create new TCP stream server.
##
@ -1874,42 +1878,48 @@ proc createStreamServer*(host: TransportAddress,
elif defined(windows):
# Windows
if host.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
serverSocket =
if sock == asyncInvalidSocket:
serverSocket = createAsyncSocket(host.getDomain(),
SockType.SOCK_STREAM,
# TODO (cheatfate): `valueOr` generates weird compile error.
let res = createAsyncSocket2(host.getDomain(), SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
if serverSocket == asyncInvalidSocket:
raiseTransportOsError(osLastError())
if res.isErr():
raiseTransportOsError(res.error())
res.get()
else:
let bres = setDescriptorBlocking(SocketHandle(sock), false)
if bres.isErr():
raiseTransportOsError(bres.error())
let wres = register2(sock)
if wres.isErr():
raiseTransportOsError(wres.error())
serverSocket = sock
# SO_REUSEADDR is not useful for Unix domain sockets.
setDescriptorBlocking(SocketHandle(sock), false).isOkOr:
raiseTransportOsError(error)
register2(sock).isOkOr:
raiseTransportOsError(error)
sock
# SO_REUSEADDR
if ServerFlags.ReuseAddr in flags:
if not(setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1)):
let err = osLastError()
setSockOpt2(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr:
if sock == asyncInvalidSocket:
discard closeFd(SocketHandle(serverSocket))
raiseTransportOsError(err)
raiseTransportOsError(error)
# SO_REUSEPORT
if ServerFlags.ReusePort in flags:
if not(setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEPORT, 1)):
let err = osLastError()
setSockOpt2(serverSocket, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr:
if sock == asyncInvalidSocket:
discard closeFd(SocketHandle(serverSocket))
raiseTransportOsError(err)
# TCP flags are not useful for Unix domain sockets.
raiseTransportOsError(error)
# TCP_NODELAY
if ServerFlags.TcpNoDelay in flags:
if not(setSockOpt(serverSocket, osdefs.IPPROTO_TCP,
osdefs.TCP_NODELAY, 1)):
let err = osLastError()
setSockOpt2(serverSocket, osdefs.IPPROTO_TCP,
osdefs.TCP_NODELAY, 1).isOkOr:
if sock == asyncInvalidSocket:
discard closeFd(SocketHandle(serverSocket))
raiseTransportOsError(err)
raiseTransportOsError(error)
# IPV6_V6ONLY.
if sock == asyncInvalidSocket:
setDualstack(serverSocket, host.family, dualstack).isOkOr:
discard closeFd(SocketHandle(serverSocket))
raiseTransportOsError(error)
else:
setDualstack(serverSocket, dualstack).isOkOr:
raiseTransportOsError(error)
host.toSAddr(saddr, slen)
if bindSocket(SocketHandle(serverSocket),
cast[ptr SockAddr](addr saddr), slen) != 0:
@ -1936,47 +1946,54 @@ proc createStreamServer*(host: TransportAddress,
serverSocket = AsyncFD(0)
else:
# Posix
serverSocket =
if sock == asyncInvalidSocket:
let proto = if host.family == AddressFamily.Unix:
Protocol.IPPROTO_IP
else:
Protocol.IPPROTO_TCP
serverSocket = createAsyncSocket(host.getDomain(),
SockType.SOCK_STREAM,
# TODO (cheatfate): `valueOr` generates weird compile error.
let res = createAsyncSocket2(host.getDomain(), SockType.SOCK_STREAM,
proto)
if serverSocket == asyncInvalidSocket:
raiseTransportOsError(osLastError())
if res.isErr():
raiseTransportOsError(res.error())
res.get()
else:
let bres = setDescriptorFlags(cint(sock), true, true)
if bres.isErr():
raiseTransportOsError(osLastError())
let rres = register2(sock)
if rres.isErr():
raiseTransportOsError(osLastError())
serverSocket = sock
setDescriptorFlags(cint(sock), true, true).isOkOr:
raiseTransportOsError(error)
register2(sock).isOkOr:
raiseTransportOsError(error)
sock
if host.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
# SO_REUSEADDR and SO_REUSEPORT are not useful for Unix domain sockets.
# SO_REUSEADDR
if ServerFlags.ReuseAddr in flags:
if not(setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1)):
let err = osLastError()
setSockOpt2(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1).isOkOr:
if sock == asyncInvalidSocket:
discard unregisterAndCloseFd(serverSocket)
raiseTransportOsError(err)
raiseTransportOsError(error)
# SO_REUSEPORT
if ServerFlags.ReusePort in flags:
if not(setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEPORT, 1)):
let err = osLastError()
setSockOpt2(serverSocket, SOL_SOCKET, SO_REUSEPORT, 1).isOkOr:
if sock == asyncInvalidSocket:
discard unregisterAndCloseFd(serverSocket)
raiseTransportOsError(err)
# TCP flags are not useful for Unix domain sockets.
raiseTransportOsError(error)
# TCP_NODELAY
if ServerFlags.TcpNoDelay in flags:
if not(setSockOpt(serverSocket, osdefs.IPPROTO_TCP,
osdefs.TCP_NODELAY, 1)):
let err = osLastError()
setSockOpt2(serverSocket, osdefs.IPPROTO_TCP,
osdefs.TCP_NODELAY, 1).isOkOr:
if sock == asyncInvalidSocket:
discard unregisterAndCloseFd(serverSocket)
raiseTransportOsError(err)
raiseTransportOsError(error)
# IPV6_V6ONLY
if sock == asyncInvalidSocket:
setDualstack(serverSocket, host.family, dualstack).isOkOr:
discard closeFd(SocketHandle(serverSocket))
raiseTransportOsError(error)
else:
setDualstack(serverSocket, dualstack).isOkOr:
raiseTransportOsError(error)
elif host.family in {AddressFamily.Unix}:
# We do not care about result here, because if file cannot be removed,
# `bindSocket` will return EADDRINUSE.
@ -2016,6 +2033,7 @@ proc createStreamServer*(host: TransportAddress,
sres.status = Starting
sres.loopFuture = newFuture[void]("stream.transport.server")
sres.udata = udata
sres.dualstack = dualstack
if localAddress.family == AddressFamily.None:
sres.local = host
else:
@ -2029,8 +2047,7 @@ proc createStreamServer*(host: TransportAddress,
cb = acceptPipeLoop
if not(isNil(cbproc)):
sres.aovl.data = CompletionData(cb: cb,
udata: cast[pointer](sres))
sres.aovl.data = CompletionData(cb: cb, udata: cast[pointer](sres))
else:
if host.family == AddressFamily.Unix:
sres.sock =
@ -2055,10 +2072,11 @@ proc createStreamServer*(host: TransportAddress,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil,
udata: pointer = nil): StreamServer {.
udata: pointer = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [CatchableError].} =
createStreamServer(host, nil, flags, sock, backlog, bufferSize,
child, init, cast[pointer](udata))
child, init, cast[pointer](udata), dualstack)
proc createStreamServer*[T](host: TransportAddress,
cbproc: StreamCallback,
@ -2068,12 +2086,13 @@ proc createStreamServer*[T](host: TransportAddress,
backlog: int = DefaultBacklogSize,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer {.
init: TransportInitCallback = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [CatchableError].} =
var fflags = flags + {GCUserData}
GC_ref(udata)
createStreamServer(host, cbproc, fflags, sock, backlog, bufferSize,
child, init, cast[pointer](udata))
child, init, cast[pointer](udata), dualstack)
proc createStreamServer*[T](host: TransportAddress,
flags: set[ServerFlags] = {},
@ -2082,12 +2101,13 @@ proc createStreamServer*[T](host: TransportAddress,
backlog: int = DefaultBacklogSize,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer {.
init: TransportInitCallback = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [CatchableError].} =
var fflags = flags + {GCUserData}
GC_ref(udata)
createStreamServer(host, nil, fflags, sock, backlog, bufferSize,
child, init, cast[pointer](udata))
child, init, cast[pointer](udata), dualstack)
proc getUserData*[T](server: StreamServer): T {.inline.} =
## Obtain user data stored in ``server`` object.

View File

@ -533,6 +533,54 @@ suite "Datagram Transport test suite":
result = res
proc performDualstackTest(
sstack: DualStackType, saddr: TransportAddress,
cstack: DualStackType, caddr: TransportAddress
): Future[bool] {.async.} =
var
expectStr = "ANYADDRESS MESSAGE"
event = newAsyncEvent()
res = 0
proc process1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var bmsg = transp.getMessage()
var smsg = cast[string](bmsg)
if smsg == expectStr:
inc(res)
event.fire()
proc process2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
discard
let
sdgram = newDatagramTransport(process1, local = saddr,
dualstack = sstack)
localcaddr =
if caddr.family == AddressFamily.IPv4:
AnyAddress
else:
AnyAddress6
cdgram = newDatagramTransport(process2, local = localcaddr,
dualstack = cstack)
var address = caddr
address.port = sdgram.localAddress().port
try:
await cdgram.sendTo(address, addr expectStr[0], len(expectStr))
except CatchableError:
discard
try:
await event.wait().wait(500.milliseconds)
except CatchableError:
discard
await allFutures(sdgram.closeWait(), cdgram.closeWait())
res == 1
test "close(transport) test":
check waitFor(testTransportClose()) == true
test m1:
@ -557,5 +605,83 @@ suite "Datagram Transport test suite":
check waitFor(testBroadcast()) == 1
test "0.0.0.0/::0 (INADDR_ANY) test":
check waitFor(testAnyAddress()) == 6
asyncTest "[IP] getDomain(socket) [SOCK_DGRAM] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
block:
let res = createAsyncSocket2(Domain.AF_INET, SockType.SOCK_DGRAM,
Protocol.IPPROTO_UDP)
check res.isOk()
let fres = getDomain(res.get())
check fres.isOk()
discard unregisterAndCloseFd(res.get())
check fres.get() == AddressFamily.IPv4
block:
let res = createAsyncSocket2(Domain.AF_INET6, SockType.SOCK_DGRAM,
Protocol.IPPROTO_UDP)
check res.isOk()
let fres = getDomain(res.get())
check fres.isOk()
discard unregisterAndCloseFd(res.get())
check fres.get() == AddressFamily.IPv6
when not(defined(windows)):
block:
let res = createAsyncSocket2(Domain.AF_UNIX, SockType.SOCK_DGRAM,
Protocol.IPPROTO_IP)
check res.isOk()
let fres = getDomain(res.get())
check fres.isOk()
discard unregisterAndCloseFd(res.get())
check fres.get() == AddressFamily.Unix
else:
skip()
asyncTest "[IP] DualStack [UDP] server [DualStackType.Auto] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
let serverAddress = initTAddress("[::]:0")
check:
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0"))) == true
check:
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == true
check:
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Auto, initTAddress("[::1]:0"))) == true
else:
skip()
asyncTest "[IP] DualStack [UDP] server [DualStackType.Enabled] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
let serverAddress = initTAddress("[::]:0")
check:
(await performDualstackTest(
DualStackType.Enabled, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0"))) == true
(await performDualstackTest(
DualStackType.Enabled, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == true
(await performDualstackTest(
DualStackType.Enabled, serverAddress,
DualStackType.Auto, initTAddress("[::1]:0"))) == true
else:
skip()
asyncTest "[IP] DualStack [UDP] server [DualStackType.Disabled] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
let serverAddress = initTAddress("[::]:0")
check:
(await performDualstackTest(
DualStackType.Disabled, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0"))) == false
(await performDualstackTest(
DualStackType.Disabled, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == false
(await performDualstackTest(
DualStackType.Disabled, serverAddress,
DualStackType.Auto, initTAddress("[::1]:0"))) == true
else:
skip()
test "Transports leak test":
checkLeaks()

View File

@ -1372,6 +1372,42 @@ suite "Stream Transport test suite":
if not(sleepFut.finished()):
await cancelAndWait(sleepFut)
proc performDualstackTest(
sstack: DualStackType, saddr: TransportAddress,
cstack: DualStackType, caddr: TransportAddress
): Future[bool] {.async.} =
let server = createStreamServer(saddr, dualstack = sstack)
var address = caddr
address.port = server.localAddress().port
var acceptFut = server.accept()
let
clientTransp =
try:
let res = await connect(address,
dualstack = cstack).wait(500.milliseconds)
Opt.some(res)
except CatchableError:
Opt.none(StreamTransport)
serverTransp =
if clientTransp.isSome():
let res = await acceptFut
Opt.some(res)
else:
Opt.none(StreamTransport)
let testResult = clientTransp.isSome() and serverTransp.isSome()
var pending: seq[FutureBase]
if clientTransp.isSome():
pending.add(closeWait(clientTransp.get()))
if serverTransp.isSome():
pending.add(closeWait(serverTransp.get()))
else:
pending.add(cancelAndWait(acceptFut))
await allFutures(pending)
server.stop()
await server.closeWait()
testResult
markFD = getCurrentFD()
for i in 0..<len(addresses):
@ -1469,6 +1505,97 @@ suite "Stream Transport test suite":
waitFor(testConnectCancelLeaksTest())
test "[IP] accept() cancellation leaks test":
waitFor(testAcceptCancelLeaksTest())
asyncTest "[IP] getDomain(socket) [SOCK_STREAM] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
block:
let res = createAsyncSocket2(Domain.AF_INET, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
check res.isOk()
let fres = getDomain(res.get())
check fres.isOk()
discard unregisterAndCloseFd(res.get())
check fres.get() == AddressFamily.IPv4
block:
let res = createAsyncSocket2(Domain.AF_INET6, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
check res.isOk()
let fres = getDomain(res.get())
check fres.isOk()
discard unregisterAndCloseFd(res.get())
check fres.get() == AddressFamily.IPv6
when not(defined(windows)):
block:
let res = createAsyncSocket2(Domain.AF_UNIX, SockType.SOCK_STREAM,
Protocol.IPPROTO_IP)
check res.isOk()
let fres = getDomain(res.get())
check fres.isOk()
discard unregisterAndCloseFd(res.get())
check fres.get() == AddressFamily.Unix
else:
skip()
asyncTest "[IP] DualStack [TCP] server [DualStackType.Auto] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
let serverAddress = initTAddress("[::]:0")
check:
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0"))) == true
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == true
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Auto, initTAddress("[::1]:0"))) == true
else:
skip()
asyncTest "[IP] DualStack [TCP] server [DualStackType.Enabled] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
let serverAddress = initTAddress("[::]:0")
check:
(await performDualstackTest(
DualStackType.Enabled, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0"))) == true
(await performDualstackTest(
DualStackType.Enabled, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == true
(await performDualstackTest(
DualStackType.Enabled, serverAddress,
DualStackType.Auto, initTAddress("[::1]:0"))) == true
else:
skip()
asyncTest "[IP] DualStack [TCP] server [DualStackType.Disabled] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
let serverAddress = initTAddress("[::]:0")
check:
(await performDualstackTest(
DualStackType.Disabled, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0"))) == false
(await performDualstackTest(
DualStackType.Disabled, serverAddress,
DualStackType.Auto, initTAddress("127.0.0.1:0").toIPv6())) == false
(await performDualstackTest(
DualStackType.Disabled, serverAddress,
DualStackType.Auto, initTAddress("[::1]:0"))) == true
else:
skip()
asyncTest "[IP] DualStack [TCP] connect [IPv4 mapped address] test":
if isAvailable(AddressFamily.IPv4) and isAvailable(AddressFamily.IPv6):
let serverAddress = initTAddress("[::]:0")
check:
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Disabled, initTAddress("127.0.0.1:0"))) == true
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Disabled, initTAddress("127.0.0.1:0").toIPv6())) == false
(await performDualstackTest(
DualStackType.Auto, serverAddress,
DualStackType.Disabled, initTAddress("[::1]:0"))) == true
else:
skip()
test "Leaks test":
checkLeaks()
test "File descriptors leak test":