Add AF_UNIX sockets support.
Add Windows emulation of AF_UNIX sockets via Named Pipes. Add tests for AF_UNIX sockets. TransportAddress object change.
This commit is contained in:
parent
d2667409ea
commit
a4c27806ea
|
@ -17,7 +17,7 @@ import asyncfutures2 except callSoon
|
||||||
import nativesockets, net, deques
|
import nativesockets, net, deques
|
||||||
|
|
||||||
export Port, SocketFlag
|
export Port, SocketFlag
|
||||||
export asyncfutures2
|
export asyncfutures2, timer
|
||||||
|
|
||||||
#{.injectStmt: newGcInvariant().}
|
#{.injectStmt: newGcInvariant().}
|
||||||
|
|
||||||
|
@ -409,6 +409,15 @@ when defined(windows) or defined(nimdoc):
|
||||||
var acb = AsyncCallback(function: aftercb)
|
var acb = AsyncCallback(function: aftercb)
|
||||||
loop.callbacks.addLast(acb)
|
loop.callbacks.addLast(acb)
|
||||||
|
|
||||||
|
proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
|
||||||
|
## Closes a (pipe/file) handle and ensures that it is unregistered.
|
||||||
|
let loop = getGlobalDispatcher()
|
||||||
|
loop.handles.excl(fd)
|
||||||
|
doAssert closeHandle(Handle(fd)) == 1
|
||||||
|
if not isNil(aftercb):
|
||||||
|
var acb = AsyncCallback(function: aftercb)
|
||||||
|
loop.callbacks.addLast(acb)
|
||||||
|
|
||||||
proc unregister*(fd: AsyncFD) =
|
proc unregister*(fd: AsyncFD) =
|
||||||
## Unregisters ``fd``.
|
## Unregisters ``fd``.
|
||||||
getGlobalDispatcher().handles.excl(fd)
|
getGlobalDispatcher().handles.excl(fd)
|
||||||
|
|
|
@ -6,11 +6,9 @@
|
||||||
# Licensed under either of
|
# Licensed under either of
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
# MIT license (LICENSE-MIT)
|
# MIT license (LICENSE-MIT)
|
||||||
|
from net import IpAddressFamily, IpAddress, `$`, parseIpAddress
|
||||||
import os, net, strutils
|
import os, strutils, nativesockets
|
||||||
from nativesockets import toInt
|
|
||||||
import ../asyncloop
|
import ../asyncloop
|
||||||
export net
|
|
||||||
|
|
||||||
when defined(windows):
|
when defined(windows):
|
||||||
import winlean
|
import winlean
|
||||||
|
@ -25,12 +23,24 @@ const
|
||||||
type
|
type
|
||||||
ServerFlags* = enum
|
ServerFlags* = enum
|
||||||
## Server's flags
|
## Server's flags
|
||||||
ReuseAddr, ReusePort, TcpNoDelay, NoAutoRead, GCUserData
|
ReuseAddr, ReusePort, TcpNoDelay, NoAutoRead, GCUserData, FirstPipe,
|
||||||
|
NoPipeFlash
|
||||||
|
|
||||||
|
AddressFamily* {.pure.} = enum
|
||||||
|
None, IPv4, IPv6, Unix
|
||||||
|
|
||||||
TransportAddress* = object
|
TransportAddress* = object
|
||||||
## Transport network address
|
## Transport network address
|
||||||
address*: IpAddress # IP Address
|
case family*: AddressFamily
|
||||||
port*: Port # IP port
|
of AddressFamily.None:
|
||||||
|
discard
|
||||||
|
of AddressFamily.IPv4:
|
||||||
|
address_v4*: array[4, uint8]
|
||||||
|
of AddressFamily.IPv6:
|
||||||
|
address_v6*: array[16, uint8]
|
||||||
|
of AddressFamily.Unix:
|
||||||
|
address_un*: array[108, uint8]
|
||||||
|
port*: Port # Port number
|
||||||
|
|
||||||
ServerCommand* = enum
|
ServerCommand* = enum
|
||||||
## Server's commands
|
## Server's commands
|
||||||
|
@ -94,6 +104,8 @@ type
|
||||||
TransportAddressError* = object of TransportError
|
TransportAddressError* = object of TransportError
|
||||||
## Transport's address specific exception
|
## Transport's address specific exception
|
||||||
code*: OSErrorCode
|
code*: OSErrorCode
|
||||||
|
TransportNoSupport* = object of TransportError
|
||||||
|
## Transport's capability not supported exception
|
||||||
|
|
||||||
TransportState* = enum
|
TransportState* = enum
|
||||||
## Transport's state
|
## Transport's state
|
||||||
|
@ -108,85 +120,154 @@ type
|
||||||
WriteError # Write error
|
WriteError # Write error
|
||||||
|
|
||||||
var
|
var
|
||||||
AnyAddress* = TransportAddress(
|
AnyAddress* = TransportAddress(family: AddressFamily.IPv4, port: Port(0))
|
||||||
address: IpAddress(family: IpAddressFamily.IPv4), port: Port(0)
|
## Default INADDR_ANY address for IPv4
|
||||||
) ## Default INADDR_ANY address for IPv4
|
AnyAddress6* = TransportAddress(family: AddressFamily.IPv6, port: Port(0))
|
||||||
AnyAddress6* = TransportAddress(
|
## Default INADDR_ANY address for IPv6
|
||||||
address: IpAddress(family: IpAddressFamily.IPv6), port: Port(0)
|
|
||||||
) ## Default INADDR_ANY address for IPv6
|
|
||||||
|
|
||||||
proc getDomain*(address: IpAddress): Domain =
|
proc `==`*(lhs, rhs: TransportAddress): bool =
|
||||||
## Returns OS specific Domain from IP Address.
|
## Compare two transport addresses ``lhs`` and ``rhs``. Return ``true`` if
|
||||||
case address.family
|
## addresses are equal.
|
||||||
of IpAddressFamily.IPv4:
|
if lhs.family != lhs.family:
|
||||||
result = Domain.AF_INET
|
return false
|
||||||
of IpAddressFamily.IPv6:
|
if lhs.family == AddressFamily.IPv4:
|
||||||
result = Domain.AF_INET6
|
result = equalMem(unsafeAddr lhs.address_v4[0],
|
||||||
|
unsafeAddr rhs.address_v4[0], sizeof(lhs.address_v4)) and
|
||||||
|
(lhs.port == rhs.port)
|
||||||
|
elif lhs.family == AddressFamily.IPv6:
|
||||||
|
result = equalMem(unsafeAddr lhs.address_v6[0],
|
||||||
|
unsafeAddr rhs.address_v6[0], sizeof(lhs.address_v6)) and
|
||||||
|
(lhs.port == rhs.port)
|
||||||
|
elif lhs.family == AddressFamily.Unix:
|
||||||
|
result = equalMem(unsafeAddr lhs.address_un[0],
|
||||||
|
unsafeAddr rhs.address_un[0], sizeof(lhs.address_un))
|
||||||
|
|
||||||
proc getDomain*(address: TransportAddress): Domain =
|
proc getDomain*(address: TransportAddress): Domain =
|
||||||
## Returns OS specific Domain from TransportAddress.
|
## Returns OS specific Domain from TransportAddress.
|
||||||
result = address.address.getDomain()
|
case address.family
|
||||||
|
of AddressFamily.IPv4:
|
||||||
|
result = Domain.AF_INET
|
||||||
|
of AddressFamily.IPv6:
|
||||||
|
result = Domain.AF_INET6
|
||||||
|
of AddressFamily.Unix:
|
||||||
|
when defined(windows):
|
||||||
|
result = cast[Domain](1)
|
||||||
|
else:
|
||||||
|
result = Domain.AF_UNIX
|
||||||
|
else:
|
||||||
|
result = cast[Domain](0)
|
||||||
|
|
||||||
proc `$`*(address: TransportAddress): string =
|
proc `$`*(address: TransportAddress): string =
|
||||||
## Returns string representation of ``address``.
|
## Returns string representation of ``address``.
|
||||||
case address.address.family
|
case address.family
|
||||||
of IpAddressFamily.IPv4:
|
of AddressFamily.IPv4:
|
||||||
result = $address.address
|
var a = IpAddress(
|
||||||
|
family: IpAddressFamily.IPv4,
|
||||||
|
address_v4: address.address_v4
|
||||||
|
)
|
||||||
|
result = $a
|
||||||
result.add(":")
|
result.add(":")
|
||||||
of IpAddressFamily.IPv6:
|
result.add($int(address.port))
|
||||||
result = "[" & $address.address & "]"
|
of AddressFamily.IPv6:
|
||||||
result.add(":")
|
var a = IpAddress(family: IpAddressFamily.IPv6,
|
||||||
result.add($int(address.port))
|
address_v6: address.address_v6)
|
||||||
|
result = "[" & $a & "]:"
|
||||||
|
result.add($(int(address.port)))
|
||||||
|
of AddressFamily.Unix:
|
||||||
|
const length = sizeof(address.address_un) + 1
|
||||||
|
var buffer: array[length, char]
|
||||||
|
if not equalMem(addr buffer[0], unsafeAddr address.address_un[0],
|
||||||
|
sizeof(address.address_un)):
|
||||||
|
copyMem(addr buffer[0], unsafeAddr address.address_un[0],
|
||||||
|
sizeof(address.address_un))
|
||||||
|
result = $cast[cstring](addr buffer)
|
||||||
|
else:
|
||||||
|
result = ""
|
||||||
|
else:
|
||||||
|
raise newException(TransportAddressError, "Unknown address family!")
|
||||||
|
|
||||||
proc initTAddress*(address: string): TransportAddress =
|
proc initTAddress*(address: string): TransportAddress =
|
||||||
## Parses string representation of ``address``.
|
## Parses string representation of ``address``. ``address`` can be IPv4, IPv6
|
||||||
|
## or Unix domain address.
|
||||||
##
|
##
|
||||||
## IPv4 transport address format is ``a.b.c.d:port``.
|
## IPv4 transport address format is ``a.b.c.d:port``.
|
||||||
## IPv6 transport address format is ``[::]:port``.
|
## IPv6 transport address format is ``[::]:port``.
|
||||||
var parts = address.rsplit(":", maxsplit = 1)
|
## Unix transport address format is ``/address``.
|
||||||
if len(parts) != 2:
|
if len(address) > 0:
|
||||||
raise newException(TransportAddressError, "Format is <address>:<port>!")
|
if address[0] == '/':
|
||||||
|
result = TransportAddress(family: AddressFamily.Unix, port: Port(1))
|
||||||
try:
|
let size = if len(address) < (sizeof(result.address_un) - 1): len(address)
|
||||||
let port = parseInt(parts[1])
|
else: (sizeof(result.address_un) - 1)
|
||||||
doAssert(port > 0 and port < 65536)
|
copyMem(addr result.address_un[0], unsafeAddr address[0], size)
|
||||||
result.port = Port(port)
|
|
||||||
except:
|
|
||||||
raise newException(TransportAddressError, "Illegal port number!")
|
|
||||||
|
|
||||||
try:
|
|
||||||
if parts[0][0] == '[' and parts[0][^1] == ']':
|
|
||||||
result.address = parseIpAddress(parts[0][1..^2])
|
|
||||||
else:
|
else:
|
||||||
result.address = parseIpAddress(parts[0])
|
var port: Port
|
||||||
except:
|
var parts = address.rsplit(":", maxsplit = 1)
|
||||||
raise newException(TransportAddressError, getCurrentException().msg)
|
if len(parts) != 2:
|
||||||
|
raise newException(TransportAddressError,
|
||||||
|
"Format is <address>:<port> or </address>!")
|
||||||
|
|
||||||
|
try:
|
||||||
|
let portint = parseInt(parts[1])
|
||||||
|
doAssert(portint > 0 and portint < 65536)
|
||||||
|
port = Port(portint)
|
||||||
|
except:
|
||||||
|
raise newException(TransportAddressError, "Illegal port number!")
|
||||||
|
|
||||||
|
try:
|
||||||
|
var ipaddr: IpAddress
|
||||||
|
if parts[0][0] == '[' and parts[0][^1] == ']':
|
||||||
|
ipaddr = parseIpAddress(parts[0][1..^2])
|
||||||
|
else:
|
||||||
|
ipaddr = parseIpAddress(parts[0])
|
||||||
|
if ipaddr.family == IpAddressFamily.IPv4:
|
||||||
|
result = TransportAddress(family: AddressFamily.IPv4)
|
||||||
|
result.address_v4 = ipaddr.address_v4
|
||||||
|
elif ipaddr.family == IpAddressFamily.IPv6:
|
||||||
|
result = TransportAddress(family: AddressFamily.IPv6)
|
||||||
|
result.address_v6 = ipaddr.address_v6
|
||||||
|
else:
|
||||||
|
raise newException(TransportAddressError, "Incorrect address family!")
|
||||||
|
result.port = port
|
||||||
|
except:
|
||||||
|
raise newException(TransportAddressError, getCurrentException().msg)
|
||||||
|
else:
|
||||||
|
result = TransportAddress(family: AddressFamily.Unix)
|
||||||
|
|
||||||
proc initTAddress*(address: string, port: Port): TransportAddress =
|
proc initTAddress*(address: string, port: Port): TransportAddress =
|
||||||
## Initialize ``TransportAddress`` with IP address ``address`` and
|
## Initialize ``TransportAddress`` with IP (IPv4 or IPv6) address ``address``
|
||||||
## port number ``port``.
|
## and port number ``port``.
|
||||||
try:
|
try:
|
||||||
result.address = parseIpAddress(address)
|
var ipaddr = parseIpAddress(address)
|
||||||
result.port = port
|
if ipaddr.family == IpAddressFamily.IPv4:
|
||||||
|
result = TransportAddress(family: AddressFamily.IPv4, port: port)
|
||||||
|
result.address_v4 = ipaddr.address_v4
|
||||||
|
elif ipaddr.family == IpAddressFamily.IPv6:
|
||||||
|
result = TransportAddress(family: AddressFamily.IPv6, port: port)
|
||||||
|
result.address_v6 = ipaddr.address_v6
|
||||||
|
else:
|
||||||
|
raise newException(TransportAddressError, "Incorrect address family!")
|
||||||
except:
|
except:
|
||||||
raise newException(TransportAddressError, getCurrentException().msg)
|
raise newException(TransportAddressError, getCurrentException().msg)
|
||||||
|
|
||||||
proc initTAddress*(address: string, port: int): TransportAddress =
|
proc initTAddress*(address: string, port: int): TransportAddress {.inline.} =
|
||||||
## Initialize ``TransportAddress`` with IP address ``address`` and
|
## Initialize ``TransportAddress`` with IP (IPv4 or IPv6) address ``address``
|
||||||
## port number ``port``.
|
## and port number ``port``.
|
||||||
if port < 0 or port >= 65536:
|
if port < 0 or port >= 65536:
|
||||||
raise newException(TransportAddressError, "Illegal port number!")
|
raise newException(TransportAddressError, "Illegal port number!")
|
||||||
try:
|
else:
|
||||||
result.address = parseIpAddress(address)
|
result = initTAddress(address, Port(port))
|
||||||
result.port = Port(port)
|
|
||||||
except:
|
|
||||||
raise newException(TransportAddressError, getCurrentException().msg)
|
|
||||||
|
|
||||||
proc initTAddress*(address: IpAddress, port: Port): TransportAddress =
|
proc initTAddress*(address: IpAddress, port: Port): TransportAddress =
|
||||||
## Initialize ``TransportAddress`` with net.nim ``IpAddress`` and
|
## Initialize ``TransportAddress`` with net.nim ``IpAddress`` and
|
||||||
## port number ``port``.
|
## port number ``port``.
|
||||||
result.address = address
|
if address.family == IpAddressFamily.IPv4:
|
||||||
result.port = port
|
result = TransportAddress(family: AddressFamily.IPv4, port: port)
|
||||||
|
result.address_v4 = address.address_v4
|
||||||
|
elif address.family == IpAddressFamily.IPv6:
|
||||||
|
result = TransportAddress(family: AddressFamily.IPv6, port: port)
|
||||||
|
result.address_v6 = address.address_v6
|
||||||
|
else:
|
||||||
|
raise newException(TransportAddressError, "Incorrect address family!")
|
||||||
|
|
||||||
proc getAddrInfo(address: string, port: Port, domain: Domain,
|
proc getAddrInfo(address: string, port: Port, domain: Domain,
|
||||||
sockType: SockType = SockType.SOCK_STREAM,
|
sockType: SockType = SockType.SOCK_STREAM,
|
||||||
|
@ -205,8 +286,72 @@ proc getAddrInfo(address: string, port: Port, domain: Domain,
|
||||||
else:
|
else:
|
||||||
raise newException(TransportAddressError, $gai_strerror(gaiResult))
|
raise newException(TransportAddressError, $gai_strerror(gaiResult))
|
||||||
|
|
||||||
|
proc fromSAddr*(sa: ptr Sockaddr_storage, sl: Socklen,
|
||||||
|
address: var TransportAddress) =
|
||||||
|
## Set transport address ``address`` with value from OS specific socket
|
||||||
|
## address storage.
|
||||||
|
if int(sa.ss_family) == toInt(Domain.AF_INET) and
|
||||||
|
int(sl) == sizeof(Sockaddr_in):
|
||||||
|
address = TransportAddress(family: AddressFamily.IPv4)
|
||||||
|
let s = cast[ptr Sockaddr_in](sa)
|
||||||
|
copyMem(addr address.address_v4[0], addr s.sin_addr,
|
||||||
|
sizeof(address.address_v4))
|
||||||
|
address.port = Port(nativesockets.ntohs(s.sin_port))
|
||||||
|
elif int(sa.ss_family) == toInt(Domain.AF_INET6) and
|
||||||
|
int(sl) == sizeof(Sockaddr_in6):
|
||||||
|
address = TransportAddress(family: AddressFamily.IPv6)
|
||||||
|
let s = cast[ptr Sockaddr_in6](sa)
|
||||||
|
copyMem(addr address.address_v6[0], addr s.sin6_addr,
|
||||||
|
sizeof(address.address_v6))
|
||||||
|
address.port = Port(nativesockets.ntohs(s.sin6_port))
|
||||||
|
elif int(sa.ss_family) == toInt(Domain.AF_UNIX):
|
||||||
|
when not defined(windows):
|
||||||
|
address = TransportAddress(family: AddressFamily.Unix)
|
||||||
|
if int(sl) > sizeof(sa.ss_family):
|
||||||
|
var length = int(sl) - sizeof(sa.ss_family)
|
||||||
|
if length > (sizeof(address.address_un) - 1):
|
||||||
|
length = sizeof(address.address_un) - 1
|
||||||
|
let s = cast[ptr Sockaddr_un](sa)
|
||||||
|
copyMem(addr address.address_un[0], addr s.sun_path[0], length)
|
||||||
|
address.port = Port(1)
|
||||||
|
else:
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc toSAddr*(address: TransportAddress, sa: var Sockaddr_storage,
|
||||||
|
sl: var Socklen) =
|
||||||
|
## Set socket OS specific socket address storage with address from transport
|
||||||
|
## address ``address``.
|
||||||
|
case address.family
|
||||||
|
of AddressFamily.IPv4:
|
||||||
|
sl = Socklen(sizeof(Sockaddr_in))
|
||||||
|
let s = cast[ptr Sockaddr_in](addr sa)
|
||||||
|
s.sin_family = type(s.sin_family)(toInt(Domain.AF_INET))
|
||||||
|
s.sin_port = nativesockets.htons(uint16(address.port))
|
||||||
|
copyMem(addr s.sin_addr, unsafeAddr address.address_v4[0],
|
||||||
|
sizeof(s.sin_addr))
|
||||||
|
of AddressFamily.IPv6:
|
||||||
|
sl = Socklen(sizeof(Sockaddr_in6))
|
||||||
|
let s = cast[ptr Sockaddr_in6](addr sa)
|
||||||
|
s.sin6_family = type(s.sin6_family)(toInt(Domain.AF_INET6))
|
||||||
|
s.sin6_port = nativesockets.htons(uint16(address.port))
|
||||||
|
copyMem(addr s.sin6_addr, unsafeAddr address.address_v6[0],
|
||||||
|
sizeof(s.sin6_addr))
|
||||||
|
of AddressFamily.Unix:
|
||||||
|
when not defined(windows):
|
||||||
|
if address.port == Port(0):
|
||||||
|
sl = Socklen(sizeof(sa.ss_family))
|
||||||
|
else:
|
||||||
|
let s = cast[ptr Sockaddr_un](addr sa)
|
||||||
|
var name = cast[cstring](unsafeAddr address.address_un[0])
|
||||||
|
sl = Socklen(sizeof(sa.ss_family) + len(name) + 1)
|
||||||
|
s.sun_family = type(s.sun_family)(toInt(Domain.AF_UNIX))
|
||||||
|
copyMem(addr s.sun_path, unsafeAddr address.address_un[0],
|
||||||
|
len(name) + 1)
|
||||||
|
else:
|
||||||
|
discard
|
||||||
|
|
||||||
proc resolveTAddress*(address: string,
|
proc resolveTAddress*(address: string,
|
||||||
family = IpAddressFamily.IPv4): seq[TransportAddress] =
|
family = AddressFamily.IPv4): seq[TransportAddress] =
|
||||||
## Resolve string representation of ``address``.
|
## Resolve string representation of ``address``.
|
||||||
##
|
##
|
||||||
## Supported formats are:
|
## Supported formats are:
|
||||||
|
@ -220,6 +365,8 @@ proc resolveTAddress*(address: string,
|
||||||
hostname: string
|
hostname: string
|
||||||
port: int
|
port: int
|
||||||
|
|
||||||
|
doAssert(family in {AddressFamily.IPv4, AddressFamily.IPv6})
|
||||||
|
|
||||||
result = newSeq[TransportAddress]()
|
result = newSeq[TransportAddress]()
|
||||||
var parts = address.rsplit(":", maxsplit = 1)
|
var parts = address.rsplit(":", maxsplit = 1)
|
||||||
if len(parts) != 2:
|
if len(parts) != 2:
|
||||||
|
@ -237,14 +384,14 @@ proc resolveTAddress*(address: string,
|
||||||
else:
|
else:
|
||||||
hostname = parts[0]
|
hostname = parts[0]
|
||||||
|
|
||||||
var domain = if family == IpAddressFamily.IPv4: Domain.AF_INET else:
|
var domain = if family == AddressFamily.IPv4: Domain.AF_INET else:
|
||||||
Domain.AF_INET6
|
Domain.AF_INET6
|
||||||
var aiList = getAddrInfo(hostname, Port(port), domain)
|
var aiList = getAddrInfo(hostname, Port(port), domain)
|
||||||
var it = aiList
|
var it = aiList
|
||||||
while it != nil:
|
while it != nil:
|
||||||
var ta: TransportAddress
|
var ta: TransportAddress
|
||||||
fromSockAddr(cast[ptr Sockaddr_storage](it.ai_addr)[],
|
fromSAddr(cast[ptr Sockaddr_storage](it.ai_addr),
|
||||||
SockLen(it.ai_addrlen), ta.address, ta.port)
|
SockLen(it.ai_addrlen), ta)
|
||||||
# For some reason getAddrInfo() sometimes returns duplicate addresses,
|
# For some reason getAddrInfo() sometimes returns duplicate addresses,
|
||||||
# for example getAddrInfo(`localhost`) returns `127.0.0.1` twice.
|
# for example getAddrInfo(`localhost`) returns `127.0.0.1` twice.
|
||||||
if ta notin result:
|
if ta notin result:
|
||||||
|
@ -253,22 +400,24 @@ proc resolveTAddress*(address: string,
|
||||||
freeAddrInfo(aiList)
|
freeAddrInfo(aiList)
|
||||||
|
|
||||||
proc resolveTAddress*(address: string, port: Port,
|
proc resolveTAddress*(address: string, port: Port,
|
||||||
family = IpAddressFamily.IPv4): seq[TransportAddress] =
|
family = AddressFamily.IPv4): seq[TransportAddress] =
|
||||||
## Resolve string representation of ``address``.
|
## Resolve string representation of ``address``.
|
||||||
##
|
##
|
||||||
## ``address`` could be dot IPv4/IPv6 address or hostname.
|
## ``address`` could be dot IPv4/IPv6 address or hostname.
|
||||||
##
|
##
|
||||||
## If hostname address is detected, then network address translation via DNS
|
## If hostname address is detected, then network address translation via DNS
|
||||||
## will be performed.
|
## will be performed.
|
||||||
|
assert(family in {AddressFamily.IPv4, AddressFamily.IPv6})
|
||||||
|
|
||||||
result = newSeq[TransportAddress]()
|
result = newSeq[TransportAddress]()
|
||||||
var domain = if family == IpAddressFamily.IPv4: Domain.AF_INET else:
|
var domain = if family == AddressFamily.IPv4: Domain.AF_INET else:
|
||||||
Domain.AF_INET6
|
Domain.AF_INET6
|
||||||
var aiList = getAddrInfo(address, port, domain)
|
var aiList = getAddrInfo(address, port, domain)
|
||||||
var it = aiList
|
var it = aiList
|
||||||
while it != nil:
|
while it != nil:
|
||||||
var ta: TransportAddress
|
var ta: TransportAddress
|
||||||
fromSockAddr(cast[ptr Sockaddr_storage](it.ai_addr)[],
|
fromSAddr(cast[ptr Sockaddr_storage](it.ai_addr),
|
||||||
SockLen(it.ai_addrlen), ta.address, ta.port)
|
SockLen(it.ai_addrlen), ta)
|
||||||
# For some reason getAddrInfo() sometimes returns duplicate addresses,
|
# For some reason getAddrInfo() sometimes returns duplicate addresses,
|
||||||
# for example getAddrInfo(`localhost`) returns `127.0.0.1` twice.
|
# for example getAddrInfo(`localhost`) returns `127.0.0.1` twice.
|
||||||
if ta notin result:
|
if ta notin result:
|
||||||
|
@ -290,13 +439,6 @@ template getError*(t: untyped): ref Exception =
|
||||||
(t).error = nil
|
(t).error = nil
|
||||||
err
|
err
|
||||||
|
|
||||||
proc raiseTransportOsError*(err: OSErrorCode) =
|
|
||||||
## Raises transport specific OS error.
|
|
||||||
var msg = "(" & $int(err) & ") " & osErrorMsg(err)
|
|
||||||
var tre = newException(TransportOsError, msg)
|
|
||||||
tre.code = err
|
|
||||||
raise tre
|
|
||||||
|
|
||||||
template getTransportOsError*(err: OSErrorCode): ref TransportOsError =
|
template getTransportOsError*(err: OSErrorCode): ref TransportOsError =
|
||||||
var msg = "(" & $int(err) & ") " & osErrorMsg(err)
|
var msg = "(" & $int(err) & ") " & osErrorMsg(err)
|
||||||
var tre = newException(TransportOsError, msg)
|
var tre = newException(TransportOsError, msg)
|
||||||
|
@ -306,6 +448,10 @@ template getTransportOsError*(err: OSErrorCode): ref TransportOsError =
|
||||||
template getTransportOsError*(err: cint): ref TransportOsError =
|
template getTransportOsError*(err: cint): ref TransportOsError =
|
||||||
getTransportOsError(OSErrorCode(err))
|
getTransportOsError(OSErrorCode(err))
|
||||||
|
|
||||||
|
proc raiseTransportOsError*(err: OSErrorCode) =
|
||||||
|
## Raises transport specific OS error.
|
||||||
|
raise getTransportOsError(err)
|
||||||
|
|
||||||
type
|
type
|
||||||
SeqHeader = object
|
SeqHeader = object
|
||||||
length, reserved: int
|
length, reserved: int
|
||||||
|
@ -321,8 +467,28 @@ when defined(windows):
|
||||||
|
|
||||||
const
|
const
|
||||||
ERROR_OPERATION_ABORTED* = 995
|
ERROR_OPERATION_ABORTED* = 995
|
||||||
|
ERROR_PIPE_CONNECTED* = 535
|
||||||
|
ERROR_PIPE_BUSY* = 231
|
||||||
ERROR_SUCCESS* = 0
|
ERROR_SUCCESS* = 0
|
||||||
ERROR_CONNECTION_REFUSED* = 1225
|
ERROR_CONNECTION_REFUSED* = 1225
|
||||||
|
PIPE_TYPE_BYTE* = 0
|
||||||
|
PIPE_READMODE_BYTE* = 0
|
||||||
|
PIPE_TYPE_MESSAGE* = 0x4
|
||||||
|
PIPE_READMODE_MESSAGE* = 0x2
|
||||||
|
PIPE_WAIT* = 0
|
||||||
|
PIPE_UNLIMITED_INSTANCES* = 255
|
||||||
|
ERROR_BROKEN_PIPE* = 109
|
||||||
|
ERROR_PIPE_NOT_CONNECTED* = 233
|
||||||
|
ERROR_NO_DATA* = 232
|
||||||
|
|
||||||
proc cancelIo*(hFile: HANDLE): WINBOOL
|
proc cancelIo*(hFile: HANDLE): WINBOOL
|
||||||
{.stdcall, dynlib: "kernel32", importc: "CancelIo".}
|
{.stdcall, dynlib: "kernel32", importc: "CancelIo".}
|
||||||
|
proc connectNamedPipe*(hPipe: HANDLE, lpOverlapped: ptr OVERLAPPED): WINBOOL
|
||||||
|
{.stdcall, dynlib: "kernel32", importc: "ConnectNamedPipe".}
|
||||||
|
proc disconnectNamedPipe*(hPipe: HANDLE): WINBOOL
|
||||||
|
{.stdcall, dynlib: "kernel32", importc: "DisconnectNamedPipe".}
|
||||||
|
proc setNamedPipeHandleState*(hPipe: HANDLE, lpMode, lpMaxCollectionCount,
|
||||||
|
lpCollectDataTimeout: ptr DWORD): WINBOOL
|
||||||
|
{.stdcall, dynlib: "kernel32", importc: "SetNamedPipeHandleState".}
|
||||||
|
proc resetEvent*(hEvent: HANDLE): WINBOOL
|
||||||
|
{.stdcall, dynlib: "kernel32", importc: "ResetEvent".}
|
||||||
|
|
|
@ -94,8 +94,7 @@ when defined(windows):
|
||||||
transp.setWriterWSABuffer(vector)
|
transp.setWriterWSABuffer(vector)
|
||||||
var ret: cint
|
var ret: cint
|
||||||
if vector.kind == WithAddress:
|
if vector.kind == WithAddress:
|
||||||
toSockAddr(vector.address.address, vector.address.port,
|
toSAddr(vector.address, transp.waddr, transp.walen)
|
||||||
transp.waddr, transp.walen)
|
|
||||||
ret = WSASendTo(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
|
ret = WSASendTo(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
|
||||||
DWORD(0), cast[ptr SockAddr](addr transp.waddr),
|
DWORD(0), cast[ptr SockAddr](addr transp.waddr),
|
||||||
cint(transp.walen),
|
cint(transp.walen),
|
||||||
|
@ -139,7 +138,7 @@ when defined(windows):
|
||||||
let bytesCount = transp.rovl.data.bytesCount
|
let bytesCount = transp.rovl.data.bytesCount
|
||||||
if bytesCount == 0:
|
if bytesCount == 0:
|
||||||
transp.state.incl({ReadEof, ReadPaused})
|
transp.state.incl({ReadEof, ReadPaused})
|
||||||
fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port)
|
fromSAddr(addr transp.raddr, transp.ralen, raddr)
|
||||||
transp.buflen = bytesCount
|
transp.buflen = bytesCount
|
||||||
asyncCheck transp.function(transp, raddr)
|
asyncCheck transp.function(transp, raddr)
|
||||||
elif int(err) == ERROR_OPERATION_ABORTED:
|
elif int(err) == ERROR_OPERATION_ABORTED:
|
||||||
|
@ -200,7 +199,7 @@ when defined(windows):
|
||||||
child: DatagramTransport,
|
child: DatagramTransport,
|
||||||
bufferSize: int): DatagramTransport =
|
bufferSize: int): DatagramTransport =
|
||||||
var localSock: AsyncFD
|
var localSock: AsyncFD
|
||||||
assert(remote.address.family == local.address.family)
|
assert(remote.family == local.family)
|
||||||
assert(not isNil(cbproc))
|
assert(not isNil(cbproc))
|
||||||
|
|
||||||
if isNil(child):
|
if isNil(child):
|
||||||
|
@ -209,12 +208,8 @@ when defined(windows):
|
||||||
result = child
|
result = child
|
||||||
|
|
||||||
if sock == asyncInvalidSocket:
|
if sock == asyncInvalidSocket:
|
||||||
if local.address.family == IpAddressFamily.IPv4:
|
localSock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM,
|
||||||
localSock = createAsyncSocket(Domain.AF_INET, SockType.SOCK_DGRAM,
|
Protocol.IPPROTO_UDP)
|
||||||
Protocol.IPPROTO_UDP)
|
|
||||||
else:
|
|
||||||
localSock = createAsyncSocket(Domain.AF_INET6, SockType.SOCK_DGRAM,
|
|
||||||
Protocol.IPPROTO_UDP)
|
|
||||||
if localSock == asyncInvalidSocket:
|
if localSock == asyncInvalidSocket:
|
||||||
raiseTransportOsError(osLastError())
|
raiseTransportOsError(osLastError())
|
||||||
else:
|
else:
|
||||||
|
@ -239,10 +234,10 @@ when defined(windows):
|
||||||
addr bytesRet, nil, nil) != 0:
|
addr bytesRet, nil, nil) != 0:
|
||||||
raiseTransportOsError(osLastError())
|
raiseTransportOsError(osLastError())
|
||||||
|
|
||||||
if local.port != Port(0):
|
if local.family != AddressFamily.None:
|
||||||
var saddr: Sockaddr_storage
|
var saddr: Sockaddr_storage
|
||||||
var slen: SockLen
|
var slen: SockLen
|
||||||
toSockAddr(local.address, local.port, saddr, slen)
|
toSAddr(local, saddr, slen)
|
||||||
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
||||||
slen) != 0:
|
slen) != 0:
|
||||||
let err = osLastError()
|
let err = osLastError()
|
||||||
|
@ -253,12 +248,7 @@ when defined(windows):
|
||||||
else:
|
else:
|
||||||
var saddr: Sockaddr_storage
|
var saddr: Sockaddr_storage
|
||||||
var slen: SockLen
|
var slen: SockLen
|
||||||
if local.address.family == IpAddressFamily.IPv4:
|
saddr.ss_family = type(saddr.ss_family)(local.getDomain())
|
||||||
saddr.ss_family = winlean.AF_INET
|
|
||||||
slen = SockLen(sizeof(SockAddr_in))
|
|
||||||
else:
|
|
||||||
saddr.ss_family = winlean.AF_INET6
|
|
||||||
slen = SockLen(sizeof(SockAddr_in6))
|
|
||||||
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
||||||
slen) != 0:
|
slen) != 0:
|
||||||
let err = osLastError()
|
let err = osLastError()
|
||||||
|
@ -269,7 +259,7 @@ when defined(windows):
|
||||||
if remote.port != Port(0):
|
if remote.port != Port(0):
|
||||||
var saddr: Sockaddr_storage
|
var saddr: Sockaddr_storage
|
||||||
var slen: SockLen
|
var slen: SockLen
|
||||||
toSockAddr(remote.address, remote.port, saddr, slen)
|
toSAddr(remote, saddr, slen)
|
||||||
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
||||||
slen) != 0:
|
slen) != 0:
|
||||||
let err = osLastError()
|
let err = osLastError()
|
||||||
|
@ -320,7 +310,7 @@ else:
|
||||||
cast[ptr SockAddr](addr transp.raddr),
|
cast[ptr SockAddr](addr transp.raddr),
|
||||||
addr transp.ralen)
|
addr transp.ralen)
|
||||||
if res >= 0:
|
if res >= 0:
|
||||||
fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port)
|
fromSAddr(addr transp.raddr, transp.ralen, raddr)
|
||||||
transp.buflen = res
|
transp.buflen = res
|
||||||
asyncCheck transp.function(transp, raddr)
|
asyncCheck transp.function(transp, raddr)
|
||||||
else:
|
else:
|
||||||
|
@ -350,8 +340,7 @@ else:
|
||||||
var vector = transp.queue.popFirst()
|
var vector = transp.queue.popFirst()
|
||||||
while true:
|
while true:
|
||||||
if vector.kind == WithAddress:
|
if vector.kind == WithAddress:
|
||||||
toSockAddr(vector.address.address, vector.address.port,
|
toSAddr(vector.address, transp.waddr, transp.walen)
|
||||||
transp.waddr, transp.walen)
|
|
||||||
res = posix.sendto(fd, vector.buf, vector.buflen, MSG_NOSIGNAL,
|
res = posix.sendto(fd, vector.buf, vector.buflen, MSG_NOSIGNAL,
|
||||||
cast[ptr SockAddr](addr transp.waddr),
|
cast[ptr SockAddr](addr transp.waddr),
|
||||||
transp.walen)
|
transp.walen)
|
||||||
|
@ -387,7 +376,7 @@ else:
|
||||||
child: DatagramTransport = nil,
|
child: DatagramTransport = nil,
|
||||||
bufferSize: int): DatagramTransport =
|
bufferSize: int): DatagramTransport =
|
||||||
var localSock: AsyncFD
|
var localSock: AsyncFD
|
||||||
assert(remote.address.family == local.address.family)
|
assert(remote.family == local.family)
|
||||||
assert(not isNil(cbproc))
|
assert(not isNil(cbproc))
|
||||||
|
|
||||||
if isNil(child):
|
if isNil(child):
|
||||||
|
@ -396,12 +385,13 @@ else:
|
||||||
result = child
|
result = child
|
||||||
|
|
||||||
if sock == asyncInvalidSocket:
|
if sock == asyncInvalidSocket:
|
||||||
if local.address.family == IpAddressFamily.IPv4:
|
var proto = Protocol.IPPROTO_UDP
|
||||||
localSock = createAsyncSocket(Domain.AF_INET, SockType.SOCK_DGRAM,
|
if local.family == AddressFamily.Unix:
|
||||||
Protocol.IPPROTO_UDP)
|
# `Protocol` enum is missing `0` value, so we making here cast, until
|
||||||
else:
|
# `Protocol` enum will not support IPPROTO_IP == 0.
|
||||||
localSock = createAsyncSocket(Domain.AF_INET6, SockType.SOCK_DGRAM,
|
proto = cast[Protocol](0)
|
||||||
Protocol.IPPROTO_UDP)
|
localSock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM,
|
||||||
|
proto)
|
||||||
if localSock == asyncInvalidSocket:
|
if localSock == asyncInvalidSocket:
|
||||||
raiseTransportOsError(osLastError())
|
raiseTransportOsError(osLastError())
|
||||||
else:
|
else:
|
||||||
|
@ -418,10 +408,10 @@ else:
|
||||||
closeSocket(localSock)
|
closeSocket(localSock)
|
||||||
raiseTransportOsError(err)
|
raiseTransportOsError(err)
|
||||||
|
|
||||||
if local.port != Port(0):
|
if local.family != AddressFamily.None:
|
||||||
var saddr: Sockaddr_storage
|
var saddr: Sockaddr_storage
|
||||||
var slen: SockLen
|
var slen: SockLen
|
||||||
toSockAddr(local.address, local.port, saddr, slen)
|
toSAddr(local, saddr, slen)
|
||||||
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
if bindAddr(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
||||||
slen) != 0:
|
slen) != 0:
|
||||||
let err = osLastError()
|
let err = osLastError()
|
||||||
|
@ -430,10 +420,10 @@ else:
|
||||||
raiseTransportOsError(err)
|
raiseTransportOsError(err)
|
||||||
result.local = local
|
result.local = local
|
||||||
|
|
||||||
if remote.port != Port(0):
|
if remote.family != AddressFamily.None:
|
||||||
var saddr: Sockaddr_storage
|
var saddr: Sockaddr_storage
|
||||||
var slen: SockLen
|
var slen: SockLen
|
||||||
toSockAddr(remote.address, remote.port, saddr, slen)
|
toSAddr(remote, saddr, slen)
|
||||||
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
|
||||||
slen) != 0:
|
slen) != 0:
|
||||||
let err = osLastError()
|
let err = osLastError()
|
||||||
|
|
|
@ -11,6 +11,8 @@ import net, nativesockets, os, deques
|
||||||
import ../asyncloop, ../handles, ../sendfile
|
import ../asyncloop, ../handles, ../sendfile
|
||||||
import common
|
import common
|
||||||
|
|
||||||
|
{.deadCodeElim: on.}
|
||||||
|
|
||||||
when defined(windows):
|
when defined(windows):
|
||||||
import winlean
|
import winlean
|
||||||
else:
|
else:
|
||||||
|
@ -33,6 +35,23 @@ type
|
||||||
Pipe, # Pipe transport
|
Pipe, # Pipe transport
|
||||||
File # File transport
|
File # File transport
|
||||||
|
|
||||||
|
TransportFlags* = enum
|
||||||
|
None,
|
||||||
|
# Default value
|
||||||
|
WinServerPipe,
|
||||||
|
# This is internal flag which used to differentiate between server pipe
|
||||||
|
# handle and client pipe handle.
|
||||||
|
WinNoPipeFlash
|
||||||
|
# By default `AddressFamily.Unix` transports in Windows are using
|
||||||
|
# `FlushFileBuffers()` when transport closing.
|
||||||
|
# This flag disables usage of `FlushFileBuffers()` on `AddressFamily.Unix`
|
||||||
|
# transport shutdown. If both server and client are running in the same
|
||||||
|
# thread, because of `FlushFileBuffers()` will ensure that all bytes
|
||||||
|
# or messages written to the pipe are read by the client, it is possible to
|
||||||
|
# get stuck on transport `close()`.
|
||||||
|
# Please use this flag only if you are making both client and server in
|
||||||
|
# the same thread.
|
||||||
|
|
||||||
when defined(windows):
|
when defined(windows):
|
||||||
const SO_UPDATE_CONNECT_CONTEXT = 0x7010
|
const SO_UPDATE_CONNECT_CONTEXT = 0x7010
|
||||||
|
|
||||||
|
@ -52,6 +71,7 @@ when defined(windows):
|
||||||
rovl: CustomOverlapped # Reader OVERLAPPED structure
|
rovl: CustomOverlapped # Reader OVERLAPPED structure
|
||||||
wovl: CustomOverlapped # Writer OVERLAPPED structure
|
wovl: CustomOverlapped # Writer OVERLAPPED structure
|
||||||
roffset: int # Pending reading offset
|
roffset: int # Pending reading offset
|
||||||
|
flags: set[TransportFlags] # Internal flags
|
||||||
case kind*: TransportKind
|
case kind*: TransportKind
|
||||||
of TransportKind.Socket:
|
of TransportKind.Socket:
|
||||||
domain: Domain # Socket transport domain (IPv4/IPv6)
|
domain: Domain # Socket transport domain (IPv4/IPv6)
|
||||||
|
@ -83,7 +103,6 @@ else:
|
||||||
todo2: int
|
todo2: int
|
||||||
|
|
||||||
type
|
type
|
||||||
|
|
||||||
StreamCallback* = proc(server: StreamServer,
|
StreamCallback* = proc(server: StreamServer,
|
||||||
client: StreamTransport): Future[void] {.gcsafe.}
|
client: StreamTransport): Future[void] {.gcsafe.}
|
||||||
## New remote client connection callback
|
## New remote client connection callback
|
||||||
|
@ -92,7 +111,7 @@ type
|
||||||
|
|
||||||
TransportInitCallback* = proc(server: StreamServer,
|
TransportInitCallback* = proc(server: StreamServer,
|
||||||
fd: AsyncFD): StreamTransport {.gcsafe.}
|
fd: AsyncFD): StreamTransport {.gcsafe.}
|
||||||
## Custom transport initialization procedure, which can allocated inherited
|
## Custom transport initialization procedure, which can allocate inherited
|
||||||
## StreamTransport object.
|
## StreamTransport object.
|
||||||
|
|
||||||
StreamServer* = ref object of SocketServer
|
StreamServer* = ref object of SocketServer
|
||||||
|
@ -106,26 +125,26 @@ proc remoteAddress*(transp: StreamTransport): TransportAddress =
|
||||||
## Returns ``transp`` remote socket address.
|
## Returns ``transp`` remote socket address.
|
||||||
if transp.kind != TransportKind.Socket:
|
if transp.kind != TransportKind.Socket:
|
||||||
raise newException(TransportError, "Socket required!")
|
raise newException(TransportError, "Socket required!")
|
||||||
if transp.remote.port == Port(0):
|
if transp.remote.family == AddressFamily.None:
|
||||||
var saddr: Sockaddr_storage
|
var saddr: Sockaddr_storage
|
||||||
var slen = SockLen(sizeof(saddr))
|
var slen = SockLen(sizeof(saddr))
|
||||||
if getpeername(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
|
if getpeername(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
|
||||||
addr slen) != 0:
|
addr slen) != 0:
|
||||||
raiseTransportOsError(osLastError())
|
raiseTransportOsError(osLastError())
|
||||||
fromSockAddr(saddr, slen, transp.remote.address, transp.remote.port)
|
fromSAddr(addr saddr, slen, transp.remote)
|
||||||
result = transp.remote
|
result = transp.remote
|
||||||
|
|
||||||
proc localAddress*(transp: StreamTransport): TransportAddress =
|
proc localAddress*(transp: StreamTransport): TransportAddress =
|
||||||
## Returns ``transp`` local socket address.
|
## Returns ``transp`` local socket address.
|
||||||
if transp.kind != TransportKind.Socket:
|
if transp.kind != TransportKind.Socket:
|
||||||
raise newException(TransportError, "Socket required!")
|
raise newException(TransportError, "Socket required!")
|
||||||
if transp.local.port == Port(0):
|
if transp.local.family == AddressFamily.None:
|
||||||
var saddr: Sockaddr_storage
|
var saddr: Sockaddr_storage
|
||||||
var slen = SockLen(sizeof(saddr))
|
var slen = SockLen(sizeof(saddr))
|
||||||
if getsockname(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
|
if getsockname(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
|
||||||
addr slen) != 0:
|
addr slen) != 0:
|
||||||
raiseTransportOsError(osLastError())
|
raiseTransportOsError(osLastError())
|
||||||
fromSockAddr(saddr, slen, transp.local.address, transp.local.port)
|
fromSAddr(addr saddr, slen, transp.local)
|
||||||
result = transp.local
|
result = transp.local
|
||||||
|
|
||||||
template setReadError(t, e: untyped) =
|
template setReadError(t, e: untyped) =
|
||||||
|
@ -209,6 +228,13 @@ when defined(windows):
|
||||||
transp.queue.addFirst(vector)
|
transp.queue.addFirst(vector)
|
||||||
else:
|
else:
|
||||||
vector.writer.complete(int(getFileSize(vector)))
|
vector.writer.complete(int(getFileSize(vector)))
|
||||||
|
elif transp.kind == TransportKind.Pipe:
|
||||||
|
if vector.kind == VectorKind.DataBuffer:
|
||||||
|
if bytesCount < transp.wwsabuf.len:
|
||||||
|
vector.shiftVectorBuffer(bytesCount)
|
||||||
|
transp.queue.addFirst(vector)
|
||||||
|
else:
|
||||||
|
vector.writer.complete(transp.wwsabuf.len)
|
||||||
elif int(err) == ERROR_OPERATION_ABORTED:
|
elif int(err) == ERROR_OPERATION_ABORTED:
|
||||||
# CancelIO() interrupt
|
# CancelIO() interrupt
|
||||||
transp.state.incl(WritePaused)
|
transp.state.incl(WritePaused)
|
||||||
|
@ -275,6 +301,35 @@ when defined(windows):
|
||||||
vector.writer.fail(getTransportOsError(err))
|
vector.writer.fail(getTransportOsError(err))
|
||||||
else:
|
else:
|
||||||
transp.queue.addFirst(vector)
|
transp.queue.addFirst(vector)
|
||||||
|
elif transp.kind == TransportKind.Pipe:
|
||||||
|
let pipe = Handle(transp.wovl.data.fd)
|
||||||
|
var vector = transp.queue.popFirst()
|
||||||
|
if vector.kind == VectorKind.DataBuffer:
|
||||||
|
transp.wovl.zeroOvelappedOffset()
|
||||||
|
transp.setWriterWSABuffer(vector)
|
||||||
|
let ret = writeFile(pipe, cast[pointer](transp.wwsabuf.buf),
|
||||||
|
DWORD(transp.wwsabuf.len), addr bytesCount,
|
||||||
|
cast[POVERLAPPED](addr transp.wovl))
|
||||||
|
if ret == 0:
|
||||||
|
let err = osLastError()
|
||||||
|
if int(err) == ERROR_OPERATION_ABORTED:
|
||||||
|
# CancelIO() interrupt
|
||||||
|
transp.state.excl(WritePending)
|
||||||
|
transp.state.incl(WritePaused)
|
||||||
|
vector.writer.complete(0)
|
||||||
|
elif int(err) == ERROR_IO_PENDING:
|
||||||
|
transp.queue.addFirst(vector)
|
||||||
|
elif int(err) == ERROR_NO_DATA:
|
||||||
|
# The pipe is being closed.
|
||||||
|
transp.state.excl(WritePending)
|
||||||
|
transp.state.incl(WritePaused)
|
||||||
|
vector.writer.complete(0)
|
||||||
|
else:
|
||||||
|
transp.state.excl(WritePending)
|
||||||
|
transp.state = transp.state + {WritePaused, WriteError}
|
||||||
|
vector.writer.fail(getTransportOsError(err))
|
||||||
|
else:
|
||||||
|
transp.queue.addFirst(vector)
|
||||||
break
|
break
|
||||||
|
|
||||||
if len(transp.queue) == 0:
|
if len(transp.queue) == 0:
|
||||||
|
@ -283,7 +338,6 @@ when defined(windows):
|
||||||
proc readStreamLoop(udata: pointer) {.gcsafe, nimcall.} =
|
proc readStreamLoop(udata: pointer) {.gcsafe, nimcall.} =
|
||||||
var ovl = cast[PtrCustomOverlapped](udata)
|
var ovl = cast[PtrCustomOverlapped](udata)
|
||||||
var transp = cast[StreamTransport](ovl.data.udata)
|
var transp = cast[StreamTransport](ovl.data.udata)
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
if ReadPending in transp.state:
|
if ReadPending in transp.state:
|
||||||
## Continuation
|
## Continuation
|
||||||
|
@ -312,7 +366,11 @@ when defined(windows):
|
||||||
elif int(err) == ERROR_OPERATION_ABORTED:
|
elif int(err) == ERROR_OPERATION_ABORTED:
|
||||||
# CancelIO() interrupt
|
# CancelIO() interrupt
|
||||||
transp.state.incl(ReadPaused)
|
transp.state.incl(ReadPaused)
|
||||||
elif int(err) in {ERROR_NETNAME_DELETED, WSAECONNABORTED}:
|
elif transp.kind == TransportKind.Socket and
|
||||||
|
(int(err) in {ERROR_NETNAME_DELETED, WSAECONNABORTED}):
|
||||||
|
transp.state.incl({ReadEof, ReadPaused})
|
||||||
|
elif transp.kind == TransportKind.Pipe and
|
||||||
|
(int(err) in {ERROR_BROKEN_PIPE, ERROR_PIPE_NOT_CONNECTED}):
|
||||||
transp.state.incl({ReadEof, ReadPaused})
|
transp.state.incl({ReadEof, ReadPaused})
|
||||||
else:
|
else:
|
||||||
transp.setReadError(err)
|
transp.setReadError(err)
|
||||||
|
@ -339,7 +397,7 @@ when defined(windows):
|
||||||
cast[POVERLAPPED](addr transp.rovl), nil)
|
cast[POVERLAPPED](addr transp.rovl), nil)
|
||||||
if ret != 0:
|
if ret != 0:
|
||||||
let err = osLastError()
|
let err = osLastError()
|
||||||
if int(err) == ERROR_OPERATION_ABORTED:
|
if int32(err) == ERROR_OPERATION_ABORTED:
|
||||||
# CancelIO() interrupt
|
# CancelIO() interrupt
|
||||||
transp.state.excl(ReadPending)
|
transp.state.excl(ReadPending)
|
||||||
transp.state.incl(ReadPaused)
|
transp.state.incl(ReadPaused)
|
||||||
|
@ -356,6 +414,32 @@ when defined(windows):
|
||||||
if not isNil(transp.reader):
|
if not isNil(transp.reader):
|
||||||
transp.reader.complete()
|
transp.reader.complete()
|
||||||
transp.reader = nil
|
transp.reader = nil
|
||||||
|
elif transp.kind == TransportKind.Pipe:
|
||||||
|
let pipe = Handle(transp.rovl.data.fd)
|
||||||
|
transp.roffset = transp.offset
|
||||||
|
transp.setReaderWSABuffer()
|
||||||
|
let ret = readFile(pipe, cast[pointer](transp.rwsabuf.buf),
|
||||||
|
DWORD(transp.rwsabuf.len), addr bytesCount,
|
||||||
|
cast[POVERLAPPED](addr transp.rovl))
|
||||||
|
if ret == 0:
|
||||||
|
let err = osLastError()
|
||||||
|
if int32(err) == ERROR_OPERATION_ABORTED:
|
||||||
|
# CancelIO() interrupt
|
||||||
|
transp.state.excl(ReadPending)
|
||||||
|
transp.state.incl(ReadPaused)
|
||||||
|
elif int32(err) in {ERROR_BROKEN_PIPE, ERROR_PIPE_NOT_CONNECTED}:
|
||||||
|
transp.state.excl(ReadPending)
|
||||||
|
transp.state.incl({ReadEof, ReadPaused})
|
||||||
|
if not isNil(transp.reader):
|
||||||
|
transp.reader.complete()
|
||||||
|
transp.reader = nil
|
||||||
|
elif int32(err) != ERROR_IO_PENDING:
|
||||||
|
transp.state.excl(ReadPending)
|
||||||
|
transp.state.incl(ReadPaused)
|
||||||
|
transp.setReadError(err)
|
||||||
|
if not isNil(transp.reader):
|
||||||
|
transp.reader.complete()
|
||||||
|
transp.reader = nil
|
||||||
else:
|
else:
|
||||||
transp.state.incl(ReadPaused)
|
transp.state.incl(ReadPaused)
|
||||||
if not isNil(transp.reader):
|
if not isNil(transp.reader):
|
||||||
|
@ -383,6 +467,27 @@ when defined(windows):
|
||||||
GC_ref(transp)
|
GC_ref(transp)
|
||||||
result = transp
|
result = transp
|
||||||
|
|
||||||
|
proc newStreamPipeTransport(fd: AsyncFD, bufsize: int,
|
||||||
|
child: StreamTransport,
|
||||||
|
flags: set[TransportFlags] = {}): StreamTransport =
|
||||||
|
var transp: StreamTransport
|
||||||
|
if not isNil(child):
|
||||||
|
transp = child
|
||||||
|
else:
|
||||||
|
transp = StreamTransport(kind: TransportKind.Pipe)
|
||||||
|
transp.fd = fd
|
||||||
|
transp.rovl.data = CompletionData(fd: fd, cb: readStreamLoop,
|
||||||
|
udata: cast[pointer](transp))
|
||||||
|
transp.wovl.data = CompletionData(fd: fd, cb: writeStreamLoop,
|
||||||
|
udata: cast[pointer](transp))
|
||||||
|
transp.buffer = newSeq[byte](bufsize)
|
||||||
|
transp.flags = flags
|
||||||
|
transp.state = {ReadPaused, WritePaused}
|
||||||
|
transp.queue = initDeque[StreamVector]()
|
||||||
|
transp.future = newFuture[void]("stream.pipe.transport")
|
||||||
|
GC_ref(transp)
|
||||||
|
result = transp
|
||||||
|
|
||||||
proc bindToDomain(handle: AsyncFD, domain: Domain): bool =
|
proc bindToDomain(handle: AsyncFD, domain: Domain): bool =
|
||||||
result = true
|
result = true
|
||||||
if domain == Domain.AF_INET6:
|
if domain == Domain.AF_INET6:
|
||||||
|
@ -391,7 +496,7 @@ when defined(windows):
|
||||||
if bindAddr(SocketHandle(handle), cast[ptr SockAddr](addr(saddr)),
|
if bindAddr(SocketHandle(handle), cast[ptr SockAddr](addr(saddr)),
|
||||||
sizeof(saddr).SockLen) != 0'i32:
|
sizeof(saddr).SockLen) != 0'i32:
|
||||||
result = false
|
result = false
|
||||||
else:
|
elif domain == Domain.AF_INET:
|
||||||
var saddr: Sockaddr_in
|
var saddr: Sockaddr_in
|
||||||
saddr.sin_family = type(saddr.sin_family)(toInt(domain))
|
saddr.sin_family = type(saddr.sin_family)(toInt(domain))
|
||||||
if bindAddr(SocketHandle(handle), cast[ptr SockAddr](addr(saddr)),
|
if bindAddr(SocketHandle(handle), cast[ptr SockAddr](addr(saddr)),
|
||||||
|
@ -400,66 +505,161 @@ when defined(windows):
|
||||||
|
|
||||||
proc connect*(address: TransportAddress,
|
proc connect*(address: TransportAddress,
|
||||||
bufferSize = DefaultStreamBufferSize,
|
bufferSize = DefaultStreamBufferSize,
|
||||||
child: StreamTransport = nil): Future[StreamTransport] =
|
child: StreamTransport = nil,
|
||||||
|
flags: set[TransportFlags] = {}): Future[StreamTransport] =
|
||||||
## Open new connection to remote peer with address ``address`` and create
|
## Open new connection to remote peer with address ``address`` and create
|
||||||
## new transport object ``StreamTransport`` for established connection.
|
## new transport object ``StreamTransport`` for established connection.
|
||||||
## ``bufferSize`` is size of internal buffer for transport.
|
## ``bufferSize`` is size of internal buffer for transport.
|
||||||
let loop = getGlobalDispatcher()
|
let loop = getGlobalDispatcher()
|
||||||
var
|
|
||||||
saddr: Sockaddr_storage
|
|
||||||
slen: SockLen
|
|
||||||
sock: AsyncFD
|
|
||||||
povl: RefCustomOverlapped
|
|
||||||
|
|
||||||
var retFuture = newFuture[StreamTransport]("stream.transport.connect")
|
var retFuture = newFuture[StreamTransport]("stream.transport.connect")
|
||||||
toSockAddr(address.address, address.port, saddr, slen)
|
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||||
sock = createAsyncSocket(address.address.getDomain(), SockType.SOCK_STREAM,
|
## Socket handling part
|
||||||
Protocol.IPPROTO_TCP)
|
var
|
||||||
|
saddr: Sockaddr_storage
|
||||||
|
slen: SockLen
|
||||||
|
sock: AsyncFD
|
||||||
|
povl: RefCustomOverlapped
|
||||||
|
proto: Protocol
|
||||||
|
|
||||||
if sock == asyncInvalidSocket:
|
toSAddr(address, saddr, slen)
|
||||||
retFuture.fail(getTransportOsError(OSErrorCode(wsaGetLastError())))
|
proto = Protocol.IPPROTO_TCP
|
||||||
return retFuture
|
sock = createAsyncSocket(address.getDomain(), SockType.SOCK_STREAM, proto)
|
||||||
|
if sock == asyncInvalidSocket:
|
||||||
|
result.fail(getTransportOsError(osLastError()))
|
||||||
|
|
||||||
if not bindToDomain(sock, address.address.getDomain()):
|
if not bindToDomain(sock, address.getDomain()):
|
||||||
let err = wsaGetLastError()
|
let err = wsaGetLastError()
|
||||||
sock.closeSocket()
|
|
||||||
retFuture.fail(getTransportOsError(err))
|
|
||||||
return retFuture
|
|
||||||
|
|
||||||
proc continuation(udata: pointer) =
|
|
||||||
var ovl = cast[RefCustomOverlapped](udata)
|
|
||||||
if not retFuture.finished:
|
|
||||||
if ovl.data.errCode == OSErrorCode(-1):
|
|
||||||
if setsockopt(SocketHandle(sock), cint(SOL_SOCKET),
|
|
||||||
cint(SO_UPDATE_CONNECT_CONTEXT), nil,
|
|
||||||
SockLen(0)) != 0'i32:
|
|
||||||
sock.closeSocket()
|
|
||||||
retFuture.fail(getTransportOsError(wsaGetLastError()))
|
|
||||||
else:
|
|
||||||
retFuture.complete(newStreamSocketTransport(povl.data.fd,
|
|
||||||
bufferSize,
|
|
||||||
child))
|
|
||||||
else:
|
|
||||||
sock.closeSocket()
|
|
||||||
retFuture.fail(getTransportOsError(ovl.data.errCode))
|
|
||||||
GC_unref(ovl)
|
|
||||||
|
|
||||||
povl = RefCustomOverlapped()
|
|
||||||
GC_ref(povl)
|
|
||||||
povl.data = CompletionData(fd: sock, cb: continuation)
|
|
||||||
var res = loop.connectEx(SocketHandle(sock),
|
|
||||||
cast[ptr SockAddr](addr saddr),
|
|
||||||
DWORD(slen), nil, 0, nil,
|
|
||||||
cast[POVERLAPPED](povl))
|
|
||||||
# We will not process immediate completion, to avoid undefined behavior.
|
|
||||||
if not res:
|
|
||||||
let err = osLastError()
|
|
||||||
if int32(err) != ERROR_IO_PENDING:
|
|
||||||
GC_unref(povl)
|
|
||||||
sock.closeSocket()
|
sock.closeSocket()
|
||||||
retFuture.fail(getTransportOsError(err))
|
retFuture.fail(getTransportOsError(err))
|
||||||
|
return retFuture
|
||||||
|
|
||||||
|
proc socketContinuation(udata: pointer) =
|
||||||
|
var ovl = cast[RefCustomOverlapped](udata)
|
||||||
|
if not retFuture.finished:
|
||||||
|
if ovl.data.errCode == OSErrorCode(-1):
|
||||||
|
if setsockopt(SocketHandle(sock), cint(SOL_SOCKET),
|
||||||
|
cint(SO_UPDATE_CONNECT_CONTEXT), nil,
|
||||||
|
SockLen(0)) != 0'i32:
|
||||||
|
let err = wsaGetLastError()
|
||||||
|
sock.closeSocket()
|
||||||
|
retFuture.fail(getTransportOsError(err))
|
||||||
|
else:
|
||||||
|
retFuture.complete(newStreamSocketTransport(povl.data.fd,
|
||||||
|
bufferSize,
|
||||||
|
child))
|
||||||
|
else:
|
||||||
|
sock.closeSocket()
|
||||||
|
retFuture.fail(getTransportOsError(ovl.data.errCode))
|
||||||
|
GC_unref(ovl)
|
||||||
|
|
||||||
|
povl = RefCustomOverlapped()
|
||||||
|
GC_ref(povl)
|
||||||
|
povl.data = CompletionData(fd: sock, cb: socketContinuation)
|
||||||
|
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||||
|
var res = loop.connectEx(SocketHandle(sock),
|
||||||
|
cast[ptr SockAddr](addr saddr),
|
||||||
|
DWORD(slen), nil, 0, nil,
|
||||||
|
cast[POVERLAPPED](povl))
|
||||||
|
# We will not process immediate completion, to avoid undefined behavior.
|
||||||
|
if not res:
|
||||||
|
let err = osLastError()
|
||||||
|
if int32(err) != ERROR_IO_PENDING:
|
||||||
|
GC_unref(povl)
|
||||||
|
sock.closeSocket()
|
||||||
|
retFuture.fail(getTransportOsError(err))
|
||||||
|
|
||||||
|
elif address.family == AddressFamily.Unix:
|
||||||
|
## Unix domain socket emulation with Windows Named Pipes.
|
||||||
|
proc pipeContinuation(udata: pointer) {.gcsafe.} =
|
||||||
|
var pipeSuffix = $cast[cstring](unsafeAddr address.address_un[0])
|
||||||
|
var pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1])
|
||||||
|
var pipeHandle = createFileW(pipeName, GENERIC_READ or GENERIC_WRITE,
|
||||||
|
FILE_SHARE_READ or FILE_SHARE_WRITE,
|
||||||
|
nil, OPEN_EXISTING,
|
||||||
|
FILE_FLAG_OVERLAPPED, Handle(0))
|
||||||
|
if pipeHandle == INVALID_HANDLE_VALUE:
|
||||||
|
let err = osLastError()
|
||||||
|
if int32(err) == ERROR_PIPE_BUSY:
|
||||||
|
addTimer(fastEpochTime() + 50, pipeContinuation, nil)
|
||||||
|
else:
|
||||||
|
retFuture.fail(getTransportOsError(err))
|
||||||
|
else:
|
||||||
|
register(AsyncFD(pipeHandle))
|
||||||
|
retFuture.complete(newStreamPipeTransport(AsyncFD(pipeHandle),
|
||||||
|
bufferSize, child))
|
||||||
|
pipeContinuation(nil)
|
||||||
|
|
||||||
return retFuture
|
return retFuture
|
||||||
|
|
||||||
|
proc acceptPipeLoop(udata: pointer) {.gcsafe, nimcall.} =
|
||||||
|
var ovl = cast[PtrCustomOverlapped](udata)
|
||||||
|
var server = cast[StreamServer](ovl.data.udata)
|
||||||
|
var loop = getGlobalDispatcher()
|
||||||
|
|
||||||
|
while true:
|
||||||
|
if server.apending:
|
||||||
|
## Continuation
|
||||||
|
server.apending = false
|
||||||
|
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if ovl.data.errCode == OSErrorCode(-1):
|
||||||
|
var ntransp: StreamTransport
|
||||||
|
var flags = {WinServerPipe}
|
||||||
|
if NoPipeFlash in server.flags:
|
||||||
|
flags.incl(WinNoPipeFlash)
|
||||||
|
if not isNil(server.init):
|
||||||
|
var transp = server.init(server, server.sock)
|
||||||
|
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
|
||||||
|
transp, flags)
|
||||||
|
else:
|
||||||
|
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
|
||||||
|
nil, flags)
|
||||||
|
asyncCheck server.function(server, ntransp)
|
||||||
|
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
|
||||||
|
# CancelIO() interrupt
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
doAssert disconnectNamedPipe(Handle(server.sock)) == 1
|
||||||
|
doAssert closeHandle(HANDLE(server.sock)) == 1
|
||||||
|
raiseTransportOsError(osLastError())
|
||||||
|
else:
|
||||||
|
## Initiation
|
||||||
|
server.apending = true
|
||||||
|
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||||
|
## Server was already stopped/closed exiting
|
||||||
|
break
|
||||||
|
|
||||||
|
var pipeSuffix = $cast[cstring](addr server.local.address_un)
|
||||||
|
var pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1])
|
||||||
|
var openMode = PIPE_ACCESS_DUPLEX or FILE_FLAG_OVERLAPPED
|
||||||
|
if FirstPipe notin server.flags:
|
||||||
|
openMode = openMode or FILE_FLAG_FIRST_PIPE_INSTANCE
|
||||||
|
server.flags.incl(FirstPipe)
|
||||||
|
let pipeMode = int32(PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT)
|
||||||
|
let pipeHandle = createNamedPipe(pipeName, openMode, pipeMode,
|
||||||
|
PIPE_UNLIMITED_INSTANCES,
|
||||||
|
DWORD(server.bufferSize),
|
||||||
|
DWORD(server.bufferSize),
|
||||||
|
DWORD(0), nil)
|
||||||
|
if pipeHandle == INVALID_HANDLE_VALUE:
|
||||||
|
raiseTransportOsError(osLastError())
|
||||||
|
server.sock = AsyncFD(pipeHandle)
|
||||||
|
server.aovl.data.fd = AsyncFD(pipeHandle)
|
||||||
|
register(server.sock)
|
||||||
|
let res = connectNamedPipe(pipeHandle,
|
||||||
|
cast[POVERLAPPED](addr server.aovl))
|
||||||
|
if res == 0:
|
||||||
|
let err = osLastError()
|
||||||
|
if int32(err) == ERROR_IO_PENDING:
|
||||||
|
discard
|
||||||
|
elif int32(err) == ERROR_PIPE_CONNECTED:
|
||||||
|
discard
|
||||||
|
else:
|
||||||
|
raiseTransportOsError(err)
|
||||||
|
break
|
||||||
|
|
||||||
proc acceptLoop(udata: pointer) {.gcsafe, nimcall.} =
|
proc acceptLoop(udata: pointer) {.gcsafe, nimcall.} =
|
||||||
var ovl = cast[PtrCustomOverlapped](udata)
|
var ovl = cast[PtrCustomOverlapped](udata)
|
||||||
var server = cast[StreamServer](ovl.data.udata)
|
var server = cast[StreamServer](ovl.data.udata)
|
||||||
|
@ -469,36 +669,37 @@ when defined(windows):
|
||||||
if server.apending:
|
if server.apending:
|
||||||
## Continuation
|
## Continuation
|
||||||
server.apending = false
|
server.apending = false
|
||||||
if server.status == ServerStatus.Stopped:
|
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||||
|
## Server was already stopped/closed exiting
|
||||||
server.asock.closeSocket()
|
server.asock.closeSocket()
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
if ovl.data.errCode == OSErrorCode(-1):
|
if ovl.data.errCode == OSErrorCode(-1):
|
||||||
if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET),
|
if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET),
|
||||||
cint(SO_UPDATE_ACCEPT_CONTEXT),
|
cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock,
|
||||||
addr server.sock,
|
|
||||||
SockLen(sizeof(SocketHandle))) != 0'i32:
|
SockLen(sizeof(SocketHandle))) != 0'i32:
|
||||||
let err = OSErrorCode(wsaGetLastError())
|
let err = OSErrorCode(wsaGetLastError())
|
||||||
server.asock.closeSocket()
|
server.asock.closeSocket()
|
||||||
raiseTransportOsError(err)
|
raiseTransportOsError(err)
|
||||||
else:
|
else:
|
||||||
|
var ntransp: StreamTransport
|
||||||
if not isNil(server.init):
|
if not isNil(server.init):
|
||||||
var transp = server.init(server, server.asock)
|
let transp = server.init(server, server.asock)
|
||||||
let ntransp = newStreamSocketTransport(server.asock,
|
ntransp = newStreamSocketTransport(server.asock,
|
||||||
server.bufferSize,
|
server.bufferSize,
|
||||||
transp)
|
transp)
|
||||||
asyncCheck server.function(server, ntransp)
|
|
||||||
else:
|
else:
|
||||||
let ntransp = newStreamSocketTransport(server.asock,
|
ntransp = newStreamSocketTransport(server.asock,
|
||||||
server.bufferSize, nil)
|
server.bufferSize, nil)
|
||||||
asyncCheck server.function(server, ntransp)
|
asyncCheck server.function(server, ntransp)
|
||||||
|
|
||||||
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
|
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
|
||||||
# CancelIO() interrupt
|
# CancelIO() interrupt
|
||||||
server.asock.closeSocket()
|
server.asock.closeSocket()
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
let err = OSErrorCode(wsaGetLastError())
|
|
||||||
server.asock.closeSocket()
|
server.asock.closeSocket()
|
||||||
raiseTransportOsError(err)
|
raiseTransportOsError(ovl.data.errCode)
|
||||||
else:
|
else:
|
||||||
## Initiation
|
## Initiation
|
||||||
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||||
|
@ -547,7 +748,7 @@ when defined(windows):
|
||||||
|
|
||||||
proc resumeAccept(server: StreamServer) {.inline.} =
|
proc resumeAccept(server: StreamServer) {.inline.} =
|
||||||
if not server.apending:
|
if not server.apending:
|
||||||
acceptLoop(cast[pointer](addr server.aovl))
|
server.aovl.data.cb(addr server.aovl)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
|
@ -681,10 +882,16 @@ else:
|
||||||
saddr: Sockaddr_storage
|
saddr: Sockaddr_storage
|
||||||
slen: SockLen
|
slen: SockLen
|
||||||
sock: AsyncFD
|
sock: AsyncFD
|
||||||
|
proto: Protocol
|
||||||
var retFuture = newFuture[StreamTransport]("transport.connect")
|
var retFuture = newFuture[StreamTransport]("transport.connect")
|
||||||
toSockAddr(address.address, address.port, saddr, slen)
|
address.toSAddr(saddr, slen)
|
||||||
sock = createAsyncSocket(address.address.getDomain(), SockType.SOCK_STREAM,
|
proto = Protocol.IPPROTO_TCP
|
||||||
Protocol.IPPROTO_TCP)
|
if address.family == AddressFamily.Unix:
|
||||||
|
# `Protocol` enum is missing `0` value, so we making here cast, until
|
||||||
|
# `Protocol` enum will not support IPPROTO_IP == 0.
|
||||||
|
proto = cast[Protocol](0)
|
||||||
|
sock = createAsyncSocket(address.getDomain(), SockType.SOCK_STREAM,
|
||||||
|
proto)
|
||||||
if sock == asyncInvalidSocket:
|
if sock == asyncInvalidSocket:
|
||||||
retFuture.fail(getTransportOsError(osLastError()))
|
retFuture.fail(getTransportOsError(osLastError()))
|
||||||
return retFuture
|
return retFuture
|
||||||
|
@ -800,7 +1007,16 @@ proc close*(server: StreamServer) =
|
||||||
GC_unref(server)
|
GC_unref(server)
|
||||||
if server.status == ServerStatus.Stopped:
|
if server.status == ServerStatus.Stopped:
|
||||||
server.status = ServerStatus.Closed
|
server.status = ServerStatus.Closed
|
||||||
server.sock.closeSocket(continuation)
|
when defined(windows):
|
||||||
|
if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||||
|
server.sock.closeSocket(continuation)
|
||||||
|
elif server.local.family in {AddressFamily.Unix}:
|
||||||
|
if NoPipeFlash notin server.flags:
|
||||||
|
discard flushFileBuffers(Handle(server.sock))
|
||||||
|
doAssert disconnectNamedPipe(Handle(server.sock)) == 1
|
||||||
|
closeHandle(server.sock, continuation)
|
||||||
|
else:
|
||||||
|
server.sock.closeSocket(continuation)
|
||||||
|
|
||||||
proc closeWait*(server: StreamServer): Future[void] =
|
proc closeWait*(server: StreamServer): Future[void] =
|
||||||
## Close server ``server`` and release all resources.
|
## Close server ``server`` and release all resources.
|
||||||
|
@ -833,53 +1049,112 @@ proc createStreamServer*(host: TransportAddress,
|
||||||
saddr: Sockaddr_storage
|
saddr: Sockaddr_storage
|
||||||
slen: SockLen
|
slen: SockLen
|
||||||
serverSocket: AsyncFD
|
serverSocket: AsyncFD
|
||||||
if sock == asyncInvalidSocket:
|
|
||||||
serverSocket = createAsyncSocket(host.address.getDomain(),
|
when defined(windows):
|
||||||
SockType.SOCK_STREAM,
|
# Windows
|
||||||
Protocol.IPPROTO_TCP)
|
if host.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||||
if serverSocket == asyncInvalidSocket:
|
if sock == asyncInvalidSocket:
|
||||||
raiseTransportOsError(osLastError())
|
serverSocket = createAsyncSocket(host.getDomain(),
|
||||||
|
SockType.SOCK_STREAM,
|
||||||
|
Protocol.IPPROTO_TCP)
|
||||||
|
if serverSocket == asyncInvalidSocket:
|
||||||
|
raiseTransportOsError(osLastError())
|
||||||
|
else:
|
||||||
|
if not setSocketBlocking(SocketHandle(sock), false):
|
||||||
|
raiseTransportOsError(osLastError())
|
||||||
|
register(sock)
|
||||||
|
serverSocket = sock
|
||||||
|
# SO_REUSEADDR is not useful for Unix domain sockets.
|
||||||
|
if ServerFlags.ReuseAddr in flags:
|
||||||
|
if not setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1):
|
||||||
|
let err = osLastError()
|
||||||
|
if sock == asyncInvalidSocket:
|
||||||
|
serverSocket.closeSocket()
|
||||||
|
raiseTransportOsError(err)
|
||||||
|
# TCP flags are not useful for Unix domain sockets.
|
||||||
|
if ServerFlags.TcpNoDelay in flags:
|
||||||
|
if not setSockOpt(serverSocket, handles.IPPROTO_TCP,
|
||||||
|
handles.TCP_NODELAY, 1):
|
||||||
|
let err = osLastError()
|
||||||
|
if sock == asyncInvalidSocket:
|
||||||
|
serverSocket.closeSocket()
|
||||||
|
raiseTransportOsError(err)
|
||||||
|
host.toSAddr(saddr, slen)
|
||||||
|
if bindAddr(SocketHandle(serverSocket), cast[ptr SockAddr](addr saddr),
|
||||||
|
slen) != 0:
|
||||||
|
let err = osLastError()
|
||||||
|
if sock == asyncInvalidSocket:
|
||||||
|
serverSocket.closeSocket()
|
||||||
|
raiseTransportOsError(err)
|
||||||
|
|
||||||
|
if nativesockets.listen(SocketHandle(serverSocket), cint(backlog)) != 0:
|
||||||
|
let err = osLastError()
|
||||||
|
if sock == asyncInvalidSocket:
|
||||||
|
serverSocket.closeSocket()
|
||||||
|
raiseTransportOsError(err)
|
||||||
|
elif host.family == AddressFamily.Unix:
|
||||||
|
serverSocket = AsyncFD(0)
|
||||||
else:
|
else:
|
||||||
if not setSocketBlocking(SocketHandle(sock), false):
|
# Posix
|
||||||
raiseTransportOsError(osLastError())
|
if sock == asyncInvalidSocket:
|
||||||
register(sock)
|
var proto = Protocol.IPPROTO_TCP
|
||||||
serverSocket = sock
|
if host.family == AddressFamily.Unix:
|
||||||
|
# `Protocol` enum is missing `0` value, so we making here cast, until
|
||||||
|
# `Protocol` enum will not support IPPROTO_IP == 0.
|
||||||
|
proto = cast[Protocol](0)
|
||||||
|
serverSocket = createAsyncSocket(host.getDomain(),
|
||||||
|
SockType.SOCK_STREAM,
|
||||||
|
proto)
|
||||||
|
if serverSocket == asyncInvalidSocket:
|
||||||
|
raiseTransportOsError(osLastError())
|
||||||
|
else:
|
||||||
|
if not setSocketBlocking(SocketHandle(sock), false):
|
||||||
|
raiseTransportOsError(osLastError())
|
||||||
|
register(sock)
|
||||||
|
serverSocket = sock
|
||||||
|
|
||||||
if ServerFlags.ReuseAddr in flags:
|
if host.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||||
if not setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1):
|
# SO_REUSEADDR is not useful for Unix domain sockets.
|
||||||
|
if ServerFlags.ReuseAddr in flags:
|
||||||
|
if not setSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, 1):
|
||||||
|
let err = osLastError()
|
||||||
|
if sock == asyncInvalidSocket:
|
||||||
|
serverSocket.closeSocket()
|
||||||
|
raiseTransportOsError(err)
|
||||||
|
# TCP flags are not useful for Unix domain sockets.
|
||||||
|
if ServerFlags.TcpNoDelay in flags:
|
||||||
|
if not setSockOpt(serverSocket, handles.IPPROTO_TCP,
|
||||||
|
handles.TCP_NODELAY, 1):
|
||||||
|
let err = osLastError()
|
||||||
|
if sock == asyncInvalidSocket:
|
||||||
|
serverSocket.closeSocket()
|
||||||
|
raiseTransportOsError(err)
|
||||||
|
elif host.family in {AddressFamily.Unix}:
|
||||||
|
# We do not care about result here, because if file cannot be removed,
|
||||||
|
# `bindAddr` will return EADDRINUSE.
|
||||||
|
discard posix.unlink(cast[cstring](unsafeAddr host.address_un[0]))
|
||||||
|
|
||||||
|
host.toSAddr(saddr, slen)
|
||||||
|
if bindAddr(SocketHandle(serverSocket), cast[ptr SockAddr](addr saddr),
|
||||||
|
slen) != 0:
|
||||||
let err = osLastError()
|
let err = osLastError()
|
||||||
if sock == asyncInvalidSocket:
|
if sock == asyncInvalidSocket:
|
||||||
serverSocket.closeSocket()
|
serverSocket.closeSocket()
|
||||||
raiseTransportOsError(err)
|
raiseTransportOsError(err)
|
||||||
|
|
||||||
if ServerFlags.TcpNoDelay in flags:
|
if nativesockets.listen(SocketHandle(serverSocket), cint(backlog)) != 0:
|
||||||
if not setSockOpt(serverSocket, handles.IPPROTO_TCP,
|
|
||||||
handles.TCP_NODELAY, 1):
|
|
||||||
let err = osLastError()
|
let err = osLastError()
|
||||||
if sock == asyncInvalidSocket:
|
if sock == asyncInvalidSocket:
|
||||||
serverSocket.closeSocket()
|
serverSocket.closeSocket()
|
||||||
raiseTransportOsError(err)
|
raiseTransportOsError(err)
|
||||||
|
|
||||||
toSockAddr(host.address, host.port, saddr, slen)
|
|
||||||
if bindAddr(SocketHandle(serverSocket), cast[ptr SockAddr](addr saddr),
|
|
||||||
slen) != 0:
|
|
||||||
let err = osLastError()
|
|
||||||
if sock == asyncInvalidSocket:
|
|
||||||
serverSocket.closeSocket()
|
|
||||||
raiseTransportOsError(err)
|
|
||||||
|
|
||||||
if nativesockets.listen(SocketHandle(serverSocket), cint(backlog)) != 0:
|
|
||||||
let err = osLastError()
|
|
||||||
if sock == asyncInvalidSocket:
|
|
||||||
serverSocket.closeSocket()
|
|
||||||
raiseTransportOsError(err)
|
|
||||||
|
|
||||||
if not isNil(child):
|
if not isNil(child):
|
||||||
result = child
|
result = child
|
||||||
else:
|
else:
|
||||||
result = StreamServer()
|
result = StreamServer()
|
||||||
|
|
||||||
result.sock = serverSocket
|
result.sock = serverSocket
|
||||||
|
result.flags = flags
|
||||||
result.function = cbproc
|
result.function = cbproc
|
||||||
result.init = init
|
result.init = init
|
||||||
result.bufferSize = bufferSize
|
result.bufferSize = bufferSize
|
||||||
|
@ -889,10 +1164,17 @@ proc createStreamServer*(host: TransportAddress,
|
||||||
result.local = host
|
result.local = host
|
||||||
|
|
||||||
when defined(windows):
|
when defined(windows):
|
||||||
result.aovl.data = CompletionData(fd: serverSocket, cb: acceptLoop,
|
var cb: CallbackFunc
|
||||||
|
if host.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||||
|
cb = acceptLoop
|
||||||
|
elif host.family == AddressFamily.Unix:
|
||||||
|
cb = acceptPipeLoop
|
||||||
|
|
||||||
|
result.aovl.data = CompletionData(fd: serverSocket, cb: cb,
|
||||||
udata: cast[pointer](result))
|
udata: cast[pointer](result))
|
||||||
result.domain = host.address.getDomain()
|
result.domain = host.getDomain()
|
||||||
result.apending = false
|
result.apending = false
|
||||||
|
|
||||||
GC_ref(result)
|
GC_ref(result)
|
||||||
|
|
||||||
proc createStreamServer*[T](host: TransportAddress,
|
proc createStreamServer*[T](host: TransportAddress,
|
||||||
|
@ -967,6 +1249,9 @@ proc writeFile*(transp: StreamTransport, handle: int,
|
||||||
##
|
##
|
||||||
## You can specify starting ``offset`` in opened file and number of bytes
|
## You can specify starting ``offset`` in opened file and number of bytes
|
||||||
## to transfer from file to transport via ``size``.
|
## to transfer from file to transport via ``size``.
|
||||||
|
when defined(windows):
|
||||||
|
if transp.kind != TransportKind.Socket:
|
||||||
|
raise newException(TransportNoSupport, "writeFile() is not supported!")
|
||||||
var retFuture = newFuture[int]("transport.writeFile")
|
var retFuture = newFuture[int]("transport.writeFile")
|
||||||
transp.checkClosed(retFuture)
|
transp.checkClosed(retFuture)
|
||||||
var vector = StreamVector(kind: DataFile, writer: retFuture,
|
var vector = StreamVector(kind: DataFile, writer: retFuture,
|
||||||
|
@ -1172,7 +1457,7 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
|
||||||
if transp.offset > 0:
|
if transp.offset > 0:
|
||||||
let s = len(result)
|
let s = len(result)
|
||||||
let o = s + transp.offset
|
let o = s + transp.offset
|
||||||
if n == -1:
|
if n < 0:
|
||||||
# grabbing all incoming data, until EOF
|
# grabbing all incoming data, until EOF
|
||||||
result.setLen(o)
|
result.setLen(o)
|
||||||
copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]),
|
copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]),
|
||||||
|
@ -1259,7 +1544,19 @@ proc close*(transp: StreamTransport) =
|
||||||
transp.state.incl({WriteClosed, ReadClosed})
|
transp.state.incl({WriteClosed, ReadClosed})
|
||||||
when defined(windows):
|
when defined(windows):
|
||||||
discard cancelIo(Handle(transp.fd))
|
discard cancelIo(Handle(transp.fd))
|
||||||
closeSocket(transp.fd, continuation)
|
if transp.kind == TransportKind.Pipe:
|
||||||
|
if WinServerPipe in transp.flags:
|
||||||
|
if WinNoPipeFlash notin transp.flags:
|
||||||
|
discard flushFileBuffers(Handle(transp.fd))
|
||||||
|
doAssert disconnectNamedPipe(Handle(transp.fd)) == 1
|
||||||
|
else:
|
||||||
|
if WinNoPipeFlash notin transp.flags:
|
||||||
|
discard flushFileBuffers(Handle(transp.fd))
|
||||||
|
closeHandle(transp.fd, continuation)
|
||||||
|
elif transp.kind == TransportKind.Socket:
|
||||||
|
closeSocket(transp.fd, continuation)
|
||||||
|
else:
|
||||||
|
closeSocket(transp.fd, continuation)
|
||||||
|
|
||||||
proc closeWait*(transp: StreamTransport): Future[void] =
|
proc closeWait*(transp: StreamTransport): Future[void] =
|
||||||
## Close and frees resources of transport ``transp``.
|
## Close and frees resources of transport ``transp``.
|
||||||
|
|
|
@ -115,36 +115,6 @@ proc serveClient4(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
transp.close()
|
transp.close()
|
||||||
await transp.join()
|
await transp.join()
|
||||||
|
|
||||||
proc serveClient5(server: StreamServer, transp: StreamTransport) {.async.} =
|
|
||||||
var data = await transp.read()
|
|
||||||
doAssert(len(data) == len(ConstantMessage) * MessagesCount)
|
|
||||||
transp.close()
|
|
||||||
var expect = ""
|
|
||||||
for i in 0..<MessagesCount:
|
|
||||||
expect.add(ConstantMessage)
|
|
||||||
doAssert(equalMem(addr expect[0], addr data[0], len(data)))
|
|
||||||
var counter = cast[ptr int](server.udata)
|
|
||||||
dec(counter[])
|
|
||||||
if counter[] == 0:
|
|
||||||
server.stop()
|
|
||||||
server.close()
|
|
||||||
await server.join()
|
|
||||||
|
|
||||||
proc serveClient6(server: StreamServer, transp: StreamTransport) {.async.} =
|
|
||||||
var expect = ConstantMessage
|
|
||||||
var skip = await transp.consume(len(ConstantMessage) * (MessagesCount - 1))
|
|
||||||
doAssert(skip == len(ConstantMessage) * (MessagesCount - 1))
|
|
||||||
var data = await transp.read()
|
|
||||||
doAssert(len(data) == len(ConstantMessage))
|
|
||||||
transp.close()
|
|
||||||
doAssert(equalMem(addr data[0], addr expect[0], len(expect)))
|
|
||||||
var counter = cast[ptr int](server.udata)
|
|
||||||
dec(counter[])
|
|
||||||
if counter[] == 0:
|
|
||||||
server.stop()
|
|
||||||
server.close()
|
|
||||||
await server.join()
|
|
||||||
|
|
||||||
proc serveClient7(server: StreamServer, transp: StreamTransport) {.async.} =
|
proc serveClient7(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
var answer = "DONE\r\n"
|
var answer = "DONE\r\n"
|
||||||
var expect = ""
|
var expect = ""
|
||||||
|
@ -157,6 +127,8 @@ proc serveClient7(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
doAssert(res == len(answer))
|
doAssert(res == len(answer))
|
||||||
transp.close()
|
transp.close()
|
||||||
await transp.join()
|
await transp.join()
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
|
||||||
proc serveClient8(server: StreamServer, transp: StreamTransport) {.async.} =
|
proc serveClient8(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
var answer = "DONE\r\n"
|
var answer = "DONE\r\n"
|
||||||
|
@ -176,9 +148,9 @@ proc serveClient8(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
var res = await transp.write(answer)
|
var res = await transp.write(answer)
|
||||||
doAssert(res == len(answer))
|
doAssert(res == len(answer))
|
||||||
transp.close()
|
transp.close()
|
||||||
|
await transp.join()
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
|
||||||
|
|
||||||
proc swarmWorker1(address: TransportAddress): Future[int] {.async.} =
|
proc swarmWorker1(address: TransportAddress): Future[int] {.async.} =
|
||||||
var transp = await connect(address)
|
var transp = await connect(address)
|
||||||
|
@ -274,26 +246,6 @@ proc swarmWorker4(address: TransportAddress): Future[int] {.async.} =
|
||||||
transp.close()
|
transp.close()
|
||||||
await transp.join()
|
await transp.join()
|
||||||
|
|
||||||
proc swarmWorker5(address: TransportAddress): Future[int] {.async.} =
|
|
||||||
var transp = await connect(address)
|
|
||||||
var data = ConstantMessage
|
|
||||||
for i in 0..<MessagesCount:
|
|
||||||
var res = await transp.write(data)
|
|
||||||
result = MessagesCount
|
|
||||||
transp.close()
|
|
||||||
await transp.join()
|
|
||||||
|
|
||||||
proc swarmWorker6(address: TransportAddress): Future[int] {.async.} =
|
|
||||||
var transp = await connect(address)
|
|
||||||
var data = ConstantMessage
|
|
||||||
var seqdata = newSeq[byte](len(data))
|
|
||||||
copyMem(addr seqdata[0], addr data[0], len(data))
|
|
||||||
for i in 0..<MessagesCount:
|
|
||||||
var res = await transp.write(seqdata)
|
|
||||||
result = MessagesCount
|
|
||||||
transp.close()
|
|
||||||
await transp.join()
|
|
||||||
|
|
||||||
proc swarmWorker7(address: TransportAddress): Future[int] {.async.} =
|
proc swarmWorker7(address: TransportAddress): Future[int] {.async.} =
|
||||||
var transp = await connect(address)
|
var transp = await connect(address)
|
||||||
var data = BigMessagePattern
|
var data = BigMessagePattern
|
||||||
|
@ -334,7 +286,6 @@ proc waitAll[T](futs: seq[Future[T]]): Future[void] =
|
||||||
return retFuture
|
return retFuture
|
||||||
|
|
||||||
proc swarmManager1(address: TransportAddress): Future[int] {.async.} =
|
proc swarmManager1(address: TransportAddress): Future[int] {.async.} =
|
||||||
var retFuture = newFuture[void]("swarm.manager.readLine")
|
|
||||||
var workers = newSeq[Future[int]](ClientsCount)
|
var workers = newSeq[Future[int]](ClientsCount)
|
||||||
var count = ClientsCount
|
var count = ClientsCount
|
||||||
for i in 0..<ClientsCount:
|
for i in 0..<ClientsCount:
|
||||||
|
@ -345,7 +296,6 @@ proc swarmManager1(address: TransportAddress): Future[int] {.async.} =
|
||||||
result += res
|
result += res
|
||||||
|
|
||||||
proc swarmManager2(address: TransportAddress): Future[int] {.async.} =
|
proc swarmManager2(address: TransportAddress): Future[int] {.async.} =
|
||||||
var retFuture = newFuture[void]("swarm.manager.readExactly")
|
|
||||||
var workers = newSeq[Future[int]](ClientsCount)
|
var workers = newSeq[Future[int]](ClientsCount)
|
||||||
var count = ClientsCount
|
var count = ClientsCount
|
||||||
for i in 0..<ClientsCount:
|
for i in 0..<ClientsCount:
|
||||||
|
@ -356,7 +306,6 @@ proc swarmManager2(address: TransportAddress): Future[int] {.async.} =
|
||||||
result += res
|
result += res
|
||||||
|
|
||||||
proc swarmManager3(address: TransportAddress): Future[int] {.async.} =
|
proc swarmManager3(address: TransportAddress): Future[int] {.async.} =
|
||||||
var retFuture = newFuture[void]("swarm.manager.readUntil")
|
|
||||||
var workers = newSeq[Future[int]](ClientsCount)
|
var workers = newSeq[Future[int]](ClientsCount)
|
||||||
var count = ClientsCount
|
var count = ClientsCount
|
||||||
for i in 0..<ClientsCount:
|
for i in 0..<ClientsCount:
|
||||||
|
@ -367,7 +316,6 @@ proc swarmManager3(address: TransportAddress): Future[int] {.async.} =
|
||||||
result += res
|
result += res
|
||||||
|
|
||||||
proc swarmManager4(address: TransportAddress): Future[int] {.async.} =
|
proc swarmManager4(address: TransportAddress): Future[int] {.async.} =
|
||||||
var retFuture = newFuture[void]("swarm.manager.writeFile")
|
|
||||||
var workers = newSeq[Future[int]](FilesCount)
|
var workers = newSeq[Future[int]](FilesCount)
|
||||||
var count = FilesCount
|
var count = FilesCount
|
||||||
for i in 0..<FilesCount:
|
for i in 0..<FilesCount:
|
||||||
|
@ -377,161 +325,195 @@ proc swarmManager4(address: TransportAddress): Future[int] {.async.} =
|
||||||
var res = workers[i].read()
|
var res = workers[i].read()
|
||||||
result += res
|
result += res
|
||||||
|
|
||||||
proc swarmManager5(address: TransportAddress): Future[int] {.async.} =
|
proc test1(address: TransportAddress): Future[int] {.async.} =
|
||||||
var retFuture = newFuture[void]("swarm.manager.read")
|
var server = createStreamServer(address, serveClient1, {ReuseAddr})
|
||||||
var workers = newSeq[Future[int]](ClientsCount)
|
|
||||||
var count = ClientsCount
|
|
||||||
for i in 0..<ClientsCount:
|
|
||||||
workers[i] = swarmWorker5(address)
|
|
||||||
await waitAll(workers)
|
|
||||||
for i in 0..<ClientsCount:
|
|
||||||
var res = workers[i].read()
|
|
||||||
result += res
|
|
||||||
|
|
||||||
proc swarmManager6(address: TransportAddress): Future[int] {.async.} =
|
|
||||||
var retFuture = newFuture[void]("swarm.manager.consume")
|
|
||||||
var workers = newSeq[Future[int]](ClientsCount)
|
|
||||||
var count = ClientsCount
|
|
||||||
for i in 0..<ClientsCount:
|
|
||||||
workers[i] = swarmWorker6(address)
|
|
||||||
await waitAll(workers)
|
|
||||||
for i in 0..<ClientsCount:
|
|
||||||
var res = workers[i].read()
|
|
||||||
result += res
|
|
||||||
|
|
||||||
proc test1(): Future[int] {.async.} =
|
|
||||||
var ta = initTAddress("127.0.0.1:31344")
|
|
||||||
var server = createStreamServer(ta, serveClient1, {ReuseAddr})
|
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmManager1(ta)
|
result = await swarmManager1(address)
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
|
||||||
proc test2(): Future[int] {.async.} =
|
proc test2(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31345")
|
|
||||||
var counter = 0
|
var counter = 0
|
||||||
var server = createStreamServer(ta, serveClient2, {ReuseAddr})
|
var server = createStreamServer(address, serveClient2, {ReuseAddr})
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmManager2(ta)
|
result = await swarmManager2(address)
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
|
||||||
proc test3(): Future[int] {.async.} =
|
proc test3(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31346")
|
|
||||||
var counter = 0
|
var counter = 0
|
||||||
var server = createStreamServer(ta, serveClient3, {ReuseAddr})
|
var server = createStreamServer(address, serveClient3, {ReuseAddr})
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmManager3(ta)
|
result = await swarmManager3(address)
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
|
||||||
proc test4(): Future[int] {.async.} =
|
proc testSendFile(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31347")
|
var server = createStreamServer(address, serveClient4, {ReuseAddr})
|
||||||
var server = createStreamServer(ta, serveClient4, {ReuseAddr})
|
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmManager4(ta)
|
result = await swarmManager4(address)
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
|
||||||
proc test5(): Future[int] {.async.} =
|
proc testWR(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31348")
|
|
||||||
var counter = ClientsCount
|
var counter = ClientsCount
|
||||||
var server = createStreamServer(ta, serveClient5, {ReuseAddr},
|
|
||||||
udata = cast[pointer](addr counter))
|
|
||||||
server.start()
|
|
||||||
result = await swarmManager5(ta)
|
|
||||||
|
|
||||||
proc test6(): Future[int] {.async.} =
|
proc swarmWorker(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31349")
|
var transp = await connect(address)
|
||||||
|
var data = ConstantMessage
|
||||||
|
for i in 0..<MessagesCount:
|
||||||
|
var res = await transp.write(data)
|
||||||
|
result = MessagesCount
|
||||||
|
transp.close()
|
||||||
|
await transp.join()
|
||||||
|
|
||||||
|
proc swarmManager(address: TransportAddress): Future[int] {.async.} =
|
||||||
|
var workers = newSeq[Future[int]](ClientsCount)
|
||||||
|
var count = ClientsCount
|
||||||
|
for i in 0..<ClientsCount:
|
||||||
|
workers[i] = swarmWorker(address)
|
||||||
|
await waitAll(workers)
|
||||||
|
for i in 0..<ClientsCount:
|
||||||
|
var res = workers[i].read()
|
||||||
|
result += res
|
||||||
|
|
||||||
|
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
|
var data = await transp.read()
|
||||||
|
doAssert(len(data) == len(ConstantMessage) * MessagesCount)
|
||||||
|
transp.close()
|
||||||
|
var expect = ""
|
||||||
|
for i in 0..<MessagesCount:
|
||||||
|
expect.add(ConstantMessage)
|
||||||
|
doAssert(equalMem(addr expect[0], addr data[0], len(data)))
|
||||||
|
dec(counter)
|
||||||
|
if counter == 0:
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
var server = createStreamServer(address, serveClient, {ReuseAddr})
|
||||||
|
server.start()
|
||||||
|
result = await swarmManager(address)
|
||||||
|
await server.join()
|
||||||
|
|
||||||
|
proc testWCR(address: TransportAddress): Future[int] {.async.} =
|
||||||
var counter = ClientsCount
|
var counter = ClientsCount
|
||||||
var server = createStreamServer(ta, serveClient6, {ReuseAddr},
|
|
||||||
udata = cast[pointer](addr counter))
|
|
||||||
server.start()
|
|
||||||
result = await swarmManager6(ta)
|
|
||||||
|
|
||||||
proc test7(): Future[int] {.async.} =
|
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31350")
|
var expect = ConstantMessage
|
||||||
var server = createStreamServer(ta, serveClient7, {ReuseAddr})
|
var skip = await transp.consume(len(ConstantMessage) * (MessagesCount - 1))
|
||||||
|
doAssert(skip == len(ConstantMessage) * (MessagesCount - 1))
|
||||||
|
var data = await transp.read()
|
||||||
|
doAssert(len(data) == len(ConstantMessage))
|
||||||
|
transp.close()
|
||||||
|
doAssert(equalMem(addr data[0], addr expect[0], len(expect)))
|
||||||
|
dec(counter)
|
||||||
|
if counter == 0:
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
proc swarmWorker(address: TransportAddress): Future[int] {.async.} =
|
||||||
|
var transp = await connect(address)
|
||||||
|
var data = ConstantMessage
|
||||||
|
var seqdata = newSeq[byte](len(data))
|
||||||
|
copyMem(addr seqdata[0], addr data[0], len(data))
|
||||||
|
for i in 0..<MessagesCount:
|
||||||
|
var res = await transp.write(seqdata)
|
||||||
|
result = MessagesCount
|
||||||
|
transp.close()
|
||||||
|
await transp.join()
|
||||||
|
|
||||||
|
proc swarmManager(address: TransportAddress): Future[int] {.async.} =
|
||||||
|
var workers = newSeq[Future[int]](ClientsCount)
|
||||||
|
for i in 0..<ClientsCount:
|
||||||
|
workers[i] = swarmWorker(address)
|
||||||
|
await waitAll(workers)
|
||||||
|
for i in 0..<ClientsCount:
|
||||||
|
var res = workers[i].read()
|
||||||
|
result += res
|
||||||
|
|
||||||
|
var server = createStreamServer(address, serveClient, {ReuseAddr})
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmWorker7(ta)
|
result = await swarmManager(address)
|
||||||
|
await server.join()
|
||||||
|
|
||||||
|
proc test7(address: TransportAddress): Future[int] {.async.} =
|
||||||
|
var server = createStreamServer(address, serveClient7, {ReuseAddr})
|
||||||
|
server.start()
|
||||||
|
result = await swarmWorker7(address)
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
|
||||||
proc test8(): Future[int] {.async.} =
|
proc test8(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31350")
|
var server = createStreamServer(address, serveClient8, {ReuseAddr})
|
||||||
var server = createStreamServer(ta, serveClient8, {ReuseAddr})
|
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmWorker8(ta)
|
result = await swarmWorker8(address)
|
||||||
server.stop()
|
|
||||||
server.close()
|
|
||||||
await server.join()
|
await server.join()
|
||||||
|
|
||||||
proc serveClient9(server: StreamServer, transp: StreamTransport) {.async.} =
|
# proc serveClient9(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
var expect = ""
|
# var expect = ""
|
||||||
for i in 0..<BigMessageCount:
|
# for i in 0..<BigMessageCount:
|
||||||
expect.add(BigMessagePattern)
|
# expect.add(BigMessagePattern)
|
||||||
var res = await transp.write(expect)
|
# var res = await transp.write(expect)
|
||||||
doAssert(res == len(expect))
|
# doAssert(res == len(expect))
|
||||||
transp.close()
|
# transp.close()
|
||||||
await transp.join()
|
# await transp.join()
|
||||||
|
|
||||||
proc swarmWorker9(address: TransportAddress): Future[int] {.async.} =
|
# proc swarmWorker9(address: TransportAddress): Future[int] {.async.} =
|
||||||
var transp = await connect(address)
|
# var transp = await connect(address)
|
||||||
var expect = ""
|
# var expect = ""
|
||||||
for i in 0..<BigMessageCount:
|
# for i in 0..<BigMessageCount:
|
||||||
expect.add(BigMessagePattern)
|
# expect.add(BigMessagePattern)
|
||||||
var line = await transp.readLine()
|
# var line = await transp.readLine()
|
||||||
if line == expect:
|
# if line == expect:
|
||||||
result = 1
|
# result = 1
|
||||||
else:
|
# else:
|
||||||
result = 0
|
# result = 0
|
||||||
transp.close()
|
# transp.close()
|
||||||
await transp.join()
|
# await transp.join()
|
||||||
|
|
||||||
proc test9(): Future[int] {.async.} =
|
# proc test9(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31351")
|
# let flags = {ReuseAddr, NoPipeFlash}
|
||||||
var server = createStreamServer(ta, serveClient9, {ReuseAddr})
|
# var server = createStreamServer(address, serveClient9, flags)
|
||||||
server.start()
|
# server.start()
|
||||||
result = await swarmWorker9(ta)
|
# result = await swarmWorker9(address)
|
||||||
server.stop()
|
# server.stop()
|
||||||
server.close()
|
# server.close()
|
||||||
await server.join()
|
# await server.join()
|
||||||
|
|
||||||
proc serveClient10(server: StreamServer, transp: StreamTransport) {.async.} =
|
# proc serveClient10(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
var expect = ""
|
# var expect = ""
|
||||||
for i in 0..<BigMessageCount:
|
# for i in 0..<BigMessageCount:
|
||||||
expect.add(BigMessagePattern)
|
# expect.add(BigMessagePattern)
|
||||||
var res = await transp.write(expect)
|
# var res = await transp.write(expect)
|
||||||
doAssert(res == len(expect))
|
# doAssert(res == len(expect))
|
||||||
transp.close()
|
# transp.close()
|
||||||
await transp.join()
|
# await transp.join()
|
||||||
|
|
||||||
proc swarmWorker10(address: TransportAddress): Future[int] {.async.} =
|
# proc swarmWorker10(address: TransportAddress): Future[int] {.async.} =
|
||||||
var transp = await connect(address)
|
# var transp = await connect(address)
|
||||||
var expect = ""
|
# var expect = ""
|
||||||
for i in 0..<BigMessageCount:
|
# for i in 0..<BigMessageCount:
|
||||||
expect.add(BigMessagePattern)
|
# expect.add(BigMessagePattern)
|
||||||
var line = await transp.read()
|
# var line = await transp.read()
|
||||||
if equalMem(addr line[0], addr expect[0], len(expect)):
|
# if equalMem(addr line[0], addr expect[0], len(expect)):
|
||||||
result = 1
|
# result = 1
|
||||||
else:
|
# else:
|
||||||
result = 0
|
# result = 0
|
||||||
transp.close()
|
# transp.close()
|
||||||
await transp.join()
|
# await transp.join()
|
||||||
|
|
||||||
proc test10(): Future[int] {.async.} =
|
# proc test10(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31351")
|
# var server = createStreamServer(address, serveClient10, {ReuseAddr})
|
||||||
var server = createStreamServer(ta, serveClient10, {ReuseAddr})
|
# server.start()
|
||||||
server.start()
|
# result = await swarmWorker10(address)
|
||||||
result = await swarmWorker10(ta)
|
# server.stop()
|
||||||
server.stop()
|
# server.close()
|
||||||
server.close()
|
# await server.join()
|
||||||
await server.join()
|
|
||||||
|
|
||||||
proc serveClient11(server: StreamServer, transp: StreamTransport) {.async.} =
|
proc serveClient11(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
var res = await transp.write(BigMessagePattern)
|
var res = await transp.write(BigMessagePattern)
|
||||||
|
@ -549,11 +531,10 @@ proc swarmWorker11(address: TransportAddress): Future[int] {.async.} =
|
||||||
transp.close()
|
transp.close()
|
||||||
await transp.join()
|
await transp.join()
|
||||||
|
|
||||||
proc test11(): Future[int] {.async.} =
|
proc test11(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31352")
|
var server = createStreamServer(address, serveClient11, {ReuseAddr})
|
||||||
var server = createStreamServer(ta, serveClient11, {ReuseAddr})
|
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmWorker11(ta)
|
result = await swarmWorker11(address)
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
@ -575,11 +556,10 @@ proc swarmWorker12(address: TransportAddress): Future[int] {.async.} =
|
||||||
transp.close()
|
transp.close()
|
||||||
await transp.join()
|
await transp.join()
|
||||||
|
|
||||||
proc test12(): Future[int] {.async.} =
|
proc test12(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31353")
|
var server = createStreamServer(address, serveClient12, {ReuseAddr})
|
||||||
var server = createStreamServer(ta, serveClient12, {ReuseAddr})
|
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmWorker12(ta)
|
result = await swarmWorker12(address)
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
@ -598,11 +578,10 @@ proc swarmWorker13(address: TransportAddress): Future[int] {.async.} =
|
||||||
transp.close()
|
transp.close()
|
||||||
await transp.join()
|
await transp.join()
|
||||||
|
|
||||||
proc test13(): Future[int] {.async.} =
|
proc test13(address: TransportAddress): Future[int] {.async.} =
|
||||||
var ta = initTAddress("127.0.0.1:31354")
|
var server = createStreamServer(address, serveClient13, {ReuseAddr})
|
||||||
var server = createStreamServer(ta, serveClient13, {ReuseAddr})
|
|
||||||
server.start()
|
server.start()
|
||||||
result = await swarmWorker13(ta)
|
result = await swarmWorker13(address)
|
||||||
server.stop()
|
server.stop()
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
@ -610,10 +589,9 @@ proc test13(): Future[int] {.async.} =
|
||||||
proc serveClient14(server: StreamServer, transp: StreamTransport) {.async.} =
|
proc serveClient14(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc test14(): Future[int] {.async.} =
|
proc test14(address: TransportAddress): Future[int] {.async.} =
|
||||||
var subres = 0
|
var subres = 0
|
||||||
var ta = initTAddress("127.0.0.1:31354")
|
var server = createStreamServer(address, serveClient13, {ReuseAddr})
|
||||||
var server = createStreamServer(ta, serveClient13, {ReuseAddr})
|
|
||||||
|
|
||||||
proc swarmWorker(transp: StreamTransport): Future[void] {.async.} =
|
proc swarmWorker(transp: StreamTransport): Future[void] {.async.} =
|
||||||
var line = await transp.readLine()
|
var line = await transp.readLine()
|
||||||
|
@ -623,7 +601,7 @@ proc test14(): Future[int] {.async.} =
|
||||||
subres = 0
|
subres = 0
|
||||||
|
|
||||||
server.start()
|
server.start()
|
||||||
var transp = await connect(ta)
|
var transp = await connect(address)
|
||||||
var fut = swarmWorker(transp)
|
var fut = swarmWorker(transp)
|
||||||
transp.close()
|
transp.close()
|
||||||
await fut
|
await fut
|
||||||
|
@ -632,12 +610,15 @@ proc test14(): Future[int] {.async.} =
|
||||||
await server.join()
|
await server.join()
|
||||||
result = subres
|
result = subres
|
||||||
|
|
||||||
proc testConnectionRefused(): Future[bool] {.async.} =
|
proc testConnectionRefused(address: TransportAddress): Future[bool] {.async.} =
|
||||||
try:
|
try:
|
||||||
var transp = await connect(initTAddress("127.0.0.1:1"))
|
var transp = await connect(address)
|
||||||
except TransportOsError as e:
|
except TransportOsError as e:
|
||||||
when defined(windows):
|
when defined(windows):
|
||||||
result = (int(e.code) == ERROR_CONNECTION_REFUSED)
|
if address.family == AddressFamily.Unix:
|
||||||
|
result = (int(e.code) == ERROR_FILE_NOT_FOUND)
|
||||||
|
else:
|
||||||
|
result = (int(e.code) == ERROR_CONNECTION_REFUSED)
|
||||||
else:
|
else:
|
||||||
result = (int(e.code) == ECONNREFUSED)
|
result = (int(e.code) == ECONNREFUSED)
|
||||||
|
|
||||||
|
@ -656,41 +637,60 @@ when isMainModule:
|
||||||
$ClientsCount & " clients x " & $MessagesCount & " messages)"
|
$ClientsCount & " clients x " & $MessagesCount & " messages)"
|
||||||
m7 = "readLine() buffer overflow test"
|
m7 = "readLine() buffer overflow test"
|
||||||
m8 = "readUntil() buffer overflow test"
|
m8 = "readUntil() buffer overflow test"
|
||||||
m9 = "readLine() unexpected disconnect test"
|
|
||||||
m10 = "read() unexpected disconnect test"
|
|
||||||
m11 = "readExactly() unexpected disconnect test"
|
m11 = "readExactly() unexpected disconnect test"
|
||||||
m12 = "readUntil() unexpected disconnect test"
|
m12 = "readUntil() unexpected disconnect test"
|
||||||
m13 = "readLine() unexpected disconnect empty string test"
|
m13 = "readLine() unexpected disconnect empty string test"
|
||||||
m14 = "Closing socket while operation pending test (issue #8)"
|
m14 = "Closing socket while operation pending test (issue #8)"
|
||||||
m15 = "Connection refused test"
|
m15 = "Connection refused test"
|
||||||
|
|
||||||
|
when defined(windows):
|
||||||
|
var addresses = [
|
||||||
|
initTAddress("127.0.0.1:33335"),
|
||||||
|
initTAddress(r"/LOCAL\testpipe")
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
var addresses = [
|
||||||
|
initTAddress("127.0.0.1:33335"),
|
||||||
|
initTAddress(r"/tmp/testpipe")
|
||||||
|
]
|
||||||
|
var prefixes = ["[IP] ", "[UNIX] "]
|
||||||
suite "Stream Transport test suite":
|
suite "Stream Transport test suite":
|
||||||
test m8:
|
for i in 0..<len(addresses):
|
||||||
check waitFor(test8()) == 1
|
test prefixes[i] & m8:
|
||||||
test m7:
|
check waitFor(test8(addresses[i])) == 1
|
||||||
check waitFor(test7()) == 1
|
test prefixes[i] & m7:
|
||||||
test m9:
|
check waitFor(test7(addresses[i])) == 1
|
||||||
check waitFor(test9()) == 1
|
test prefixes[i] & m11:
|
||||||
test m10:
|
check waitFor(test11(addresses[i])) == 1
|
||||||
check waitFor(test10()) == 1
|
test prefixes[i] & m12:
|
||||||
test m11:
|
check waitFor(test12(addresses[i])) == 1
|
||||||
check waitFor(test11()) == 1
|
test prefixes[i] & m13:
|
||||||
test m12:
|
check waitFor(test13(addresses[i])) == 1
|
||||||
check waitFor(test12()) == 1
|
test prefixes[i] & m14:
|
||||||
test m13:
|
check waitFor(test14(addresses[i])) == 1
|
||||||
check waitFor(test13()) == 1
|
test prefixes[i] & m1:
|
||||||
test m14:
|
check waitFor(test1(addresses[i])) == ClientsCount * MessagesCount
|
||||||
check waitFor(test14()) == 1
|
test prefixes[i] & m2:
|
||||||
test m1:
|
check waitFor(test2(addresses[i])) == ClientsCount * MessagesCount
|
||||||
check waitFor(test1()) == ClientsCount * MessagesCount
|
test prefixes[i] & m3:
|
||||||
test m2:
|
check waitFor(test3(addresses[i])) == ClientsCount * MessagesCount
|
||||||
check waitFor(test2()) == ClientsCount * MessagesCount
|
test prefixes[i] & m5:
|
||||||
test m3:
|
check waitFor(testWR(addresses[i])) == ClientsCount * MessagesCount
|
||||||
check waitFor(test3()) == ClientsCount * MessagesCount
|
test prefixes[i] & m6:
|
||||||
test m5:
|
check waitFor(testWCR(addresses[i])) == ClientsCount * MessagesCount
|
||||||
check waitFor(test5()) == ClientsCount * MessagesCount
|
test prefixes[i] & m4:
|
||||||
test m6:
|
when defined(windows):
|
||||||
check waitFor(test6()) == ClientsCount * MessagesCount
|
if addresses[i].family == AddressFamily.IPv4:
|
||||||
test m4:
|
check waitFor(testSendFile(addresses[i])) == FilesCount
|
||||||
check waitFor(test4()) == FilesCount
|
else:
|
||||||
test m15:
|
discard
|
||||||
check waitFor(testConnectionRefused()) == true
|
else:
|
||||||
|
test prefixes[i] & m4:
|
||||||
|
check waitFor(testSendFile(addresses[i])) == FilesCount
|
||||||
|
test prefixes[i] & m15:
|
||||||
|
var address: TransportAddress
|
||||||
|
if addresses[i].family == AddressFamily.Unix:
|
||||||
|
address = initTAddress("/tmp/notexistingtestpipe")
|
||||||
|
else:
|
||||||
|
address = initTAddress("127.0.0.1:43335")
|
||||||
|
check waitFor(testConnectionRefused(address)) == true
|
||||||
|
|
Loading…
Reference in New Issue